snarkos_node_consensus/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18mod transactions_queue;
19use transactions_queue::TransactionsQueue;
20
21#[macro_use]
22extern crate tracing;
23
24#[cfg(feature = "metrics")]
25extern crate snarkos_node_metrics as metrics;
26
27use snarkos_account::Account;
28use snarkos_node_bft::{
29    BFT,
30    MAX_BATCH_DELAY_IN_MS,
31    Primary,
32    helpers::{
33        ConsensusReceiver,
34        PrimarySender,
35        Storage as NarwhalStorage,
36        fmt_id,
37        init_consensus_channels,
38        init_primary_channels,
39    },
40    spawn_blocking,
41};
42use snarkos_node_bft_ledger_service::LedgerService;
43use snarkos_node_bft_storage_service::BFTPersistentStorage;
44use snarkos_node_sync::{BlockSync, Ping};
45
46use snarkvm::{
47    ledger::{
48        block::Transaction,
49        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
50        puzzle::{Solution, SolutionID},
51    },
52    prelude::*,
53};
54
55use aleo_std::StorageMode;
56use anyhow::Result;
57use colored::Colorize;
58use indexmap::IndexMap;
59#[cfg(feature = "locktick")]
60use locktick::parking_lot::{Mutex, RwLock};
61use lru::LruCache;
62#[cfg(not(feature = "locktick"))]
63use parking_lot::{Mutex, RwLock};
64use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
65use tokio::{sync::oneshot, task::JoinHandle};
66
67#[cfg(feature = "metrics")]
68use std::collections::HashMap;
69
70/// The capacity of the queue reserved for deployments.
71/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
72const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
73/// The capacity of the queue reserved for executions.
74/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
75const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
76/// The capacity of the queue reserved for solutions.
77/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
78const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
79/// The **suggested** maximum number of deployments in each interval.
80/// Note: This is an inbound queue limit, not a Narwhal-enforced limit.
81const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
82
83/// Wrapper around `BFT` that adds additional functionality, such as a mempool.
84///
85/// Consensus acts as a rate limiter to prevents workers in BFT from being overloaded.
86/// Each worker maintains a ready queue (which is essentially also a mempool), but verifies transactions/solutions
87/// before enquing them.
88/// Consensus only passes more transactions/solutions to the BFT layer if its ready queues are not already full.
89#[derive(Clone)]
90pub struct Consensus<N: Network> {
91    /// The ledger.
92    ledger: Arc<dyn LedgerService<N>>,
93    /// The BFT.
94    bft: BFT<N>,
95    /// The primary sender.
96    primary_sender: PrimarySender<N>,
97    /// The unconfirmed solutions queue.
98    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
99    /// The unconfirmed transactions queue.
100    transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
101    /// The recently-seen unconfirmed solutions.
102    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
103    /// The recently-seen unconfirmed transactions.
104    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
105    #[cfg(feature = "metrics")]
106    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
107    /// The spawned handles.
108    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
109    /// The ping logic.
110    ping: Arc<Ping<N>>,
111    /// The block sync logic.
112    block_sync: Arc<BlockSync<N>>,
113}
114
115impl<N: Network> Consensus<N> {
116    /// Initializes a new instance of consensus and spawn its background tasks.
117    #[allow(clippy::too_many_arguments)]
118    pub async fn new(
119        account: Account<N>,
120        ledger: Arc<dyn LedgerService<N>>,
121        block_sync: Arc<BlockSync<N>>,
122        ip: Option<SocketAddr>,
123        trusted_validators: &[SocketAddr],
124        trusted_peers_only: bool,
125        storage_mode: StorageMode,
126        ping: Arc<Ping<N>>,
127        dev: Option<u16>,
128    ) -> Result<Self> {
129        // Initialize the primary channels.
130        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
131        // Initialize the Narwhal transmissions.
132        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
133        // Initialize the Narwhal storage.
134        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
135        // Initialize the BFT.
136        let bft = BFT::new(
137            account,
138            storage,
139            ledger.clone(),
140            block_sync.clone(),
141            ip,
142            trusted_validators,
143            trusted_peers_only,
144            storage_mode,
145            dev,
146        )?;
147        // Create a new instance of Consensus.
148        let mut _self = Self {
149            ledger,
150            bft,
151            block_sync,
152            primary_sender,
153            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
154            transactions_queue: Default::default(),
155            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
156            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
157            #[cfg(feature = "metrics")]
158            transmissions_tracker: Default::default(),
159            handles: Default::default(),
160            ping: ping.clone(),
161        };
162
163        info!("Starting the consensus instance...");
164
165        // First, initialize the consensus channels.
166        let (consensus_sender, consensus_receiver) = init_consensus_channels();
167        // Then, start the consensus handlers.
168        _self.start_handlers(consensus_receiver);
169        // Lastly, also start BFTs handlers.
170        _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;
171
172        Ok(_self)
173    }
174
175    /// Returns the underlying `BFT` struct.
176    pub const fn bft(&self) -> &BFT<N> {
177        &self.bft
178    }
179
180    pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
181        self.transactions_queue.read().contains(transaction_id)
182    }
183}
184
185impl<N: Network> Consensus<N> {
186    /// Returns the number of unconfirmed transmissions in the BFT's workers (not in the mempool).
187    pub fn num_unconfirmed_transmissions(&self) -> usize {
188        self.bft.num_unconfirmed_transmissions()
189    }
190
191    /// Returns the number of unconfirmed ratifications in the BFT's workers (not in the mempool).
192    pub fn num_unconfirmed_ratifications(&self) -> usize {
193        self.bft.num_unconfirmed_ratifications()
194    }
195
196    /// Returns the number unconfirmed solutions in the BFT's workers (not in the mempool).
197    pub fn num_unconfirmed_solutions(&self) -> usize {
198        self.bft.num_unconfirmed_solutions()
199    }
200
201    /// Returns the number of unconfirmed transactions.
202    pub fn num_unconfirmed_transactions(&self) -> usize {
203        self.bft.num_unconfirmed_transactions()
204    }
205}
206
207impl<N: Network> Consensus<N> {
208    /// Returns the unconfirmed transmission IDs.
209    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
210        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
211    }
212
213    /// Returns the unconfirmed transmissions.
214    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
215        self.worker_transmissions().chain(self.inbound_transmissions())
216    }
217
218    /// Returns the unconfirmed solutions.
219    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
220        self.worker_solutions().chain(self.inbound_solutions())
221    }
222
223    /// Returns the unconfirmed transactions.
224    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
225        self.worker_transactions().chain(self.inbound_transactions())
226    }
227}
228
229impl<N: Network> Consensus<N> {
230    /// Returns the worker transmission IDs.
231    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
232        self.bft.worker_transmission_ids()
233    }
234
235    /// Returns the worker transmissions.
236    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
237        self.bft.worker_transmissions()
238    }
239
240    /// Returns the worker solutions.
241    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
242        self.bft.worker_solutions()
243    }
244
245    /// Returns the worker transactions.
246    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
247        self.bft.worker_transactions()
248    }
249}
250
251impl<N: Network> Consensus<N> {
252    /// Returns the transmission IDs in the inbound queue.
253    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
254        self.inbound_transmissions().map(|(id, _)| id)
255    }
256
257    /// Returns the transmissions in the inbound queue.
258    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
259        self.inbound_transactions()
260            .map(|(id, tx)| {
261                (
262                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
263                    Transmission::Transaction(tx),
264                )
265            })
266            .chain(self.inbound_solutions().map(|(id, solution)| {
267                (
268                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
269                    Transmission::Solution(solution),
270                )
271            }))
272    }
273
274    /// Returns the solutions in the inbound queue.
275    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
276        // Return an iterator over the solutions in the inbound queue.
277        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
278    }
279
280    /// Returns the transactions in the inbound queue.
281    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
282        // Return an iterator over the deployment and execution transactions in the inbound queue.
283        self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
284    }
285}
286
287impl<N: Network> Consensus<N> {
288    /// Adds the given unconfirmed solution to the memory pool, which will then eventually be passed
289    /// to the BFT layer for inclusion in a batch.
290    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
291        // Calculate the transmission checksum.
292        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
293        // Queue the unconfirmed solution.
294        {
295            let solution_id = solution.id();
296
297            // Check if the transaction was recently seen.
298            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
299                // If the transaction was recently seen, return early.
300                return Ok(());
301            }
302            // Check if the solution already exists in the ledger.
303            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
304                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
305            }
306            #[cfg(feature = "metrics")]
307            {
308                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
309                let timestamp = snarkos_node_bft::helpers::now();
310                self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
311            }
312            // Add the solution to the memory pool.
313            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
314            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
315                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
316            }
317        }
318
319        // Try to process the unconfirmed solutions in the memory pool.
320        self.process_unconfirmed_solutions().await
321    }
322
323    /// Processes unconfirmed solutions in the mempool, and passes them to the BFT layer
324    /// (if sufficient space is available).
325    async fn process_unconfirmed_solutions(&self) -> Result<()> {
326        // If the memory pool of this node is full, return early.
327        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
328        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
329        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
330            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
331        {
332            return Ok(());
333        }
334        // Retrieve the solutions.
335        let solutions = {
336            // Determine the available capacity.
337            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
338            // Acquire the lock on the queue.
339            let mut queue = self.solutions_queue.lock();
340            // Determine the number of solutions to send.
341            let num_solutions = queue.len().min(capacity);
342            // Drain the solutions from the queue.
343            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
344        };
345        // Iterate over the solutions.
346        for solution in solutions.into_iter() {
347            let solution_id = solution.id();
348            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
349            // Send the unconfirmed solution to the primary.
350            if let Err(e) = self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
351                // If the BFT is synced, then log the warning.
352                if self.bft.is_synced() {
353                    // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
354                    if self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
355                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
356                    };
357                }
358            }
359        }
360        Ok(())
361    }
362
363    /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed
364    /// to the BFT layer for inclusion in a batch.
365    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
366        // Calculate the transmission checksum.
367        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
368        // Queue the unconfirmed transaction.
369        {
370            let transaction_id = transaction.id();
371
372            // Check that the transaction is not a fee transaction.
373            if transaction.is_fee() {
374                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
375            }
376            // Check if the transaction was recently seen.
377            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
378                // If the transaction was recently seen, return early.
379                return Ok(());
380            }
381            // Check if the transaction already exists in the ledger.
382            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
383                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
384            }
385            // Check that the transaction is not in the mempool.
386            if self.contains_transaction(&transaction_id) {
387                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
388            }
389            #[cfg(feature = "metrics")]
390            {
391                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
392                let timestamp = snarkos_node_bft::helpers::now();
393                self.transmissions_tracker
394                    .lock()
395                    .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
396            }
397            // Add the transaction to the memory pool.
398            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
399            let priority_fee = transaction.priority_fee_amount()?;
400            self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
401        }
402
403        // Try to process the unconfirmed transactions in the memory pool.
404        self.process_unconfirmed_transactions().await
405    }
406
407    /// Processes unconfirmed transactions in the mempool, and passes them to the BFT layer
408    /// (if sufficient space is available).
409    async fn process_unconfirmed_transactions(&self) -> Result<()> {
410        // If the memory pool of this node is full, return early.
411        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
412        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
413            return Ok(());
414        }
415        // Retrieve the transactions.
416        let transactions = {
417            // Determine the available capacity.
418            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
419            // Acquire the lock on the transactions queue.
420            let mut tx_queue = self.transactions_queue.write();
421            // Determine the number of deployments to send.
422            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
423            // Determine the number of executions to send.
424            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
425            // Create an iterator which will select interleaved deployments and executions within the capacity.
426            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
427            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
428            // Drain the transactions from the queue, interleaving deployments and executions.
429            selector_iter
430                .filter_map(
431                    |select_deployment| {
432                        if select_deployment { tx_queue.deployments.pop() } else { tx_queue.executions.pop() }
433                    },
434                )
435                .map(|(_, tx)| tx)
436                .collect_vec()
437        };
438        // Iterate over the transactions.
439        for transaction in transactions.into_iter() {
440            let transaction_id = transaction.id();
441            // Determine the type of the transaction. The fee type is technically not possible here.
442            let tx_type_str = match transaction {
443                Transaction::Deploy(..) => "deployment",
444                Transaction::Execute(..) => "execution",
445                Transaction::Fee(..) => "fee",
446            };
447            trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
448            // Send the unconfirmed transaction to the primary.
449            if let Err(e) =
450                self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
451            {
452                // If the BFT is synced, then log the warning.
453                if self.bft.is_synced() {
454                    warn!(
455                        "Failed to add unconfirmed {tx_type_str} transaction '{}' to the memory pool - {e}",
456                        fmt_id(transaction_id)
457                    );
458                }
459            }
460        }
461        Ok(())
462    }
463}
464
465impl<N: Network> Consensus<N> {
466    /// Starts the consensus handlers.
467    ///
468    /// This is only invoked once, in the constructor.
469    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
470        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
471
472        // Process the committed subdag and transmissions from the BFT.
473        let self_ = self.clone();
474        self.spawn(async move {
475            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
476                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
477            }
478        });
479
480        // Process the unconfirmed transactions in the memory pool.
481        //
482        // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted
483        // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions.
484        let self_ = self.clone();
485        self.spawn(async move {
486            loop {
487                // Sleep briefly.
488                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
489                // Process the unconfirmed transactions in the memory pool.
490                if let Err(e) = self_.process_unconfirmed_transactions().await {
491                    warn!("Cannot process unconfirmed transactions - {e}");
492                }
493                // Process the unconfirmed solutions in the memory pool.
494                if let Err(e) = self_.process_unconfirmed_solutions().await {
495                    warn!("Cannot process unconfirmed solutions - {e}");
496                }
497            }
498        });
499    }
500
501    /// Attempts to build a new block from the given subDAG, and (tries to) advance the legder to it.
502    async fn process_bft_subdag(
503        &self,
504        subdag: Subdag<N>,
505        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
506        callback: oneshot::Sender<Result<()>>,
507    ) {
508        // Try to advance to the next block.
509        let self_ = self.clone();
510        let transmissions_ = transmissions.clone();
511        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
512
513        // If the block failed to advance, reinsert the transmissions into the memory pool.
514        if let Err(e) = &result {
515            error!("Unable to advance to the next block - {e}");
516            // On failure, reinsert the transmissions into the memory pool.
517            self.reinsert_transmissions(transmissions).await;
518        }
519        // Send the callback **after** advancing to the next block.
520        // Note: We must await the block to be advanced before sending the callback.
521        callback.send(result).ok();
522    }
523
524    /// Attempts to advance the ledger to the next block, and updates the metrics (if enabled) accordingly.
525    fn try_advance_to_next_block(
526        &self,
527        subdag: Subdag<N>,
528        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
529    ) -> Result<()> {
530        #[cfg(feature = "metrics")]
531        let start = subdag.leader_certificate().batch_header().timestamp();
532        #[cfg(feature = "metrics")]
533        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
534        #[cfg(feature = "metrics")]
535        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
536
537        // Create the candidate next block.
538        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
539        // Check that the block is well-formed.
540        self.ledger.check_next_block(&next_block)?;
541        // Advance to the next block.
542        self.ledger.advance_to_next_block(&next_block)?;
543        #[cfg(feature = "telemetry")]
544        // Fetch the latest committee
545        let latest_committee = self.ledger.current_committee()?;
546
547        // If the next block starts a new epoch, clear the existing solutions.
548        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
549            // Clear the solutions queue.
550            self.solutions_queue.lock().clear();
551            // Clear the worker solutions.
552            self.bft.primary().clear_worker_solutions();
553        }
554
555        // Notify peers that we have a new block.
556        let locators = self.block_sync.get_block_locators()?;
557        self.ping.update_block_locators(locators);
558
559        // Make block sync aware of the new block.
560        self.block_sync.set_sync_height(next_block.height());
561
562        // TODO(kaimast): This should also remove any transmissions/solutions contained in the block from the mempool.
563        // Removal currently happens when Consensus eventually passes them to the worker, which then just discards them.
564
565        #[cfg(feature = "metrics")]
566        {
567            let now_utc = snarkos_node_bft::helpers::now_utc();
568            let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
569            let next_block_timestamp = next_block.header().metadata().timestamp();
570            let next_block_utc = snarkos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
571            let block_latency = next_block_timestamp - current_block_timestamp;
572            let block_lag = (now_utc - next_block_utc).whole_milliseconds();
573
574            let proof_target = next_block.header().proof_target();
575            let coinbase_target = next_block.header().coinbase_target();
576            let cumulative_proof_target = next_block.header().cumulative_proof_target();
577
578            // Calculate latency for all transmissions included in this block.
579            metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);
580
581            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
582            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
583            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
584            metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
585            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
586            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
587            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
588
589            #[cfg(feature = "telemetry")]
590            {
591                // Retrieve the latest participation scores.
592                let participation_scores =
593                    self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);
594
595                // Log the participation scores.
596                for (address, participation_score) in participation_scores {
597                    metrics::histogram_label(
598                        metrics::consensus::VALIDATOR_PARTICIPATION,
599                        "validator_address",
600                        address.to_string(),
601                        participation_score,
602                    )
603                }
604            }
605        }
606        Ok(())
607    }
608
609    /// Reinserts the given transmissions into the memory pool.
610    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
611        // Iterate over the transmissions.
612        for (transmission_id, transmission) in transmissions.into_iter() {
613            // Reinsert the transmission into the memory pool.
614            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
615                warn!(
616                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
617                    fmt_id(transmission_id),
618                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
619                );
620            }
621        }
622    }
623
624    /// Reinserts the given transmission into the memory pool.
625    async fn reinsert_transmission(
626        &self,
627        transmission_id: TransmissionID<N>,
628        transmission: Transmission<N>,
629    ) -> Result<()> {
630        // Initialize a callback sender and receiver.
631        let (callback, callback_receiver) = oneshot::channel();
632        // Send the transmission to the primary.
633        match (transmission_id, transmission) {
634            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
635            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
636                // Send the solution to the primary.
637                self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
638            }
639            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
640                // Send the transaction to the primary.
641                self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
642            }
643            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
644        }
645        // Await the callback.
646        callback_receiver.await?
647    }
648
649    /// Spawns a task with the given future; it should only be used for long-running tasks.
650    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
651        self.handles.lock().push(tokio::spawn(future));
652    }
653
654    /// Shuts down the consensus and BFT layers.
655    pub async fn shut_down(&self) {
656        info!("Shutting down consensus...");
657        // Shut down the BFT.
658        self.bft.shut_down().await;
659        // Abort the tasks.
660        self.handles.lock().iter().for_each(|handle| handle.abort());
661    }
662}