fuel_core/schema/
tx.rs

1use super::scalars::{
2    AssetId,
3    U16,
4    U32,
5    U64,
6};
7use crate::{
8    coins_query::CoinsQueryError,
9    fuel_core_graphql_api::{
10        Config as GraphQLConfig,
11        IntoApiResult,
12        api_service::{
13            BlockProducer,
14            ChainInfoProvider,
15            DynTxStatusManager,
16            TxPool,
17        },
18        query_costs,
19    },
20    graphql_api::{
21        database::ReadView,
22        ports::MemoryPool,
23        require_expensive_subscriptions,
24    },
25    query::{
26        TxnStatusChangeState,
27        asset_query::Exclude,
28        transaction_status_change,
29    },
30    schema::{
31        ReadViewProvider,
32        coins::ExcludeInput,
33        gas_price::EstimateGasPriceExt,
34        scalars::{
35            Address,
36            HexString,
37            SortedTxCursor,
38            TransactionId,
39            TxPointer,
40        },
41        tx::{
42            assemble_tx::{
43                AssembleArguments,
44                AssembleTx,
45            },
46            types::{
47                AssembleTransactionResult,
48                TransactionStatus,
49                get_tx_status,
50            },
51        },
52    },
53    service::adapters::SharedMemoryPool,
54};
55use async_graphql::{
56    Context,
57    Object,
58    Subscription,
59    connection::{
60        Connection,
61        EmptyFields,
62    },
63};
64use fuel_core_storage::{
65    Error as StorageError,
66    IsNotFound,
67    PredicateStorageRequirements,
68    Result as StorageResult,
69    iter::IterDirection,
70};
71use fuel_core_syscall::handlers::log_collector::EcalLogCollector;
72use fuel_core_tx_status_manager::TxStatusMessage;
73use fuel_core_types::{
74    blockchain::transaction::TransactionExt,
75    fuel_tx::{
76        self,
77        Bytes32,
78        Cacheable,
79        Transaction as FuelTx,
80        UniqueIdentifier,
81    },
82    fuel_types::{
83        self,
84        canonical::Deserialize,
85    },
86    fuel_vm::checked_transaction::{
87        CheckPredicateParams,
88        EstimatePredicates,
89    },
90    services::{
91        executor::DryRunResult,
92        transaction_status,
93    },
94};
95use futures::{
96    Stream,
97    TryStreamExt,
98};
99use std::{
100    borrow::Cow,
101    future::Future,
102    iter,
103    sync::Arc,
104};
105use types::{
106    DryRunStorageReads,
107    DryRunTransactionExecutionStatus,
108    StorageReadReplayEvent,
109    Transaction,
110};
111
112mod assemble_tx;
113pub mod input;
114pub mod output;
115pub mod receipt;
116pub mod types;
117pub mod upgrade_purpose;
118
/// Stateless resolver type for the transaction-related GraphQL queries.
///
/// The struct carries no data; every resolver pulls what it needs from the
/// request [`Context`].
#[derive(Default)]
pub struct TxQuery;
121
122impl TxQuery {
123    /// The actual logic of all different dry-run queries.
124    async fn dry_run_inner(
125        &self,
126        ctx: &Context<'_>,
127        txs: Vec<HexString>,
128        // If set to false, disable input utxo validation, overriding the configuration of the node.
129        // This allows for non-existent inputs to be used without signature validation
130        // for read-only calls.
131        utxo_validation: Option<bool>,
132        gas_price: Option<U64>,
133        // This can be used to run the dry-run on top of a past block.
134        // Requires `--historical-execution` flag to be enabled.
135        block_height: Option<U32>,
136        // Record storage reads, so this tx can be used with execution tracer in a local debugger.
137        record_storage_reads: bool,
138    ) -> async_graphql::Result<DryRunStorageReads> {
139        let config = ctx.data_unchecked::<GraphQLConfig>().clone();
140        let block_producer = ctx.data_unchecked::<BlockProducer>();
141        let consensus_params = ctx
142            .data_unchecked::<ChainInfoProvider>()
143            .current_consensus_params();
144        let block_gas_limit = consensus_params.block_gas_limit();
145
146        if block_height.is_some() && !config.historical_execution {
147            return Err(anyhow::anyhow!(
148                "The `blockHeight` parameter requires the `--historical-execution` option"
149            )
150            .into());
151        }
152
153        let mut transactions = txs
154            .iter()
155            .map(|tx| FuelTx::from_bytes(&tx.0))
156            .collect::<Result<Vec<FuelTx>, _>>()?;
157        transactions.iter_mut().try_fold::<_, _, async_graphql::Result<u64>>(0u64, |acc, tx| {
158            let gas = tx.max_gas(&consensus_params)?;
159            let gas = gas.saturating_add(acc);
160            if gas > block_gas_limit {
161                return Err(anyhow::anyhow!("The sum of the gas usable by the transactions is greater than the block gas limit").into());
162            }
163            tx.precompute(&consensus_params.chain_id())?;
164            Ok(gas)
165        })?;
166
167        let DryRunResult {
168            transactions,
169            storage_reads,
170        } = block_producer
171            .dry_run_txs(
172                transactions,
173                block_height.map(|x| x.into()),
174                None, // TODO(#1749): Pass parameter from API
175                utxo_validation,
176                gas_price.map(|x| x.into()),
177                record_storage_reads,
178            )
179            .await?;
180
181        let tx_statuses = transactions
182            .into_iter()
183            .map(|(_, status)| DryRunTransactionExecutionStatus(status))
184            .collect();
185
186        let storage_reads = storage_reads
187            .into_iter()
188            .map(|event| event.into())
189            .collect();
190
191        Ok(DryRunStorageReads {
192            tx_statuses,
193            storage_reads,
194        })
195    }
196}
197
198#[Object]
199impl TxQuery {
200    #[graphql(complexity = "query_costs().storage_read + child_complexity")]
201    async fn transaction(
202        &self,
203        ctx: &Context<'_>,
204        #[graphql(desc = "The ID of the transaction")] id: TransactionId,
205    ) -> async_graphql::Result<Option<Transaction>> {
206        let query = ctx.read_view()?;
207        let id = id.0;
208        let txpool = ctx.data_unchecked::<TxPool>();
209
210        match txpool.transaction(id).await? {
211            Some(transaction) => Ok(Some(Transaction(transaction, id))),
212            _ => query
213                .transaction(&id)
214                .map(|tx| Transaction::from_tx(id, tx))
215                .into_api_result(),
216        }
217    }
218
    // Paginated listing of transactions, walked block by block.
    //
    // We assume that each block has 100 transactions.
    // NOTE(review): the complexity formula below does not reference a per-block
    // transaction count, so the sentence above looks stale - confirm.
    #[graphql(complexity = "{\
        (query_costs().tx_get + child_complexity) \
        * (first.unwrap_or_default() as usize + last.unwrap_or_default() as usize)
    }")]
    async fn transactions(
        &self,
        ctx: &Context<'_>,
        first: Option<i32>,
        after: Option<String>,
        last: Option<i32>,
        before: Option<String>,
    ) -> async_graphql::Result<
        Connection<SortedTxCursor, Transaction, EmptyFields, EmptyFields>,
    > {
        use futures::stream::StreamExt;
        let query = ctx.read_view()?;
        let query_ref = query.as_ref();
        crate::schema::query_pagination(
            after,
            before,
            first,
            last,
            |start: &Option<SortedTxCursor>, direction| {
                let start = *start;
                // Despite the name, this is the block height stored in the cursor.
                let block_id = start.map(|sorted| sorted.block_height);
                let compressed_blocks = query.compressed_blocks(block_id, direction);

                let all_txs = compressed_blocks
                    // Turn every block into a stream of `(tx_id, block_height)` pairs.
                    .map_ok(move |fuel_block| {
                        let (header, mut txs) = fuel_block.into_inner();

                        // When iterating backwards, the transactions inside a block
                        // have to be visited in reverse order as well.
                        if direction == IterDirection::Reverse {
                            txs.reverse();
                        }

                        let iter = txs.into_iter().zip(iter::repeat(*header.height()));
                        futures::stream::iter(iter).map(Ok)
                    })
                    .try_flatten()
                    .map_ok(|(tx_id, block_height)| {
                        SortedTxCursor::new(block_height, tx_id.into())
                    })
                    // Drop entries until the cursor position is reached; the entry
                    // equal to the cursor is the first one that is kept.
                    .try_skip_while(move |sorted| {
                        let skip = if let Some(start) = start {
                            sorted != &start
                        } else {
                            false
                        };

                        async move { Ok::<_, StorageError>(skip) }
                    })
                    // Group the cursors so the transactions are fetched in batches.
                    .chunks(query_ref.batch_size)
                    .map(|chunk| {
                        use itertools::Itertools;

                        // A chunk is a list of results; fail the chunk on its first error.
                        let chunk = chunk.into_iter().try_collect::<_, Vec<_>, _>()?;
                        Ok::<_, StorageError>(chunk)
                    })
                    // Always yields `Some`: this stage is used to run the async
                    // batch lookup, not to filter anything out.
                    .try_filter_map(move |chunk| {
                        let async_query = query_ref.clone();
                        async move {
                            let tx_ids = chunk
                                .iter()
                                .map(|sorted| sorted.tx_id.0)
                                .collect::<Vec<_>>();
                            let txs = async_query.transactions(tx_ids).await;
                            // Pair every lookup result with the cursor it was requested for.
                            let txs = txs.into_iter().zip(chunk.into_iter()).map(
                                |(result, sorted)| {
                                    result.map(|tx| {
                                        (sorted, Transaction::from_tx(sorted.tx_id.0, tx))
                                    })
                                },
                            );
                            Ok(Some(futures::stream::iter(txs)))
                        }
                    })
                    .try_flatten();

                Ok(all_txs)
            },
        )
        .await
    }
303
304    #[graphql(complexity = "{\
305        query_costs().storage_iterator\
306        + first.unwrap_or_default() as usize * (child_complexity + query_costs().storage_read) \
307        + last.unwrap_or_default() as usize * (child_complexity + query_costs().storage_read) \
308    }")]
309    async fn transactions_by_owner(
310        &self,
311        ctx: &Context<'_>,
312        owner: Address,
313        first: Option<i32>,
314        after: Option<String>,
315        last: Option<i32>,
316        before: Option<String>,
317    ) -> async_graphql::Result<Connection<TxPointer, Transaction, EmptyFields, EmptyFields>>
318    {
319        use futures::stream::StreamExt;
320        let query = ctx.read_view()?;
321        let params = ctx
322            .data_unchecked::<ChainInfoProvider>()
323            .current_consensus_params();
324        let owner = fuel_types::Address::from(owner);
325
326        crate::schema::query_pagination(
327            after,
328            before,
329            first,
330            last,
331            |start: &Option<TxPointer>, direction| {
332                let start = (*start).map(Into::into);
333                let txs =
334                    query
335                        .owned_transactions(owner, start, direction)
336                        .map(|result| {
337                            result.map(|(cursor, tx)| {
338                                let tx_id = tx.id(&params.chain_id());
339                                (cursor.into(), Transaction::from_tx(tx_id, tx))
340                            })
341                        });
342                Ok(txs)
343            },
344        )
345        .await
346    }
347
348    /// Assembles the transaction based on the provided requirements.
349    /// The return transaction contains:
350    /// - Input coins to cover `required_balances`
351    /// - Input coins to cover the fee of the transaction based on the gas price from `block_horizon`
352    /// - `Change` or `Destroy` outputs for all assets from the inputs
353    /// - `Variable` outputs in the case they are required during the execution
354    /// - `Contract` inputs and outputs in the case they are required during the execution
355    /// - Reserved witness slots for signed coins filled with `64` zeroes
356    /// - Set script gas limit(unless `script` is empty)
357    /// - Estimated predicates, if `estimate_predicates == true`
358    ///
359    /// Returns an error if:
360    /// - The number of required balances exceeds the maximum number of inputs allowed.
361    /// - The fee address index is out of bounds.
362    /// - The same asset has multiple change policies(either the receiver of
363    ///     the change is different, or one of the policies states about the destruction
364    ///     of the token while the other does not). The `Change` output from the transaction
365    ///     also count as a `ChangePolicy`.
366    /// - The number of excluded coin IDs exceeds the maximum number of inputs allowed.
367    /// - Required assets have multiple entries.
368    /// - If accounts don't have sufficient amounts to cover the transaction requirements in assets.
369    /// - If a constructed transaction breaks the rules defined by consensus parameters.
370    #[graphql(complexity = "query_costs().assemble_tx")]
371    #[allow(clippy::too_many_arguments)]
372    async fn assemble_tx(
373        &self,
374        ctx: &Context<'_>,
375        #[graphql(
376            desc = "The original transaction that contains application level logic only"
377        )]
378        tx: HexString,
379        #[graphql(
380            desc = "Number of blocks into the future to estimate the gas price for"
381        )]
382        block_horizon: U32,
383        #[graphql(
384            desc = "The list of required balances for the transaction to include as inputs. \
385                    The list should be created based on the application-required assets. \
386                    The base asset requirement should not require assets to cover the \
387                    transaction fee, which will be calculated and added automatically \
388                    at the end of the assembly process."
389        )]
390        required_balances: Vec<schema_types::RequiredBalance>,
391        #[graphql(desc = "The index from the `required_balances` list \
392                that points to the address who pays fee for the transaction. \
393                If you only want to cover the fee of transaction, you can set the required balance \
394                to 0, set base asset and point to this required address.")]
395        fee_address_index: U16,
396        #[graphql(
397            desc = "The list of resources to exclude from the selection for the inputs"
398        )]
399        exclude_input: Option<ExcludeInput>,
400        #[graphql(
401            desc = "Perform the estimation of the predicates before cover fee of the transaction"
402        )]
403        estimate_predicates: Option<bool>,
404        #[graphql(
405            desc = "During the phase of the fee calculation, adds `reserve_gas` to the \
406                    total gas used by the transaction and fetch assets to cover the fee."
407        )]
408        reserve_gas: Option<U64>,
409    ) -> async_graphql::Result<AssembleTransactionResult> {
410        let consensus_parameters = ctx
411            .data_unchecked::<ChainInfoProvider>()
412            .current_consensus_params();
413
414        let max_input = consensus_parameters.tx_params().max_inputs();
415
416        if required_balances.len() > max_input as usize {
417            return Err(CoinsQueryError::TooManyCoinsSelected {
418                required: required_balances.len(),
419                max: max_input,
420            }
421            .into());
422        }
423
424        let fee_index: u16 = fee_address_index.into();
425        let estimate_predicates: bool = estimate_predicates.unwrap_or(false);
426        let reserve_gas: u64 = reserve_gas.map(Into::into).unwrap_or(0);
427
428        let excluded_id_count = exclude_input.as_ref().map_or(0, |exclude| {
429            exclude.utxos.len().saturating_add(exclude.messages.len())
430        });
431        if excluded_id_count > max_input as usize {
432            return Err(CoinsQueryError::TooManyExcludedId {
433                provided: excluded_id_count,
434                allowed: max_input,
435            }
436            .into());
437        }
438
439        let required_balances: Vec<RequiredBalance> =
440            required_balances.into_iter().map(Into::into).collect();
441        let exclude: Exclude = exclude_input.into();
442
443        let gas_price = ctx.estimate_gas_price(Some(block_horizon.into()))?;
444        let config = &ctx.data_unchecked::<GraphQLConfig>().config;
445
446        let tx = FuelTx::from_bytes(&tx.0)?;
447
448        let read_view = Arc::new(ctx.read_view()?.into_owned());
449        let block_producer = ctx.data_unchecked::<BlockProducer>();
450        let shared_memory_pool = ctx.data_unchecked::<SharedMemoryPool>();
451
452        let arguments = AssembleArguments {
453            fee_index,
454            required_balances,
455            exclude,
456            estimate_predicates,
457            reserve_gas,
458            consensus_parameters,
459            gas_price,
460            dry_run_limit: config.assemble_tx_dry_run_limit,
461            estimate_predicates_limit: config.assemble_tx_estimate_predicates_limit,
462            block_producer,
463            read_view,
464            shared_memory_pool,
465        };
466
467        let assembled_tx: fuel_tx::Transaction = match tx {
468            fuel_tx::Transaction::Script(tx) => {
469                AssembleTx::new(tx, arguments)?.assemble().await?.into()
470            }
471            fuel_tx::Transaction::Create(tx) => {
472                AssembleTx::new(tx, arguments)?.assemble().await?.into()
473            }
474            fuel_tx::Transaction::Mint(_) => {
475                return Err(anyhow::anyhow!("Mint transaction is not supported").into());
476            }
477            fuel_tx::Transaction::Upgrade(tx) => {
478                AssembleTx::new(tx, arguments)?.assemble().await?.into()
479            }
480            fuel_tx::Transaction::Upload(tx) => {
481                AssembleTx::new(tx, arguments)?.assemble().await?.into()
482            }
483            fuel_tx::Transaction::Blob(tx) => {
484                AssembleTx::new(tx, arguments)?.assemble().await?.into()
485            }
486        };
487
488        let (assembled_tx, status) = block_producer
489            .dry_run_txs(
490                vec![assembled_tx],
491                None,
492                None,
493                Some(false),
494                Some(gas_price),
495                false,
496            )
497            .await?
498            .transactions
499            .into_iter()
500            .next()
501            .ok_or_else(|| {
502                anyhow::anyhow!(
503                    "Failed to do the final `dry_run` of the assembled transaction"
504                )
505            })?;
506
507        let result = AssembleTransactionResult {
508            tx_id: status.id,
509            tx: assembled_tx,
510            status: status.result,
511            gas_price,
512        };
513
514        Ok(result)
515    }
516
517    /// Estimate the predicate gas for the provided transaction
518    #[graphql(complexity = "query_costs().estimate_predicates + child_complexity")]
519    async fn estimate_predicates(
520        &self,
521        ctx: &Context<'_>,
522        tx: HexString,
523    ) -> async_graphql::Result<Transaction> {
524        let query = ctx.read_view()?.into_owned();
525
526        let tx = FuelTx::from_bytes(&tx.0)?;
527
528        let tx = ctx.estimate_predicates(tx, query).await?;
529        let chain_id = ctx
530            .data_unchecked::<ChainInfoProvider>()
531            .current_consensus_params()
532            .chain_id();
533
534        Ok(Transaction::from_tx(tx.id(&chain_id), tx))
535    }
536
537    #[cfg(feature = "test-helpers")]
538    /// Returns all possible receipts for test purposes.
539    async fn all_receipts(&self) -> Vec<receipt::Receipt> {
540        receipt::all_receipts()
541            .into_iter()
542            .map(Into::into)
543            .collect()
544    }
545
546    /// Execute a dry-run of multiple transactions using a fork of current state, no changes are committed.
547    #[graphql(
548        complexity = "query_costs().dry_run * txs.len() + child_complexity * txs.len()"
549    )]
550    async fn dry_run(
551        &self,
552        ctx: &Context<'_>,
553        txs: Vec<HexString>,
554        // If set to false, disable input utxo validation, overriding the configuration of the node.
555        // This allows for non-existent inputs to be used without signature validation
556        // for read-only calls.
557        utxo_validation: Option<bool>,
558        gas_price: Option<U64>,
559        // This can be used to run the dry-run on top of a past block.
560        // Requires `--historical-execution` flag to be enabled.
561        block_height: Option<U32>,
562    ) -> async_graphql::Result<Vec<DryRunTransactionExecutionStatus>> {
563        Ok(self
564            .dry_run_inner(ctx, txs, utxo_validation, gas_price, block_height, false)
565            .await?
566            .tx_statuses)
567    }
568
569    /// Execute a dry-run of multiple transactions using a fork of current state, no changes are committed.
570    /// Also records accesses, so the execution can be replicated locally.
571    #[graphql(
572        complexity = "query_costs().dry_run * txs.len() + child_complexity * txs.len()"
573    )]
574    async fn dry_run_record_storage_reads(
575        &self,
576        ctx: &Context<'_>,
577        txs: Vec<HexString>,
578        // If set to false, disable input utxo validation, overriding the configuration of the node.
579        // This allows for non-existent inputs to be used without signature validation
580        // for read-only calls.
581        utxo_validation: Option<bool>,
582        gas_price: Option<U64>,
583        // This can be used to run the dry-run on top of a past block.
584        // Requires `--historical-execution` flag to be enabled.
585        block_height: Option<U32>,
586    ) -> async_graphql::Result<DryRunStorageReads> {
587        self.dry_run_inner(ctx, txs, utxo_validation, gas_price, block_height, true)
588            .await
589    }
590
591    /// Get execution trace for an already-executed block.
592    #[graphql(complexity = "query_costs().storage_read_replay + child_complexity")]
593    async fn storage_read_replay(
594        &self,
595        ctx: &Context<'_>,
596        height: U32,
597    ) -> async_graphql::Result<Vec<StorageReadReplayEvent>> {
598        let config = ctx.data_unchecked::<GraphQLConfig>();
599        if !config.historical_execution {
600            return Err(anyhow::anyhow!(
601                "`--historical-execution` is required for this operation"
602            )
603            .into());
604        }
605
606        let block_height = height.into();
607        let block_producer = ctx.data_unchecked::<BlockProducer>();
608        Ok(block_producer
609            .storage_read_replay(block_height)
610            .await?
611            .into_iter()
612            .map(StorageReadReplayEvent::from)
613            .collect())
614    }
615}
616
/// Stateless resolver type for the transaction-related GraphQL mutations.
///
/// The struct carries no data; every resolver pulls what it needs from the
/// request [`Context`].
#[derive(Default)]
pub struct TxMutation;
619
620#[Object]
621impl TxMutation {
622    /// Execute a dry-run of multiple transactions using a fork of current state, no changes are committed.
623    #[graphql(
624        complexity = "query_costs().dry_run * txs.len() + child_complexity * txs.len()",
625        deprecation = "This doesn't need to be a mutation. Use query of the same name instead."
626    )]
627    async fn dry_run(
628        &self,
629        ctx: &Context<'_>,
630        txs: Vec<HexString>,
631        // If set to false, disable input utxo validation, overriding the configuration of the node.
632        // This allows for non-existent inputs to be used without signature validation
633        // for read-only calls.
634        utxo_validation: Option<bool>,
635        gas_price: Option<U64>,
636        // This can be used to run the dry-run on top of a past block.
637        // Requires `--historical-execution` flag to be enabled.
638        block_height: Option<U32>,
639    ) -> async_graphql::Result<Vec<DryRunTransactionExecutionStatus>> {
640        TxQuery::dry_run(&TxQuery, ctx, txs, utxo_validation, gas_price, block_height)
641            .await
642    }
643
644    /// Submits transaction to the `TxPool`.
645    ///
646    /// Returns submitted transaction if the transaction is included in the `TxPool` without problems.
647    #[graphql(complexity = "query_costs().submit + child_complexity")]
648    async fn submit(
649        &self,
650        ctx: &Context<'_>,
651        tx: HexString,
652        estimate_predicates: Option<bool>,
653    ) -> async_graphql::Result<Transaction> {
654        let txpool = ctx.data_unchecked::<TxPool>();
655        let mut tx = FuelTx::from_bytes(&tx.0)?;
656
657        if estimate_predicates.unwrap_or(false) {
658            let query = ctx.read_view()?.into_owned();
659            tx = ctx.estimate_predicates(tx, query).await?;
660        }
661
662        txpool
663            .insert(tx.clone())
664            .await
665            .map_err(|e| anyhow::anyhow!(e))?;
666
667        let chain_id = ctx
668            .data_unchecked::<ChainInfoProvider>()
669            .current_consensus_params()
670            .chain_id();
671        let id = tx.id(&chain_id);
672
673        let tx = Transaction(tx, id);
674        Ok(tx)
675    }
676}
677
/// Stateless resolver type for the transaction status GraphQL subscriptions.
///
/// The struct carries no data; every resolver pulls what it needs from the
/// request [`Context`].
#[derive(Default)]
pub struct TxStatusSubscription;
680
681#[Subscription]
682impl TxStatusSubscription {
683    /// Returns a stream of status updates for the given transaction id.
684    /// If the current status is [`TransactionStatus::Success`], [`TransactionStatus::Failed`],
685    /// or [`TransactionStatus::SqueezedOut`] the stream will return that and end immediately.
686    /// Other, intermediate statuses will also be returned but the stream will remain active
687    /// and wait for a future updates.
688    ///
689    /// This stream will wait forever so it's advised to use within a timeout.
690    ///
691    /// It is possible for the stream to miss an update if it is polled slower
692    /// then the updates arrive. In such a case the stream will close without
693    /// a status. If this occurs the stream can simply be restarted to return
694    /// the latest status.
695    #[graphql(complexity = "query_costs().status_change + child_complexity")]
696    async fn status_change<'a>(
697        &self,
698        ctx: &'a Context<'a>,
699        #[graphql(desc = "The ID of the transaction")] id: TransactionId,
700        #[graphql(desc = "If true, accept to receive the preconfirmation status")]
701        include_preconfirmation: Option<bool>,
702    ) -> anyhow::Result<
703        impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a + use<'a>,
704    > {
705        let tx_status_manager = ctx.data_unchecked::<DynTxStatusManager>();
706        let rx = tx_status_manager.tx_update_subscribe(id.into()).await?;
707        let query = ctx.read_view()?;
708
709        let status_change_state = StatusChangeState {
710            tx_status_manager,
711            query,
712        };
713        Ok(transaction_status_change(
714            status_change_state,
715            rx,
716            id.into(),
717            include_preconfirmation.unwrap_or(false),
718        )
719        .await
720        .map_err(async_graphql::Error::from))
721    }
722
723    #[graphql(name = "alpha__preconfirmations")]
724    async fn preconfirmations<'a>(
725        &self,
726        ctx: &'a Context<'a>,
727    ) -> async_graphql::Result<
728        impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a + use<'a>,
729    > {
730        use futures::StreamExt;
731
732        require_expensive_subscriptions(ctx)?;
733
734        let tx_status_manager = ctx.data_unchecked::<DynTxStatusManager>();
735        let stream = tx_status_manager.subscribe_txs_updates()?;
736
737        let stream = stream.map(|result| {
738            result
739                .map(|(id, status)| TransactionStatus::new(id, status))
740                .map_err(Into::into)
741        });
742
743        Ok(stream)
744    }
745
746    /// Submits transaction to the `TxPool` and await either success or failure.
747    #[graphql(complexity = "query_costs().submit_and_await + child_complexity")]
748    async fn submit_and_await<'a>(
749        &self,
750        ctx: &'a Context<'a>,
751        tx: HexString,
752        estimate_predicates: Option<bool>,
753    ) -> async_graphql::Result<
754        impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a + use<'a>,
755    > {
756        use tokio_stream::StreamExt;
757        let subscription =
758            submit_and_await_status(ctx, tx, estimate_predicates.unwrap_or(false), false)
759                .await?;
760
761        Ok(subscription
762            .skip_while(|event| event.as_ref().map_or(true, |status| !status.is_final()))
763            .take(1))
764    }
765
766    /// Submits the transaction to the `TxPool` and returns a stream of events.
767    /// Compared to the `submitAndAwait`, the stream also contains
768    /// `SubmittedStatus` and potentially preconfirmation as an intermediate state.
769    #[graphql(complexity = "query_costs().submit_and_await + child_complexity")]
770    async fn submit_and_await_status<'a>(
771        &self,
772        ctx: &'a Context<'a>,
773        tx: HexString,
774        estimate_predicates: Option<bool>,
775        include_preconfirmation: Option<bool>,
776    ) -> async_graphql::Result<
777        impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a + use<'a>,
778    > {
779        submit_and_await_status(
780            ctx,
781            tx,
782            estimate_predicates.unwrap_or(false),
783            include_preconfirmation.unwrap_or(false),
784        )
785        .await
786    }
787}
788
789async fn submit_and_await_status<'a>(
790    ctx: &'a Context<'a>,
791    tx: HexString,
792    estimate_predicates: bool,
793    include_preconfirmation: bool,
794) -> async_graphql::Result<
795    impl Stream<Item = async_graphql::Result<TransactionStatus>> + 'a,
796> {
797    use tokio_stream::StreamExt;
798    let txpool = ctx.data_unchecked::<TxPool>();
799    let tx_status_manager = ctx.data_unchecked::<DynTxStatusManager>();
800    let params = ctx
801        .data_unchecked::<ChainInfoProvider>()
802        .current_consensus_params();
803    let mut tx = FuelTx::from_bytes(&tx.0)?;
804    let tx_id = tx.id(&params.chain_id());
805
806    if estimate_predicates {
807        let query = ctx.read_view()?.into_owned();
808        tx = ctx.estimate_predicates(tx, query).await?;
809    }
810
811    let subscription = tx_status_manager.tx_update_subscribe(tx_id).await?;
812
813    txpool.insert(tx).await?;
814
815    Ok(subscription
816        .filter_map(move |status| {
817            match status {
818                TxStatusMessage::Status(status) => {
819                    let status = TransactionStatus::new(tx_id, status);
820                    if !include_preconfirmation && status.is_preconfirmation() {
821                        None
822                    } else {
823                        Some(Ok(status))
824                    }
825                }
826                // Map a failed status to an error for the api.
827                TxStatusMessage::FailedStatus => Some(Err(anyhow::anyhow!(
828                    "Failed to get transaction status"
829                )
830                .into())),
831            }
832        })
833        .take(3))
834}
835
/// Pairs a database read view with the transaction status manager so that
/// `TxnStatusChangeState` can be implemented on top of both (see the impl
/// that follows this struct).
836struct StatusChangeState<'a> {
    /// Read view used for status lookups; may be borrowed or owned.
837    query: Cow<'a, ReadView>,
    /// Borrowed handle to the transaction status manager.
838    tx_status_manager: &'a DynTxStatusManager,
839}
840
841impl TxnStatusChangeState for StatusChangeState<'_> {
842    async fn get_tx_status(
843        &self,
844        id: Bytes32,
845        include_preconfirmation: bool,
846    ) -> StorageResult<Option<transaction_status::TransactionStatus>> {
847        get_tx_status(
848            &id,
849            self.query.as_ref(),
850            self.tx_status_manager,
851            include_preconfirmation,
852        )
853        .await
854    }
855}
856
// GraphQL-facing input types. Each one has a same-named counterpart outside
// this module built from `fuel_tx` types; `schema_types::RequiredBalance` is
// converted into that counterpart by the `From` impl further down the file.
857pub mod schema_types {
858    use super::*;
859
    // Single-variant enum used as the payload of `ChangePolicy::Destroy`.
860    #[derive(async_graphql::Enum, Clone, Copy, Hash, PartialEq, Eq, PartialOrd, Ord)]
861    pub enum Destroy {
862        Destroy,
863    }
864
    // What to do with the assets left over after the required amount is used.
    // NOTE(review): derives `OneofObject`, so presumably exactly one of the
    // variants is supplied per input value — confirm against async-graphql docs.
865    #[derive(async_graphql::OneofObject)]
866    pub enum ChangePolicy {
867        /// Adds `Output::Change` to the transaction if it is not already present.
868        /// Sending remaining assets to the provided address.
869        Change(Address),
870        /// Destroys the remaining assets by the transaction for provided address.
871        Destroy(Destroy),
872    }
873
    // The account a balance is required from: either a plain address or a
    // predicate (address plus bytecode and data).
874    #[derive(async_graphql::OneofObject)]
875    pub enum Account {
876        Address(Address),
877        Predicate(Predicate),
878    }
879
    // Predicate description supplied by the caller.
880    #[derive(async_graphql::InputObject)]
881    pub struct Predicate {
882        // The address of the predicate can be different from the actual bytecode.
883        // This feature is used by wallets during estimation of the predicate that requires
884        // signature verification. They provide a mocked version of the predicate that
885        // returns `true` even if the signature doesn't match.
886        pub predicate_address: Address,
887        pub predicate: HexString,
888        pub predicate_data: HexString,
889    }
890
    // One balance requirement: an amount of an asset, the account it is tied
    // to, and the change policy for whatever remains.
891    #[derive(async_graphql::InputObject)]
892    pub struct RequiredBalance {
893        pub asset_id: AssetId,
894        pub amount: U64,
895        pub account: Account,
896        pub change_policy: ChangePolicy,
897    }
898}
899
/// Change policy expressed with `fuel_tx` types.
///
/// This is the counterpart of `schema_types::ChangePolicy`; the `From` impl
/// for `RequiredBalance` in this file maps the schema variants onto these.
900#[derive(Clone, Copy, PartialEq, Eq)]
901pub enum ChangePolicy {
902    /// Adds `Output::Change` to the transaction if it is not already present.
903    /// Sending remaining assets to the provided address.
904    Change(fuel_tx::Address),
905    /// Destroys the remaining assets by the transaction for provided address.
906    Destroy,
907}
908
/// Account expressed with `fuel_tx` types: the counterpart of
/// `schema_types::Account`.
909#[derive(Clone)]
910pub enum Account {
    /// A plain address.
911    Address(fuel_tx::Address),
    /// A predicate, identified by its `predicate_address`.
912    Predicate(Predicate),
913}
914
915impl Account {
916    pub fn owner(&self) -> fuel_tx::Address {
917        match self {
918            Account::Address(address) => *address,
919            Account::Predicate(predicate) => predicate.predicate_address,
920        }
921    }
922}
923
/// Predicate expressed with plain byte vectors: the counterpart of
/// `schema_types::Predicate`.
924#[derive(Clone)]
925pub struct Predicate {
    /// Address reported as the owner of the predicate (see `Account::owner`).
    /// Per the note on the schema type, it may differ from the bytecode.
926    pub predicate_address: fuel_tx::Address,
    /// Raw predicate bytes, converted from the schema's hex string.
927    pub predicate: Vec<u8>,
    /// Raw predicate data bytes, converted from the schema's hex string.
    /// NOTE(review): presumably the input data passed to the predicate — confirm.
928    pub predicate_data: Vec<u8>,
929}
930
/// Balance requirement expressed with `fuel_tx` types; built from
/// `schema_types::RequiredBalance` through the `From` impl in this file.
931struct RequiredBalance {
    /// Asset the requirement applies to.
932    asset_id: fuel_tx::AssetId,
    /// Required amount, as a `fuel_tx::Word`.
933    amount: fuel_tx::Word,
    /// Account the requirement is tied to.
934    account: Account,
    /// What to do with the remaining assets.
935    change_policy: ChangePolicy,
936}
937
938impl From<schema_types::RequiredBalance> for RequiredBalance {
939    fn from(required_balance: schema_types::RequiredBalance) -> Self {
940        let asset_id: fuel_tx::AssetId = required_balance.asset_id.into();
941        let amount: fuel_tx::Word = required_balance.amount.into();
942        let account = match required_balance.account {
943            schema_types::Account::Address(address) => Account::Address(address.into()),
944            schema_types::Account::Predicate(predicate) => {
945                let predicate_address = predicate.predicate_address.into();
946                let predicate_data = predicate.predicate_data.into();
947                let predicate = predicate.predicate.into();
948                Account::Predicate(Predicate {
949                    predicate_address,
950                    predicate,
951                    predicate_data,
952                })
953            }
954        };
955
956        let change_policy = match required_balance.change_policy {
957            schema_types::ChangePolicy::Change(address) => {
958                ChangePolicy::Change(address.into())
959            }
960            schema_types::ChangePolicy::Destroy(_) => ChangePolicy::Destroy,
961        };
962
963        Self {
964            asset_id,
965            amount,
966            account,
967            change_policy,
968        }
969    }
970}
971
/// Extension methods for transaction lookup and predicate estimation.
///
/// This file implements the trait for the GraphQL `Context`. Both methods
/// return `Send` futures.
972pub trait ContextExt {
    /// Looks up the transaction with the given `id`.
    ///
    /// Resolves to `Ok(None)` when the transaction is not found; the
    /// `Context` implementation in this file checks the transaction pool
    /// first and the read view second.
973    fn try_find_tx(
974        &self,
975        id: Bytes32,
976    ) -> impl Future<Output = StorageResult<Option<FuelTx>>> + Send;
977
    /// Runs predicate estimation on `tx` and resolves to the resulting
    /// transaction.
    ///
    /// `query` provides the storage access estimation needs; it must be
    /// `Send + Sync + 'static` (the `Context` implementation in this file
    /// moves it into a `tokio_rayon` task).
978    fn estimate_predicates(
979        &self,
980        tx: FuelTx,
981        query: impl PredicateStorageRequirements + Send + Sync + 'static,
982    ) -> impl Future<Output = anyhow::Result<FuelTx>> + Send;
983}
984
985impl ContextExt for Context<'_> {
986    async fn try_find_tx(&self, id: Bytes32) -> StorageResult<Option<FuelTx>> {
987        let query = self.read_view()?;
988        let txpool = self.data_unchecked::<TxPool>();
989
990        match txpool.transaction(id).await? {
991            Some(tx) => Ok(Some(tx)),
992            _ => {
993                let result = query.transaction(&id);
994
995                if result.is_not_found() {
996                    Ok(None)
997                } else {
998                    result.map(Some)
999                }
1000            }
1001        }
1002    }
1003
1004    async fn estimate_predicates(
1005        &self,
1006        mut tx: FuelTx,
1007        query: impl PredicateStorageRequirements + Send + Sync + 'static,
1008    ) -> anyhow::Result<FuelTx> {
1009        let mut has_predicates = false;
1010
1011        for input in tx.inputs().iter() {
1012            if input.predicate().is_some() {
1013                has_predicates = true;
1014                break;
1015            }
1016        }
1017
1018        if !has_predicates {
1019            return Ok(tx);
1020        }
1021
1022        let params = self
1023            .data_unchecked::<ChainInfoProvider>()
1024            .current_consensus_params();
1025
1026        let memory_pool = self.data_unchecked::<SharedMemoryPool>();
1027        let memory = memory_pool.get_memory().await;
1028
1029        let config = self.data_unchecked::<GraphQLConfig>();
1030        let allow_syscall = config.allow_syscall;
1031
1032        let parameters = CheckPredicateParams::from(params.as_ref());
1033        let tx = tokio_rayon::spawn_fifo(move || {
1034            let ecal = EcalLogCollector {
1035                enabled: allow_syscall,
1036                ..Default::default()
1037            };
1038
1039            let chain_id = params.chain_id();
1040
1041            let result =
1042                tx.estimate_predicates_ecal(&parameters, memory, &query, ecal.clone());
1043
1044            ecal.maybe_print_logs(
1045                tracing::info_span!("estimation", tx_id = % &tx.id(&chain_id)),
1046            );
1047            result.map(|_| tx)
1048        })
1049        .await
1050        .map_err(|err| anyhow::anyhow!("{:?}", err))?;
1051
1052        Ok(tx)
1053    }
1054}