1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21#[macro_use]
22extern crate amareleo_chain_tracing;
23
24use amareleo_chain_account::Account;
25use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
26use amareleo_node_bft::{
27 BFT,
28 MAX_BATCH_DELAY_IN_MS,
29 Primary,
30 helpers::{
31 ConsensusReceiver,
32 PrimaryReceiver,
33 PrimarySender,
34 Storage as NarwhalStorage,
35 fmt_id,
36 init_consensus_channels,
37 },
38 spawn_blocking,
39};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_bft_storage_service::BFTPersistentStorage;
42use snarkvm::{
43 ledger::{
44 block::Transaction,
45 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
46 puzzle::{Solution, SolutionID},
47 },
48 prelude::*,
49};
50
51use aleo_std::StorageMode;
52use anyhow::Result;
53use colored::Colorize;
54use indexmap::IndexMap;
55#[cfg(feature = "locktick")]
56use locktick::parking_lot::Mutex;
57use lru::LruCache;
58#[cfg(not(feature = "locktick"))]
59use parking_lot::Mutex;
60use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
61use tokio::{
62 sync::{OnceCell, oneshot},
63 task::JoinHandle,
64};
65use tracing::subscriber::DefaultGuard;
66
67#[cfg(feature = "metrics")]
68use std::collections::HashMap;
69
/// The maximum number of deployment transactions held in the deployments LRU queue.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The maximum number of execution transactions held in the executions LRU queue.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The maximum number of solutions held in the solutions LRU queue.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The maximum number of deployments drained from the queue per processing interval
/// (deployments are rate-limited relative to executions in `process_unconfirmed_transactions`).
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
82
/// Groups the two LRU queues of inbound, not-yet-submitted transactions.
/// Deployments and executions are kept separate so that deployments can be
/// rate-limited independently (see `MAX_DEPLOYMENTS_PER_INTERVAL`).
struct TransactionsQueue<N: Network> {
    /// The LRU queue of pending deployment transactions, keyed by transaction ID.
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    /// The LRU queue of pending execution transactions, keyed by transaction ID.
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}
88
89impl<N: Network> Default for TransactionsQueue<N> {
90 fn default() -> Self {
91 Self {
92 deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
93 executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
94 }
95 }
96}
97
/// The consensus layer: accepts unconfirmed solutions and transactions, queues
/// them, submits them to the BFT primary, and advances the ledger whenever the
/// BFT commits a subdag. Cheaply cloneable; all state is behind `Arc`s.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service used to validate transmissions and advance blocks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The underlying BFT instance.
    bft: BFT<N>,
    /// The primary-channel sender; set exactly once in `run`.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// The LRU queue of inbound solutions awaiting submission to the primary.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The LRU queues of inbound transactions awaiting submission to the primary.
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    /// Recently-seen solution IDs, used to silently drop duplicates.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// Recently-seen transaction IDs, used to silently drop duplicates.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// Enqueue timestamps per transmission, used to compute latency metrics.
    #[cfg(feature = "metrics")]
    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// The optional tracing handler used to scope log output.
    tracing: Option<TracingHandler>,
    /// Handles of the spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
121
122impl<N: Network> TracingHandlerGuard for Consensus<N> {
123 fn get_tracing_guard(&self) -> Option<DefaultGuard> {
125 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
126 }
127}
128
impl<N: Network> Consensus<N> {
    /// Initializes a new consensus instance: opens the persistent transmission
    /// store, builds the Narwhal storage over it, and constructs the BFT.
    pub fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        keep_state: bool,
        storage_mode: StorageMode,
        tracing: Option<TracingHandler>,
    ) -> Result<Self> {
        // Open the persistent storage for transmissions.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone(), tracing.clone())?);
        // Initialize the Narwhal storage over the ledger and transmission store.
        let storage =
            NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64, tracing.clone());
        // Initialize the BFT instance.
        let bft = BFT::new(account, storage, keep_state, storage_mode, ledger.clone(), tracing.clone())?;
        Ok(Self {
            ledger,
            bft,
            primary_sender: Default::default(),
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            // 1 << 16 recently-seen entries per dedup cache.
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_queue_timestamps: Default::default(),
            tracing: tracing.clone(),
            handles: Default::default(),
        })
    }

    /// Runs the consensus instance: records the primary sender (which may only
    /// ever be set once), starts the background handlers, then launches the BFT.
    /// On BFT startup failure, shuts consensus down and propagates the error.
    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
        guard_info!(self, "Starting the consensus instance...");

        // Set the primary sender; `OnceCell::set` fails if it was already set.
        let result = self.primary_sender.set(primary_sender.clone());
        if result.is_err() {
            bail!("Unexpected: Primary sender already set");
        }

        // Create the channels over which the BFT delivers committed subdags.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();

        // Start the handlers before the BFT can produce any output.
        self.start_handlers(consensus_receiver);

        // Run the BFT, handing it the consensus sender for committed subdags.
        let result = self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await;
        if let Err(err) = result {
            guard_error!(self, "Consensus failed to run the BFT instance - {err}");
            self.shut_down().await;
            return Err(err);
        }

        Ok(())
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the BFT instance.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }

    /// Returns the primary sender.
    /// Panics if called before `run` has set it.
    fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set")
    }
}
203
204impl<N: Network> Consensus<N> {
205 pub fn num_unconfirmed_transmissions(&self) -> usize {
207 self.bft.num_unconfirmed_transmissions()
208 }
209
210 pub fn num_unconfirmed_ratifications(&self) -> usize {
212 self.bft.num_unconfirmed_ratifications()
213 }
214
215 pub fn num_unconfirmed_solutions(&self) -> usize {
217 self.bft.num_unconfirmed_solutions()
218 }
219
220 pub fn num_unconfirmed_transactions(&self) -> usize {
222 self.bft.num_unconfirmed_transactions()
223 }
224}
225
226impl<N: Network> Consensus<N> {
227 pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
229 self.worker_transmission_ids().chain(self.inbound_transmission_ids())
230 }
231
232 pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
234 self.worker_transmissions().chain(self.inbound_transmissions())
235 }
236
237 pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
239 self.worker_solutions().chain(self.inbound_solutions())
240 }
241
242 pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
244 self.worker_transactions().chain(self.inbound_transactions())
245 }
246}
247
248impl<N: Network> Consensus<N> {
249 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
251 self.bft.worker_transmission_ids()
252 }
253
254 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
256 self.bft.worker_transmissions()
257 }
258
259 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
261 self.bft.worker_solutions()
262 }
263
264 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
266 self.bft.worker_transactions()
267 }
268}
269
270impl<N: Network> Consensus<N> {
271 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
273 self.inbound_transmissions().map(|(id, _)| id)
274 }
275
276 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
278 self.inbound_transactions()
279 .map(|(id, tx)| {
280 (
281 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
282 Transmission::Transaction(tx),
283 )
284 })
285 .chain(self.inbound_solutions().map(|(id, solution)| {
286 (
287 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
288 Transmission::Solution(solution),
289 )
290 }))
291 }
292
293 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
295 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
297 }
298
299 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
301 let tx_queue = self.transactions_queue.lock();
303 tx_queue
305 .deployments
306 .clone()
307 .into_iter()
308 .chain(tx_queue.executions.clone())
309 .map(|(id, tx)| (id, Data::Object(tx)))
310 }
311}
312
313impl<N: Network> Consensus<N> {
314 pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
316 let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
318 #[cfg(feature = "metrics")]
319 {
320 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
321 let timestamp = amareleo_node_bft::helpers::now();
322 self.transmissions_queue_timestamps
323 .lock()
324 .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
325 }
326 {
328 let solution_id = solution.id();
329
330 if self.seen_solutions.lock().put(solution_id, ()).is_some() {
332 return Ok(());
334 }
335 if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
337 bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
338 }
339 guard_trace!(self, "Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
341 if self.solutions_queue.lock().put(solution_id, solution).is_some() {
342 bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
343 }
344 }
345
346 self.process_unconfirmed_solutions().await
348 }
349
350 pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
352 let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
354 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
355 if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
356 || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
357 {
358 return Ok(());
359 }
360 let solutions = {
362 let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
364 let mut queue = self.solutions_queue.lock();
366 let num_solutions = queue.len().min(capacity);
368 (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
370 };
371 for solution in solutions.into_iter() {
373 let solution_id = solution.id();
374 guard_trace!(self, "Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
375 if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
377 if self.bft.is_synced() {
379 if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
381 guard_warn!(
382 self,
383 "Failed to add unconfirmed solution '{}' to the memory pool - {e}",
384 fmt_id(solution_id)
385 )
386 };
387 }
388 }
389 }
390 Ok(())
391 }
392
393 pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
395 let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
397 #[cfg(feature = "metrics")]
398 {
399 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
400 let timestamp = amareleo_node_bft::helpers::now();
401 self.transmissions_queue_timestamps
402 .lock()
403 .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
404 }
405 {
407 let transaction_id = transaction.id();
408
409 if transaction.is_fee() {
411 bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
412 }
413 if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
415 return Ok(());
417 }
418 if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
420 bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
421 }
422 guard_trace!(self, "Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
424 if transaction.is_deploy() {
425 if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
426 bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
427 }
428 } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
429 bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
430 }
431
432 self.process_unconfirmed_transactions().await
434 }
435 }
436
437 pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
439 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
441 if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
442 return Ok(());
443 }
444 let transactions = {
446 let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
448 let mut tx_queue = self.transactions_queue.lock();
450 let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
452 let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
454 let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
457 selector_iter
459 .filter_map(|select_deployment| {
460 if select_deployment {
461 tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
462 } else {
463 tx_queue.executions.pop_lru().map(|(_, tx)| tx)
464 }
465 })
466 .collect_vec()
467 };
468 for transaction in transactions.into_iter() {
470 let transaction_id = transaction.id();
471 guard_trace!(self, "Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
472 if let Err(e) =
474 self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
475 {
476 if self.bft.is_synced() {
478 guard_warn!(
479 self,
480 "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
481 fmt_id(transaction_id)
482 );
483 }
484 }
485 }
486 Ok(())
487 }
488}
489
impl<N: Network> Consensus<N> {
    /// Spawns the background tasks: one that processes committed subdags
    /// delivered by the BFT, and one that periodically drains the transaction
    /// and solution queues into the primary.
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;

        // Process committed subdags received from the BFT until the channel closes.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });

        // Periodically attempt to submit queued transactions and solutions to the primary.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if let Err(e) = self_.process_unconfirmed_transactions().await {
                    guard_warn!(self_, "Cannot process unconfirmed transactions - {e}");
                }
                if let Err(e) = self_.process_unconfirmed_solutions().await {
                    guard_warn!(self_, "Cannot process unconfirmed solutions - {e}");
                }
            }
        });
    }

    /// Attempts to advance the ledger with the committed subdag; on failure,
    /// reinserts the transmissions into the memory pool. The outcome is
    /// reported back to the BFT via `callback`.
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        // Advance on a blocking thread. The transmissions are cloned up front
        // so the originals remain available for reinsertion on failure.
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };

        if let Err(e) = &result {
            guard_error!(self, "Unable to advance to the next block - {e}");
            self.reinsert_transmissions(transmissions).await;
        }
        // Report the result back to the BFT; ignore a dropped receiver.
        callback.send(result).ok();
    }

    /// Prepares and commits the next quorum block from the given subdag and
    /// transmissions, clearing stale solutions at each epoch boundary.
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();

        // Construct the next block from the committed subdag.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;

        // Commit the block to the ledger.
        self.ledger.advance_to_next_block(&next_block)?;

        // At an epoch boundary, queued and worker-held solutions become stale — clear them.
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            self.solutions_queue.lock().clear();
            self.bft.primary().clear_worker_solutions();
        }

        #[cfg(feature = "metrics")]
        {
            let elapsed = std::time::Duration::from_secs((amareleo_node_bft::helpers::now() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let block_latency = next_block_timestamp - current_block_timestamp;
            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();

            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);

            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
        }
        Ok(())
    }

    /// Reinserts each of the given transmissions into the memory pool,
    /// logging (but not propagating) per-transmission failures.
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        for (transmission_id, transmission) in transmissions.into_iter() {
            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
                guard_warn!(
                    self,
                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                );
            }
        }
    }

    /// Reinserts a single transmission by resending it over the matching
    /// primary channel, and awaits the primary's callback result. Errors on a
    /// mismatched `(transmission_id, transmission)` pair.
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<()> {
        // The callback carries the primary's acceptance result back to us.
        let (callback, callback_receiver) = oneshot::channel();
        match (transmission_id, transmission) {
            // Ratifications carry no payload to reinsert.
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        callback_receiver.await?
    }

    /// Spawns the given future and records its handle for shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down consensus: stops the BFT, then aborts every background task.
    pub async fn shut_down(&self) {
        guard_info!(self, "Shutting down consensus...");

        // Shut down the BFT first so no new subdags are produced.
        self.bft.shut_down().await;

        // Abort and forget all spawned background tasks.
        let mut handles = self.handles.lock();
        handles.iter().for_each(|handle| handle.abort());
        handles.clear();
    }
}
656}