1mod acceptor;
8mod consensus;
9mod fallback;
10mod fsm;
11mod genesis;
12
13mod header_validation;
14mod metrics;
15
16use std::ops::Deref;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use async_trait::async_trait;
22use dusk_consensus::config::is_emergency_block;
23use dusk_consensus::errors::ConsensusError;
24use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
25pub use header_validation::verify_att;
26use node_data::events::Event;
27use node_data::ledger::{to_str, BlockWithLabel, Label};
28use node_data::message::payload::RatificationResult;
29use node_data::message::{AsyncQueue, Payload, Topics};
30use tokio::sync::mpsc::Sender;
31use tokio::sync::RwLock;
32use tokio::time::{sleep_until, Instant};
33use tracing::{debug, error, info, warn};
34
35use self::acceptor::Acceptor;
36use self::fsm::SimpleFSM;
37#[cfg(feature = "archive")]
38use crate::archive::Archive;
39use crate::database::rocksdb::MD_HASH_KEY;
40use crate::database::{Ledger, Metadata};
41use crate::{database, vm, LongLivedService, Message, Network};
42
43const TOPICS: &[u8] = &[
44 Topics::Block as u8,
45 Topics::Candidate as u8,
46 Topics::Validation as u8,
47 Topics::Ratification as u8,
48 Topics::Quorum as u8,
49 Topics::ValidationQuorum as u8,
50];
51
52const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
53
54pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
55 inbound: AsyncQueue<Message>,
57 keys_path: String,
58 acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
59 max_consensus_queue_size: usize,
60 event_sender: Sender<Event>,
62 genesis_timestamp: u64,
63 dusk_key: BlsPublicKey,
64 finality_activation: u64,
65 #[cfg(feature = "archive")]
66 archive: Archive,
67}
68
69#[async_trait]
70impl<N: Network, DB: database::DB, VM: vm::VMExecution>
71 LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
72{
73 async fn initialize(
74 &mut self,
75 network: Arc<RwLock<N>>,
76 db: Arc<RwLock<DB>>,
77 vm: Arc<RwLock<VM>>,
78 ) -> anyhow::Result<()> {
79 let tip = Self::load_tip(
80 db.read().await.deref(),
81 vm.read().await.deref(),
82 self.genesis_timestamp,
83 )
84 .await?;
85
86 let acc = Acceptor::init_consensus(
88 &self.keys_path,
89 tip,
90 db,
91 network,
92 vm,
93 #[cfg(feature = "archive")]
94 self.archive.clone(),
95 self.max_consensus_queue_size,
96 self.event_sender.clone(),
97 self.dusk_key,
98 self.finality_activation,
99 )
100 .await?;
101
102 self.acceptor = Some(Arc::new(RwLock::new(acc)));
103
104 Ok(())
105 }
106
107 async fn execute(
108 &mut self,
109 network: Arc<RwLock<N>>,
110 _db: Arc<RwLock<DB>>,
111 _vm: Arc<RwLock<VM>>,
112 ) -> anyhow::Result<usize> {
113 LongLivedService::<N, DB, VM>::add_routes(
115 self,
116 TOPICS,
117 self.inbound.clone(),
118 &network,
119 )
120 .await?;
121
122 let acc = self.acceptor.as_mut().expect("initialize is called");
123 acc.write().await.spawn_task().await;
124
125 let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;
127
128 let outbound_chan = acc.read().await.get_outbound_chan().await;
129 let result_chan = acc.read().await.get_result_chan().await;
130
131 let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
132
133 loop {
135 tokio::select! {
136 biased;
137 recv = result_chan.recv() => {
139 match recv? {
140 Err(ConsensusError::Canceled(round)) => {
141 debug!(event = "consensus canceled", round);
142 }
143 Err(err) => {
144 error!(event = "failed_consensus", ?err);
146 fsm.on_failed_consensus().await;
147 }
148 _ => {}
149 }
150 },
151 recv = self.inbound.recv() => {
153 let msg = recv?;
154
155 match msg.payload {
156 Payload::Candidate(_)
157 | Payload::Validation(_)
158 | Payload::Ratification(_)
159 | Payload::ValidationQuorum(_) => {
160 self.reroute_acceptor(msg).await;
161 }
162
163 Payload::Quorum(ref q) => {
164 fsm.on_quorum(q, msg.metadata.as_ref()).await;
165 self.reroute_acceptor(msg).await;
166
167 }
168
169 Payload::Block(blk) => {
170 info!(
171 event = "New block",
172 src = "Block msg",
173 height = blk.header().height,
174 iter = blk.header().iteration,
175 hash = to_str(&blk.header().hash),
176 metadata = ?msg.metadata,
177 );
178
179 match fsm.on_block_event(*blk, msg.metadata.clone()).await {
183 Ok(res) => {
184 if let Some(accepted_blk) = res {
185 if is_emergency_block(accepted_blk.header().iteration){
188 let mut eb_msg = Message::from(accepted_blk);
192 eb_msg.metadata = msg.metadata;
193 if let Err(e) = network.read().await.broadcast(&eb_msg).await {
194 warn!("Unable to re-broadcast Emergency Block: {e}");
195 }
196 }
197 }
198 }
199 Err(err) => {
200 error!(event = "fsm::on_event failed", src = "wire", err = ?err);
201 }
202 }
203 }
204
205 _ => {
206 warn!("invalid inbound message");
207 },
208 }
209
210 },
211 recv = outbound_chan.recv() => {
213 let msg = recv?;
214
215 if let Payload::Quorum(quorum) = &msg.payload {
219 if let RatificationResult::Success(_) = quorum.att.result {
220 fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
221 }
222 }
223
224 if let Payload::GetResource(res) = &msg.payload {
225 if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
226 warn!("Unable to re-route message {e}");
227 }
228 } else if let Err(e) = network.read().await.broadcast(&msg).await {
229 warn!("Unable to broadcast message {e}");
230 }
231
232 },
233 _ = sleep_until(heartbeat) => {
235 if let Err(err) = fsm.on_heartbeat_event().await {
236 error!(event = "heartbeat_failed", ?err);
237 }
238
239 heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
240 },
241 }
242 }
243 }
244
245 fn name(&self) -> &'static str {
247 "chain"
248 }
249}
250
251impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
252 pub fn new(
253 keys_path: String,
254 max_inbound_size: usize,
255 event_sender: Sender<Event>,
256 genesis_timestamp: u64,
257 dusk_key: BlsPublicKey,
258 finality_activation: u64,
259 #[cfg(feature = "archive")] archive: Archive,
260 ) -> Self {
261 info!(
262 "ChainSrv::new with keys_path: {}, max_inbound_size: {}",
263 keys_path, max_inbound_size
264 );
265
266 Self {
267 inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
268 keys_path,
269 acceptor: None,
270 max_consensus_queue_size: max_inbound_size,
271 event_sender,
272 genesis_timestamp,
273 dusk_key,
274 finality_activation,
275 #[cfg(feature = "archive")]
276 archive,
277 }
278 }
279
280 async fn load_tip(
286 db: &DB,
287 vm: &VM,
288 genesis_timestamp: u64,
289 ) -> Result<BlockWithLabel> {
290 let stored_block = db.view(|t| {
291 anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
292 t.block(&tip_hash[..])
293 .expect("block to be found if metadata is set")
294 }))
295 })?;
296
297 let block = match stored_block {
298 Some(blk) => {
299 let (_, label) = db
300 .view(|t| t.block_label_by_height(blk.header().height))?
301 .unwrap();
302
303 BlockWithLabel::new_with_label(blk, label)
304 }
305 None => {
306 let state = vm.get_state_root()?;
309 let genesis_blk =
310 genesis::generate_block(state, genesis_timestamp);
311 db.update(|t| {
312 t.store_block(
314 genesis_blk.header(),
315 &[],
316 &[],
317 Label::Final(0),
318 )
319 })?;
320
321 BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
322 }
323 };
324
325 let block_header = block.inner().header();
326
327 tracing::info!(
328 event = "Ledger block loaded",
329 height = block_header.height,
330 hash = hex::encode(block_header.hash),
331 state_root = hex::encode(block_header.state_hash),
332 label = ?block.label()
333 );
334
335 Ok(block)
336 }
337
338 pub async fn revert_last_final(&self) -> anyhow::Result<()> {
339 self.acceptor
340 .as_ref()
341 .expect("Chain to be initialized")
342 .read()
343 .await
344 .try_revert(acceptor::RevertTarget::LastFinalizedState)
345 .await
346 }
347
348 async fn reroute_acceptor(&self, msg: Message) {
349 debug!(
350 event = "Consensus message received",
351 topic = ?msg.topic(),
352 info = ?msg.header,
353 metadata = ?msg.metadata,
354 );
355
356 let acc = self.acceptor.as_ref().expect("initialize is called");
358 if let Err(e) = acc.read().await.reroute_msg(msg).await {
359 warn!("Could not reroute msg to Consensus: {}", e);
360 }
361 }
362}