#![allow(clippy::unwrap_in_result)]
use std::{env, fmt, sync::Arc};
use proptest::{collection::vec, prelude::*};
use proptest_derive::Arbitrary;
use chrono::Duration;
use tokio::time;
use tower::{buffer::Buffer, util::BoxService};
use zebra_chain::{
block::{self, Block},
fmt::{DisplayToDebug, TypeNameToDebug},
parameters::{Network, NetworkUpgrade},
serialization::ZcashDeserializeInto,
transaction::VerifiedUnminedTx,
};
use zebra_consensus::{error::TransactionError, transaction as tx};
use zebra_network as zn;
use zebra_state::{self as zs, ChainTipBlock, ChainTipSender};
use zebra_test::mock_service::{MockService, PropTestAssertion};
use zs::CheckpointVerifiedBlock;
use crate::components::{
mempool::tests::standard_verified_unmined_tx_strategy,
mempool::{config::Config, Mempool},
sync::{RecentSyncLengths, SyncStatus},
};
/// Mock of the peer set service: receives `zn::Request`s and answers with
/// `zn::Response`s, using the `PropTestAssertion` assertion mode.
type MockPeerSet = MockService<zn::Request, zn::Response, PropTestAssertion>;
/// Mock of the state service: receives `zs::Request`s and answers with
/// `zs::Response`s, using the `PropTestAssertion` assertion mode.
type MockState = MockService<zs::Request, zs::Response, PropTestAssertion>;
/// Mock of the transaction verifier: receives `tx::Request`s and answers with
/// `tx::Response`s, with `TransactionError` as its explicit error type.
type MockTxVerifier = MockService<tx::Request, tx::Response, PropTestAssertion, TransactionError>;
/// Exclusive upper bound on the number of transactions and fake chain tips
/// generated for `storage_is_cleared_on_multiple_chain_resets`.
const CHAIN_LENGTH: usize = 5;
/// Number of proptest cases to run when the `PROPTEST_CASES` environment
/// variable is unset or cannot be parsed.
const DEFAULT_MEMPOOL_PROPTEST_CASES: u32 = 8;
/// Returns a boxed strategy that takes every value produced by
/// `standard_verified_unmined_tx_strategy` and wraps it in [`DisplayToDebug`].
fn standard_verified_unmined_tx_display_strategy(
) -> BoxedStrategy<DisplayToDebug<VerifiedUnminedTx>> {
    let wrapped_transactions = standard_verified_unmined_tx_strategy().prop_map(DisplayToDebug);

    wrapped_transactions.boxed()
}
// Property tests checking when the mempool storage is emptied.
proptest! {
// Number of cases: taken from the `PROPTEST_CASES` environment variable when it
// is set and parses, otherwise `DEFAULT_MEMPOOL_PROPTEST_CASES`.
#![proptest_config(proptest::test_runner::Config::with_cases(env::var("PROPTEST_CASES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(DEFAULT_MEMPOOL_PROPTEST_CASES)))]
// Checks that a stored transaction is gone after the chain tip is replaced by
// an arbitrary finalized tip.
#[test]
fn storage_is_cleared_on_single_chain_reset(
network in any::<Network>(),
transaction in standard_verified_unmined_tx_display_strategy(),
chain_tip in any::<DisplayToDebug<ChainTipBlock>>(),
) {
let (runtime, _init_guard) = zebra_test::init_async();
runtime.block_on(async move {
let (
mut mempool,
_peer_set,
_state_service,
_tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(&network);
// Pause the Tokio clock for the rest of the test.
time::pause();
mempool.enable(&mut recent_syncs).await;
// Put a single transaction into the storage.
mempool
.storage()
.insert(transaction.0, Vec::new(), None)
.expect("Inserting a transaction should succeed");
// Calling the mempool at this point must leave the transaction in place.
mempool.dummy_call().await;
prop_assert_eq!(mempool.storage().transaction_count(), 1);
// Replace the tip with the generated one.
chain_tip_sender.set_finalized_tip(chain_tip.0);
// After the next call the storage is expected to be empty.
// NOTE(review): presumably `dummy_call` is what lets the mempool observe the
// tip change — confirm against its definition.
mempool.dummy_call().await;
prop_assert_eq!(mempool.storage().transaction_count(), 0);
Ok(())
})?;
}
// Checks the storage after each step of a generated sequence of chain tips:
// it must be non-empty after a `Grow` step and empty after a `Reset` step.
#[test]
fn storage_is_cleared_on_multiple_chain_resets(
network in any::<Network>(),
mut previous_chain_tip in any::<DisplayToDebug<ChainTipBlock>>(),
mut transactions in vec(standard_verified_unmined_tx_display_strategy(), 0..CHAIN_LENGTH),
fake_chain_tips in vec(any::<TypeNameToDebug<FakeChainTip>>(), 0..CHAIN_LENGTH),
) {
let (runtime, _init_guard) = zebra_test::init_async();
runtime.block_on(async move {
let (
mut mempool,
_peer_set,
_state_service,
_tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(&network);
// Pause the Tokio clock for the rest of the test.
time::pause();
mempool.enable(&mut recent_syncs).await;
// Start from the generated initial tip and call the mempool once before the loop.
chain_tip_sender.set_best_non_finalized_tip(previous_chain_tip.0.clone());
mempool.dummy_call().await;
// `zip` stops at the shorter of the two generated vectors.
for (fake_chain_tip, transaction) in fake_chain_tips.iter().zip(transactions.iter_mut()) {
// Derive the next tip from the previous one.
let chain_tip = fake_chain_tip.to_chain_tip_block(&previous_chain_tip, &network);
// If the transaction has an expiry height at or below the new tip height,
// move it to one block above the tip.
// NOTE(review): presumably this keeps the transaction from being dropped as
// expired when the chain merely grows — confirm.
if let Some(expiry_height) = transaction.transaction.transaction.expiry_height() {
if chain_tip.height >= expiry_height {
let mut tmp_tx = (*transaction.transaction.transaction).clone();
*tmp_tx.expiry_height_mut() = block::Height(chain_tip.height.0 + 1);
transaction.transaction = tmp_tx.into();
}
}
// Store the (possibly adjusted) transaction.
mempool
.storage()
.insert(transaction.0.clone(), Vec::new(), None)
.expect("Inserting a transaction should succeed");
// Publish the new tip, then call the mempool.
chain_tip_sender.set_best_non_finalized_tip(chain_tip.clone());
mempool.dummy_call().await;
match fake_chain_tip.0 {
FakeChainTip::Grow(_) => {
// Growth step: the storage must still hold transactions.
prop_assert_ne!(mempool.storage().transaction_count(), 0);
}
FakeChainTip::Reset(_) => {
// Reset step: the storage must be empty.
prop_assert_eq!(mempool.storage().transaction_count(), 0);
},
}
// The next iteration builds on the tip used in this one.
previous_chain_tip = chain_tip.into();
}
Ok(())
})?;
}
// Checks that the storage is empty after the mempool is disabled and enabled
// again, and that no requests reach the mocked services.
#[test]
fn storage_is_cleared_if_syncer_falls_behind(
network in any::<Network>(),
transaction in standard_verified_unmined_tx_strategy(),
) {
let (runtime, _init_guard) = zebra_test::init_async();
runtime.block_on(async move {
let (
mut mempool,
mut peer_set,
mut state_service,
mut tx_verifier,
mut recent_syncs,
mut chain_tip_sender,
) = setup(&network);
// Pause the Tokio clock for the rest of the test.
time::pause();
mempool.enable(&mut recent_syncs).await;
// Put a single transaction into the storage.
mempool
.storage()
.insert(transaction, Vec::new(), None)
.expect("Inserting a transaction should succeed");
// The transaction must still be there after the first call.
mempool.dummy_call().await;
prop_assert_eq!(mempool.storage().transaction_count(), 1);
// Disable the mempool, then call it once while it is disabled.
mempool.disable(&mut recent_syncs).await;
mempool.dummy_call().await;
// Move the tip to mainnet block 1 before enabling the mempool again.
chain_tip_sender.set_finalized_tip(block1_chain_tip());
// The storage is only inspected after re-enabling the mempool.
mempool.enable(&mut recent_syncs).await;
prop_assert_eq!(mempool.storage().transaction_count(), 0);
// None of the mocked services should have received a request.
peer_set.expect_no_requests().await?;
state_service.expect_no_requests().await?;
tx_verifier.expect_no_requests().await?;
Ok(())
})?;
}
}
/// Builds a [`ChainTipBlock`] from the mainnet genesis block test vector.
///
/// Returns `None` if the test vector bytes fail to deserialize.
fn genesis_chain_tip() -> Option<ChainTipBlock> {
    let genesis = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
        .zcash_deserialize_into::<Arc<Block>>()
        .ok()?;

    // Convert through the checkpoint-verified wrapper, as the tip type is built from it.
    let verified = CheckpointVerifiedBlock::from(genesis);

    Some(ChainTipBlock::from(verified))
}
/// Builds a [`ChainTipBlock`] from the mainnet block 1 test vector.
///
/// Returns `None` if the test vector bytes fail to deserialize.
fn block1_chain_tip() -> Option<ChainTipBlock> {
    let parsed = zebra_test::vectors::BLOCK_MAINNET_1_BYTES.zcash_deserialize_into::<Arc<Block>>();

    match parsed {
        // Convert through the checkpoint-verified wrapper, as the tip type is built from it.
        Ok(block) => Some(ChainTipBlock::from(CheckpointVerifiedBlock::from(block))),
        Err(_) => None,
    }
}
/// Creates a [`Mempool`] connected to mocked services for the property tests.
///
/// The returned tuple holds, in order: the mempool, the mock peer set, the
/// mock state service, the mock transaction verifier, the
/// [`RecentSyncLengths`] handle, and the [`ChainTipSender`] that feeds the
/// mempool's chain tip. Before returning, the finalized tip is set to the
/// value of `genesis_chain_tip()`.
///
/// This function calls `tokio::spawn`, so it has to run inside a Tokio runtime.
fn setup(
    network: &Network,
) -> (
    Mempool,
    MockPeerSet,
    MockState,
    MockTxVerifier,
    RecentSyncLengths,
    ChainTipSender,
) {
    // Mock services; their concrete types come from the return type.
    let mock_peer_set = MockService::build().for_prop_tests();
    let mock_state = MockService::build().for_prop_tests();
    let mock_tx_verifier = MockService::build().for_prop_tests();

    let (sync_status, recent_syncs) = SyncStatus::new();

    let (mut tip_sender, latest_chain_tip, chain_tip_change) = ChainTipSender::new(None, network);

    // The receiving half is not read here; it stays bound until this function returns.
    let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);

    let mempool_config = Config {
        tx_cost_limit: 160_000_000,
        ..Default::default()
    };

    // Each mock is cloned into a boxed, buffered service; the originals are
    // handed back to the caller.
    let buffered_peer_set = Buffer::new(BoxService::new(mock_peer_set.clone()), 1);
    let buffered_state = Buffer::new(BoxService::new(mock_state.clone()), 1);
    let buffered_tx_verifier = Buffer::new(BoxService::new(mock_tx_verifier.clone()), 1);

    let (mempool, transaction_subscriber) = Mempool::new(
        &mempool_config,
        buffered_peer_set,
        buffered_state,
        buffered_tx_verifier,
        sync_status,
        latest_chain_tip,
        chain_tip_change,
        misbehavior_tx,
    );

    // Keep receiving from the transaction subscription in the background until
    // `recv` reports an error.
    let mut subscription = transaction_subscriber.subscribe();
    tokio::spawn(async move {
        loop {
            if subscription.recv().await.is_err() {
                break;
            }
        }
    });

    // Start every test from the genesis tip.
    tip_sender.set_finalized_tip(genesis_chain_tip());

    (
        mempool,
        mock_peer_set,
        mock_state,
        mock_tx_verifier,
        recent_syncs,
        tip_sender,
    )
}
/// A generated chain tip update, used to drive the mempool in the
/// multiple-chain-resets property test.
#[derive(Arbitrary, Clone, Debug, Eq, PartialEq)]
enum FakeChainTip {
/// A tip that extends the previous one: `to_chain_tip_block` takes the
/// hash and transactions from the wrapped block, but derives the height,
/// time and previous block hash from the previous tip.
Grow(ChainTipBlock),
/// A tip that replaces the previous one: `to_chain_tip_block` returns the
/// wrapped block unchanged.
Reset(ChainTipBlock),
}
impl fmt::Display for FakeChainTip {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let (mut f, inner) = match self {
FakeChainTip::Grow(inner) => (f.debug_tuple("FakeChainTip::Grow"), inner),
FakeChainTip::Reset(inner) => (f.debug_tuple("FakeChainTip::Reset"), inner),
};
f.field(&inner).finish()
}
}
impl FakeChainTip {
    /// Turns this fake tip into a concrete [`ChainTipBlock`], given the tip
    /// that came before it.
    ///
    /// - `Reset`: returns a clone of the wrapped block; `previous` and
    ///   `network` are not used.
    /// - `Grow`: returns a block one height above `previous`, whose previous
    ///   block hash is `previous.hash`, keeping the wrapped block's hash and
    ///   transactions. Its time is `previous.time` plus a delta derived from
    ///   the previous timestamp and the target spacing at the new height.
    fn to_chain_tip_block(&self, previous: &ChainTipBlock, network: &Network) -> ChainTipBlock {
        let template = match self {
            Self::Reset(replacement) => return replacement.clone(),
            Self::Grow(template) => template,
        };

        let next_height = block::Height(previous.height.0 + 1);
        let spacing_seconds =
            NetworkUpgrade::target_spacing_for_height(network, next_height).num_seconds();

        // The delta is the previous timestamp reduced modulo twice the target spacing.
        let time_delta = Duration::seconds(previous.time.timestamp() % (2 * spacing_seconds));

        ChainTipBlock {
            hash: template.hash,
            height: next_height,
            time: previous.time + time_delta,
            transactions: template.transactions.clone(),
            transaction_hashes: template.transaction_hashes.clone(),
            previous_block_hash: previous.hash,
        }
    }
}