use std::collections::BTreeMap;
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use node_data::ledger::Block;
use node_data::message::payload::{GetResource, Inv, Quorum};
use tokio::sync::RwLock;
use tracing::{debug, info, warn};
use super::PresyncInfo;
use crate::chain::acceptor::Acceptor;
use crate::{database, vm, Network};
const MAX_POOL_BLOCKS_SIZE: usize = 1000;
const MAX_BLOCKS_TO_REQUEST: u64 = 100;
const SYNC_TIMEOUT: Duration = Duration::from_secs(5);
pub(super) struct OutOfSyncImpl<
DB: database::DB,
VM: vm::VMExecution,
N: Network,
> {
range: (u64, u64),
last_request: u64,
start_time: SystemTime,
pool: BTreeMap<u64, Block>,
remote_peer: SocketAddr,
attempts: u8,
acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
network: Arc<RwLock<N>>,
local_peer: SocketAddr,
}
impl<DB: database::DB, VM: vm::VMExecution, N: Network>
OutOfSyncImpl<DB, VM, N>
{
pub async fn new(
acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
network: Arc<RwLock<N>>,
) -> Self {
let this_peer = *network.read().await.public_addr();
Self {
start_time: SystemTime::now(),
range: (0, 0),
last_request: 0,
pool: BTreeMap::new(),
acc,
local_peer: this_peer,
network,
remote_peer: SocketAddr::V4(SocketAddrV4::new(
Ipv4Addr::new(127, 0, 0, 1),
8000,
)),
attempts: 3,
}
}
pub async fn on_entering(&mut self, presync: PresyncInfo) {
let peer_addr = presync.peer_addr;
let pool = presync.pool;
let curr_height = self.acc.read().await.get_curr_height().await;
self.range = (curr_height + 1, presync.remote_height);
self.drain_pool().await;
for b in &pool {
self.pool.insert(b.header().height, b.clone());
}
self.remote_peer = peer_addr;
if let Some(last_request) = self.request_pool_missing_blocks().await {
self.last_request = last_request
}
let (from, to) = &self.range;
info!(event = "entering", from, to, ?peer_addr);
for (_, b) in self.pool.clone() {
let _ = self.on_block_event(&b).await;
}
}
pub async fn on_exiting(&mut self) {
self.drain_pool().await;
}
pub async fn drain_pool(&mut self) {
let curr_height = self.acc.read().await.get_curr_height().await;
self.pool.retain(|h, _| h >= &curr_height);
}
pub async fn on_quorum(&mut self, quorum: &Quorum) {
let prev_quorum_height = quorum.header.round - 1;
if self.range.1 < prev_quorum_height {
debug!(
event = "update sync target due to quorum",
prev = self.range.1,
new = prev_quorum_height,
);
self.range.1 = prev_quorum_height;
self.request_pool_missing_blocks().await;
}
}
pub async fn on_block_event(
&mut self,
blk: &Block,
) -> anyhow::Result<bool> {
let mut acc = self.acc.write().await;
let block_height = blk.header().height;
if self.attempts == 0 && self.is_timeout_expired() {
acc.restart_consensus().await;
return Ok(true);
}
let current_height = acc.get_curr_height().await;
if block_height <= current_height {
return Ok(false);
}
if block_height > self.range.1 {
debug!(
event = "update sync target",
prev = self.range.1,
new = block_height,
);
self.range.1 = block_height
}
if block_height == current_height + 1 {
acc.accept_block(blk, false).await?;
self.start_time = SystemTime::now();
debug!(
event = "accepted block",
block_height,
last_request = self.last_request,
);
self.range.0 = block_height + 1;
for height in self.range.0..=self.range.1 {
if let Some(blk) = self.pool.get(&height) {
acc.accept_block(blk, false).await?;
self.start_time = SystemTime::now();
self.range.0 += 1;
debug!(
event = "accepted next block",
block_height = height,
last_request = self.last_request,
);
} else {
if let Some((&h, _)) = self.pool.first_key_value() {
if h < self.last_request {
self.request_missing_block(height).await;
}
}
break;
}
}
self.pool.retain(|k, _| k >= &self.range.0);
debug!(
event = "pool drain",
pool_len = self.pool.len(),
last_request = self.last_request,
);
let tip = acc.get_curr_height().await;
if tip >= self.range.1 {
debug!(event = "sync target reached", height = tip);
self.pool.clear();
acc.restart_consensus().await;
return Ok(true);
}
return Ok(false);
}
let block_height = blk.header().height;
let pool_len = self.pool.len();
if self.pool.contains_key(&block_height) {
debug!(
event = "block skipped (already present)",
block_height, pool_len,
);
return Ok(false);
}
if self.last_request < current_height + (MAX_BLOCKS_TO_REQUEST / 3) {
if let Some(last_request) = self.request_pool_missing_blocks().await
{
self.last_request = last_request
}
}
if pool_len >= MAX_POOL_BLOCKS_SIZE {
if let Some(entry) = self.pool.last_entry() {
let stored_height = *entry.key();
if stored_height > block_height {
debug!(
event = "block removed",
block_height, stored_height, pool_len,
);
entry.remove();
} else {
debug!(event = "block skipped", block_height, pool_len);
return Ok(false);
}
}
}
self.pool.insert(block_height, blk.clone());
debug!(
event = "block saved",
block_height,
pool_len = self.pool.len(),
);
Ok(false)
}
fn is_timeout_expired(&self) -> bool {
self.start_time.checked_add(SYNC_TIMEOUT).unwrap() <= SystemTime::now()
}
pub async fn on_heartbeat(&mut self) -> anyhow::Result<bool> {
if self.is_timeout_expired() {
if self.attempts == 0 {
debug!(event = "timer expired", attempts = self.attempts);
self.acc.write().await.restart_consensus().await;
return Ok(true);
}
if let Some(last_request) = self.request_pool_missing_blocks().await
{
self.last_request = last_request
}
self.start_time = SystemTime::now();
self.attempts -= 1;
}
Ok(false)
}
async fn request_missing_block(&self, height: u64) {
let mut inv = Inv::new(0);
inv.add_block_from_height(height);
let get_resource =
GetResource::new(inv, Some(self.local_peer), u64::MAX, 1);
debug!(event = "request block", height);
if let Err(e) = self
.network
.read()
.await
.send_to_alive_peers(get_resource.into(), 2)
.await
{
warn!(event = "Unable to request missing block", ?e);
}
}
async fn request_pool_missing_blocks(&self) -> Option<u64> {
let mut last_request = None;
let mut inv = Inv::new(0);
let mut inv_count = 0;
for height in self.range.0..=self.range.1 {
if self.pool.contains_key(&height) {
continue;
}
inv.add_block_from_height(height);
inv_count += 1;
last_request = Some(height);
if inv_count >= MAX_BLOCKS_TO_REQUEST {
break;
}
}
if !inv.inv_list.is_empty() {
debug!(
event = "request blocks",
target = last_request.unwrap_or_default(),
);
let get_resource =
GetResource::new(inv, Some(self.local_peer), u64::MAX, 1);
if let Err(e) = self
.network
.read()
.await
.send_to_peer(get_resource.into(), self.remote_peer)
.await
{
debug!(event = "Unable to request missing blocks", ?e);
warn!("Unable to request missing blocks {e}");
return None;
}
}
last_request
}
}