use std::{collections::HashSet, time::Duration};
use futures::{future, pin_mut, stream::FuturesUnordered, StreamExt};
use tokio::{
sync::watch,
task::JoinHandle,
time::{sleep, timeout},
};
use tower::{timeout::Timeout, BoxError, Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::{block::Height, transaction::UnminedTxId};
use zebra_network as zn;
use zebra_node_services::mempool::Gossip;
use zebra_state::ChainTipChange;
use crate::components::{
mempool::{self, Config},
sync::SyncStatus,
};
#[cfg(test)]
mod tests;
/// The number of `MempoolTransactionIds` requests sent to the peer set in each crawl.
const FANOUT: usize = 3;
/// The delay between crawls.
///
/// The same duration is also used as the upper bound on how long a single crawl may
/// run before it is abandoned and logged as timed out.
pub const RATE_LIMIT_DELAY: Duration = Duration::from_secs(73);
/// The timeout that the crawler's peer set wrapper is constructed with.
const PEER_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
/// The mempool transaction crawler.
///
/// Once enabled, it repeatedly asks the peer set for mempool transaction IDs and
/// forwards any IDs it receives to the mempool service as a queue request.
pub struct Crawler<PeerSet, Mempool> {
/// The peer set service used to request transaction IDs, wrapped in a [`Timeout`].
peer_set: Timeout<PeerSet>,
/// The mempool service that receives the crawled transaction IDs.
mempool: Mempool,
/// Used to wait until the crawler should be enabled
/// (via `wait_until_close_to_tip`).
sync_status: SyncStatus,
/// Used to observe best tip height changes when `debug_enable_at_height` is set.
chain_tip_change: ChainTipChange,
/// If set, the crawler is also enabled once the best tip height is greater than or
/// equal to this height, independently of `sync_status`.
debug_enable_at_height: Option<Height>,
}
impl<PeerSet, Mempool> Crawler<PeerSet, Mempool>
where
PeerSet:
Service<zn::Request, Response = zn::Response, Error = BoxError> + Clone + Send + 'static,
PeerSet::Future: Send,
Mempool:
Service<mempool::Request, Response = mempool::Response, Error = BoxError> + Send + 'static,
Mempool::Future: Send,
{
pub fn spawn(
config: &Config,
peer_set: PeerSet,
mempool: Mempool,
sync_status: SyncStatus,
chain_tip_change: ChainTipChange,
) -> JoinHandle<Result<(), BoxError>> {
let crawler = Crawler {
peer_set: Timeout::new(peer_set, PEER_RESPONSE_TIMEOUT),
mempool,
sync_status,
chain_tip_change,
debug_enable_at_height: config.debug_enable_at_height.map(Height),
};
tokio::spawn(crawler.run().in_current_span())
}
async fn wait_until_enabled_by_debug(&mut self) -> Result<(), watch::error::RecvError> {
if self.debug_enable_at_height.is_none() {
return future::pending().await;
}
let enable_at_height = self
.debug_enable_at_height
.expect("unexpected debug_enable_at_height: just checked for None");
loop {
let best_tip_height = self
.chain_tip_change
.wait_for_tip_change()
.await?
.best_tip_height();
if best_tip_height >= enable_at_height {
return Ok(());
}
}
}
async fn wait_until_enabled(&mut self) -> Result<(), watch::error::RecvError> {
let mut sync_status = self.sync_status.clone();
let tip_future = sync_status.wait_until_close_to_tip();
let debug_future = self.wait_until_enabled_by_debug();
pin_mut!(tip_future);
pin_mut!(debug_future);
let (result, _unready_future) = future::select(tip_future, debug_future)
.await
.factor_first();
result
}
pub async fn run(mut self) -> Result<(), BoxError> {
#[cfg(not(test))]
info!("initializing mempool crawler task");
#[cfg(test)]
debug!("initializing mempool crawler task");
loop {
self.wait_until_enabled().await?;
timeout(RATE_LIMIT_DELAY, self.crawl_transactions())
.await
.unwrap_or_else(|timeout| {
info!("mempool crawl timed out: {timeout:?}");
Ok(())
})?;
sleep(RATE_LIMIT_DELAY).await;
}
}
async fn crawl_transactions(&mut self) -> Result<(), BoxError> {
let peer_set = self.peer_set.clone();
trace!("Crawling for mempool transactions");
let mut requests = FuturesUnordered::new();
for attempt in 0..FANOUT {
if attempt > 0 {
tokio::task::yield_now().await;
}
let mut peer_set = peer_set.clone();
let peer_set = peer_set.ready().await?;
requests.push(peer_set.call(zn::Request::MempoolTransactionIds));
}
while let Some(result) = requests.next().await {
match result {
Ok(response) => self.handle_response(response).await?,
Err(error) => debug!("Failed to crawl peer for mempool transactions: {}", error),
}
}
Ok(())
}
async fn handle_response(&mut self, response: zn::Response) -> Result<(), BoxError> {
let transaction_ids: HashSet<_> = match response {
zn::Response::TransactionIds(ids) => ids.into_iter().collect(),
_ => unreachable!("Peer set did not respond with transaction IDs to mempool crawler"),
};
trace!(
"Mempool crawler received {} transaction IDs",
transaction_ids.len()
);
if !transaction_ids.is_empty() {
self.queue_transactions(transaction_ids).await?;
}
Ok(())
}
async fn queue_transactions(
&mut self,
transaction_ids: HashSet<UnminedTxId>,
) -> Result<(), BoxError> {
let transaction_ids = transaction_ids.into_iter().map(Gossip::Id).collect();
let call_result = self
.mempool
.ready()
.await?
.call(mempool::Request::Queue(transaction_ids))
.await;
let queue_errors = match call_result {
Ok(mempool::Response::Queued(queue_results)) => {
queue_results.into_iter().filter_map(Result::err)
}
Ok(_) => unreachable!("Mempool did not respond with queue results to mempool crawler"),
Err(call_error) => {
debug!("Ignoring unexpected peer behavior: {}", call_error);
return Ok(());
}
};
for error in queue_errors {
debug!("Failed to download a crawled transaction: {}", error);
}
Ok(())
}
}