fuel_core/service/
query.rs1use crate::graphql_api::ports::TxStatusManager;
3use fuel_core_storage::Result as StorageResult;
4use fuel_core_types::{
5 fuel_tx::{
6 Transaction,
7 UniqueIdentifier,
8 },
9 fuel_types::Bytes32,
10 services::transaction_status::TransactionStatus as TxPoolTxStatus,
11 tai64::Tai64,
12};
13use futures::{
14 Stream,
15 StreamExt,
16};
17
18use crate::{
19 database::OffChainIterableKeyValueView,
20 query::{
21 TxnStatusChangeState,
22 transaction_status_change,
23 },
24 schema::tx::types::TransactionStatus,
25};
26
27use super::*;
28
29impl FuelService {
30 pub async fn submit(&self, tx: Transaction) -> anyhow::Result<()> {
32 self.shared
33 .txpool_shared_state
34 .insert(tx)
35 .await
36 .map_err(|e| anyhow::anyhow!(e))
37 }
38
39 pub async fn submit_and_status_change(
41 &self,
42 tx: Transaction,
43 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>> + '_> {
44 let id = tx.id(&self
45 .shared
46 .config
47 .snapshot_reader
48 .chain_config()
49 .consensus_parameters
50 .chain_id());
51 let stream = self.transaction_status_change(id).await?;
52 self.submit(tx).await?;
53 Ok(stream)
54 }
55
56 pub async fn submit_and_await_commit(
58 &self,
59 tx: Transaction,
60 ) -> anyhow::Result<TransactionStatus> {
61 let id = tx.id(&self
62 .shared
63 .config
64 .snapshot_reader
65 .chain_config()
66 .consensus_parameters
67 .chain_id());
68 let stream = self.transaction_status_change(id).await?.filter(|status| {
69 futures::future::ready(status.as_ref().is_ok_and(|status| status.is_final()))
70 });
71 futures::pin_mut!(stream);
72 self.submit(tx).await?;
73 stream
74 .next()
75 .await
76 .ok_or_else(|| anyhow::anyhow!("Stream closed without transaction status"))?
77 }
78
79 pub async fn transaction_status_change(
81 &self,
82 id: Bytes32,
83 ) -> anyhow::Result<impl Stream<Item = anyhow::Result<TransactionStatus>> + '_> {
84 let tx_status_manager = &self.shared.tx_status_manager;
86 let rx = tx_status_manager.tx_update_subscribe(id).await?;
87 let db = self.shared.database.off_chain().latest_view()?;
88 let state = StatusChangeState {
89 db,
90 tx_status_manager,
91 };
92 Ok(transaction_status_change(state, rx, id, true).await)
93 }
94}
95
96struct StatusChangeState<'a> {
97 db: OffChainIterableKeyValueView,
98 tx_status_manager: &'a TxStatusManagerAdapter,
99}
100
101impl TxnStatusChangeState for StatusChangeState<'_> {
102 async fn get_tx_status(
103 &self,
104 id: Bytes32,
105 include_preconfirmation: bool,
106 ) -> StorageResult<Option<TxPoolTxStatus>> {
107 match self.db.get_tx_status(&id)? {
108 Some(status) => Ok(Some(status.into())),
109 None => {
110 let status = self.tx_status_manager.status(id).await?;
111 let status = status.map(|status| {
112 if !include_preconfirmation
116 && status.is_preconfirmation()
117 && !status.is_final()
118 {
119 TxPoolTxStatus::submitted(Tai64::now())
120 } else {
121 status
122 }
123 });
124 Ok(status)
125 }
126 }
127 }
128}