Skip to main content

dusk_node/
chain.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
6
7mod acceptor;
8mod consensus;
9mod fallback;
10mod fsm;
11mod genesis;
12
13mod header_validation;
14mod metrics;
15
16use std::collections::HashMap;
17use std::ops::Deref;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::Result;
22use async_trait::async_trait;
23use dusk_consensus::config::is_emergency_block;
24use dusk_consensus::errors::ConsensusError;
25use dusk_core::abi::ContractId;
26use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
27pub use genesis::generate_block as genesis_block;
28pub use header_validation::verify_att;
29use node_data::events::Event;
30use node_data::ledger::{BlockWithLabel, Header, Label, to_str};
31use node_data::message::payload::RatificationResult;
32use node_data::message::{AsyncQueue, Payload, Topics};
33use tokio::sync::RwLock;
34use tokio::sync::mpsc::Sender;
35use tokio::time::{Instant, sleep_until};
36use tracing::{debug, error, info, warn};
37
38use self::acceptor::Acceptor;
39use self::fsm::SimpleFSM;
40#[cfg(feature = "archive")]
41use crate::archive::Archive;
42use crate::database::rocksdb::MD_HASH_KEY;
43use crate::database::{Ledger, Metadata};
44use crate::{LongLivedService, Message, Network, database, vm};
45
46const TOPICS: &[u8] = &[
47    Topics::Block as u8,
48    Topics::Candidate as u8,
49    Topics::Validation as u8,
50    Topics::Ratification as u8,
51    Topics::Quorum as u8,
52    Topics::ValidationQuorum as u8,
53];
54
55const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
56
57/// Finds the stored block header matching `state_root`.
58///
59/// Returns `Ok(None)` only when the chain DB has no tip metadata stored yet,
60/// which means the consumer must decide how to initialize the header. If tip
61/// metadata exists, the matching header must be present and missing data is
62/// reported as an error.
63pub fn find_block_header_by_state_root<DB>(
64    db: &DB,
65    state_root: [u8; 32],
66) -> Result<Option<Header>>
67where
68    DB: database::DB,
69{
70    db.view(|db| {
71        let Some(latest) = db.latest_block_opt()? else {
72            return Ok(None);
73        };
74
75        let mut header = latest.header;
76
77        loop {
78            if header.state_hash == state_root {
79                return Ok(Some(header));
80            }
81
82            if header.height == 0 {
83                return Err(anyhow::anyhow!(
84                    "Cannot find block header for state root {}",
85                    to_str(&state_root)
86                ));
87            }
88
89            let prev_hash = header.prev_block_hash;
90            header = db.block_header(&prev_hash)?.ok_or_else(|| {
91                anyhow::anyhow!(
92                    "Cannot get header for hash {}",
93                    to_str(&prev_hash)
94                )
95            })?;
96        }
97    })
98}
99
100pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
101    /// Inbound wire messages queue
102    inbound: AsyncQueue<Message>,
103    keys_path: String,
104    acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
105    max_consensus_queue_size: usize,
106    /// Sender channel for sending out RUES events
107    event_sender: Sender<Event>,
108    genesis_timestamp: u64,
109    dusk_key: BlsPublicKey,
110    finality_activation: u64,
111    blob_expire_after: u64,
112    module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
113    #[cfg(feature = "archive")]
114    archive: Archive,
115}
116
117#[async_trait]
118impl<N: Network, DB: database::DB, VM: vm::VMExecution>
119    LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
120{
121    async fn initialize(
122        &mut self,
123        network: Arc<RwLock<N>>,
124        db: Arc<RwLock<DB>>,
125        vm: Arc<RwLock<VM>>,
126    ) -> anyhow::Result<()> {
127        let tip = Self::load_tip(
128            db.read().await.deref(),
129            vm.read().await.deref(),
130            self.genesis_timestamp,
131        )
132        .await?;
133
134        // Initialize Acceptor
135        let acc = Acceptor::init_consensus(
136            &self.keys_path,
137            tip,
138            db,
139            network,
140            vm,
141            #[cfg(feature = "archive")]
142            self.archive.clone(),
143            self.max_consensus_queue_size,
144            self.event_sender.clone(),
145            self.dusk_key,
146            self.finality_activation,
147            self.blob_expire_after,
148            self.module_shading.clone(),
149        )
150        .await?;
151
152        self.acceptor = Some(Arc::new(RwLock::new(acc)));
153
154        Ok(())
155    }
156
157    async fn execute(
158        &mut self,
159        network: Arc<RwLock<N>>,
160        _db: Arc<RwLock<DB>>,
161        _vm: Arc<RwLock<VM>>,
162    ) -> anyhow::Result<usize> {
163        // Register routes
164        LongLivedService::<N, DB, VM>::add_routes(
165            self,
166            TOPICS,
167            self.inbound.clone(),
168            &network,
169        )
170        .await?;
171
172        let acc = self.acceptor.as_mut().expect("initialize is called");
173        acc.write().await.spawn_task().await;
174
175        // Start-up FSM instance
176        let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;
177
178        let outbound_chan = acc.read().await.get_outbound_chan().await;
179        let result_chan = acc.read().await.get_result_chan().await;
180
181        let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
182
183        // Message loop for Chain context
184        loop {
185            tokio::select! {
186                biased;
187                // Receives results from the upper layer
188                recv = result_chan.recv() => {
189                    match recv? {
190                        Err(ConsensusError::Canceled(round)) => {
191                            debug!(event = "consensus canceled", round);
192                        }
193                        Err(err) => {
194                            // Internal consensus execution has terminated with an error
195                            error!(event = "failed_consensus", ?err);
196                            fsm.on_failed_consensus().await;
197                        }
198                        _ => {}
199                    }
200                },
201                // Handles any inbound wire.
202                recv = self.inbound.recv() => {
203                    let msg = recv?;
204
205                    match msg.payload {
206                        Payload::Candidate(ref candidate) => {
207                            // Let the FSM inspect Candidates first. While we
208                            // are syncing, a valid next-round Candidate can be
209                            // enough to prove the sync session is unnecessary.
210                            if let Err(err) = fsm.on_candidate(candidate).await {
211                                error!(event = "fsm::on_candidate failed", src = "wire", err = ?err);
212                            }
213                            self.reroute_acceptor(msg).await;
214                        }
215
216                        Payload::Validation(_)
217                        | Payload::Ratification(_)
218                        | Payload::ValidationQuorum(_) => {
219                            self.reroute_acceptor(msg).await;
220                        }
221
222                        Payload::Quorum(ref q) => {
223                            fsm.on_quorum(q, msg.metadata.as_ref()).await;
224                            self.reroute_acceptor(msg).await;
225
226                        }
227
228                        Payload::Block(blk) => {
229                            info!(
230                                event = "New block",
231                                src = "Block msg",
232                                height = blk.header().height,
233                                iter = blk.header().iteration,
234                                hash = to_str(&blk.header().hash),
235                                metadata = ?msg.metadata,
236                            );
237
238                            // Handle a block that originates from a network peer.
239                            // By disabling block broadcast, a block may be received
240                            // from a peer only after explicit request (on demand).
241                            match fsm.on_block_event(*blk, msg.metadata.clone()).await {
242                                Ok(res) => {
243                                    if let Some(accepted_blk) = res {
244                                        // Repropagate Emergency Blocks
245                                        // We already know it's valid because we accepted it
246                                        if is_emergency_block(accepted_blk.header().iteration){
247                                            // We build a new `msg` to avoid cloning `blk` when
248                                            // passing it to `on_block_event`.
249                                            // We copy the metadata to keep the original ray_id.
250                                            let mut eb_msg = Message::from(accepted_blk);
251                                            eb_msg.metadata = msg.metadata;
252                                            if let Err(e) = network.read().await.broadcast(&eb_msg).await {
253                                                warn!("Unable to re-broadcast Emergency Block: {e}");
254                                            }
255                                        }
256                                    }
257                                }
258                                Err(err) => {
259                                    error!(event = "fsm::on_event failed", src = "wire", err = ?err);
260                                }
261                            }
262                        }
263
264                        _ => {
265                            warn!("invalid inbound message");
266                        },
267                    }
268
269                },
270                // Re-routes messages originated from Consensus (upper) layer to the network layer.
271                recv = outbound_chan.recv() => {
272                    let msg = recv?;
273
274                    // Handle quorum messages from Consensus layer.
275                    // If the associated candidate block already exists,
276                    // the winner block will be compiled and redirected to the Acceptor.
277                                        if let Payload::Quorum(quorum) = &msg.payload
278                                            && let RatificationResult::Success(_) = quorum.att.result {
279                                                    fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
280                    }
281
282                    if let Payload::GetResource(res) = &msg.payload {
283                        if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
284                            warn!("Unable to re-route message {e}");
285                        }
286                    } else if let Err(e) = network.read().await.broadcast(&msg).await {
287                            warn!("Unable to broadcast message {e}");
288                    }
289
290                },
291                 // Handles heartbeat event
292                _ = sleep_until(heartbeat) => {
293                    if let Err(err) = fsm.on_heartbeat_event().await {
294                        error!(event = "heartbeat_failed", ?err);
295                    }
296
297                    heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
298                },
299            }
300        }
301    }
302
303    /// Returns service name.
304    fn name(&self) -> &'static str {
305        "chain"
306    }
307}
308
309impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
310    #[allow(clippy::too_many_arguments)]
311    pub fn new(
312        keys_path: String,
313        max_inbound_size: usize,
314        event_sender: Sender<Event>,
315        genesis_timestamp: u64,
316        dusk_key: BlsPublicKey,
317        finality_activation: u64,
318        blob_expire_after: u64,
319        module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
320        #[cfg(feature = "archive")] archive: Archive,
321    ) -> Self {
322        info!(
323            "ChainSrv::new with keys_path: {}, max_inbound_size: {}",
324            keys_path, max_inbound_size
325        );
326
327        Self {
328            inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
329            keys_path,
330            acceptor: None,
331            max_consensus_queue_size: max_inbound_size,
332            event_sender,
333            genesis_timestamp,
334            dusk_key,
335            finality_activation,
336            blob_expire_after,
337            module_shading,
338            #[cfg(feature = "archive")]
339            archive,
340        }
341    }
342
343    /// Load both the chain tip and last finalized block from persisted ledger.
344    ///
345    /// Panics
346    ///
347    /// If register entry is read but block is not found.
348    async fn load_tip(
349        db: &DB,
350        vm: &VM,
351        genesis_timestamp: u64,
352    ) -> Result<BlockWithLabel> {
353        let stored_block = db.view(|t| {
354            anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
355                t.block(&tip_hash[..])
356                    .expect("block to be found if metadata is set")
357            }))
358        })?;
359
360        let block = match stored_block {
361            Some(blk) => {
362                let (_, label) = db
363                    .view(|t| t.block_label_by_height(blk.header().height))?
364                    .unwrap();
365
366                BlockWithLabel::new_with_label(blk, label)
367            }
368            None => {
369                // Lack of register record means the loaded database is
370                // either malformed or empty.
371                let state = vm.get_state_root()?;
372                let genesis_blk =
373                    genesis::generate_block(state, genesis_timestamp);
374                db.update(|t| {
375                    // Persist genesis block
376                    t.store_block(
377                        genesis_blk.header(),
378                        &[],
379                        &[],
380                        Label::Final(0),
381                    )
382                })?;
383
384                BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
385            }
386        };
387
388        let block_header = block.inner().header();
389
390        tracing::info!(
391            event = "Ledger block loaded",
392            height = block_header.height,
393            hash = hex::encode(block_header.hash),
394            state_root = hex::encode(block_header.state_hash),
395            label = ?block.label()
396        );
397
398        Ok(block)
399    }
400
401    pub async fn revert_last_final(&self) -> anyhow::Result<()> {
402        self.acceptor
403            .as_ref()
404            .expect("Chain to be initialized")
405            .read()
406            .await
407            .try_revert(acceptor::RevertTarget::LastFinalizedState)
408            .await
409    }
410
411    async fn reroute_acceptor(&self, msg: Message) {
412        debug!(
413            event = "Consensus message received",
414            topic = ?msg.topic(),
415            info = ?msg.header,
416            metadata = ?msg.metadata,
417        );
418
419        // Re-route message to the Consensus
420        let acc = self.acceptor.as_ref().expect("initialize is called");
421        if let Err(e) = acc.read().await.reroute_msg(msg).await {
422            warn!("Could not reroute msg to Consensus: {}", e);
423        }
424    }
425}
426
#[cfg(test)]
mod tests {
    use node_data::ledger::{Header, Label};
    use tempfile::tempdir;

    use super::*;
    use crate::database::{DB as _, DatabaseOptions, rocksdb};

    /// Builds a header whose state hash, hash and previous-block hash are
    /// each filled with one repeated byte, so chains are easy to spell out.
    fn test_header(height: u64, state: u8, hash: u8, prev_hash: u8) -> Header {
        Header {
            prev_block_hash: [prev_hash; 32],
            hash: [hash; 32],
            state_hash: [state; 32],
            height,
            ..Default::default()
        }
    }

    /// Opens a RocksDB backend with default options at `path`.
    fn open_db(path: &std::path::Path) -> rocksdb::Backend {
        rocksdb::Backend::create_or_open(path, DatabaseOptions::default())
    }

    /// Stores `header` as a block without transactions or faults.
    fn store_header(db: &rocksdb::Backend, header: &Header) -> Result<()> {
        db.update(|tx| tx.store_block(header, &[], &[], Label::Final(0)))?;
        Ok(())
    }

    /// Stores every header of `chain`, in the given order.
    fn store_chain(db: &rocksdb::Backend, chain: &[&Header]) -> Result<()> {
        for header in chain {
            store_header(db, header)?;
        }
        Ok(())
    }

    // Covers the restart path where the persisted finalized state root is
    // behind the chain DB tip and must be matched by walking back headers.
    #[test]
    fn finds_header_by_state_root_before_tip() -> Result<()> {
        let dir = tempdir()?;
        let db = open_db(dir.path());

        let genesis = test_header(0, 1, 10, 0);
        let block_one = test_header(1, 2, 11, 10);
        let tip = test_header(2, 3, 12, 11);
        store_chain(&db, &[&genesis, &block_one, &tip])?;

        let found = find_block_header_by_state_root(&db, block_one.state_hash)?
            .expect("header should be found");

        assert_eq!(found.height, block_one.height);
        assert_eq!(found.hash, block_one.hash);
        assert_eq!(found.state_hash, block_one.state_hash);

        Ok(())
    }

    // Covers corrupted/incomplete chain metadata: once tip metadata exists,
    // a missing state root must be reported instead of returning `None`.
    #[test]
    fn errors_when_metadata_exists_but_state_root_is_missing() -> Result<()> {
        let dir = tempdir()?;
        let db = open_db(dir.path());

        let genesis = test_header(0, 1, 10, 0);
        let tip = test_header(1, 2, 11, 10);
        store_chain(&db, &[&genesis, &tip])?;

        let err = find_block_header_by_state_root(&db, [99; 32])
            .expect_err("missing state root should be an error");

        assert!(
            err.to_string().contains("Cannot find block header"),
            "unexpected error: {err}"
        );

        Ok(())
    }
}