1mod acceptor;
8mod consensus;
9mod fallback;
10mod fsm;
11mod genesis;
12
13mod header_validation;
14mod metrics;
15
16use std::collections::HashMap;
17use std::ops::Deref;
18use std::sync::Arc;
19use std::time::Duration;
20
21use anyhow::Result;
22use async_trait::async_trait;
23use dusk_consensus::config::is_emergency_block;
24use dusk_consensus::errors::ConsensusError;
25use dusk_core::abi::ContractId;
26use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
27pub use header_validation::verify_att;
28use node_data::events::Event;
29use node_data::ledger::{to_str, BlockWithLabel, Label};
30use node_data::message::payload::RatificationResult;
31use node_data::message::{AsyncQueue, Payload, Topics};
32use tokio::sync::mpsc::Sender;
33use tokio::sync::RwLock;
34use tokio::time::{sleep_until, Instant};
35use tracing::{debug, error, info, warn};
36
37use self::acceptor::Acceptor;
38use self::fsm::SimpleFSM;
39#[cfg(feature = "archive")]
40use crate::archive::Archive;
41use crate::database::rocksdb::MD_HASH_KEY;
42use crate::database::{Ledger, Metadata};
43use crate::{database, vm, LongLivedService, Message, Network};
44
45const TOPICS: &[u8] = &[
46 Topics::Block as u8,
47 Topics::Candidate as u8,
48 Topics::Validation as u8,
49 Topics::Ratification as u8,
50 Topics::Quorum as u8,
51 Topics::ValidationQuorum as u8,
52];
53
54const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
55
56pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
57 inbound: AsyncQueue<Message>,
59 keys_path: String,
60 acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
61 max_consensus_queue_size: usize,
62 event_sender: Sender<Event>,
64 genesis_timestamp: u64,
65 dusk_key: BlsPublicKey,
66 finality_activation: u64,
67 blob_expire_after: u64,
68 module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
69 #[cfg(feature = "archive")]
70 archive: Archive,
71}
72
73#[async_trait]
74impl<N: Network, DB: database::DB, VM: vm::VMExecution>
75 LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
76{
77 async fn initialize(
78 &mut self,
79 network: Arc<RwLock<N>>,
80 db: Arc<RwLock<DB>>,
81 vm: Arc<RwLock<VM>>,
82 ) -> anyhow::Result<()> {
83 let tip = Self::load_tip(
84 db.read().await.deref(),
85 vm.read().await.deref(),
86 self.genesis_timestamp,
87 )
88 .await?;
89
90 let acc = Acceptor::init_consensus(
92 &self.keys_path,
93 tip,
94 db,
95 network,
96 vm,
97 #[cfg(feature = "archive")]
98 self.archive.clone(),
99 self.max_consensus_queue_size,
100 self.event_sender.clone(),
101 self.dusk_key,
102 self.finality_activation,
103 self.blob_expire_after,
104 self.module_shading.clone(),
105 )
106 .await?;
107
108 self.acceptor = Some(Arc::new(RwLock::new(acc)));
109
110 Ok(())
111 }
112
113 async fn execute(
114 &mut self,
115 network: Arc<RwLock<N>>,
116 _db: Arc<RwLock<DB>>,
117 _vm: Arc<RwLock<VM>>,
118 ) -> anyhow::Result<usize> {
119 LongLivedService::<N, DB, VM>::add_routes(
121 self,
122 TOPICS,
123 self.inbound.clone(),
124 &network,
125 )
126 .await?;
127
128 let acc = self.acceptor.as_mut().expect("initialize is called");
129 acc.write().await.spawn_task().await;
130
131 let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;
133
134 let outbound_chan = acc.read().await.get_outbound_chan().await;
135 let result_chan = acc.read().await.get_result_chan().await;
136
137 let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
138
139 loop {
141 tokio::select! {
142 biased;
143 recv = result_chan.recv() => {
145 match recv? {
146 Err(ConsensusError::Canceled(round)) => {
147 debug!(event = "consensus canceled", round);
148 }
149 Err(err) => {
150 error!(event = "failed_consensus", ?err);
152 fsm.on_failed_consensus().await;
153 }
154 _ => {}
155 }
156 },
157 recv = self.inbound.recv() => {
159 let msg = recv?;
160
161 match msg.payload {
162 Payload::Candidate(_)
163 | Payload::Validation(_)
164 | Payload::Ratification(_)
165 | Payload::ValidationQuorum(_) => {
166 self.reroute_acceptor(msg).await;
167 }
168
169 Payload::Quorum(ref q) => {
170 fsm.on_quorum(q, msg.metadata.as_ref()).await;
171 self.reroute_acceptor(msg).await;
172
173 }
174
175 Payload::Block(blk) => {
176 info!(
177 event = "New block",
178 src = "Block msg",
179 height = blk.header().height,
180 iter = blk.header().iteration,
181 hash = to_str(&blk.header().hash),
182 metadata = ?msg.metadata,
183 );
184
185 match fsm.on_block_event(*blk, msg.metadata.clone()).await {
189 Ok(res) => {
190 if let Some(accepted_blk) = res {
191 if is_emergency_block(accepted_blk.header().iteration){
194 let mut eb_msg = Message::from(accepted_blk);
198 eb_msg.metadata = msg.metadata;
199 if let Err(e) = network.read().await.broadcast(&eb_msg).await {
200 warn!("Unable to re-broadcast Emergency Block: {e}");
201 }
202 }
203 }
204 }
205 Err(err) => {
206 error!(event = "fsm::on_event failed", src = "wire", err = ?err);
207 }
208 }
209 }
210
211 _ => {
212 warn!("invalid inbound message");
213 },
214 }
215
216 },
217 recv = outbound_chan.recv() => {
219 let msg = recv?;
220
221 if let Payload::Quorum(quorum) = &msg.payload {
225 if let RatificationResult::Success(_) = quorum.att.result {
226 fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
227 }
228 }
229
230 if let Payload::GetResource(res) = &msg.payload {
231 if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
232 warn!("Unable to re-route message {e}");
233 }
234 } else if let Err(e) = network.read().await.broadcast(&msg).await {
235 warn!("Unable to broadcast message {e}");
236 }
237
238 },
239 _ = sleep_until(heartbeat) => {
241 if let Err(err) = fsm.on_heartbeat_event().await {
242 error!(event = "heartbeat_failed", ?err);
243 }
244
245 heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
246 },
247 }
248 }
249 }
250
251 fn name(&self) -> &'static str {
253 "chain"
254 }
255}
256
257impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
258 #[allow(clippy::too_many_arguments)]
259 pub fn new(
260 keys_path: String,
261 max_inbound_size: usize,
262 event_sender: Sender<Event>,
263 genesis_timestamp: u64,
264 dusk_key: BlsPublicKey,
265 finality_activation: u64,
266 blob_expire_after: u64,
267 module_shading: HashMap<ContractId, Vec<(u64, u64)>>,
268 #[cfg(feature = "archive")] archive: Archive,
269 ) -> Self {
270 info!(
271 "ChainSrv::new with keys_path: {}, max_inbound_size: {}",
272 keys_path, max_inbound_size
273 );
274
275 Self {
276 inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
277 keys_path,
278 acceptor: None,
279 max_consensus_queue_size: max_inbound_size,
280 event_sender,
281 genesis_timestamp,
282 dusk_key,
283 finality_activation,
284 blob_expire_after,
285 module_shading,
286 #[cfg(feature = "archive")]
287 archive,
288 }
289 }
290
291 async fn load_tip(
297 db: &DB,
298 vm: &VM,
299 genesis_timestamp: u64,
300 ) -> Result<BlockWithLabel> {
301 let stored_block = db.view(|t| {
302 anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
303 t.block(&tip_hash[..])
304 .expect("block to be found if metadata is set")
305 }))
306 })?;
307
308 let block = match stored_block {
309 Some(blk) => {
310 let (_, label) = db
311 .view(|t| t.block_label_by_height(blk.header().height))?
312 .unwrap();
313
314 BlockWithLabel::new_with_label(blk, label)
315 }
316 None => {
317 let state = vm.get_state_root()?;
320 let genesis_blk =
321 genesis::generate_block(state, genesis_timestamp);
322 db.update(|t| {
323 t.store_block(
325 genesis_blk.header(),
326 &[],
327 &[],
328 Label::Final(0),
329 )
330 })?;
331
332 BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
333 }
334 };
335
336 let block_header = block.inner().header();
337
338 tracing::info!(
339 event = "Ledger block loaded",
340 height = block_header.height,
341 hash = hex::encode(block_header.hash),
342 state_root = hex::encode(block_header.state_hash),
343 label = ?block.label()
344 );
345
346 Ok(block)
347 }
348
349 pub async fn revert_last_final(&self) -> anyhow::Result<()> {
350 self.acceptor
351 .as_ref()
352 .expect("Chain to be initialized")
353 .read()
354 .await
355 .try_revert(acceptor::RevertTarget::LastFinalizedState)
356 .await
357 }
358
359 async fn reroute_acceptor(&self, msg: Message) {
360 debug!(
361 event = "Consensus message received",
362 topic = ?msg.topic(),
363 info = ?msg.header,
364 metadata = ?msg.metadata,
365 );
366
367 let acc = self.acceptor.as_ref().expect("initialize is called");
369 if let Err(e) = acc.read().await.reroute_msg(msg).await {
370 warn!("Could not reroute msg to Consensus: {}", e);
371 }
372 }
373}