fuel_core/schema/
block.rs

1use super::scalars::{
2    Bytes32,
3    HexString,
4    Tai64Timestamp,
5    TransactionId,
6};
7use crate::{
8    fuel_core_graphql_api::{
9        Config as GraphQLConfig,
10        IntoApiResult,
11        api_service::ConsensusModule,
12        database::ReadView,
13        query_costs,
14        require_expensive_subscriptions,
15    },
16    graphql_api,
17    schema::{
18        ReadViewProvider,
19        scalars::{
20            BlockId,
21            Signature,
22            U16,
23            U32,
24            U64,
25        },
26        tx::types::Transaction,
27    },
28};
29use anyhow::anyhow;
30use async_graphql::{
31    Context,
32    Enum,
33    Object,
34    SimpleObject,
35    Subscription,
36    Union,
37    connection::{
38        Connection,
39        EmptyFields,
40    },
41};
42use fuel_core_storage::{
43    Result as StorageResult,
44    iter::IterDirection,
45};
46use fuel_core_types::{
47    blockchain::{
48        block::CompressedBlock,
49        header::BlockHeader,
50    },
51    fuel_tx::TxId,
52    fuel_types::{
53        self,
54        BlockHeight,
55    },
56};
57use futures::{
58    Stream,
59    StreamExt,
60    TryStreamExt,
61};
62use tokio_stream::wrappers::BroadcastStream;
63
/// GraphQL-facing newtype around a [`CompressedBlock`].
///
/// The resolvers exposed to clients are defined in the `#[Object] impl Block`
/// in this file; the wrapped value is reachable crate-wide through `.0`.
pub struct Block(pub(crate) CompressedBlock);
65
/// GraphQL-facing newtype around a [`BlockHeader`].
///
/// The resolvers exposed to clients are defined in the `#[Object] impl Header`
/// in this file; the wrapped value is reachable crate-wide through `.0`.
pub struct Header(pub(crate) BlockHeader);
67
// Consensus data attached to a block, exposed as a GraphQL union.
//
// Values are built from `CoreConsensus` through the `TryFrom` impl in this
// file, which rejects every core variant other than `Genesis` and `PoA`.
//
// NOTE: kept as a plain `//` comment on purpose; a `///` doc on a derived
// GraphQL type is exported as its schema description.
#[derive(Union)]
#[non_exhaustive]
pub enum Consensus {
    Genesis(Genesis),
    PoA(PoAConsensus),
}
74
// Short aliases for the `fuel_core_types` consensus types, so they are not
// confused with the same-named GraphQL types (`Genesis`, `Consensus`) declared
// in this module.
type CoreGenesis = fuel_core_types::blockchain::consensus::Genesis;
type CoreConsensus = fuel_core_types::blockchain::consensus::Consensus;
77
// GraphQL view of the genesis consensus data. Every field is copied from the
// same-named field of `CoreGenesis` by the `From<CoreGenesis>` impl in this
// file.
//
// NOTE: the `///` docs on the fields below are exported as GraphQL field
// descriptions, so editing them changes the published schema.
#[derive(SimpleObject)]
pub struct Genesis {
    /// The chain configs define what consensus type to use, what settlement layer to use,
    /// rules of block validity, etc.
    pub chain_config_hash: Bytes32,
    /// The Binary Merkle Tree root of all genesis coins.
    pub coins_root: Bytes32,
    /// The Binary Merkle Tree root of state, balances, contracts code hash of each contract.
    pub contracts_root: Bytes32,
    /// The Binary Merkle Tree root of all genesis messages.
    pub messages_root: Bytes32,
    /// The Binary Merkle Tree root of all processed transaction ids.
    pub transactions_root: Bytes32,
}
92
/// GraphQL view of PoA consensus data for a block.
///
/// The only stored value is the signature, which is exposed through the
/// `signature` resolver in the `#[Object] impl PoAConsensus` in this file.
pub struct PoAConsensus {
    // Private on purpose: clients read it through the `signature` resolver.
    signature: Signature,
}
96
// API-level enumeration of block format versions. `Block::version` maps each
// `CompressedBlock` variant onto one of these values.
//
// NOTE: kept as a plain `//` comment; a `///` doc on a derived GraphQL enum
// is exported as its schema description.
#[derive(Clone, Copy, Debug, Enum, Eq, PartialEq)]
pub enum BlockVersion {
    V1,
}
101
102#[Object]
103impl Block {
104    async fn version(&self) -> BlockVersion {
105        match self.0 {
106            CompressedBlock::V1(_) => BlockVersion::V1,
107        }
108    }
109
110    async fn id(&self) -> BlockId {
111        let bytes: fuel_types::Bytes32 = self.0.header().id().into();
112        bytes.into()
113    }
114
115    async fn height(&self) -> U32 {
116        let height: u32 = (*self.0.header().height()).into();
117        height.into()
118    }
119
120    async fn header(&self) -> Header {
121        self.0.header().clone().into()
122    }
123
124    #[graphql(complexity = "query_costs().storage_read + child_complexity")]
125    async fn consensus(&self, ctx: &Context<'_>) -> async_graphql::Result<Consensus> {
126        let query = ctx.read_view()?;
127        let height = self.0.header().height();
128        Ok(query.consensus(height)?.try_into()?)
129    }
130
131    #[graphql(complexity = "query_costs().block_transactions_ids")]
132    async fn transaction_ids(&self) -> Vec<TransactionId> {
133        self.0
134            .transactions()
135            .iter()
136            .map(|tx_id| (*tx_id).into())
137            .collect()
138    }
139
140    // Assume that in average we have 32 transactions per block.
141    #[graphql(complexity = "query_costs().block_transactions + child_complexity")]
142    async fn transactions(
143        &self,
144        ctx: &Context<'_>,
145    ) -> async_graphql::Result<Vec<Transaction>> {
146        let query = ctx.read_view()?;
147        let tx_ids = futures::stream::iter(self.0.transactions().iter().copied());
148
149        let result = tx_ids
150            .chunks(query.batch_size)
151            .filter_map(move |tx_ids: Vec<TxId>| {
152                let async_query = query.as_ref().clone();
153                async move {
154                    let txs = async_query.transactions(tx_ids.clone()).await;
155                    let txs = txs
156                        .into_iter()
157                        .zip(tx_ids.into_iter())
158                        .map(|(r, tx_id)| r.map(|tx| Transaction::from_tx(tx_id, tx)));
159
160                    Some(futures::stream::iter(txs))
161                }
162            })
163            .flatten()
164            .try_collect()
165            .await?;
166
167        Ok(result)
168    }
169}
170
// API-level enumeration of block header versions. `Header::version` maps each
// `BlockHeader` variant onto one of these values; the `V2` mapping there is
// only compiled when the `fault-proving` feature is enabled.
//
// NOTE: kept as a plain `//` comment; a `///` doc on a derived GraphQL enum
// is exported as its schema description.
#[derive(Clone, Copy, Debug, Enum, Eq, PartialEq)]
pub enum HeaderVersion {
    V1,
    V2,
}
176
177#[Object]
178impl Header {
179    /// Version of the header
180    async fn version(&self) -> HeaderVersion {
181        match self.0 {
182            BlockHeader::V1(_) => HeaderVersion::V1,
183            #[cfg(feature = "fault-proving")]
184            BlockHeader::V2(_) => HeaderVersion::V2,
185        }
186    }
187
188    /// Hash of the header
189    async fn id(&self) -> BlockId {
190        let bytes: fuel_core_types::fuel_types::Bytes32 = self.0.id().into();
191        bytes.into()
192    }
193
194    /// The layer 1 height of messages and events to include since the last layer 1 block number.
195    async fn da_height(&self) -> U64 {
196        self.0.da_height().0.into()
197    }
198
199    /// The version of the consensus parameters used to create this block.
200    async fn consensus_parameters_version(&self) -> U32 {
201        self.0.consensus_parameters_version().into()
202    }
203
204    /// The version of the state transition bytecode used to create this block.
205    async fn state_transition_bytecode_version(&self) -> U32 {
206        self.0.state_transition_bytecode_version().into()
207    }
208
209    /// Number of transactions in this block.
210    async fn transactions_count(&self) -> U16 {
211        self.0.transactions_count().into()
212    }
213
214    /// Number of message receipts in this block.
215    async fn message_receipt_count(&self) -> U32 {
216        self.0.message_receipt_count().into()
217    }
218
219    /// Merkle root of transactions.
220    async fn transactions_root(&self) -> Bytes32 {
221        self.0.transactions_root().into()
222    }
223
224    /// Merkle root of message receipts in this block.
225    async fn message_outbox_root(&self) -> Bytes32 {
226        self.0.message_outbox_root().into()
227    }
228
229    /// Merkle root of inbox events in this block.
230    async fn event_inbox_root(&self) -> Bytes32 {
231        self.0.event_inbox_root().into()
232    }
233
234    /// Fuel block height.
235    async fn height(&self) -> U32 {
236        (*self.0.height()).into()
237    }
238
239    /// Merkle root of all previous block header hashes.
240    async fn prev_root(&self) -> Bytes32 {
241        (*self.0.prev_root()).into()
242    }
243
244    /// The block producer time.
245    async fn time(&self) -> Tai64Timestamp {
246        Tai64Timestamp(self.0.time())
247    }
248
249    /// Hash of the application header.
250    async fn application_hash(&self) -> Bytes32 {
251        (*self.0.application_hash()).into()
252    }
253
254    /// Transaction ID Commitment
255    async fn tx_id_commitment(&self) -> Option<Bytes32> {
256        self.0.tx_id_commitment().map(Into::into)
257    }
258}
259
260#[Object]
261impl PoAConsensus {
262    /// Gets the signature of the block produced by `PoA` consensus.
263    async fn signature(&self) -> Signature {
264        self.signature
265    }
266}
267
268#[derive(Default)]
269pub struct BlockQuery;
270
271#[Object]
272impl BlockQuery {
273    #[graphql(complexity = "query_costs().block_header + child_complexity")]
274    async fn block(
275        &self,
276        ctx: &Context<'_>,
277        #[graphql(desc = "ID of the block")] id: Option<BlockId>,
278        #[graphql(desc = "Height of the block")] height: Option<U32>,
279    ) -> async_graphql::Result<Option<Block>> {
280        let query = ctx.read_view()?;
281        let height = match (id, height) {
282            (Some(_), Some(_)) => {
283                return Err(async_graphql::Error::new(
284                    "Can't provide both an id and a height",
285                ))
286            }
287            (Some(id), None) => query.block_height(&id.0.into()),
288            (None, Some(height)) => {
289                let height: u32 = height.into();
290                Ok(height.into())
291            }
292            (None, None) => {
293                return Err(async_graphql::Error::new("Missing either id or height"))
294            }
295        };
296
297        height
298            .and_then(|height| query.block(&height))
299            .into_api_result()
300    }
301
302    #[graphql(complexity = "{\
303        (query_costs().block_header + child_complexity) \
304        * (first.unwrap_or_default() as usize + last.unwrap_or_default() as usize) \
305    }")]
306    async fn blocks(
307        &self,
308        ctx: &Context<'_>,
309        first: Option<i32>,
310        after: Option<String>,
311        last: Option<i32>,
312        before: Option<String>,
313    ) -> async_graphql::Result<Connection<U32, Block, EmptyFields, EmptyFields>> {
314        let query = ctx.read_view()?;
315        crate::schema::query_pagination(after, before, first, last, |start, direction| {
316            Ok(blocks_query(
317                query.as_ref(),
318                start.map(Into::into),
319                direction,
320            ))
321        })
322        .await
323    }
324}
325
326#[derive(Default)]
327pub struct HeaderQuery;
328
329#[Object]
330impl HeaderQuery {
331    #[graphql(complexity = "query_costs().block_header + child_complexity")]
332    async fn header(
333        &self,
334        ctx: &Context<'_>,
335        #[graphql(desc = "ID of the block")] id: Option<BlockId>,
336        #[graphql(desc = "Height of the block")] height: Option<U32>,
337    ) -> async_graphql::Result<Option<Header>> {
338        Ok(BlockQuery
339            .block(ctx, id, height)
340            .await?
341            .map(|b| b.0.header().clone().into()))
342    }
343
344    #[graphql(complexity = "{\
345        (query_costs().block_header + child_complexity) \
346        * (first.unwrap_or_default() as usize + last.unwrap_or_default() as usize) \
347    }")]
348    async fn headers(
349        &self,
350        ctx: &Context<'_>,
351        first: Option<i32>,
352        after: Option<String>,
353        last: Option<i32>,
354        before: Option<String>,
355    ) -> async_graphql::Result<Connection<U32, Header, EmptyFields, EmptyFields>> {
356        let query = ctx.read_view()?;
357        crate::schema::query_pagination(after, before, first, last, |start, direction| {
358            Ok(blocks_query(
359                query.as_ref(),
360                start.map(Into::into),
361                direction,
362            ))
363        })
364        .await
365    }
366}
367
368fn blocks_query<T>(
369    query: &ReadView,
370    height: Option<BlockHeight>,
371    direction: IterDirection,
372) -> impl Stream<Item = StorageResult<(U32, T)>> + '_
373where
374    T: async_graphql::OutputType,
375    T: From<CompressedBlock>,
376{
377    query.compressed_blocks(height, direction).map(|result| {
378        result.map(|block| ((*block.header().height()).into(), block.into()))
379    })
380}
381
382#[derive(Default)]
383pub struct BlockMutation;
384
385#[Object]
386impl BlockMutation {
387    /// Sequentially produces `blocks_to_produce` blocks. The first block starts with
388    /// `start_timestamp`. If the block production in the [`crate::service::Config`] is
389    /// `Trigger::Interval { block_time }`, produces blocks with `block_time ` intervals between
390    /// them. The `start_timestamp` is the timestamp in seconds.
391    async fn produce_blocks(
392        &self,
393        ctx: &Context<'_>,
394        start_timestamp: Option<Tai64Timestamp>,
395        blocks_to_produce: U32,
396    ) -> async_graphql::Result<U32> {
397        let config = ctx.data_unchecked::<GraphQLConfig>().clone();
398
399        if !config.debug {
400            return Err(anyhow!("`debug` must be enabled to use this endpoint").into())
401        }
402
403        let consensus_module = ctx.data_unchecked::<ConsensusModule>();
404
405        let start_time = start_timestamp.map(|timestamp| timestamp.0);
406        let blocks_to_produce: u32 = blocks_to_produce.into();
407        consensus_module
408            .manually_produce_blocks(start_time, blocks_to_produce)
409            .await?;
410
411        let on_chain_height = ctx.read_view()?.latest_block_height()?;
412        let shared_state =
413            ctx.data_unchecked::<graphql_api::worker_service::SharedState>();
414        shared_state
415            .block_height_subscription_handler
416            .subscribe()
417            .wait_for_block_height(on_chain_height)
418            .await?;
419        Ok(on_chain_height.into())
420    }
421}
422
423#[derive(Default)]
424pub struct BlockSubscription;
425
426#[Subscription]
427impl BlockSubscription {
428    #[graphql(name = "alpha__new_blocks")]
429    async fn new_blocks<'a>(
430        &self,
431        ctx: &Context<'a>,
432    ) -> async_graphql::Result<
433        impl Stream<Item = async_graphql::Result<HexString>> + 'a + use<'a>,
434    > {
435        require_expensive_subscriptions(ctx)?;
436        let worker_state: &graphql_api::worker_service::SharedState =
437            ctx.data_unchecked();
438
439        let receiver = worker_state.block_subscription.subscribe();
440        let stream = BroadcastStream::new(receiver).map(|result| {
441            result
442                .map(|bytes| {
443                    // TODO: Avoid cloning the bytes here.
444                    //  https://github.com/FuelLabs/fuel-core/issues/2657
445                    HexString(bytes.as_ref().clone())
446                })
447                .map_err(Into::into)
448        });
449
450        Ok(stream)
451    }
452}
453
454impl From<CompressedBlock> for Block {
455    fn from(block: CompressedBlock) -> Self {
456        Block(block)
457    }
458}
459
460impl From<BlockHeader> for Header {
461    fn from(header: BlockHeader) -> Self {
462        Header(header)
463    }
464}
465
466impl From<CompressedBlock> for Header {
467    fn from(block: CompressedBlock) -> Self {
468        Header(block.into_inner().0)
469    }
470}
471
472impl From<CoreGenesis> for Genesis {
473    fn from(genesis: CoreGenesis) -> Self {
474        Genesis {
475            chain_config_hash: genesis.chain_config_hash.into(),
476            coins_root: genesis.coins_root.into(),
477            contracts_root: genesis.contracts_root.into(),
478            messages_root: genesis.messages_root.into(),
479            transactions_root: genesis.transactions_root.into(),
480        }
481    }
482}
483
484impl TryFrom<CoreConsensus> for Consensus {
485    type Error = String;
486
487    fn try_from(consensus: CoreConsensus) -> Result<Self, Self::Error> {
488        match consensus {
489            CoreConsensus::Genesis(genesis) => Ok(Consensus::Genesis(genesis.into())),
490            CoreConsensus::PoA(poa) => Ok(Consensus::PoA(PoAConsensus {
491                signature: poa.signature.into(),
492            })),
493            _ => Err(format!("Unknown consensus type: {:?}", consensus)),
494        }
495    }
496}