fuel_core/service/
query.rs

1//! Queries we can run directly on `FuelService`.
2use crate::graphql_api::ports::TxStatusManager;
3use fuel_core_storage::Result as StorageResult;
4use fuel_core_types::{
5    fuel_tx::{
6        Transaction,
7        UniqueIdentifier,
8    },
9    fuel_types::Bytes32,
10    services::transaction_status::TransactionStatus as TxPoolTxStatus,
11    tai64::Tai64,
12};
13use futures::{
14    Stream,
15    StreamExt,
16};
17
18use crate::{
19    database::OffChainIterableKeyValueView,
20    query::{
21        TxnStatusChangeState,
22        transaction_status_change,
23    },
24    schema::tx::types::TransactionStatus,
25};
26
27use super::*;
28
29impl FuelService {
30    /// Submit a transaction to the txpool.
31    pub async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
32        self.shared
33            .txpool_shared_state
34            .insert(tx)
35            .await
36            .map_err(|e| anyhow::anyhow!(e))
37    }
38
39    /// Submit a transaction to the txpool and return a stream of status changes.
40    pub async fn submit_and_status_change(
41        &self,
42        tx: Transaction,
43    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>> + '_> {
44        let id = tx.id(&self
45            .shared
46            .config
47            .snapshot_reader
48            .chain_config()
49            .consensus_parameters
50            .chain_id());
51        let stream = self.transaction_status_change(id).await?;
52        self.submit(tx).await?;
53        Ok(stream)
54    }
55
56    /// Submit a transaction to the txpool and return the final status.
57    pub async fn submit_and_await_commit(
58        &self,
59        tx: Transaction,
60    ) -> anyhow::Result<TransactionStatus> {
61        let id = tx.id(&self
62            .shared
63            .config
64            .snapshot_reader
65            .chain_config()
66            .consensus_parameters
67            .chain_id());
68        let stream = self.transaction_status_change(id).await?.filter(|status| {
69            futures::future::ready(status.as_ref().is_ok_and(|status| status.is_final()))
70        });
71        futures::pin_mut!(stream);
72        self.submit(tx).await?;
73        stream
74            .next()
75            .await
76            .ok_or_else(|| anyhow::anyhow!("Stream closed without transaction status"))?
77    }
78
79    /// Return a stream of status changes for a transaction.
80    pub async fn transaction_status_change(
81        &self,
82        id: Bytes32,
83    ) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>> + '_> {
84        // First subscribe to the statuses, and only after that create a view.
85        let tx_status_manager = &self.shared.tx_status_manager;
86        let rx = tx_status_manager.tx_update_subscribe(id).await?;
87        let db = self.shared.database.off_chain().latest_view()?;
88        let state = StatusChangeState {
89            db,
90            tx_status_manager,
91        };
92        Ok(transaction_status_change(state, rx, id, true).await)
93    }
94}
95
96struct StatusChangeState<'a> {
97    db: OffChainIterableKeyValueView,
98    tx_status_manager: &'a TxStatusManagerAdapter,
99}
100
101impl TxnStatusChangeState for StatusChangeState<'_> {
102    async fn get_tx_status(
103        &self,
104        id: Bytes32,
105        include_preconfirmation: bool,
106    ) -> StorageResult<Option<TxPoolTxStatus>> {
107        match self.db.get_tx_status(&id)? {
108            Some(status) => Ok(Some(status.into())),
109            None => {
110                let status = self.tx_status_manager.status(id).await?;
111                let status = status.map(|status| {
112                    // Filter out preconfirmation statuses if not allowed. Converting to submitted status
113                    // because it's the closest to the preconfirmation status.
114                    // Having `now()` as timestamp isn't ideal but shouldn't cause much inconsistency.
115                    if !include_preconfirmation
116                        && status.is_preconfirmation()
117                        && !status.is_final()
118                    {
119                        TxPoolTxStatus::submitted(Tai64::now())
120                    } else {
121                        status
122                    }
123                });
124                Ok(status)
125            }
126        }
127    }
128}