snarkos_node_consensus/
lib.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21use snarkos_account::Account;
22use snarkos_node_bft::{
23    BFT,
24    MAX_BATCH_DELAY_IN_MS,
25    Primary,
26    helpers::{
27        ConsensusReceiver,
28        PrimaryReceiver,
29        PrimarySender,
30        Storage as NarwhalStorage,
31        fmt_id,
32        init_consensus_channels,
33    },
34    spawn_blocking,
35};
36use snarkos_node_bft_ledger_service::LedgerService;
37use snarkos_node_bft_storage_service::BFTPersistentStorage;
38use snarkvm::{
39    ledger::{
40        block::Transaction,
41        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
42        puzzle::{Solution, SolutionID},
43    },
44    prelude::*,
45};
46
47use aleo_std::StorageMode;
48use anyhow::Result;
49use colored::Colorize;
50use indexmap::IndexMap;
51use lru::LruCache;
52use parking_lot::Mutex;
53use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
54use tokio::{
55    sync::{OnceCell, oneshot},
56    task::JoinHandle,
57};
58
59#[cfg(feature = "metrics")]
60use std::collections::HashMap;
61
/// The number of deployment transactions the inbound queue can hold.
/// Note: This bounds the local inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1024;
/// The number of execution transactions the inbound queue can hold.
/// Note: This bounds the local inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_EXECUTIONS: usize = 1024;
/// The number of solutions the inbound queue can hold.
/// Note: This bounds the local inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_SOLUTIONS: usize = 1024;
/// The **suggested** upper bound on deployments forwarded in each processing interval.
/// Note: This limits the local inbound queue only; it is not a limit enforced by Narwhal.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
74
75/// Helper struct to track incoming transactions.
76struct TransactionsQueue<N: Network> {
77    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
78    pub executions: LruCache<N::TransactionID, Transaction<N>>,
79}
80
81impl<N: Network> Default for TransactionsQueue<N> {
82    fn default() -> Self {
83        Self {
84            deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
85            executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
86        }
87    }
88}
89
90#[derive(Clone)]
91pub struct Consensus<N: Network> {
92    /// The ledger.
93    ledger: Arc<dyn LedgerService<N>>,
94    /// The BFT.
95    bft: BFT<N>,
96    /// The primary sender.
97    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
98    /// The unconfirmed solutions queue.
99    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
100    /// The unconfirmed transactions queue.
101    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
102    /// The recently-seen unconfirmed solutions.
103    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
104    /// The recently-seen unconfirmed transactions.
105    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
106    #[cfg(feature = "metrics")]
107    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
108    /// The spawned handles.
109    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
110}
111
112impl<N: Network> Consensus<N> {
113    /// Initializes a new instance of consensus.
114    pub fn new(
115        account: Account<N>,
116        ledger: Arc<dyn LedgerService<N>>,
117        ip: Option<SocketAddr>,
118        trusted_validators: &[SocketAddr],
119        storage_mode: StorageMode,
120    ) -> Result<Self> {
121        // Recover the development ID, if it is present.
122        let dev = match storage_mode {
123            StorageMode::Development(id) => Some(id),
124            StorageMode::Production | StorageMode::Custom(..) => None,
125        };
126        // Initialize the Narwhal transmissions.
127        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode)?);
128        // Initialize the Narwhal storage.
129        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
130        // Initialize the BFT.
131        let bft = BFT::new(account, storage, ledger.clone(), ip, trusted_validators, dev)?;
132        // Return the consensus.
133        Ok(Self {
134            ledger,
135            bft,
136            primary_sender: Default::default(),
137            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
138            transactions_queue: Default::default(),
139            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
140            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
141            #[cfg(feature = "metrics")]
142            transmissions_queue_timestamps: Default::default(),
143            handles: Default::default(),
144        })
145    }
146
147    /// Run the consensus instance.
148    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
149        info!("Starting the consensus instance...");
150        // Set the primary sender.
151        self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set");
152
153        // First, initialize the consensus channels.
154        let (consensus_sender, consensus_receiver) = init_consensus_channels();
155        // Then, start the consensus handlers.
156        self.start_handlers(consensus_receiver);
157        // Lastly, the consensus.
158        self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?;
159        Ok(())
160    }
161
162    /// Returns the ledger.
163    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
164        &self.ledger
165    }
166
167    /// Returns the BFT.
168    pub const fn bft(&self) -> &BFT<N> {
169        &self.bft
170    }
171
172    /// Returns the primary sender.
173    pub fn primary_sender(&self) -> &PrimarySender<N> {
174        self.primary_sender.get().expect("Primary sender not set")
175    }
176}
177
178impl<N: Network> Consensus<N> {
179    /// Returns the number of unconfirmed transmissions.
180    pub fn num_unconfirmed_transmissions(&self) -> usize {
181        self.bft.num_unconfirmed_transmissions()
182    }
183
184    /// Returns the number of unconfirmed ratifications.
185    pub fn num_unconfirmed_ratifications(&self) -> usize {
186        self.bft.num_unconfirmed_ratifications()
187    }
188
189    /// Returns the number of solutions.
190    pub fn num_unconfirmed_solutions(&self) -> usize {
191        self.bft.num_unconfirmed_solutions()
192    }
193
194    /// Returns the number of unconfirmed transactions.
195    pub fn num_unconfirmed_transactions(&self) -> usize {
196        self.bft.num_unconfirmed_transactions()
197    }
198}
199
200impl<N: Network> Consensus<N> {
201    /// Returns the unconfirmed transmission IDs.
202    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
203        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
204    }
205
206    /// Returns the unconfirmed transmissions.
207    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
208        self.worker_transmissions().chain(self.inbound_transmissions())
209    }
210
211    /// Returns the unconfirmed solutions.
212    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
213        self.worker_solutions().chain(self.inbound_solutions())
214    }
215
216    /// Returns the unconfirmed transactions.
217    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
218        self.worker_transactions().chain(self.inbound_transactions())
219    }
220}
221
222impl<N: Network> Consensus<N> {
223    /// Returns the worker transmission IDs.
224    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
225        self.bft.worker_transmission_ids()
226    }
227
228    /// Returns the worker transmissions.
229    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
230        self.bft.worker_transmissions()
231    }
232
233    /// Returns the worker solutions.
234    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
235        self.bft.worker_solutions()
236    }
237
238    /// Returns the worker transactions.
239    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
240        self.bft.worker_transactions()
241    }
242}
243
244impl<N: Network> Consensus<N> {
245    /// Returns the transmission IDs in the inbound queue.
246    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
247        self.inbound_transmissions().map(|(id, _)| id)
248    }
249
250    /// Returns the transmissions in the inbound queue.
251    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
252        self.inbound_transactions()
253            .map(|(id, tx)| {
254                (
255                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
256                    Transmission::Transaction(tx),
257                )
258            })
259            .chain(self.inbound_solutions().map(|(id, solution)| {
260                (
261                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
262                    Transmission::Solution(solution),
263                )
264            }))
265    }
266
267    /// Returns the solutions in the inbound queue.
268    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
269        // Return an iterator over the solutions in the inbound queue.
270        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
271    }
272
273    /// Returns the transactions in the inbound queue.
274    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
275        // Acquire the lock on the transactions queue.
276        let tx_queue = self.transactions_queue.lock();
277        // Return an iterator over the deployment and execution transactions in the inbound queue.
278        tx_queue
279            .deployments
280            .clone()
281            .into_iter()
282            .chain(tx_queue.executions.clone())
283            .map(|(id, tx)| (id, Data::Object(tx)))
284    }
285}
286
287impl<N: Network> Consensus<N> {
288    /// Adds the given unconfirmed solution to the memory pool.
289    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
290        // Calculate the transmission checksum.
291        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
292        #[cfg(feature = "metrics")]
293        {
294            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
295            let timestamp = snarkos_node_bft::helpers::now();
296            self.transmissions_queue_timestamps
297                .lock()
298                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
299        }
300        // Queue the unconfirmed solution.
301        {
302            let solution_id = solution.id();
303
304            // Check if the transaction was recently seen.
305            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
306                // If the transaction was recently seen, return early.
307                return Ok(());
308            }
309            // Check if the solution already exists in the ledger.
310            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
311                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
312            }
313            // Add the solution to the memory pool.
314            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
315            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
316                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
317            }
318        }
319
320        // Try to process the unconfirmed solutions in the memory pool.
321        self.process_unconfirmed_solutions().await
322    }
323
324    /// Processes unconfirmed transactions in the memory pool.
325    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
326        // If the memory pool of this node is full, return early.
327        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
328        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
329        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
330            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
331        {
332            return Ok(());
333        }
334        // Retrieve the solutions.
335        let solutions = {
336            // Determine the available capacity.
337            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
338            // Acquire the lock on the queue.
339            let mut queue = self.solutions_queue.lock();
340            // Determine the number of solutions to send.
341            let num_solutions = queue.len().min(capacity);
342            // Drain the solutions from the queue.
343            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
344        };
345        // Iterate over the solutions.
346        for solution in solutions.into_iter() {
347            let solution_id = solution.id();
348            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
349            // Send the unconfirmed solution to the primary.
350            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
351                // If the BFT is synced, then log the warning.
352                if self.bft.is_synced() {
353                    // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
354                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
355                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
356                    };
357                }
358            }
359        }
360        Ok(())
361    }
362
363    /// Adds the given unconfirmed transaction to the memory pool.
364    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
365        // Calculate the transmission checksum.
366        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
367        #[cfg(feature = "metrics")]
368        {
369            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
370            let timestamp = snarkos_node_bft::helpers::now();
371            self.transmissions_queue_timestamps
372                .lock()
373                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
374        }
375        // Queue the unconfirmed transaction.
376        {
377            let transaction_id = transaction.id();
378
379            // Check that the transaction is not a fee transaction.
380            if transaction.is_fee() {
381                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
382            }
383            // Check if the transaction was recently seen.
384            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
385                // If the transaction was recently seen, return early.
386                return Ok(());
387            }
388            // Check if the transaction already exists in the ledger.
389            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
390                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
391            }
392            // Add the transaction to the memory pool.
393            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
394            if transaction.is_deploy() {
395                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
396                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
397                }
398            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
399                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
400            }
401
402            // Try to process the unconfirmed transactions in the memory pool.
403            self.process_unconfirmed_transactions().await
404        }
405    }
406
407    /// Processes unconfirmed transactions in the memory pool.
408    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
409        // If the memory pool of this node is full, return early.
410        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
411        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
412            return Ok(());
413        }
414        // Retrieve the transactions.
415        let transactions = {
416            // Determine the available capacity.
417            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
418            // Acquire the lock on the transactions queue.
419            let mut tx_queue = self.transactions_queue.lock();
420            // Determine the number of deployments to send.
421            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
422            // Determine the number of executions to send.
423            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
424            // Create an iterator which will select interleaved deployments and executions within the capacity.
425            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
426            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
427            // Drain the transactions from the queue, interleaving deployments and executions.
428            selector_iter
429                .filter_map(|select_deployment| {
430                    if select_deployment {
431                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
432                    } else {
433                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
434                    }
435                })
436                .collect_vec()
437        };
438        // Iterate over the transactions.
439        for transaction in transactions.into_iter() {
440            let transaction_id = transaction.id();
441            trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
442            // Send the unconfirmed transaction to the primary.
443            if let Err(e) =
444                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
445            {
446                // If the BFT is synced, then log the warning.
447                if self.bft.is_synced() {
448                    warn!(
449                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
450                        fmt_id(transaction_id)
451                    );
452                }
453            }
454        }
455        Ok(())
456    }
457}
458
459impl<N: Network> Consensus<N> {
460    /// Starts the consensus handlers.
461    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
462        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
463
464        // Process the committed subdag and transmissions from the BFT.
465        let self_ = self.clone();
466        self.spawn(async move {
467            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
468                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
469            }
470        });
471
472        // Process the unconfirmed transactions in the memory pool.
473        let self_ = self.clone();
474        self.spawn(async move {
475            loop {
476                // Sleep briefly.
477                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
478                // Process the unconfirmed transactions in the memory pool.
479                if let Err(e) = self_.process_unconfirmed_transactions().await {
480                    warn!("Cannot process unconfirmed transactions - {e}");
481                }
482                // Process the unconfirmed solutions in the memory pool.
483                if let Err(e) = self_.process_unconfirmed_solutions().await {
484                    warn!("Cannot process unconfirmed solutions - {e}");
485                }
486            }
487        });
488    }
489
490    /// Processes the committed subdag and transmissions from the BFT.
491    async fn process_bft_subdag(
492        &self,
493        subdag: Subdag<N>,
494        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
495        callback: oneshot::Sender<Result<()>>,
496    ) {
497        // Try to advance to the next block.
498        let self_ = self.clone();
499        let transmissions_ = transmissions.clone();
500        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
501
502        // If the block failed to advance, reinsert the transmissions into the memory pool.
503        if let Err(e) = &result {
504            error!("Unable to advance to the next block - {e}");
505            // On failure, reinsert the transmissions into the memory pool.
506            self.reinsert_transmissions(transmissions).await;
507        }
508        // Send the callback **after** advancing to the next block.
509        // Note: We must await the block to be advanced before sending the callback.
510        callback.send(result).ok();
511    }
512
513    /// Attempts to advance to the next block.
514    fn try_advance_to_next_block(
515        &self,
516        subdag: Subdag<N>,
517        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
518    ) -> Result<()> {
519        #[cfg(feature = "metrics")]
520        let start = subdag.leader_certificate().batch_header().timestamp();
521        #[cfg(feature = "metrics")]
522        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
523        #[cfg(feature = "metrics")]
524        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
525
526        // Create the candidate next block.
527        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
528        // Check that the block is well-formed.
529        self.ledger.check_next_block(&next_block)?;
530        // Advance to the next block.
531        self.ledger.advance_to_next_block(&next_block)?;
532
533        // If the next block starts a new epoch, clear the existing solutions.
534        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
535            // Clear the solutions queue.
536            self.solutions_queue.lock().clear();
537            // Clear the worker solutions.
538            self.bft.primary().clear_worker_solutions();
539        }
540
541        #[cfg(feature = "metrics")]
542        {
543            let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64);
544            let next_block_timestamp = next_block.header().metadata().timestamp();
545            let block_latency = next_block_timestamp - current_block_timestamp;
546            let proof_target = next_block.header().proof_target();
547            let coinbase_target = next_block.header().coinbase_target();
548            let cumulative_proof_target = next_block.header().cumulative_proof_target();
549
550            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);
551
552            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
553            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
554            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
555            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
556            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
557            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
558        }
559        Ok(())
560    }
561
562    /// Reinserts the given transmissions into the memory pool.
563    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
564        // Iterate over the transmissions.
565        for (transmission_id, transmission) in transmissions.into_iter() {
566            // Reinsert the transmission into the memory pool.
567            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
568                warn!(
569                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
570                    fmt_id(transmission_id),
571                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
572                );
573            }
574        }
575    }
576
577    /// Reinserts the given transmission into the memory pool.
578    async fn reinsert_transmission(
579        &self,
580        transmission_id: TransmissionID<N>,
581        transmission: Transmission<N>,
582    ) -> Result<()> {
583        // Initialize a callback sender and receiver.
584        let (callback, callback_receiver) = oneshot::channel();
585        // Send the transmission to the primary.
586        match (transmission_id, transmission) {
587            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
588            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
589                // Send the solution to the primary.
590                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
591            }
592            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
593                // Send the transaction to the primary.
594                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
595            }
596            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
597        }
598        // Await the callback.
599        callback_receiver.await?
600    }
601
602    /// Spawns a task with the given future; it should only be used for long-running tasks.
603    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
604        self.handles.lock().push(tokio::spawn(future));
605    }
606
607    /// Shuts down the BFT.
608    pub async fn shut_down(&self) {
609        info!("Shutting down consensus...");
610        // Shut down the BFT.
611        self.bft.shut_down().await;
612        // Abort the tasks.
613        self.handles.lock().iter().for_each(|handle| handle.abort());
614    }
615}