1#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21use snarkos_account::Account;
22use snarkos_node_bft::{
23 BFT,
24 MAX_BATCH_DELAY_IN_MS,
25 Primary,
26 helpers::{
27 ConsensusReceiver,
28 PrimaryReceiver,
29 PrimarySender,
30 Storage as NarwhalStorage,
31 fmt_id,
32 init_consensus_channels,
33 },
34 spawn_blocking,
35};
36use snarkos_node_bft_ledger_service::LedgerService;
37use snarkos_node_bft_storage_service::BFTPersistentStorage;
38use snarkvm::{
39 ledger::{
40 block::Transaction,
41 narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
42 puzzle::{Solution, SolutionID},
43 },
44 prelude::*,
45};
46
47use aleo_std::StorageMode;
48use anyhow::Result;
49use colored::Colorize;
50use indexmap::IndexMap;
51use lru::LruCache;
52use parking_lot::Mutex;
53use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
54use tokio::{
55 sync::{OnceCell, oneshot},
56 task::JoinHandle,
57};
58
59#[cfg(feature = "metrics")]
60use std::collections::HashMap;
61
/// The maximum number of deployment transactions retained in the inbound queue (LRU-evicted beyond this).
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The maximum number of execution transactions retained in the inbound queue (LRU-evicted beyond this).
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The maximum number of solutions retained in the inbound queue (LRU-evicted beyond this).
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The maximum number of deployments drained from the queue per processing interval.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
74
/// An in-memory queue of unconfirmed transactions, partitioned by kind so that
/// deployments and executions can be capped and drained independently.
struct TransactionsQueue<N: Network> {
    /// The LRU queue of pending deployment transactions, keyed by transaction ID.
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    /// The LRU queue of pending execution transactions, keyed by transaction ID.
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}
80
81impl<N: Network> Default for TransactionsQueue<N> {
82 fn default() -> Self {
83 Self {
84 deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
85 executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
86 }
87 }
88}
89
/// The consensus layer: bridges the BFT with the ledger service, queueing inbound
/// solutions and transactions and advancing blocks from committed subdags.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger service used to validate transmissions and advance blocks.
    ledger: Arc<dyn LedgerService<N>>,
    /// The underlying BFT instance.
    bft: BFT<N>,
    /// The sender channel to the primary; set exactly once in `run`.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// The LRU queue of unconfirmed solutions awaiting submission to the primary.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The LRU queues of unconfirmed transactions awaiting submission to the primary.
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    /// Recently-seen solution IDs, used to drop duplicate submissions early.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// Recently-seen transaction IDs, used to drop duplicate submissions early.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// Timestamps of when each transmission entered the queue (metrics builds only).
    #[cfg(feature = "metrics")]
    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// Handles of the spawned background tasks, aborted on shutdown.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
111
112impl<N: Network> Consensus<N> {
113 pub fn new(
115 account: Account<N>,
116 ledger: Arc<dyn LedgerService<N>>,
117 ip: Option<SocketAddr>,
118 trusted_validators: &[SocketAddr],
119 storage_mode: StorageMode,
120 ) -> Result<Self> {
121 let dev = match storage_mode {
123 StorageMode::Development(id) => Some(id),
124 StorageMode::Production | StorageMode::Custom(..) => None,
125 };
126 let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode)?);
128 let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
130 let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, dev)?;
132 Ok(Self {
134 ledger,
135 bft,
136 primary_sender: Default::default(),
137 solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
138 transactions_queue: Default::default(),
139 seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
140 seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
141 #[cfg(feature = "metrics")]
142 transmissions_queue_timestamps: Default::default(),
143 handles: Default::default(),
144 })
145 }
146
147 pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
149 info!("Starting the consensus instance...");
150 self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set");
152
153 let (consensus_sender, consensus_receiver) = init_consensus_channels();
155 self.start_handlers(consensus_receiver);
157 self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?;
159 Ok(())
160 }
161
162 pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
164 &self.ledger
165 }
166
167 pub const fn bft(&self) -> &BFT<N> {
169 &self.bft
170 }
171
172 pub fn primary_sender(&self) -> &PrimarySender<N> {
174 self.primary_sender.get().expect("Primary sender not set")
175 }
176}
177
impl<N: Network> Consensus<N> {
    /// Returns the number of unconfirmed transmissions held by the BFT.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.bft.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications held by the BFT.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.bft.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions held by the BFT.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.bft.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions held by the BFT.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.bft.num_unconfirmed_transactions()
    }
}
199
impl<N: Network> Consensus<N> {
    /// Returns the unconfirmed transmission IDs: worker-held entries first,
    /// followed by entries still in the inbound queues.
    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
    }

    /// Returns the unconfirmed transmissions: worker-held entries first,
    /// followed by entries still in the inbound queues.
    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.worker_transmissions().chain(self.inbound_transmissions())
    }

    /// Returns the unconfirmed solutions: worker-held entries first,
    /// followed by entries still in the inbound queue.
    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.worker_solutions().chain(self.inbound_solutions())
    }

    /// Returns the unconfirmed transactions: worker-held entries first,
    /// followed by entries still in the inbound queues.
    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.worker_transactions().chain(self.inbound_transactions())
    }
}
221
impl<N: Network> Consensus<N> {
    /// Returns the transmission IDs currently held by the BFT workers.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.bft.worker_transmission_ids()
    }

    /// Returns the transmissions currently held by the BFT workers.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.bft.worker_transmissions()
    }

    /// Returns the solutions currently held by the BFT workers.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.bft.worker_solutions()
    }

    /// Returns the transactions currently held by the BFT workers.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.bft.worker_transactions()
    }
}
243
244impl<N: Network> Consensus<N> {
245 pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
247 self.inbound_transmissions().map(|(id, _)| id)
248 }
249
250 pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
252 self.inbound_transactions()
253 .map(|(id, tx)| {
254 (
255 TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
256 Transmission::Transaction(tx),
257 )
258 })
259 .chain(self.inbound_solutions().map(|(id, solution)| {
260 (
261 TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
262 Transmission::Solution(solution),
263 )
264 }))
265 }
266
267 pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
269 self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
271 }
272
273 pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
275 let tx_queue = self.transactions_queue.lock();
277 tx_queue
279 .deployments
280 .clone()
281 .into_iter()
282 .chain(tx_queue.executions.clone())
283 .map(|(id, tx)| (id, Data::Object(tx)))
284 }
285}
286
impl<N: Network> Consensus<N> {
    /// Adds the given unconfirmed solution to the solutions queue, then triggers
    /// processing of the queue.
    ///
    /// Returns `Ok(())` silently if the solution was seen recently; errors if the
    /// solution already exists in the ledger or the memory pool.
    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
        // Compute the checksum over the serialized solution bytes.
        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
        #[cfg(feature = "metrics")]
        {
            // Record the queue entry time, for transmission latency metrics.
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
            let timestamp = snarkos_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
        }
        // Scope the lock guards so none survive past the `.await` below.
        {
            let solution_id = solution.id();

            // Deduplicate: `put` returns the prior value when the ID was already present.
            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
                return Ok(());
            }
            // Skip solutions that are already committed to the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
            }
            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
            // Queue the solution; error if an entry with this ID is already queued.
            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
            }
        }

        self.process_unconfirmed_solutions().await
    }

    /// Drains queued solutions, up to the remaining memory-pool capacity, and
    /// forwards them to the primary.
    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
        // Bail out early if the memory pool is already saturated.
        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
        {
            return Ok(());
        }
        // Pop up to `capacity` solutions in LRU order; the lock is held only in this block.
        let solutions = {
            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
            let mut queue = self.solutions_queue.lock();
            let num_solutions = queue.len().min(capacity);
            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
        };
        // Send each solution to the primary; no locks are held across these awaits.
        for solution in solutions.into_iter() {
            let solution_id = solution.id();
            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
                // Only warn when synced and more than 10 blocks into the epoch —
                // NOTE(review): presumably to mute expected rejections of stale solutions
                // right after an epoch boundary (the queue is cleared there); confirm.
                if self.bft.is_synced() {
                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
                    };
                }
            }
        }
        Ok(())
    }

    /// Adds the given unconfirmed transaction to the appropriate transactions queue
    /// (deployments or executions), then triggers processing of the queue.
    ///
    /// Returns `Ok(())` silently if the transaction was seen recently; errors if it
    /// is a fee transaction, or already exists in the ledger or the memory pool.
    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
        // Compute the checksum over the serialized transaction bytes.
        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
        #[cfg(feature = "metrics")]
        {
            // Record the queue entry time, for transmission latency metrics.
            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
            let timestamp = snarkos_node_bft::helpers::now();
            self.transmissions_queue_timestamps
                .lock()
                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
        }
        {
            let transaction_id = transaction.id();

            // Standalone fee transactions are not accepted into the pool.
            if transaction.is_fee() {
                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            // Deduplicate: `put` returns the prior value when the ID was already present.
            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
                return Ok(());
            }
            // Skip transactions that are already committed to the ledger.
            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
            }
            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
            // Route deployments and executions to their respective LRU queues;
            // error if an entry with this ID is already queued.
            if transaction.is_deploy() {
                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
                }
            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
            }

            self.process_unconfirmed_transactions().await
        }
    }

    /// Drains queued transactions, up to the remaining memory-pool capacity, and
    /// forwards them to the primary. At most `MAX_DEPLOYMENTS_PER_INTERVAL`
    /// deployments are drained per call.
    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
        // Bail out early if the memory pool is already saturated.
        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
            return Ok(());
        }
        // Pop transactions in LRU order; the lock is held only in this block.
        let transactions = {
            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
            let mut tx_queue = self.transactions_queue.lock();
            // Deployments are rate-limited per interval; executions fill the remainder.
            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
            // Interleave `true` (take a deployment) with `false` (take an execution).
            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
            selector_iter
                .filter_map(|select_deployment| {
                    if select_deployment {
                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
                    } else {
                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
                    }
                })
                .collect_vec()
        };
        // Send each transaction to the primary; no locks are held across these awaits.
        for transaction in transactions.into_iter() {
            let transaction_id = transaction.id();
            trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
            if let Err(e) =
                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
            {
                // Only surface the failure once the node is synced.
                if self.bft.is_synced() {
                    warn!(
                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
                        fmt_id(transaction_id)
                    );
                }
            }
        }
        Ok(())
    }
}
458
impl<N: Network> Consensus<N> {
    /// Starts the background handlers: one task that applies committed subdags from
    /// the BFT, and one that periodically drains the inbound queues.
    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;

        // Process the committed subdags as the BFT delivers them.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
            }
        });

        // Periodically forward queued transactions and solutions to the primary.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                if let Err(e) = self_.process_unconfirmed_transactions().await {
                    warn!("Cannot process unconfirmed transactions - {e}");
                }
                if let Err(e) = self_.process_unconfirmed_solutions().await {
                    warn!("Cannot process unconfirmed solutions - {e}");
                }
            }
        });
    }

    /// Advances the ledger with the given committed subdag; on failure, reinserts the
    /// transmissions into the memory pool. The outcome is reported via `callback`.
    async fn process_bft_subdag(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
        callback: oneshot::Sender<Result<()>>,
    ) {
        // Advancing the block is blocking work, so run it off the async executor.
        // The transmissions are cloned up front so they can be reinserted on failure.
        let self_ = self.clone();
        let transmissions_ = transmissions.clone();
        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };

        // On failure, return the transmissions to the queues so they are not lost.
        if let Err(e) = &result {
            error!("Unable to advance to the next block - {e}");
            self.reinsert_transmissions(transmissions).await;
        }
        // Report the result back over the callback; a dropped receiver is ignored.
        callback.send(result).ok();
    }

    /// Prepares, validates, and commits the next quorum block from the subdag and
    /// its transmissions.
    fn try_advance_to_next_block(
        &self,
        subdag: Subdag<N>,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        #[cfg(feature = "metrics")]
        let start = subdag.leader_certificate().batch_header().timestamp();
        #[cfg(feature = "metrics")]
        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
        #[cfg(feature = "metrics")]
        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();

        // Construct, check, and commit the next block.
        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
        self.ledger.check_next_block(&next_block)?;
        self.ledger.advance_to_next_block(&next_block)?;

        // On an epoch boundary, clear the queued and worker-held solutions —
        // NOTE(review): presumably they are stale for the new epoch; confirm.
        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
            self.solutions_queue.lock().clear();
            self.bft.primary().clear_worker_solutions();
        }

        #[cfg(feature = "metrics")]
        {
            // Derive and publish block/commit latency and target metrics.
            let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64);
            let next_block_timestamp = next_block.header().metadata().timestamp();
            let block_latency = next_block_timestamp - current_block_timestamp;
            let proof_target = next_block.header().proof_target();
            let coinbase_target = next_block.header().coinbase_target();
            let cumulative_proof_target = next_block.header().cumulative_proof_target();

            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);

            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
        }
        Ok(())
    }

    /// Reinserts the given transmissions back into the memory pool, logging any
    /// entries that could not be reinserted.
    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
        for (transmission_id, transmission) in transmissions.into_iter() {
            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
                warn!(
                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
                    fmt_id(transmission_id),
                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
                );
            }
        }
    }

    /// Reinserts a single transmission by resending it over the matching primary
    /// channel, then awaits the primary's verdict via the callback.
    async fn reinsert_transmission(
        &self,
        transmission_id: TransmissionID<N>,
        transmission: Transmission<N>,
    ) -> Result<()> {
        // The callback channel carries the primary's accept/reject result back here.
        let (callback, callback_receiver) = oneshot::channel();
        match (transmission_id, transmission) {
            // Ratifications carry no payload to requeue.
            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
            }
            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
            }
            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
        }
        // Await and propagate the primary's result.
        callback_receiver.await?
    }

    /// Spawns the given future onto a task whose handle is retained for shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the BFT, then aborts all retained background tasks.
    pub async fn shut_down(&self) {
        info!("Shutting down consensus...");
        self.bft.shut_down().await;
        self.handles.lock().iter().for_each(|handle| handle.abort());
    }
}