snarkos_node_consensus/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21use snarkos_account::Account;
22use snarkos_node_bft::{
23    BFT,
24    MAX_BATCH_DELAY_IN_MS,
25    Primary,
26    helpers::{
27        ConsensusReceiver,
28        PrimaryReceiver,
29        PrimarySender,
30        Storage as NarwhalStorage,
31        fmt_id,
32        init_consensus_channels,
33    },
34    spawn_blocking,
35};
36use snarkos_node_bft_ledger_service::LedgerService;
37use snarkos_node_bft_storage_service::BFTPersistentStorage;
38use snarkvm::{
39    ledger::{
40        block::Transaction,
41        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
42        puzzle::{Solution, SolutionID},
43    },
44    prelude::*,
45};
46
47use aleo_std::StorageMode;
48use anyhow::Result;
49use colored::Colorize;
50use indexmap::IndexMap;
51#[cfg(feature = "locktick")]
52use locktick::parking_lot::Mutex;
53use lru::LruCache;
54#[cfg(not(feature = "locktick"))]
55use parking_lot::Mutex;
56use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
57use tokio::{
58    sync::{OnceCell, oneshot},
59    task::JoinHandle,
60};
61
62#[cfg(feature = "metrics")]
63use std::collections::HashMap;
64
65/// The capacity of the queue reserved for deployments.
66/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
67const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
68/// The capacity of the queue reserved for executions.
69/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
70const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
71/// The capacity of the queue reserved for solutions.
72/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
73const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
74/// The **suggested** maximum number of deployments in each interval.
75/// Note: This is an inbound queue limit, not a Narwhal-enforced limit.
76const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
77
78/// Helper struct to track incoming transactions.
79struct TransactionsQueue<N: Network> {
80    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
81    pub executions: LruCache<N::TransactionID, Transaction<N>>,
82}
83
84impl<N: Network> Default for TransactionsQueue<N> {
85    fn default() -> Self {
86        Self {
87            deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
88            executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
89        }
90    }
91}
92
93#[derive(Clone)]
94pub struct Consensus<N: Network> {
95    /// The ledger.
96    ledger: Arc<dyn LedgerService<N>>,
97    /// The BFT.
98    bft: BFT<N>,
99    /// The primary sender.
100    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
101    /// The unconfirmed solutions queue.
102    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
103    /// The unconfirmed transactions queue.
104    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
105    /// The recently-seen unconfirmed solutions.
106    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
107    /// The recently-seen unconfirmed transactions.
108    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
109    #[cfg(feature = "metrics")]
110    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
111    /// The spawned handles.
112    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
113}
114
115impl<N: Network> Consensus<N> {
116    /// Initializes a new instance of consensus.
117    pub fn new(
118        account: Account<N>,
119        ledger: Arc<dyn LedgerService<N>>,
120        ip: Option<SocketAddr>,
121        trusted_validators: &[SocketAddr],
122        storage_mode: StorageMode,
123    ) -> Result<Self> {
124        // Initialize the Narwhal transmissions.
125        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
126        // Initialize the Narwhal storage.
127        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
128        // Initialize the BFT.
129        let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, storage_mode)?;
130        // Return the consensus.
131        Ok(Self {
132            ledger,
133            bft,
134            primary_sender: Default::default(),
135            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
136            transactions_queue: Default::default(),
137            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
138            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
139            #[cfg(feature = "metrics")]
140            transmissions_queue_timestamps: Default::default(),
141            handles: Default::default(),
142        })
143    }
144
145    /// Run the consensus instance.
146    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
147        info!("Starting the consensus instance...");
148        // Set the primary sender.
149        self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set");
150
151        // First, initialize the consensus channels.
152        let (consensus_sender, consensus_receiver) = init_consensus_channels();
153        // Then, start the consensus handlers.
154        self.start_handlers(consensus_receiver);
155        // Lastly, the consensus.
156        self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?;
157        Ok(())
158    }
159
160    /// Returns the ledger.
161    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
162        &self.ledger
163    }
164
165    /// Returns the BFT.
166    pub const fn bft(&self) -> &BFT<N> {
167        &self.bft
168    }
169
170    /// Returns the primary sender.
171    pub fn primary_sender(&self) -> &PrimarySender<N> {
172        self.primary_sender.get().expect("Primary sender not set")
173    }
174}
175
176impl<N: Network> Consensus<N> {
177    /// Returns the number of unconfirmed transmissions.
178    pub fn num_unconfirmed_transmissions(&self) -> usize {
179        self.bft.num_unconfirmed_transmissions()
180    }
181
182    /// Returns the number of unconfirmed ratifications.
183    pub fn num_unconfirmed_ratifications(&self) -> usize {
184        self.bft.num_unconfirmed_ratifications()
185    }
186
187    /// Returns the number of solutions.
188    pub fn num_unconfirmed_solutions(&self) -> usize {
189        self.bft.num_unconfirmed_solutions()
190    }
191
192    /// Returns the number of unconfirmed transactions.
193    pub fn num_unconfirmed_transactions(&self) -> usize {
194        self.bft.num_unconfirmed_transactions()
195    }
196}
197
198impl<N: Network> Consensus<N> {
199    /// Returns the unconfirmed transmission IDs.
200    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
201        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
202    }
203
204    /// Returns the unconfirmed transmissions.
205    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
206        self.worker_transmissions().chain(self.inbound_transmissions())
207    }
208
209    /// Returns the unconfirmed solutions.
210    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
211        self.worker_solutions().chain(self.inbound_solutions())
212    }
213
214    /// Returns the unconfirmed transactions.
215    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
216        self.worker_transactions().chain(self.inbound_transactions())
217    }
218}
219
220impl<N: Network> Consensus<N> {
221    /// Returns the worker transmission IDs.
222    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
223        self.bft.worker_transmission_ids()
224    }
225
226    /// Returns the worker transmissions.
227    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
228        self.bft.worker_transmissions()
229    }
230
231    /// Returns the worker solutions.
232    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
233        self.bft.worker_solutions()
234    }
235
236    /// Returns the worker transactions.
237    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
238        self.bft.worker_transactions()
239    }
240}
241
242impl<N: Network> Consensus<N> {
243    /// Returns the transmission IDs in the inbound queue.
244    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
245        self.inbound_transmissions().map(|(id, _)| id)
246    }
247
248    /// Returns the transmissions in the inbound queue.
249    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
250        self.inbound_transactions()
251            .map(|(id, tx)| {
252                (
253                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
254                    Transmission::Transaction(tx),
255                )
256            })
257            .chain(self.inbound_solutions().map(|(id, solution)| {
258                (
259                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
260                    Transmission::Solution(solution),
261                )
262            }))
263    }
264
265    /// Returns the solutions in the inbound queue.
266    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
267        // Return an iterator over the solutions in the inbound queue.
268        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
269    }
270
271    /// Returns the transactions in the inbound queue.
272    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
273        // Acquire the lock on the transactions queue.
274        let tx_queue = self.transactions_queue.lock();
275        // Return an iterator over the deployment and execution transactions in the inbound queue.
276        tx_queue
277            .deployments
278            .clone()
279            .into_iter()
280            .chain(tx_queue.executions.clone())
281            .map(|(id, tx)| (id, Data::Object(tx)))
282    }
283}
284
285impl<N: Network> Consensus<N> {
286    /// Adds the given unconfirmed solution to the memory pool.
287    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
288        // Calculate the transmission checksum.
289        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
290        #[cfg(feature = "metrics")]
291        {
292            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
293            let timestamp = snarkos_node_bft::helpers::now();
294            self.transmissions_queue_timestamps
295                .lock()
296                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
297        }
298        // Queue the unconfirmed solution.
299        {
300            let solution_id = solution.id();
301
302            // Check if the transaction was recently seen.
303            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
304                // If the transaction was recently seen, return early.
305                return Ok(());
306            }
307            // Check if the solution already exists in the ledger.
308            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
309                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
310            }
311            // Add the solution to the memory pool.
312            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
313            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
314                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
315            }
316        }
317
318        // Try to process the unconfirmed solutions in the memory pool.
319        self.process_unconfirmed_solutions().await
320    }
321
322    /// Processes unconfirmed transactions in the memory pool.
323    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
324        // If the memory pool of this node is full, return early.
325        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
326        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
327        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
328            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
329        {
330            return Ok(());
331        }
332        // Retrieve the solutions.
333        let solutions = {
334            // Determine the available capacity.
335            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
336            // Acquire the lock on the queue.
337            let mut queue = self.solutions_queue.lock();
338            // Determine the number of solutions to send.
339            let num_solutions = queue.len().min(capacity);
340            // Drain the solutions from the queue.
341            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
342        };
343        // Iterate over the solutions.
344        for solution in solutions.into_iter() {
345            let solution_id = solution.id();
346            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
347            // Send the unconfirmed solution to the primary.
348            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
349                // If the BFT is synced, then log the warning.
350                if self.bft.is_synced() {
351                    // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
352                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
353                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
354                    };
355                }
356            }
357        }
358        Ok(())
359    }
360
361    /// Adds the given unconfirmed transaction to the memory pool.
362    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
363        // Calculate the transmission checksum.
364        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
365        #[cfg(feature = "metrics")]
366        {
367            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
368            let timestamp = snarkos_node_bft::helpers::now();
369            self.transmissions_queue_timestamps
370                .lock()
371                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
372        }
373        // Queue the unconfirmed transaction.
374        {
375            let transaction_id = transaction.id();
376
377            // Check that the transaction is not a fee transaction.
378            if transaction.is_fee() {
379                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
380            }
381            // Check if the transaction was recently seen.
382            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
383                // If the transaction was recently seen, return early.
384                return Ok(());
385            }
386            // Check if the transaction already exists in the ledger.
387            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
388                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
389            }
390            // Add the transaction to the memory pool.
391            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
392            if transaction.is_deploy() {
393                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
394                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
395                }
396            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
397                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
398            }
399
400            // Try to process the unconfirmed transactions in the memory pool.
401            self.process_unconfirmed_transactions().await
402        }
403    }
404
405    /// Processes unconfirmed transactions in the memory pool.
406    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
407        // If the memory pool of this node is full, return early.
408        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
409        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
410            return Ok(());
411        }
412        // Retrieve the transactions.
413        let transactions = {
414            // Determine the available capacity.
415            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
416            // Acquire the lock on the transactions queue.
417            let mut tx_queue = self.transactions_queue.lock();
418            // Determine the number of deployments to send.
419            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
420            // Determine the number of executions to send.
421            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
422            // Create an iterator which will select interleaved deployments and executions within the capacity.
423            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
424            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
425            // Drain the transactions from the queue, interleaving deployments and executions.
426            selector_iter
427                .filter_map(|select_deployment| {
428                    if select_deployment {
429                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
430                    } else {
431                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
432                    }
433                })
434                .collect_vec()
435        };
436        // Iterate over the transactions.
437        for transaction in transactions.into_iter() {
438            let transaction_id = transaction.id();
439            trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
440            // Send the unconfirmed transaction to the primary.
441            if let Err(e) =
442                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
443            {
444                // If the BFT is synced, then log the warning.
445                if self.bft.is_synced() {
446                    warn!(
447                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
448                        fmt_id(transaction_id)
449                    );
450                }
451            }
452        }
453        Ok(())
454    }
455}
456
457impl<N: Network> Consensus<N> {
458    /// Starts the consensus handlers.
459    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
460        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
461
462        // Process the committed subdag and transmissions from the BFT.
463        let self_ = self.clone();
464        self.spawn(async move {
465            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
466                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
467            }
468        });
469
470        // Process the unconfirmed transactions in the memory pool.
471        let self_ = self.clone();
472        self.spawn(async move {
473            loop {
474                // Sleep briefly.
475                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
476                // Process the unconfirmed transactions in the memory pool.
477                if let Err(e) = self_.process_unconfirmed_transactions().await {
478                    warn!("Cannot process unconfirmed transactions - {e}");
479                }
480                // Process the unconfirmed solutions in the memory pool.
481                if let Err(e) = self_.process_unconfirmed_solutions().await {
482                    warn!("Cannot process unconfirmed solutions - {e}");
483                }
484            }
485        });
486    }
487
488    /// Processes the committed subdag and transmissions from the BFT.
489    async fn process_bft_subdag(
490        &self,
491        subdag: Subdag<N>,
492        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
493        callback: oneshot::Sender<Result<()>>,
494    ) {
495        // Try to advance to the next block.
496        let self_ = self.clone();
497        let transmissions_ = transmissions.clone();
498        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
499
500        // If the block failed to advance, reinsert the transmissions into the memory pool.
501        if let Err(e) = &result {
502            error!("Unable to advance to the next block - {e}");
503            // On failure, reinsert the transmissions into the memory pool.
504            self.reinsert_transmissions(transmissions).await;
505        }
506        // Send the callback **after** advancing to the next block.
507        // Note: We must await the block to be advanced before sending the callback.
508        callback.send(result).ok();
509    }
510
511    /// Attempts to advance to the next block.
512    fn try_advance_to_next_block(
513        &self,
514        subdag: Subdag<N>,
515        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
516    ) -> Result<()> {
517        #[cfg(feature = "metrics")]
518        let start = subdag.leader_certificate().batch_header().timestamp();
519        #[cfg(feature = "metrics")]
520        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
521        #[cfg(feature = "metrics")]
522        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
523
524        // Create the candidate next block.
525        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
526        // Check that the block is well-formed.
527        self.ledger.check_next_block(&next_block)?;
528        // Advance to the next block.
529        self.ledger.advance_to_next_block(&next_block)?;
530
531        // If the next block starts a new epoch, clear the existing solutions.
532        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
533            // Clear the solutions queue.
534            self.solutions_queue.lock().clear();
535            // Clear the worker solutions.
536            self.bft.primary().clear_worker_solutions();
537        }
538
539        #[cfg(feature = "metrics")]
540        {
541            let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64);
542            let next_block_timestamp = next_block.header().metadata().timestamp();
543            let block_latency = next_block_timestamp - current_block_timestamp;
544            let proof_target = next_block.header().proof_target();
545            let coinbase_target = next_block.header().coinbase_target();
546            let cumulative_proof_target = next_block.header().cumulative_proof_target();
547
548            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);
549
550            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
551            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
552            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
553            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
554            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
555            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
556        }
557        Ok(())
558    }
559
560    /// Reinserts the given transmissions into the memory pool.
561    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
562        // Iterate over the transmissions.
563        for (transmission_id, transmission) in transmissions.into_iter() {
564            // Reinsert the transmission into the memory pool.
565            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
566                warn!(
567                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
568                    fmt_id(transmission_id),
569                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
570                );
571            }
572        }
573    }
574
575    /// Reinserts the given transmission into the memory pool.
576    async fn reinsert_transmission(
577        &self,
578        transmission_id: TransmissionID<N>,
579        transmission: Transmission<N>,
580    ) -> Result<()> {
581        // Initialize a callback sender and receiver.
582        let (callback, callback_receiver) = oneshot::channel();
583        // Send the transmission to the primary.
584        match (transmission_id, transmission) {
585            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
586            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
587                // Send the solution to the primary.
588                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
589            }
590            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
591                // Send the transaction to the primary.
592                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
593            }
594            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
595        }
596        // Await the callback.
597        callback_receiver.await?
598    }
599
600    /// Spawns a task with the given future; it should only be used for long-running tasks.
601    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
602        self.handles.lock().push(tokio::spawn(future));
603    }
604
605    /// Shuts down the BFT.
606    pub async fn shut_down(&self) {
607        info!("Shutting down consensus...");
608        // Shut down the BFT.
609        self.bft.shut_down().await;
610        // Abort the tasks.
611        self.handles.lock().iter().for_each(|handle| handle.abort());
612    }
613}