snarkos_node_consensus/
lib.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21use snarkos_account::Account;
22use snarkos_node_bft::{
23    BFT,
24    MAX_BATCH_DELAY_IN_MS,
25    Primary,
26    helpers::{
27        ConsensusReceiver,
28        PrimarySender,
29        Storage as NarwhalStorage,
30        fmt_id,
31        init_consensus_channels,
32        init_primary_channels,
33    },
34    spawn_blocking,
35};
36use snarkos_node_bft_ledger_service::LedgerService;
37use snarkos_node_bft_storage_service::BFTPersistentStorage;
38use snarkos_node_sync::{BlockSync, Ping};
39
40use snarkvm::{
41    ledger::{
42        block::Transaction,
43        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
44        puzzle::{Solution, SolutionID},
45    },
46    prelude::*,
47};
48
49use aleo_std::StorageMode;
50use anyhow::Result;
51use colored::Colorize;
52use indexmap::IndexMap;
53#[cfg(feature = "locktick")]
54use locktick::parking_lot::Mutex;
55use lru::LruCache;
56#[cfg(not(feature = "locktick"))]
57use parking_lot::Mutex;
58use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc, time::Duration};
59use tokio::{sync::oneshot, task::JoinHandle};
60
61#[cfg(feature = "metrics")]
62use std::collections::HashMap;
63
/// Maximum number of deployments buffered in the inbound transactions queue.
/// Note: This bounds the inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1_024;
/// Maximum number of executions buffered in the inbound transactions queue.
/// Note: This bounds the inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_EXECUTIONS: usize = 1_024;
/// Maximum number of solutions buffered in the inbound solutions queue.
/// Note: This bounds the inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_SOLUTIONS: usize = 1_024;
/// The **suggested** upper bound on deployments forwarded to the BFT layer per processing pass.
/// Note: This is an inbound queue limit, not a limit enforced by Narwhal.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
76
77/// Helper struct to track incoming transactions.
78struct TransactionsQueue<N: Network> {
79    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
80    pub executions: LruCache<N::TransactionID, Transaction<N>>,
81}
82
83impl<N: Network> Default for TransactionsQueue<N> {
84    fn default() -> Self {
85        Self {
86            deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
87            executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
88        }
89    }
90}
91
92/// Wrapper around `BFT` that adds additional functionality, such as a mempool.
93///
94/// Consensus acts as a rate limiter to prevents workers in BFT from being overloaded.
95/// Each worker maintains a ready queue (which is essentially also a mempool), but verifies transactions/solutions
96/// before enquing them.
97/// Consensus only passes more transactions/solutions to the BFT layer if its ready queues are not already full.
98#[derive(Clone)]
99pub struct Consensus<N: Network> {
100    /// The ledger.
101    ledger: Arc<dyn LedgerService<N>>,
102    /// The BFT.
103    bft: BFT<N>,
104    /// The primary sender.
105    primary_sender: PrimarySender<N>,
106    /// The unconfirmed solutions queue.
107    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
108    /// The unconfirmed transactions queue.
109    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
110    /// The recently-seen unconfirmed solutions.
111    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
112    /// The recently-seen unconfirmed transactions.
113    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
114    #[cfg(feature = "metrics")]
115    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
116    /// The spawned handles.
117    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
118    /// The ping logic.
119    ping: Arc<Ping<N>>,
120    /// The block sync logic.
121    block_sync: Arc<BlockSync<N>>,
122}
123
124impl<N: Network> Consensus<N> {
125    /// Initializes a new instance of consensus and spawn its background tasks.
126    pub async fn new(
127        account: Account<N>,
128        ledger: Arc<dyn LedgerService<N>>,
129        block_sync: Arc<BlockSync<N>>,
130        ip: Option<SocketAddr>,
131        trusted_validators: &[SocketAddr],
132        storage_mode: StorageMode,
133        ping: Arc<Ping<N>>,
134    ) -> Result<Self> {
135        // Initialize the primary channels.
136        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
137        // Initialize the Narwhal transmissions.
138        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
139        // Initialize the Narwhal storage.
140        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64);
141        // Initialize the BFT.
142        let bft = BFT::new(account, storage, ledger.clone(), block_sync.clone(), ip, trusted_validators, storage_mode)?;
143        // Create a new instance of Consensus.
144        let mut _self = Self {
145            ledger,
146            bft,
147            block_sync,
148            primary_sender,
149            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
150            transactions_queue: Default::default(),
151            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
152            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
153            #[cfg(feature = "metrics")]
154            transmissions_tracker: Default::default(),
155            handles: Default::default(),
156            ping: ping.clone(),
157        };
158
159        info!("Starting the consensus instance...");
160
161        // First, initialize the consensus channels.
162        let (consensus_sender, consensus_receiver) = init_consensus_channels();
163        // Then, start the consensus handlers.
164        _self.start_handlers(consensus_receiver);
165        // Lastly, also start BFTs handlers.
166        _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;
167
168        Ok(_self)
169    }
170
171    /// Returns the underlying `BFT` struct.
172    pub const fn bft(&self) -> &BFT<N> {
173        &self.bft
174    }
175}
176
177impl<N: Network> Consensus<N> {
178    /// Returns the number of unconfirmed transmissions in the BFT's workers (not in the mempool).
179    pub fn num_unconfirmed_transmissions(&self) -> usize {
180        self.bft.num_unconfirmed_transmissions()
181    }
182
183    /// Returns the number of unconfirmed ratifications in the BFT's workers (not in the mempool).
184    pub fn num_unconfirmed_ratifications(&self) -> usize {
185        self.bft.num_unconfirmed_ratifications()
186    }
187
188    /// Returns the number unconfirmed solutions in the BFT's workers (not in the mempool).
189    pub fn num_unconfirmed_solutions(&self) -> usize {
190        self.bft.num_unconfirmed_solutions()
191    }
192
193    /// Returns the number of unconfirmed transactions.
194    pub fn num_unconfirmed_transactions(&self) -> usize {
195        self.bft.num_unconfirmed_transactions()
196    }
197}
198
199impl<N: Network> Consensus<N> {
200    /// Returns the unconfirmed transmission IDs.
201    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
202        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
203    }
204
205    /// Returns the unconfirmed transmissions.
206    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
207        self.worker_transmissions().chain(self.inbound_transmissions())
208    }
209
210    /// Returns the unconfirmed solutions.
211    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
212        self.worker_solutions().chain(self.inbound_solutions())
213    }
214
215    /// Returns the unconfirmed transactions.
216    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
217        self.worker_transactions().chain(self.inbound_transactions())
218    }
219}
220
221impl<N: Network> Consensus<N> {
222    /// Returns the worker transmission IDs.
223    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
224        self.bft.worker_transmission_ids()
225    }
226
227    /// Returns the worker transmissions.
228    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
229        self.bft.worker_transmissions()
230    }
231
232    /// Returns the worker solutions.
233    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
234        self.bft.worker_solutions()
235    }
236
237    /// Returns the worker transactions.
238    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
239        self.bft.worker_transactions()
240    }
241}
242
243impl<N: Network> Consensus<N> {
244    /// Returns the transmission IDs in the inbound queue.
245    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
246        self.inbound_transmissions().map(|(id, _)| id)
247    }
248
249    /// Returns the transmissions in the inbound queue.
250    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
251        self.inbound_transactions()
252            .map(|(id, tx)| {
253                (
254                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
255                    Transmission::Transaction(tx),
256                )
257            })
258            .chain(self.inbound_solutions().map(|(id, solution)| {
259                (
260                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
261                    Transmission::Solution(solution),
262                )
263            }))
264    }
265
266    /// Returns the solutions in the inbound queue.
267    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
268        // Return an iterator over the solutions in the inbound queue.
269        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
270    }
271
272    /// Returns the transactions in the inbound queue.
273    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
274        // Acquire the lock on the transactions queue.
275        let tx_queue = self.transactions_queue.lock();
276        // Return an iterator over the deployment and execution transactions in the inbound queue.
277        tx_queue
278            .deployments
279            .clone()
280            .into_iter()
281            .chain(tx_queue.executions.clone())
282            .map(|(id, tx)| (id, Data::Object(tx)))
283    }
284}
285
286impl<N: Network> Consensus<N> {
287    /// Adds the given unconfirmed solution to the memory pool, which will then eventually be passed
288    /// to the BFT layer for inclusion in a batch.
289    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
290        // Calculate the transmission checksum.
291        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
292        // Queue the unconfirmed solution.
293        {
294            let solution_id = solution.id();
295
296            // Check if the transaction was recently seen.
297            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
298                // If the transaction was recently seen, return early.
299                return Ok(());
300            }
301            // Check if the solution already exists in the ledger.
302            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
303                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
304            }
305            #[cfg(feature = "metrics")]
306            {
307                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
308                let timestamp = snarkos_node_bft::helpers::now();
309                self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
310            }
311            // Add the solution to the memory pool.
312            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
313            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
314                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
315            }
316        }
317
318        // Try to process the unconfirmed solutions in the memory pool.
319        self.process_unconfirmed_solutions().await
320    }
321
322    /// Processes unconfirmed solutions in the mempool, and passes them to the BFT layer
323    /// (if sufficient space is available).
324    async fn process_unconfirmed_solutions(&self) -> Result<()> {
325        // If the memory pool of this node is full, return early.
326        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
327        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
328        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
329            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
330        {
331            return Ok(());
332        }
333        // Retrieve the solutions.
334        let solutions = {
335            // Determine the available capacity.
336            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
337            // Acquire the lock on the queue.
338            let mut queue = self.solutions_queue.lock();
339            // Determine the number of solutions to send.
340            let num_solutions = queue.len().min(capacity);
341            // Drain the solutions from the queue.
342            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
343        };
344        // Iterate over the solutions.
345        for solution in solutions.into_iter() {
346            let solution_id = solution.id();
347            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
348            // Send the unconfirmed solution to the primary.
349            if let Err(e) = self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
350                // If the BFT is synced, then log the warning.
351                if self.bft.is_synced() {
352                    // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
353                    if self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
354                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
355                    };
356                }
357            }
358        }
359        Ok(())
360    }
361
362    /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed
363    /// to the BFT layer for inclusion in a batch.
364    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
365        // Calculate the transmission checksum.
366        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
367        // Queue the unconfirmed transaction.
368        {
369            let transaction_id = transaction.id();
370
371            // Check that the transaction is not a fee transaction.
372            if transaction.is_fee() {
373                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
374            }
375            // Check if the transaction was recently seen.
376            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
377                // If the transaction was recently seen, return early.
378                return Ok(());
379            }
380            // Check if the transaction already exists in the ledger.
381            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
382                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
383            }
384            #[cfg(feature = "metrics")]
385            {
386                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
387                let timestamp = snarkos_node_bft::helpers::now();
388                self.transmissions_tracker
389                    .lock()
390                    .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
391            }
392            // Add the transaction to the memory pool.
393            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
394            if transaction.is_deploy() {
395                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
396                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
397                }
398            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
399                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
400            }
401        }
402
403        // Try to process the unconfirmed transactions in the memory pool.
404        self.process_unconfirmed_transactions().await
405    }
406
407    /// Processes unconfirmed transactions in the mempool, and passes them to the BFT layer
408    /// (if sufficient space is available).
409    async fn process_unconfirmed_transactions(&self) -> Result<()> {
410        // If the memory pool of this node is full, return early.
411        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
412        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
413            return Ok(());
414        }
415        // Retrieve the transactions.
416        let transactions = {
417            // Determine the available capacity.
418            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
419            // Acquire the lock on the transactions queue.
420            let mut tx_queue = self.transactions_queue.lock();
421            // Determine the number of deployments to send.
422            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
423            // Determine the number of executions to send.
424            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
425            // Create an iterator which will select interleaved deployments and executions within the capacity.
426            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
427            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
428            // Drain the transactions from the queue, interleaving deployments and executions.
429            selector_iter
430                .filter_map(|select_deployment| {
431                    if select_deployment {
432                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
433                    } else {
434                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
435                    }
436                })
437                .collect_vec()
438        };
439        // Iterate over the transactions.
440        for transaction in transactions.into_iter() {
441            let transaction_id = transaction.id();
442            // Determine the type of the transaction. The fee type is technically not possible here.
443            let tx_type_str = match transaction {
444                Transaction::Deploy(..) => "deployment",
445                Transaction::Execute(..) => "execution",
446                Transaction::Fee(..) => "fee",
447            };
448            trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
449            // Send the unconfirmed transaction to the primary.
450            if let Err(e) =
451                self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
452            {
453                // If the BFT is synced, then log the warning.
454                if self.bft.is_synced() {
455                    warn!(
456                        "Failed to add unconfirmed {tx_type_str} transaction '{}' to the memory pool - {e}",
457                        fmt_id(transaction_id)
458                    );
459                }
460            }
461        }
462        Ok(())
463    }
464}
465
466impl<N: Network> Consensus<N> {
467    /// Starts the consensus handlers.
468    ///
469    /// This is only invoked once, in the constructor.
470    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
471        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
472
473        // Process the committed subdag and transmissions from the BFT.
474        let self_ = self.clone();
475        self.spawn(async move {
476            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
477                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
478            }
479        });
480
481        // Process the unconfirmed transactions in the memory pool.
482        //
483        // TODO (kaimast): This shouldn't happen periodically but only when new batches/blocks are accepted
484        // by the BFT layer, after which the worker's ready queue may have capacity for more transactions/solutions.
485        let self_ = self.clone();
486        self.spawn(async move {
487            loop {
488                // Sleep briefly.
489                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
490                // Process the unconfirmed transactions in the memory pool.
491                if let Err(e) = self_.process_unconfirmed_transactions().await {
492                    warn!("Cannot process unconfirmed transactions - {e}");
493                }
494                // Process the unconfirmed solutions in the memory pool.
495                if let Err(e) = self_.process_unconfirmed_solutions().await {
496                    warn!("Cannot process unconfirmed solutions - {e}");
497                }
498            }
499        });
500    }
501
502    /// Attempts to build a new block from the given subDAG, and (tries to) advance the legder to it.
503    async fn process_bft_subdag(
504        &self,
505        subdag: Subdag<N>,
506        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
507        callback: oneshot::Sender<Result<()>>,
508    ) {
509        // Try to advance to the next block.
510        let self_ = self.clone();
511        let transmissions_ = transmissions.clone();
512        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
513
514        // If the block failed to advance, reinsert the transmissions into the memory pool.
515        if let Err(e) = &result {
516            error!("Unable to advance to the next block - {e}");
517            // On failure, reinsert the transmissions into the memory pool.
518            self.reinsert_transmissions(transmissions).await;
519        }
520        // Send the callback **after** advancing to the next block.
521        // Note: We must await the block to be advanced before sending the callback.
522        callback.send(result).ok();
523    }
524
525    /// Attempts to advance the ledger to the next block, and updates the metrics (if enabled) accordingly.
526    fn try_advance_to_next_block(
527        &self,
528        subdag: Subdag<N>,
529        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
530    ) -> Result<()> {
531        #[cfg(feature = "metrics")]
532        let start = subdag.leader_certificate().batch_header().timestamp();
533        #[cfg(feature = "metrics")]
534        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
535        #[cfg(feature = "metrics")]
536        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
537
538        // Create the candidate next block.
539        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
540        // Check that the block is well-formed.
541        self.ledger.check_next_block(&next_block)?;
542        // Advance to the next block.
543        self.ledger.advance_to_next_block(&next_block)?;
544        #[cfg(feature = "telemetry")]
545        // Fetch the latest committee
546        let latest_committee = self.ledger.current_committee()?;
547
548        // If the next block starts a new epoch, clear the existing solutions.
549        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
550            // Clear the solutions queue.
551            self.solutions_queue.lock().clear();
552            // Clear the worker solutions.
553            self.bft.primary().clear_worker_solutions();
554        }
555
556        // Notify peers that we have a new block.
557        let locators = self.block_sync.get_block_locators()?;
558        self.ping.update_block_locators(locators);
559
560        // Make block sync aware of the new block.
561        self.block_sync.set_sync_height(next_block.height());
562
563        // TODO(kaimast): This should also remove any transmissions/solutions contained in the block from the mempool.
564        // Removal currently happens when Consensus eventually passes them to the worker, which then just discards them.
565
566        #[cfg(feature = "metrics")]
567        {
568            let elapsed = std::time::Duration::from_secs((snarkos_node_bft::helpers::now() - start) as u64);
569            let next_block_timestamp = next_block.header().metadata().timestamp();
570            let block_latency = next_block_timestamp - current_block_timestamp;
571            let proof_target = next_block.header().proof_target();
572            let coinbase_target = next_block.header().coinbase_target();
573            let cumulative_proof_target = next_block.header().cumulative_proof_target();
574
575            metrics::add_transmission_latency_metric(&self.transmissions_tracker, &next_block);
576
577            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
578            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
579            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
580            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
581            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
582            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
583
584            #[cfg(feature = "telemetry")]
585            {
586                // Retrieve the latest participation scores.
587                let participation_scores =
588                    self.bft().primary().gateway().validator_telemetry().get_participation_scores(&latest_committee);
589
590                // Log the participation scores.
591                for (address, participation_score) in participation_scores {
592                    metrics::histogram_label(
593                        metrics::consensus::VALIDATOR_PARTICIPATION,
594                        "validator_address",
595                        address.to_string(),
596                        participation_score,
597                    )
598                }
599            }
600        }
601        Ok(())
602    }
603
604    /// Reinserts the given transmissions into the memory pool.
605    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
606        // Iterate over the transmissions.
607        for (transmission_id, transmission) in transmissions.into_iter() {
608            // Reinsert the transmission into the memory pool.
609            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
610                warn!(
611                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
612                    fmt_id(transmission_id),
613                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
614                );
615            }
616        }
617    }
618
619    /// Reinserts the given transmission into the memory pool.
620    async fn reinsert_transmission(
621        &self,
622        transmission_id: TransmissionID<N>,
623        transmission: Transmission<N>,
624    ) -> Result<()> {
625        // Initialize a callback sender and receiver.
626        let (callback, callback_receiver) = oneshot::channel();
627        // Send the transmission to the primary.
628        match (transmission_id, transmission) {
629            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
630            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
631                // Send the solution to the primary.
632                self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
633            }
634            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
635                // Send the transaction to the primary.
636                self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
637            }
638            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
639        }
640        // Await the callback.
641        callback_receiver.await?
642    }
643
644    /// Spawns a task with the given future; it should only be used for long-running tasks.
645    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
646        self.handles.lock().push(tokio::spawn(future));
647    }
648
649    /// Shuts down the consensus and BFT layers.
650    pub async fn shut_down(&self) {
651        info!("Shutting down consensus...");
652        // Shut down the BFT.
653        self.bft.shut_down().await;
654        // Abort the tasks.
655        self.handles.lock().iter().for_each(|handle| handle.abort());
656    }
657}