#![allow(clippy::arc_with_non_send_sync)]
use fuel_core_txpool::{
error::RemovedReason,
TxStatusMessage,
};
use fuel_core_types::{
fuel_types::Bytes32,
services::txpool::TransactionStatus,
tai64::Tai64,
};
use futures::StreamExt;
use std::ops::ControlFlow;
use fuel_core_storage::Error as StorageError;
use proptest::{
prelude::prop,
prop_oneof,
strategy::{
Just,
Strategy,
},
};
use test_strategy::*;
fn txn_id(i: u8) -> Bytes32 {
[i; 32].into()
}
fn submitted() -> TransactionStatus {
TransactionStatus::Submitted { time: Tai64(0) }
}
fn success() -> TransactionStatus {
TransactionStatus::Success {
block_height: Default::default(),
time: Tai64(0),
result: None,
receipts: vec![],
total_gas: 0,
total_fee: 0,
}
}
fn failed() -> TransactionStatus {
TransactionStatus::Failed {
block_height: Default::default(),
time: Tai64(0),
result: None,
receipts: vec![],
total_gas: 0,
total_fee: 0,
}
}
fn squeezed() -> TransactionStatus {
TransactionStatus::SqueezedOut {
reason: fuel_core_txpool::error::Error::Removed(RemovedReason::Ttl).to_string(),
}
}
#[derive(Debug, Clone, PartialEq, Eq, Arbitrary)]
enum TxStatus {
Submitted,
Final(FinalTxStatus),
}
#[derive(Debug, Clone, PartialEq, Eq, Arbitrary)]
enum FinalTxStatus {
Success,
Squeezed,
Failed,
}
fn state() -> impl Strategy<Value = Option<TransactionStatus>> {
prop::option::of(transaction_status())
}
fn state_result() -> impl Strategy<Value = Result<Option<TransactionStatus>, Error>> {
prop::result::maybe_ok(state(), Just(Error))
}
fn tx_status_message() -> impl Strategy<Value = TxStatusMessage> {
prop_oneof![
Just(TxStatusMessage::FailedStatus),
transaction_status().prop_map(TxStatusMessage::Status),
]
}
fn transaction_status() -> impl Strategy<Value = TransactionStatus> {
prop_oneof![
Just(submitted()),
Just(success()),
Just(failed()),
Just(squeezed()),
]
}
fn input_stream() -> impl Strategy<Value = Vec<TxStatusMessage>> {
prop::collection::vec(tx_status_message(), 0..=5)
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, Arbitrary)]
struct Error;
#[derive(Debug, Clone, Copy, PartialEq, Eq, Arbitrary)]
struct Submitted;
type Flow = ControlFlow<FinalTxStatus, Submitted>;
fn transaction_status_change_model(
state: Option<TxStatusMessage>,
stream: impl Iterator<Item = TxStatusMessage>,
) -> impl Iterator<Item = Result<TxStatus, Error>> {
let out = state
.into_iter()
.chain(stream)
.try_fold(Vec::new(), |mut out, state| match state {
TxStatusMessage::Status(status) => match next_state(status) {
Flow::Continue(Submitted) => {
out.push(Ok(TxStatus::Submitted));
ControlFlow::Continue(out)
}
Flow::Break(r) => {
out.push(Ok(TxStatus::Final(r)));
ControlFlow::Break(out)
}
},
TxStatusMessage::FailedStatus => {
out.push(Err(Error));
ControlFlow::Break(out)
}
});
match out {
ControlFlow::Continue(out) | ControlFlow::Break(out) => out.into_iter(),
}
}
fn next_state(state: TransactionStatus) -> Flow {
match state {
TransactionStatus::Submitted { .. } => Flow::Continue(Submitted),
TransactionStatus::Success { .. } => Flow::Break(FinalTxStatus::Success),
TransactionStatus::Failed { .. } => Flow::Break(FinalTxStatus::Failed),
TransactionStatus::SqueezedOut { .. } => Flow::Break(FinalTxStatus::Squeezed),
}
}
thread_local!(static RT: tokio::runtime::Runtime =
tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
);
#[proptest]
fn test_tsc(
#[strategy(state_result())] state: Result<Option<TransactionStatus>, Error>,
#[strategy(input_stream())] stream: Vec<TxStatusMessage>,
) {
test_tsc_inner(state, stream)
}
fn test_tsc_inner(
state: Result<Option<TransactionStatus>, Error>,
stream: Vec<TxStatusMessage>,
) {
let model_out: Vec<_> = transaction_status_change_model(
state.clone().transpose().map(TxStatusMessage::from),
stream.clone().into_iter(),
)
.collect();
let out = RT.with(|rt| {
rt.block_on(async {
let mut mock_state = super::MockTxnStatusChangeState::new();
mock_state
.expect_get_tx_status()
.returning(move |_| match state.clone() {
Ok(Some(t)) => Ok(Some(t)),
Ok(None) => Ok(None),
Err(_) => Err(StorageError::NotFound("", "")),
});
let stream = futures::stream::iter(stream).boxed();
super::transaction_status_change(mock_state, stream, txn_id(0))
.await
.collect::<Vec<_>>()
.await
})
});
let out: Vec<_> = out
.into_iter()
.map(|r| r.map(|s| s.into()).map_err(|_| Error))
.collect();
assert_eq!(model_out, out);
}
impl From<crate::schema::tx::types::TransactionStatus> for TxStatus {
fn from(status: crate::schema::tx::types::TransactionStatus) -> Self {
match status {
crate::schema::tx::types::TransactionStatus::Submitted(_) => {
TxStatus::Submitted
}
crate::schema::tx::types::TransactionStatus::Success(_) => {
TxStatus::Final(FinalTxStatus::Success)
}
crate::schema::tx::types::TransactionStatus::SqueezedOut(_) => {
TxStatus::Final(FinalTxStatus::Squeezed)
}
crate::schema::tx::types::TransactionStatus::Failed(_) => {
TxStatus::Final(FinalTxStatus::Failed)
}
}
}
}