1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21use snarkos_account::Account;
22use snarkos_node_bft::{
23 BFT,
24 MAX_BATCH_DELAY_IN_MS,
25 Primary,
26 helpers::{
27 ConsensusReceiver,
28 PrimaryReceiver,
29 PrimarySender,
30 Storage as NarwhalStorage,
31 fmt_id,
32 init_consensus_channels,
33 },
34 spawn_blocking,
35};
36use snarkos_node_bft_ledger_service::LedgerService;
37use snarkos_node_bft_storage_service::BFTPersistentStorage;
38use snarkvm::{
39 ledger::{
40 block::Transaction,
41 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
42 puzzle::{Solution, SolutionID},
43 },
44 prelude::*,
45};
46
47use aleo_std::StorageMode;
48use anyhow::Result;
49use colored::Colorize;
50use indexmap::IndexMap;
51#[cfg(feature = "locktick")]
52use locktick::parking_lot::Mutex;
53use lru::LruCache;
54#[cfg(not(feature = "locktick"))]
55use parking_lot::Mutex;
56use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
57use tokio::{
58 sync::{OnceCell, oneshot},
59 task::JoinHandle,
60};
61
62#[cfg(feature = "metrics")]
63use std::collections::HashMap;
64
/// The capacity of the LRU queue holding pending deployment transactions.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the LRU queue holding pending execution transactions.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the LRU queue holding pending solutions.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The maximum number of deployments drained from the queue per processing interval
/// (executions fill the remaining capacity — see `process_unconfirmed_transactions`).
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
77
/// Pending transactions, partitioned into deployments and executions so each
/// variant can be capped and drained at its own rate.
struct TransactionsQueue<N: Network> {
    /// The LRU queue of pending deployment transactions, keyed by transaction ID.
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    /// The LRU queue of pending execution transactions, keyed by transaction ID.
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}
83
84impl<N: Network> Default for TransactionsQueue<N> {
85 fn default() -> Self {
86 Self {
87 deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
88 executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
89 }
90 }
91}
92
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service used to check, prepare, and advance blocks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The BFT instance driving consensus.
    bft: BFT<N>,
    /// The primary's sender channels; set exactly once in `run`.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// The LRU queue of solutions awaiting submission to the primary.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The LRU queues of deployment/execution transactions awaiting submission to the primary.
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    /// The LRU set of recently seen solution IDs, used to drop duplicate submissions.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// The LRU set of recently seen transaction IDs, used to drop duplicate submissions.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// The timestamps at which transmissions entered the queue (metrics builds only).
    #[cfg(feature = "metrics")]
    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// The handles of the spawned background tasks; aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
114
impl<N: Network> Consensus<N> {
    /// Initializes a new consensus instance for the given account, ledger, and
    /// validator configuration. Does not start any tasks; call `run` to start.
    pub fn new(
        account: Account<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        storage_mode: StorageMode,
    ) -> Result<Self> {
        // Open the persistent transmission store first; Narwhal storage is layered on top of it.
        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
        // Initialize the Narwhal storage, retaining up to MAX_GC_ROUNDS of history.
        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
        // Initialize the BFT instance over the storage and ledger.
        let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, storage_mode)?;
        Ok(Self {
            ledger,
            bft,
            primary_sender: Default::default(),
            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
            transactions_queue: Default::default(),
            // NOTE(review): the seen-caches use a fixed 1 << 16 capacity, independent of the
            // queue capacities above — presumably sized for duplicate suppression; confirm.
            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
            #[cfg(feature = "metrics")]
            transmissions_queue_timestamps: Default::default(),
            handles: Default::default(),
        })
    }

    /// Starts the consensus instance: stores the primary sender, spawns the
    /// background handlers, and runs the BFT.
    ///
    /// Panics if called more than once on the same instance (the `OnceCell`
    /// holding the primary sender may only be set once).
    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
        info!("Starting the consensus instance...");
        self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set");

        // Wire up the channel through which the BFT hands committed subdags to consensus.
        let (consensus_sender, consensus_receiver) = init_consensus_channels();
        self.start_handlers(consensus_receiver);
        self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?;
        Ok(())
    }

    /// Returns the ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the BFT instance.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }

    /// Returns the primary sender.
    ///
    /// Panics if called before `run` has set it.
    pub fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set")
    }
}
175
176impl<N: Network> Consensus<N> {
177 pub fn num_unconfirmed_transmissions(&self) -> usize {
179 self.bft.num_unconfirmed_transmissions()
180 }
181
182 pub fn num_unconfirmed_ratifications(&self) -> usize {
184 self.bft.num_unconfirmed_ratifications()
185 }
186
187 pub fn num_unconfirmed_solutions(&self) -> usize {
189 self.bft.num_unconfirmed_solutions()
190 }
191
192 pub fn num_unconfirmed_transactions(&self) -> usize {
194 self.bft.num_unconfirmed_transactions()
195 }
196}
197
198impl<N: Network> Consensus<N> {
199 pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
201 self.worker_transmission_ids().chain(self.inbound_transmission_ids())
202 }
203
204 pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
206 self.worker_transmissions().chain(self.inbound_transmissions())
207 }
208
209 pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
211 self.worker_solutions().chain(self.inbound_solutions())
212 }
213
214 pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
216 self.worker_transactions().chain(self.inbound_transactions())
217 }
218}
219
220impl<N: Network> Consensus<N> {
221 pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
223 self.bft.worker_transmission_ids()
224 }
225
226 pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
228 self.bft.worker_transmissions()
229 }
230
231 pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
233 self.bft.worker_solutions()
234 }
235
236 pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
238 self.bft.worker_transactions()
239 }
240}
241
242impl<N: Network> Consensus<N> {
243 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
245 self.inbound_transmissions().map(|(id, _)| id)
246 }
247
248 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
250 self.inbound_transactions()
251 .map(|(id, tx)| {
252 (
253 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
254 Transmission::Transaction(tx),
255 )
256 })
257 .chain(self.inbound_solutions().map(|(id, solution)| {
258 (
259 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
260 Transmission::Solution(solution),
261 )
262 }))
263 }
264
265 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
267 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
269 }
270
271 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
273 let tx_queue = self.transactions_queue.lock();
275 tx_queue
277 .deployments
278 .clone()
279 .into_iter()
280 .chain(tx_queue.executions.clone())
281 .map(|(id, tx)| (id, Data::Object(tx)))
282 }
283}
284
impl<N: Network> Consensus<N> {
    /// Adds the given unconfirmed solution to the solutions queue, then attempts
    /// to process the queue. Duplicates (recently seen, already in the ledger, or
    /// already queued) are rejected.
    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
        // Compute the checksum over the serialized solution bytes.
        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
        #[cfg(feature = "metrics")]
        {
            // Note: the gauge and timestamp are recorded before the duplicate checks below,
            // so every submission is counted, not only accepted ones.
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
            let timestamp = snarkos_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
        }
        {
            let solution_id = solution.id();

            // `put` returns `Some` if the key was already present — i.e. this solution
            // was recently seen; silently drop the duplicate.
            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
                return Ok(());
            }
            // Reject solutions that are already confirmed in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
            }
            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
            // Reject solutions that are already queued in the memory pool.
            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
            }
        }

        self.process_unconfirmed_solutions().await
    }

    /// Drains queued solutions (up to the remaining solution capacity) and sends
    /// them to the primary. Returns early without draining when the solution or
    /// transmission limits are already reached.
    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        // Back off while the memory pool is already at capacity.
        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
        {
            return Ok(());
        }
        let solutions = {
            // Only drain as many solutions as the remaining capacity allows.
            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
            let mut queue = self.solutions_queue.lock();
            let num_solutions = queue.len().min(capacity);
            // Pop in LRU order (oldest queued first).
            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
        };
        for solution in solutions.into_iter() {
            let solution_id = solution.id();
            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
                // Only warn when synced, and not within the first 10 blocks of an epoch —
                // presumably to suppress noise right after the epoch-boundary solution
                // reset (see `try_advance_to_next_block`); confirm.
                if self.bft.is_synced() {
                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
                    };
                }
            }
        }
        Ok(())
    }

    /// Adds the given unconfirmed transaction to the deployments or executions
    /// queue, then attempts to process the queues. Fee transactions and
    /// duplicates are rejected.
    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
        // Compute the checksum over the serialized transaction bytes.
        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
        #[cfg(feature = "metrics")]
        {
            // Note: recorded before the duplicate checks, so every submission is counted.
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
            let timestamp = snarkos_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
        }
        {
            let transaction_id = transaction.id();

            // Standalone fee transactions are not accepted into the memory pool.
            if transaction.is_fee() {
                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // `put` returns `Some` if the key was already present — a recently seen
            // duplicate; silently drop it.
            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
                return Ok(());
            }
            // Reject transactions that are already confirmed in the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
            // Route deployments and executions to their respective queues.
            if transaction.is_deploy() {
                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
                }
            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
            }

            self.process_unconfirmed_transactions().await
        }
    }

    /// Drains queued transactions (deployments capped at
    /// `MAX_DEPLOYMENTS_PER_INTERVAL`, interleaved with executions) and sends
    /// them to the primary. Returns early when the transmission limit is reached.
    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        // Back off while the memory pool is already at capacity.
        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
            return Ok(());
        }
        let transactions = {
            // Only drain as many transactions as the remaining capacity allows.
            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
            let mut tx_queue = self.transactions_queue.lock();
            // At most MAX_DEPLOYMENTS_PER_INTERVAL deployments per pass; executions
            // take whatever capacity remains.
            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
            // Build a true/false selector sequence (true = deployment) so the two
            // queues are popped in interleaved LRU order.
            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
            selector_iter
                .filter_map(|select_deployment| {
                    if select_deployment {
                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
                    } else {
                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
                    }
                })
                .collect_vec()
        };
        for transaction in transactions.into_iter() {
            let transaction_id = transaction.id();
            trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
            if let Err(e) =
                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
            {
                // Only warn while synced; send failures during sync are expected.
                if self.bft.is_synced() {
                    warn!(
                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
                        fmt_id(transaction_id)
                    );
                }
            }
        }
        Ok(())
    }
}
456
457impl<N: Network> Consensus<N> {
458 fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
460 let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
461
462 let self_ = self.clone();
464 self.spawn(async move {
465 while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
466 self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
467 }
468 });
469
470 let self_ = self.clone();
472 self.spawn(async move {
473 loop {
474 tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
476 if let Err(e) = self_.process_unconfirmed_transactions().await {
478 warn!("Cannot process unconfirmed transactions - {e}");
479 }
480 if let Err(e) = self_.process_unconfirmed_solutions().await {
482 warn!("Cannot process unconfirmed solutions - {e}");
483 }
484 }
485 });
486 }
487
488 async fn process_bft_subdag(
490 &self,
491 subdag: Subdag<N>,
492 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
493 callback: oneshot::Sender<Result<()>>,
494 ) {
495 let self_ = self.clone();
497 let transmissions_ = transmissions.clone();
498 let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
499
500 if let Err(e) = &result {
502 error!("Unable to advance to the next block - {e}");
503 self.reinsert_transmissions(transmissions).await;
505 }
506 callback.send(result).ok();
509 }
510
511 fn try_advance_to_next_block(
513 &self,
514 subdag: Subdag<N>,
515 transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
516 ) -> Result<()> {
517 #[cfg(feature = "metrics")]
518 let start = subdag.leader_certificate().batch_header().timestamp();
519 #[cfg(feature = "metrics")]
520 let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
521 #[cfg(feature = "metrics")]
522 let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
523
524 let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
526 self.ledger.check_next_block(&next_block)?;
528 self.ledger.advance_to_next_block(&next_block)?;
530
531 if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
533 self.solutions_queue.lock().clear();
535 self.bft.primary().clear_worker_solutions();
537 }
538
539 #[cfg(feature = "metrics")]
540 {
541 let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64);
542 let next_block_timestamp = next_block.header().metadata().timestamp();
543 let block_latency = next_block_timestamp - current_block_timestamp;
544 let proof_target = next_block.header().proof_target();
545 let coinbase_target = next_block.header().coinbase_target();
546 let cumulative_proof_target = next_block.header().cumulative_proof_target();
547
548 metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);
549
550 metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
551 metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
552 metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
553 metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
554 metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
555 metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
556 }
557 Ok(())
558 }
559
560 async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
562 for (transmission_id, transmission) in transmissions.into_iter() {
564 if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
566 warn!(
567 "Unable to reinsert transmission {}.{} into the memory pool - {e}",
568 fmt_id(transmission_id),
569 fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
570 );
571 }
572 }
573 }
574
575 async fn reinsert_transmission(
577 &self,
578 transmission_id: TransmissionID<N>,
579 transmission: Transmission<N>,
580 ) -> Result<()> {
581 let (callback, callback_receiver) = oneshot::channel();
583 match (transmission_id, transmission) {
585 (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
586 (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
587 self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
589 }
590 (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
591 self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
593 }
594 _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
595 }
596 callback_receiver.await?
598 }
599
600 fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
602 self.handles.lock().push(tokio::spawn(future));
603 }
604
605 pub async fn shut_down(&self) {
607 info!("Shutting down consensus...");
608 self.bft.shut_down().await;
610 self.handles.lock().iter().for_each(|handle| handle.abort());
612 }
613}