dusk_node/
chain.rs

1// This Source Code Form is subject to the terms of the Mozilla Public
2// License, v. 2.0. If a copy of the MPL was not distributed with this
3// file, You can obtain one at http://mozilla.org/MPL/2.0/.
4//
5// Copyright (c) DUSK NETWORK. All rights reserved.
6
7mod acceptor;
8mod consensus;
9mod fallback;
10mod fsm;
11mod genesis;
12
13mod header_validation;
14mod metrics;
15
16use std::collections::HashMap;
17use std::ops::Deref;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::Result;
22use async_trait::async_trait;
23use dusk_consensus::config::is_emergency_block;
24use dusk_consensus::errors::ConsensusError;
25use dusk_core::abi::ContractId;
26use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
27pub use header_validation::verify_att;
28use node_data::events::Event;
29use node_data::ledger::{to_str, BlockWithLabel, Label};
30use node_data::message::payload::RatificationResult;
31use node_data::message::{AsyncQueue, Payload, Topics};
32use tokio::sync::mpsc::Sender;
33use tokio::sync::RwLock;
34use tokio::time::{sleep_until, Instant};
35use tracing::{debug, error, info, warn};
36
37use self::acceptor::Acceptor;
38use self::fsm::SimpleFSM;
39#[cfg(feature = "archive")]
40use crate::archive::Archive;
41use crate::database::rocksdb::MD_HASH_KEY;
42use crate::database::{Ledger, Metadata};
43use crate::{database, vm, LongLivedService, Message, Network};
44
45const TOPICS: &[u8] = &[
46    Topics::Block as u8,
47    Topics::Candidate as u8,
48    Topics::Validation as u8,
49    Topics::Ratification as u8,
50    Topics::Quorum as u8,
51    Topics::ValidationQuorum as u8,
52];
53
54const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
55
56pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
57    /// Inbound wire messages queue
58    inbound: AsyncQueue<Message>,
59    keys_path: String,
60    acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
61    max_consensus_queue_size: usize,
62    /// Sender channel for sending out RUES events
63    event_sender: Sender<Event>,
64    genesis_timestamp: u64,
65    dusk_key: BlsPublicKey,
66    finality_activation: u64,
67    blob_expire_after: u64,
68    module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
69    #[cfg(feature = "archive")]
70    archive: Archive,
71}
72
73#[async_trait]
74impl<N: Network, DB: database::DB, VM: vm::VMExecution>
75    LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
76{
77    async fn initialize(
78        &mut self,
79        network: Arc<RwLock<N>>,
80        db: Arc<RwLock<DB>>,
81        vm: Arc<RwLock<VM>>,
82    ) -> anyhow::Result<()> {
83        let tip = Self::load_tip(
84            db.read().await.deref(),
85            vm.read().await.deref(),
86            self.genesis_timestamp,
87        )
88        .await?;
89
90        // Initialize Acceptor
91        let acc = Acceptor::init_consensus(
92            &self.keys_path,
93            tip,
94            db,
95            network,
96            vm,
97            #[cfg(feature = "archive")]
98            self.archive.clone(),
99            self.max_consensus_queue_size,
100            self.event_sender.clone(),
101            self.dusk_key,
102            self.finality_activation,
103            self.blob_expire_after,
104            self.module_shading.clone(),
105        )
106        .await?;
107
108        self.acceptor = Some(Arc::new(RwLock::new(acc)));
109
110        Ok(())
111    }
112
113    async fn execute(
114        &mut self,
115        network: Arc<RwLock<N>>,
116        _db: Arc<RwLock<DB>>,
117        _vm: Arc<RwLock<VM>>,
118    ) -> anyhow::Result<usize> {
119        // Register routes
120        LongLivedService::<N, DB, VM>::add_routes(
121            self,
122            TOPICS,
123            self.inbound.clone(),
124            &network,
125        )
126        .await?;
127
128        let acc = self.acceptor.as_mut().expect("initialize is called");
129        acc.write().await.spawn_task().await;
130
131        // Start-up FSM instance
132        let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;
133
134        let outbound_chan = acc.read().await.get_outbound_chan().await;
135        let result_chan = acc.read().await.get_result_chan().await;
136
137        let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
138
139        // Message loop for Chain context
140        loop {
141            tokio::select! {
142                biased;
143                // Receives results from the upper layer
144                recv = result_chan.recv() => {
145                    match recv? {
146                        Err(ConsensusError::Canceled(round)) => {
147                            debug!(event = "consensus canceled", round);
148                        }
149                        Err(err) => {
150                            // Internal consensus execution has terminated with an error
151                            error!(event = "failed_consensus", ?err);
152                            fsm.on_failed_consensus().await;
153                        }
154                        _ => {}
155                    }
156                },
157                // Handles any inbound wire.
158                recv = self.inbound.recv() => {
159                    let msg = recv?;
160
161                    match msg.payload {
162                        Payload::Candidate(_)
163                        | Payload::Validation(_)
164                        | Payload::Ratification(_)
165                        | Payload::ValidationQuorum(_) => {
166                            self.reroute_acceptor(msg).await;
167                        }
168
169                        Payload::Quorum(ref q) => {
170                            fsm.on_quorum(q, msg.metadata.as_ref()).await;
171                            self.reroute_acceptor(msg).await;
172
173                        }
174
175                        Payload::Block(blk) => {
176                            info!(
177                                event = "New block",
178                                src = "Block msg",
179                                height = blk.header().height,
180                                iter = blk.header().iteration,
181                                hash = to_str(&blk.header().hash),
182                                metadata = ?msg.metadata,
183                            );
184
185                            // Handle a block that originates from a network peer.
186                            // By disabling block broadcast, a block may be received
187                            // from a peer only after explicit request (on demand).
188                            match fsm.on_block_event(*blk, msg.metadata.clone()).await {
189                                Ok(res) => {
190                                    if let Some(accepted_blk) = res {
191                                        // Repropagate Emergency Blocks
192                                        // We already know it's valid because we accepted it
193                                        if is_emergency_block(accepted_blk.header().iteration){
194                                            // We build a new `msg` to avoid cloning `blk` when
195                                            // passing it to `on_block_event`.
196                                            // We copy the metadata to keep the original ray_id.
197                                            let mut eb_msg = Message::from(accepted_blk);
198                                            eb_msg.metadata = msg.metadata;
199                                            if let Err(e) = network.read().await.broadcast(&eb_msg).await {
200                                                warn!("Unable to re-broadcast Emergency Block: {e}");
201                                            }
202                                        }
203                                    }
204                                }
205                                Err(err) => {
206                                    error!(event = "fsm::on_event failed", src = "wire", err = ?err);
207                                }
208                            }
209                        }
210
211                        _ => {
212                            warn!("invalid inbound message");
213                        },
214                    }
215
216                },
217                // Re-routes messages originated from Consensus (upper) layer to the network layer.
218                recv = outbound_chan.recv() => {
219                    let msg = recv?;
220
221                    // Handle quorum messages from Consensus layer.
222                    // If the associated candidate block already exists,
223                    // the winner block will be compiled and redirected to the Acceptor.
224                    if let Payload::Quorum(quorum) = &msg.payload {
225                      if let RatificationResult::Success(_) = quorum.att.result {
226                          fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
227                      }
228                    }
229
230                    if let Payload::GetResource(res) = &msg.payload {
231                        if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
232                            warn!("Unable to re-route message {e}");
233                        }
234                    } else if let Err(e) = network.read().await.broadcast(&msg).await {
235                            warn!("Unable to broadcast message {e}");
236                    }
237
238                },
239                 // Handles heartbeat event
240                _ = sleep_until(heartbeat) => {
241                    if let Err(err) = fsm.on_heartbeat_event().await {
242                        error!(event = "heartbeat_failed", ?err);
243                    }
244
245                    heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
246                },
247            }
248        }
249    }
250
251    /// Returns service name.
252    fn name(&self) -> &'static str {
253        "chain"
254    }
255}
256
257impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
258    #[allow(clippy::too_many_arguments)]
259    pub fn new(
260        keys_path: String,
261        max_inbound_size: usize,
262        event_sender: Sender<Event>,
263        genesis_timestamp: u64,
264        dusk_key: BlsPublicKey,
265        finality_activation: u64,
266        blob_expire_after: u64,
267        module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
268        #[cfg(feature = "archive")] archive: Archive,
269    ) -> Self {
270        info!(
271            "ChainSrv::new with keys_path: {}, max_inbound_size: {}",
272            keys_path, max_inbound_size
273        );
274
275        Self {
276            inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
277            keys_path,
278            acceptor: None,
279            max_consensus_queue_size: max_inbound_size,
280            event_sender,
281            genesis_timestamp,
282            dusk_key,
283            finality_activation,
284            blob_expire_after,
285            module_shading,
286            #[cfg(feature = "archive")]
287            archive,
288        }
289    }
290
291    /// Load both the chain tip and last finalized block from persisted ledger.
292    ///
293    /// Panics
294    ///
295    /// If register entry is read but block is not found.
296    async fn load_tip(
297        db: &DB,
298        vm: &VM,
299        genesis_timestamp: u64,
300    ) -> Result<BlockWithLabel> {
301        let stored_block = db.view(|t| {
302            anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
303                t.block(&tip_hash[..])
304                    .expect("block to be found if metadata is set")
305            }))
306        })?;
307
308        let block = match stored_block {
309            Some(blk) => {
310                let (_, label) = db
311                    .view(|t| t.block_label_by_height(blk.header().height))?
312                    .unwrap();
313
314                BlockWithLabel::new_with_label(blk, label)
315            }
316            None => {
317                // Lack of register record means the loaded database is
318                // either malformed or empty.
319                let state = vm.get_state_root()?;
320                let genesis_blk =
321                    genesis::generate_block(state, genesis_timestamp);
322                db.update(|t| {
323                    // Persist genesis block
324                    t.store_block(
325                        genesis_blk.header(),
326                        &[],
327                        &[],
328                        Label::Final(0),
329                    )
330                })?;
331
332                BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
333            }
334        };
335
336        let block_header = block.inner().header();
337
338        tracing::info!(
339            event = "Ledger block loaded",
340            height = block_header.height,
341            hash = hex::encode(block_header.hash),
342            state_root = hex::encode(block_header.state_hash),
343            label = ?block.label()
344        );
345
346        Ok(block)
347    }
348
349    pub async fn revert_last_final(&self) -> anyhow::Result<()> {
350        self.acceptor
351            .as_ref()
352            .expect("Chain to be initialized")
353            .read()
354            .await
355            .try_revert(acceptor::RevertTarget::LastFinalizedState)
356            .await
357    }
358
359    async fn reroute_acceptor(&self, msg: Message) {
360        debug!(
361            event = "Consensus message received",
362            topic = ?msg.topic(),
363            info = ?msg.header,
364            metadata = ?msg.metadata,
365        );
366
367        // Re-route message to the Consensus
368        let acc = self.acceptor.as_ref().expect("initialize is called");
369        if let Err(e) = acc.read().await.reroute_msg(msg).await {
370            warn!("Could not reroute msg to Consensus: {}", e);
371        }
372    }
373}