use std::{
collections::{HashMap, HashSet},
pin::Pin,
task::{Context, Poll},
time::Duration,
};
use futures::{
future::TryFutureExt,
ready,
stream::{FuturesUnordered, Stream},
FutureExt,
};
use pin_project::{pin_project, pinned_drop};
use thiserror::Error;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::{
block::Height,
transaction::{self, UnminedTxId, VerifiedUnminedTx},
transparent,
};
use zebra_consensus::transaction as tx;
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_node_services::mempool::Gossip;
use zebra_state::{self as zs, CloneError};
use crate::components::{
mempool::crawler::RATE_LIMIT_DELAY,
sync::{BLOCK_DOWNLOAD_TIMEOUT, BLOCK_VERIFY_TIMEOUT},
};
use super::MempoolError;
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;
pub(crate) const TRANSACTION_DOWNLOAD_TIMEOUT: Duration = BLOCK_DOWNLOAD_TIMEOUT;
pub(crate) const TRANSACTION_VERIFY_TIMEOUT: Duration = BLOCK_VERIFY_TIMEOUT;
pub const MAX_INBOUND_CONCURRENCY: usize = 25;
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
struct CancelDownloadAndVerify;
#[derive(Error, Debug, Clone)]
#[allow(dead_code)]
pub enum TransactionDownloadVerifyError {
#[error("transaction is already in state")]
InState,
#[error("error in state service: {0}")]
StateError(#[source] CloneError),
#[error("error downloading transaction: {0}")]
DownloadFailed(#[source] CloneError),
#[error("transaction download / verification was cancelled")]
Cancelled,
#[error("transaction did not pass consensus validation: {error}")]
Invalid {
error: zebra_consensus::error::TransactionError,
advertiser_addr: Option<PeerSocketAddr>,
},
}
#[pin_project(PinnedDrop)]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
network: ZN,
verifier: ZV,
state: ZS,
#[pin]
pending: FuturesUnordered<
JoinHandle<
Result<
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
Option<oneshot::Sender<Result<(), BoxError>>>,
),
Box<(TransactionDownloadVerifyError, UnminedTxId)>,
>,
tokio::time::error::Elapsed,
>,
>,
>,
cancel_handles: HashMap<UnminedTxId, (oneshot::Sender<CancelDownloadAndVerify>, Gossip)>,
}
impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
type Item = Result<
Result<
(
VerifiedUnminedTx,
Vec<transparent::OutPoint>,
Option<Height>,
Option<oneshot::Sender<Result<(), BoxError>>>,
),
Box<(UnminedTxId, TransactionDownloadVerifyError)>,
>,
tokio::time::error::Elapsed,
>;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
let this = self.project();
let item = if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
let result = join_result.expect("transaction download and verify tasks must not panic");
let result = match result {
Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx))) => {
this.cancel_handles.remove(&tx.transaction.id);
Ok(Ok((tx, spent_mempool_outpoints, tip_height, rsp_tx)))
}
Ok(Err(boxed_err)) => {
let (e, hash) = *boxed_err;
this.cancel_handles.remove(&hash);
Ok(Err(Box::new((hash, e))))
}
Err(elapsed) => Err(elapsed),
};
Some(result)
} else {
None
};
Poll::Ready(item)
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.pending.size_hint()
}
}
impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
pub fn new(network: ZN, verifier: ZV, state: ZS) -> Self {
Self {
network,
verifier,
state,
pending: FuturesUnordered::new(),
cancel_handles: HashMap::new(),
}
}
#[instrument(skip(self, gossiped_tx), fields(txid = %gossiped_tx.id()))]
#[allow(clippy::unwrap_in_result)]
pub fn download_if_needed_and_verify(
&mut self,
gossiped_tx: Gossip,
mut rsp_tx: Option<oneshot::Sender<Result<(), BoxError>>>,
) -> Result<(), MempoolError> {
let txid = gossiped_tx.id();
if self.cancel_handles.contains_key(&txid) {
debug!(
?txid,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"transaction id already queued for inbound download: ignored transaction"
);
metrics::gauge!("mempool.currently.queued.transactions",)
.set(self.pending.len() as f64);
return Err(MempoolError::AlreadyQueued);
}
if self.pending.len() >= MAX_INBOUND_CONCURRENCY {
debug!(
?txid,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"too many transactions queued for inbound download: ignored transaction"
);
metrics::gauge!("mempool.currently.queued.transactions",)
.set(self.pending.len() as f64);
return Err(MempoolError::FullQueue);
}
let (cancel_tx, mut cancel_rx) = oneshot::channel::<CancelDownloadAndVerify>();
let network = self.network.clone();
let verifier = self.verifier.clone();
let mut state = self.state.clone();
let gossiped_tx_req = gossiped_tx.clone();
let fut = async move {
Self::transaction_in_best_chain(&mut state, txid).await?;
trace!(?txid, "transaction is not in best chain");
let (tip_height, next_height) = match state.oneshot(zs::Request::Tip).await {
Ok(zs::Response::Tip(None)) => Ok((None, Height(0))),
Ok(zs::Response::Tip(Some((height, _hash)))) => {
let next_height =
(height + 1).expect("valid heights are far below the maximum");
Ok((Some(height), next_height))
}
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?;
trace!(?txid, ?next_height, "got next height");
let (tx, advertiser_addr) = match gossiped_tx {
Gossip::Id(txid) => {
let req = zn::Request::TransactionsById(std::iter::once(txid).collect());
let tx = match network
.oneshot(req)
.await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::DownloadFailed)?
{
zn::Response::Transactions(mut txs) => txs.pop().ok_or_else(|| {
TransactionDownloadVerifyError::DownloadFailed(
BoxError::from("no transactions returned").into(),
)
})?,
_ => unreachable!("wrong response to transaction request"),
};
let (tx, advertiser_addr) = tx.available().expect(
"unexpected missing tx status: single tx failures should be errors",
);
metrics::counter!(
"mempool.downloaded.transactions.total",
"version" => format!("{}",tx.transaction.version()),
).increment(1);
(tx, advertiser_addr)
}
Gossip::Tx(tx) => {
metrics::counter!(
"mempool.pushed.transactions.total",
"version" => format!("{}",tx.transaction.version()),
).increment(1);
(tx, None)
}
};
trace!(?txid, "got tx");
let result = verifier
.oneshot(tx::Request::Mempool {
transaction: tx.clone(),
height: next_height,
})
.map_ok(|rsp| {
let tx::Response::Mempool { transaction, spent_mempool_outpoints } = rsp else {
panic!("unexpected non-mempool response to mempool request")
};
(transaction, spent_mempool_outpoints, tip_height)
})
.await;
trace!(?txid, result = ?result.as_ref().map(|_tx| ()), "verified transaction for the mempool");
result.map_err(|e| TransactionDownloadVerifyError::Invalid { error: e.into(), advertiser_addr } )
}
.map_ok(|(tx, spent_mempool_outpoints, tip_height)| {
metrics::counter!(
"mempool.verified.transactions.total",
"version" => format!("{}", tx.transaction.transaction.version()),
).increment(1);
(tx, spent_mempool_outpoints, tip_height)
})
.map_err(move |e| Box::new((e, txid)))
.inspect(move |result| {
let result = result.as_ref().map(|_tx| txid);
debug!("mempool transaction result: {result:?}");
})
.in_current_span();
let task = tokio::spawn(async move {
let fut = tokio::time::timeout(RATE_LIMIT_DELAY, fut);
let result = tokio::select! {
biased;
_ = &mut cancel_rx => {
trace!("task cancelled prior to completion");
metrics::counter!("mempool.cancelled.verify.tasks.total").increment(1);
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("verification cancelled".into()));
}
Ok(Err(Box::new((TransactionDownloadVerifyError::Cancelled, txid))))
}
verification = fut => {
verification
.inspect_err(|_elapsed| {
if let Some(rsp_tx) = rsp_tx.take() {
let _ = rsp_tx.send(Err("timeout waiting for verification result".into()));
}
})
.map(|inner_result| {
match inner_result {
Ok((transaction, spent_mempool_outpoints, tip_height)) => Ok((transaction, spent_mempool_outpoints, tip_height, rsp_tx)),
Err(boxed_err) => {
let (tx_verifier_error, tx_id) = *boxed_err;
if let Some(rsp_tx) = rsp_tx.take() {
let error_msg = format!(
"failed to validate tx: {tx_id}, error: {tx_verifier_error}"
);
let _ = rsp_tx.send(Err(error_msg.into()));
};
Err(Box::new((tx_verifier_error, tx_id)))
}
}
})
},
};
result
});
self.pending.push(task);
assert!(
self.cancel_handles
.insert(txid, (cancel_tx, gossiped_tx_req))
.is_none(),
"transactions are only queued once"
);
debug!(
?txid,
queue_len = self.pending.len(),
?MAX_INBOUND_CONCURRENCY,
"queued transaction hash for download"
);
metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
metrics::counter!("mempool.queued.transactions.total").increment(1);
Ok(())
}
pub fn cancel(&mut self, mined_ids: &HashSet<transaction::Hash>) {
let removed_txids: Vec<UnminedTxId> = self
.cancel_handles
.keys()
.filter(|txid| mined_ids.contains(&txid.mined_id()))
.cloned()
.collect();
for txid in removed_txids {
if let Some(handle) = self.cancel_handles.remove(&txid) {
let _ = handle.0.send(CancelDownloadAndVerify);
}
}
}
pub fn cancel_all(&mut self) {
let _ = std::mem::take(&mut self.pending);
for (_hash, cancel) in self.cancel_handles.drain() {
let _ = cancel.0.send(CancelDownloadAndVerify);
}
assert!(self.pending.is_empty());
assert!(self.cancel_handles.is_empty());
metrics::gauge!("mempool.currently.queued.transactions",).set(self.pending.len() as f64);
}
#[allow(dead_code)]
pub fn in_flight(&self) -> usize {
self.pending.len()
}
pub fn transaction_requests(&self) -> impl Iterator<Item = &Gossip> {
self.cancel_handles.iter().map(|(_tx_id, (_handle, tx))| tx)
}
async fn transaction_in_best_chain(
state: &mut ZS,
txid: UnminedTxId,
) -> Result<(), TransactionDownloadVerifyError> {
match state
.ready()
.await
.map_err(CloneError::from)
.map_err(TransactionDownloadVerifyError::StateError)?
.call(zs::Request::Transaction(txid.mined_id()))
.await
{
Ok(zs::Response::Transaction(None)) => Ok(()),
Ok(zs::Response::Transaction(Some(_))) => Err(TransactionDownloadVerifyError::InState),
Ok(_) => unreachable!("wrong response"),
Err(e) => Err(TransactionDownloadVerifyError::StateError(e.into())),
}?;
Ok(())
}
}
#[pinned_drop]
impl<ZN, ZV, ZS> PinnedDrop for Downloads<ZN, ZV, ZS>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
ZN::Future: Send,
ZV: Service<tx::Request, Response = tx::Response, Error = BoxError> + Send + Clone + 'static,
ZV::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
ZS::Future: Send,
{
fn drop(mut self: Pin<&mut Self>) {
self.cancel_all();
metrics::gauge!("mempool.currently.queued.transactions").set(0 as f64);
}
}