#![allow(clippy::unwrap_in_result)]
use std::{sync::Arc, time::Duration};
use color_eyre::Report;
use tokio::time::{self, timeout};
use tower::{ServiceBuilder, ServiceExt};
use rand::{seq::SliceRandom, thread_rng};
use zebra_chain::{
amount::Amount,
block::Block,
fmt::humantime_seconds,
parameters::Network,
serialization::ZcashDeserializeInto,
transaction::{Transaction, VerifiedUnminedTx},
transparent::{self, OutPoint},
};
use zebra_consensus::transaction as tx;
use zebra_state::{Config as StateConfig, CHAIN_TIP_UPDATE_WAIT_LIMIT};
use zebra_test::mock_service::{MockService, PanicAssertion};
use crate::components::{
mempool::{self, *},
sync::RecentSyncLengths,
};
/// Mock peer set: a [`MockService`] over `zn::Request` / `zn::Response` using [`PanicAssertion`].
type MockPeerSet = MockService<zn::Request, zn::Response, PanicAssertion>;
/// State service: a `Buffer` around a `BoxService` over `zs::Request` / `zs::Response`,
/// with `zs::BoxError` as the inner service's error type.
type StateService = Buffer<BoxService<zs::Request, zs::Response, zs::BoxError>, zs::Request>;
/// Mock transaction verifier: a [`MockService`] over `tx::Request` / `tx::Response` using
/// [`PanicAssertion`], with `TransactionError` as its error type.
type MockTxVerifier = MockService<tx::Request, tx::Response, PanicAssertion, TransactionError>;
/// Runs [`mempool_service_basic_single`] ten times in a row, stopping at the first error.
///
/// NOTE(review): the repetition presumably exists to cover randomized eviction in the
/// mempool storage — confirm against the storage implementation.
#[tokio::test]
async fn mempool_service_basic() -> Result<(), Report> {
    for _attempt in 1..=10 {
        mempool_service_basic_single().await?;
    }

    Ok(())
}
/// Runs one pass of the basic mempool service checks.
///
/// Loads test-vector transactions into the mempool storage and then exercises
/// `Request::TransactionIds`, `Request::TransactionsById`,
/// `Request::TransactionsByMinedId`, `Request::RejectedTransactionIds`,
/// `Request::Queue` and `Request::QueueStats`, asserting on each response.
///
/// # Errors
///
/// Returns an error if inserting the first transaction into the storage fails.
///
/// # Panics
///
/// Panics if any request fails, returns an unexpected response variant, or an
/// assertion on the response does not hold.
async fn mempool_service_basic_single() -> Result<(), Report> {
    let network = Network::Mainnet;

    // Split the test-vector transactions into the first one, the last one
    // (only queued, never inserted directly) and everything in between.
    let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=10);
    let genesis_transaction = unmined_transactions
        .next()
        .expect("Missing genesis transaction");
    let last_transaction = unmined_transactions.next_back().unwrap();
    let more_transactions = unmined_transactions.collect::<Vec<_>>();

    // The cost limit is the total cost of the in-between transactions only,
    // i.e. it does not account for the first transaction.
    let cost_limit = more_transactions.iter().map(|tx| tx.cost()).sum();

    let (
        mut service,
        _peer_set,
        _state_service,
        _chain_tip_change,
        _tx_verifier,
        mut recent_syncs,
        _mempool_transaction_receiver,
    ) = setup(&network, cost_limit, true).await;

    service.enable(&mut recent_syncs).await;

    // Seed the storage with the first transaction and track every ID we insert.
    let mut inserted_ids = HashSet::new();
    service
        .storage()
        .insert(genesis_transaction.clone(), Vec::new(), None)?;
    inserted_ids.insert(genesis_transaction.transaction.id);

    // `Request::TransactionIds`
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::TransactionIds)
        .await
        .unwrap();
    let Response::TransactionIds(genesis_transaction_ids) = response else {
        unreachable!("will never happen in this test");
    };

    // `Request::TransactionsById`
    let genesis_transactions_hash_set: HashSet<_> =
        genesis_transaction_ids.iter().copied().collect();
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::TransactionsById(
            genesis_transactions_hash_set.clone(),
        ))
        .await
        .unwrap();
    let Response::Transactions(transactions) = response else {
        unreachable!("will never happen in this test");
    };

    // Checked access: an empty response fails the assertion with a readable
    // message instead of an index-out-of-bounds panic.
    assert_eq!(
        Some(&genesis_transaction.transaction),
        transactions.first(),
        "first transaction returned by TransactionsById should be the inserted one"
    );

    // `Request::TransactionsByMinedId`
    let genesis_transactions_mined_hash_set: HashSet<_> = genesis_transaction_ids
        .iter()
        .map(|txid| txid.mined_id())
        .collect();
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::TransactionsByMinedId(
            genesis_transactions_mined_hash_set,
        ))
        .await
        .unwrap();
    let Response::Transactions(transactions) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(
        Some(&genesis_transaction.transaction),
        transactions.first(),
        "first transaction returned by TransactionsByMinedId should be the inserted one"
    );

    // Insert the in-between transactions. Insert errors are deliberately ignored:
    // presumably an insert that exceeds the cost limit can reject the transaction
    // being inserted — confirm against the storage eviction logic.
    for tx in more_transactions {
        inserted_ids.insert(tx.transaction.id);
        // `tx` is owned by the loop, so it is moved into the storage without a clone.
        let _ = service.storage().insert(tx, Vec::new(), None);
    }

    // `Request::RejectedTransactionIds`
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::RejectedTransactionIds(
            genesis_transactions_hash_set,
        ))
        .await
        .unwrap();
    let Response::RejectedTransactionIds(rejected_ids) = response else {
        unreachable!("will never happen in this test");
    };

    // Only transactions this test inserted can show up as rejected.
    assert!(rejected_ids.is_subset(&inserted_ids));

    // `Request::Queue`, using the ID of the transaction that was held back.
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![last_transaction.transaction.id.into()]))
        .await
        .unwrap();
    let Response::Queued(queued_responses) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(queued_responses.len(), 1);
    assert!(queued_responses[0].is_ok());
    assert_eq!(service.tx_downloads().in_flight(), 1);

    // `Request::QueueStats`; this test expects `fully_notified` to be `None`.
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::QueueStats)
        .await
        .unwrap();
    let Response::QueueStats {
        size: actual_size,
        bytes: actual_bytes,
        usage: actual_usage,
        fully_notified: None,
    } = response
    else {
        unreachable!("expected QueueStats response");
    };

    // Derive the expected statistics from the storage contents.
    let expected_size = service.storage().transaction_count();
    let expected_bytes: usize = service
        .storage()
        .transactions()
        .values()
        .map(|tx| tx.transaction.size)
        .sum();
    // The test expects the reported usage to equal the byte total.
    let expected_usage = expected_bytes;

    assert_eq!(actual_size, expected_size, "QueueStats size mismatch");
    assert_eq!(actual_bytes, expected_bytes, "QueueStats bytes mismatch");
    assert_eq!(actual_usage, expected_usage, "QueueStats usage mismatch");

    Ok(())
}
/// Runs [`mempool_queue_single`] ten times in a row, stopping at the first error.
///
/// NOTE(review): the repetition presumably exists to cover randomized eviction in the
/// mempool storage — confirm against the storage implementation.
#[tokio::test]
async fn mempool_queue() -> Result<(), Report> {
    for _attempt in 1..=10 {
        mempool_queue_single().await?;
    }

    Ok(())
}
/// Runs one pass of the `Request::Queue` checks.
///
/// Fills the storage with all test-vector transactions but the last, using a cost
/// limit that covers all but one of them. It then checks that queueing a new
/// transaction is accepted, while re-queueing the inserted ones is rejected either as
/// already in the mempool or as randomly evicted (exactly one eviction is expected).
async fn mempool_queue_single() -> Result<(), Report> {
    let network = Network::Mainnet;

    // Partition the test vectors into `[transactions..., new_tx]`; `new_tx` is never
    // inserted into the storage directly.
    let mut transactions: Vec<_> = network.unmined_transactions_in_blocks(1..=10).collect();
    let new_tx = transactions.pop().unwrap();

    // Budget for every remaining transaction except one.
    let budgeted_count = transactions.len() - 1;
    let cost_limit = transactions
        .iter()
        .take(budgeted_count)
        .map(|tx| tx.cost())
        .sum();

    let (
        mut service,
        _peer_set,
        _state_service,
        _chain_tip_change,
        _tx_verifier,
        mut recent_syncs,
        _mempool_transaction_receiver,
    ) = setup(&network, cost_limit, true).await;

    service.enable(&mut recent_syncs).await;

    // Insert errors are deliberately ignored: the final assertions expect one of
    // these transactions to end up randomly evicted.
    for stored_tx in &transactions {
        let _ = service
            .storage()
            .insert(stored_tx.clone(), Vec::new(), None);
    }

    // Queueing a transaction the mempool has not seen must pass the initial checks.
    let new_tx_request = Request::Queue(vec![new_tx.transaction.id.into()]);
    let response = service
        .ready()
        .await
        .unwrap()
        .call(new_tx_request)
        .await
        .unwrap();
    let Response::Queued(new_tx_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(new_tx_results.len(), 1);
    assert!(new_tx_results[0].is_ok());

    // Queueing every previously inserted transaction must be rejected.
    let known_tx_request = Request::Queue(
        transactions
            .iter()
            .map(|tx| tx.transaction.id.into())
            .collect(),
    );
    let response = service
        .ready()
        .await
        .unwrap()
        .call(known_tx_request)
        .await
        .unwrap();
    let Response::Queued(known_tx_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(known_tx_results.len(), transactions.len());

    // Tally the rejection reasons; anything other than the two expected ones is a failure.
    let (mut in_mempool_count, mut evicted_count) = (0, 0);
    for queue_result in known_tx_results {
        match queue_result.unbox_mempool_error() {
            MempoolError::InMempool => in_mempool_count += 1,
            MempoolError::StorageEffectsChain(SameEffectsChainRejectionError::RandomlyEvicted) => {
                evicted_count += 1
            }
            error => panic!("transaction should not be rejected with reason {error:?}"),
        }
    }

    assert_eq!(in_mempool_count, transactions.len() - 1);
    assert_eq!(evicted_count, 1);

    Ok(())
}
/// Checks how the mempool answers requests while enabled and after being disabled.
///
/// The mempool must start out disabled. Once enabled it serves `TransactionIds` and
/// `Queue`. After `disable` it must report no transaction IDs, reject queued
/// transactions with [`MempoolError::Disabled`], and return all-zero `QueueStats`
/// with `fully_notified` set to `None`.
#[tokio::test]
async fn mempool_service_disabled() -> Result<(), Report> {
    let network = Network::Mainnet;

    let (
        mut service,
        _peer_set,
        _state_service,
        _chain_tip_change,
        _tx_verifier,
        mut recent_syncs,
        _mempool_transaction_receiver,
    ) = setup(&network, u64::MAX, true).await;

    // The first test-vector transaction is inserted; the rest stay in the iterator.
    let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=10);
    let genesis_transaction = unmined_transactions
        .next()
        .expect("Missing genesis transaction");

    // The mempool starts disabled and only becomes enabled on request.
    assert!(!service.is_enabled());
    service.enable(&mut recent_syncs).await;
    assert!(service.is_enabled());

    service
        .storage()
        .insert(genesis_transaction.clone(), Vec::new(), None)?;

    // An enabled mempool answers `TransactionIds`.
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::TransactionIds)
        .await
        .unwrap();
    let Response::TransactionIds(_genesis_transaction_ids) = response else {
        unreachable!("will never happen in this test");
    };

    // Queue the last test-vector transaction for download.
    let txid = unmined_transactions.last().unwrap().transaction.id;
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![txid.into()]))
        .await
        .unwrap();
    let Response::Queued(enabled_queue_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(enabled_queue_results.len(), 1);
    assert!(enabled_queue_results[0].is_ok());
    assert_eq!(service.tx_downloads().in_flight(), 1);

    // Switch the mempool off again.
    service.disable(&mut recent_syncs).await;
    assert!(!service.is_enabled());

    // A disabled mempool reports no transaction IDs.
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::TransactionIds)
        .await
        .unwrap();
    let Response::TransactionIds(ids) = response else {
        unreachable!("will never happen in this test");
    };
    assert_eq!(
        ids.len(),
        0,
        "mempool should return no transactions when disabled"
    );

    // A disabled mempool rejects queued transactions.
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![txid.into()]))
        .await
        .unwrap();
    let Response::Queued(disabled_queue_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(disabled_queue_results.len(), 1);
    let rejection = disabled_queue_results
        .into_iter()
        .next()
        .unwrap()
        .unbox_mempool_error();
    assert_eq!(rejection, MempoolError::Disabled);

    // A disabled mempool reports empty queue statistics.
    let response = service
        .ready()
        .await
        .unwrap()
        .call(Request::QueueStats)
        .await
        .unwrap();
    let Response::QueueStats {
        size,
        bytes,
        usage,
        fully_notified,
    } = response
    else {
        unreachable!("expected QueueStats response");
    };

    assert_eq!(size, 0, "size should be zero when mempool is disabled");
    assert_eq!(bytes, 0, "bytes should be zero when mempool is disabled");
    assert_eq!(usage, 0, "usage should be zero when mempool is disabled");
    assert_eq!(
        fully_notified, None,
        "fully_notified should be None when mempool is disabled"
    );

    Ok(())
}
/// Checks that a queued download is cancelled once a new block is committed.
///
/// Commits block 1, queues the first transaction of block 2 for download, then commits
/// block 2. The test expects the in-flight download count to drop to zero, the queued
/// result to resolve to an error, and a `MempoolChange::invalidated` notification for
/// the transaction ID.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_cancel_mined() -> Result<(), Report> {
    let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
        .zcash_deserialize_into()
        .unwrap();
    let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
        .zcash_deserialize_into()
        .unwrap();

    let network = Network::Mainnet;

    let (
        mut mempool,
        _peer_set,
        mut state_service,
        mut chain_tip_change,
        _tx_verifier,
        mut recent_syncs,
        mut mempool_transaction_receiver,
    ) = setup(&network, u64::MAX, true).await;

    mempool.enable(&mut recent_syncs).await;
    assert!(mempool.is_enabled());

    // Poll the mempool once before the state changes.
    mempool.dummy_call().await;

    // Commit block 1 to the state.
    let commit_block1 = zebra_state::Request::CommitCheckpointVerifiedBlock(block1.clone().into());
    state_service
        .ready()
        .await
        .unwrap()
        .call(commit_block1)
        .await
        .unwrap();

    // Wait for the tip change; a timeout is only logged, a failed update panics.
    match timeout(
        CHAIN_TIP_UPDATE_WAIT_LIMIT,
        chain_tip_change.wait_for_tip_change(),
    )
    .await
    {
        Ok(change_result) => {
            let _tip_action = change_result.expect("unexpected chain tip update failure");
        }
        Err(timeout_error) => {
            info!(
                timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
                ?timeout_error,
                "timeout waiting for chain tip change after committing block"
            );
        }
    }

    // Poll the mempool again now that block 1 is committed.
    mempool.dummy_call().await;

    // Queue the first transaction of block 2 for download.
    let txid = block2.transactions[0].unmined_id();
    let response = mempool
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![txid.into()]))
        .await
        .unwrap();
    let Response::Queued(mut queue_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(queue_results.len(), 1);
    let queued_response = queue_results
        .pop()
        .expect("already checked that there is exactly 1 item in Vec")
        .expect("initial queue checks result should be Ok");

    assert_eq!(mempool.tx_downloads().in_flight(), 1);

    // Commit block 2, which contains the queued transaction.
    let commit_block2 = zebra_state::Request::CommitCheckpointVerifiedBlock(block2.clone().into());
    state_service.oneshot(commit_block2).await.unwrap();

    match timeout(
        CHAIN_TIP_UPDATE_WAIT_LIMIT,
        chain_tip_change.wait_for_tip_change(),
    )
    .await
    {
        Ok(change_result) => {
            let _tip_action = change_result.expect("unexpected chain tip update failure");
        }
        Err(timeout_error) => {
            info!(
                timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
                ?timeout_error,
                "timeout waiting for chain tip change after committing block"
            );
        }
    }

    // Poll the mempool twice, pausing after each poll, before checking the downloads.
    for _poll in 0..2 {
        mempool.dummy_call().await;
        time::sleep(Duration::from_millis(100)).await;
    }

    // The download must be gone and its result must be an error.
    assert_eq!(mempool.tx_downloads().in_flight(), 0);

    let download_outcome = queued_response
        .await
        .expect("channel should not be closed");
    assert!(
        download_outcome.is_err(),
        "queued tx should fail to download and verify due to chain tip change"
    );

    // The mempool must announce the transaction as invalidated.
    let mempool_change = timeout(Duration::from_secs(3), mempool_transaction_receiver.recv())
        .await
        .expect("should not timeout")
        .expect("recv should return Ok");
    let expected_change = MempoolChange::invalidated([txid].into_iter().collect());
    assert_eq!(mempool_change, expected_change);

    Ok(())
}
/// Checks that a queued download is retried after block 1 is committed.
///
/// Queues the first transaction of block 2, commits block 1, drains the pending peer
/// set requests and polls the mempool. The test then expects a fresh
/// `TransactionsById` request for the same ID and one download still in flight.
///
/// NOTE(review): the test name implies that committing block 1 counts as a network
/// upgrade — confirm against the chain tip handling.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_cancel_downloads_after_network_upgrade() -> Result<(), Report> {
    let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
        .zcash_deserialize_into()
        .unwrap();
    let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
        .zcash_deserialize_into()
        .unwrap();

    let network = Network::Mainnet;

    let (
        mut mempool,
        mut peer_set,
        mut state_service,
        mut chain_tip_change,
        _tx_verifier,
        mut recent_syncs,
        _mempool_transaction_receiver,
    ) = setup(&network, u64::MAX, true).await;

    mempool.enable(&mut recent_syncs).await;
    assert!(mempool.is_enabled());

    // Queue the first transaction of block 2 for download.
    let txid = block2.transactions[0].unmined_id();
    let response = mempool
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![txid.into()]))
        .await
        .unwrap();
    let Response::Queued(queue_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(queue_results.len(), 1);
    assert!(queue_results[0].is_ok());
    assert_eq!(mempool.tx_downloads().in_flight(), 1);

    // Poll the mempool once before the state changes.
    mempool.dummy_call().await;

    // Commit block 1 to the state.
    let commit_block1 = zebra_state::Request::CommitCheckpointVerifiedBlock(block1.clone().into());
    state_service
        .ready()
        .await
        .unwrap()
        .call(commit_block1)
        .await
        .unwrap();

    // Wait for the tip change; a timeout is only logged, a failed update panics.
    match timeout(
        CHAIN_TIP_UPDATE_WAIT_LIMIT,
        chain_tip_change.wait_for_tip_change(),
    )
    .await
    {
        Ok(change_result) => {
            let _tip_action = change_result.expect("unexpected chain tip update failure");
        }
        Err(timeout_error) => {
            info!(
                timeout = ?humantime_seconds(CHAIN_TIP_UPDATE_WAIT_LIMIT),
                ?timeout_error,
                "timeout waiting for chain tip change after committing block"
            );
        }
    }

    // Discard every peer set request issued so far.
    while peer_set.try_next_request().await.is_some() {}

    // Poll the mempool so it reacts to the tip change.
    mempool.dummy_call().await;

    // The mempool must ask the peer set for the same transaction again.
    let retry = peer_set
        .try_next_request()
        .await
        .expect("unexpected missing mempool retry");
    let expected_request = zebra_network::Request::TransactionsById(iter::once(txid).collect());
    assert_eq!(retry.request(), &expected_request,);

    assert_eq!(mempool.tx_downloads().in_flight(), 1);

    Ok(())
}
/// Checks that a transaction failing verification is remembered as rejected.
///
/// Queues a full transaction, makes the mock verifier answer with
/// `TransactionError::BadBalance`, and then queues the same transaction by ID. The
/// second attempt must be rejected with `ExactTipRejectionError::FailedVerification`,
/// and the mempool must emit a `MempoolChange::invalidated` notification for its ID.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_failed_verification_is_rejected() -> Result<(), Report> {
    let network = Network::Mainnet;

    let (
        mut mempool,
        _peer_set,
        _state_service,
        _chain_tip_change,
        mut tx_verifier,
        mut recent_syncs,
        mut mempool_transaction_receiver,
    ) = setup(&network, u64::MAX, true).await;

    // Use the first test-vector transaction as the one that fails verification.
    let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=2);
    let rejected_tx = unmined_transactions.next().unwrap().clone();

    mempool.enable(&mut recent_syncs).await;

    // Queue the transaction itself (not just its ID).
    let queue_request = Request::Queue(vec![rejected_tx.transaction.clone().into()]);
    let request = mempool.ready().await.unwrap().call(queue_request);

    // The mock verifier reports the transaction as invalid.
    let verification = tx_verifier.expect_request_that(|_| true).map(|responder| {
        responder.respond(Err(TransactionError::BadBalance));
    });

    let (response, _) = futures::join!(request, verification);
    let Response::Queued(first_results) = response.unwrap() else {
        unreachable!("will never happen in this test");
    };

    // The initial queue checks still pass.
    assert_eq!(first_results.len(), 1);
    assert!(first_results[0].is_ok());

    // Poll the mempool twice, pausing after each poll, before queueing again.
    for _poll in 0..2 {
        mempool.dummy_call().await;
        time::sleep(Duration::from_millis(100)).await;
    }

    // Queueing the same transaction by ID must now be rejected.
    let response = mempool
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![rejected_tx.transaction.id.into()]))
        .await
        .unwrap();
    let Response::Queued(second_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(second_results.len(), 1);
    let rejection = second_results
        .into_iter()
        .next()
        .unwrap()
        .unbox_mempool_error();
    assert!(matches!(
        rejection,
        MempoolError::StorageExactTip(ExactTipRejectionError::FailedVerification(_))
    ));

    // The mempool must announce the transaction as invalidated.
    let mempool_change = timeout(Duration::from_secs(3), mempool_transaction_receiver.recv())
        .await
        .expect("should not timeout")
        .expect("recv should return Ok");
    let expected_change =
        MempoolChange::invalidated([rejected_tx.transaction.id].into_iter().collect());
    assert_eq!(mempool_change, expected_change);

    Ok(())
}
/// Checks that a transaction whose download fails can be queued again.
///
/// Queues a transaction by ID, makes the mock peer set answer with an empty
/// transaction list, and then queues the same ID a second time. The second attempt
/// must pass the initial queue checks, and the test expects a
/// `MempoolChange::invalidated` notification for the transaction ID.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_failed_download_is_not_rejected() -> Result<(), Report> {
    let network = Network::Mainnet;

    let (
        mut mempool,
        mut peer_set,
        _state_service,
        _chain_tip_change,
        _tx_verifier,
        mut recent_syncs,
        mut mempool_transaction_receiver,
    ) = setup(&network, u64::MAX, true).await;

    // Use the first test-vector transaction as the one that fails to download.
    let mut unmined_transactions = network.unmined_transactions_in_blocks(1..=2);
    let rejected_valid_tx = unmined_transactions.next().unwrap().clone();
    let rejected_valid_txid = rejected_valid_tx.transaction.id;

    mempool.enable(&mut recent_syncs).await;

    // Queue the transaction by ID, which requires a download.
    let request = mempool
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![rejected_valid_txid.into()]));

    // The mock peer set answers the download with no transactions.
    let verification = peer_set
        .expect_request_that(|r| matches!(r, zn::Request::TransactionsById(_)))
        .map(|responder| {
            responder.respond(zn::Response::Transactions(vec![]));
        });

    let (response, _) = futures::join!(request, verification);
    let Response::Queued(first_results) = response.unwrap() else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(first_results.len(), 1);
    assert!(first_results[0].is_ok());

    // Poll the mempool twice, pausing after each poll, before queueing again.
    for _poll in 0..2 {
        mempool.dummy_call().await;
        time::sleep(Duration::from_millis(100)).await;
    }

    // Queueing the same ID again must still pass the initial checks.
    let response = mempool
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![rejected_valid_txid.into()]))
        .await
        .unwrap();
    let Response::Queued(second_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(second_results.len(), 1);
    assert!(second_results[0].is_ok());

    // The test expects an invalidation notification for the transaction ID.
    let mempool_change = timeout(Duration::from_secs(3), mempool_transaction_receiver.recv())
        .await
        .expect("should not timeout")
        .expect("recv should return Ok");
    let expected_change = MempoolChange::invalidated([rejected_valid_txid].into_iter().collect());
    assert_eq!(mempool_change, expected_change);

    Ok(())
}
/// Checks that a queued transaction is downloaded and verified again after tip changes.
///
/// Queues the first transaction of block 3 and serves its download and verification
/// through the mocks. After committing block 1, and again after committing block 2,
/// the test expects the download to still be in flight and the storage to hold no
/// transactions.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reverifies_after_tip_change() -> Result<(), Report> {
    let network = Network::Mainnet;

    let block1: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_1_BYTES
        .zcash_deserialize_into()
        .unwrap();
    let block2: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_2_BYTES
        .zcash_deserialize_into()
        .unwrap();
    let block3: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_3_BYTES
        .zcash_deserialize_into()
        .unwrap();

    let (
        mut mempool,
        mut peer_set,
        mut state_service,
        mut chain_tip_change,
        mut tx_verifier,
        mut recent_syncs,
        _mempool_transaction_receiver,
    ) = setup(&network, u64::MAX, true).await;

    mempool.enable(&mut recent_syncs).await;
    assert!(mempool.is_enabled());

    // Queue the first transaction of block 3 for download.
    let tx = block3.transactions[0].clone();
    let txid = block3.transactions[0].unmined_id();
    let response = mempool
        .ready()
        .await
        .unwrap()
        .call(Request::Queue(vec![txid.into()]))
        .await
        .unwrap();
    let Response::Queued(queue_results) = response else {
        unreachable!("will never happen in this test");
    };

    assert_eq!(queue_results.len(), 1);
    assert!(queue_results[0].is_ok());
    assert_eq!(mempool.tx_downloads().in_flight(), 1);

    // First round: serve the download, then accept the transaction in the verifier.
    peer_set
        .expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
        .map(|responder| {
            let available = zn::InventoryResponse::Available((tx.clone().into(), None));
            responder.respond(zn::Response::Transactions(vec![available]));
        })
        .await;

    tx_verifier
        .expect_request_that(|_| true)
        .map(|responder| {
            let transaction = responder
                .request()
                .clone()
                .mempool_transaction()
                .expect("unexpected non-mempool request");

            // Respond with a fixed fee amount, a zero count and no extra entries.
            let verified_tx = VerifiedUnminedTx::new(
                transaction,
                Amount::try_from(1_000_000).expect("invalid value"),
                0,
                Arc::new(vec![]),
            )
            .expect("verification should pass");
            responder.respond(transaction::Response::from(verified_tx));
        })
        .await;

    // Commit block 1 and wait (without a timeout) for the tip change.
    let commit_block1 = zebra_state::Request::CommitCheckpointVerifiedBlock(block1.clone().into());
    state_service
        .ready()
        .await
        .unwrap()
        .call(commit_block1)
        .await
        .unwrap();

    chain_tip_change
        .wait_for_tip_change()
        .await
        .expect("unexpected chain tip update failure");

    // Poll the mempool so it reacts to the tip change.
    mempool.dummy_call().await;

    // The download is still in flight and nothing was stored.
    assert_eq!(mempool.tx_downloads().in_flight(), 1);
    assert_eq!(mempool.storage().transaction_count(), 0);

    // Second round: serve the download and the verification once more.
    peer_set
        .expect_request_that(|req| matches!(req, zn::Request::TransactionsById(_)))
        .map(|responder| {
            let available = zn::InventoryResponse::Available((tx.into(), None));
            responder.respond(zn::Response::Transactions(vec![available]));
        })
        .await;

    tx_verifier
        .expect_request_that(|_| true)
        .map(|responder| {
            let transaction = responder
                .request()
                .clone()
                .mempool_transaction()
                .expect("unexpected non-mempool request");

            let verified_tx = VerifiedUnminedTx::new(
                transaction,
                Amount::try_from(1_000_000).expect("invalid value"),
                0,
                Arc::new(vec![]),
            )
            .expect("verification should pass");
            responder.respond(transaction::Response::from(verified_tx));
        })
        .await;

    // Commit block 2 and wait (without a timeout) for the tip change.
    let commit_block2 = zebra_state::Request::CommitCheckpointVerifiedBlock(block2.clone().into());
    state_service
        .ready()
        .await
        .unwrap()
        .call(commit_block2)
        .await
        .unwrap();

    chain_tip_change
        .wait_for_tip_change()
        .await
        .expect("unexpected chain tip update failure");

    mempool.dummy_call().await;

    // Again: the download is still in flight and nothing was stored.
    assert_eq!(mempool.tx_downloads().in_flight(), 1);
    assert_eq!(mempool.storage().transaction_count(), 0);

    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_responds_to_await_output() -> Result<(), Report> {
let network = Network::Mainnet;
let (
mut mempool,
_peer_set,
_state_service,
_chain_tip_change,
mut tx_verifier,
mut recent_syncs,
mut mempool_transaction_receiver,
) = setup(&network, u64::MAX, true).await;
mempool.enable(&mut recent_syncs).await;
let verified_unmined_tx = network
.unmined_transactions_in_blocks(1..=10)
.find(|tx| !tx.transaction.transaction.outputs().is_empty())
.expect("should have at least 1 tx with transparent outputs");
let unmined_tx = verified_unmined_tx.transaction.clone();
let unmined_tx_id = unmined_tx.id;
let output_index = 0;
let outpoint = OutPoint::from_usize(unmined_tx.id.mined_id(), output_index);
let expected_output = unmined_tx
.transaction
.outputs()
.get(output_index)
.expect("already checked that tx has outputs")
.clone();
let request = Request::AwaitOutput(outpoint);
let await_output_response_fut = mempool.ready().await.unwrap().call(request);
let request = Request::Queue(vec![Gossip::Tx(unmined_tx)]);
let queue_response_fut = mempool.ready().await.unwrap().call(request);
let mock_verify_tx_fut = tx_verifier.expect_request_that(|_| true).map(|responder| {
responder.respond(transaction::Response::Mempool {
transaction: verified_unmined_tx,
spent_mempool_outpoints: Vec::new(),
});
});
let (response, _) = futures::join!(queue_response_fut, mock_verify_tx_fut);
let Response::Queued(mut results) = response.expect("response should be Ok") else {
panic!("wrong response from mempool to Queued request");
};
let result_rx = results.remove(0).expect("should pass initial checks");
assert!(results.is_empty(), "should have 1 result for 1 queued tx");
tokio::time::sleep(Duration::from_secs(1)).await;
mempool
.ready()
.await
.expect("polling mempool should succeed");
tokio::time::timeout(Duration::from_secs(10), result_rx)
.await
.expect("should not time out")
.expect("mempool tx verification result channel should not be closed")
.expect("mocked verification should be successful");
assert_eq!(
mempool.storage().transaction_count(),
1,
"should have 1 transaction in mempool's verified set"
);
assert_eq!(
mempool.storage().created_output(&outpoint),
Some(expected_output.clone()),
"created output should match expected output"
);
let response_fut = tokio::time::timeout(Duration::from_secs(30), await_output_response_fut);
let response = response_fut
.await
.expect("should not time out")
.expect("should not return RecvError");
let Response::UnspentOutput(response) = response else {
panic!("wrong response from mempool to AwaitOutput request");
};
assert_eq!(
response, expected_output,
"AwaitOutput response should match expected output"
);
let request = Request::AwaitOutput(outpoint);
let await_output_response_fut = mempool.ready().await.unwrap().call(request);
let response_fut = tokio::time::timeout(Duration::from_secs(30), await_output_response_fut);
let response = response_fut
.await
.expect("should not time out")
.expect("should not return RecvError");
let Response::UnspentOutput(response) = response else {
panic!("wrong response from mempool to AwaitOutput request");
};
assert_eq!(
response, expected_output,
"AwaitOutput response should match expected output"
);
let mempool_change = timeout(Duration::from_secs(3), mempool_transaction_receiver.recv())
.await
.expect("should not timeout")
.expect("recv should return Ok");
assert_eq!(
mempool_change,
MempoolChange::added([unmined_tx_id].into_iter().collect())
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_non_standard() -> Result<(), Report> {
let network = Network::Mainnet;
let unmined_transactions = network.unmined_transactions_in_blocks(1..=10);
let transactions = unmined_transactions.collect::<Vec<_>>();
let mut rng = thread_rng();
let mut last_transaction = transactions
.choose(&mut rng)
.expect("Missing transaction")
.clone();
last_transaction.height = Some(Height(100_000));
let mut tx = last_transaction.transaction.transaction.clone();
let tx_mut = Arc::make_mut(&mut tx);
*tx_mut.outputs_mut() = vec![transparent::Output {
value: Amount::new(10), lock_script: p2pkh_script([0u8; 20]),
}];
last_transaction.transaction.transaction = tx;
let cost_limit = last_transaction.cost();
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup(&network, cost_limit, true).await;
service.enable(&mut recent_syncs).await;
let insert_err = service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)
.expect_err("expected insert to fail for non-standard tx");
assert_eq!(
insert_err,
MempoolError::NonStandardTransaction(storage::NonStandardTransactionError::IsDust)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_accept_standard_op_return() -> Result<(), Report> {
let network = Network::Mainnet;
let mut last_transaction = network
.unmined_transactions_in_blocks(1..=10)
.next()
.expect("missing transaction");
last_transaction.height = Some(Height(100_000));
let mut tx = last_transaction.transaction.transaction.clone();
let tx_mut = Arc::make_mut(&mut tx);
*tx_mut.outputs_mut() = vec![transparent::Output {
value: Amount::new(0),
lock_script: op_return_script(&[0x01]),
}];
last_transaction.transaction.transaction = tx;
let cost_limit = last_transaction.cost();
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup(&network, cost_limit, true).await;
service.enable(&mut recent_syncs).await;
service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)?;
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_op_return_too_large() -> Result<(), Report> {
let network = Network::Mainnet;
let mut last_transaction = network
.unmined_transactions_in_blocks(1..=10)
.next()
.expect("missing transaction");
last_transaction.height = Some(Height(100_000));
let mut tx = last_transaction.transaction.transaction.clone();
let tx_mut = Arc::make_mut(&mut tx);
*tx_mut.outputs_mut() = vec![transparent::Output {
value: Amount::new(0),
lock_script: op_return_script(&[0x03]),
}];
last_transaction.transaction.transaction = tx;
let cost_limit = last_transaction.cost();
let mempool_config = mempool::Config {
tx_cost_limit: cost_limit,
max_datacarrier_bytes: Some(2),
..Default::default()
};
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup_with_mempool_config(&network, mempool_config, true).await;
service.enable(&mut recent_syncs).await;
let insert_err = service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)
.expect_err("expected insert to fail for non-standard tx");
assert_eq!(
insert_err,
MempoolError::NonStandardTransaction(
storage::NonStandardTransactionError::DataCarrierTooLarge
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_multi_op_return() -> Result<(), Report> {
let network = Network::Mainnet;
let mut last_transaction = network
.unmined_transactions_in_blocks(1..=10)
.next()
.expect("missing transaction");
last_transaction.height = Some(Height(100_000));
let mut tx = last_transaction.transaction.transaction.clone();
let tx_mut = Arc::make_mut(&mut tx);
*tx_mut.outputs_mut() = vec![
transparent::Output {
value: Amount::new(0),
lock_script: op_return_script(&[0x04]),
},
transparent::Output {
value: Amount::new(0),
lock_script: op_return_script(&[0x05]),
},
];
last_transaction.transaction.transaction = tx;
let cost_limit = last_transaction.cost();
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup(&network, cost_limit, true).await;
service.enable(&mut recent_syncs).await;
let insert_err = service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)
.expect_err("expected insert to fail for non-standard tx");
assert_eq!(
insert_err,
MempoolError::NonStandardTransaction(storage::NonStandardTransactionError::MultiOpReturn)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_non_standard_scriptpubkey() -> Result<(), Report> {
let network = Network::Mainnet;
let mut last_transaction = network
.unmined_transactions_in_blocks(1..=10)
.next()
.expect("missing transaction");
last_transaction.height = Some(Height(100_000));
let mut tx = last_transaction.transaction.transaction.clone();
let tx_mut = Arc::make_mut(&mut tx);
*tx_mut.outputs_mut() = vec![transparent::Output {
value: Amount::new(1000),
lock_script: transparent::Script::new(&[0x00]),
}];
last_transaction.transaction.transaction = tx;
let cost_limit = last_transaction.cost();
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup(&network, cost_limit, true).await;
service.enable(&mut recent_syncs).await;
let insert_err = service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)
.expect_err("expected insert to fail for non-standard tx");
assert_eq!(
insert_err,
MempoolError::NonStandardTransaction(
storage::NonStandardTransactionError::ScriptPubKeyNonStandard
)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_bare_multisig() -> Result<(), Report> {
let network = Network::Mainnet;
let mut last_transaction = network
.unmined_transactions_in_blocks(1..=10)
.next()
.expect("missing transaction");
last_transaction.height = Some(Height(100_000));
let mut tx = last_transaction.transaction.transaction.clone();
let tx_mut = Arc::make_mut(&mut tx);
*tx_mut.outputs_mut() = vec![transparent::Output {
value: Amount::new(1000),
lock_script: multisig_script(1, 1),
}];
last_transaction.transaction.transaction = tx;
let cost_limit = last_transaction.cost();
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup(&network, cost_limit, true).await;
service.enable(&mut recent_syncs).await;
let insert_err = service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)
.expect_err("expected insert to fail for non-standard tx");
assert_eq!(
insert_err,
MempoolError::NonStandardTransaction(storage::NonStandardTransactionError::BareMultiSig)
);
Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_large_multisig() -> Result<(), Report> {
let network = Network::Mainnet;
let mut last_transaction = network
.unmined_transactions_in_blocks(1..=10)
.next()
.expect("missing transaction");
last_transaction.height = Some(Height(100_000));
let mut tx = last_transaction.transaction.transaction.clone();
let tx_mut = Arc::make_mut(&mut tx);
*tx_mut.outputs_mut() = vec![transparent::Output {
value: Amount::new(1000),
lock_script: multisig_script(1, 4),
}];
last_transaction.transaction.transaction = tx;
let cost_limit = last_transaction.cost();
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup(&network, cost_limit, true).await;
service.enable(&mut recent_syncs).await;
let insert_err = service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)
.expect_err("expected insert to fail for non-standard tx");
assert_eq!(
insert_err,
MempoolError::NonStandardTransaction(
storage::NonStandardTransactionError::ScriptPubKeyNonStandard
)
);
Ok(())
}
/// Checks that mempool storage refuses a transaction whose first `PrevOut` input carries a
/// 1651-byte unlock script, and that the refusal is reported as
/// `NonStandardTransactionError::ScriptSigTooLarge`.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_large_scriptsig() -> Result<(), Report> {
    let network = Network::Mainnet;

    let mut candidate = pick_transaction_with_prevout(&network);
    candidate.height = Some(Height(100_000));

    // 1651 zero bytes; NOTE(review): presumably one byte over the standard scriptSig
    // size limit enforced by the storage policy -- confirm against the policy constant.
    let oversized_unlock_script = transparent::Script::new(&[0u8; 1651]);

    let mut rewritten = candidate.transaction.transaction.clone();
    set_first_prevout_unlock_script(Arc::make_mut(&mut rewritten), oversized_unlock_script);
    candidate.transaction.transaction = rewritten;

    // The mempool cost limit is set to exactly this transaction's cost.
    let budget = candidate.cost();
    let (
        mut mempool_service,
        _peers,
        _state,
        _tip_change,
        _verifier,
        mut sync_lengths,
        _change_receiver,
    ) = setup(&network, budget, true).await;
    mempool_service.enable(&mut sync_lengths).await;

    let expected = MempoolError::NonStandardTransaction(
        storage::NonStandardTransactionError::ScriptSigTooLarge,
    );
    let rejection = mempool_service
        .storage()
        .insert(candidate.clone(), Vec::new(), None)
        .expect_err("expected insert to fail for non-standard tx");
    assert_eq!(rejection, expected);

    Ok(())
}
/// Checks that mempool storage refuses a transaction whose first `PrevOut` input has the
/// single-byte unlock script `0xac`, and that the refusal is reported as
/// `NonStandardTransactionError::ScriptSigNotPushOnly`.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_non_push_only_scriptsig() -> Result<(), Report> {
    let network = Network::Mainnet;

    let mut candidate = pick_transaction_with_prevout(&network);
    candidate.height = Some(Height(100_000));

    // 0xac is OP_CHECKSIG in Bitcoin-style script, i.e. an opcode that is not a data push.
    let non_push_unlock_script = transparent::Script::new(&[0xac]);

    let mut rewritten = candidate.transaction.transaction.clone();
    set_first_prevout_unlock_script(Arc::make_mut(&mut rewritten), non_push_unlock_script);
    candidate.transaction.transaction = rewritten;

    // The mempool cost limit is set to exactly this transaction's cost.
    let budget = candidate.cost();
    let (
        mut mempool_service,
        _peers,
        _state,
        _tip_change,
        _verifier,
        mut sync_lengths,
        _change_receiver,
    ) = setup(&network, budget, true).await;
    mempool_service.enable(&mut sync_lengths).await;

    let expected = MempoolError::NonStandardTransaction(
        storage::NonStandardTransactionError::ScriptSigNotPushOnly,
    );
    let rejection = mempool_service
        .storage()
        .insert(candidate.clone(), Vec::new(), None)
        .expect_err("expected insert to fail for non-standard tx");
    assert_eq!(rejection, expected);

    Ok(())
}
/// Checks that mempool storage refuses a transaction whose `legacy_sigop_count` has been
/// forced to 4001, and that the refusal is reported as
/// `NonStandardTransactionError::TooManySigops`.
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_too_many_sigops() -> Result<(), Report> {
    let network = Network::Mainnet;

    // Start from the first unmined transaction found in blocks 1 to 10.
    let mut candidate = network
        .unmined_transactions_in_blocks(1..=10)
        .next()
        .expect("missing transaction");
    candidate.height = Some(Height(100_000));

    // Only the recorded sigop count is changed; the transaction itself is left untouched.
    candidate.legacy_sigop_count = 4001;

    // The mempool cost limit is set to exactly this transaction's cost.
    let budget = candidate.cost();
    let (
        mut mempool_service,
        _peers,
        _state,
        _tip_change,
        _verifier,
        mut sync_lengths,
        _change_receiver,
    ) = setup(&network, budget, true).await;
    mempool_service.enable(&mut sync_lengths).await;

    let expected =
        MempoolError::NonStandardTransaction(storage::NonStandardTransactionError::TooManySigops);
    let rejection = mempool_service
        .storage()
        .insert(candidate.clone(), Vec::new(), None)
        .expect_err("expected insert to fail for too many sigops");
    assert_eq!(rejection, expected);

    Ok(())
}
#[tokio::test(flavor = "multi_thread")]
async fn mempool_reject_non_standard_inputs() -> Result<(), Report> {
let network = Network::Mainnet;
let mut last_transaction = pick_transaction_with_prevout(&network);
last_transaction.height = Some(Height(100_000));
let non_standard_script = transparent::Script::new(&[0x51, 0x52, 0x93]);
let non_standard_output = transparent::Output {
value: 0u64.try_into().unwrap(),
lock_script: non_standard_script,
};
let input_count = last_transaction.transaction.transaction.inputs().len();
last_transaction.spent_outputs = std::sync::Arc::new(vec![non_standard_output; input_count]);
let cost_limit = last_transaction.cost();
let (
mut service,
_peer_set,
_state_service,
_chain_tip_change,
_tx_verifier,
mut recent_syncs,
_mempool_transaction_receiver,
) = setup(&network, cost_limit, true).await;
service.enable(&mut recent_syncs).await;
let insert_err = service
.storage()
.insert(last_transaction.clone(), Vec::new(), None)
.expect_err("expected insert to fail for non-standard inputs");
assert_eq!(
insert_err,
MempoolError::NonStandardTransaction(
storage::NonStandardTransactionError::NonStandardInputs
)
);
Ok(())
}
/// Builds a script consisting of the byte `0x6a` (OP_RETURN), a one-byte length, and `data`.
///
/// # Panics
///
/// Panics if `data` is longer than 75 bytes, because the length is written as a single
/// direct-push byte.
fn op_return_script(data: &[u8]) -> transparent::Script {
    assert!(data.len() <= 75, "test helper only supports small pushdata");

    // Opcode and push length first, then the payload itself.
    let mut script_bytes = vec![0x6a, data.len() as u8];
    script_bytes.extend_from_slice(data);

    transparent::Script::new(&script_bytes)
}
/// Builds a multisig-shaped script: `op_n(required)`, then `key_count` pushes of 33-byte
/// placeholder keys, then `op_n(key_count)` and the byte `0xae` (OP_CHECKMULTISIG).
///
/// Each placeholder key starts with `0x02`, carries its index in the second byte, and is
/// zero everywhere else, so the keys are distinct but are not real public keys.
///
/// # Panics
///
/// Panics unless `1 <= required <= key_count` and `key_count <= 16`.
fn multisig_script(required: u8, key_count: usize) -> transparent::Script {
    assert!(required >= 1 && required <= key_count as u8);
    assert!(key_count <= 16);

    const PLACEHOLDER_KEY_LEN: usize = 33;

    let mut script_bytes = vec![op_n(required)];
    for key_index in 0..key_count {
        let mut placeholder_key = [0u8; PLACEHOLDER_KEY_LEN];
        placeholder_key[0] = 0x02;
        placeholder_key[1] = key_index as u8;

        // Push-length byte followed by the key bytes.
        script_bytes.push(PLACEHOLDER_KEY_LEN as u8);
        script_bytes.extend_from_slice(&placeholder_key);
    }
    script_bytes.extend_from_slice(&[op_n(key_count as u8), 0xae]);

    transparent::Script::new(&script_bytes)
}
/// Builds the 25-byte pay-to-public-key-hash script for `pubkey_hash`:
/// `0x76 0xa9 0x14 <20-byte hash> 0x88 0xac`
/// (OP_DUP OP_HASH160 push-20 ... OP_EQUALVERIFY OP_CHECKSIG).
fn p2pkh_script(pubkey_hash: [u8; 20]) -> transparent::Script {
    let mut script_bytes: Vec<u8> = vec![0x76, 0xa9, 20];
    script_bytes.extend_from_slice(&pubkey_hash);
    script_bytes.extend_from_slice(&[0x88, 0xac]);

    transparent::Script::new(&script_bytes)
}
/// Returns the opcode byte used to push the small integer `n`: `0x00` for zero,
/// otherwise `0x50 + n` (so 1 maps to `0x51` and 16 maps to `0x60`).
///
/// No range check is performed; callers are expected to pass values in `0..=16`.
fn op_n(n: u8) -> u8 {
    match n {
        0 => 0x00,
        _ => 0x50 + n,
    }
}
/// Overwrites the unlock script of the first `PrevOut` input of `tx` with `script`.
///
/// # Panics
///
/// Panics if `tx` has no `PrevOut` input.
fn set_first_prevout_unlock_script(tx: &mut Transaction, script: transparent::Script) {
    // Locate the unlock script slot of the first input that spends a previous output.
    let first_unlock_script = tx.inputs_mut().iter_mut().find_map(|input| match input {
        transparent::Input::PrevOut { unlock_script, .. } => Some(unlock_script),
        _ => None,
    });

    match first_unlock_script {
        Some(slot) => *slot = script,
        None => panic!("missing prevout input"),
    }
}
/// Returns the first unmined transaction from `network`'s test blocks that has at least
/// one `PrevOut` input.
///
/// # Panics
///
/// Panics if none of the transactions has a `PrevOut` input.
fn pick_transaction_with_prevout(network: &Network) -> VerifiedUnminedTx {
    for candidate in network.unmined_transactions_in_blocks(..) {
        let spends_previous_output = candidate
            .transaction
            .transaction
            .inputs()
            .iter()
            .any(|input| matches!(input, transparent::Input::PrevOut { .. }));

        if spends_previous_output {
            return candidate;
        }
    }

    panic!("missing non-coinbase transaction")
}
/// Convenience wrapper around `setup_with_mempool_config` that uses the default mempool
/// configuration with only `tx_cost_limit` overridden.
///
/// The remaining arguments and the returned tuple are passed through unchanged.
async fn setup(
    network: &Network,
    tx_cost_limit: u64,
    should_commit_genesis_block: bool,
) -> (
    Mempool,
    MockPeerSet,
    StateService,
    ChainTipChange,
    MockTxVerifier,
    RecentSyncLengths,
    tokio::sync::broadcast::Receiver<MempoolChange>,
) {
    setup_with_mempool_config(
        network,
        mempool::Config {
            tx_cost_limit,
            ..Default::default()
        },
        should_commit_genesis_block,
    )
    .await
}
/// Builds a `Mempool` wired to a mock peer set, a mock transaction verifier and an
/// ephemeral state service, using the supplied `mempool_config`.
///
/// When `should_commit_genesis_block` is `true`, the mainnet genesis block test vector is
/// committed to the state as a checkpoint-verified block, and the function waits for the
/// resulting chain tip change before returning. The mainnet vector is used whatever
/// `network` is.
///
/// Returns, in order: the mempool, the mock peer set, the buffered state service, the
/// chain tip change handle, the mock transaction verifier, the recent sync lengths, and a
/// receiver subscribed to mempool changes just before returning.
///
/// # Panics
///
/// Panics if the genesis block cannot be deserialized or committed, or if waiting for the
/// chain tip change fails.
async fn setup_with_mempool_config(
    network: &Network,
    mempool_config: mempool::Config,
    should_commit_genesis_block: bool,
) -> (
    Mempool,
    MockPeerSet,
    StateService,
    ChainTipChange,
    MockTxVerifier,
    RecentSyncLengths,
    tokio::sync::broadcast::Receiver<MempoolChange>,
) {
    // Mock services standing in for the network and the transaction verifier.
    let peer_set = MockService::build().for_unit_tests();
    let tx_verifier = MockService::build().for_unit_tests();

    // Ephemeral state, wrapped in a buffer so it can be cloned into the mempool.
    let (raw_state, _read_only_state_service, latest_chain_tip, mut chain_tip_change) =
        zebra_state::init(StateConfig::ephemeral(), network, Height::MAX, 0).await;
    let mut state_service = ServiceBuilder::new().buffer(10).service(raw_state);

    let (sync_status, recent_syncs) = SyncStatus::new();
    let (misbehavior_tx, _misbehavior_rx) = tokio::sync::mpsc::channel(1);

    let (mempool, change_subscriber) = Mempool::new(
        &mempool_config,
        Buffer::new(BoxService::new(peer_set.clone()), 1),
        state_service.clone(),
        Buffer::new(BoxService::new(tx_verifier.clone()), 1),
        sync_status,
        latest_chain_tip,
        chain_tip_change.clone(),
        misbehavior_tx,
    );

    // Background task that keeps receiving mempool changes until `recv` reports an error.
    // NOTE(review): presumably this keeps a live receiver on the channel -- confirm.
    let mut background_receiver = change_subscriber.subscribe();
    tokio::spawn(async move { while background_receiver.recv().await.is_ok() {} });

    if should_commit_genesis_block {
        let genesis: Arc<Block> = zebra_test::vectors::BLOCK_MAINNET_GENESIS_BYTES
            .zcash_deserialize_into()
            .unwrap();
        let commit_genesis =
            zebra_state::Request::CommitCheckpointVerifiedBlock(genesis.clone().into());

        // Push the genesis block into the state.
        state_service
            .ready()
            .await
            .unwrap()
            .call(commit_genesis)
            .await
            .unwrap();

        // Do not return until the new tip has been observed.
        chain_tip_change
            .wait_for_tip_change()
            .await
            .expect("unexpected chain tip update failure");
    }

    // The caller's receiver is subscribed last, after any genesis commit above.
    let caller_receiver = change_subscriber.subscribe();

    (
        mempool,
        peer_set,
        state_service,
        chain_tip_change,
        tx_verifier,
        recent_syncs,
        caller_receiver,
    )
}