1mod acceptor;
8mod consensus;
9mod fallback;
10mod fsm;
11mod genesis;
12
13mod header_validation;
14mod metrics;
15
16use std::ops::Deref;
17use std::sync::Arc;
18use std::time::Duration;
19
20use anyhow::Result;
21use async_trait::async_trait;
22use dusk_consensus::config::is_emergency_block;
23use dusk_consensus::errors::ConsensusError;
24use dusk_core::signatures::bls::PublicKey as BlsPublicKey;
25pub use header_validation::verify_att;
26use node_data::events::Event;
27use node_data::ledger::{to_str, BlockWithLabel, Label};
28use node_data::message::payload::RatificationResult;
29use node_data::message::{AsyncQueue, Payload, Topics};
30use tokio::sync::mpsc::Sender;
31use tokio::sync::RwLock;
32use tokio::time::{sleep_until, Instant};
33use tracing::{debug, error, info, warn};
34
35use self::acceptor::Acceptor;
36use self::fsm::SimpleFSM;
37#[cfg(feature = "archive")]
38use crate::archive::Archive;
39use crate::database::rocksdb::MD_HASH_KEY;
40use crate::database::{Ledger, Metadata};
41use crate::{database, vm, LongLivedService, Message, Network};
42
43const TOPICS: &[u8] = &[
44 Topics::Block as u8,
45 Topics::Candidate as u8,
46 Topics::Validation as u8,
47 Topics::Ratification as u8,
48 Topics::Quorum as u8,
49 Topics::ValidationQuorum as u8,
50];
51
/// Delay between two consecutive heartbeat events raised by the main loop.
///
/// The deadline is re-armed from "now" after each heartbeat has been handled.
const HEARTBEAT_SEC: Duration = Duration::from_secs(3);
53
54pub struct ChainSrv<N: Network, DB: database::DB, VM: vm::VMExecution> {
55 inbound: AsyncQueue<Message>,
57 keys_path: String,
58 acceptor: Option<Arc<RwLock<Acceptor<N, DB, VM>>>>,
59 max_consensus_queue_size: usize,
60 event_sender: Sender<Event>,
62 genesis_timestamp: u64,
63 dusk_key: BlsPublicKey,
64 finality_activation: u64,
65 blob_expire_after: u64,
66 #[cfg(feature = "archive")]
67 archive: Archive,
68}
69
70#[async_trait]
71impl<N: Network, DB: database::DB, VM: vm::VMExecution>
72 LongLivedService<N, DB, VM> for ChainSrv<N, DB, VM>
73{
74 async fn initialize(
75 &mut self,
76 network: Arc<RwLock<N>>,
77 db: Arc<RwLock<DB>>,
78 vm: Arc<RwLock<VM>>,
79 ) -> anyhow::Result<()> {
80 let tip = Self::load_tip(
81 db.read().await.deref(),
82 vm.read().await.deref(),
83 self.genesis_timestamp,
84 )
85 .await?;
86
87 let acc = Acceptor::init_consensus(
89 &self.keys_path,
90 tip,
91 db,
92 network,
93 vm,
94 #[cfg(feature = "archive")]
95 self.archive.clone(),
96 self.max_consensus_queue_size,
97 self.event_sender.clone(),
98 self.dusk_key,
99 self.finality_activation,
100 self.blob_expire_after,
101 )
102 .await?;
103
104 self.acceptor = Some(Arc::new(RwLock::new(acc)));
105
106 Ok(())
107 }
108
109 async fn execute(
110 &mut self,
111 network: Arc<RwLock<N>>,
112 _db: Arc<RwLock<DB>>,
113 _vm: Arc<RwLock<VM>>,
114 ) -> anyhow::Result<usize> {
115 LongLivedService::<N, DB, VM>::add_routes(
117 self,
118 TOPICS,
119 self.inbound.clone(),
120 &network,
121 )
122 .await?;
123
124 let acc = self.acceptor.as_mut().expect("initialize is called");
125 acc.write().await.spawn_task().await;
126
127 let mut fsm = SimpleFSM::new(acc.clone(), network.clone()).await;
129
130 let outbound_chan = acc.read().await.get_outbound_chan().await;
131 let result_chan = acc.read().await.get_result_chan().await;
132
133 let mut heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
134
135 loop {
137 tokio::select! {
138 biased;
139 recv = result_chan.recv() => {
141 match recv? {
142 Err(ConsensusError::Canceled(round)) => {
143 debug!(event = "consensus canceled", round);
144 }
145 Err(err) => {
146 error!(event = "failed_consensus", ?err);
148 fsm.on_failed_consensus().await;
149 }
150 _ => {}
151 }
152 },
153 recv = self.inbound.recv() => {
155 let msg = recv?;
156
157 match msg.payload {
158 Payload::Candidate(_)
159 | Payload::Validation(_)
160 | Payload::Ratification(_)
161 | Payload::ValidationQuorum(_) => {
162 self.reroute_acceptor(msg).await;
163 }
164
165 Payload::Quorum(ref q) => {
166 fsm.on_quorum(q, msg.metadata.as_ref()).await;
167 self.reroute_acceptor(msg).await;
168
169 }
170
171 Payload::Block(blk) => {
172 info!(
173 event = "New block",
174 src = "Block msg",
175 height = blk.header().height,
176 iter = blk.header().iteration,
177 hash = to_str(&blk.header().hash),
178 metadata = ?msg.metadata,
179 );
180
181 match fsm.on_block_event(*blk, msg.metadata.clone()).await {
185 Ok(res) => {
186 if let Some(accepted_blk) = res {
187 if is_emergency_block(accepted_blk.header().iteration){
190 let mut eb_msg = Message::from(accepted_blk);
194 eb_msg.metadata = msg.metadata;
195 if let Err(e) = network.read().await.broadcast(&eb_msg).await {
196 warn!("Unable to re-broadcast Emergency Block: {e}");
197 }
198 }
199 }
200 }
201 Err(err) => {
202 error!(event = "fsm::on_event failed", src = "wire", err = ?err);
203 }
204 }
205 }
206
207 _ => {
208 warn!("invalid inbound message");
209 },
210 }
211
212 },
213 recv = outbound_chan.recv() => {
215 let msg = recv?;
216
217 if let Payload::Quorum(quorum) = &msg.payload {
221 if let RatificationResult::Success(_) = quorum.att.result {
222 fsm.on_success_quorum(quorum, msg.metadata.clone()).await;
223 }
224 }
225
226 if let Payload::GetResource(res) = &msg.payload {
227 if let Err(e) = network.read().await.flood_request(res.get_inv(), None, 16).await {
228 warn!("Unable to re-route message {e}");
229 }
230 } else if let Err(e) = network.read().await.broadcast(&msg).await {
231 warn!("Unable to broadcast message {e}");
232 }
233
234 },
235 _ = sleep_until(heartbeat) => {
237 if let Err(err) = fsm.on_heartbeat_event().await {
238 error!(event = "heartbeat_failed", ?err);
239 }
240
241 heartbeat = Instant::now().checked_add(HEARTBEAT_SEC).unwrap();
242 },
243 }
244 }
245 }
246
247 fn name(&self) -> &'static str {
249 "chain"
250 }
251}
252
253impl<N: Network, DB: database::DB, VM: vm::VMExecution> ChainSrv<N, DB, VM> {
254 #[allow(clippy::too_many_arguments)]
255 pub fn new(
256 keys_path: String,
257 max_inbound_size: usize,
258 event_sender: Sender<Event>,
259 genesis_timestamp: u64,
260 dusk_key: BlsPublicKey,
261 finality_activation: u64,
262 blob_expire_after: u64,
263 #[cfg(feature = "archive")] archive: Archive,
264 ) -> Self {
265 info!(
266 "ChainSrv::new with keys_path: {}, max_inbound_size: {}",
267 keys_path, max_inbound_size
268 );
269
270 Self {
271 inbound: AsyncQueue::bounded(max_inbound_size, "chain_inbound"),
272 keys_path,
273 acceptor: None,
274 max_consensus_queue_size: max_inbound_size,
275 event_sender,
276 genesis_timestamp,
277 dusk_key,
278 finality_activation,
279 blob_expire_after,
280 #[cfg(feature = "archive")]
281 archive,
282 }
283 }
284
285 async fn load_tip(
291 db: &DB,
292 vm: &VM,
293 genesis_timestamp: u64,
294 ) -> Result<BlockWithLabel> {
295 let stored_block = db.view(|t| {
296 anyhow::Ok(t.op_read(MD_HASH_KEY)?.and_then(|tip_hash| {
297 t.block(&tip_hash[..])
298 .expect("block to be found if metadata is set")
299 }))
300 })?;
301
302 let block = match stored_block {
303 Some(blk) => {
304 let (_, label) = db
305 .view(|t| t.block_label_by_height(blk.header().height))?
306 .unwrap();
307
308 BlockWithLabel::new_with_label(blk, label)
309 }
310 None => {
311 let state = vm.get_state_root()?;
314 let genesis_blk =
315 genesis::generate_block(state, genesis_timestamp);
316 db.update(|t| {
317 t.store_block(
319 genesis_blk.header(),
320 &[],
321 &[],
322 Label::Final(0),
323 )
324 })?;
325
326 BlockWithLabel::new_with_label(genesis_blk, Label::Final(0))
327 }
328 };
329
330 let block_header = block.inner().header();
331
332 tracing::info!(
333 event = "Ledger block loaded",
334 height = block_header.height,
335 hash = hex::encode(block_header.hash),
336 state_root = hex::encode(block_header.state_hash),
337 label = ?block.label()
338 );
339
340 Ok(block)
341 }
342
343 pub async fn revert_last_final(&self) -> anyhow::Result<()> {
344 self.acceptor
345 .as_ref()
346 .expect("Chain to be initialized")
347 .read()
348 .await
349 .try_revert(acceptor::RevertTarget::LastFinalizedState)
350 .await
351 }
352
353 async fn reroute_acceptor(&self, msg: Message) {
354 debug!(
355 event = "Consensus message received",
356 topic = ?msg.topic(),
357 info = ?msg.header,
358 metadata = ?msg.metadata,
359 );
360
361 let acc = self.acceptor.as_ref().expect("initialize is called");
363 if let Err(e) = acc.read().await.reroute_msg(msg).await {
364 warn!("Could not reroute msg to Consensus: {}", e);
365 }
366 }
367}