1#![forbid(unsafe_code)]
17
18mod transactions_queue;
19use transactions_queue::TransactionsQueue;
20
21#[macro_use]
22extern crate tracing;
23
24#[cfg(feature = "metrics")]
25extern crate snarkos_node_metrics as metrics;
26
27use snarkos_account::Account;
28use snarkos_node_bft::{
29 BFT,
30 MAX_BATCH_DELAY_IN_MS,
31 Primary,
32 helpers::{
33 ConsensusReceiver,
34 PrimarySender,
35 Storage as NarwhalStorage,
36 fmt_id,
37 init_consensus_channels,
38 init_primary_channels,
39 },
40 spawn_blocking,
41};
42use snarkos_node_bft_ledger_service::LedgerService;
43use snarkos_node_bft_storage_service::BFTPersistentStorage;
44use snarkos_node_sync::{BlockSync, Ping};
45use snarkos_utilities::NodeDataDir;
46
47use snarkvm::{
48 ledger::{
49 block::Transaction,
50 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
51 puzzle::{Solution, SolutionID},
52 },
53 prelude::*,
54 utilities::flatten_error,
55};
56
57use aleo_std::StorageMode;
58use anyhow::{Context, Result};
59use colored::Colorize;
60use indexmap::IndexMap;
61#[cfg(feature = "locktick")]
62use locktick::parking_lot::{Mutex, RwLock};
63use lru::LruCache;
64#[cfg(not(feature = "locktick"))]
65use parking_lot::{Mutex, RwLock};
66use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
67use tokio::{sync::oneshot, task::JoinHandle};
68
69#[cfg(feature = "metrics")]
70use std::collections::HashMap;
71
/// The capacity of the queue reserved for deployment transactions.
/// NOTE(review): not referenced in this file — presumably consumed by the `transactions_queue` module; confirm.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the queue reserved for execution transactions.
/// NOTE(review): not referenced in this file — presumably consumed by the `transactions_queue` module; confirm.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the queue reserved for solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The maximum number of deployment transactions drained from the queue per processing interval
/// (see `process_unconfirmed_transactions`).
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
84
/// The consensus instance: accepts unconfirmed solutions and transactions, queues them,
/// forwards them to the BFT primary, and advances the ledger when the BFT commits a subdag.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The BFT instance.
    bft: BFT<N>,
    /// The sender channels to the BFT primary.
    primary_sender: PrimarySender<N>,
    /// The LRU queue of unconfirmed solutions awaiting submission to the memory pool.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The queue of unconfirmed transactions awaiting submission to the memory pool.
    transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
    /// An LRU set of recently-seen solution IDs, used for deduplication.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// An LRU set of recently-seen transaction IDs, used for deduplication.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// Maps each tracked transmission ID to the timestamp when it was received (metrics builds only).
    #[cfg(feature = "metrics")]
    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// The handles of the spawned consensus tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The ping handle, used to broadcast updated block locators.
    ping: Arc<Ping<N>>,
    /// The block synchronization module.
    block_sync: Arc<BlockSync<N>>,
}
116
impl<N: Network> Consensus<N> {
    /// Initializes a new instance of consensus.
    ///
    /// Wires up the primary channels, opens the persistent BFT storage for `storage_mode`,
    /// constructs the BFT, starts the consensus handler tasks, and runs the BFT.
    /// Returns an error if storage cannot be opened or the BFT fails to construct/run.
    #[allow(clippy::too_many_arguments)]
    pub async fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        node_data_dir: NodeDataDir,
        ping: Arc<Ping<N>>,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the primary channels.
        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
        // Open the persistent storage for transmissions.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
        // Initialize the Narwhal storage over the ledger and transmission store.
        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
        // Initialize the BFT.
        let bft = BFT::new(
            account,
            storage,
            ledger.clone(),
            block_sync.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            node_data_dir,
            dev,
        )?;
        // Assemble the consensus instance; both "seen" caches hold up to 2^16 recent IDs.
        let mut _self = Self {
            ledger,
            bft,
            block_sync,
            primary_sender,
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_tracker: Default::default(),
            handles: Default::default(),
            ping: ping.clone(),
        };

        info!("Starting the consensus instance...");

        // Initialize the consensus channels.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();
        // Start the consensus handler tasks before running the BFT.
        _self.start_handlers(consensus_receiver);
        // Run the BFT with the consensus sender and the primary channels.
        _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;

        Ok(_self)
    }

    /// Returns a reference to the BFT instance.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }

    /// Returns `true` if the given transaction ID is currently in the transactions queue.
    pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
        self.transactions_queue.read().contains(transaction_id)
    }
}
187
impl<N: Network> Consensus<N> {
    /// Returns the number of unconfirmed transmissions, as reported by the BFT.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.bft.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications, as reported by the BFT.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.bft.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions, as reported by the BFT.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.bft.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions, as reported by the BFT.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.bft.num_unconfirmed_transactions()
    }
}
209
impl<N: Network> Consensus<N> {
    /// Returns the unconfirmed transmission IDs: the workers' IDs followed by the inbound queue's IDs.
    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
    }

    /// Returns the unconfirmed transmissions: the workers' entries followed by the inbound queue's entries.
    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.worker_transmissions().chain(self.inbound_transmissions())
    }

    /// Returns the unconfirmed solutions: the workers' solutions followed by the inbound queue's solutions.
    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.worker_solutions().chain(self.inbound_solutions())
    }

    /// Returns the unconfirmed transactions: the workers' transactions followed by the inbound queue's transactions.
    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.worker_transactions().chain(self.inbound_transactions())
    }
}
231
impl<N: Network> Consensus<N> {
    /// Returns the transmission IDs currently held by the BFT workers.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.bft.worker_transmission_ids()
    }

    /// Returns the transmissions currently held by the BFT workers.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.bft.worker_transmissions()
    }

    /// Returns the solutions currently held by the BFT workers.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.bft.worker_solutions()
    }

    /// Returns the transactions currently held by the BFT workers.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.bft.worker_transactions()
    }
}
253
254impl<N: Network> Consensus<N> {
255 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
257 self.inbound_transmissions().map(|(id, _)| id)
258 }
259
260 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
262 self.inbound_transactions()
263 .map(|(id, tx)| {
264 (
265 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
266 Transmission::Transaction(tx),
267 )
268 })
269 .chain(self.inbound_solutions().map(|(id, solution)| {
270 (
271 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
272 Transmission::Solution(solution),
273 )
274 }))
275 }
276
277 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
279 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
281 }
282
283 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
285 self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
287 }
288}
289
impl<N: Network> Consensus<N> {
    /// Adds the given unconfirmed solution to the solutions queue,
    /// then attempts to drain queued solutions into the memory pool.
    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
        // Compute the checksum over the solution's serialized bytes.
        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
        // This scope bounds the lock guards, so none are held across the `.await` below.
        {
            let solution_id = solution.id();

            // Deduplicate: if this solution ID was recently seen, return early (not an error).
            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
                return Ok(());
            }
            // Skip solutions that already exist in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
            }
            #[cfg(feature = "metrics")]
            {
                // NOTE(review): the gauge is incremented before the duplicate-queue check below,
                // so a duplicate still counts here — confirm this is intended.
                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
                // Record when this transmission was received, for latency metrics.
                let timestamp = snarkos_node_bft::helpers::now();
                self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
            }
            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
            // Queue the solution; `put` returning `Some` means it was already queued.
            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
            }
        }

        // Attempt to forward queued solutions to the primary.
        self.process_unconfirmed_solutions().await
    }

    /// Drains queued solutions into the memory pool, bounded by `N::MAX_SOLUTIONS`
    /// and the primary's transmission tolerance.
    async fn process_unconfirmed_solutions(&self) -> Result<()> {
        // If the memory pool is already at capacity for solutions or transmissions, do nothing.
        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
        {
            return Ok(());
        }
        // Pop up to `capacity` solutions (least-recently-used first) while holding the lock,
        // releasing it before the async sends below.
        let solutions = {
            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
            let mut queue = self.solutions_queue.lock();
            let num_solutions = queue.len().min(capacity);
            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
        };
        // Send each solution to the primary.
        for solution in solutions.into_iter() {
            let solution_id = solution.id();
            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
            match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
                Ok(true) => {}
                Ok(false) => debug!(
                    "Unable to add unconfirmed solution '{}' to the memory pool. Already exists.",
                    fmt_id(solution_id)
                ),
                Err(err) => {
                    let err = err.context(format!(
                        "Unable to add unconfirmed solution '{}' to the memory pool",
                        fmt_id(solution_id)
                    ));

                    // Warn only when synced and more than 10 blocks into the epoch; otherwise trace.
                    if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                        warn!("{}", flatten_error(err));
                    } else {
                        trace!("{}", flatten_error(err));
                    }
                }
            }
        }
        Ok(())
    }

    /// Adds the given unconfirmed transaction to the transactions queue,
    /// then attempts to drain queued transactions into the memory pool.
    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
        // Compute the checksum over the transaction's serialized bytes.
        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
        // This scope bounds the lock guards, so none are held across the `.await` below.
        {
            let transaction_id = transaction.id();

            // Reject standalone fee transactions.
            if transaction.is_fee() {
                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // Deduplicate: if this transaction ID was recently seen, return early (not an error).
            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
                return Ok(());
            }
            // Skip transactions that already exist in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // Skip transactions that are already queued.
            if self.contains_transaction(&transaction_id) {
                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
            }
            #[cfg(feature = "metrics")]
            {
                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
                // Record when this transmission was received, for latency metrics.
                let timestamp = snarkos_node_bft::helpers::now();
                self.transmissions_tracker
                    .lock()
                    .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
            }
            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
            // Queue the transaction along with its priority fee.
            let priority_fee = transaction.priority_fee_amount()?;
            self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
        }

        // Attempt to forward queued transactions to the primary.
        self.process_unconfirmed_transactions().await
    }

    /// Drains queued transactions into the memory pool, bounded by the primary's
    /// transmission tolerance, with at most `MAX_DEPLOYMENTS_PER_INTERVAL` deployments per call.
    async fn process_unconfirmed_transactions(&self) -> Result<()> {
        // If the memory pool is already at capacity for transmissions, do nothing.
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
            return Ok(());
        }
        // Pop the next batch of transactions while holding the write lock,
        // releasing it before the async sends below.
        let transactions = {
            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
            let mut tx_queue = self.transactions_queue.write();
            // Cap deployments per interval; fill the remaining capacity with executions.
            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
            // Interleave deployment picks (`true`) with execution picks (`false`).
            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
            selector_iter
                .filter_map(
                    |select_deployment| {
                        if select_deployment { tx_queue.deployments.pop() } else { tx_queue.executions.pop() }
                    },
                )
                .map(|(_, tx)| tx)
                .collect_vec()
        };
        // Send each transaction to the primary.
        for transaction in transactions.into_iter() {
            let transaction_id = transaction.id();
            // Label the transaction variant for log messages.
            let tx_type_str = match transaction {
                Transaction::Deploy(..) => "deployment",
                Transaction::Execute(..) => "execution",
                Transaction::Fee(..) => "fee",
            };
            trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
            match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await {
                Ok(true) => {}
                Ok(false) => debug!(
                    "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.",
                    fmt_id(transaction_id)
                ),
                Err(err) => {
                    // Errors are only surfaced once the BFT is synced; while unsynced they are
                    // dropped silently — presumably to avoid log spam during sync (NOTE(review): confirm).
                    if self.bft.is_synced() {
                        let err = err.context(format!(
                            "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool",
                            fmt_id(transaction_id)
                        ));
                        warn!("{}", flatten_error(err));
                    }
                }
            }
        }
        Ok(())
    }
}
484
impl<N: Network> Consensus<N> {
    /// Starts the consensus handler tasks:
    /// 1. a task that processes committed subdags received from the BFT, and
    /// 2. a periodic task that drains the local transaction/solution queues into the memory pool.
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;

        // Process each committed subdag (with its transmissions and result callback) from the BFT.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });

        // Periodically re-drain the queues, so entries deferred by capacity limits still get submitted.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if let Err(err) = self_.process_unconfirmed_transactions().await {
                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions")));
                }
                if let Err(err) = self_.process_unconfirmed_solutions().await {
                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed solutions")));
                }
            }
        });
    }

    /// Processes a committed subdag from the BFT: attempts to advance the ledger to the next
    /// block, re-queues the transmissions on failure, and reports the result via `callback`.
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        // Advance to the next block on a blocking task; the transmissions are cloned
        // so the originals can be reinserted if the advance fails.
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };

        // On failure, put the transmissions back into the memory pool.
        if let Err(e) = &result {
            error!("{}", flatten_error(e));
            self.reinsert_transmissions(transmissions).await;
        }
        // Report the result to the BFT; a dropped receiver is ignored.
        callback.send(result).ok();
    }

    /// Builds, validates, and commits the next quorum block from the committed `subdag`
    /// and `transmissions`, then updates block locators, the sync height, and
    /// (when enabled) metrics/telemetry.
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Capture pre-advance values for the metrics emitted at the end.
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();

        // Construct, validate, and commit the next quorum block.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
        self.ledger.check_next_block(&next_block)?;
        self.ledger.advance_to_next_block(&next_block)?;
        #[cfg(feature = "telemetry")]
        let latest_committee = self.ledger.current_committee()?;

        // At an epoch boundary, clear the queued solutions and the workers' solutions.
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            self.solutions_queue.lock().clear();
            self.bft.primary().clear_worker_solutions();
        }

        // Broadcast the updated block locators via ping.
        let locators = self.block_sync.get_block_locators()?;
        self.ping.update_block_locators(locators);

        // Record the new height with the block sync module.
        self.block_sync.set_sync_height(next_block.height());

        #[cfg(feature = "metrics")]
        {
            let now_utc = snarkos_node_bft::helpers::now_utc();
            // NOTE(review): if `now_utc` is earlier than the leader batch timestamp, this
            // subtraction is negative and the `as u64` cast wraps — confirm timestamps are monotonic.
            let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let next_block_utc = snarkos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
            // Latency between consecutive blocks; lag between wall clock and the new block's timestamp.
            let block_latency = next_block_timestamp - current_block_timestamp;
            let block_lag = (now_utc - next_block_utc).whole_milliseconds();

            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();

            // Emit per-transmission latency based on the tracker, then the block-level gauges.
            metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);

            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);

            #[cfg(feature = "telemetry")]
            {
                // Report each validator's participation score for the current committee.
                let participation_scores =
                    self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);

                for (address, participation_score) in participation_scores {
                    metrics::histogram_label(
                        metrics::consensus::VALIDATOR_PARTICIPATION,
                        "validator_address",
                        address.to_string(),
                        participation_score,
                    )
                }
            }
        }
        Ok(())
    }

    /// Reinserts the given transmissions back into the memory pool,
    /// logging (but not propagating) per-transmission failures.
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        for (transmission_id, transmission) in transmissions.into_iter() {
            match self.reinsert_transmission(transmission_id, transmission).await {
                Ok(true) => {}
                Ok(false) => debug!(
                    "Unable to reinsert transmission {}.{} into the memory pool. Already exists.",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                ),
                Err(err) => {
                    let err = err.context(format!(
                        "Unable to reinsert transmission {}.{} into the memory pool",
                        fmt_id(transmission_id),
                        fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                    ));
                    warn!("{}", flatten_error(err));
                }
            }
        }
    }

    /// Sends a single transmission back to the primary for reinsertion.
    /// Returns `Ok(true)` on success and for ratifications (a no-op),
    /// or errors if the ID and payload variants do not match.
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<bool> {
        // The callback channel reports whether the reinsertion succeeded.
        let (callback, callback_receiver) = oneshot::channel();
        // Dispatch on the transmission variant; the ID and payload must agree.
        match (transmission_id, transmission) {
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(true),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        // Await the primary's response.
        callback_receiver.await?
    }

    /// Spawns the given future onto a task and tracks its handle for shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the BFT, then aborts all spawned consensus tasks.
    pub async fn shut_down(&self) {
        info!("Shutting down consensus...");
        self.bft.shut_down().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}