1use crate::fuel_core_graphql_api::{
2 api_service::ReadDatabase,
3 database::ReadView,
4};
5use anyhow::anyhow;
6use async_graphql::{
7 Context,
8 MergedObject,
9 MergedSubscription,
10 OutputType,
11 Schema,
12 SchemaBuilder,
13 connection::{
14 Connection,
15 CursorType,
16 Edge,
17 EmptyFields,
18 query,
19 },
20 parser::types::OperationType,
21};
22use fuel_core_storage::{
23 Result as StorageResult,
24 iter::IterDirection,
25};
26use futures::{
27 Stream,
28 TryStreamExt,
29};
30use std::borrow::Cow;
31use tokio_stream::StreamExt;
32
33pub mod assets;
34pub mod balance;
35pub mod blob;
36pub mod block;
37pub mod chain;
38pub mod coins;
39pub mod contract;
40pub mod da_compressed;
41pub mod dap;
42pub mod health;
43pub mod message;
44pub mod node_info;
45pub mod upgrades;
46
47pub mod gas_price;
48pub mod scalars;
49pub mod tx;
50
51pub mod relayed_tx;
52pub mod storage;
53
54#[derive(MergedObject, Default)]
55pub struct Query(
56 assets::AssetInfoQuery,
57 dap::DapQuery,
58 balance::BalanceQuery,
59 blob::BlobQuery,
60 block::BlockQuery,
61 chain::ChainQuery,
62 tx::TxQuery,
63 health::HealthQuery,
64 coins::CoinQuery,
65 da_compressed::DaCompressedBlockQuery,
66 contract::ContractQuery,
67 contract::ContractBalanceQuery,
68 node_info::NodeQuery,
69 gas_price::LatestGasPriceQuery,
70 gas_price::EstimateGasPriceQuery,
71 message::MessageQuery,
72 relayed_tx::RelayedTransactionQuery,
73 upgrades::UpgradeQuery,
74 storage::StorageQuery,
75);
76
77#[derive(MergedObject, Default)]
78pub struct Mutation(dap::DapMutation, tx::TxMutation, block::BlockMutation);
79
80#[derive(MergedSubscription, Default)]
81pub struct Subscription(
82 tx::TxStatusSubscription,
83 storage::StorageSubscription,
84 block::BlockSubscription,
85);
86
87pub type CoreSchema = Schema<Query, Mutation, Subscription>;
88pub type CoreSchemaBuilder = SchemaBuilder<Query, Mutation, Subscription>;
89
90pub fn build_schema() -> CoreSchemaBuilder {
91 Schema::build_with_ignore_name_conflicts(
92 Query::default(),
93 Mutation::default(),
94 Subscription::default(),
95 ["TransactionConnection", "MessageConnection"],
96 )
97}
98
99async fn query_pagination<F, Entries, SchemaKey, SchemaValue>(
100 after: Option<String>,
101 before: Option<String>,
102 first: Option<i32>,
103 last: Option<i32>,
104 entries: F,
105) -> async_graphql::Result<Connection<SchemaKey, SchemaValue, EmptyFields, EmptyFields>>
106where
107 SchemaKey: CursorType + Send + Sync,
108 <SchemaKey as CursorType>::Error: core::fmt::Display + Send + Sync + 'static,
109 SchemaValue: OutputType,
110 F: FnOnce(&Option<SchemaKey>, IterDirection) -> StorageResult<Entries>,
114 Entries: Stream<Item = StorageResult<(SchemaKey, SchemaValue)>>,
115 SchemaKey: Eq,
116{
117 match (after.as_ref(), before.as_ref(), first, last) {
118 (_, _, Some(first), Some(last)) => {
119 return Err(anyhow!(
120 "Either first `{first}` or latest `{last}` elements, not both"
121 )
122 .into())
123 }
124 (Some(after), _, _, Some(last)) => {
125 return Err(anyhow!(
126 "After `{after:?}` with last `{last}` elements is not supported"
127 )
128 .into())
129 }
130 (_, Some(before), Some(first), _) => {
131 return Err(anyhow!(
132 "Before `{before:?}` with first `{first}` elements is not supported"
133 )
134 .into())
135 }
136 (_, _, None, None) => {
137 return Err(anyhow!("The queries for the whole range is not supported").into())
138 }
139 (_, _, _, _) => { }
140 };
141
142 query(
143 after,
144 before,
145 first,
146 last,
147 |after: Option<SchemaKey>, before: Option<SchemaKey>, first, last| async move {
148 let (count, direction) = if let Some(first) = first {
149 (first, IterDirection::Forward)
150 } else if let Some(last) = last {
151 (last, IterDirection::Reverse)
152 } else {
153 return Err(anyhow!("Either `first` or `last` should be provided"))
154 };
155
156 let start;
157 let end;
158
159 if direction == IterDirection::Forward {
160 start = after;
161 end = before;
162 } else {
163 start = before;
164 end = after;
165 }
166
167 let entries = entries(&start, direction)?;
168 let mut has_previous_page = false;
169 let mut has_next_page = false;
170
171 let entries = entries.skip_while(|result| {
174 if let Ok((key, _)) = result {
175 if let Some(start) = start.as_ref() {
178 if key == start {
180 has_previous_page = true;
181 return true
182 }
183 }
184 }
185 false
186 });
187
188 let mut count = count.saturating_add(1) ;
189 let entries = entries.take(count).take_while(|result| {
190 if let Ok((key, _)) = result {
191 if let Some(end) = end.as_ref() {
192 if key == end {
194 has_next_page = true;
195 return false
196 }
197 }
198 count = count.saturating_sub(1);
199 has_next_page |= count == 0;
200 count != 0
201 } else {
202 false
204 }
205 });
206
207 let entries: Vec<_> = entries.try_collect().await?;
208 let entries = entries.into_iter();
209
210 let mut connection = Connection::new(has_previous_page, has_next_page);
211
212 connection.edges.extend(
213 entries
214 .into_iter()
215 .map(|(key, value)| Edge::new(key, value)),
216 );
217
218 Ok::<Connection<SchemaKey, SchemaValue>, anyhow::Error>(connection)
219 },
220 )
221 .await
222}
223
224pub trait ReadViewProvider {
225 fn read_view(&self) -> StorageResult<Cow<'_, ReadView>>;
227}
228
229impl<'a> ReadViewProvider for Context<'a> {
230 fn read_view(&self) -> StorageResult<Cow<'a, ReadView>> {
231 let operation_type = self.query_env.operation.node.ty;
232
233 if operation_type != OperationType::Query {
236 let database: &ReadDatabase = self.data_unchecked();
237 database.view().map(Cow::Owned)
238 } else {
239 let read_view: &ReadView = self.data_unchecked();
240 Ok(Cow::Borrowed(read_view))
241 }
242 }
243}