mod insync;
mod outofsync;
mod stalled;
use std::collections::{HashMap, HashSet};
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::Duration;
use dusk_consensus::config::is_emergency_block;
use metrics::counter;
use node_data::ledger::{to_str, Attestation, Block};
use node_data::message::payload::{Inv, Quorum, RatificationResult, Vote};
use node_data::message::Metadata;
use tokio::sync::RwLock;
use tokio::time::Instant;
use tracing::{debug, error, info, trace, warn};
use self::insync::InSyncImpl;
use self::outofsync::OutOfSyncImpl;
use self::stalled::StalledChainFSM;
use super::acceptor::{Acceptor, RevertTarget};
use crate::database::{ConsensusStorage, Ledger};
use crate::{database, vm, Network};
use anyhow::{anyhow, Result};
/// Lifetime of an entry in the attestation cache: entries older than this
/// are dropped by the next cache cleanup (sixty seconds).
const DEFAULT_ATT_CACHE_EXPIRY: Duration = Duration::from_millis(60_000);

/// Hop limit passed to the network layer when flooding block requests.
const DEFAULT_HOPS_LIMIT: u16 = 16;
/// Set of 32-byte hashes shared through an `Arc` and guarded by an async
/// `tokio::sync::RwLock`; `SimpleFSM` uses it for its block blacklist.
type SharedHashSet = Arc<RwLock<HashSet<[u8; 32]>>>;
/// Bookkeeping for a pending "presync" with a remote peer: the heights
/// involved, when the record expires, and blocks collected so far.
#[derive(Clone)]
struct PresyncInfo {
// Address of the remote peer this record refers to.
peer_addr: SocketAddr,
// Local tip height at the moment the record was created.
tip_height: u64,
// Height reported by the remote side (taken from the block header when
// built via `from_block`).
remote_height: u64,
// Creation time plus `PresyncInfo::DEFAULT_TIMEOUT`.
expiry: Instant,
// Blocks gathered for this presync; `from_block` seeds it with the
// triggering remote block.
pool: Vec<Block>,
}
impl PresyncInfo {
    /// Validity window of a freshly created presync record.
    const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);

    /// Builds a record from a block received from `peer_addr`.
    ///
    /// The remote height is read from the block header, and the block itself
    /// becomes the only entry of `pool`.
    fn from_block(
        peer_addr: SocketAddr,
        remote_block: Block,
        tip_height: u64,
    ) -> Self {
        let height = remote_block.header().height;
        Self {
            pool: vec![remote_block],
            ..Self::from_height(peer_addr, height, tip_height)
        }
    }

    /// Builds a record with an empty block pool, expiring
    /// `DEFAULT_TIMEOUT` from now.
    fn from_height(
        peer_addr: SocketAddr,
        remote_height: u64,
        tip_height: u64,
    ) -> Self {
        let expiry = Instant::now()
            .checked_add(Self::DEFAULT_TIMEOUT)
            .unwrap();

        Self {
            peer_addr,
            tip_height,
            remote_height,
            expiry,
            pool: Vec::new(),
        }
    }

    /// Returns the local tip height captured when the record was created.
    fn start_height(&self) -> u64 {
        let Self { tip_height, .. } = self;
        *tip_height
    }
}
/// The synchronization mode `SimpleFSM` is currently in; each variant owns
/// the handler implementation for that mode.
enum State<N: Network, DB: database::DB, VM: vm::VMExecution> {
// Handler used while the node considers itself in sync.
InSync(InSyncImpl<DB, VM, N>),
// Handler used while the node is catching up with peers.
OutOfSync(OutOfSyncImpl<DB, VM, N>),
}
/// Chain synchronization state machine: routes blocks, quorums and
/// heartbeats to the active `State`, switches between in-sync and
/// out-of-sync modes, and runs stalled-chain/fork recovery.
pub(crate) struct SimpleFSM<N: Network, DB: database::DB, VM: vm::VMExecution> {
// Active synchronization state.
curr: State<N, DB, VM>,
// Shared block acceptor (also gives access to the database).
acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
// Shared network handle, used for flooding block requests.
network: Arc<RwLock<N>>,
// Hashes of blocks to discard on arrival; filled during fork recovery and
// cleared when the stalled-chain detector reports `Stalled`. Shared with
// the `InSyncImpl` instances this FSM creates.
blacklisted_blocks: SharedHashSet,
// Block hash -> (attestation from a quorum, expiry instant). Filled when a
// quorum's candidate is requested from the network; consumed when that
// block arrives without an attestation.
attestations_cache: HashMap<[u8; 32], (Attestation, Instant)>,
// Stalled-chain detector fed with every block and heartbeat.
stalled_sm: StalledChainFSM<DB, N, VM>,
}
impl<N: Network, DB: database::DB, VM: vm::VMExecution> SimpleFSM<N, DB, VM> {
pub async fn new(
acc: Arc<RwLock<Acceptor<N, DB, VM>>>,
network: Arc<RwLock<N>>,
) -> Self {
let blacklisted_blocks = Arc::new(RwLock::new(HashSet::new()));
let stalled_sm = StalledChainFSM::new_with_acc(acc.clone()).await;
let curr = State::InSync(InSyncImpl::<DB, VM, N>::new(
acc.clone(),
network.clone(),
blacklisted_blocks.clone(),
));
Self {
curr,
acc,
network: network.clone(),
blacklisted_blocks,
attestations_cache: Default::default(),
stalled_sm,
}
}
pub async fn on_failed_consensus(&mut self) {
self.acc.write().await.restart_consensus().await;
}
pub async fn on_quorum(
&mut self,
quorum: &Quorum,
metadata: Option<&Metadata>,
) {
match &mut self.curr {
State::OutOfSync(oos) => oos.on_quorum(quorum).await,
State::InSync(is) => is.on_quorum(quorum, metadata).await,
}
}
pub async fn on_block_event(
&mut self,
mut blk: Block,
metadata: Option<Metadata>,
) -> anyhow::Result<Option<Block>> {
let block_hash = &blk.header().hash;
if self.blacklisted_blocks.read().await.contains(block_hash) {
info!(
event = "block discarded",
reason = "blacklisted",
hash = to_str(&blk.header().hash),
height = blk.header().height,
iter = blk.header().iteration,
);
return Ok(None);
}
if !Self::is_block_attested(&blk)
&& !is_emergency_block(blk.header().iteration)
{
if let Err(err) = self.attach_blk_att(&mut blk) {
warn!(event = "block discarded", ?err);
return Ok(None);
}
}
let fsm_res = match &mut self.curr {
State::InSync(ref mut curr) => {
if let Some(presync) =
curr.on_block_event(&blk, metadata).await?
{
curr.on_exiting().await;
let mut next = OutOfSyncImpl::new(
self.acc.clone(),
self.network.clone(),
)
.await;
next.on_entering(presync).await;
self.curr = State::OutOfSync(next);
}
anyhow::Ok(())
}
State::OutOfSync(ref mut curr) => {
if curr.on_block_event(&blk).await? {
curr.on_exiting().await;
let mut next = InSyncImpl::new(
self.acc.clone(),
self.network.clone(),
self.blacklisted_blocks.clone(),
);
next.on_entering(&blk).await.map_err(|e| {
error!("Unable to enter in_sync state: {e}");
e
})?;
self.curr = State::InSync(next);
}
anyhow::Ok(())
}
};
let res = self.stalled_sm.on_block_received(&blk).await.clone();
match res {
stalled::State::StalledOnFork(local_hash_at_fork, remote_blk) => {
info!(
event = "stalled on fork",
local_hash = to_str(&local_hash_at_fork),
remote_hash = to_str(&remote_blk.header().hash),
remote_height = remote_blk.header().height,
);
let mut acc = self.acc.write().await;
let prev_local_state_root = acc.db.read().await.view(|t| {
let local_blk = t
.block_header(&local_hash_at_fork)?
.expect("local hash should exist");
let prev_blk = t
.block_header(&local_blk.prev_block_hash)?
.expect("prev block hash should exist");
anyhow::Ok(prev_blk.state_hash)
})?;
match acc
.try_revert(RevertTarget::Commit(prev_local_state_root))
.await
{
Ok(_) => {
counter!("dusk_revert_count").increment(1);
info!(event = "reverted to last finalized");
info!(
event = "recovery block",
height = remote_blk.header().height,
hash = to_str(&remote_blk.header().hash),
);
acc.accept_block(&remote_blk, true).await?;
self.blacklisted_blocks
.write()
.await
.insert(local_hash_at_fork);
if let Err(err) =
self.stalled_sm.reset(remote_blk.header())
{
info!(
event = "revert failed",
err = format!("{err:?}")
);
}
}
Err(e) => {
error!(event = "revert failed", err = format!("{e:?}"));
return Ok(None);
}
}
}
stalled::State::Stalled(_) => {
self.blacklisted_blocks.write().await.clear();
}
_ => {}
}
fsm_res?;
Ok(Some(blk))
}
async fn flood_request_block(&mut self, hash: [u8; 32], att: Attestation) {
if self.attestations_cache.contains_key(&hash) {
return;
}
let expiry = Instant::now()
.checked_add(DEFAULT_ATT_CACHE_EXPIRY)
.unwrap();
self.attestations_cache.insert(hash, (att, expiry));
let mut inv = Inv::new(1);
inv.add_candidate_from_hash(hash);
flood_request(&self.network, &inv).await;
}
pub(crate) async fn on_success_quorum(
&mut self,
qmsg: &Quorum,
metadata: Option<Metadata>,
) {
self.clean_att_cache();
if let RatificationResult::Success(Vote::Valid(candidate)) =
qmsg.att.result
{
let db = self.acc.read().await.db.clone();
let tip_header = self.acc.read().await.tip_header().await;
let tip_height = tip_header.height;
let quorum_height = qmsg.header.round;
if let Ok(blk_exists) =
db.read().await.view(|t| t.block_exists(&candidate))
{
if blk_exists {
warn!("skipping Quorum for known block");
return;
}
};
let quorum_blk = if quorum_height > tip_height + 1 {
None
} else if (quorum_height == tip_height + 1)
|| (quorum_height == tip_height && tip_header.hash != candidate)
{
let res = db.read().await.view(|t| t.candidate(&candidate));
res.unwrap_or_default()
} else {
None
};
let attestation = qmsg.att;
if let Some(mut blk) = quorum_blk {
info!(
event = "New block",
src = "Quorum msg",
height = blk.header().height,
iter = blk.header().iteration,
hash = to_str(&blk.header().hash)
);
blk.set_attestation(attestation);
let res = self.on_block_event(blk, metadata).await;
match res {
Ok(_) => {}
Err(e) => {
error!("Error on block handling: {e}");
}
}
} else {
debug!(
event = "Candidate not found. Requesting it to the network",
hash = to_str(&candidate),
height = quorum_height,
);
self.flood_request_block(candidate, attestation).await;
}
} else {
error!("Invalid Quorum message");
}
}
pub(crate) async fn on_heartbeat_event(&mut self) -> anyhow::Result<()> {
self.stalled_sm.on_heartbeat_event().await;
match &mut self.curr {
State::InSync(ref mut curr) => {
if curr.on_heartbeat().await? {
curr.on_exiting().await;
let next = OutOfSyncImpl::new(
self.acc.clone(),
self.network.clone(),
)
.await;
self.curr = State::OutOfSync(next);
}
}
State::OutOfSync(ref mut curr) => {
if curr.on_heartbeat().await? {
curr.on_exiting().await;
let next = InSyncImpl::new(
self.acc.clone(),
self.network.clone(),
self.blacklisted_blocks.clone(),
);
self.curr = State::InSync(next);
}
}
};
Ok(())
}
fn is_block_attested(blk: &Block) -> bool {
blk.header().att != Attestation::default()
}
fn attach_blk_att(&mut self, blk: &mut Block) -> Result<()> {
let block_hash = blk.header().hash;
if let Some((att, _)) = self.attestations_cache.get(&block_hash) {
blk.set_attestation(*att);
} else {
return Err(anyhow!(
"Attestation not found for {}",
hex::encode(block_hash)
));
}
self.clean_att_cache();
self.attestations_cache.remove(&block_hash);
Ok(())
}
fn clean_att_cache(&mut self) {
let now = Instant::now();
self.attestations_cache
.retain(|_, (_, expiry)| *expiry > now);
}
}
/// Floods `inv` to the network with `DEFAULT_HOPS_LIMIT` hops.
///
/// Best-effort: a failure is logged as a warning and otherwise ignored.
async fn flood_request<N: Network>(network: &Arc<RwLock<N>>, inv: &Inv) {
    debug!(event = "flood_request", ?inv);

    let outcome = network
        .read()
        .await
        .flood_request(inv, None, DEFAULT_HOPS_LIMIT)
        .await;

    if let Err(err) = outcome {
        warn!("could not request block {err}")
    }
}