use std::{sync::Arc, time::Duration};
use bitcoin::{
block::Header,
hashes::Hash,
p2p::{
message_blockdata::GetHeadersMessage,
message_filter::{CFHeaders, CFilter},
message_network::VersionMessage,
ServiceFlags,
},
Block, BlockHash, Network, Wtxid,
};
use tokio::{
select,
sync::mpsc::{self},
};
use tokio::{
sync::mpsc::{Receiver, UnboundedReceiver},
time::MissedTickBehavior,
};
use crate::{
chain::{
block_queue::{BlockQueue, ProcessBlockResponse},
chain::Chain,
checkpoints::HeaderCheckpoint,
CFHeaderChanges, ChainState, FilterCheck, HeaderSyncEffect, IndexedHeader,
},
error::FetchBlockError,
messages::ClientRequest,
network::{
peer_map::PeerMap, LastBlockMonitor, MainThreadMessage, PeerId, PeerMessage,
PeerThreadMessage,
},
Config, IndexedBlock, NodeState, Package,
};
use super::{
client::Client,
error::NodeError,
messages::{ClientMessage, Event, Info, SyncUpdate, Warning},
Dialog,
};
/// Protocol version placed in outgoing `getheaders` requests. Peers whose `version`
/// message reports a lower value are disconnected in `Node::handle_version`.
// NOTE(review): presumably the protocol version that introduced wtxid relay — confirm.
pub(crate) const WTXID_VERSION: u32 = 70016;
/// Period of the interval tick that wakes the main loop in `Node::run` when neither a
/// peer nor the client has sent a message.
const LOOP_TIMEOUT: Duration = Duration::from_millis(10);
/// A count of peer connections; the type of `Node::required_peers`.
type PeerRequirement = usize;
/// A node that syncs block headers, compact filter headers and compact filters from its
/// peers and answers requests arriving from the paired `Client`.
#[derive(Debug)]
pub struct Node {
/// Current stage of the sync. Updated by `advance_state`, `handle_headers` and `rescan`.
state: NodeState,
/// Header chain together with the compact filter header / filter sync progress.
chain: Chain,
/// The set of peers; used to open connections and to send or broadcast messages.
peer_map: PeerMap,
/// Number of live connections to maintain once the node is no longer `NodeState::Behind`
/// (while behind, a single connection is required — see `next_required_peers`).
required_peers: PeerRequirement,
/// Shared handle used to emit info messages, warnings and events.
dialog: Arc<Dialog>,
/// Block requests made by the client that are waiting to be sent to, or answered by, a peer.
block_queue: BlockQueue,
/// Receiving end of the channel carrying `ClientMessage`s; polled in `run`.
client_recv: UnboundedReceiver<ClientMessage>,
/// Receiving end of the channel carrying `PeerThreadMessage`s; polled in `run`.
peer_recv: Receiver<PeerThreadMessage>,
}
impl Node {
/// Build a node for `network` from `config`, returning it together with the [`Client`]
/// that holds the opposite ends of the node's channels.
///
/// The node starts in [`NodeState::Behind`]. When `config` carries no chain state, the
/// chain starts from the checkpoint returned by `HeaderCheckpoint::from_genesis`.
pub(crate) fn new(network: Network, config: Config) -> (Self, Client) {
    let Config {
        required_peers,
        white_list,
        whitelist_only,
        data_path: _,
        chain_state,
        connection_type,
        peer_timeout_config,
        filter_type,
        block_type,
    } = config;

    // Node -> client channels (info is bounded, warnings and events are not).
    let (info_sender, info_receiver) = mpsc::channel::<Info>(32);
    let (warning_sender, warning_receiver) = mpsc::unbounded_channel::<Warning>();
    let (event_sender, event_receiver) = mpsc::unbounded_channel::<Event>();
    // Client -> node channel.
    let (client_sender, client_recv) = mpsc::unbounded_channel::<ClientMessage>();

    let client = Client::new(
        info_receiver,
        warning_receiver,
        event_receiver,
        client_sender,
    );
    let dialog = Arc::new(Dialog::new(info_sender, warning_sender, event_sender));

    // Peer threads -> node channel; the sending half is handed to the peer map.
    let (peer_sender, peer_recv) = mpsc::channel::<PeerThreadMessage>(32);
    let peer_map = PeerMap::new(
        peer_sender,
        network,
        block_type,
        white_list,
        whitelist_only,
        Arc::clone(&dialog),
        connection_type,
        peer_timeout_config,
    );

    // Fall back to the genesis checkpoint when the caller supplied no starting point.
    let genesis_state = ChainState::Checkpoint(HeaderCheckpoint::from_genesis(network));
    let starting_state = chain_state.unwrap_or(genesis_state);
    let chain = Chain::new(
        network,
        starting_state,
        Arc::clone(&dialog),
        required_peers,
        filter_type,
    );

    let node = Self {
        state: NodeState::Behind,
        chain,
        peer_map,
        required_peers: required_peers.into(),
        dialog,
        block_queue: BlockQueue::new(),
        client_recv,
        peer_recv,
    };
    (node, client)
}
/// Drive the node until the client requests a shutdown or an error is returned.
///
/// Every pass of the loop advances the sync state, tops up peer connections, forwards a
/// queued block request if one is available, and then waits on whichever is ready: a
/// message from a peer, a message from the client, or the `LOOP_TIMEOUT` tick.
///
/// # Errors
///
/// Propagates the [`NodeError`] produced by `dispatch` or `handle_version`.
pub async fn run(mut self) -> Result<(), NodeError> {
    crate::debug!("Starting node");
    crate::debug!(format!(
        "Configured connection requirement: {} peers",
        self.required_peers
    ));
    let mut block_monitor = LastBlockMonitor::new();
    let mut ticker = tokio::time::interval(LOOP_TIMEOUT);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Skip);
    loop {
        self.advance_state(&mut block_monitor).await;
        self.dispatch().await?;
        self.get_blocks().await;
        select! {
            peer = self.peer_recv.recv() => {
                // A `None` here simply falls through to the next loop iteration.
                if let Some(peer_thread) = peer {
                    let nonce = peer_thread.nonce;
                    match peer_thread.message {
                        PeerMessage::Version(version) => {
                            self.peer_map.set_services(nonce, version.services);
                            let reply = self.handle_version(nonce, version).await?;
                            self.peer_map.send_message(nonce, reply).await;
                            crate::debug!(format!("[{}]: version", nonce));
                        }
                        PeerMessage::Headers(headers) => {
                            block_monitor.reset();
                            crate::debug!(format!("[{}]: headers", nonce));
                            if let Some(reply) = self.handle_headers(nonce, headers).await {
                                self.peer_map.send_message(nonce, reply).await;
                            }
                        }
                        PeerMessage::FilterHeaders(cf_headers) => {
                            crate::debug!(format!("[{}]: filter headers", nonce));
                            // Replies to filter headers go to every peer, not just the sender.
                            if let Some(reply) = self.handle_cf_headers(nonce, cf_headers).await {
                                self.peer_map.broadcast(reply).await;
                            }
                        }
                        PeerMessage::Filter(filter) => {
                            if let Some(reply) = self.handle_filter(nonce, filter).await {
                                self.peer_map.send_message(nonce, reply).await;
                            }
                        }
                        PeerMessage::Block(block) => {
                            if let Some(reply) = self.handle_block(nonce, block).await {
                                self.peer_map.send_message(nonce, reply).await;
                            }
                        }
                        PeerMessage::FeeFilter(feerate) => {
                            self.peer_map.set_broadcast_min(nonce, feerate);
                        }
                    }
                }
            },
            message = self.client_recv.recv() => {
                match message {
                    None => (),
                    Some(ClientMessage::Shutdown) => return Ok(()),
                    Some(ClientMessage::Broadcast(transaction)) => {
                        self.broadcast_transaction(transaction).await;
                    }
                    Some(ClientMessage::Rescan(height_opt)) => {
                        if let Some(reply) = self.rescan(height_opt) {
                            self.peer_map.broadcast(reply).await;
                        }
                    }
                    Some(ClientMessage::GetBlock(request)) => {
                        // Only hashes present in the header chain are queued for download.
                        let is_known = self
                            .chain
                            .header_chain
                            .height_of_hash(request.data())
                            .is_some();
                        if is_known {
                            crate::debug!(
                                format!("Adding block {} to queue", request.data())
                            );
                            self.block_queue.add(request);
                        } else {
                            let (_, oneshot) = request.into_values();
                            let rejection = oneshot.send(Err(FetchBlockError::UnknownHash));
                            if rejection.is_err() {
                                self.dialog.send_warning(Warning::ChannelDropped);
                            }
                        }
                    }
                    Some(ClientMessage::BestBlock(request)) => {
                        let (_, oneshot) = request.into_values();
                        let header_chain = &self.chain.header_chain;
                        let tip_hash = header_chain.tip_hash();
                        let tip_height = header_chain.height();
                        let tip = HeaderCheckpoint::new(tip_height, tip_hash);
                        if oneshot.send(tip).is_err() {
                            self.dialog.send_warning(Warning::ChannelDropped);
                        }
                    }
                    Some(ClientMessage::AddPeer(peer)) => {
                        self.peer_map.add_trusted_peer(peer);
                    }
                    Some(ClientMessage::GetBroadcastMinFeeRate(request)) => {
                        let (_, oneshot) = request.into_values();
                        let min_fee_rate = self.peer_map.broadcast_min();
                        if oneshot.send(min_fee_rate).is_err() {
                            self.dialog.send_warning(Warning::ChannelDropped);
                        }
                    }
                    Some(ClientMessage::GetPeerInfo(request)) => {
                        let (_, oneshot) = request.into_values();
                        let peer_info = self.peer_map.peer_info();
                        if oneshot.send(peer_info).is_err() {
                            self.dialog.send_warning(Warning::ChannelDropped);
                        }
                    }
                    Some(ClientMessage::GetHeader(request)) => {
                        let (height, oneshot) = request.into_values();
                        let indexed = self
                            .chain
                            .header_chain
                            .header_at_height(height)
                            .map(|header| IndexedHeader::new(height, header));
                        if oneshot.send(indexed).is_err() {
                            self.dialog.send_warning(Warning::ChannelDropped);
                        }
                    }
                    Some(ClientMessage::HeightOfHash(request)) => {
                        let (hash, oneshot) = request.into_values();
                        let canonical_height =
                            self.chain.header_chain.height_of_hash_canonical_only(hash);
                        if oneshot.send(canonical_height).is_err() {
                            self.dialog.send_warning(Warning::ChannelDropped);
                        }
                    }
                    Some(ClientMessage::NoOp) => (),
                }
            }
            _ = ticker.tick() => (),
        }
    }
}
/// Clean the peer map and, if fewer peers are live than currently required, try to
/// connect to one more.
///
/// # Errors
///
/// Returns [`NodeError::NoReachablePeers`] when the peer map cannot supply another
/// address. A failed connection attempt only emits `Warning::CouldNotConnect`.
async fn dispatch(&mut self) -> Result<(), NodeError> {
    self.peer_map.clean().await;
    let connected = self.peer_map.live();
    let required = self.next_required_peers();
    if connected >= required {
        return Ok(());
    }
    self.dialog.send_warning(Warning::NeedConnections {
        connected,
        required,
    });
    let candidate = match self.peer_map.next_peer().await {
        Some(address) => address,
        None => return Err(NodeError::NoReachablePeers),
    };
    let attempt = self.peer_map.dispatch(candidate).await;
    if attempt.is_err() {
        self.dialog.send_warning(Warning::CouldNotConnect);
    }
    Ok(())
}
/// Send the next queued block request, if `pop_block_queue` yields one, to a random peer.
async fn get_blocks(&mut self) {
    let request = match self.pop_block_queue() {
        Some(request) => request,
        None => return,
    };
    crate::debug!("Sending block request to random peer");
    self.peer_map.send_random(request).await;
}
/// Add the client's package to the shared transaction queue, then ask a random peer to
/// broadcast whatever is pending.
async fn broadcast_transaction(&self, broadcast: ClientRequest<Package, Wtxid>) {
    let (package, reply_channel) = broadcast.into_values();
    {
        // Scope the guard so the queue lock is released before messaging a peer.
        let mut pending = self.peer_map.tx_queue.lock().await;
        pending.add_to_queue(package, reply_channel);
    }
    crate::debug!("Sending transaction to a random peer");
    let notice = MainThreadMessage::BroadcastPending;
    self.peer_map.send_random(notice).await;
}
/// Move the node to the next sync stage when the chain reports that the current one is
/// complete, and react to a stale tip once fully synced.
///
/// * `HeadersSynced` -> `FilterHeadersSynced` once the filter headers are synced.
/// * `FilterHeadersSynced` -> `FiltersSynced` once the filters are synced; an
///   `Event::FiltersSynced` is emitted at that point.
/// * In `FiltersSynced`, a stale `last_block` monitor triggers a warning, a broadcast
///   `Disconnect` to all peers, and a reset of the monitor.
async fn advance_state(&mut self, last_block: &mut LastBlockMonitor) {
    match self.state {
        NodeState::Behind => {}
        NodeState::HeadersSynced => {
            if !self.chain.is_cf_headers_synced() {
                return;
            }
            self.state = NodeState::FilterHeadersSynced;
        }
        NodeState::FilterHeadersSynced => {
            if !self.chain.is_filters_synced() {
                return;
            }
            self.state = NodeState::FiltersSynced;
            let tip_height = self.chain.header_chain.height();
            let tip_hash = self.chain.header_chain.tip_hash();
            let tip = HeaderCheckpoint::new(tip_height, tip_hash);
            let recent = self.chain.last_ten();
            self.dialog
                .send_event(Event::FiltersSynced(SyncUpdate::new(tip, recent)));
        }
        NodeState::FiltersSynced => {
            if !last_block.stale() {
                return;
            }
            self.dialog.send_warning(Warning::PotentialStaleTip);
            crate::debug!("Disconnecting from remote nodes to find new connections");
            self.peer_map.broadcast(MainThreadMessage::Disconnect).await;
            last_block.reset();
        }
    }
}
/// Number of live peers the node currently wants: one while `Behind`, otherwise the
/// configured `required_peers`.
fn next_required_peers(&self) -> PeerRequirement {
    if matches!(self.state, NodeState::Behind) {
        1
    } else {
        self.required_peers
    }
}
/// Pick the next request to send based on sync progress.
///
/// Returns a `GetHeaders` request while `Behind`; otherwise a `GetFilterHeaders` request
/// if filter headers are not synced, then a `GetFilters` request if filters are not
/// synced, and `None` when nothing remains to be requested.
async fn next_stateful_message(&mut self) -> Option<MainThreadMessage> {
    if self.state == NodeState::Behind {
        let request = GetHeadersMessage {
            version: WTXID_VERSION,
            locator_hashes: self.chain.header_chain.locators(),
            stop_hash: BlockHash::all_zeros(),
        };
        Some(MainThreadMessage::GetHeaders(request))
    } else if !self.chain.is_cf_headers_synced() {
        let request = self.chain.next_cf_header_message();
        Some(MainThreadMessage::GetFilterHeaders(request))
    } else if !self.chain.is_filters_synced() {
        let request = self.chain.next_filter_message();
        Some(MainThreadMessage::GetFilters(request))
    } else {
        None
    }
}
/// Respond to a peer's `version` message.
///
/// The peer is told to disconnect when it reports a protocol version below
/// `WTXID_VERSION`, or — once the node has left `NodeState::Behind` — when it does not
/// advertise both the `COMPACT_FILTERS` and `NETWORK` services. Otherwise the peer is
/// marked as tried, the handshake messages are sent, addresses are requested unless the
/// peer map is whitelist-only, and a `GetHeaders` request is returned for the caller to send.
///
/// The `Result` is always `Ok` in this body; the error type is kept for callers using `?`.
async fn handle_version(
    &mut self,
    nonce: PeerId,
    version_message: VersionMessage,
) -> Result<MainThreadMessage, NodeError> {
    if version_message.version < WTXID_VERSION {
        return Ok(MainThreadMessage::Disconnect);
    }
    // While still syncing headers any peer will do; afterwards filters are required.
    if !matches!(self.state, NodeState::Behind) {
        let services = version_message.services;
        let serves_filters =
            services.has(ServiceFlags::COMPACT_FILTERS) && services.has(ServiceFlags::NETWORK);
        if !serves_filters {
            self.dialog.send_warning(Warning::NoCompactFilters);
            return Ok(MainThreadMessage::Disconnect);
        }
    }
    self.peer_map.tried(nonce).await;
    // Handshake messages, sent in this exact order.
    for handshake in [
        MainThreadMessage::SendAddrV2,
        MainThreadMessage::WtxidRelay,
        MainThreadMessage::Verack,
        MainThreadMessage::SendHeaders,
    ] {
        self.peer_map.send_message(nonce, handshake).await;
    }
    if !self.peer_map.whitelist_only {
        crate::debug!("Requesting new addresses");
        self.peer_map
            .send_message(nonce, MainThreadMessage::GetAddr)
            .await;
    }
    if self.peer_map.live() == self.required_peers {
        self.dialog.send_info(Info::ConnectionsMet).await;
    }
    let locator_hashes = self.chain.header_chain.locators();
    Ok(MainThreadMessage::GetHeaders(GetHeadersMessage {
        version: WTXID_VERSION,
        locator_hashes,
        stop_hash: BlockHash::all_zeros(),
    }))
}
/// Feed a batch of headers from `peer_id` into the chain and update the node state.
///
/// * Headers added: the node is (re)marked `Behind` and a chain update is sent.
/// * Empty effect: a `Behind` node becomes `HeadersSynced`.
/// * Reorg: the node is set to `HeadersSynced`, a chain update is sent, and the
///   reorganized entries are removed from the block queue.
///
/// On a sync error the peer is banned and `Disconnect` is returned. Otherwise the result
/// of `next_stateful_message` is returned.
async fn handle_headers(
    &mut self,
    peer_id: PeerId,
    headers: Vec<Header>,
) -> Option<MainThreadMessage> {
    match self.chain.sync_chain(headers) {
        Ok(HeaderSyncEffect::Added) => {
            self.state = NodeState::Behind;
            self.chain.send_chain_update().await;
        }
        Ok(HeaderSyncEffect::Empty) => {
            if self.state == NodeState::Behind {
                self.state = NodeState::HeadersSynced;
            }
        }
        Ok(HeaderSyncEffect::Reorg(reorgs)) => {
            self.state = NodeState::HeadersSynced;
            self.chain.send_chain_update().await;
            self.block_queue.remove(&reorgs);
        }
        Err(e) => {
            let warning = format!("Unexpected header syncing error: {e}");
            self.dialog
                .send_warning(Warning::UnexpectedSyncError { warning });
            self.peer_map.ban(peer_id).await;
            return Some(MainThreadMessage::Disconnect);
        }
    }
    self.next_stateful_message().await
}
/// Feed a batch of compact filter headers from `peer_id` into the chain.
///
/// A chain update is sent before syncing. Returns `None` when the batch was only queued,
/// the next stateful request when the filter header chain was extended, and `Disconnect`
/// on a conflict or an error (the peer is additionally banned on error).
async fn handle_cf_headers(
    &mut self,
    peer_id: PeerId,
    cf_headers: CFHeaders,
) -> Option<MainThreadMessage> {
    self.chain.send_chain_update().await;
    match self.chain.sync_cf_headers(peer_id, cf_headers) {
        Ok(CFHeaderChanges::AddedToQueue) => None,
        Ok(CFHeaderChanges::Extended) => self.next_stateful_message().await,
        Ok(CFHeaderChanges::Conflict) => {
            let warning = "Found a conflict while peers are sending filter headers".into();
            self.dialog
                .send_warning(Warning::UnexpectedSyncError { warning });
            Some(MainThreadMessage::Disconnect)
        }
        Err(e) => {
            let warning = format!("Compact filter header syncing encountered an error: {e}");
            self.dialog
                .send_warning(Warning::UnexpectedSyncError { warning });
            self.peer_map.ban(peer_id).await;
            Some(MainThreadMessage::Disconnect)
        }
    }
}
/// Feed a compact filter from `peer_id` into the chain.
///
/// When the filter was the last of its batch, a chain update is sent and, if filters are
/// still not fully synced, the next `GetFilters` request is returned. On a sync error the
/// peer is banned and `Disconnect` is returned. In every other case the result is `None`.
async fn handle_filter(
    &mut self,
    peer_id: PeerId,
    filter: CFilter,
) -> Option<MainThreadMessage> {
    match self.chain.sync_filter(filter) {
        Ok(FilterCheck { was_last_in_batch }) => {
            if !was_last_in_batch {
                return None;
            }
            self.chain.send_chain_update().await;
            if self.chain.is_filters_synced() {
                return None;
            }
            let request = self.chain.next_filter_message();
            Some(MainThreadMessage::GetFilters(request))
        }
        Err(e) => {
            let warning = format!("Compact filter syncing encountered an error: {e}");
            self.dialog
                .send_warning(Warning::UnexpectedSyncError { warning });
            self.peer_map.ban(peer_id).await;
            Some(MainThreadMessage::Disconnect)
        }
    }
}
/// Validate a block sent by `peer_id` and deliver it to the client request waiting for it.
///
/// The peer is banned and `Disconnect` is returned when the block's hash is not found in
/// the header chain or when its merkle root check fails. Otherwise the block queue decides
/// what happens:
///
/// * accepted — `Info::BlockReceived` is emitted and the block, paired with its height,
///   is sent to the requester (a dropped receiver only produces a warning);
/// * late response or unknown hash — the event is logged and the block is discarded.
///
/// Returns `None` whenever the peer is not being disconnected.
async fn handle_block(&mut self, peer_id: PeerId, block: Block) -> Option<MainThreadMessage> {
    // Hash the header once; the value is reused for every lookup and notification below
    // instead of being recomputed.
    let block_hash = block.block_hash();
    let height = match self.chain.header_chain.height_of_hash(block_hash) {
        Some(height) => height,
        None => {
            self.dialog.send_warning(Warning::UnexpectedSyncError {
                warning: "A block received does not have a known hash".into(),
            });
            self.peer_map.ban(peer_id).await;
            return Some(MainThreadMessage::Disconnect);
        }
    };
    if !block.check_merkle_root() {
        self.dialog.send_warning(Warning::UnexpectedSyncError {
            warning: "A block received does not have a valid merkle root".into(),
        });
        self.peer_map.ban(peer_id).await;
        return Some(MainThreadMessage::Disconnect);
    }
    match self.block_queue.process_block(&block_hash) {
        ProcessBlockResponse::Accepted { block_recipient } => {
            self.dialog
                .send_info(Info::BlockReceived(block_hash))
                .await;
            let delivery = block_recipient.send(Ok(IndexedBlock::new(height, block)));
            if delivery.is_err() {
                self.dialog.send_warning(Warning::ChannelDropped);
            }
        }
        ProcessBlockResponse::LateResponse => {
            crate::debug!(format!(
                "Peer {} responded late to a request for hash {}",
                peer_id, block_hash
            ));
        }
        ProcessBlockResponse::UnknownHash => {
            crate::debug!(format!(
                "Peer {} responded with an irrelevant block",
                peer_id
            ));
        }
    }
    None
}
/// Take the next hash from the block queue and wrap it in a `GetBlock` request.
///
/// Blocks are only requested in the `FilterHeadersSynced` and `FiltersSynced` states;
/// in the earlier states the queue is left untouched and `None` is returned.
fn pop_block_queue(&mut self) -> Option<MainThreadMessage> {
    match self.state {
        NodeState::FilterHeadersSynced | NodeState::FiltersSynced => self
            .block_queue
            .pop()
            .map(MainThreadMessage::GetBlock),
        NodeState::Behind | NodeState::HeadersSynced => None,
    }
}
/// Restart the filter download at the client's request.
///
/// Does nothing and returns `None` while the node is `Behind` or `HeadersSynced`.
/// Otherwise the stored filters are cleared, the header chain is told to assume
/// everything up to `height_opt` (when given) as checked, the state drops back to
/// `FilterHeadersSynced`, and the next `GetFilters` request is returned.
fn rescan(&mut self, height_opt: Option<u32>) -> Option<MainThreadMessage> {
    if matches!(self.state, NodeState::Behind | NodeState::HeadersSynced) {
        return None;
    }
    self.chain.clear_filters();
    if let Some(height) = height_opt {
        self.chain.header_chain.assume_checked_to(height);
    }
    self.state = NodeState::FilterHeadersSynced;
    let request = self.chain.next_filter_message();
    Some(MainThreadMessage::GetFilters(request))
}
}