1mod acceptor;
8mod consensus;
9mod fallback;
10mod fsm;
11mod genesis;
12
13mod header_validation;
14mod metrics;
15
16use std::collections::HashMap;
17use std::ops::Deref;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::Result;
22use async_trait::async_trait;
23use dusk_consensus::config::is_emergency_block;
24use dusk_consensus::errors::ConsensusError;
25use dusk_core::abi::ContractId;
26use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
27pub use genesis::generate_block as genesis_block;
28pub use header_validation::verify_att;
29use node_data::events::Event;
30use node_data::ledger::{BlockWithLabel, Header, Label, to_str};
31use node_data::message::payload::RatificationResult;
32use node_data::message::{AsyncQueue, Payload, Topics};
33use tokio::sync::RwLock;
34use tokio::sync::mpsc::Sender;
35use tokio::time::{Instant, sleep_until};
36use tracing::{debug, error, info, warn};
37
38use self::acceptor::Acceptor;
39use self::fsm::SimpleFSM;
40#[cfg(feature = "archive")]
41use crate::archive::Archive;
42use crate::database::rocksdb::MD_HASH_KEY;
43use crate::database::{Ledger, Metadata};
44use crate::{LongLivedService, Message, Network, database, vm};
45
/// Message topics handled by the chain service.
///
/// `ChainSrv::execute` passes this list to `add_routes` together with the
/// service's inbound queue. Each entry is the `u8` discriminant of a
/// [`Topics`] variant.
const TOPICS: &[u8] = &[
    Topics::Block as u8,
    Topics::Candidate as u8,
    Topics::Validation as u8,
    Topics::Ratification as u8,
    Topics::Quorum as u8,
    Topics::ValidationQuorum as u8,
];
54
/// Delay between two heartbeat events delivered to the FSM by the
/// `ChainSrv::execute` loop (3 seconds).
const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
56
57pub fn find_block_header_by_state_root<DB>(
64 db: &DB,
65 state_root: [u8; 32],
66) -> Result<Option<Header>>
67where
68 DB: database::DB,
69{
70 db.view(|db| {
71 let Some(latest) = db.latest_block_opt()? else {
72 return Ok(None);
73 };
74
75 let mut header = latest.header;
76
77 loop {
78 if header.state_hash == state_root {
79 return Ok(Some(header));
80 }
81
82 if header.height == 0 {
83 return Err(anyhow::anyhow!(
84 "Cannot find block header for state root {}",
85 to_str(&state_root)
86 ));
87 }
88
89 let prev_hash = header.prev_block_hash;
90 header = db.block_header(&prev_hash)?.ok_or_else(|| {
91 anyhow::anyhow!(
92 "Cannot get header for hash {}",
93 to_str(&prev_hash)
94 )
95 })?;
96 }
97 })
98}
99
/// Long-lived service that drives the chain: it receives chain/consensus
/// messages from the network, feeds them to the FSM and the consensus
/// acceptor, and broadcasts the messages the acceptor produces.
///
/// Most fields are configuration captured by [`ChainSrv::new`] and forwarded
/// unchanged to `Acceptor::init_consensus` during `initialize`.
pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
    /// Bounded queue of inbound messages; registered for `TOPICS` in
    /// `execute` and drained by its event loop.
    inbound: AsyncQueue<Message>,
    /// Forwarded to `Acceptor::init_consensus`.
    /// NOTE(review): presumably the path to the consensus keys file - confirm.
    keys_path: String,
    /// Consensus acceptor; `None` until `initialize` has completed
    /// successfully.
    acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
    /// Forwarded to `Acceptor::init_consensus`. `new` sets it to the same
    /// value used as the inbound queue capacity.
    max_consensus_queue_size: usize,
    /// Event channel; a clone is handed to the acceptor.
    event_sender: Sender<Event>,
    /// Timestamp used when the genesis block has to be generated.
    genesis_timestamp: u64,
    /// Forwarded to `Acceptor::init_consensus`; not otherwise used here.
    dusk_key: BlsPublicKey,
    /// Forwarded to `Acceptor::init_consensus`; not otherwise used here.
    finality_activation: u64,
    /// Forwarded to `Acceptor::init_consensus`; not otherwise used here.
    blob_expire_after: u64,
    /// Forwarded (cloned) to `Acceptor::init_consensus`; not otherwise used
    /// here.
    module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
    /// Archive handle, forwarded (cloned) to `Acceptor::init_consensus`.
    /// Only present with the `archive` feature.
    #[cfg(feature = "archive")]
    archive: Archive,
}
116
117#[async_trait]
118impl<N: Network, DB: database::DB, VM: vm::VMExecution>
119 LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
120{
121 async fn initialize(
122 &mut self,
123 network: Arc<RwLock<N>>,
124 db: Arc<RwLock<DB>>,
125 vm: Arc<RwLock<VM>>,
126 ) -> anyhow::Result<()> {
127 let tip = Self::load_tip(
128 db.read().await.deref(),
129 vm.read().await.deref(),
130 self.genesis_timestamp,
131 )
132 .await?;
133
134 let acc = Acceptor::init_consensus(
136 &self.keys_path,
137 tip,
138 db,
139 network,
140 vm,
141 #[cfg(feature = "archive")]
142 self.archive.clone(),
143 self.max_consensus_queue_size,
144 self.event_sender.clone(),
145 self.dusk_key,
146 self.finality_activation,
147 self.blob_expire_after,
148 self.module_shading.clone(),
149 )
150 .await?;
151
152 self.acceptor = Some(Arc::new(RwLock::new(acc)));
153
154 Ok(())
155 }
156
    /// Runs the chain service event loop.
    ///
    /// Registers the inbound queue for `TOPICS`, spawns the acceptor task,
    /// builds the FSM, and then multiplexes four event sources: consensus
    /// results, inbound wire messages, outbound consensus messages, and a
    /// periodic heartbeat.
    ///
    /// The loop contains no `break`; the function only returns when one of
    /// the `?` operators below propagates an error (route registration or a
    /// failed `recv` on one of the channels).
    ///
    /// # Panics
    ///
    /// Panics if `initialize` has not populated `self.acceptor`.
    async fn execute(
        &mut self,
        network: Arc<RwLock<N>>,
        _db: Arc<RwLock<DB>>,
        _vm: Arc<RwLock<VM>>,
    ) -> anyhow::Result<usize> {
        // Register the inbound queue as the destination for all chain topics.
        LongLivedService::<N, DB, VM>::add_routes(
            self,
            TOPICS,
            self.inbound.clone(),
            &network,
        )
        .await?;

        let acc = self.acceptor.as_mut().expect("initialize is called");
        acc.write().await.spawn_task().await;

        // The FSM shares the acceptor and the network with this loop.
        let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;

        let outbound_chan = acc.read().await.get_outbound_chan().await;
        let result_chan = acc.read().await.get_result_chan().await;

        // Absolute deadline of the next heartbeat.
        let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();

        loop {
            // `biased` makes `select!` poll the branches in declaration
            // order, so consensus results take priority over inbound
            // messages, then outbound messages, then the heartbeat.
            tokio::select! {
                biased;
                // Result of a consensus execution.
                recv = result_chan.recv() => {
                    match recv? {
                        // A canceled round is only logged at debug level.
                        Err(ConsensusError::Canceled(round)) => {
                            debug!(event = "consensus canceled", round);
                        }
                        // Any other consensus error is reported to the FSM.
                        Err(err) => {
                            error!(event = "failed_consensus", ?err);
                            fsm.on_failed_consensus().await;
                        }
                        _ => {}
                    }
                },
                // Message received from the network.
                recv = self.inbound.recv() => {
                    let msg = recv?;

                    match msg.payload {
                        Payload::Candidate(ref candidate) => {
                            // The FSM sees the candidate first; the message
                            // is forwarded to the acceptor even if the FSM
                            // reports an error.
                            if let Err(err) = fsm.on_candidate(candidate).await {
                                error!(event = "fsm::on_candidate failed", src = "wire", err = ?err);
                            }
                            self.reroute_acceptor(msg).await;
                        }

                        // These payloads go straight to the acceptor.
                        Payload::Validation(_)
                        | Payload::Ratification(_)
                        | Payload::ValidationQuorum(_) => {
                            self.reroute_acceptor(msg).await;
                        }

                        // Quorum messages are handled by the FSM and then
                        // forwarded to the acceptor as well.
                        Payload::Quorum(ref q) => {
                            fsm.on_quorum(q, msg.metadata.as_ref()).await;
                            self.reroute_acceptor(msg).await;

                        }

                        Payload::Block(blk) => {
                            info!(
                                event = "New block",
                                src = "Block msg",
                                height = blk.header().height,
                                iter = blk.header().iteration,
                                hash = to_str(&blk.header().hash),
                                metadata = ?msg.metadata,
                            );

                            // Hand the block to the FSM. When it returns a
                            // block whose iteration is an emergency
                            // iteration, that block is broadcast again with
                            // the metadata of the received message.
                            match fsm.on_block_event(*blk, msg.metadata.clone()).await {
                                Ok(res) => {
                                    if let Some(accepted_blk) = res {
                                        if is_emergency_block(accepted_blk.header().iteration){
                                            let mut eb_msg = Message::from(accepted_blk);
                                            eb_msg.metadata = msg.metadata;
                                            if let Err(e) = network.read().await.broadcast(&eb_msg).await {
                                                warn!("Unable to re-broadcast Emergency Block: {e}");
                                            }
                                        }
                                    }
                                }
                                Err(err) => {
                                    error!(event = "fsm::on_event failed", src = "wire", err = ?err);
                                }
                            }
                        }

                        // Any other payload is not expected on this queue.
                        _ => {
                            warn!("invalid inbound message");
                        },
                    }

                },
                // Message produced by the acceptor's outbound channel.
                recv = outbound_chan.recv() => {
                    let msg = recv?;

                    // A quorum with a successful ratification is reported to
                    // the FSM before the message leaves the node.
                    if let Payload::Quorum(quorum) = &msg.payload
                        && let RatificationResult::Success(_) = quorum.att.result {
                            fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
                    }

                    // Resource requests are sent with `flood_request`; every
                    // other outbound message is broadcast.
                    // NOTE(review): the meaning of the `None` and `16`
                    // arguments is defined by `Network::flood_request` and
                    // is not visible here.
                    if let Payload::GetResource(res) = &msg.payload {
                        if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
                            warn!("Unable to re-route message {e}");
                        }
                    } else if let Err(e) = network.read().await.broadcast(&msg).await {
                          warn!("Unable to broadcast message {e}");
                    }

                },
                // Heartbeat deadline reached.
                _ = sleep_until(heartbeat) => {
                    if let Err(err) = fsm.on_heartbeat_event().await {
                        error!(event = "heartbeat_failed", ?err);
                    }

                    // Re-arm relative to now, not to the previous deadline.
                    heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
                },
            }
        }
    }
302
    /// Returns the static name of this service, `"chain"`.
    fn name(&self) -> &'static str {
        "chain"
    }
307}
308
309impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
310 #[allow(clippy::too_many_arguments)]
311 pub fn new(
312 keys_path: String,
313 max_inbound_size: usize,
314 event_sender: Sender<Event>,
315 genesis_timestamp: u64,
316 dusk_key: BlsPublicKey,
317 finality_activation: u64,
318 blob_expire_after: u64,
319 module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
320 #[cfg(feature = "archive")] archive: Archive,
321 ) -> Self {
322 info!(
323 "ChainSrv::new with keys_path: {}, max_inbound_size: {}",
324 keys_path, max_inbound_size
325 );
326
327 Self {
328 inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
329 keys_path,
330 acceptor: None,
331 max_consensus_queue_size: max_inbound_size,
332 event_sender,
333 genesis_timestamp,
334 dusk_key,
335 finality_activation,
336 blob_expire_after,
337 module_shading,
338 #[cfg(feature = "archive")]
339 archive,
340 }
341 }
342
343 async fn load_tip(
349 db: &DB,
350 vm: &VM,
351 genesis_timestamp: u64,
352 ) -> Result<BlockWithLabel> {
353 let stored_block = db.view(|t| {
354 anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
355 t.block(&tip_hash[..])
356 .expect("block to be found if metadata is set")
357 }))
358 })?;
359
360 let block = match stored_block {
361 Some(blk) => {
362 let (_, label) = db
363 .view(|t| t.block_label_by_height(blk.header().height))?
364 .unwrap();
365
366 BlockWithLabel::new_with_label(blk, label)
367 }
368 None => {
369 let state = vm.get_state_root()?;
372 let genesis_blk =
373 genesis::generate_block(state, genesis_timestamp);
374 db.update(|t| {
375 t.store_block(
377 genesis_blk.header(),
378 &[],
379 &[],
380 Label::Final(0),
381 )
382 })?;
383
384 BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
385 }
386 };
387
388 let block_header = block.inner().header();
389
390 tracing::info!(
391 event = "Ledger block loaded",
392 height = block_header.height,
393 hash = hex::encode(block_header.hash),
394 state_root = hex::encode(block_header.state_hash),
395 label = ?block.label()
396 );
397
398 Ok(block)
399 }
400
401 pub async fn revert_last_final(&self) -> anyhow::Result<()> {
402 self.acceptor
403 .as_ref()
404 .expect("Chain to be initialized")
405 .read()
406 .await
407 .try_revert(acceptor::RevertTarget::LastFinalizedState)
408 .await
409 }
410
411 async fn reroute_acceptor(&self, msg: Message) {
412 debug!(
413 event = "Consensus message received",
414 topic = ?msg.topic(),
415 info = ?msg.header,
416 metadata = ?msg.metadata,
417 );
418
419 let acc = self.acceptor.as_ref().expect("initialize is called");
421 if let Err(e) = acc.read().await.reroute_msg(msg).await {
422 warn!("Could not reroute msg to Consensus: {}", e);
423 }
424 }
425}
426
427#[cfg(test)]
428mod tests {
429 use node_data::ledger::{Header, Label};
430 use tempfile::tempdir;
431
432 use super::*;
433 use crate::database::{DB as _, DatabaseOptions, rocksdb};
434
435 fn test_header(height: u64, state: u8, hash: u8, prev_hash: u8) -> Header {
436 Header {
437 height,
438 state_hash: [state; 32],
439 hash: [hash; 32],
440 prev_block_hash: [prev_hash; 32],
441 ..Default::default()
442 }
443 }
444
445 fn store_header(db: &rocksdb::Backend, header: &Header) -> Result<()> {
446 db.update(|tx| {
447 tx.store_block(header, &[], &[], Label::Final(0))?;
448 Ok(())
449 })?;
450 Ok(())
451 }
452
453 #[test]
456 fn finds_header_by_state_root_before_tip() -> Result<()> {
457 let dir = tempdir()?;
458 let db = rocksdb::Backend::create_or_open(
459 dir.path(),
460 DatabaseOptions::default(),
461 );
462
463 let genesis = test_header(0, 1, 10, 0);
464 let block_one = test_header(1, 2, 11, 10);
465 let tip = test_header(2, 3, 12, 11);
466
467 store_header(&db, &genesis)?;
468 store_header(&db, &block_one)?;
469 store_header(&db, &tip)?;
470
471 let recovered =
472 find_block_header_by_state_root(&db, block_one.state_hash)?
473 .expect("header should be found");
474
475 assert_eq!(recovered.height, block_one.height);
476 assert_eq!(recovered.hash, block_one.hash);
477 assert_eq!(recovered.state_hash, block_one.state_hash);
478
479 Ok(())
480 }
481
482 #[test]
485 fn errors_when_metadata_exists_but_state_root_is_missing() -> Result<()> {
486 let dir = tempdir()?;
487 let db = rocksdb::Backend::create_or_open(
488 dir.path(),
489 DatabaseOptions::default(),
490 );
491
492 let genesis = test_header(0, 1, 10, 0);
493 let tip = test_header(1, 2, 11, 10);
494
495 store_header(&db, &genesis)?;
496 store_header(&db, &tip)?;
497
498 let err = find_block_header_by_state_root(&db, [99; 32])
499 .expect_err("missing state root should be an error");
500
501 assert!(
502 err.to_string().contains("Cannot find block header"),
503 "unexpected error: {err}"
504 );
505
506 Ok(())
507 }
508}