1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21use amareleo_chain_account::Account;
22use amareleo_chain_tracing::TracingHandler;
23use amareleo_node_bft::{
24 BFT,
25 MAX_BATCH_DELAY_IN_MS,
26 Primary,
27 helpers::{
28 ConsensusReceiver,
29 PrimaryReceiver,
30 PrimarySender,
31 Storage as NarwhalStorage,
32 fmt_id,
33 init_consensus_channels,
34 },
35 spawn_blocking,
36};
37use amareleo_node_bft_ledger_service::LedgerService;
38use amareleo_node_bft_storage_service::BFTPersistentStorage;
39use snarkvm::{
40 ledger::{
41 block::Transaction,
42 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
43 puzzle::{Solution, SolutionID},
44 },
45 prelude::*,
46};
47
48use aleo_std::StorageMode;
49use anyhow::Result;
50use colored::Colorize;
51use indexmap::IndexMap;
52use lru::LruCache;
53use parking_lot::Mutex;
54use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
55use tokio::{
56 sync::{OnceCell, oneshot},
57 task::JoinHandle,
58};
59use tracing::subscriber::DefaultGuard;
60
61#[cfg(feature = "metrics")]
62use std::collections::HashMap;
63
/// The maximum number of deployment transactions retained in the queue (LRU-evicted beyond this).
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The maximum number of execution transactions retained in the queue (LRU-evicted beyond this).
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The maximum number of solutions retained in the queue (LRU-evicted beyond this).
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The maximum number of deployments drained per processing interval
/// (see `process_unconfirmed_transactions`).
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
76
/// A bounded (LRU) holding area for unconfirmed transactions, partitioned by kind
/// so that deployments and executions can be drained at different rates.
struct TransactionsQueue<N: Network> {
    /// Pending deployment transactions, keyed by transaction ID.
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    /// Pending execution transactions, keyed by transaction ID.
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}
82
83impl<N: Network> Default for TransactionsQueue<N> {
84 fn default() -> Self {
85 Self {
86 deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
87 executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
88 }
89 }
90}
91
/// The consensus layer: bridges the BFT machinery with the ledger service, and buffers
/// inbound solutions and transactions in bounded in-memory queues before submission.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service used to query chain state and advance to new blocks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The underlying BFT instance.
    bft: BFT<N>,
    /// The primary sender; set exactly once in `run` and required before submitting transmissions.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// The LRU queue of unconfirmed solutions awaiting submission to the primary.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The LRU queues of unconfirmed transactions (deployments/executions) awaiting submission.
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    /// Recently-seen solution IDs, used to deduplicate inbound solutions.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// Recently-seen transaction IDs, used to deduplicate inbound transactions.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// Enqueue timestamps per transmission ID, used to compute latency metrics.
    #[cfg(feature = "metrics")]
    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// The optional tracing handler used to install per-thread subscriber guards.
    tracing: Option<TracingHandler>,
    /// Handles of spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
115
impl<N: Network> Consensus<N> {
    /// Initializes a new consensus instance backed by persistent BFT storage.
    ///
    /// # Arguments
    /// * `account` - the node account handed to the BFT.
    /// * `ledger` - the ledger service.
    /// * `keep_state` - forwarded to `BFT::new` (see its documentation for semantics).
    /// * `storage_mode` - the storage mode for the persistent transmission store.
    /// * `tracing` - optional tracing handler for thread-local subscribers.
    pub fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        keep_state: bool,
        storage_mode: StorageMode,
        tracing: Option<TracingHandler>,
    ) -> Result<Self> {
        // Open the persistent transmission store.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone(), tracing.clone())?);
        // Initialize the Narwhal storage with the configured number of GC rounds.
        let storage =
            NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64, tracing.clone());
        // Initialize the BFT instance.
        let bft = BFT::new(account, storage, keep_state, storage_mode, ledger.clone(), tracing.clone())?;
        Ok(Self {
            ledger,
            bft,
            primary_sender: Default::default(),
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            // Seen-caches are larger (2^16) than the pending queues so dedup history outlives the queues.
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_queue_timestamps: Default::default(),
            tracing: tracing.clone(),
            handles: Default::default(),
        })
    }

    /// Starts the consensus instance: stores the primary sender, launches the background
    /// handlers, and runs the BFT.
    ///
    /// # Panics
    /// Panics if the primary sender has already been set by a prior call to `run`.
    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
        let _guard = self.get_tracing_guard();
        info!("Starting the consensus instance...");
        // Set the primary sender; the OnceCell enforces this happens at most once.
        self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set");

        // Create the consensus channels and start the background handlers.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();
        self.start_handlers(consensus_receiver);
        // Run the BFT with the consensus sender attached.
        self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?;
        Ok(())
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the BFT instance.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }

    /// Returns the primary sender.
    ///
    /// # Panics
    /// Panics if called before `run` has set the sender.
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set")
    }

    /// Returns a thread-local tracing guard, if a tracing handler is configured.
    pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
        self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
    }
}
184
impl<N: Network> Consensus<N> {
    /// Returns the number of unconfirmed transmissions, as reported by the BFT.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.bft.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications, as reported by the BFT.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.bft.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions, as reported by the BFT.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.bft.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions, as reported by the BFT.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.bft.num_unconfirmed_transactions()
    }
}
206
impl<N: Network> Consensus<N> {
    /// Returns all unconfirmed transmission IDs: worker-held IDs chained with queued (inbound) IDs.
    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
    }

    /// Returns all unconfirmed transmissions: worker-held chained with queued (inbound) ones.
    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.worker_transmissions().chain(self.inbound_transmissions())
    }

    /// Returns all unconfirmed solutions: worker-held chained with queued (inbound) ones.
    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.worker_solutions().chain(self.inbound_solutions())
    }

    /// Returns all unconfirmed transactions: worker-held chained with queued (inbound) ones.
    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.worker_transactions().chain(self.inbound_transactions())
    }
}
228
impl<N: Network> Consensus<N> {
    /// Returns the transmission IDs held by the BFT workers.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.bft.worker_transmission_ids()
    }

    /// Returns the transmissions held by the BFT workers.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.bft.worker_transmissions()
    }

    /// Returns the solutions held by the BFT workers.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.bft.worker_solutions()
    }

    /// Returns the transactions held by the BFT workers.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.bft.worker_transactions()
    }
}
250
251impl<N: Network> Consensus<N> {
252 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
254 self.inbound_transmissions().map(|(id, _)| id)
255 }
256
257 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
259 self.inbound_transactions()
260 .map(|(id, tx)| {
261 (
262 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
263 Transmission::Transaction(tx),
264 )
265 })
266 .chain(self.inbound_solutions().map(|(id, solution)| {
267 (
268 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
269 Transmission::Solution(solution),
270 )
271 }))
272 }
273
274 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
276 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
278 }
279
280 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
282 let tx_queue = self.transactions_queue.lock();
284 tx_queue
286 .deployments
287 .clone()
288 .into_iter()
289 .chain(tx_queue.executions.clone())
290 .map(|(id, tx)| (id, Data::Object(tx)))
291 }
292}
293
294impl<N: Network> Consensus<N> {
295 pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
297 let _guard = self.get_tracing_guard();
298
299 let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
301 #[cfg(feature = "metrics")]
302 {
303 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
304 let timestamp = amareleo_node_bft::helpers::now();
305 self.transmissions_queue_timestamps
306 .lock()
307 .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
308 }
309 {
311 let solution_id = solution.id();
312
313 if self.seen_solutions.lock().put(solution_id, ()).is_some() {
315 return Ok(());
317 }
318 if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
320 bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
321 }
322 trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
324 if self.solutions_queue.lock().put(solution_id, solution).is_some() {
325 bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
326 }
327 }
328
329 self.process_unconfirmed_solutions().await
331 }
332
333 pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
335 let _guard = self.get_tracing_guard();
336
337 let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
339 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
340 if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
341 || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
342 {
343 return Ok(());
344 }
345 let solutions = {
347 let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
349 let mut queue = self.solutions_queue.lock();
351 let num_solutions = queue.len().min(capacity);
353 (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
355 };
356 for solution in solutions.into_iter() {
358 let solution_id = solution.id();
359 trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
360 if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
362 if self.bft.is_synced() {
364 if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
366 warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
367 };
368 }
369 }
370 }
371 Ok(())
372 }
373
374 pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
376 let _guard = self.get_tracing_guard();
377
378 let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
380 #[cfg(feature = "metrics")]
381 {
382 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
383 let timestamp = amareleo_node_bft::helpers::now();
384 self.transmissions_queue_timestamps
385 .lock()
386 .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
387 }
388 {
390 let transaction_id = transaction.id();
391
392 if transaction.is_fee() {
394 bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
395 }
396 if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
398 return Ok(());
400 }
401 if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
403 bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
404 }
405 trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
407 if transaction.is_deploy() {
408 if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
409 bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
410 }
411 } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
412 bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
413 }
414
415 self.process_unconfirmed_transactions().await
417 }
418 }
419
420 pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
422 let _guard = self.get_tracing_guard();
423
424 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
426 if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
427 return Ok(());
428 }
429 let transactions = {
431 let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
433 let mut tx_queue = self.transactions_queue.lock();
435 let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
437 let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
439 let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
442 selector_iter
444 .filter_map(|select_deployment| {
445 if select_deployment {
446 tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
447 } else {
448 tx_queue.executions.pop_lru().map(|(_, tx)| tx)
449 }
450 })
451 .collect_vec()
452 };
453 for transaction in transactions.into_iter() {
455 let transaction_id = transaction.id();
456 trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
457 if let Err(e) =
459 self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
460 {
461 if self.bft.is_synced() {
463 warn!(
464 "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
465 fmt_id(transaction_id)
466 );
467 }
468 }
469 }
470 Ok(())
471 }
472}
473
impl<N: Network> Consensus<N> {
    /// Spawns the background tasks: one that applies committed subdags received from
    /// the BFT, and one that periodically drains the transaction and solution queues.
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;

        // Task 1: process committed subdags as the BFT emits them.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });

        // Task 2: every `MAX_BATCH_DELAY_IN_MS`, drain the queued transactions and solutions.
        let self_ = self.clone();
        let trace_guard = self.get_tracing_guard();
        self.spawn(async move {
            let _guard = trace_guard;
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if let Err(e) = self_.process_unconfirmed_transactions().await {
                    warn!("Cannot process unconfirmed transactions - {e}");
                }
                if let Err(e) = self_.process_unconfirmed_solutions().await {
                    warn!("Cannot process unconfirmed solutions - {e}");
                }
            }
        });
    }

    /// Processes one committed subdag: attempts to advance the ledger to the next block,
    /// reinserts the transmissions into the memory pool on failure, and reports the
    /// outcome through the callback.
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        let _guard = self.get_tracing_guard();

        // Advance the ledger on a blocking task (keeps ledger work off the async executor).
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };

        // On failure, return the transmissions to the memory pool for a future attempt.
        if let Err(e) = &result {
            error!("Unable to advance to the next block - {e}");
            self.reinsert_transmissions(transmissions).await;
        }
        // Report the result to the BFT; a dropped receiver is ignored.
        callback.send(result).ok();
    }

    /// Prepares and commits the next quorum block from the given subdag and transmissions,
    /// clearing the solution queues at each epoch boundary.
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Capture pre-advance figures for the metrics emitted below.
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();

        // Construct the next block from the subdag and its transmissions.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;

        // Commit the block to the ledger.
        self.ledger.advance_to_next_block(&next_block)?;

        // At an epoch boundary, clear queued and worker-held solutions
        // (presumably stale once the epoch rolls over — confirm with puzzle semantics).
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            self.solutions_queue.lock().clear();
            self.bft.primary().clear_worker_solutions();
        }

        // Emit block/certificate metrics for the newly committed block.
        #[cfg(feature = "metrics")]
        {
            let elapsed = std::time::Duration::from_secs((amareleo_node_bft::helpers::now() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let block_latency = next_block_timestamp - current_block_timestamp;
            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();

            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);

            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
        }
        Ok(())
    }

    /// Reinserts the given transmissions back into the memory pool, logging any failures.
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        let _guard = self.get_tracing_guard();

        // Reinsert each transmission individually.
        for (transmission_id, transmission) in transmissions.into_iter() {
            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
                warn!(
                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                );
            }
        }
    }

    /// Reinserts a single transmission by sending it back to the primary, and awaits
    /// the outcome reported via the callback. Ratifications are a no-op; mismatched
    /// ID/payload pairs are an error.
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<()> {
        // Channel through which the primary reports the outcome.
        let (callback, callback_receiver) = oneshot::channel();
        match (transmission_id, transmission) {
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        // Await the outcome reported by the primary.
        callback_receiver.await?
    }

    /// Spawns the given future onto a tokio task and tracks its handle for shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down consensus: stops the BFT, then aborts all tracked background tasks.
    pub async fn shut_down(&self) {
        let _guard = self.get_tracing_guard();
        info!("Shutting down consensus...");
        // Shut down the BFT first, then abort the background handlers.
        self.bft.shut_down().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}