1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
use crate::schema::tx::types::{
SqueezedOutStatus,
TransactionStatus,
};
use fuel_core_storage::Result as StorageResult;
use fuel_core_txpool::service::TxUpdate;
use fuel_core_types::fuel_types::Bytes32;
use futures::{
stream::BoxStream,
Stream,
StreamExt,
TryStreamExt,
};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
use tracing::Instrument;
#[cfg(test)]
mod test;
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub(crate) trait TxnStatusChangeState {
/// Return the transaction status from the tx pool and database.
async fn get_tx_status(
&self,
id: Bytes32,
) -> StorageResult<Option<TransactionStatus>>;
}
#[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))]
pub(crate) async fn transaction_status_change<'a, State>(
state: State,
stream: BoxStream<'a, Result<TxUpdate, BroadcastStreamRecvError>>,
transaction_id: Bytes32,
) -> impl Stream<Item = anyhow::Result<TransactionStatus>> + 'a
where
State: TxnStatusChangeState + Send + Sync + 'a,
{
let check_db_first = state
.get_tx_status(transaction_id)
.await
.map_err(Into::into)
.transpose();
let (close, mut closed) = tokio::sync::oneshot::channel();
let mut close = Some(close);
let stream = futures::stream::unfold((state, stream), move |(state, mut stream)| {
let is_closed = !matches!(
closed.try_recv(),
Err(tokio::sync::oneshot::error::TryRecvError::Empty)
);
async move {
tracing::debug!("{is_closed}");
if is_closed {
return None
}
match stream.next().await {
// Got status update that matches the transaction_id we are looking for.
Some(Ok(tx_update)) if *tx_update.tx_id() == transaction_id => {
if tx_update.was_squeezed_out() {
match tx_update.into_squeezed_out_reason() {
Some(reason) => {
tracing::debug!(
"transaction was squeezed out: {reason:?}"
);
// Squeezed out status is never stored in the database so must be
// outputted inline.
let status =
TransactionStatus::SqueezedOut(SqueezedOutStatus {
reason: reason.to_string(),
});
Some((Some(Ok(status)), (state, stream)))
}
None => {
unreachable!("due to the was squeezed out check above")
}
}
} else {
let status = state.get_tx_status(transaction_id).await;
tracing::debug!("got status update: {status:?}");
match status {
// Got the status from the db.
Ok(Some(s)) => Some((Some(Ok(s)), (state, stream))),
// Could not get status from the db so the stream must exit
// as a value has been missed and the only valid thing to do
// is to restart the stream.
Ok(None) => None,
// Got an error so return it.
Err(e) => Some((Some(Err(e.into())), (state, stream))),
}
}
}
// Got a status update but it's not this transaction so ignore it.
Some(Ok(r)) => {
tracing::debug!(
"ignoring status update for another transaction: {r:?}"
);
Some((None, (state, stream)))
}
// Buffer filled up before this stream was polled.
Some(Err(BroadcastStreamRecvError::Lagged(_))) => {
// Check the db incase a missed status was our transaction.
let status = state
.get_tx_status(transaction_id)
.await
.map_err(Into::into)
.transpose();
tracing::debug!("lagged: {status:?}");
Some((status, (state, stream)))
}
// Channel is closed.
None => None,
}
}
.in_current_span()
});
// Check the database first incase there is already a status.
futures::stream::once(futures::future::ready(check_db_first))
// Then wait for a status update.
.chain(stream)
// Filter out values that don't apply to this query.
.filter_map(futures::future::ready)
// Continue this stream while a submitted status is received
// as this stream should only end when no more status updates are possible.
.map_ok(move |status| {
if !matches!(status, TransactionStatus::Submitted(_)) {
if let Some(close) = close.take() {
let _ = close.send(());
}
}
status
})
}