1use super::scalars::{
2 Bytes32,
3 HexString,
4 Tai64Timestamp,
5 TransactionId,
6};
7use crate::{
8 fuel_core_graphql_api::{
9 Config as GraphQLConfig,
10 IntoApiResult,
11 api_service::ConsensusModule,
12 database::ReadView,
13 query_costs,
14 require_expensive_subscriptions,
15 },
16 graphql_api,
17 schema::{
18 ReadViewProvider,
19 scalars::{
20 BlockId,
21 Signature,
22 U16,
23 U32,
24 U64,
25 },
26 tx::types::Transaction,
27 },
28};
29use anyhow::anyhow;
30use async_graphql::{
31 Context,
32 Enum,
33 Object,
34 SimpleObject,
35 Subscription,
36 Union,
37 connection::{
38 Connection,
39 EmptyFields,
40 },
41};
42use fuel_core_storage::{
43 Result as StorageResult,
44 iter::IterDirection,
45};
46use fuel_core_types::{
47 blockchain::{
48 block::CompressedBlock,
49 header::BlockHeader,
50 },
51 fuel_tx::TxId,
52 fuel_types::{
53 self,
54 BlockHeight,
55 },
56};
57use futures::{
58 Stream,
59 StreamExt,
60 TryStreamExt,
61};
62use tokio_stream::wrappers::BroadcastStream;
63
64pub struct Block(pub(crate) CompressedBlock);
65
66pub struct Header(pub(crate) BlockHeader);
67
68#[derive(Union)]
69#[non_exhaustive]
70pub enum Consensus {
71 Genesis(Genesis),
72 PoA(PoAConsensus),
73}
74
75type CoreGenesis = fuel_core_types::blockchain::consensus::Genesis;
76type CoreConsensus = fuel_core_types::blockchain::consensus::Consensus;
77
78#[derive(SimpleObject)]
79pub struct Genesis {
80 pub chain_config_hash: Bytes32,
83 pub coins_root: Bytes32,
85 pub contracts_root: Bytes32,
87 pub messages_root: Bytes32,
89 pub transactions_root: Bytes32,
91}
92
93pub struct PoAConsensus {
94 signature: Signature,
95}
96
97#[derive(Clone, Copy, Debug, Enum, Eq, PartialEq)]
98pub enum BlockVersion {
99 V1,
100}
101
102#[Object]
103impl Block {
104 async fn version(&self) -> BlockVersion {
105 match self.0 {
106 CompressedBlock::V1(_) => BlockVersion::V1,
107 }
108 }
109
110 async fn id(&self) -> BlockId {
111 let bytes: fuel_types::Bytes32 = self.0.header().id().into();
112 bytes.into()
113 }
114
115 async fn height(&self) -> U32 {
116 let height: u32 = (*self.0.header().height()).into();
117 height.into()
118 }
119
120 async fn header(&self) -> Header {
121 self.0.header().clone().into()
122 }
123
124 #[graphql(complexity = "query_costs().storage_read + child_complexity")]
125 async fn consensus(&self, ctx: &Context<'_>) -> async_graphql::Result<Consensus> {
126 let query = ctx.read_view()?;
127 let height = self.0.header().height();
128 Ok(query.consensus(height)?.try_into()?)
129 }
130
131 #[graphql(complexity = "query_costs().block_transactions_ids")]
132 async fn transaction_ids(&self) -> Vec<TransactionId> {
133 self.0
134 .transactions()
135 .iter()
136 .map(|tx_id| (*tx_id).into())
137 .collect()
138 }
139
140 #[graphql(complexity = "query_costs().block_transactions + child_complexity")]
142 async fn transactions(
143 &self,
144 ctx: &Context<'_>,
145 ) -> async_graphql::Result<Vec<Transaction>> {
146 let query = ctx.read_view()?;
147 let tx_ids = futures::stream::iter(self.0.transactions().iter().copied());
148
149 let result = tx_ids
150 .chunks(query.batch_size)
151 .filter_map(move |tx_ids: Vec<TxId>| {
152 let async_query = query.as_ref().clone();
153 async move {
154 let txs = async_query.transactions(tx_ids.clone()).await;
155 let txs = txs
156 .into_iter()
157 .zip(tx_ids.into_iter())
158 .map(|(r, tx_id)| r.map(|tx| Transaction::from_tx(tx_id, tx)));
159
160 Some(futures::stream::iter(txs))
161 }
162 })
163 .flatten()
164 .try_collect()
165 .await?;
166
167 Ok(result)
168 }
169}
170
171#[derive(Clone, Copy, Debug, Enum, Eq, PartialEq)]
172pub enum HeaderVersion {
173 V1,
174 V2,
175}
176
177#[Object]
178impl Header {
179 async fn version(&self) -> HeaderVersion {
181 match self.0 {
182 BlockHeader::V1(_) => HeaderVersion::V1,
183 #[cfg(feature = "fault-proving")]
184 BlockHeader::V2(_) => HeaderVersion::V2,
185 }
186 }
187
188 async fn id(&self) -> BlockId {
190 let bytes: fuel_core_types::fuel_types::Bytes32 = self.0.id().into();
191 bytes.into()
192 }
193
194 async fn da_height(&self) -> U64 {
196 self.0.da_height().0.into()
197 }
198
199 async fn consensus_parameters_version(&self) -> U32 {
201 self.0.consensus_parameters_version().into()
202 }
203
204 async fn state_transition_bytecode_version(&self) -> U32 {
206 self.0.state_transition_bytecode_version().into()
207 }
208
209 async fn transactions_count(&self) -> U16 {
211 self.0.transactions_count().into()
212 }
213
214 async fn message_receipt_count(&self) -> U32 {
216 self.0.message_receipt_count().into()
217 }
218
219 async fn transactions_root(&self) -> Bytes32 {
221 self.0.transactions_root().into()
222 }
223
224 async fn message_outbox_root(&self) -> Bytes32 {
226 self.0.message_outbox_root().into()
227 }
228
229 async fn event_inbox_root(&self) -> Bytes32 {
231 self.0.event_inbox_root().into()
232 }
233
234 async fn height(&self) -> U32 {
236 (*self.0.height()).into()
237 }
238
239 async fn prev_root(&self) -> Bytes32 {
241 (*self.0.prev_root()).into()
242 }
243
244 async fn time(&self) -> Tai64Timestamp {
246 Tai64Timestamp(self.0.time())
247 }
248
249 async fn application_hash(&self) -> Bytes32 {
251 (*self.0.application_hash()).into()
252 }
253
254 async fn tx_id_commitment(&self) -> Option<Bytes32> {
256 self.0.tx_id_commitment().map(Into::into)
257 }
258}
259
260#[Object]
261impl PoAConsensus {
262 async fn signature(&self) -> Signature {
264 self.signature
265 }
266}
267
268#[derive(Default)]
269pub struct BlockQuery;
270
271#[Object]
272impl BlockQuery {
273 #[graphql(complexity = "query_costs().block_header + child_complexity")]
274 async fn block(
275 &self,
276 ctx: &Context<'_>,
277 #[graphql(desc = "ID of the block")] id: Option<BlockId>,
278 #[graphql(desc = "Height of the block")] height: Option<U32>,
279 ) -> async_graphql::Result<Option<Block>> {
280 let query = ctx.read_view()?;
281 let height = match (id, height) {
282 (Some(_), Some(_)) => {
283 return Err(async_graphql::Error::new(
284 "Can't provide both an id and a height",
285 ))
286 }
287 (Some(id), None) => query.block_height(&id.0.into()),
288 (None, Some(height)) => {
289 let height: u32 = height.into();
290 Ok(height.into())
291 }
292 (None, None) => {
293 return Err(async_graphql::Error::new("Missing either id or height"))
294 }
295 };
296
297 height
298 .and_then(|height| query.block(&height))
299 .into_api_result()
300 }
301
302 #[graphql(complexity = "{\
303 (query_costs().block_header + child_complexity) \
304 * (first.unwrap_or_default() as usize + last.unwrap_or_default() as usize) \
305 }")]
306 async fn blocks(
307 &self,
308 ctx: &Context<'_>,
309 first: Option<i32>,
310 after: Option<String>,
311 last: Option<i32>,
312 before: Option<String>,
313 ) -> async_graphql::Result<Connection<U32, Block, EmptyFields, EmptyFields>> {
314 let query = ctx.read_view()?;
315 crate::schema::query_pagination(after, before, first, last, |start, direction| {
316 Ok(blocks_query(
317 query.as_ref(),
318 start.map(Into::into),
319 direction,
320 ))
321 })
322 .await
323 }
324}
325
326#[derive(Default)]
327pub struct HeaderQuery;
328
329#[Object]
330impl HeaderQuery {
331 #[graphql(complexity = "query_costs().block_header + child_complexity")]
332 async fn header(
333 &self,
334 ctx: &Context<'_>,
335 #[graphql(desc = "ID of the block")] id: Option<BlockId>,
336 #[graphql(desc = "Height of the block")] height: Option<U32>,
337 ) -> async_graphql::Result<Option<Header>> {
338 Ok(BlockQuery
339 .block(ctx, id, height)
340 .await?
341 .map(|b| b.0.header().clone().into()))
342 }
343
344 #[graphql(complexity = "{\
345 (query_costs().block_header + child_complexity) \
346 * (first.unwrap_or_default() as usize + last.unwrap_or_default() as usize) \
347 }")]
348 async fn headers(
349 &self,
350 ctx: &Context<'_>,
351 first: Option<i32>,
352 after: Option<String>,
353 last: Option<i32>,
354 before: Option<String>,
355 ) -> async_graphql::Result<Connection<U32, Header, EmptyFields, EmptyFields>> {
356 let query = ctx.read_view()?;
357 crate::schema::query_pagination(after, before, first, last, |start, direction| {
358 Ok(blocks_query(
359 query.as_ref(),
360 start.map(Into::into),
361 direction,
362 ))
363 })
364 .await
365 }
366}
367
368fn blocks_query<T>(
369 query: &ReadView,
370 height: Option<BlockHeight>,
371 direction: IterDirection,
372) -> impl Stream<Item = StorageResult<(U32, T)>> + '_
373where
374 T: async_graphql::OutputType,
375 T: From<CompressedBlock>,
376{
377 query.compressed_blocks(height, direction).map(|result| {
378 result.map(|block| ((*block.header().height()).into(), block.into()))
379 })
380}
381
382#[derive(Default)]
383pub struct BlockMutation;
384
385#[Object]
386impl BlockMutation {
387 async fn produce_blocks(
392 &self,
393 ctx: &Context<'_>,
394 start_timestamp: Option<Tai64Timestamp>,
395 blocks_to_produce: U32,
396 ) -> async_graphql::Result<U32> {
397 let config = ctx.data_unchecked::<GraphQLConfig>().clone();
398
399 if !config.debug {
400 return Err(anyhow!("`debug` must be enabled to use this endpoint").into())
401 }
402
403 let consensus_module = ctx.data_unchecked::<ConsensusModule>();
404
405 let start_time = start_timestamp.map(|timestamp| timestamp.0);
406 let blocks_to_produce: u32 = blocks_to_produce.into();
407 consensus_module
408 .manually_produce_blocks(start_time, blocks_to_produce)
409 .await?;
410
411 let on_chain_height = ctx.read_view()?.latest_block_height()?;
412 let shared_state =
413 ctx.data_unchecked::<graphql_api::worker_service::SharedState>();
414 shared_state
415 .block_height_subscription_handler
416 .subscribe()
417 .wait_for_block_height(on_chain_height)
418 .await?;
419 Ok(on_chain_height.into())
420 }
421}
422
423#[derive(Default)]
424pub struct BlockSubscription;
425
426#[Subscription]
427impl BlockSubscription {
428 #[graphql(name = "alpha__new_blocks")]
429 async fn new_blocks<'a>(
430 &self,
431 ctx: &Context<'a>,
432 ) -> async_graphql::Result<
433 impl Stream<Item = async_graphql::Result<HexString>> + 'a + use<'a>,
434 > {
435 require_expensive_subscriptions(ctx)?;
436 let worker_state: &graphql_api::worker_service::SharedState =
437 ctx.data_unchecked();
438
439 let receiver = worker_state.block_subscription.subscribe();
440 let stream = BroadcastStream::new(receiver).map(|result| {
441 result
442 .map(|bytes| {
443 HexString(bytes.as_ref().clone())
446 })
447 .map_err(Into::into)
448 });
449
450 Ok(stream)
451 }
452}
453
454impl From<CompressedBlock> for Block {
455 fn from(block: CompressedBlock) -> Self {
456 Block(block)
457 }
458}
459
460impl From<BlockHeader> for Header {
461 fn from(header: BlockHeader) -> Self {
462 Header(header)
463 }
464}
465
466impl From<CompressedBlock> for Header {
467 fn from(block: CompressedBlock) -> Self {
468 Header(block.into_inner().0)
469 }
470}
471
472impl From<CoreGenesis> for Genesis {
473 fn from(genesis: CoreGenesis) -> Self {
474 Genesis {
475 chain_config_hash: genesis.chain_config_hash.into(),
476 coins_root: genesis.coins_root.into(),
477 contracts_root: genesis.contracts_root.into(),
478 messages_root: genesis.messages_root.into(),
479 transactions_root: genesis.transactions_root.into(),
480 }
481 }
482}
483
484impl TryFrom<CoreConsensus> for Consensus {
485 type Error = String;
486
487 fn try_from(consensus: CoreConsensus) -> Result<Self, Self::Error> {
488 match consensus {
489 CoreConsensus::Genesis(genesis) => Ok(Consensus::Genesis(genesis.into())),
490 CoreConsensus::PoA(poa) => Ok(Consensus::PoA(PoAConsensus {
491 signature: poa.signature.into(),
492 })),
493 _ => Err(format!("Unknown consensus type: {:?}", consensus)),
494 }
495 }
496}