1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
use crate::schema::tx::types::{
SqueezedOutStatus,
TransactionStatus,
};
use fuel_core_interfaces::{
common::prelude::Bytes32,
txpool::TxUpdate,
};
use futures::{
stream::BoxStream,
Stream,
StreamExt,
TryStreamExt,
};
use tokio_stream::wrappers::errors::BroadcastStreamRecvError;
#[cfg(test)]
mod test;
/// Source of the current status of a transaction.
///
/// `transaction_status_change` uses this to look a status up both before it
/// starts listening for updates and each time an update arrives. In test
/// builds the `automock` attribute additionally generates a mock of this trait.
#[cfg_attr(test, mockall::automock)]
#[async_trait::async_trait]
pub(crate) trait TxnStatusChangeState {
/// Return the transaction status from the tx pool and database.
///
/// `id` is the id of the transaction to look up.
///
/// Callers in this file treat `Ok(None)` as "no status available for this
/// transaction" and `Err(_)` as a failed lookup.
/// NOTE(review): the exact conditions under which implementors return
/// `Ok(None)` are not visible here; confirm against the implementation.
async fn get_tx_status(
&self,
id: fuel_core_interfaces::common::fuel_types::Bytes32,
) -> anyhow::Result<Option<TransactionStatus>>;
}
/// Builds a stream of status changes for `transaction_id`.
///
/// The returned stream first yields whatever status `state` already reports
/// for the transaction (if any) and then follows `stream`, looking the status
/// up again each time an update for `transaction_id` arrives. Updates that
/// belong to other transactions are skipped.
///
/// The stream ends when any of the following happens:
/// - a status other than `Submitted` has been yielded,
/// - the update channel behind `stream` is closed, or
/// - an update for the transaction arrived but `state` has no status for it.
///
/// Lookup failures are yielded as `Err` items and do not end the stream.
pub(crate) async fn transaction_status_change(
    state: Box<dyn TxnStatusChangeState + Send + Sync>,
    stream: BoxStream<'static, Result<TxUpdate, BroadcastStreamRecvError>>,
    transaction_id: Bytes32,
) -> impl Stream<Item = anyhow::Result<TransactionStatus>> {
    // Query once up front so an already recorded status is reported even if
    // no further update is ever broadcast.
    let initial_status = state.get_tx_status(transaction_id).await.transpose();

    // One-shot signal from the tail of the pipeline back to the update
    // listener: it fires once a status other than `Submitted` went through.
    let (done_tx, mut done_rx) = tokio::sync::oneshot::channel();
    let mut done_tx = Some(done_tx);

    let updates =
        futures::stream::unfold((state, stream), move |(state, mut stream)| {
            // Anything other than "nothing sent yet" means we are done: either
            // the signal was delivered or the sending half no longer exists.
            // This is evaluated here, outside the future, because the receiver
            // is owned by this closure.
            let finished = !matches!(
                done_rx.try_recv(),
                Err(tokio::sync::oneshot::error::TryRecvError::Empty)
            );
            async move {
                if finished {
                    return None;
                }

                let update = match stream.next().await {
                    Some(update) => update,
                    // The update channel is closed, nothing more will arrive.
                    None => return None,
                };

                // `None` here means "nothing to report for this update"; it is
                // filtered out further down the pipeline.
                let item = match update {
                    // The update concerns the transaction we are watching.
                    Ok(tx_update) if *tx_update.tx_id() == transaction_id => {
                        if !tx_update.was_squeezed_out() {
                            match state.get_tx_status(transaction_id).await {
                                // The status was found.
                                Ok(Some(found)) => Some(Ok(found)),
                                // An update was announced but no status can
                                // be found, so a value has been missed. End
                                // the stream; the only valid recovery is for
                                // the caller to start a new one.
                                Ok(None) => return None,
                                // Pass the lookup failure on to the consumer.
                                Err(err) => Some(Err(err)),
                            }
                        } else {
                            match tx_update.into_squeezed_out_reason() {
                                // A squeezed out status is never stored in
                                // the database, so it has to be produced
                                // right here from the update itself.
                                Some(reason) => Some(Ok(
                                    TransactionStatus::SqueezedOut(SqueezedOutStatus {
                                        reason: reason.to_string(),
                                    }),
                                )),
                                None => {
                                    unreachable!("due to the was squeezed out check above")
                                }
                            }
                        }
                    }
                    // The update belongs to some other transaction.
                    Ok(_) => None,
                    // The broadcast buffer overflowed before we were polled.
                    // One of the dropped updates may have been ours, so ask
                    // `state` directly.
                    Err(BroadcastStreamRecvError::Lagged(_)) => {
                        state.get_tx_status(transaction_id).await.transpose()
                    }
                };

                Some((item, (state, stream)))
            }
        });

    // Report the pre-existing status (if any) first, then the live updates.
    let pipeline =
        futures::stream::once(futures::future::ready(initial_status)).chain(updates);

    pipeline
        // Drop the placeholders produced for updates with nothing to report.
        .filter_map(futures::future::ready)
        // Only `Submitted` can be followed by another status. Once any other
        // status passes through, tell the update listener to stop.
        .map_ok(move |status| {
            let is_final = !matches!(status, TransactionStatus::Submitted(_));
            if is_final {
                if let Some(signal) = done_tx.take() {
                    let _ = signal.send(());
                }
            }
            status
        })
}