use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::ops::Bound::{Excluded, Unbounded};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use dusk_consensus::config::{
MAX_BLOCK_SIZE, MAX_NUMBER_OF_FAULTS, MAX_NUMBER_OF_TRANSACTIONS,
};
use dusk_consensus::errors::HeaderError;
use dusk_consensus::merkle::merkle_root;
use node_data::get_current_timestamp;
use node_data::ledger::Block;
use node_data::message::Metadata;
use node_data::message::payload::{Candidate, GetResource, Inv, Quorum};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use super::PresyncInfo;
use crate::chain::acceptor::Acceptor;
use crate::chain::header_validation::Validator;
use crate::{Network, database, vm};
/// Upper bound on the number of out-of-order blocks buffered in the pool while
/// syncing.
const MAX_POOL_BLOCKS_SIZE: usize = 1000;
/// Maximum number of block heights put into a single batch request. A third of
/// this value is also used as the look-ahead margin below which a new batch is
/// requested.
const MAX_BLOCKS_TO_REQUEST: u64 = 100;
/// Maximum number of peers remembered as candidate sync sources.
const MAX_SYNC_PEERS: usize = 32;
/// Time without progress after which a heartbeat counts as a failed attempt.
const SYNC_TIMEOUT: Duration = Duration::from_secs(5);
/// Sync is abandoned (consensus is restarted) when the timeout expires while
/// the retry counter equals `SYNC_ATTEMPTS - 1`.
const SYNC_ATTEMPTS: u8 = 3;
/// Maximum age of a candidate's timestamp relative to `get_current_timestamp()`
/// for it to be treated as live (presumably seconds — TODO confirm the unit).
const LIVE_CANDIDATE_MAX_AGE: u64 = 30;
/// Chooses the peer to rotate to after `current_peer`.
///
/// Peers are visited in address order, starting right after `current_peer`
/// and wrapping around to the smallest address. The first peer whose known
/// height is at least `next_height` is returned. `current_peer` itself is
/// never returned, so `None` means no *other* peer can serve `next_height`.
fn next_sync_peer(
    peers: &BTreeMap<SocketAddr, u64>,
    current_peer: SocketAddr,
    next_height: u64,
) -> Option<SocketAddr> {
    // Addresses strictly above the current peer come first, then those below.
    peers
        .range((Excluded(current_peer), Unbounded))
        .chain(peers.range(..current_peer))
        .find(|&(_, &known_height)| known_height >= next_height)
        .map(|(addr, _)| *addr)
}
/// Shrinks the set of candidate sync peers.
///
/// First every peer whose known height is below `next_height` is dropped
/// (this includes `keep_peer` if its height is too low). If more than
/// `MAX_SYNC_PEERS` entries remain, the surplus is evicted in ascending
/// address order, skipping `keep_peer`.
fn prune_sync_peers(
    peers: &mut BTreeMap<SocketAddr, u64>,
    next_height: u64,
    keep_peer: SocketAddr,
) {
    peers.retain(|_, known_height| *known_height >= next_height);

    let surplus = peers.len().saturating_sub(MAX_SYNC_PEERS);
    if surplus == 0 {
        return;
    }

    // Collect the victims first: the map cannot be mutated while iterating.
    let victims: Vec<SocketAddr> = peers
        .keys()
        .filter(|addr| **addr != keep_peer)
        .take(surplus)
        .copied()
        .collect();
    for victim in victims {
        peers.remove(&victim);
    }
}
/// Records that `peer_addr` is known to have reached `target_height`, then
/// prunes the peer set.
///
/// A height already stored for the peer is only ever raised, never lowered.
/// Pruning uses `next_height` and `keep_peer` exactly as `prune_sync_peers`
/// does.
fn remember_sync_peer(
    peers: &mut BTreeMap<SocketAddr, u64>,
    peer_addr: SocketAddr,
    target_height: u64,
    next_height: u64,
    keep_peer: SocketAddr,
) {
    let best_known = peers.entry(peer_addr).or_insert(target_height);
    *best_known = (*best_known).max(target_height);

    prune_sync_peers(peers, next_height, keep_peer);
}
/// State kept while the node is out of sync and is catching up by requesting
/// missing blocks from remote peers.
pub(super) struct OutOfSyncImpl<
DB: database::DB,
VM: vm::VMExecution,
N: Network,
> {
/// `(next, target)`: the next height expected to be accepted and the height
/// the sync is trying to reach.
range: (u64, u64),
/// Highest height included in the most recent successful batch request.
last_request: u64,
/// Moment of the last progress (block accepted) or of the last retry; the
/// sync timeout is measured from here.
start_time: SystemTime,
/// Received blocks that cannot be accepted yet, keyed by height.
pool: BTreeMap<u64, Block>,
/// Candidate sync peers mapped to the highest height they are known to have.
peers: BTreeMap<SocketAddr, u64>,
/// Peer that batch requests are currently sent to.
remote_peer: SocketAddr,
/// Number of sync timeouts since the last accepted block.
retries: u8,
/// Shared handle to the block acceptor.
acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
/// Shared handle to the network layer.
network: Arc<RwLock<N>>,
/// This node's public address, passed to `GetResource::new` in every block
/// request.
local_peer: SocketAddr,
}
impl<DB: database::DB, VM: vm::VMExecution, N: Network>
OutOfSyncImpl<DB, VM, N>
{
/// Creates an idle out-of-sync state.
///
/// The local public address is read from `network`. `remote_peer` starts as a
/// loopback placeholder (`127.0.0.1:8000`) and is replaced with the real sync
/// peer in `on_entering`.
pub async fn new(
    acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
    network: Arc<RwLock<N>>,
) -> Self {
    let local_peer = *network.read().await.public_addr();
    let placeholder_peer: SocketAddr =
        SocketAddrV4::new(Ipv4Addr::LOCALHOST, 8000).into();

    Self {
        range: (0, 0),
        last_request: 0,
        start_time: SystemTime::now(),
        pool: BTreeMap::new(),
        peers: BTreeMap::new(),
        remote_peer: placeholder_peer,
        retries: 0,
        acc,
        network,
        local_peer,
    }
}
/// Switches into out-of-sync mode using the data gathered during pre-sync.
///
/// Resets the retry counter, sets the sync range to `(tip + 1, remote height)`,
/// aborts consensus, merges the pre-sync blocks into the pool, makes the
/// pre-sync peer the only known sync peer, requests the missing blocks and
/// finally replays every pooled block through `on_block_event`.
pub async fn on_entering(&mut self, presync: PresyncInfo) {
    let peer_addr = presync.peer_addr;
    let remote_height = presync.remote_height;
    let presync_pool = presync.pool;

    let tip_height = self.acc.read().await.get_curr_height().await;
    self.retries = 0;
    self.range = (tip_height + 1, remote_height);

    self.acc.write().await.abort_consensus().await;

    // Drop outdated blocks before merging in those collected during pre-sync.
    self.drain_pool().await;
    self.pool.extend(
        presync_pool
            .iter()
            .map(|blk| (blk.header().height, blk.clone())),
    );

    // Start over with the pre-sync peer as the only known source.
    self.peers.clear();
    self.peers.insert(peer_addr, remote_height);
    self.remote_peer = peer_addr;

    if let Some(height) = self.request_pool_missing_blocks().await {
        self.last_request = height;
    }

    info!(
        event = "entering",
        from = self.range.0,
        to = self.range.1,
        ?peer_addr
    );

    // Replay a snapshot of the pool: `on_block_event` needs `&mut self`, so the
    // pool cannot be borrowed while iterating. Results are deliberately ignored.
    let queued = self.pool.clone();
    for blk in queued.values() {
        let _ = self.on_block_event(blk).await;
    }
}
/// Cleans up when leaving out-of-sync mode: drops pooled blocks below the
/// current tip height and forgets all candidate sync peers.
pub async fn on_exiting(&mut self) {
self.drain_pool().await;
self.peers.clear();
}
/// Removes from the pool every block whose height is below the current tip
/// height.
///
/// NOTE(review): a block at exactly the tip height is kept; it looks like it
/// can never be accepted (`on_block_event` ignores heights `<=` the tip), so
/// a strict comparison may have been intended — confirm.
pub async fn drain_pool(&mut self) {
    let tip_height = self.acc.read().await.get_curr_height().await;
    // `split_off` returns the entries with key >= `tip_height`; the lower part
    // left behind is discarded by the assignment.
    self.pool = self.pool.split_off(&tip_height);
}
/// Handles a quorum message received while syncing.
///
/// The quorum's round minus one is taken as the height its sender has
/// reached. When `metadata` is present, the sender is remembered as a sync
/// peer for that height. If that height is above the current sync target,
/// the target is raised and the missing blocks are requested.
pub async fn on_quorum(
    &mut self,
    quorum: &Quorum,
    metadata: Option<&Metadata>,
) {
    let target_height = quorum.header.round.saturating_sub(1);

    if let Some(meta) = metadata {
        remember_sync_peer(
            &mut self.peers,
            meta.src_addr,
            target_height,
            self.range.0,
            self.remote_peer,
        );
    }

    // Nothing more to do unless the quorum is ahead of the current target.
    if target_height <= self.range.1 {
        return;
    }

    debug!(
        event = "update sync target due to quorum",
        prev = self.range.1,
        new = target_height,
    );
    self.range.1 = target_height;

    if let Some(height) = self.request_pool_missing_blocks().await {
        self.last_request = height;
    }
}
/// Handles a candidate block received while syncing and decides whether the
/// node can leave out-of-sync mode.
///
/// Returns `Ok(true)` after restarting consensus when the candidate directly
/// extends the current tip, is recent enough, and passes the header and
/// payload checks below. Returns `Ok(false)` when the candidate is ignored.
///
/// # Errors
///
/// Propagates `HeaderError::Storage` and `HeaderError::Generic` from header
/// verification. Any other header error only causes the candidate to be
/// ignored.
pub async fn on_candidate(
&mut self,
candidate: &Candidate,
) -> anyhow::Result<bool> {
// Snapshot the tip header, the DB handle and the provisioners under a single
// read guard; the guard is released at the end of this block.
let (tip_header, db, context_provisioners) = {
let acc = self.acc.read().await;
let tip_header = acc.tip_header().await;
let db = acc.db.clone();
let context_provisioners = {
let provisioners = acc.provisioners_list.read().await;
provisioners.clone()
};
(tip_header, db, context_provisioners)
};
let candidate_header = candidate.candidate.header();
// Only a candidate that directly extends the snapshotted tip is of interest.
if candidate_header.height != tip_header.height + 1
|| candidate_header.prev_block_hash != tip_header.hash
{
return Ok(false);
}
// Ignore candidates whose timestamp is more than LIVE_CANDIDATE_MAX_AGE
// behind the current time.
let now = get_current_timestamp();
if candidate_header
.timestamp
.saturating_add(LIVE_CANDIDATE_MAX_AGE)
< now
{
debug!(
event = "ignore stale sync candidate",
candidate_timestamp = candidate_header.timestamp,
now,
height = candidate_header.height,
iter = candidate_header.iteration,
);
return Ok(false);
}
let expected_generator = context_provisioners.current().get_generator(
candidate_header.iteration,
tip_header.seed,
candidate_header.height,
);
let validator = Validator::new(db, &tip_header, &context_provisioners);
match validator
.verify_block_header_fields(
candidate_header,
&expected_generator,
false,
)
.await
{
Ok(_) => {}
// Storage/generic failures are reported to the caller rather than being
// treated as a bad candidate.
Err(
err @ (HeaderError::Storage(_, _) | HeaderError::Generic(_)),
) => {
return Err(err.into());
}
Err(err) => {
debug!(
event = "ignore sync candidate with invalid header",
?err,
height = candidate_header.height,
iter = candidate_header.iteration,
);
return Ok(false);
}
}
// Early-out: if the tip moved while the header was being verified, skip the
// payload validation altogether.
let live_tip = self.acc.read().await.tip_header().await;
if live_tip.height != tip_header.height
|| live_tip.hash != tip_header.hash
{
debug!(
event = "skip sync candidate after tip change",
snapshot_height = tip_header.height,
live_height = live_tip.height,
);
return Ok(false);
}
if let Err(err) = validate_candidate_payload(candidate) {
debug!(
event = "ignore malformed sync candidate",
?err,
height = candidate_header.height,
iter = candidate_header.iteration,
);
return Ok(false);
}
// Re-check the tip once more right before restarting consensus.
// NOTE(review): the read guard used for this check is released before the
// write guard below is acquired, so the tip could presumably still change in
// between — confirm whether that window matters.
let live_tip = self.acc.read().await.tip_header().await;
if live_tip.height != tip_header.height
|| live_tip.hash != tip_header.hash
{
debug!(
event = "skip sync candidate after tip change",
snapshot_height = tip_header.height,
live_height = live_tip.height,
);
return Ok(false);
}
self.acc.write().await.restart_consensus().await;
Ok(true)
}
/// Processes a block received while syncing.
///
/// Blocks at or below the current height are dropped. The block that directly
/// extends the tip is accepted right away, followed by every consecutive block
/// already waiting in the pool. Any other block is stored in the pool, which
/// is bounded by `MAX_POOL_BLOCKS_SIZE`.
///
/// Returns `Ok(true)` when consensus has been restarted, i.e. the sync target
/// was reached, or a non-consecutive block arrived after the last retry timed
/// out. Returns `Ok(false)` otherwise.
///
/// # Errors
///
/// Propagates any error returned by `accept_block`.
pub async fn on_block_event(
&mut self,
blk: &Block,
) -> anyhow::Result<bool> {
// The acceptor write guard is held for the whole call.
let mut acc = self.acc.write().await;
let block_height = blk.header().height;
let current_height = acc.get_curr_height().await;
if block_height <= current_height {
return Ok(false);
}
// Give up syncing: an out-of-order block arrived while the final retry has
// already timed out.
if block_height != current_height + 1
&& self.retries == SYNC_ATTEMPTS - 1
&& self.is_timeout_expired()
{
acc.restart_consensus().await;
return Ok(true);
}
// A block above the current target raises the target.
if block_height > self.range.1 {
debug!(
event = "update sync target",
prev = self.range.1,
new = block_height,
);
self.range.1 = block_height
}
if block_height == current_height + 1 {
acc.accept_block(blk, false).await?;
// Progress was made: restart the timeout and the retry counter.
self.start_time = SystemTime::now();
self.retries = 0;
debug!(
event = "accepted block",
block_height,
last_request = self.last_request,
);
self.range.0 = block_height + 1;
// Accept consecutive blocks from the pool. The loop bounds are evaluated
// once, before `self.range.0` is advanced inside the body.
for height in self.range.0..=self.range.1 {
if let Some(blk) = self.pool.get(&height) {
acc.accept_block(blk, false).await?;
self.start_time = SystemTime::now();
self.range.0 += 1;
debug!(
event = "accepted next block",
block_height = height,
last_request = self.last_request,
);
} else {
// First gap: re-request this single height, but only when the lowest
// pooled height is below the last requested one.
if let Some((&h, _)) = self.pool.first_key_value() {
if h < self.last_request {
self.request_missing_block(height).await;
}
}
break;
}
}
// Drop everything already accepted and peers that can no longer help.
self.pool.retain(|k, _| k >= &self.range.0);
prune_sync_peers(&mut self.peers, self.range.0, self.remote_peer);
debug!(
event = "pool drain",
pool_len = self.pool.len(),
last_request = self.last_request,
);
let tip = acc.get_curr_height().await;
if tip >= self.range.1 {
debug!(event = "sync target reached", height = tip);
self.pool.clear();
acc.restart_consensus().await;
return Ok(true);
}
return Ok(false);
}
// From here on the block is ahead of the next expected height.
// (This re-binding yields the same value as the one at the top.)
let block_height = blk.header().height;
// Sampled before any insertion below.
let pool_len = self.pool.len();
if self.pool.contains_key(&block_height) {
debug!(
event = "block skipped (already present)",
block_height, pool_len,
);
return Ok(false);
}
// Request the next batch when the last requested height is less than a third
// of a batch above the current height.
if self.last_request < current_height + (MAX_BLOCKS_TO_REQUEST / 3)
&& let Some(last_request) = self.request_pool_missing_blocks().await
{
self.last_request = last_request
}
// Pool is full: keep the lower of (highest pooled block, incoming block).
if pool_len >= MAX_POOL_BLOCKS_SIZE
&& let Some(entry) = self.pool.last_entry()
{
let stored_height = *entry.key();
if stored_height > block_height {
debug!(
event = "block removed",
block_height, stored_height, pool_len,
);
entry.remove();
} else {
debug!(event = "block skipped", block_height, pool_len);
return Ok(false);
}
}
self.pool.insert(block_height, blk.clone());
debug!(
event = "block saved",
block_height,
pool_len = self.pool.len(),
);
Ok(false)
}
/// Returns `true` once at least `SYNC_TIMEOUT` has elapsed since `start_time`.
///
/// If the system clock reports a time earlier than `start_time` (for example
/// after a clock adjustment), the timeout is treated as not expired. Unlike
/// adding the timeout to `start_time` and unwrapping, this cannot panic.
fn is_timeout_expired(&self) -> bool {
    self.start_time
        .elapsed()
        .is_ok_and(|waited| waited >= SYNC_TIMEOUT)
}
/// Periodic tick that drives retries while syncing.
///
/// Does nothing until the sync timeout has expired. On expiry:
/// - if the retry counter already equals `SYNC_ATTEMPTS - 1`, consensus is
///   restarted and `Ok(true)` is returned;
/// - otherwise the retry counter is incremented, the peer set is pruned, the
///   sync peer is rotated to the next eligible one (if any), the missing
///   blocks are requested again and the timeout is restarted.
///
/// Returns `Ok(false)` in every case except the restart case.
pub async fn on_heartbeat(&mut self) -> anyhow::Result<bool> {
    if !self.is_timeout_expired() {
        return Ok(false);
    }

    let attempts_exhausted = self.retries == SYNC_ATTEMPTS - 1;
    if attempts_exhausted {
        debug!(event = "timer expired", retries = self.retries);
        self.acc.write().await.restart_consensus().await;
        return Ok(true);
    }

    self.retries += 1;

    // Try a different peer for the next attempt, if one can serve the next
    // expected height.
    prune_sync_peers(&mut self.peers, self.range.0, self.remote_peer);
    let rotation =
        next_sync_peer(&self.peers, self.remote_peer, self.range.0);
    if let Some(peer_addr) = rotation {
        debug!(
            event = "rotate sync peer",
            prev = ?self.remote_peer,
            new = ?peer_addr,
            next_height = self.range.0,
        );
        self.remote_peer = peer_addr;
    }

    if let Some(height) = self.request_pool_missing_blocks().await {
        self.last_request = height;
    }
    self.start_time = SystemTime::now();

    Ok(false)
}
/// Requests the single block at `height` via `send_to_alive_peers`, passing
/// `2` as its second argument (presumably the number of peers to ask — TODO
/// confirm).
///
/// A failure to send is only logged as a warning.
async fn request_missing_block(&self, height: u64) {
    let mut inv = Inv::new(0);
    inv.add_block_from_height(height);
    let request = GetResource::new(inv, Some(self.local_peer), u64::MAX, 1);

    debug!(event = "request block", height);

    let network = self.network.read().await;
    let outcome = network.send_to_alive_peers(request.into(), 2).await;
    if let Err(e) = outcome {
        warn!(event = "Unable to request missing block", ?e);
    }
}
/// Requests from `remote_peer` the blocks of the sync range that are not in
/// the pool yet, at most `MAX_BLOCKS_TO_REQUEST` heights per call.
///
/// Returns the highest height included in the request, or `None` when
/// nothing is missing or the request could not be sent.
async fn request_pool_missing_blocks(&self) -> Option<u64> {
    let batch_limit =
        usize::try_from(MAX_BLOCKS_TO_REQUEST).unwrap_or(usize::MAX);

    let mut inv = Inv::new(0);
    let mut last_request = None;

    let missing_heights = (self.range.0..=self.range.1)
        .filter(|height| !self.pool.contains_key(height))
        .take(batch_limit);
    for height in missing_heights {
        inv.add_block_from_height(height);
        last_request = Some(height);
    }

    if inv.inv_list.is_empty() {
        return last_request;
    }

    debug!(
        event = "request blocks",
        target = last_request.unwrap_or_default(),
    );
    let get_resource =
        GetResource::new(inv, Some(self.local_peer), u64::MAX, 1);
    let sent = self
        .network
        .read()
        .await
        .send_to_peer(get_resource.into(), self.remote_peer)
        .await;

    match sent {
        Ok(_) => last_request,
        Err(e) => {
            debug!(event = "Unable to request missing blocks", ?e);
            warn!("Unable to request missing blocks {e}");
            None
        }
    }
}
}
fn validate_candidate_payload(candidate: &Candidate) -> anyhow::Result<()> {
let candidate_block = &candidate.candidate;
let candidate_header = candidate_block.header();
if candidate_block.txs().len() > MAX_NUMBER_OF_TRANSACTIONS {
anyhow::bail!("candidate contains too many transactions");
}
if candidate_block.faults().len() > MAX_NUMBER_OF_FAULTS {
anyhow::bail!("candidate contains too many faults");
}
let candidate_size = candidate_block
.size()
.map_err(|_| anyhow::anyhow!("unknown block size"))?;
if candidate_size > MAX_BLOCK_SIZE {
anyhow::bail!("candidate exceeds max block size");
}
let tx_digests: Vec<_> =
candidate_block.txs().iter().map(|tx| tx.digest()).collect();
if merkle_root(&tx_digests[..]) != candidate_header.txroot {
anyhow::bail!("candidate has invalid tx merkle root");
}
let fault_digests: Vec<_> = candidate_block
.faults()
.iter()
.map(|fault| fault.digest())
.collect();
if merkle_root(&fault_digests[..]) != candidate_header.faultroot {
anyhow::bail!("candidate has invalid fault merkle root");
}
Ok(())
}
// Unit tests for the peer-selection helpers (`next_sync_peer` and
// `prune_sync_peers`).
#[cfg(test)]
mod tests {
use super::*;
// The next peer in address order that can serve the requested height is
// chosen.
#[test]
fn rotates_to_next_eligible_sync_peer() {
let current_peer: SocketAddr = "127.0.0.1:9000".parse().unwrap();
let alt_one: SocketAddr = "127.0.0.1:9001".parse().unwrap();
let alt_two: SocketAddr = "127.0.0.1:9002".parse().unwrap();
let peers = BTreeMap::from([
(current_peer, 100),
(alt_one, 105),
(alt_two, 110),
]);
assert_eq!(next_sync_peer(&peers, current_peer, 101), Some(alt_one),);
}
// When the current peer has the highest address, the search wraps around to
// the lowest eligible address.
#[test]
fn wraps_to_earliest_eligible_sync_peer() {
let first_peer: SocketAddr = "127.0.0.1:9000".parse().unwrap();
let second_peer: SocketAddr = "127.0.0.1:9001".parse().unwrap();
let current_peer: SocketAddr = "127.0.0.1:9002".parse().unwrap();
let peers = BTreeMap::from([
(first_peer, 104),
(second_peer, 105),
(current_peer, 110),
]);
assert_eq!(next_sync_peer(&peers, current_peer, 101), Some(first_peer),);
}
// Peers whose known height is below the next height to fetch are removed.
#[test]
fn prunes_sync_peers_below_next_height() {
let keep_peer: SocketAddr = "127.0.0.1:9000".parse().unwrap();
let stale_peer: SocketAddr = "127.0.0.1:9001".parse().unwrap();
let useful_peer: SocketAddr = "127.0.0.1:9002".parse().unwrap();
let mut peers = BTreeMap::from([
(keep_peer, 110),
(stale_peer, 100),
(useful_peer, 111),
]);
prune_sync_peers(&mut peers, 105, keep_peer);
assert_eq!(peers.len(), 2);
assert!(peers.contains_key(&keep_peer));
assert!(peers.contains_key(&useful_peer));
assert!(!peers.contains_key(&stale_peer));
}
// 34 peers plus the kept peer (35 in total) are capped at MAX_SYNC_PEERS and
// the kept peer survives.
// NOTE(review): the kept peer has the highest address here, so it is never the
// first eviction candidate; a case where it sorts first would exercise the
// skip logic more directly.
#[test]
fn caps_sync_peer_set_without_dropping_current_peer() {
let keep_peer: SocketAddr = "127.0.0.1:9050".parse().unwrap();
let mut peers = BTreeMap::new();
for port in 9000..=9033 {
let peer: SocketAddr = format!("127.0.0.1:{port}").parse().unwrap();
peers.insert(peer, 200);
}
peers.insert(keep_peer, 200);
prune_sync_peers(&mut peers, 150, keep_peer);
assert_eq!(peers.len(), MAX_SYNC_PEERS);
assert!(peers.contains_key(&keep_peer));
}
}