use crate::schema::tx::types::TransactionStatus as ApiTxStatus;
use fuel_core_storage::Result as StorageResult;
use fuel_core_txpool::service::TxStatusMessage;
use fuel_core_types::{
fuel_types::Bytes32,
services::txpool::TransactionStatus as TxPoolTxStatus,
};
use futures::{
stream::BoxStream,
Stream,
StreamExt,
};
#[cfg(test)]
mod test;
#[cfg_attr(test, mockall::automock)]
pub(crate) trait TxnStatusChangeState {
fn get_tx_status(&self, id: Bytes32) -> StorageResult<Option<TxPoolTxStatus>>;
}
impl<F> TxnStatusChangeState for F
where
F: Fn(Bytes32) -> StorageResult<Option<TxPoolTxStatus>> + Send + Sync,
{
fn get_tx_status(&self, id: Bytes32) -> StorageResult<Option<TxPoolTxStatus>> {
self(id)
}
}
#[tracing::instrument(skip(state, stream), fields(transaction_id = %transaction_id))]
pub(crate) fn transaction_status_change<'a, State>(
state: State,
stream: BoxStream<'a, TxStatusMessage>,
transaction_id: Bytes32,
) -> impl Stream<Item = anyhow::Result<ApiTxStatus>> + 'a
where
State: TxnStatusChangeState + Send + Sync + 'a,
{
let check_db_first = state
.get_tx_status(transaction_id)
.transpose()
.map(TxStatusMessage::from);
let (close, closed) = tokio::sync::oneshot::channel();
let mut close = Some(close);
futures::stream::iter(check_db_first)
.chain(stream)
.take_until(closed)
.map(move |status| {
if !matches!(
status,
TxStatusMessage::Status(TxPoolTxStatus::Submitted { .. })
) {
if let Some(close) = close.take() {
let _ = close.send(());
}
}
match status {
TxStatusMessage::Status(status) => Ok(status.into()),
TxStatusMessage::FailedStatus => {
Err(anyhow::anyhow!("Failed to get transaction status"))
}
}
})
}