use std::time::Duration;
use futures::TryFutureExt;
use thiserror::Error;
use tokio::sync::{mpsc, watch};
use tower::{timeout::Timeout, Service, ServiceExt};
use tracing::Instrument;
use zebra_chain::{block, chain_tip::ChainTip};
use zebra_network as zn;
use zebra_state::ChainTipChange;
use crate::{
components::sync::{SyncStatus, PEER_GOSSIP_DELAY, TIPS_RESPONSE_TIMEOUT},
BoxError,
};
use BlockGossipError::*;
#[derive(Error, Debug)]
pub enum BlockGossipError {
#[error("chain tip sender was dropped")]
TipChange(watch::error::RecvError),
#[error("sync status sender was dropped")]
SyncStatus(watch::error::RecvError),
#[error("permanent peer set failure")]
PeerSetReadiness(zn::BoxError),
}
/// Runs the block gossip loop, advertising block hashes to peers through
/// `broadcast_network`.
///
/// Every iteration races two sources of hashes (when `mined_block_receiver`
/// is `Some`; otherwise only the first source is awaited):
///
/// 1. a committed-tip future, which sleeps for `PEER_GOSSIP_DELAY`, waits for a
///    tip change on a clone of `chain_state`, sleeps briefly again, waits until
///    `sync_status` reports being close to the tip, and then yields the most
///    recent tip's hash and height;
/// 2. the next `(hash, height)` pair received on `mined_block_receiver`.
///
/// Hashes from the first source are sent as `zn::Request::AdvertiseBlock`,
/// hashes from the second as `zn::Request::AdvertiseBlockToAll`. The response
/// future is spawned onto the Tokio runtime and its `JoinHandle` is dropped, so
/// the loop never waits for (or inspects) the broadcast result.
///
/// # Errors
///
/// The loop has no `break`, so this function only returns on failure:
///
/// - [`BlockGossipError::TipChange`] if waiting for a tip change fails,
/// - [`BlockGossipError::SyncStatus`] if waiting to be close to the tip fails,
/// - [`BlockGossipError::PeerSetReadiness`] if the network service fails its
///   readiness check.
pub async fn gossip_best_tip_block_hashes<ZN>(
    sync_status: SyncStatus,
    mut chain_state: ChainTipChange,
    broadcast_network: ZN,
    mut mined_block_receiver: Option<mpsc::Receiver<(block::Hash, block::Height)>>,
) -> Result<(), BlockGossipError>
where
    ZN: Service<zn::Request, Response = zn::Response, Error = BoxError> + Send + Clone + 'static,
    ZN::Future: Send,
{
    /// Short pause taken right after a tip change is observed.
    // NOTE(review): judging by the name, this presumably gives a block
    // submission time to show up on the mined-block channel before the
    // committed-tip path proceeds -- confirm with the submission code.
    const WAIT_FOR_BLOCK_SUBMISSION_DELAY: Duration = Duration::from_micros(100);

    info!("initializing block gossip task");

    // Every broadcast request is bounded by the tips response timeout.
    let mut broadcast_network = Timeout::new(broadcast_network, TIPS_RESPONSE_TIMEOUT);

    loop {
        // The committed-tip future owns its own clones, so it can be dropped
        // by `select!` without touching the caller-visible state.
        let mut status_watch = sync_status.clone();
        let mut tip_watch = chain_state.clone();

        let committed_tip_fut = async move {
            // Space out gossips by the peer gossip delay.
            tokio::time::sleep(PEER_GOSSIP_DELAY).await;

            // Require at least one tip change before broadcasting anything.
            let first_change = tip_watch.wait_for_tip_change().await.map_err(TipChange)?;

            tokio::time::sleep(WAIT_FOR_BLOCK_SUBMISSION_DELAY).await;

            status_watch
                .wait_until_close_to_tip()
                .map_err(SyncStatus)
                .await?;

            // Prefer whatever tip change happened while we were waiting, and
            // fall back to the change observed above.
            let newest_change = tip_watch.last_tip_change().unwrap_or(first_change);
            let best_tip = newest_change.best_tip_hash_and_height();

            Ok((best_tip, "sending committed block broadcast", tip_watch))
        }
        .in_current_span();

        // Pick the next hash to advertise, and remember which path produced it.
        let (broadcast, is_block_submission) = match mined_block_receiver.as_mut() {
            Some(mined_rx) => {
                tokio::select! {
                    committed = committed_tip_fut => {
                        (committed?, false)
                    },
                    // A closed channel yields `None`, which does not match this
                    // pattern, so only the committed-tip branch stays in the race.
                    Some(mined_tip) = mined_rx.recv() => {
                        ((mined_tip, "sending mined block broadcast", chain_state), true)
                    }
                }
            }
            None => (committed_tip_fut.await?, false),
        };

        // `hash`, `height` and `log_msg` keep these exact names because the
        // `info!` call below records them as fields by identifier.
        let ((hash, height), log_msg, updated_chain_state) = broadcast;
        chain_state = updated_chain_state;

        let request = if is_block_submission {
            zn::Request::AdvertiseBlockToAll(hash)
        } else {
            zn::Request::AdvertiseBlock(hash)
        };

        info!(?height, ?request, log_msg);

        let ready_network = broadcast_network
            .ready()
            .await
            .map_err(PeerSetReadiness)?;

        // Fire-and-forget: the task is detached and its result is never read.
        tokio::spawn(ready_network.call(request));

        if !is_block_submission {
            continue;
        }

        // After a mined-block broadcast, once the channel is drained and the
        // best tip is the hash just sent, record that hash as the last change.
        // NOTE(review): presumably this stops the committed-tip path from
        // advertising the same block again -- confirm against
        // `ChainTipChange::mark_last_change_hash`.
        let channel_drained = mined_block_receiver
            .as_ref()
            .is_some_and(|rx| rx.is_empty());

        if channel_drained && chain_state.latest_chain_tip().best_tip_hash() == Some(hash) {
            chain_state.mark_last_change_hash(hash);
        }
    }
}