1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21#[macro_use]
22extern crate amareleo_chain_tracing;
23
24use amareleo_chain_account::Account;
25use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
26use amareleo_node_bft::{
27 BFT,
28 MAX_BATCH_DELAY_IN_MS,
29 Primary,
30 helpers::{
31 ConsensusReceiver,
32 PrimaryReceiver,
33 PrimarySender,
34 Storage as NarwhalStorage,
35 fmt_id,
36 init_consensus_channels,
37 },
38 spawn_blocking,
39};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_bft_storage_service::BFTPersistentStorage;
42use snarkvm::{
43 ledger::{
44 block::Transaction,
45 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
46 puzzle::{Solution, SolutionID},
47 },
48 prelude::*,
49};
50
51use aleo_std::StorageMode;
52use anyhow::Result;
53use colored::Colorize;
54use indexmap::IndexMap;
55use lru::LruCache;
56use parking_lot::Mutex;
57use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
58use tokio::{
59 sync::{OnceCell, oneshot},
60 task::JoinHandle,
61};
62use tracing::subscriber::DefaultGuard;
63
64#[cfg(feature = "metrics")]
65use std::collections::HashMap;
66
/// The capacity of the LRU queue holding pending deployment transactions.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the LRU queue holding pending execution transactions.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the LRU queue holding pending solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The maximum number of deployments dequeued per processing interval
/// (executions fill the remaining capacity — see `process_unconfirmed_transactions`).
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
79
/// In-memory queues of pending transactions, split so that deployments and
/// executions can be capacity-limited and drained independently.
struct TransactionsQueue<N: Network> {
    /// LRU queue of pending deployment transactions, keyed by transaction ID.
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    /// LRU queue of pending execution transactions, keyed by transaction ID.
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}
85
86impl<N: Network> Default for TransactionsQueue<N> {
87 fn default() -> Self {
88 Self {
89 deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
90 executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
91 }
92 }
93}
94
/// The consensus layer: queues inbound solutions/transactions, feeds them to
/// the BFT, and advances the ledger with committed subdags.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service used to query and advance the chain state.
    ledger: Arc<dyn LedgerService<N>>,
    /// The underlying BFT instance.
    bft: BFT<N>,
    /// The primary sender, set exactly once in `run`.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// LRU queue of inbound solutions awaiting admission to the memory pool.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// LRU queues of inbound transactions awaiting admission to the memory pool.
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    /// Deduplication cache of recently seen solution IDs.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// Deduplication cache of recently seen transaction IDs.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// Enqueue timestamps per transmission, used to compute latency metrics.
    #[cfg(feature = "metrics")]
    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// Optional custom tracing handler; `None` falls back to default tracing.
    tracing: Option<TracingHandler>,
    /// Handles of the spawned handler tasks, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
118
119impl<N: Network> TracingHandlerGuard for Consensus<N> {
120 fn get_tracing_guard(&self) -> Option<DefaultGuard> {
122 self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
123 }
124}
125
impl<N: Network> Consensus<N> {
    /// Initializes a new instance of consensus, opening the persistent
    /// transmission storage and constructing the BFT instance.
    pub fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        keep_state: bool,
        storage_mode: StorageMode,
        tracing: Option<TracingHandler>,
    ) -> Result<Self> {
        // Initialize the persistent transmission storage.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone(), tracing.clone())?);
        // Initialize the Narwhal storage over the ledger and transmissions.
        let storage =
            NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64, tracing.clone());
        // Initialize the BFT instance.
        let bft = BFT::new(account, storage, keep_state, storage_mode, ledger.clone(), tracing.clone())?;
        // Return the consensus instance with empty queues and caches.
        Ok(Self {
            ledger,
            bft,
            primary_sender: Default::default(),
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_queue_timestamps: Default::default(),
            tracing: tracing.clone(),
            handles: Default::default(),
        })
    }

    /// Runs the consensus instance: stores the primary sender, starts the
    /// handler tasks, then runs the BFT.
    ///
    /// # Errors
    /// Bails if the primary sender was already set, or if the BFT fails to run
    /// (in which case consensus is shut down before the error is returned).
    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
        guard_info!(self, "Starting the consensus instance...");

        // Set the primary sender; the OnceCell rejects a second call to `run`.
        let result = self.primary_sender.set(primary_sender.clone());
        if result.is_err() {
            bail!("Unexpected: Primary sender already set");
        }

        // Create the consensus channels for committed subdags.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();

        // Start the consensus handler tasks.
        self.start_handlers(consensus_receiver);

        // Run the BFT; on failure, shut down before propagating the error.
        let result = self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await;
        if let Err(err) = result {
            guard_error!(self, "Consensus failed to run the BFT instance - {err}");
            self.shut_down().await;
            return Err(err);
        }

        Ok(())
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the BFT instance.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }

    /// Returns the primary sender.
    ///
    /// # Panics
    /// Panics if called before `run` has set the sender.
    fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set")
    }
}
200
201impl<N: Network> Consensus<N> {
202 pub fn num_unconfirmed_transmissions(&self) -> usize {
204 self.bft.num_unconfirmed_transmissions()
205 }
206
207 pub fn num_unconfirmed_ratifications(&self) -> usize {
209 self.bft.num_unconfirmed_ratifications()
210 }
211
212 pub fn num_unconfirmed_solutions(&self) -> usize {
214 self.bft.num_unconfirmed_solutions()
215 }
216
217 pub fn num_unconfirmed_transactions(&self) -> usize {
219 self.bft.num_unconfirmed_transactions()
220 }
221}
222
223impl<N: Network> Consensus<N> {
224 pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
226 self.worker_transmission_ids().chain(self.inbound_transmission_ids())
227 }
228
229 pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
231 self.worker_transmissions().chain(self.inbound_transmissions())
232 }
233
234 pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
236 self.worker_solutions().chain(self.inbound_solutions())
237 }
238
239 pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
241 self.worker_transactions().chain(self.inbound_transactions())
242 }
243}
244
245impl<N: Network> Consensus<N> {
246 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
248 self.bft.worker_transmission_ids()
249 }
250
251 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
253 self.bft.worker_transmissions()
254 }
255
256 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
258 self.bft.worker_solutions()
259 }
260
261 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
263 self.bft.worker_transactions()
264 }
265}
266
267impl<N: Network> Consensus<N> {
268 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
270 self.inbound_transmissions().map(|(id, _)| id)
271 }
272
273 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
275 self.inbound_transactions()
276 .map(|(id, tx)| {
277 (
278 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
279 Transmission::Transaction(tx),
280 )
281 })
282 .chain(self.inbound_solutions().map(|(id, solution)| {
283 (
284 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
285 Transmission::Solution(solution),
286 )
287 }))
288 }
289
290 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
292 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
294 }
295
296 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
298 let tx_queue = self.transactions_queue.lock();
300 tx_queue
302 .deployments
303 .clone()
304 .into_iter()
305 .chain(tx_queue.executions.clone())
306 .map(|(id, tx)| (id, Data::Object(tx)))
307 }
308}
309
impl<N: Network> Consensus<N> {
    /// Adds the given unconfirmed solution to the solutions queue, then drains
    /// the queue into the memory pool.
    ///
    /// Returns `Ok(())` without re-queueing if the solution was seen before;
    /// bails if it already exists in the ledger or in the queue.
    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
        // Compute the checksum of the serialized solution.
        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
        // Record the metrics and enqueue timestamp for the solution.
        #[cfg(feature = "metrics")]
        {
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
            let timestamp = amareleo_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
        }
        // Each lock below is a statement-scoped temporary, so no guard is held
        // across the await at the end of this function.
        {
            let solution_id = solution.id();

            // Deduplicate: if the solution was seen before, return early.
            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
                return Ok(());
            }
            // Skip solutions that already exist in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
            }
            guard_trace!(self, "Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
            // Queue the solution; bail if it was already queued.
            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
            }
        }

        // Drain the queue into the memory pool.
        self.process_unconfirmed_solutions().await
    }

    /// Moves queued solutions into the memory pool, up to the remaining
    /// solution and transmission capacity. Returns early (successfully) when
    /// either limit is already reached.
    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
        // If the memory pool is already at capacity, return early.
        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
        {
            return Ok(());
        }
        // Pop up to `capacity` solutions (LRU order) while holding the lock;
        // the lock is released before the sends below.
        let solutions = {
            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
            let mut queue = self.solutions_queue.lock();
            let num_solutions = queue.len().min(capacity);
            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
        };
        // Forward each solution to the primary.
        for solution in solutions.into_iter() {
            let solution_id = solution.id();
            guard_trace!(self, "Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
                // Only warn when synced, and not within the first 10 blocks of
                // an epoch (block height modulo epoch length must exceed 10).
                if self.bft.is_synced() {
                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                        guard_warn!(
                            self,
                            "Failed to add unconfirmed solution '{}' to the memory pool - {e}",
                            fmt_id(solution_id)
                        )
                    };
                }
            }
        }
        Ok(())
    }

    /// Adds the given unconfirmed transaction to the transactions queue, then
    /// drains the queues into the memory pool.
    ///
    /// Fee transactions are rejected; previously-seen transactions return
    /// `Ok(())`; duplicates in the ledger or queue cause a bail.
    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
        // Compute the checksum of the serialized transaction.
        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
        // Record the metrics and enqueue timestamp for the transaction.
        #[cfg(feature = "metrics")]
        {
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
            let timestamp = amareleo_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
        }
        {
            let transaction_id = transaction.id();

            // Reject standalone fee transactions.
            if transaction.is_fee() {
                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // Deduplicate: if the transaction was seen before, return early.
            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
                return Ok(());
            }
            // Skip transactions that already exist in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            guard_trace!(self, "Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
            // Queue deployments and executions separately; bail on duplicates.
            if transaction.is_deploy() {
                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
                }
            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
            }

            // Drain the queues into the memory pool. The lock guards above are
            // statement-scoped temporaries, so none is held across this await.
            self.process_unconfirmed_transactions().await
        }
    }

    /// Moves queued transactions into the memory pool, interleaving at most
    /// `MAX_DEPLOYMENTS_PER_INTERVAL` deployments with queued executions, up to
    /// the remaining transmission capacity.
    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
        // If the memory pool is already at capacity, return early.
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
            return Ok(());
        }
        // Pop transactions (LRU order) while holding the lock; the lock is
        // released before the sends below.
        let transactions = {
            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
            let mut tx_queue = self.transactions_queue.lock();
            // Cap the deployments per interval, then fill the remaining
            // capacity with executions.
            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
            // `true` selects a deployment, `false` an execution.
            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
            selector_iter
                .filter_map(|select_deployment| {
                    if select_deployment {
                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
                    } else {
                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
                    }
                })
                .collect_vec()
        };
        // Forward each transaction to the primary.
        for transaction in transactions.into_iter() {
            let transaction_id = transaction.id();
            guard_trace!(self, "Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
            if let Err(e) =
                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
            {
                // Only warn once the BFT is synced, to avoid noise during sync.
                if self.bft.is_synced() {
                    guard_warn!(
                        self,
                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
                        fmt_id(transaction_id)
                    );
                }
            }
        }
        Ok(())
    }
}
486
impl<N: Network> Consensus<N> {
    /// Starts the consensus handler tasks: one that applies committed subdags
    /// received from the BFT, and one that periodically drains the transaction
    /// and solution queues.
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;

        // Process the committed subdags received from the BFT.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });

        // Periodically drain the queued transactions and solutions.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if let Err(e) = self_.process_unconfirmed_transactions().await {
                    guard_warn!(self_, "Cannot process unconfirmed transactions - {e}");
                }
                if let Err(e) = self_.process_unconfirmed_solutions().await {
                    guard_warn!(self_, "Cannot process unconfirmed solutions - {e}");
                }
            }
        });
    }

    /// Applies a committed subdag to the ledger, re-queueing the transmissions
    /// on failure, and reports the outcome through `callback`.
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        // Advance on a blocking task so the async executor is not stalled.
        // The transmissions are cloned so they remain available for reinsertion.
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };

        // On failure, return the transmissions to the memory pool.
        if let Err(e) = &result {
            guard_error!(self, "Unable to advance to the next block - {e}");
            self.reinsert_transmissions(transmissions).await;
        }
        // Report the result; a dropped receiver is ignored.
        callback.send(result).ok();
    }

    /// Prepares the next quorum block from the subdag and transmissions, and
    /// advances the ledger to it. Clears stale solutions at epoch boundaries.
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Capture metric inputs before the subdag is consumed below.
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();

        // Prepare the next quorum block.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;

        // Advance the ledger to the next block.
        self.ledger.advance_to_next_block(&next_block)?;

        // At every epoch boundary, clear the queued and worker solutions.
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            self.solutions_queue.lock().clear();
            self.bft.primary().clear_worker_solutions();
        }

        // Record the block and commit-latency metrics.
        #[cfg(feature = "metrics")]
        {
            let elapsed = std::time::Duration::from_secs((amareleo_node_bft::helpers::now() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let block_latency = next_block_timestamp - current_block_timestamp;
            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();

            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);

            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
        }
        Ok(())
    }

    /// Reinserts the given transmissions into the memory pool, logging (but
    /// not propagating) individual failures.
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        for (transmission_id, transmission) in transmissions.into_iter() {
            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
                guard_warn!(
                    self,
                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                );
            }
        }
    }

    /// Sends a single transmission back to the primary and awaits the outcome
    /// reported on the callback channel.
    ///
    /// # Errors
    /// Bails on a mismatched `(transmission_id, transmission)` pair; propagates
    /// send and callback failures.
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<()> {
        // Initialize a callback channel to receive the reinsertion outcome.
        let (callback, callback_receiver) = oneshot::channel();
        match (transmission_id, transmission) {
            // Ratifications need no reinsertion.
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        // Await the outcome reported by the primary.
        callback_receiver.await?
    }

    /// Spawns the given future and tracks its handle for abort on shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the BFT, then aborts all tracked handler tasks.
    pub async fn shut_down(&self) {
        guard_info!(self, "Shutting down consensus...");

        // Shut down the BFT first, so no new subdags arrive.
        self.bft.shut_down().await;

        // Abort and forget the handler tasks.
        let mut handles = self.handles.lock();
        handles.iter().for_each(|handle| handle.abort());
        handles.clear();
    }
}
653}