mod acceptor;
mod consensus;
mod fallback;
mod fsm;
mod genesis;
mod header_validation;
mod metrics;
use std::collections::HashMap;
use std::ops::Deref;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use async_trait::async_trait;
use dusk_consensus::config::is_emergency_block;
use dusk_consensus::errors::ConsensusError;
use dusk_core::abi::ContractId;
use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
pub use header_validation::verify_att;
use node_data::events::Event;
use node_data::ledger::{to_str, BlockWithLabel, Label};
use node_data::message::payload::RatificationResult;
use node_data::message::{AsyncQueue, Payload, Topics};
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
use tokio::time::{sleep_until, Instant};
use tracing::{debug, error, info, warn};
use self::acceptor::Acceptor;
use self::fsm::SimpleFSM;
#[cfg(feature = "archive")]
use crate::archive::Archive;
use crate::database::rocksdb::MD_HASH_KEY;
use crate::database::{Ledger, Metadata};
use crate::{database, vm, LongLivedService, Message, Network};
const TOPICS: &[u8] = &[
Topics::Block as u8,
Topics::Candidate as u8,
Topics::Validation as u8,
Topics::Ratification as u8,
Topics::Quorum as u8,
Topics::ValidationQuorum as u8,
];
const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
inbound: AsyncQueue<Message>,
keys_path: String,
acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
max_consensus_queue_size: usize,
event_sender: Sender<Event>,
genesis_timestamp: u64,
dusk_key: BlsPublicKey,
finality_activation: u64,
blob_expire_after: u64,
module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
#[cfg(feature = "archive")]
archive: Archive,
}
#[async_trait]
impl<N: Network, DB: database::DB, VM: vm::VMExecution>
LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
{
async fn initialize(
&mut self,
network: Arc<RwLock<N>>,
db: Arc<RwLock<DB>>,
vm: Arc<RwLock<VM>>,
) -> anyhow::Result<()> {
let tip = Self::load_tip(
db.read().await.deref(),
vm.read().await.deref(),
self.genesis_timestamp,
)
.await?;
let acc = Acceptor::init_consensus(
&self.keys_path,
tip,
db,
network,
vm,
#[cfg(feature = "archive")]
self.archive.clone(),
self.max_consensus_queue_size,
self.event_sender.clone(),
self.dusk_key,
self.finality_activation,
self.blob_expire_after,
self.module_shading.clone(),
)
.await?;
self.acceptor = Some(Arc::new(RwLock::new(acc)));
Ok(())
}
async fn execute(
&mut self,
network: Arc<RwLock<N>>,
_db: Arc<RwLock<DB>>,
_vm: Arc<RwLock<VM>>,
) -> anyhow::Result<usize> {
LongLivedService::<N, DB, VM>::add_routes(
self,
TOPICS,
self.inbound.clone(),
&network,
)
.await?;
let acc = self.acceptor.as_mut().expect("initialize is called");
acc.write().await.spawn_task().await;
let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;
let outbound_chan = acc.read().await.get_outbound_chan().await;
let result_chan = acc.read().await.get_result_chan().await;
let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
loop {
tokio::select! {
biased;
recv = result_chan.recv() => {
match recv? {
Err(ConsensusError::Canceled(round)) => {
debug!(event = "consensus canceled", round);
}
Err(err) => {
error!(event = "failed_consensus", ?err);
fsm.on_failed_consensus().await;
}
_ => {}
}
},
recv = self.inbound.recv() => {
let msg = recv?;
match msg.payload {
Payload::Candidate(_)
| Payload::Validation(_)
| Payload::Ratification(_)
| Payload::ValidationQuorum(_) => {
self.reroute_acceptor(msg).await;
}
Payload::Quorum(ref q) => {
fsm.on_quorum(q, msg.metadata.as_ref()).await;
self.reroute_acceptor(msg).await;
}
Payload::Block(blk) => {
info!(
event = "New block",
src = "Block msg",
height = blk.header().height,
iter = blk.header().iteration,
hash = to_str(&blk.header().hash),
metadata = ?msg.metadata,
);
match fsm.on_block_event(*blk, msg.metadata.clone()).await {
Ok(res) => {
if let Some(accepted_blk) = res {
if is_emergency_block(accepted_blk.header().iteration){
let mut eb_msg = Message::from(accepted_blk);
eb_msg.metadata = msg.metadata;
if let Err(e) = network.read().await.broadcast(&eb_msg).await {
warn!("Unable to re-broadcast Emergency Block: {e}");
}
}
}
}
Err(err) => {
error!(event = "fsm::on_event failed", src = "wire", err = ?err);
}
}
}
_ => {
warn!("invalid inbound message");
},
}
},
recv = outbound_chan.recv() => {
let msg = recv?;
if let Payload::Quorum(quorum) = &msg.payload {
if let RatificationResult::Success(_) = quorum.att.result {
fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
}
}
if let Payload::GetResource(res) = &msg.payload {
if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
warn!("Unable to re-route message {e}");
}
} else if let Err(e) = network.read().await.broadcast(&msg).await {
warn!("Unable to broadcast message {e}");
}
},
_ = sleep_until(heartbeat) => {
if let Err(err) = fsm.on_heartbeat_event().await {
error!(event = "heartbeat_failed", ?err);
}
heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
},
}
}
}
fn name(&self) -> &'static str {
"chain"
}
}
impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
#[allow(clippy::too_many_arguments)]
pub fn new(
keys_path: String,
max_inbound_size: usize,
event_sender: Sender<Event>,
genesis_timestamp: u64,
dusk_key: BlsPublicKey,
finality_activation: u64,
blob_expire_after: u64,
module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
#[cfg(feature = "archive")] archive: Archive,
) -> Self {
info!(
"ChainSrv::new with keys_path: {}, max_inbound_size: {}",
keys_path, max_inbound_size
);
Self {
inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
keys_path,
acceptor: None,
max_consensus_queue_size: max_inbound_size,
event_sender,
genesis_timestamp,
dusk_key,
finality_activation,
blob_expire_after,
module_shading,
#[cfg(feature = "archive")]
archive,
}
}
async fn load_tip(
db: &DB,
vm: &VM,
genesis_timestamp: u64,
) -> Result<BlockWithLabel> {
let stored_block = db.view(|t| {
anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
t.block(&tip_hash[..])
.expect("block to be found if metadata is set")
}))
})?;
let block = match stored_block {
Some(blk) => {
let (_, label) = db
.view(|t| t.block_label_by_height(blk.header().height))?
.unwrap();
BlockWithLabel::new_with_label(blk, label)
}
None => {
let state = vm.get_state_root()?;
let genesis_blk =
genesis::generate_block(state, genesis_timestamp);
db.update(|t| {
t.store_block(
genesis_blk.header(),
&[],
&[],
Label::Final(0),
)
})?;
BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
}
};
let block_header = block.inner().header();
tracing::info!(
event = "Ledger block loaded",
height = block_header.height,
hash = hex::encode(block_header.hash),
state_root = hex::encode(block_header.state_hash),
label = ?block.label()
);
Ok(block)
}
pub async fn revert_last_final(&self) -> anyhow::Result<()> {
self.acceptor
.as_ref()
.expect("Chain to be initialized")
.read()
.await
.try_revert(acceptor::RevertTarget::LastFinalizedState)
.await
}
async fn reroute_acceptor(&self, msg: Message) {
debug!(
event = "Consensus message received",
topic = ?msg.topic(),
info = ?msg.header,
metadata = ?msg.metadata,
);
let acc = self.acceptor.as_ref().expect("initialize is called");
if let Err(e) = acc.read().await.reroute_msg(msg).await {
warn!("Could not reroute msg to Consensus: {}", e);
}
}
}