Skip to main content

snarkos_node_consensus/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18mod transactions_queue;
19use transactions_queue::TransactionsQueue;
20
21#[macro_use]
22extern crate tracing;
23
24#[cfg(feature = "metrics")]
25extern crate snarkos_node_metrics as metrics;
26
27use snarkos_account::Account;
28use snarkos_node_bft::{
29    BFT,
30    MAX_BATCH_DELAY_IN_MS,
31    Primary,
32    helpers::{
33        ConsensusReceiver,
34        PrimarySender,
35        Storage as NarwhalStorage,
36        fmt_id,
37        init_consensus_channels,
38        init_primary_channels,
39    },
40    spawn_blocking,
41};
42use snarkos_node_bft_ledger_service::LedgerService;
43use snarkos_node_bft_storage_service::BFTPersistentStorage;
44use snarkos_node_sync::{BlockSync, Ping};
45use snarkos_utilities::NodeDataDir;
46
47use snarkvm::{
48    ledger::{
49        block::Transaction,
50        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
51        puzzle::{Solution, SolutionID},
52    },
53    prelude::*,
54    utilities::flatten_error,
55};
56
57use aleo_std::StorageMode;
58use anyhow::{Context, Result};
59use colored::Colorize;
60use indexmap::IndexMap;
61#[cfg(feature = "locktick")]
62use locktick::parking_lot::{Mutex, RwLock};
63use lru::LruCache;
64#[cfg(not(feature = "locktick"))]
65use parking_lot::{Mutex, RwLock};
66use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
67use tokio::{sync::oneshot, task::JoinHandle};
68
69#[cfg(feature = "metrics")]
70use std::collections::HashMap;
71
72/// The capacity of the queue reserved for deployments.
73/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
74const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
75/// The capacity of the queue reserved for executions.
76/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
77const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
78/// The capacity of the queue reserved for solutions.
79/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
80const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
81/// The **suggested** maximum number of deployments in each interval.
82/// Note: This is an inbound queue limit, not a Narwhal-enforced limit.
83const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
84
85/// Wrapper around `BFT` that adds additional functionality, such as a mempool.
86///
87/// Consensus acts as a rate limiter to prevents workers in BFT from being overloaded.
88/// Each worker maintains a ready queue (which is essentially also a mempool), but verifies transactions/solutions
89/// before enquing them.
90/// Consensus only passes more transactions/solutions to the BFT layer if its ready queues are not already full.
91#[derive(Clone)]
92pub struct Consensus<N: Network> {
93    /// The ledger.
94    ledger: Arc<dyn LedgerService<N>>,
95    /// The BFT.
96    bft: BFT<N>,
97    /// The primary sender.
98    primary_sender: PrimarySender<N>,
99    /// The unconfirmed solutions queue.
100    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
101    /// The unconfirmed transactions queue.
102    transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
103    /// The recently-seen unconfirmed solutions.
104    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
105    /// The recently-seen unconfirmed transactions.
106    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
107    #[cfg(feature = "metrics")]
108    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
109    /// The spawned handles.
110    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
111    /// The ping logic.
112    ping: Arc<Ping<N>>,
113    /// The block sync logic.
114    block_sync: Arc<BlockSync<N>>,
115}
116
117impl<N: Network> Consensus<N> {
118    /// Initializes a new instance of consensus and spawn its background tasks.
119    #[allow(clippy::too_many_arguments)]
120    pub async fn new(
121        account: Account<N>,
122        ledger: Arc<dyn LedgerService<N>>,
123        block_sync: Arc<BlockSync<N>>,
124        ip: Option<SocketAddr>,
125        trusted_validators: &[SocketAddr],
126        trusted_peers_only: bool,
127        storage_mode: StorageMode,
128        node_data_dir: NodeDataDir,
129        ping: Arc<Ping<N>>,
130        dev: Option<u16>,
131    ) -> Result<Self> {
132        // Initialize the primary channels.
133        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
134        // Initialize the Narwhal transmissions.
135        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
136        // Initialize the Narwhal storage.
137        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
138        // Initialize the BFT.
139        let bft = BFT::new(
140            account,
141            storage,
142            ledger.clone(),
143            block_sync.clone(),
144            ip,
145            trusted_validators,
146            trusted_peers_only,
147            node_data_dir,
148            dev,
149        )?;
150        // Create a new instance of Consensus.
151        let mut _self = Self {
152            ledger,
153            bft,
154            block_sync,
155            primary_sender,
156            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
157            transactions_queue: Default::default(),
158            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
159            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
160            #[cfg(feature = "metrics")]
161            transmissions_tracker: Default::default(),
162            handles: Default::default(),
163            ping: ping.clone(),
164        };
165
166        info!("Starting the consensus instance...");
167
168        // First, initialize the consensus channels.
169        let (consensus_sender, consensus_receiver) = init_consensus_channels();
170        // Then, start the consensus handlers.
171        _self.start_handlers(consensus_receiver);
172        // Lastly, also start BFTs handlers.
173        _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;
174
175        Ok(_self)
176    }
177
178    /// Returns the underlying `BFT` struct.
179    pub const fn bft(&self) -> &BFT<N> {
180        &self.bft
181    }
182
183    pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
184        self.transactions_queue.read().contains(transaction_id)
185    }
186}
187
188impl<N: Network> Consensus<N> {
189    /// Returns the number of unconfirmed transmissions in the BFT's workers (not in the mempool).
190    pub fn num_unconfirmed_transmissions(&self) -> usize {
191        self.bft.num_unconfirmed_transmissions()
192    }
193
194    /// Returns the number of unconfirmed ratifications in the BFT's workers (not in the mempool).
195    pub fn num_unconfirmed_ratifications(&self) -> usize {
196        self.bft.num_unconfirmed_ratifications()
197    }
198
199    /// Returns the number unconfirmed solutions in the BFT's workers (not in the mempool).
200    pub fn num_unconfirmed_solutions(&self) -> usize {
201        self.bft.num_unconfirmed_solutions()
202    }
203
204    /// Returns the number of unconfirmed transactions.
205    pub fn num_unconfirmed_transactions(&self) -> usize {
206        self.bft.num_unconfirmed_transactions()
207    }
208}
209
210impl<N: Network> Consensus<N> {
211    /// Returns the unconfirmed transmission IDs.
212    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
213        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
214    }
215
216    /// Returns the unconfirmed transmissions.
217    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
218        self.worker_transmissions().chain(self.inbound_transmissions())
219    }
220
221    /// Returns the unconfirmed solutions.
222    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
223        self.worker_solutions().chain(self.inbound_solutions())
224    }
225
226    /// Returns the unconfirmed transactions.
227    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
228        self.worker_transactions().chain(self.inbound_transactions())
229    }
230}
231
232impl<N: Network> Consensus<N> {
233    /// Returns the worker transmission IDs.
234    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
235        self.bft.worker_transmission_ids()
236    }
237
238    /// Returns the worker transmissions.
239    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
240        self.bft.worker_transmissions()
241    }
242
243    /// Returns the worker solutions.
244    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
245        self.bft.worker_solutions()
246    }
247
248    /// Returns the worker transactions.
249    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
250        self.bft.worker_transactions()
251    }
252}
253
254impl<N: Network> Consensus<N> {
255    /// Returns the transmission IDs in the inbound queue.
256    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
257        self.inbound_transmissions().map(|(id, _)| id)
258    }
259
260    /// Returns the transmissions in the inbound queue.
261    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
262        self.inbound_transactions()
263            .map(|(id, tx)| {
264                (
265                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
266                    Transmission::Transaction(tx),
267                )
268            })
269            .chain(self.inbound_solutions().map(|(id, solution)| {
270                (
271                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
272                    Transmission::Solution(solution),
273                )
274            }))
275    }
276
277    /// Returns the solutions in the inbound queue.
278    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
279        // Return an iterator over the solutions in the inbound queue.
280        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
281    }
282
283    /// Returns the transactions in the inbound queue.
284    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
285        // Return an iterator over the deployment and execution transactions in the inbound queue.
286        self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
287    }
288}
289
290impl<N: Network> Consensus<N> {
291    /// Adds the given unconfirmed solution to the memory pool, which will then eventually be passed
292    /// to the BFT layer for inclusion in a batch.
293    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
294        // Calculate the transmission checksum.
295        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
296        // Queue the unconfirmed solution.
297        {
298            let solution_id = solution.id();
299
300            // Check if the transaction was recently seen.
301            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
302                // If the transaction was recently seen, return early.
303                return Ok(());
304            }
305            // Check if the solution already exists in the ledger.
306            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
307                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
308            }
309            #[cfg(feature = "metrics")]
310            {
311                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
312                let timestamp = snarkos_node_bft::helpers::now();
313                self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
314            }
315            // Add the solution to the memory pool.
316            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
317            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
318                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
319            }
320        }
321
322        // Try to process the unconfirmed solutions in the memory pool.
323        self.process_unconfirmed_solutions().await
324    }
325
326    /// Processes unconfirmed solutions in the mempool, and passes them to the BFT layer
327    /// (if sufficient space is available).
328    async fn process_unconfirmed_solutions(&self) -> Result<()> {
329        // If the memory pool of this node is full, return early.
330        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
331        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
332        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
333            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
334        {
335            return Ok(());
336        }
337        // Retrieve the solutions.
338        let solutions = {
339            // Determine the available capacity.
340            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
341            // Acquire the lock on the queue.
342            let mut queue = self.solutions_queue.lock();
343            // Determine the number of solutions to send.
344            let num_solutions = queue.len().min(capacity);
345            // Drain the solutions from the queue.
346            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
347        };
348        // Iterate over the solutions.
349        for solution in solutions.into_iter() {
350            let solution_id = solution.id();
351            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
352            // Send the unconfirmed solution to the primary.
353            match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
354                Ok(true) => {}
355                Ok(false) => debug!(
356                    "Unable to add unconfirmed solution '{}' to the memory pool. Already exists.",
357                    fmt_id(solution_id)
358                ),
359                Err(err) => {
360                    let err = err.context(format!(
361                        "Unable to add unconfirmed solution '{}' to the memory pool",
362                        fmt_id(solution_id)
363                    ));
364
365                    // If the node is synced and the occurs after the first 10 blocks of an epoch, log it as a warning, otherwise trace it.
366                    if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
367                        warn!("{}", flatten_error(err));
368                    } else {
369                        trace!("{}", flatten_error(err));
370                    }
371                }
372            }
373        }
374        Ok(())
375    }
376
377    /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed
378    /// to the BFT layer for inclusion in a batch.
379    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
380        // Calculate the transmission checksum.
381        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
382        // Queue the unconfirmed transaction.
383        {
384            let transaction_id = transaction.id();
385
386            // Check that the transaction is not a fee transaction.
387            if transaction.is_fee() {
388                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
389            }
390            // Check if the transaction was recently seen.
391            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
392                // If the transaction was recently seen, return early.
393                return Ok(());
394            }
395            // Check if the transaction already exists in the ledger.
396            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
397                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
398            }
399            // Check that the transaction is not in the mempool.
400            if self.contains_transaction(&transaction_id) {
401                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
402            }
403            #[cfg(feature = "metrics")]
404            {
405                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
406                let timestamp = snarkos_node_bft::helpers::now();
407                self.transmissions_tracker
408                    .lock()
409                    .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
410            }
411            // Add the transaction to the memory pool.
412            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
413            let priority_fee = transaction.priority_fee_amount()?;
414            self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
415        }
416
417        // Try to process the unconfirmed transactions in the memory pool.
418        self.process_unconfirmed_transactions().await
419    }
420
421    /// Processes unconfirmed transactions in the mempool, and passes them to the BFT layer
422    /// (if sufficient space is available).
423    async fn process_unconfirmed_transactions(&self) -> Result<()> {
424        // If the memory pool of this node is full, return early.
425        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
426        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
427            return Ok(());
428        }
429        // Retrieve the transactions.
430        let transactions = {
431            // Determine the available capacity.
432            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
433            // Acquire the lock on the transactions queue.
434            let mut tx_queue = self.transactions_queue.write();
435            // Determine the number of deployments to send.
436            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
437            // Determine the number of executions to send.
438            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
439            // Create an iterator which will select interleaved deployments and executions within the capacity.
440            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
441            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
442            // Drain the transactions from the queue, interleaving deployments and executions.
443            selector_iter
444                .filter_map(
445                    |select_deployment| {
446                        if select_deployment { tx_queue.deployments.pop() } else { tx_queue.executions.pop() }
447                    },
448                )
449                .map(|(_, tx)| tx)
450                .collect_vec()
451        };
452        // Iterate over the transactions.
453        for transaction in transactions.into_iter() {
454            let transaction_id = transaction.id();
455            // Determine the type of the transaction. The fee type is technically not possible here.
456            let tx_type_str = match transaction {
457                Transaction::Deploy(..) => "deployment",
458                Transaction::Execute(..) => "execution",
459                Transaction::Fee(..) => "fee",
460            };
461            trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
462            // Send the unconfirmed transaction to the primary.
463            match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await {
464                Ok(true) => {}
465                Ok(false) => debug!(
466                    "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.",
467                    fmt_id(transaction_id)
468                ),
469                Err(err) => {
470                    // If the BFT is synced, then log the warning.
471                    if self.bft.is_synced() {
472                        let err = err.context(format!(
473                            "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool",
474                            fmt_id(transaction_id)
475                        ));
476                        warn!("{}", flatten_error(err));
477                    }
478                }
479            }
480        }
481        Ok(())
482    }
483}
484
485impl<N: Network> Consensus<N> {
486    /// Starts the consensus handlers.
487    ///
488    /// This is only invoked once, in the constructor.
489    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
490        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
491
492        // Process the committed subdag and transmissions from the BFT.
493        let self_ = self.clone();
494        self.spawn(async move {
495            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
496                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
497            }
498        });
499
500        // Process the unconfirmed transactions in the memory pool.
501        //
502        // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted
503        // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions.
504        let self_ = self.clone();
505        self.spawn(async move {
506            loop {
507                // Sleep briefly.
508                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
509                // Process the unconfirmed transactions in the memory pool.
510                if let Err(err) = self_.process_unconfirmed_transactions().await {
511                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions")));
512                }
513                // Process the unconfirmed solutions in the memory pool.
514                if let Err(err) = self_.process_unconfirmed_solutions().await {
515                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed solutions")));
516                }
517            }
518        });
519    }
520
521    /// Attempts to build a new block from the given subDAG, and (tries to) advance the legder to it.
522    async fn process_bft_subdag(
523        &self,
524        subdag: Subdag<N>,
525        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
526        callback: oneshot::Sender<Result<()>>,
527    ) {
528        // Try to advance to the next block.
529        let self_ = self.clone();
530        let transmissions_ = transmissions.clone();
531        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };
532
533        // If the block failed to advance, reinsert the transmissions into the memory pool.
534        if let Err(e) = &result {
535            error!("{}", flatten_error(e));
536            // On failure, reinsert the transmissions into the memory pool.
537            self.reinsert_transmissions(transmissions).await;
538        }
539        // Send the callback **after** advancing to the next block.
540        // Note: We must await the block to be advanced before sending the callback.
541        callback.send(result).ok();
542    }
543
544    /// Attempts to advance the ledger to the next block, and updates the metrics (if enabled) accordingly.
545    fn try_advance_to_next_block(
546        &self,
547        subdag: Subdag<N>,
548        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
549    ) -> Result<()> {
550        #[cfg(feature = "metrics")]
551        let start = subdag.leader_certificate().batch_header().timestamp();
552        #[cfg(feature = "metrics")]
553        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
554        #[cfg(feature = "metrics")]
555        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
556
557        // Create the candidate next block.
558        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
559        // Check that the block is well-formed.
560        self.ledger.check_next_block(&next_block)?;
561        // Advance to the next block.
562        self.ledger.advance_to_next_block(&next_block)?;
563        #[cfg(feature = "telemetry")]
564        // Fetch the latest committee
565        let latest_committee = self.ledger.current_committee()?;
566
567        // If the next block starts a new epoch, clear the existing solutions.
568        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
569            // Clear the solutions queue.
570            self.solutions_queue.lock().clear();
571            // Clear the worker solutions.
572            self.bft.primary().clear_worker_solutions();
573        }
574
575        // Notify peers that we have a new block.
576        let locators = self.block_sync.get_block_locators()?;
577        self.ping.update_block_locators(locators);
578
579        // Make block sync aware of the new block.
580        self.block_sync.set_sync_height(next_block.height());
581
582        // TODO(kaimast): This should also remove any transmissions/solutions contained in the block from the mempool.
583        // Removal currently happens when Consensus eventually passes them to the worker, which then just discards them.
584
585        #[cfg(feature = "metrics")]
586        {
587            let now_utc = snarkos_node_bft::helpers::now_utc();
588            let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
589            let next_block_timestamp = next_block.header().metadata().timestamp();
590            let next_block_utc = snarkos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
591            let block_latency = next_block_timestamp - current_block_timestamp;
592            let block_lag = (now_utc - next_block_utc).whole_milliseconds();
593
594            let proof_target = next_block.header().proof_target();
595            let coinbase_target = next_block.header().coinbase_target();
596            let cumulative_proof_target = next_block.header().cumulative_proof_target();
597
598            // Calculate latency for all transmissions included in this block.
599            metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);
600
601            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
602            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
603            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
604            metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
605            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
606            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
607            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
608
609            #[cfg(feature = "telemetry")]
610            {
611                // Retrieve the latest participation scores.
612                let participation_scores =
613                    self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);
614
615                // Log the participation scores.
616                for (address, participation_score) in participation_scores {
617                    metrics::histogram_label(
618                        metrics::consensus::VALIDATOR_PARTICIPATION,
619                        "validator_address",
620                        address.to_string(),
621                        participation_score,
622                    )
623                }
624            }
625        }
626        Ok(())
627    }
628
629    /// Reinserts the given transmissions into the memory pool.
630    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
631        // Iterate over the transmissions.
632        for (transmission_id, transmission) in transmissions.into_iter() {
633            // Reinsert the transmission into the memory pool.
634            match self.reinsert_transmission(transmission_id, transmission).await {
635                Ok(true) => {}
636                Ok(false) => debug!(
637                    "Unable to reinsert transmission {}.{} into the memory pool. Already exists.",
638                    fmt_id(transmission_id),
639                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
640                ),
641                Err(err) => {
642                    let err = err.context(format!(
643                        "Unable to reinsert transmission {}.{} into the memory pool",
644                        fmt_id(transmission_id),
645                        fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
646                    ));
647                    warn!("{}", flatten_error(err));
648                }
649            }
650        }
651    }
652
653    /// Reinserts the given transmission into the memory pool.
654    ///
655    /// # Returns
656    /// - `Ok(true)` if the transmission was added to the memory pool.
657    /// - `Ok(false)` if the transmission was valid but already exists in the memory pool.
658    /// - `Err(anyhow::Error)` if the transmission was invalid.
659    async fn reinsert_transmission(
660        &self,
661        transmission_id: TransmissionID<N>,
662        transmission: Transmission<N>,
663    ) -> Result<bool> {
664        // Initialize a callback sender and receiver.
665        let (callback, callback_receiver) = oneshot::channel();
666        // Send the transmission to the primary.
667        match (transmission_id, transmission) {
668            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(true),
669            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
670                // Send the solution to the primary.
671                self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
672            }
673            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
674                // Send the transaction to the primary.
675                self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
676            }
677            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
678        }
679        // Await the callback.
680        callback_receiver.await?
681    }
682
683    /// Spawns a task with the given future; it should only be used for long-running tasks.
684    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
685        self.handles.lock().push(tokio::spawn(future));
686    }
687
688    /// Shuts down the consensus and BFT layers.
689    pub async fn shut_down(&self) {
690        info!("Shutting down consensus...");
691        // Shut down the BFT.
692        self.bft.shut_down().await;
693        // Abort the tasks.
694        self.handles.lock().iter().for_each(|handle| handle.abort());
695    }
696}