use std::{
collections::HashMap,
sync::Arc,
time::Duration,
};
use fuel_core_types::{
fuel_tx::Bytes32,
services::txpool::TransactionStatus,
tai64::Tai64,
};
use parking_lot::lock_api::Mutex;
use test_strategy::proptest;
use crate::{
tests::utils::{
box_senders,
construct_senders,
senders_strategy_any,
tx_update_strategy,
SenderData,
},
tx_status_stream::{
State,
TxStatusMessage,
TxUpdate,
},
update_sender::{
MockSendStatus,
SendError,
SendStatus,
Sender,
UpdateSender,
},
};
use super::tests_update_stream_state::{
validate_tx_update_stream_state,
StateTransitions,
};
pub(super) fn validate_send(
tx: Result<(), SendError>,
state: State,
msg: TxStatusMessage,
) -> State {
let state = validate_tx_update_stream_state(state, StateTransitions::AddMsg(msg));
let state = validate_tx_update_stream_state(state, StateTransitions::Next);
match tx {
Ok(()) => state,
Err(SendError::Closed) => {
validate_tx_update_stream_state(state, StateTransitions::CloseRecv)
}
Err(SendError::Full) => {
validate_tx_update_stream_state(state, StateTransitions::AddFailure)
}
}
}
#[proptest]
fn test_send(
#[strategy(tx_update_strategy())] update: TxUpdate,
#[strategy(senders_strategy_any())] senders: HashMap<
Bytes32,
Vec<Sender<(), MockSendStatus>>,
>,
) {
test_send_inner(update, senders);
}
#[test]
fn test_send_reg() {
use State::*;
let update = TxUpdate {
tx_id: Bytes32::from([2; 32]),
message: TxStatusMessage::Status(TransactionStatus::Success {
block_height: Default::default(),
time: Tai64(0),
result: None,
receipts: vec![],
total_gas: 0,
total_fee: 0,
}),
};
test_send_inner(
update,
construct_senders(&[(
2,
&[
SenderData::closed(Success(
TransactionStatus::Submitted { time: Tai64(0) },
TransactionStatus::Submitted { time: Tai64(0) },
)),
SenderData::ok(Initial(TransactionStatus::Submitted { time: Tai64(0) })),
],
)]),
);
}
fn test_send_inner(
msg: TxUpdate,
senders: HashMap<Bytes32, Vec<Sender<(), MockSendStatus>>>,
) {
let before = senders.get(&msg.tx_id).map(|senders| {
senders
.iter()
.map(|sender| {
let tx = if sender.tx.is_full() {
Err(SendError::Full)
} else if sender.tx.is_closed() {
Err(SendError::Closed)
} else {
Ok(())
};
(tx, sender.stream.state().clone())
})
.filter(|(_, state)| {
!matches!(
state,
State::Closed | State::EarlySuccess(_) | State::Failed
)
})
.collect::<Vec<_>>()
});
let update = UpdateSender {
senders: Arc::new(Mutex::new(box_senders(senders))),
permits: Arc::new(()),
ttl: Duration::from_secs(5),
};
update.send(msg.clone());
if let Some(before) = before {
let lock = update.senders.lock();
if let Some(senders) = lock.get(&msg.tx_id) {
let mut i = 0;
for (tx, state) in before {
let new_state = validate_send(tx, state, msg.message.clone());
if matches!(new_state, State::Closed) {
continue
}
assert_eq!(*senders[i].stream.state(), new_state);
i = i.saturating_add(1);
}
}
}
}