fuel_core/
schema.rs

1use crate::fuel_core_graphql_api::{
2    api_service::ReadDatabase,
3    database::ReadView,
4};
5use anyhow::anyhow;
6use async_graphql::{
7    Context,
8    MergedObject,
9    MergedSubscription,
10    OutputType,
11    Schema,
12    SchemaBuilder,
13    connection::{
14        Connection,
15        CursorType,
16        Edge,
17        EmptyFields,
18        query,
19    },
20    parser::types::OperationType,
21};
22use fuel_core_storage::{
23    Result as StorageResult,
24    iter::IterDirection,
25};
26use futures::{
27    Stream,
28    TryStreamExt,
29};
30use std::borrow::Cow;
31use tokio_stream::StreamExt;
32
33pub mod assets;
34pub mod balance;
35pub mod blob;
36pub mod block;
37pub mod chain;
38pub mod coins;
39pub mod contract;
40pub mod da_compressed;
41pub mod dap;
42pub mod health;
43pub mod message;
44pub mod node_info;
45pub mod upgrades;
46
47pub mod gas_price;
48pub mod scalars;
49pub mod tx;
50
51pub mod relayed_tx;
52pub mod storage;
53
54#[derive(MergedObject, Default)]
55pub struct Query(
56    assets::AssetInfoQuery,
57    dap::DapQuery,
58    balance::BalanceQuery,
59    blob::BlobQuery,
60    block::BlockQuery,
61    chain::ChainQuery,
62    tx::TxQuery,
63    health::HealthQuery,
64    coins::CoinQuery,
65    da_compressed::DaCompressedBlockQuery,
66    contract::ContractQuery,
67    contract::ContractBalanceQuery,
68    node_info::NodeQuery,
69    gas_price::LatestGasPriceQuery,
70    gas_price::EstimateGasPriceQuery,
71    message::MessageQuery,
72    relayed_tx::RelayedTransactionQuery,
73    upgrades::UpgradeQuery,
74    storage::StorageQuery,
75);
76
77#[derive(MergedObject, Default)]
78pub struct Mutation(dap::DapMutation, tx::TxMutation, block::BlockMutation);
79
80#[derive(MergedSubscription, Default)]
81pub struct Subscription(
82    tx::TxStatusSubscription,
83    storage::StorageSubscription,
84    block::BlockSubscription,
85);
86
87pub type CoreSchema = Schema<Query, Mutation, Subscription>;
88pub type CoreSchemaBuilder = SchemaBuilder<Query, Mutation, Subscription>;
89
90pub fn build_schema() -> CoreSchemaBuilder {
91    Schema::build_with_ignore_name_conflicts(
92        Query::default(),
93        Mutation::default(),
94        Subscription::default(),
95        ["TransactionConnection", "MessageConnection"],
96    )
97}
98
99async fn query_pagination<F, Entries, SchemaKey, SchemaValue>(
100    after: Option<String>,
101    before: Option<String>,
102    first: Option<i32>,
103    last: Option<i32>,
104    entries: F,
105) -> async_graphql::Result<Connection<SchemaKey, SchemaValue, EmptyFields, EmptyFields>>
106where
107    SchemaKey: CursorType + Send + Sync,
108    <SchemaKey as CursorType>::Error: core::fmt::Display + Send + Sync + 'static,
109    SchemaValue: OutputType,
110    // TODO: Optimization: Support `count` here including skipping of entities.
111    //  It means also returning `has_previous_page` and `has_next_page` values.
112    // entries(start_key: Option<DBKey>)
113    F: FnOnce(&Option<SchemaKey>, IterDirection) -> StorageResult<Entries>,
114    Entries: Stream<Item = StorageResult<(SchemaKey, SchemaValue)>>,
115    SchemaKey: Eq,
116{
117    match (after.as_ref(), before.as_ref(), first, last) {
118        (_, _, Some(first), Some(last)) => {
119            return Err(anyhow!(
120                "Either first `{first}` or latest `{last}` elements, not both"
121            )
122            .into())
123        }
124        (Some(after), _, _, Some(last)) => {
125            return Err(anyhow!(
126                "After `{after:?}` with last `{last}` elements is not supported"
127            )
128            .into())
129        }
130        (_, Some(before), Some(first), _) => {
131            return Err(anyhow!(
132                "Before `{before:?}` with first `{first}` elements is not supported"
133            )
134            .into())
135        }
136        (_, _, None, None) => {
137            return Err(anyhow!("The queries for the whole range is not supported").into())
138        }
139        (_, _, _, _) => { /* Other combinations are allowed */ }
140    };
141
142    query(
143        after,
144        before,
145        first,
146        last,
147        |after: Option<SchemaKey>, before: Option<SchemaKey>, first, last| async move {
148            let (count, direction) = if let Some(first) = first {
149                (first, IterDirection::Forward)
150            } else if let Some(last) = last {
151                (last, IterDirection::Reverse)
152            } else {
153                return Err(anyhow!("Either `first` or `last` should be provided"))
154            };
155
156            let start;
157            let end;
158
159            if direction == IterDirection::Forward {
160                start = after;
161                end = before;
162            } else {
163                start = before;
164                end = after;
165            }
166
167            let entries = entries(&start, direction)?;
168            let mut has_previous_page = false;
169            let mut has_next_page = false;
170
171            // TODO: Add support of `skip` field for pages with huge list of entities with
172            //  the same `SchemaKey`.
173            let entries = entries.skip_while(|result| {
174                if let Ok((key, _)) = result {
175                    // TODO: `entries` should return information about `has_previous_page` for wild
176                    //  queries
177                    if let Some(start) = start.as_ref() {
178                        // Skip until start + 1
179                        if key == start {
180                            has_previous_page = true;
181                            return true
182                        }
183                    }
184                }
185                false
186            });
187
188            let mut count = count.saturating_add(1) /* for `has_next_page` */;
189            let entries = entries.take(count).take_while(|result| {
190                if let Ok((key, _)) = result {
191                    if let Some(end) = end.as_ref() {
192                        // take until we've reached the end
193                        if key == end {
194                            has_next_page = true;
195                            return false
196                        }
197                    }
198                    count = count.saturating_sub(1);
199                    has_next_page |= count == 0;
200                    count != 0
201                } else {
202                    // We want to stop immediately in the case of error
203                    false
204                }
205            });
206
207            let entries: Vec<_> = entries.try_collect().await?;
208            let entries = entries.into_iter();
209
210            let mut connection = Connection::new(has_previous_page, has_next_page);
211
212            connection.edges.extend(
213                entries
214                    .into_iter()
215                    .map(|(key, value)| Edge::new(key, value)),
216            );
217
218            Ok::<Connection<SchemaKey, SchemaValue>, anyhow::Error>(connection)
219        },
220    )
221    .await
222}
223
224pub trait ReadViewProvider {
225    /// Returns the read view for the current operation.
226    fn read_view(&self) -> StorageResult<Cow<'_, ReadView>>;
227}
228
229impl<'a> ReadViewProvider for Context<'a> {
230    fn read_view(&self) -> StorageResult<Cow<'a, ReadView>> {
231        let operation_type = self.query_env.operation.node.ty;
232
233        // Sometimes, during mutable queries or subscription the resolvers
234        // need access to an updated view of the database.
235        if operation_type != OperationType::Query {
236            let database: &ReadDatabase = self.data_unchecked();
237            database.view().map(Cow::Owned)
238        } else {
239            let read_view: &ReadView = self.data_unchecked();
240            Ok(Cow::Borrowed(read_view))
241        }
242    }
243}