use std::{
collections::HashMap,
pin::Pin,
task::{Context, Poll},
};
use futures::{
future::TryFutureExt,
ready,
stream::{FuturesUnordered, Stream},
};
use pin_project::pin_project;
use tokio::{sync::oneshot, task::JoinHandle};
use tower::{Service, ServiceExt};
use tracing_futures::Instrument;
use zebra_chain::{
block::{self, HeightDiff},
chain_tip::ChainTip,
};
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;
use crate::components::sync::MIN_CONCURRENCY_LIMIT;
/// A dynamically-typed, thread-safe error.
///
/// This is the shared error type for every generic service in this module:
/// `ZN`, `ZV`, and `ZS` all use `Error = BoxError` in their bounds.
type BoxError = Box<dyn std::error::Error + Send + Sync + 'static>;

/// The upper bound applied by [`Downloads::new`] when clamping the configured
/// `full_verify_concurrency_limit`: at most this many download-and-verify
/// tasks can be queued at once (see `download_and_verify`'s full-queue check).
pub const MAX_INBOUND_CONCURRENCY: usize = 30;
/// The action taken in response to a [`Downloads::download_and_verify`] call
/// for a gossiped block hash.
//
// Derives added so callers can log, compare, and freely copy the result:
// a public, data-free enum should at minimum be `Debug` (the file already
// derives `Debug` on `Downloads`), and `Copy`/`Eq` are free here.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DownloadAction {
    /// The hash was not already queued and the queue had spare capacity, so a
    /// new download-and-verify task was spawned and queued.
    AddedToQueue,

    /// A task for this hash is already queued (a cancel handle exists for it),
    /// so the request was ignored.
    AlreadyQueued,

    /// The queue is at the concurrency limit, so the request was dropped.
    FullQueue,
}
/// A stream of inbound block download-and-verify tasks.
///
/// Queueing is done with [`Downloads::download_and_verify`]; completed task
/// results are yielded through the [`Stream`] impl.
#[pin_project]
#[derive(Debug)]
pub struct Downloads<ZN, ZV, ZS>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Clone
        + 'static,
    ZV::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    ZS::Future: Send,
{
    // Configuration

    /// The concurrency limit, clamped by `new` to
    /// `MIN_CONCURRENCY_LIMIT..=MAX_INBOUND_CONCURRENCY`. Also used as the
    /// height lookahead window when checking gossiped block heights.
    full_verify_concurrency_limit: usize,

    // Services

    /// A service that fetches blocks from peers, via
    /// `zn::Request::BlocksByHash`.
    network: ZN,

    /// A service that verifies downloaded blocks, via
    /// `zebra_consensus::Request::Commit`.
    verifier: ZV,

    /// A service that queries the local state, used here to skip blocks that
    /// are already known (`zs::Request::KnownBlock`).
    state: ZS,

    /// Provides the best chain tip height, used to reject gossiped blocks that
    /// are too far ahead of, or behind, the current tip.
    latest_chain_tip: zs::LatestChainTip,

    // Internal download and verify state

    /// Handles for the spawned download-and-verify tasks. Each task resolves
    /// to the verified hash, or to an error together with the hash and the
    /// advertising peer's address (if known).
    #[pin]
    pending: FuturesUnordered<
        JoinHandle<Result<block::Hash, (BoxError, block::Hash, Option<PeerSocketAddr>)>>,
    >,

    /// One cancel sender per task in `pending`, keyed by block hash.
    /// Used to detect duplicate queue requests; signalling (or dropping) a
    /// sender makes the matching task resolve with a "canceled" error, via the
    /// `tokio::select!` in `download_and_verify`.
    cancel_handles: HashMap<block::Hash, oneshot::Sender<()>>,
}
impl<ZN, ZV, ZS> Stream for Downloads<ZN, ZV, ZS>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Clone
        + 'static,
    ZV::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    ZS::Future: Send,
{
    /// The verified hash on success, or the error and the advertising peer's
    /// address (if known) on failure. The failing hash itself is not exposed
    /// to the stream consumer.
    type Item = Result<block::Hash, (BoxError, Option<PeerSocketAddr>)>;

    /// Polls for the next completed download-and-verify task, removing its
    /// cancel handle whether it succeeded or failed.
    ///
    /// # Panics
    ///
    /// If a spawned task panicked, via the `expect` on the join result.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
        let this = self.project();
        // CORRECTNESS: `ready!` returns `Poll::Pending` early until a task
        // actually completes, so the handle cleanup below only runs on
        // completed tasks.
        if let Some(join_result) = ready!(this.pending.poll_next(cx)) {
            match join_result.expect("block download and verify tasks must not panic") {
                Ok(hash) => {
                    this.cancel_handles.remove(&hash);
                    Poll::Ready(Some(Ok(hash)))
                }
                Err((e, hash, advertiser_addr)) => {
                    // The hash is only used to find the cancel handle; the
                    // consumer just gets the error and advertiser address.
                    this.cancel_handles.remove(&hash);
                    Poll::Ready(Some(Err((e, advertiser_addr))))
                }
            }
        } else {
            // NOTE(review): `FuturesUnordered` yields `None` whenever it is
            // empty, so an empty queue ends this poll with `Ready(None)`;
            // presumably the caller re-polls after queueing new hashes —
            // confirm against the stream consumer.
            Poll::Ready(None)
        }
    }

    /// Returns the bounds on the number of remaining tasks, delegated to
    /// `pending`.
    fn size_hint(&self) -> (usize, Option<usize>) {
        self.pending.size_hint()
    }
}
impl<ZN, ZV, ZS> Downloads<ZN, ZV, ZS>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
    ZN::Future: Send,
    ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
        + Send
        + Clone
        + 'static,
    ZV::Future: Send,
    ZS: Service<zs::Request, Response = zs::Response, Error = BoxError> + Send + Clone + 'static,
    ZS::Future: Send,
{
    /// Initialize a new download stream with the provided services and chain
    /// tip watcher, and an empty task queue.
    ///
    /// `full_verify_concurrency_limit` is clamped to
    /// `MIN_CONCURRENCY_LIMIT..=MAX_INBOUND_CONCURRENCY` before use.
    pub fn new(
        full_verify_concurrency_limit: usize,
        network: ZN,
        verifier: ZV,
        state: ZS,
        latest_chain_tip: zs::LatestChainTip,
    ) -> Self {
        // Keep the configured limit inside safe bounds.
        let full_verify_concurrency_limit =
            full_verify_concurrency_limit.clamp(MIN_CONCURRENCY_LIMIT, MAX_INBOUND_CONCURRENCY);
        Self {
            full_verify_concurrency_limit,
            network,
            verifier,
            state,
            latest_chain_tip,
            pending: FuturesUnordered::new(),
            cancel_handles: HashMap::new(),
        }
    }

    /// Queue a gossiped block hash for download and verification.
    ///
    /// Returns [`DownloadAction::AlreadyQueued`] if a task for `hash` is
    /// already pending, [`DownloadAction::FullQueue`] if the queue is at the
    /// concurrency limit, and [`DownloadAction::AddedToQueue`] after spawning
    /// and queueing a new task.
    ///
    /// The spawned task:
    /// 1. skips blocks the state already knows,
    /// 2. downloads the block from a peer,
    /// 3. rejects blocks whose height is too far ahead of the tip
    ///    (lookahead = concurrency limit) or behind the finalized tip
    ///    (`zs::MAX_BLOCK_REORG_HEIGHT`),
    /// 4. submits the block to the verifier.
    ///
    /// The task can be cancelled at any point via its handle in
    /// `cancel_handles`; results are yielded through the `Stream` impl.
    #[instrument(skip(self, hash), fields(hash = %hash))]
    pub fn download_and_verify(&mut self, hash: block::Hash) -> DownloadAction {
        // Deduplicate: a cancel handle exists for every queued hash.
        if self.cancel_handles.contains_key(&hash) {
            debug!(
                ?hash,
                queue_len = self.pending.len(),
                concurrency_limit = self.full_verify_concurrency_limit,
                "block hash already queued for inbound download: ignored block",
            );

            metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
            metrics::counter!("gossip.already.queued.dropped.block.hash.count").increment(1);

            return DownloadAction::AlreadyQueued;
        }

        // Drop new requests when the queue is at the concurrency limit.
        if self.pending.len() >= self.full_verify_concurrency_limit {
            debug!(
                ?hash,
                queue_len = self.pending.len(),
                concurrency_limit = self.full_verify_concurrency_limit,
                "too many blocks queued for inbound download: ignored block",
            );

            metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);
            metrics::counter!("gossip.full.queue.dropped.block.hash.count").increment(1);

            return DownloadAction::FullQueue;
        }

        // This oneshot lets the queue owner cancel the task below.
        let (cancel_tx, mut cancel_rx) = oneshot::channel::<()>();

        // Clone the services and config into the task. All the services are
        // `Clone`, and `oneshot` is used once per task.
        let state = self.state.clone();
        let network = self.network.clone();
        let verifier = self.verifier.clone();
        let latest_chain_tip = self.latest_chain_tip.clone();
        let full_verify_concurrency_limit = self.full_verify_concurrency_limit;

        let fut = async move {
            // Step 1: skip blocks that are already in our state.
            match state.oneshot(zs::Request::KnownBlock(hash)).await {
                Ok(zs::Response::KnownBlock(None)) => Ok(()),
                Ok(zs::Response::KnownBlock(Some(_))) => Err("already present".into()),
                Ok(_) => unreachable!("wrong response"),
                Err(e) => Err(e),
            }
            // State errors have no advertising peer address.
            .map_err(|e| (e, None))?;

            // Step 2: download the block from a peer, recording which peer
            // advertised it (if available).
            let (block, advertiser_addr) = if let zn::Response::Blocks(blocks) = network
                .oneshot(zn::Request::BlocksByHash(std::iter::once(hash).collect()))
                .await
                .map_err(|e| (e, None))?
            {
                // We requested a single hash, so we expect exactly one block.
                assert_eq!(
                    blocks.len(),
                    1,
                    "wrong number of blocks in response to a single hash",
                );

                blocks
                    .first()
                    .expect("just checked length")
                    .available()
                    .expect(
                        "unexpected missing block status: single block failures should be errors",
                    )
            } else {
                unreachable!("wrong response to block request");
            };
            metrics::counter!("gossip.downloaded.block.count").increment(1);

            // Step 3: height sanity checks against the current tip.
            //
            // The highest height we will verify: tip + concurrency limit, or a
            // limit-sized window above genesis when there is no tip yet.
            let tip_height = latest_chain_tip.best_tip_height();
            let max_lookahead_height = if let Some(tip_height) = tip_height {
                let lookahead = HeightDiff::try_from(full_verify_concurrency_limit)
                    .expect("fits in HeightDiff");
                (tip_height + lookahead).expect("tip is much lower than Height::MAX")
            } else {
                // `- 1` because the genesis block is itself at height 0.
                let genesis_lookahead =
                    u32::try_from(full_verify_concurrency_limit - 1).expect("fits in u32");
                block::Height(genesis_lookahead)
            };

            // The lowest height we will verify: blocks below the finalized
            // tip can never be part of the best chain.
            let min_accepted_height = tip_height
                .map(|tip_height| {
                    block::Height(tip_height.0.saturating_sub(zs::MAX_BLOCK_REORG_HEIGHT))
                })
                .unwrap_or(block::Height(0));

            // Blocks without a parseable coinbase height are dropped.
            let block_height = block
                .coinbase_height()
                .ok_or_else(|| {
                    debug!(
                        ?hash,
                        "gossiped block with no height: dropped downloaded block"
                    );
                    metrics::counter!("gossip.no.height.dropped.block.count").increment(1);

                    BoxError::from("gossiped block with no height")
                })
                .map_err(|e| (e, None))?;

            if block_height > max_lookahead_height {
                debug!(
                    ?hash,
                    ?block_height,
                    ?tip_height,
                    ?max_lookahead_height,
                    lookahead_limit = full_verify_concurrency_limit,
                    "gossiped block height too far ahead of the tip: dropped downloaded block",
                );
                metrics::counter!("gossip.max.height.limit.dropped.block.count").increment(1);

                // These height failures are not attributed to the advertising
                // peer, so the address is `None`.
                Err("gossiped block height too far ahead").map_err(|e| (e.into(), None))?;
            } else if block_height < min_accepted_height {
                debug!(
                    ?hash,
                    ?block_height,
                    ?tip_height,
                    ?min_accepted_height,
                    behind_tip_limit = ?zs::MAX_BLOCK_REORG_HEIGHT,
                    "gossiped block height behind the finalized tip: dropped downloaded block",
                );
                metrics::counter!("gossip.min.height.limit.dropped.block.count").increment(1);

                Err("gossiped block height behind the finalized tip")
                    .map_err(|e| (e.into(), None))?;
            }

            // Step 4: verify and commit the block. Verifier errors are
            // attributed to the advertising peer.
            verifier
                .oneshot(zebra_consensus::Request::Commit(block))
                .await
                .map(|hash| (hash, block_height))
                .map_err(|e| (e, advertiser_addr))
        }
        .map_ok(|(hash, height)| {
            info!(?height, "downloaded and verified gossiped block");
            metrics::counter!("gossip.verified.block.count").increment(1);
            hash
        })
        // Attach the hash so `poll_next` can find the cancel handle.
        .map_err(move |(e, advertiser_addr)| (e, hash, advertiser_addr))
        .in_current_span();

        let task = tokio::spawn(async move {
            // `biased` prefers cancellation when both branches are ready,
            // so cancels take effect as soon as possible.
            tokio::select! {
                biased;
                _ = &mut cancel_rx => {
                    trace!("task cancelled prior to completion");
                    metrics::counter!("gossip.cancelled.count").increment(1);
                    Err(("canceled".into(), hash, None))
                }
                verification = fut => verification,
            }
        });

        self.pending.push(task);
        // The duplicate check at the top of this method makes a clash
        // impossible here.
        assert!(
            self.cancel_handles.insert(hash, cancel_tx).is_none(),
            "blocks are only queued once"
        );

        debug!(
            ?hash,
            queue_len = self.pending.len(),
            concurrency_limit = self.full_verify_concurrency_limit,
            "queued hash for download",
        );
        metrics::gauge!("gossip.queued.block.count").set(self.pending.len() as f64);

        DownloadAction::AddedToQueue
    }
}