#![forbid(unsafe_code)]

#[macro_use]
extern crate tracing;

use snarkos_account::Account;
use snarkos_node_bft::{
    BFT,
    MAX_BATCH_DELAY_IN_MS,
    Primary,
    helpers::{
        ConsensusReceiver,
        PrimarySender,
        Storage as NarwhalStorage,
        fmt_id,
        init_consensus_channels,
        init_primary_channels,
    },
    spawn_blocking,
};
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_bft_storage_service::BFTPersistentStorage;
use snarkos_node_sync::{BlockSync, Ping};

use snarkvm::{
    ledger::{
        block::Transaction,
        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::*,
};

use aleo_std::StorageMode;
use anyhow::Result;
use colored::Colorize;
use indexmap::IndexMap;
#[cfg(feature = "locktick")]
use locktick::parking_lot::Mutex;
use lru::LruCache;
#[cfg(not(feature = "locktick"))]
use parking_lot::Mutex;
use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
use tokio::{sync::oneshot, task::JoinHandle};

#[cfg(feature = "metrics")]
use std::collections::HashMap;

/// The capacity of the queue reserved for deployment transactions.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the queue reserved for execution transactions.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the queue reserved for solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The maximum number of deployments to process per interval.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;

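/// The pending transactions, split into separate deployment and execution queues so that
/// deployments can be rate-limited independently of executions.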
struct TransactionsQueue<N: Network> {
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}

impl<N: Network> Default for TransactionsQueue<N> {
    fn default() -> Self {
        Self {
            deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
            executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
        }
    }
}

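/// The consensus handle: it queues inbound solutions and transactions, forwards them to the
/// BFT primary, and advances the ledger whenever the BFT commits a subdag.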
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The BFT instance.
    bft: BFT<N>,
    /// The primary sender, used to forward transmissions to the BFT primary.
    primary_sender: PrimarySender<N>,
    /// The queue of unconfirmed solutions.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The queues of unconfirmed deployment and execution transactions.
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    /// The recently-seen solution IDs, used to deduplicate inbound solutions.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// The recently-seen transaction IDs, used to deduplicate inbound transactions.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// The timestamps of inbound transmissions, used to compute transmission latency metrics.
    #[cfg(feature = "metrics")]
    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// The spawned task handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The ping handle, used to broadcast updated block locators.
    ping: Arc<Ping<N>>,
    /// The block synchronization module.
    block_sync: Arc<BlockSync<N>>,
}

impl<N: Network> Consensus<N> {
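    /// Initializes a new consensus instance: opens the persistent BFT transmission storage,
    /// constructs the BFT, starts the consensus handlers, and runs the BFT with the primary
    /// and consensus channels.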
    pub async fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        storage_mode: StorageMode,
        ping: Arc<Ping<N>>,
    ) -> Result<Self> {
        // Initialize the primary channels.
        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
        // Initialize the persistent storage for transmissions.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
        // Initialize the Narwhal storage.
        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
        // Initialize the BFT.
        let bft = BFT::new(account, storage, ledger.clone(), block_sync.clone(), ip, trusted_validators, storage_mode)?;
        // Initialize the consensus instance.
        let mut _self = Self {
            ledger,
            bft,
            block_sync,
            primary_sender,
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_tracker: Default::default(),
            handles: Default::default(),
            ping: ping.clone(),
        };

        info!("Starting the consensus instance...");

        // Initialize the consensus channels and start the handlers.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();
        _self.start_handlers(consensus_receiver);
        // Run the BFT.
        _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;

        Ok(_self)
    }

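    /// Returns the BFT instance.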
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the number of unconfirmed transmissions.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.bft.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.bft.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.bft.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.bft.num_unconfirmed_transactions()
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the unconfirmed transmission IDs.
    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
    }

    /// Returns the unconfirmed transmissions.
    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.worker_transmissions().chain(self.inbound_transmissions())
    }

    /// Returns the unconfirmed solutions.
    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.worker_solutions().chain(self.inbound_solutions())
    }

    /// Returns the unconfirmed transactions.
    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.worker_transactions().chain(self.inbound_transactions())
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the worker transmission IDs.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.bft.worker_transmission_ids()
    }

    /// Returns the worker transmissions.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.bft.worker_transmissions()
    }

    /// Returns the worker solutions.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.bft.worker_solutions()
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.bft.worker_transactions()
    }
}

impl<N: Network> Consensus<N> {
    /// Returns the transmission IDs in the inbound queue.
    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.inbound_transmissions().map(|(id, _)| id)
    }

    /// Returns the transmissions in the inbound queue.
    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.inbound_transactions()
            .map(|(id, tx)| {
                (
                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
                    Transmission::Transaction(tx),
                )
            })
            .chain(self.inbound_solutions().map(|(id, solution)| {
                (
                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
                    Transmission::Solution(solution),
                )
            }))
    }

    /// Returns the solutions in the inbound queue.
    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        // Clone the solutions queue, so the returned iterator does not hold the lock.
        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
    }

    /// Returns the transactions in the inbound queue.
    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        let tx_queue = self.transactions_queue.lock();
        // Clone the deployment and execution queues, so the returned iterator does not hold the lock.
        tx_queue
            .deployments
            .clone()
            .into_iter()
            .chain(tx_queue.executions.clone())
            .map(|(id, tx)| (id, Data::Object(tx)))
    }
}

impl<N: Network> Consensus<N> {
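    /// Queues the given unconfirmed solution, skipping it if it was recently seen, already exists
    /// in the ledger, or is already in the queue, and then processes the solutions queue.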
    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
        // Compute the checksum of the solution.
        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
        {
            let solution_id = solution.id();

            // If the solution was already seen, return early.
            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
                return Ok(());
            }
            // If the solution already exists in the ledger, skip it.
            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
            }
            #[cfg(feature = "metrics")]
            {
                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
                let timestamp = snarkos_node_bft::helpers::now();
                self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
            }
            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
            // Add the solution to the queue; if it was already queued, skip it.
            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
            }
        }

        // Attempt to forward queued solutions to the primary.
        self.process_unconfirmed_solutions().await
    }

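    /// Processes the solutions queue: if the memory pool has remaining capacity, pops solutions
    /// from the queue and forwards them to the BFT primary.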
    async fn process_unconfirmed_solutions(&self) -> Result<()> {
        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        // If the memory pool is already at capacity, do not process the queue.
        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
        {
            return Ok(());
        }
        // Pop as many solutions from the queue as the remaining capacity allows.
        let solutions = {
            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
            let mut queue = self.solutions_queue.lock();
            let num_solutions = queue.len().min(capacity);
            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
        };
        // Forward each solution to the primary.
        for solution in solutions.into_iter() {
            let solution_id = solution.id();
            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
            if let Err(e) = self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
                // Only log a warning if the node is synced and is sufficiently far into the current epoch.
                if self.bft.is_synced() {
                    if self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
                    }
                }
            }
        }
        Ok(())
    }

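    /// Queues the given unconfirmed transaction, rejecting fee transactions and skipping
    /// duplicates or transactions already in the ledger, and then processes the transactions queue.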
    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
        // Compute the checksum of the transaction.
        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
        {
            let transaction_id = transaction.id();

            // Reject standalone fee transactions.
            if transaction.is_fee() {
                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // If the transaction was already seen, return early.
            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
                return Ok(());
            }
            // If the transaction already exists in the ledger, skip it.
            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            #[cfg(feature = "metrics")]
            {
                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
                let timestamp = snarkos_node_bft::helpers::now();
                self.transmissions_tracker
                    .lock()
                    .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
            }
            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
            // Queue the transaction as a deployment or an execution; if it was already queued, skip it.
            if transaction.is_deploy() {
                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
                }
            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
            }
        }

        // Attempt to forward queued transactions to the primary.
        self.process_unconfirmed_transactions().await
    }

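    /// Processes the transactions queue: pops at most one deployment per call, interleaved with
    /// as many executions as the remaining capacity allows, and forwards them to the BFT primary.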
    async fn process_unconfirmed_transactions(&self) -> Result<()> {
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        // If the memory pool is already at capacity, do not process the queue.
        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
            return Ok(());
        }
        // Pop the transactions to forward, interleaving deployments and executions
        // (at most `MAX_DEPLOYMENTS_PER_INTERVAL` deployments per call).
        let transactions = {
            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
            let mut tx_queue = self.transactions_queue.lock();
            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
            // `true` selects a deployment, `false` selects an execution.
            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
            selector_iter
                .filter_map(|select_deployment| {
                    if select_deployment {
                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
                    } else {
                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
                    }
                })
                .collect_vec()
        };
        // Forward each transaction to the primary.
        for transaction in transactions.into_iter() {
            let transaction_id = transaction.id();
            let tx_type_str = match transaction {
                Transaction::Deploy(..) => "deployment",
                Transaction::Execute(..) => "execution",
                Transaction::Fee(..) => "fee",
            };
            trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
            if let Err(e) =
                self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
            {
                // Only log a warning if the node is synced.
                if self.bft.is_synced() {
                    warn!(
                        "Failed to add unconfirmed {tx_type_str} transaction '{}' to the memory pool - {e}",
                        fmt_id(transaction_id)
                    );
                }
            }
        }
        Ok(())
    }
}

impl<N: Network> Consensus<N> {
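    /// Starts the consensus handlers: one task that turns committed subdags from the BFT into
    /// ledger blocks, and one task that periodically processes the transactions and solutions queues.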
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;

        // Process the committed subdags from the BFT.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });

        // Periodically process the queued transactions and solutions.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if let Err(e) = self_.process_unconfirmed_transactions().await {
                    warn!("Cannot process unconfirmed transactions - {e}");
                }
                if let Err(e) = self_.process_unconfirmed_solutions().await {
                    warn!("Cannot process unconfirmed solutions - {e}");
                }
            }
        });
    }

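    /// Processes a committed subdag by advancing the ledger to the next block, reinserting the
    /// transmissions into the memory pool on failure, and reporting the result via the callback.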
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };

        if let Err(e) = &result {
            error!("Unable to advance to the next block - {e}");
            self.reinsert_transmissions(transmissions).await;
        }
        callback.send(result).ok();
    }

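    /// Prepares, checks, and advances the ledger to the next quorum block from the given subdag and
    /// transmissions, then clears stale solutions at epoch boundaries and updates the block locators
    /// and sync height.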
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();

        // Prepare, check, and advance to the next block.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
        self.ledger.check_next_block(&next_block)?;
        self.ledger.advance_to_next_block(&next_block)?;
        #[cfg(feature = "telemetry")]
        let latest_committee = self.ledger.current_committee()?;

        // At an epoch boundary, clear the queued solutions and the worker solutions.
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            self.solutions_queue.lock().clear();
            self.bft.primary().clear_worker_solutions();
        }

        // Broadcast the updated block locators and bump the sync height.
        let locators = self.block_sync.get_block_locators()?;
        self.ping.update_block_locators(locators);

        self.block_sync.set_sync_height(next_block.height());

        #[cfg(feature = "metrics")]
        {
            let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let block_latency = next_block_timestamp - current_block_timestamp;
            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();

            metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);

            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);

            #[cfg(feature = "telemetry")]
            {
                let participation_scores =
                    self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);

                for (address, participation_score) in participation_scores {
                    metrics::histogram_label(
                        metrics::consensus::VALIDATOR_PARTICIPATION,
                        "validator_address",
                        address.to_string(),
                        participation_score,
                    )
                }
            }
        }
        Ok(())
    }

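    /// Reinserts the given transmissions into the memory pool, logging a warning for any that fail.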
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        for (transmission_id, transmission) in transmissions.into_iter() {
            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
                warn!(
                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                );
            }
        }
    }

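    /// Reinserts a single transmission into the memory pool by sending it back to the primary and
    /// awaiting the result on a oneshot callback.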
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<()> {
        let (callback, callback_receiver) = oneshot::channel();
        match (transmission_id, transmission) {
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        callback_receiver.await?
    }

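    /// Spawns the given future onto the tokio runtime and tracks its handle for shutdown.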
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

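    /// Shuts down the BFT and aborts the spawned handler tasks.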
    pub async fn shut_down(&self) {
        info!("Shutting down consensus...");
        self.bft.shut_down().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}