1#![forbid(unsafe_code)]
17
18mod transactions_queue;
19use transactions_queue::TransactionsQueue;
20
21#[macro_use]
22extern crate tracing;
23
24#[cfg(feature = "metrics")]
25extern crate snarkos_node_metrics as metrics;
26
27use snarkos_account::Account;
28use snarkos_node_bft::{
29 BFT,
30 MAX_BATCH_DELAY_IN_MS,
31 Primary,
32 helpers::{
33 ConsensusReceiver,
34 PrimarySender,
35 Storage as NarwhalStorage,
36 fmt_id,
37 init_consensus_channels,
38 init_primary_channels,
39 },
40 spawn_blocking,
41};
42use snarkos_node_bft_ledger_service::LedgerService;
43use snarkos_node_bft_storage_service::BFTPersistentStorage;
44use snarkos_node_sync::{BlockSync, Ping};
45
46use snarkvm::{
47 ledger::{
48 block::Transaction,
49 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
50 puzzle::{Solution, SolutionID},
51 },
52 prelude::*,
53};
54
55use aleo_std::StorageMode;
56use anyhow::Result;
57use colored::Colorize;
58use indexmap::IndexMap;
59#[cfg(feature = "locktick")]
60use locktick::parking_lot::{Mutex, RwLock};
61use lru::LruCache;
62#[cfg(not(feature = "locktick"))]
63use parking_lot::{Mutex, RwLock};
64use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
65use tokio::{sync::oneshot, task::JoinHandle};
66
67#[cfg(feature = "metrics")]
68use std::collections::HashMap;
69
/// Capacity (1024 entries) of the inbound deployments queue.
/// NOTE(review): not referenced in this file chunk; presumably consumed by `transactions_queue` — confirm.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1_024;
/// Capacity (1024 entries) of the inbound executions queue.
/// NOTE(review): not referenced in this file chunk; presumably consumed by `transactions_queue` — confirm.
const CAPACITY_FOR_EXECUTIONS: usize = 1_024;
/// Capacity (1024 entries) of the LRU queue holding inbound unconfirmed solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1_024;
/// Maximum number of deployments forwarded to the primary per call to `process_unconfirmed_transactions`.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
82
/// The consensus layer.
///
/// Buffers inbound unconfirmed solutions and transactions, forwards them to the BFT primary,
/// and advances the ledger whenever the BFT commits a subdag.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The BFT instance.
    bft: BFT<N>,
    /// The sender used to forward unconfirmed solutions and transactions to the primary.
    primary_sender: PrimarySender<N>,
    /// LRU-bounded queue of inbound unconfirmed solutions awaiting submission to the primary.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// Queue of inbound unconfirmed transactions awaiting submission to the primary.
    transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
    /// Recently seen solution IDs, used to drop duplicate solutions early.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// Recently seen transaction IDs, used to drop duplicate transactions early.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// Arrival timestamp of each tracked transmission, used for latency metrics.
    #[cfg(feature = "metrics")]
    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// Handles of the spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Ping handle, given the refreshed block locators after each new block.
    ping: Arc<Ping<N>>,
    /// The block sync module; its sync height is updated after each new block.
    block_sync: Arc<BlockSync<N>>,
}
114
115impl<N: Network> Consensus<N> {
116 #[allow(clippy::too_many_arguments)]
118 pub async fn new(
119 account: Account<N>,
120 ledger: Arc<dyn LedgerService<N>>,
121 block_sync: Arc<BlockSync<N>>,
122 ip: Option<SocketAddr>,
123 trusted_validators: &[SocketAddr],
124 trusted_peers_only: bool,
125 storage_mode: StorageMode,
126 ping: Arc<Ping<N>>,
127 dev: Option<u16>,
128 ) -> Result<Self> {
129 let (primary_sender, primary_receiver) = init_primary_channels::<N>();
131 let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
133 let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
135 let bft = BFT::new(
137 account,
138 storage,
139 ledger.clone(),
140 block_sync.clone(),
141 ip,
142 trusted_validators,
143 trusted_peers_only,
144 storage_mode,
145 dev,
146 )?;
147 let mut _self = Self {
149 ledger,
150 bft,
151 block_sync,
152 primary_sender,
153 solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
154 transactions_queue: Default::default(),
155 seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
156 seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
157 #[cfg(feature = "metrics")]
158 transmissions_tracker: Default::default(),
159 handles: Default::default(),
160 ping: ping.clone(),
161 };
162
163 info!("Starting the consensus instance...");
164
165 let (consensus_sender, consensus_receiver) = init_consensus_channels();
167 _self.start_handlers(consensus_receiver);
169 _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;
171
172 Ok(_self)
173 }
174
175 pub const fn bft(&self) -> &BFT<N> {
177 &self.bft
178 }
179
180 pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
181 self.transactions_queue.read().contains(transaction_id)
182 }
183}
184
185impl<N: Network> Consensus<N> {
186 pub fn num_unconfirmed_transmissions(&self) -> usize {
188 self.bft.num_unconfirmed_transmissions()
189 }
190
191 pub fn num_unconfirmed_ratifications(&self) -> usize {
193 self.bft.num_unconfirmed_ratifications()
194 }
195
196 pub fn num_unconfirmed_solutions(&self) -> usize {
198 self.bft.num_unconfirmed_solutions()
199 }
200
201 pub fn num_unconfirmed_transactions(&self) -> usize {
203 self.bft.num_unconfirmed_transactions()
204 }
205}
206
207impl<N: Network> Consensus<N> {
208 pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
210 self.worker_transmission_ids().chain(self.inbound_transmission_ids())
211 }
212
213 pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
215 self.worker_transmissions().chain(self.inbound_transmissions())
216 }
217
218 pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
220 self.worker_solutions().chain(self.inbound_solutions())
221 }
222
223 pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
225 self.worker_transactions().chain(self.inbound_transactions())
226 }
227}
228
229impl<N: Network> Consensus<N> {
230 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
232 self.bft.worker_transmission_ids()
233 }
234
235 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
237 self.bft.worker_transmissions()
238 }
239
240 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
242 self.bft.worker_solutions()
243 }
244
245 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
247 self.bft.worker_transactions()
248 }
249}
250
251impl<N: Network> Consensus<N> {
252 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
254 self.inbound_transmissions().map(|(id, _)| id)
255 }
256
257 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
259 self.inbound_transactions()
260 .map(|(id, tx)| {
261 (
262 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
263 Transmission::Transaction(tx),
264 )
265 })
266 .chain(self.inbound_solutions().map(|(id, solution)| {
267 (
268 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
269 Transmission::Solution(solution),
270 )
271 }))
272 }
273
274 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
276 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
278 }
279
280 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
282 self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
284 }
285}
286
287impl<N: Network> Consensus<N> {
288 pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
291 let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
293 {
295 let solution_id = solution.id();
296
297 if self.seen_solutions.lock().put(solution_id, ()).is_some() {
299 return Ok(());
301 }
302 if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
304 bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
305 }
306 #[cfg(feature = "metrics")]
307 {
308 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
309 let timestamp = snarkos_node_bft::helpers::now();
310 self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
311 }
312 trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
314 if self.solutions_queue.lock().put(solution_id, solution).is_some() {
315 bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
316 }
317 }
318
319 self.process_unconfirmed_solutions().await
321 }
322
323 async fn process_unconfirmed_solutions(&self) -> Result<()> {
326 let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
328 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
329 if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
330 || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
331 {
332 return Ok(());
333 }
334 let solutions = {
336 let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
338 let mut queue = self.solutions_queue.lock();
340 let num_solutions = queue.len().min(capacity);
342 (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
344 };
345 for solution in solutions.into_iter() {
347 let solution_id = solution.id();
348 trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
349 if let Err(e) = self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
351 if self.bft.is_synced() {
353 if self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
355 warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
356 };
357 }
358 }
359 }
360 Ok(())
361 }
362
363 pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
366 let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
368 {
370 let transaction_id = transaction.id();
371
372 if transaction.is_fee() {
374 bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
375 }
376 if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
378 return Ok(());
380 }
381 if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
383 bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
384 }
385 if self.contains_transaction(&transaction_id) {
387 bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
388 }
389 #[cfg(feature = "metrics")]
390 {
391 metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
392 let timestamp = snarkos_node_bft::helpers::now();
393 self.transmissions_tracker
394 .lock()
395 .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
396 }
397 trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
399 let priority_fee = transaction.priority_fee_amount()?;
400 self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
401 }
402
403 self.process_unconfirmed_transactions().await
405 }
406
407 async fn process_unconfirmed_transactions(&self) -> Result<()> {
410 let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
412 if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
413 return Ok(());
414 }
415 let transactions = {
417 let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
419 let mut tx_queue = self.transactions_queue.write();
421 let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
423 let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
425 let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
428 selector_iter
430 .filter_map(
431 |select_deployment| {
432 if select_deployment { tx_queue.deployments.pop() } else { tx_queue.executions.pop() }
433 },
434 )
435 .map(|(_, tx)| tx)
436 .collect_vec()
437 };
438 for transaction in transactions.into_iter() {
440 let transaction_id = transaction.id();
441 let tx_type_str = match transaction {
443 Transaction::Deploy(..) => "deployment",
444 Transaction::Execute(..) => "execution",
445 Transaction::Fee(..) => "fee",
446 };
447 trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
448 if let Err(e) =
450 self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
451 {
452 if self.bft.is_synced() {
454 warn!(
455 "Failed to add unconfirmed {tx_type_str} transaction '{}' to the memory pool - {e}",
456 fmt_id(transaction_id)
457 );
458 }
459 }
460 }
461 Ok(())
462 }
463}
464
465impl<N: Network> Consensus<N> {
466 fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
470 let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
471
472 let self_ = self.clone();
474 self.spawn(async move {
475 while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
476 self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
477 }
478 });
479
480 let self_ = self.clone();
485 self.spawn(async move {
486 loop {
487 tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
489 if let Err(e) = self_.process_unconfirmed_transactions().await {
491 warn!("Cannot process unconfirmed transactions - {e}");
492 }
493 if let Err(e) = self_.process_unconfirmed_solutions().await {
495 warn!("Cannot process unconfirmed solutions - {e}");
496 }
497 }
498 });
499 }
500
501 async fn process_bft_subdag(
503 &self,
504 subdag: Subdag<N>,
505 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
506 callback: oneshot::Sender<Result<()>>,
507 ) {
508 let self_ = self.clone();
510 let transmissions_ = transmissions.clone();
511 let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
512
513 if let Err(e) = &result {
515 error!("Unable to advance to the next block - {e}");
516 self.reinsert_transmissions(transmissions).await;
518 }
519 callback.send(result).ok();
522 }
523
524 fn try_advance_to_next_block(
526 &self,
527 subdag: Subdag<N>,
528 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
529 ) -> Result<()> {
530 #[cfg(feature = "metrics")]
531 let start = subdag.leader_certificate().batch_header().timestamp();
532 #[cfg(feature = "metrics")]
533 let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
534 #[cfg(feature = "metrics")]
535 let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
536
537 let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
539 self.ledger.check_next_block(&next_block)?;
541 self.ledger.advance_to_next_block(&next_block)?;
543 #[cfg(feature = "telemetry")]
544 let latest_committee = self.ledger.current_committee()?;
546
547 if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
549 self.solutions_queue.lock().clear();
551 self.bft.primary().clear_worker_solutions();
553 }
554
555 let locators = self.block_sync.get_block_locators()?;
557 self.ping.update_block_locators(locators);
558
559 self.block_sync.set_sync_height(next_block.height());
561
562 #[cfg(feature = "metrics")]
566 {
567 let now_utc = snarkos_node_bft::helpers::now_utc();
568 let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
569 let next_block_timestamp = next_block.header().metadata().timestamp();
570 let next_block_utc = snarkos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
571 let block_latency = next_block_timestamp - current_block_timestamp;
572 let block_lag = (now_utc - next_block_utc).whole_milliseconds();
573
574 let proof_target = next_block.header().proof_target();
575 let coinbase_target = next_block.header().coinbase_target();
576 let cumulative_proof_target = next_block.header().cumulative_proof_target();
577
578 metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);
580
581 metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
582 metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
583 metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
584 metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
585 metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
586 metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
587 metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
588
589 #[cfg(feature = "telemetry")]
590 {
591 let participation_scores =
593 self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);
594
595 for (address, participation_score) in participation_scores {
597 metrics::histogram_label(
598 metrics::consensus::VALIDATOR_PARTICIPATION,
599 "validator_address",
600 address.to_string(),
601 participation_score,
602 )
603 }
604 }
605 }
606 Ok(())
607 }
608
609 async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
611 for (transmission_id, transmission) in transmissions.into_iter() {
613 if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
615 warn!(
616 "Unable to reinsert transmission {}.{} into the memory pool - {e}",
617 fmt_id(transmission_id),
618 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
619 );
620 }
621 }
622 }
623
624 async fn reinsert_transmission(
626 &self,
627 transmission_id: TransmissionID<N>,
628 transmission: Transmission<N>,
629 ) -> Result<()> {
630 let (callback, callback_receiver) = oneshot::channel();
632 match (transmission_id, transmission) {
634 (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
635 (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
636 self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
638 }
639 (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
640 self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
642 }
643 _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
644 }
645 callback_receiver.await?
647 }
648
649 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
651 self.handles.lock().push(tokio::spawn(future));
652 }
653
654 pub async fn shut_down(&self) {
656 info!("Shutting down consensus...");
657 self.bft.shut_down().await;
659 self.handles.lock().iter().for_each(|handle| handle.abort());
661 }
662}