dusk_node/
chain.rs

// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
//
// Copyright (c) DUSK NETWORK. All rights reserved.
6
7mod acceptor;
8mod consensus;
9mod fallback;
10mod fsm;
11mod genesis;
12
13mod header_validation;
14mod metrics;
15
16use std::ops::Deref;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use async_trait::async_trait;
22use dusk_consensus::config::is_emergency_block;
23use dusk_consensus::errors::ConsensusError;
24use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
25pub use header_validation::verify_att;
26use node_data::events::Event;
27use node_data::ledger::{to_str, BlockWithLabel, Label};
28use node_data::message::payload::RatificationResult;
29use node_data::message::{AsyncQueue, Payload, Topics};
30use tokio::sync::mpsc::Sender;
31use tokio::sync::RwLock;
32use tokio::time::{sleep_until, Instant};
33use tracing::{debug, error, info, warn};
34
35use self::acceptor::Acceptor;
36use self::fsm::SimpleFSM;
37#[cfg(feature = "archive")]
38use crate::archive::Archive;
39use crate::database::rocksdb::MD_HASH_KEY;
40use crate::database::{Ledger, Metadata};
41use crate::{database, vm, LongLivedService, Message, Network};
42
/// Wire topics, as raw `u8` discriminants, that `execute` registers through
/// `add_routes` so that matching messages land in the chain inbound queue.
43const TOPICS: &[u8] = &[
44    Topics::Block as u8,
45    Topics::Candidate as u8,
46    Topics::Validation as u8,
47    Topics::Ratification as u8,
48    Topics::Quorum as u8,
49    Topics::ValidationQuorum as u8,
50];
51
/// Delay before the next heartbeat event of the `execute` loop. The deadline
/// is re-armed from "now" each time a heartbeat has been handled.
52const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
53
54pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
55    /// Inbound wire messages queue
56    inbound: AsyncQueue<Message>,
57    keys_path: String,
58    acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
59    max_consensus_queue_size: usize,
60    /// Sender channel for sending out RUES events
61    event_sender: Sender<Event>,
62    genesis_timestamp: u64,
63    dusk_key: BlsPublicKey,
64    finality_activation: u64,
65    blob_expire_after: u64,
66    #[cfg(feature = "archive")]
67    archive: Archive,
68}
69
70#[async_trait]
71impl<N: Network, DB: database::DB, VM: vm::VMExecution>
72    LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
73{
74    async fn initialize(
75        &mut self,
76        network: Arc<RwLock<N>>,
77        db: Arc<RwLock<DB>>,
78        vm: Arc<RwLock<VM>>,
79    ) -> anyhow::Result<()> {
80        let tip = Self::load_tip(
81            db.read().await.deref(),
82            vm.read().await.deref(),
83            self.genesis_timestamp,
84        )
85        .await?;
86
87        // Initialize Acceptor
88        let acc = Acceptor::init_consensus(
89            &self.keys_path,
90            tip,
91            db,
92            network,
93            vm,
94            #[cfg(feature = "archive")]
95            self.archive.clone(),
96            self.max_consensus_queue_size,
97            self.event_sender.clone(),
98            self.dusk_key,
99            self.finality_activation,
100            self.blob_expire_after,
101        )
102        .await?;
103
104        self.acceptor = Some(Arc::new(RwLock::new(acc)));
105
106        Ok(())
107    }
108
109    async fn execute(
110        &mut self,
111        network: Arc<RwLock<N>>,
112        _db: Arc<RwLock<DB>>,
113        _vm: Arc<RwLock<VM>>,
114    ) -> anyhow::Result<usize> {
115        // Register routes
116        LongLivedService::<N, DB, VM>::add_routes(
117            self,
118            TOPICS,
119            self.inbound.clone(),
120            &network,
121        )
122        .await?;
123
124        let acc = self.acceptor.as_mut().expect("initialize is called");
125        acc.write().await.spawn_task().await;
126
127        // Start-up FSM instance
128        let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;
129
130        let outbound_chan = acc.read().await.get_outbound_chan().await;
131        let result_chan = acc.read().await.get_result_chan().await;
132
133        let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
134
135        // Message loop for Chain context
136        loop {
137            tokio::select! {
138                biased;
139                // Receives results from the upper layer
140                recv = result_chan.recv() => {
141                    match recv? {
142                        Err(ConsensusError::Canceled(round)) => {
143                            debug!(event = "consensus canceled", round);
144                        }
145                        Err(err) => {
146                            // Internal consensus execution has terminated with an error
147                            error!(event = "failed_consensus", ?err);
148                            fsm.on_failed_consensus().await;
149                        }
150                        _ => {}
151                    }
152                },
153                // Handles any inbound wire.
154                recv = self.inbound.recv() => {
155                    let msg = recv?;
156
157                    match msg.payload {
158                        Payload::Candidate(_)
159                        | Payload::Validation(_)
160                        | Payload::Ratification(_)
161                        | Payload::ValidationQuorum(_) => {
162                            self.reroute_acceptor(msg).await;
163                        }
164
165                        Payload::Quorum(ref q) => {
166                            fsm.on_quorum(q, msg.metadata.as_ref()).await;
167                            self.reroute_acceptor(msg).await;
168
169                        }
170
171                        Payload::Block(blk) => {
172                            info!(
173                                event = "New block",
174                                src = "Block msg",
175                                height = blk.header().height,
176                                iter = blk.header().iteration,
177                                hash = to_str(&blk.header().hash),
178                                metadata = ?msg.metadata,
179                            );
180
181                            // Handle a block that originates from a network peer.
182                            // By disabling block broadcast, a block may be received
183                            // from a peer only after explicit request (on demand).
184                            match fsm.on_block_event(*blk, msg.metadata.clone()).await {
185                                Ok(res) => {
186                                    if let Some(accepted_blk) = res {
187                                        // Repropagate Emergency Blocks
188                                        // We already know it's valid because we accepted it
189                                        if is_emergency_block(accepted_blk.header().iteration){
190                                            // We build a new `msg` to avoid cloning `blk` when
191                                            // passing it to `on_block_event`.
192                                            // We copy the metadata to keep the original ray_id.
193                                            let mut eb_msg = Message::from(accepted_blk);
194                                            eb_msg.metadata = msg.metadata;
195                                            if let Err(e) = network.read().await.broadcast(&eb_msg).await {
196                                                warn!("Unable to re-broadcast Emergency Block: {e}");
197                                            }
198                                        }
199                                    }
200                                }
201                                Err(err) => {
202                                    error!(event = "fsm::on_event failed", src = "wire", err = ?err);
203                                }
204                            }
205                        }
206
207                        _ => {
208                            warn!("invalid inbound message");
209                        },
210                    }
211
212                },
213                // Re-routes messages originated from Consensus (upper) layer to the network layer.
214                recv = outbound_chan.recv() => {
215                    let msg = recv?;
216
217                    // Handle quorum messages from Consensus layer.
218                    // If the associated candidate block already exists,
219                    // the winner block will be compiled and redirected to the Acceptor.
220                    if let Payload::Quorum(quorum) = &msg.payload {
221                      if let RatificationResult::Success(_) = quorum.att.result {
222                          fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
223                      }
224                    }
225
226                    if let Payload::GetResource(res) = &msg.payload {
227                        if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
228                            warn!("Unable to re-route message {e}");
229                        }
230                    } else if let Err(e) = network.read().await.broadcast(&msg).await {
231                            warn!("Unable to broadcast message {e}");
232                    }
233
234                },
235                 // Handles heartbeat event
236                _ = sleep_until(heartbeat) => {
237                    if let Err(err) = fsm.on_heartbeat_event().await {
238                        error!(event = "heartbeat_failed", ?err);
239                    }
240
241                    heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
242                },
243            }
244        }
245    }
246
247    /// Returns service name.
248    fn name(&self) -> &'static str {
249        "chain"
250    }
251}
252
253impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
254    #[allow(clippy::too_many_arguments)]
255    pub fn new(
256        keys_path: String,
257        max_inbound_size: usize,
258        event_sender: Sender<Event>,
259        genesis_timestamp: u64,
260        dusk_key: BlsPublicKey,
261        finality_activation: u64,
262        blob_expire_after: u64,
263        #[cfg(feature = "archive")] archive: Archive,
264    ) -> Self {
265        info!(
266            "ChainSrv::new with keys_path: {}, max_inbound_size: {}",
267            keys_path, max_inbound_size
268        );
269
270        Self {
271            inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
272            keys_path,
273            acceptor: None,
274            max_consensus_queue_size: max_inbound_size,
275            event_sender,
276            genesis_timestamp,
277            dusk_key,
278            finality_activation,
279            blob_expire_after,
280            #[cfg(feature = "archive")]
281            archive,
282        }
283    }
284
285    /// Load both the chain tip and last finalized block from persisted ledger.
286    ///
287    /// Panics
288    ///
289    /// If register entry is read but block is not found.
290    async fn load_tip(
291        db: &DB,
292        vm: &VM,
293        genesis_timestamp: u64,
294    ) -> Result<BlockWithLabel> {
295        let stored_block = db.view(|t| {
296            anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
297                t.block(&tip_hash[..])
298                    .expect("block to be found if metadata is set")
299            }))
300        })?;
301
302        let block = match stored_block {
303            Some(blk) => {
304                let (_, label) = db
305                    .view(|t| t.block_label_by_height(blk.header().height))?
306                    .unwrap();
307
308                BlockWithLabel::new_with_label(blk, label)
309            }
310            None => {
311                // Lack of register record means the loaded database is
312                // either malformed or empty.
313                let state = vm.get_state_root()?;
314                let genesis_blk =
315                    genesis::generate_block(state, genesis_timestamp);
316                db.update(|t| {
317                    // Persist genesis block
318                    t.store_block(
319                        genesis_blk.header(),
320                        &[],
321                        &[],
322                        Label::Final(0),
323                    )
324                })?;
325
326                BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
327            }
328        };
329
330        let block_header = block.inner().header();
331
332        tracing::info!(
333            event = "Ledger block loaded",
334            height = block_header.height,
335            hash = hex::encode(block_header.hash),
336            state_root = hex::encode(block_header.state_hash),
337            label = ?block.label()
338        );
339
340        Ok(block)
341    }
342
343    pub async fn revert_last_final(&self) -> anyhow::Result<()> {
344        self.acceptor
345            .as_ref()
346            .expect("Chain to be initialized")
347            .read()
348            .await
349            .try_revert(acceptor::RevertTarget::LastFinalizedState)
350            .await
351    }
352
353    async fn reroute_acceptor(&self, msg: Message) {
354        debug!(
355            event = "Consensus message received",
356            topic = ?msg.topic(),
357            info = ?msg.header,
358            metadata = ?msg.metadata,
359        );
360
361        // Re-route message to the Consensus
362        let acc = self.acceptor.as_ref().expect("initialize is called");
363        if let Err(e) = acc.read().await.reroute_msg(msg).await {
364            warn!("Could not reroute msg to Consensus: {}", e);
365        }
366    }
367}