use std::{cmp::max, collections::HashSet, convert, pin::Pin, task::Poll, time::Duration};
use color_eyre::eyre::{eyre, Report};
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::IndexSet;
use serde::{Deserialize, Serialize};
use tokio::{
sync::{mpsc, watch},
task::JoinError,
time::{sleep, timeout},
};
use tower::{
builder::ServiceBuilder, hedge::Hedge, limit::ConcurrencyLimit, retry::Retry, timeout::Timeout,
Service, ServiceExt,
};
use zebra_chain::{
block::{self, Height, HeightDiff},
chain_tip::ChainTip,
};
use zebra_network::{self as zn, PeerSocketAddr};
use zebra_state as zs;
use crate::{
components::sync::downloads::BlockDownloadVerifyError, config::ZebradConfig, BoxError,
};
mod downloads;
pub mod end_of_support;
mod gossip;
mod progress;
mod recent_sync_lengths;
mod status;
#[cfg(test)]
mod tests;
use downloads::{AlwaysHedge, Downloads};
pub use downloads::VERIFICATION_PIPELINE_SCALING_MULTIPLIER;
pub use gossip::{gossip_best_tip_block_hashes, BlockGossipError};
pub use progress::show_block_chain_progress;
pub use recent_sync_lengths::RecentSyncLengths;
pub use status::SyncStatus;
/// Number of `FindBlocks` requests spawned for each tip query in `obtain_tips` and `extend_tips`.
const FANOUT: usize = 3;
/// Retry count given to `zn::RetryLimit` in the block download service stack.
const BLOCK_DOWNLOAD_RETRY_LIMIT: usize = 3;
/// Lower bound that `ChainSync::new` applies to the configured checkpoint verify concurrency limit.
pub const MIN_CHECKPOINT_CONCURRENCY_LIMIT: usize = zebra_consensus::MAX_CHECKPOINT_HEIGHT_GAP;
/// Default for `Config::checkpoint_verify_concurrency_limit`: twice `MAX_TIPS_RESPONSE_HASH_COUNT`.
pub const DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT: usize = MAX_TIPS_RESPONSE_HASH_COUNT * 2;
/// Lower bound that `ChainSync::new` applies to the download and full verify concurrency limits.
pub const MIN_CONCURRENCY_LIMIT: usize = 1;
/// NOTE(review): presumably the largest number of hashes expected in one tips response — confirm.
/// In this file it is only used to derive `DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT`.
pub const MAX_TIPS_RESPONSE_HASH_COUNT: usize = 500;
/// Timeout wrapped around the peer service used for tip (`FindBlocks`) requests.
pub const TIPS_RESPONSE_TIMEOUT: Duration = Duration::from_secs(6);
/// NOTE(review): not referenced in this part of the file; presumably a delay used when
/// gossiping to peers — confirm against its users.
pub const PEER_GOSSIP_DELAY: Duration = Duration::from_secs(7);
/// Timeout layer applied to each block download request sent to peers.
pub(super) const BLOCK_DOWNLOAD_TIMEOUT: Duration = Duration::from_secs(20);
/// Timeout wrapped around the block verifier, and around each `try_to_sync_once` call.
pub(super) const BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(8 * 60);
/// NOTE(review): not referenced in this part of the file; presumably a shorter verify timeout
/// used near the final checkpoint by a submodule — confirm.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT: Duration = Duration::from_secs(2 * 60);
/// NOTE(review): not referenced in this part of the file; presumably the height range in which
/// `FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT` applies — confirm.
const FINAL_CHECKPOINT_BLOCK_VERIFY_TIMEOUT_LIMIT: HeightDiff = 100;
/// Pause between sync passes in `sync`. Also used as the timeout for `obtain_tips` and for each
/// genesis download attempt, and passed doubled as the last argument of `Hedge::new`.
const SYNC_RESTART_DELAY: Duration = Duration::from_secs(67);
/// Sleep between failed genesis block download or verify attempts.
const GENESIS_TIMEOUT_RETRY: Duration = Duration::from_secs(10);
/// Sync configuration section.
///
/// Unknown fields are rejected and missing fields take their `Default` values
/// (see the `serde` container attributes below).
#[derive(Clone, Debug, Eq, PartialEq, Deserialize, Serialize)]
#[serde(deny_unknown_fields, default)]
pub struct Config {
/// Concurrency limit applied to block download requests in `ChainSync::new`.
///
/// Also accepted under the name `max_concurrent_block_requests`.
/// Values below `MIN_CONCURRENCY_LIMIT` are raised to that minimum.
#[serde(alias = "max_concurrent_block_requests")]
pub download_concurrency_limit: usize,
/// Lookahead limit used while the state tip is below the maximum checkpoint height.
///
/// Also accepted under the name `lookahead_limit`.
/// Values below `MIN_CHECKPOINT_CONCURRENCY_LIMIT` are raised to that minimum.
#[serde(alias = "lookahead_limit")]
pub checkpoint_verify_concurrency_limit: usize,
/// Lookahead limit used once the state tip has reached the maximum checkpoint height.
///
/// Values below `MIN_CONCURRENCY_LIMIT` are raised to that minimum.
pub full_verify_concurrency_limit: usize,
/// NOTE(review): not read in this part of the file; presumably the number of threads for
/// CPU-bound work, with the default `0` having a special meaning — confirm against its users.
pub parallel_cpu_threads: usize,
}
impl Default for Config {
fn default() -> Self {
Self {
download_concurrency_limit: 50,
checkpoint_verify_concurrency_limit: DEFAULT_CHECKPOINT_CONCURRENCY_LIMIT,
full_verify_concurrency_limit: 20,
parallel_cpu_threads: 0,
}
}
}
/// A prospective chain tip taken from the end of a peer's block hash response.
///
/// `obtain_tips` and `extend_tips` build it from the last two hashes they keep from a response.
#[derive(Copy, Clone, Debug, Hash, PartialEq, Eq)]
struct CheckedTip {
/// The second-to-last kept hash; sent as the known block when extending this tip.
tip: block::Hash,
/// The last kept hash; responses that extend this tip are expected to start with it.
expected_next: block::Hash,
}
/// Drives block syncing: finds new chain tips via peers, then downloads and verifies blocks.
///
/// Type parameters:
/// - `ZN`: the peer network service,
/// - `ZS`: the state service,
/// - `ZV`: the block verifier service,
/// - `ZSTip`: a handle that reports the best chain tip.
pub struct ChainSync<ZN, ZS, ZV, ZSTip>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZN::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZS::Future: Send,
ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
/// Hash of the genesis block for the configured network.
genesis_hash: block::Hash,
/// Height at or above which `lookahead_limit` switches to the full verify limit.
max_checkpoint_height: Height,
/// Lookahead limit used while the state tip is below `max_checkpoint_height`.
checkpoint_verify_concurrency_limit: usize,
/// Lookahead limit used once the state tip has reached `max_checkpoint_height`.
full_verify_concurrency_limit: usize,
/// Peer service with a `TIPS_RESPONSE_TIMEOUT` timeout, used for `FindBlocks` requests.
tip_network: Timeout<ZN>,
/// Download and verify pipeline: a hedged, concurrency-limited, retrying, timed-out peer
/// service combined with a timed-out verifier.
downloads: Pin<
Box<
Downloads<
Hedge<ConcurrencyLimit<Retry<zn::RetryLimit, Timeout<ZN>>>, AlwaysHedge>,
Timeout<ZV>,
ZSTip,
>,
>,
>,
/// State service, queried for the block locator and for known blocks.
state: ZS,
/// Source of the current best tip height, used for limits and log fields.
latest_chain_tip: ZSTip,
/// Tips that the next `extend_tips` call will try to extend.
prospective_tips: HashSet<CheckedTip>,
/// Records how many hashes each obtain/extend tips pass queued.
recent_syncs: RecentSyncLengths,
/// Receiving end of the watch channel whose sender is handed to `Downloads`;
/// read in `try_to_sync_once` to decide whether to pause new downloads earlier.
past_lookahead_limit_receiver: zs::WatchReceiver<bool>,
/// Channel for reporting a peer address together with a misbehavior score.
misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
}
impl<ZN, ZS, ZV, ZSTip> ChainSync<ZN, ZS, ZV, ZSTip>
where
ZN: Service<zn::Request, Response = zn::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZN::Future: Send,
ZS: Service<zs::Request, Response = zs::Response, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZS::Future: Send,
ZV: Service<zebra_consensus::Request, Response = block::Hash, Error = BoxError>
+ Send
+ Sync
+ Clone
+ 'static,
ZV::Future: Send,
ZSTip: ChainTip + Clone + Send + 'static,
{
/// Builds a syncer from the application config and the services it drives.
///
/// Configured concurrency limits that are below their minimums are raised to the
/// minimum, with a warning. Returns the syncer together with the `SyncStatus`
/// handle created alongside its recent sync length tracker.
pub fn new(
    config: &ZebradConfig,
    max_checkpoint_height: Height,
    peers: ZN,
    verifier: ZV,
    state: ZS,
    latest_chain_tip: ZSTip,
    misbehavior_sender: mpsc::Sender<(PeerSocketAddr, u32)>,
) -> (Self, SyncStatus) {
    let sync_config = &config.sync;

    // Clamp each configured limit to its minimum, warning when a value was too low.
    let download_concurrency_limit =
        if sync_config.download_concurrency_limit < MIN_CONCURRENCY_LIMIT {
            warn!(
                "configured download concurrency limit {} too low, increasing to {}",
                sync_config.download_concurrency_limit, MIN_CONCURRENCY_LIMIT,
            );
            MIN_CONCURRENCY_LIMIT
        } else {
            sync_config.download_concurrency_limit
        };

    let checkpoint_verify_concurrency_limit =
        if sync_config.checkpoint_verify_concurrency_limit < MIN_CHECKPOINT_CONCURRENCY_LIMIT {
            warn!(
                "configured checkpoint verify concurrency limit {} too low, increasing to {}",
                sync_config.checkpoint_verify_concurrency_limit, MIN_CHECKPOINT_CONCURRENCY_LIMIT,
            );
            MIN_CHECKPOINT_CONCURRENCY_LIMIT
        } else {
            sync_config.checkpoint_verify_concurrency_limit
        };

    let full_verify_concurrency_limit =
        if sync_config.full_verify_concurrency_limit < MIN_CONCURRENCY_LIMIT {
            warn!(
                "configured full verify concurrency limit {} too low, increasing to {}",
                sync_config.full_verify_concurrency_limit, MIN_CONCURRENCY_LIMIT,
            );
            MIN_CONCURRENCY_LIMIT
        } else {
            sync_config.full_verify_concurrency_limit
        };

    // Tip requests only need a timeout around the peer service.
    let tip_network = Timeout::new(peers.clone(), TIPS_RESPONSE_TIMEOUT);

    // Block requests are timed out, retried, concurrency limited, and then hedged.
    let download_stack = ServiceBuilder::new()
        .concurrency_limit(download_concurrency_limit)
        .retry(zn::RetryLimit::new(BLOCK_DOWNLOAD_RETRY_LIMIT))
        .timeout(BLOCK_DOWNLOAD_TIMEOUT)
        .service(peers);
    let block_network = Hedge::new(
        download_stack,
        AlwaysHedge,
        20,
        0.95,
        2 * SYNC_RESTART_DELAY,
    );

    let verifier = Timeout::new(verifier, BLOCK_VERIFY_TIMEOUT);

    let (sync_status, recent_syncs) = SyncStatus::new();

    // The downloader gets the sender; the syncer keeps the receiver.
    let (past_lookahead_limit_sender, watch_receiver) = watch::channel(false);
    let past_lookahead_limit_receiver = zs::WatchReceiver::new(watch_receiver);

    // The downloader is given the larger of the two verify limits.
    let largest_verify_limit = max(
        checkpoint_verify_concurrency_limit,
        full_verify_concurrency_limit,
    );
    let downloads = Box::pin(Downloads::new(
        block_network,
        verifier,
        latest_chain_tip.clone(),
        past_lookahead_limit_sender,
        largest_verify_limit,
        max_checkpoint_height,
    ));

    (
        Self {
            genesis_hash: config.network.network.genesis_hash(),
            max_checkpoint_height,
            checkpoint_verify_concurrency_limit,
            full_verify_concurrency_limit,
            tip_network,
            downloads,
            state,
            latest_chain_tip,
            prospective_tips: HashSet::new(),
            recent_syncs,
            past_lookahead_limit_receiver,
            misbehavior_sender,
        },
        sync_status,
    )
}
/// Runs the syncer: fetches the genesis block if needed, then repeats sync passes forever.
///
/// Only a fatal error from the genesis step makes this return. A failed sync pass
/// cancels all pending downloads, and every pass is followed by a
/// `SYNC_RESTART_DELAY` sleep.
#[instrument(skip(self))]
pub async fn sync(mut self) -> Result<(), Report> {
    self.request_genesis().await?;

    loop {
        // The error itself is not needed here, only whether the pass failed.
        let sync_failed = self.try_to_sync().await.is_err();
        if sync_failed {
            self.downloads.cancel_all();
        }

        self.update_metrics();

        info!(
            timeout = ?SYNC_RESTART_DELAY,
            state_tip = ?self.latest_chain_tip.best_tip_height(),
            "waiting to restart sync"
        );
        sleep(SYNC_RESTART_DELAY).await;
    }
}
#[instrument(skip(self))]
async fn try_to_sync(&mut self) -> Result<(), Report> {
self.prospective_tips = HashSet::new();
info!(
state_tip = ?self.latest_chain_tip.best_tip_height(),
"starting sync, obtaining new tips"
);
let mut extra_hashes = timeout(SYNC_RESTART_DELAY, self.obtain_tips())
.await
.map_err(Into::into)
.and_then(convert::identity)
.map_err(|e| {
info!("temporary error obtaining tips: {:#}", e);
e
})?;
self.update_metrics();
while !self.prospective_tips.is_empty() || !extra_hashes.is_empty() {
extra_hashes = timeout(BLOCK_VERIFY_TIMEOUT, self.try_to_sync_once(extra_hashes))
.await
.map_err(Into::into)
.and_then(convert::identity)?;
}
info!("exhausted prospective tip set");
Ok(())
}
/// Runs a single sync step.
///
/// The step:
/// 1. handles every block response that is already ready,
/// 2. waits for pending blocks while the number of in-flight downloads is at or
///    above the lookahead limit (or at or above half of it while the
///    past-lookahead-limit flag is set),
/// 3. requests the blocks in `extra_hashes`, or extends the prospective tips when
///    `extra_hashes` is empty.
///
/// Returns the hashes that still have to be requested in a later step.
///
/// # Errors
///
/// Returns an error when a block response or a hash response requires a sync
/// restart, or when extending tips fails.
#[instrument(skip(self, extra_hashes))]
async fn try_to_sync_once(
    &mut self,
    mut extra_hashes: IndexSet<block::Hash>,
) -> Result<IndexSet<block::Hash>, Report> {
    // Drain the responses that are ready right now, without waiting.
    while let Poll::Ready(Some(rsp)) = futures::poll!(self.downloads.next()) {
        self.handle_block_response(rsp)?;
    }
    self.update_metrics();

    // Only wait while there is something to wait for.
    //
    // Halving the lookahead limit can give zero (for example, with a full verify
    // concurrency limit of 1). Without the `in_flight() > 0` guard, the loop could
    // then be entered with no downloads in flight, and the `expect` below would
    // fire if the download stream reports that it is exhausted.
    while self.downloads.in_flight() > 0
        && (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len())
            || (self.downloads.in_flight() >= self.lookahead_limit(extra_hashes.len()) / 2
                && self.past_lookahead_limit_receiver.cloned_watch_data()))
    {
        trace!(
            tips.len = self.prospective_tips.len(),
            in_flight = self.downloads.in_flight(),
            extra_hashes = extra_hashes.len(),
            lookahead_limit = self.lookahead_limit(extra_hashes.len()),
            state_tip = ?self.latest_chain_tip.best_tip_height(),
            "waiting for pending blocks",
        );

        let response = self.downloads.next().await.expect("downloads is nonempty");

        self.handle_block_response(response)?;
        self.update_metrics();
    }

    if !extra_hashes.is_empty() {
        // Leftover hashes from an earlier response are requested before extending tips.
        debug!(
            tips.len = self.prospective_tips.len(),
            in_flight = self.downloads.in_flight(),
            extra_hashes = extra_hashes.len(),
            lookahead_limit = self.lookahead_limit(extra_hashes.len()),
            state_tip = ?self.latest_chain_tip.best_tip_height(),
            "requesting more blocks",
        );

        let response = self.request_blocks(extra_hashes).await;
        extra_hashes = Self::handle_hash_response(response)?;
    } else {
        info!(
            tips.len = self.prospective_tips.len(),
            in_flight = self.downloads.in_flight(),
            extra_hashes = extra_hashes.len(),
            lookahead_limit = self.lookahead_limit(extra_hashes.len()),
            state_tip = ?self.latest_chain_tip.best_tip_height(),
            "extending tips",
        );

        extra_hashes = self.extend_tips().await.map_err(|e| {
            info!("temporary error extending tips: {:#}", e);
            e
        })?;
    }
    self.update_metrics();

    Ok(extra_hashes)
}
/// Asks peers for block hashes that follow the state's block locator, records new
/// prospective tips, and queues the unknown hashes for download.
///
/// Returns the hashes that `request_blocks` did not queue because of the lookahead limit.
///
/// # Errors
///
/// Returns an error if the state or the tip network service fails, if a hash that is
/// about to be queued is already in the state, or if queueing the downloads fails with
/// an error that requires a sync restart.
///
/// # Panics
///
/// Panics if a spawned request task panicked.
#[instrument(skip(self))]
async fn obtain_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
let stage_start = std::time::Instant::now();
// Ask the state for its block locator.
let block_locator = self
.state
.ready()
.await
.map_err(|e| eyre!(e))?
.call(zebra_state::Request::BlockLocator)
.await
.map(|response| match response {
zebra_state::Response::BlockLocator(block_locator) => block_locator,
_ => unreachable!(
"GetBlockLocator request can only result in Response::BlockLocator"
),
})
.map_err(|e| eyre!(e))?;
debug!(
tip = ?block_locator.first().expect("we have at least one block locator object"),
?block_locator,
"got block locator and trying to obtain new chain tips"
);
// Send the same `FindBlocks` request `FANOUT` times, each in its own spawned task.
let mut requests = FuturesUnordered::new();
for attempt in 0..FANOUT {
if attempt > 0 {
// Yield to the scheduler between requests.
tokio::task::yield_now().await;
}
let ready_tip_network = self.tip_network.ready().await;
requests.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: block_locator.clone(),
stop: None,
},
)));
}
// Responses are handled in the order in which the tasks finish.
let mut download_set = IndexSet::new();
while let Some(res) = requests.next().await {
// A panic in a request task is re-raised here; any other join error is logged
// and then treated like a failed request.
match res
.unwrap_or_else(|e @ JoinError { .. }| {
if e.is_panic() {
panic!("panic in obtain tips task: {e:?}");
} else {
info!(
"task error during obtain tips task: {e:?},\
is Zebra shutting down?"
);
Err(e.into())
}
})
.map_err::<Report, _>(|e| eyre!(e))
{
Ok(zn::Response::BlockHashes(hashes)) => {
trace!(?hashes);
// Skip empty responses, and drop the last hash of every other response.
// NOTE(review): presumably the trailing hash is not trusted — confirm.
let hashes = match hashes.as_slice() {
[] => continue,
[rest @ .., _last] => rest,
};
// Find the first hash that the state does not contain.
let mut first_unknown = None;
for (i, &hash) in hashes.iter().enumerate() {
if !self.state_contains(hash).await? {
first_unknown = Some(i);
break;
}
}
debug!(hashes.len = ?hashes.len(), ?first_unknown);
// Responses where every hash is already in the state are skipped.
let unknown_hashes = if let Some(index) = first_unknown {
&hashes[index..]
} else {
continue;
};
trace!(?unknown_hashes);
// The new tip is built from the last two unknown hashes, so at least two are needed.
let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
CheckedTip {
tip: end[0],
expected_next: end[1],
}
} else {
debug!("discarding response that extends only one block");
continue;
};
// Only record the tip if an earlier response has not already queued its last hash.
if !download_set.contains(&new_tip.expected_next) {
debug!(?new_tip,
"adding new prospective tip, and removing existing tips in the new block hash list");
self.prospective_tips
.retain(|t| !unknown_hashes.contains(&t.expected_next));
self.prospective_tips.insert(new_tip);
} else {
debug!(
?new_tip,
"discarding prospective tip: already in download set"
);
}
// Hashes that are already in the set are not added again.
let prev_download_len = download_set.len();
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
let new_hashes = new_download_len - prev_download_len;
debug!(new_hashes, "added hashes to download set");
metrics::histogram!("sync.obtain.response.hash.count")
.record(new_hashes as f64);
}
Ok(_) => unreachable!("network returned wrong response"),
// A failed request is only logged; the other responses are still handled.
Err(e) => debug!(?e),
}
}
debug!(?self.prospective_tips);
// Check that none of the hashes about to be queued is already in the state.
for hash in &download_set {
debug!(?hash, "checking if state contains hash");
if self.state_contains(*hash).await? {
return Err(eyre!("queued download of hash behind our chain tip"));
}
}
let new_downloads = download_set.len();
debug!(new_downloads, "queueing new downloads");
metrics::gauge!("sync.obtain.queued.hash.count").set(new_downloads as f64);
self.recent_syncs.push_obtain_tips_length(new_downloads);
// Queue the downloads; hashes beyond the lookahead limit are returned to the caller.
let response = self.request_blocks(download_set).await;
metrics::histogram!("sync.stage.duration_seconds", "stage" => "obtain_tips")
.record(stage_start.elapsed().as_secs_f64());
Self::handle_hash_response(response).map_err(Into::into)
}
#[instrument(skip(self))]
async fn extend_tips(&mut self) -> Result<IndexSet<block::Hash>, Report> {
let stage_start = std::time::Instant::now();
let tips = std::mem::take(&mut self.prospective_tips);
let mut download_set = IndexSet::new();
debug!(tips = ?tips.len(), "trying to extend chain tips");
for tip in tips {
debug!(?tip, "asking peers to extend chain tip");
let mut responses = FuturesUnordered::new();
for attempt in 0..FANOUT {
if attempt > 0 {
tokio::task::yield_now().await;
}
let ready_tip_network = self.tip_network.ready().await;
responses.push(tokio::spawn(ready_tip_network.map_err(|e| eyre!(e))?.call(
zn::Request::FindBlocks {
known_blocks: vec![tip.tip],
stop: None,
},
)));
}
while let Some(res) = responses.next().await {
match res
.expect("panic in spawned extend tips request")
.map_err::<Report, _>(|e| eyre!(e))
{
Ok(zn::Response::BlockHashes(hashes)) => {
debug!(first = ?hashes.first(), len = ?hashes.len());
trace!(?hashes);
let unknown_hashes = match hashes.as_slice() {
[expected_hash, rest @ ..] if expected_hash == &tip.expected_next => {
rest
}
[first_hash, expected_hash, rest @ ..]
if expected_hash == &tip.expected_next =>
{
debug!(?first_hash,
?tip.expected_next,
?tip.tip,
"unexpected first hash, but the second matches: using the hashes after the match");
rest
}
[] => continue,
[single_hash] => {
debug!(?single_hash,
?tip.expected_next,
?tip.tip,
"discarding response containing a single unexpected hash");
continue;
}
[first_hash, second_hash, rest @ ..] => {
debug!(?first_hash,
?second_hash,
rest_len = ?rest.len(),
?tip.expected_next,
?tip.tip,
"discarding response that starts with two unexpected hashes");
continue;
}
};
let unknown_hashes = match unknown_hashes {
[] => continue,
[rest @ .., _last] => rest,
};
let new_tip = if let Some(end) = unknown_hashes.rchunks_exact(2).next() {
CheckedTip {
tip: end[0],
expected_next: end[1],
}
} else {
debug!("discarding response that extends only one block");
continue;
};
trace!(?unknown_hashes);
if !download_set.contains(&new_tip.expected_next) {
debug!(?new_tip,
"adding new prospective tip, and removing any existing tips in the new block hash list");
self.prospective_tips
.retain(|t| !unknown_hashes.contains(&t.expected_next));
self.prospective_tips.insert(new_tip);
} else {
debug!(
?new_tip,
"discarding prospective tip: already in download set"
);
}
let prev_download_len = download_set.len();
download_set.extend(unknown_hashes);
let new_download_len = download_set.len();
let new_hashes = new_download_len - prev_download_len;
debug!(new_hashes, "added hashes to download set");
metrics::histogram!("sync.extend.response.hash.count")
.record(new_hashes as f64);
}
Ok(_) => unreachable!("network returned wrong response"),
Err(e) => debug!(?e),
}
}
}
let new_downloads = download_set.len();
debug!(new_downloads, "queueing new downloads");
metrics::gauge!("sync.extend.queued.hash.count").set(new_downloads as f64);
self.recent_syncs.push_extend_tips_length(new_downloads);
let response = self.request_blocks(download_set).await;
metrics::histogram!("sync.stage.duration_seconds", "stage" => "extend_tips")
.record(stage_start.elapsed().as_secs_f64());
Self::handle_hash_response(response).map_err(Into::into)
}
async fn request_genesis(&mut self) -> Result<(), Report> {
while !self.state_contains(self.genesis_hash).await? {
info!("starting genesis block download and verify");
let response = timeout(SYNC_RESTART_DELAY, self.request_genesis_once())
.await
.map_err(Into::into);
match response {
Ok(Ok(Ok(response))) => self
.handle_block_response(Ok(response))
.expect("never returns Err for Ok"),
Ok(Err(fatal_error)) => Err(fatal_error)?,
Err(error) | Ok(Ok(Err(error))) => {
if Self::should_restart_sync(&error) {
warn!(
?error,
"could not download or verify genesis block, retrying"
);
} else {
info!(
?error,
"temporary error downloading or verifying genesis block, retrying"
);
}
tokio::time::sleep(GENESIS_TIMEOUT_RETRY).await;
}
}
}
Ok(())
}
/// Queues the genesis block for download and verification, then waits for the next
/// result from the downloader.
///
/// The outer `Err` is a queueing error that requires a sync restart. The inner
/// result is the download and verify outcome.
async fn request_genesis_once(
    &mut self,
) -> Result<Result<(Height, block::Hash), BlockDownloadVerifyError>, Report> {
    let queued = self.downloads.download_and_verify(self.genesis_hash).await;
    if let Err(queue_error) = Self::handle_response(queued) {
        return Err(eyre!(queue_error));
    }

    let outcome = self.downloads.next().await;

    Ok(outcome.expect("downloads is nonempty"))
}
/// Queues downloads for `hashes`, up to the current lookahead limit.
///
/// Returns the hashes beyond the limit, which were not queued.
///
/// # Errors
///
/// Returns the first error from queueing a download; later hashes are not queued.
async fn request_blocks(
    &mut self,
    mut hashes: IndexSet<block::Hash>,
) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
    let lookahead_limit = self.lookahead_limit(hashes.len());

    debug!(
        hashes.len = hashes.len(),
        ?lookahead_limit,
        "requesting blocks",
    );

    // Keep the first `lookahead_limit` hashes in `hashes`, and hand the rest back.
    let extra_hashes = if hashes.len() <= lookahead_limit {
        IndexSet::new()
    } else {
        hashes.split_off(lookahead_limit)
    };

    for hash in hashes {
        self.downloads.download_and_verify(hash).await?;
    }

    Ok(extra_hashes)
}
fn lookahead_limit(&self, new_hashes: usize) -> usize {
let max_checkpoint_height: usize = self
.max_checkpoint_height
.0
.try_into()
.expect("fits in usize");
let verified_height: usize = self
.latest_chain_tip
.best_tip_height()
.unwrap_or(Height(0))
.0
.try_into()
.expect("fits in usize");
if verified_height >= max_checkpoint_height {
self.full_verify_concurrency_limit
} else if (verified_height + new_hashes) >= max_checkpoint_height {
let checkpoint_hashes = verified_height + new_hashes - max_checkpoint_height;
self.full_verify_concurrency_limit + checkpoint_hashes
} else {
self.checkpoint_verify_concurrency_limit
}
}
/// Handles the result of a block download and verification.
///
/// Successful results are traced. For an invalid block with a known advertiser and a
/// non-zero misbehavior score, the score is sent on the misbehavior channel; a failed
/// send is ignored. Every error is then passed to `handle_response`, which decides
/// whether it requires a sync restart.
#[allow(unknown_lints)]
fn handle_block_response(
    &mut self,
    response: Result<(Height, block::Hash), BlockDownloadVerifyError>,
) -> Result<(), BlockDownloadVerifyError> {
    let failure = match response {
        Ok((height, hash)) => {
            trace!(?height, ?hash, "verified and committed block to state");
            return Ok(());
        }
        Err(failure) => failure,
    };

    if let BlockDownloadVerifyError::Invalid {
        error,
        advertiser_addr: Some(advertiser_addr),
        ..
    } = &failure
    {
        if error.misbehavior_score() != 0 {
            // Best effort: the result of the send is deliberately discarded.
            let _ = self
                .misbehavior_sender
                .try_send((*advertiser_addr, error.misbehavior_score()));
        }
    }

    Self::handle_response::<()>(Err(failure))
}
/// Handles the result of queueing block downloads.
///
/// Successful results are returned unchanged. Errors that do not require a sync
/// restart become an empty hash set; all other errors are returned.
#[allow(unknown_lints)]
fn handle_hash_response(
    response: Result<IndexSet<block::Hash>, BlockDownloadVerifyError>,
) -> Result<IndexSet<block::Hash>, BlockDownloadVerifyError> {
    if response.is_ok() {
        return response;
    }

    Self::handle_response(response).map(|()| IndexSet::new())
}
#[allow(unknown_lints)]
fn handle_response<T>(
response: Result<T, BlockDownloadVerifyError>,
) -> Result<(), BlockDownloadVerifyError> {
match response {
Ok(_t) => Ok(()),
Err(error) => {
if Self::should_restart_sync(&error) {
Err(error)
} else {
Ok(())
}
}
}
}
/// Returns `true` if the state service reports a location for the block with `hash`.
///
/// # Errors
///
/// Returns an error if the state service is not ready or the request fails.
pub(crate) async fn state_contains(&mut self, hash: block::Hash) -> Result<bool, Report> {
    let ready_state = self.state.ready().await.map_err(|e| eyre!(e))?;

    let response = ready_state
        .call(zebra_state::Request::KnownBlock(hash))
        .await
        .map_err(|e| eyre!(e))?;

    match response {
        zs::Response::KnownBlock(loc) => Ok(loc.is_some()),
        _ => unreachable!("wrong response to known block request"),
    }
}
/// Publishes the number of prospective tips and the number of in-flight downloads as gauges.
fn update_metrics(&mut self) {
    let prospective_tip_count = self.prospective_tips.len();
    metrics::gauge!("sync.prospective_tips.len").set(prospective_tip_count as f64);

    let in_flight_count = self.downloads.in_flight();
    metrics::gauge!("sync.downloads.in_flight").set(in_flight_count as f64);
}
fn should_restart_sync(e: &BlockDownloadVerifyError) -> bool {
match e {
BlockDownloadVerifyError::Invalid { error, .. } if error.is_duplicate_request() => {
debug!(error = ?e, "block was already verified or committed, possibly from a previous sync run, continuing");
false
}
BlockDownloadVerifyError::CancelledDuringDownload { .. }
| BlockDownloadVerifyError::CancelledDuringVerification { .. } => {
debug!(error = ?e, "block verification was cancelled, continuing");
false
}
BlockDownloadVerifyError::BehindTipHeightLimit { .. } => {
debug!(
error = ?e,
"block height is behind the current state tip, \
assuming the syncer will eventually catch up to the state, continuing"
);
false
}
BlockDownloadVerifyError::DuplicateBlockQueuedForDownload { .. } => {
debug!(
error = ?e,
"queued duplicate block hash for download, \
assuming the syncer will eventually resolve duplicates, continuing"
);
false
}
BlockDownloadVerifyError::DownloadFailed { ref error, .. }
if format!("{error:?}").contains("NotFound") =>
{
debug!(error = ?e, "block was not found, possibly from a peer that doesn't have the block yet, continuing");
false
}
_ => {
let err_str = format!("{e:?}");
if err_str.contains("NotFound") {
error!(?e,
"a BlockDownloadVerifyError that should have been filtered out was detected, \
which possibly indicates a programming error in the downcast inside \
zebrad::components::sync::downloads::Downloads::download_and_verify"
)
}
warn!(?e, "error downloading and verifying block");
true
}
}
}
}