Skip to main content

snarkos_node_consensus/
lib.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18mod transactions_queue;
19use transactions_queue::TransactionsQueue;
20
21#[macro_use]
22extern crate tracing;
23
24#[cfg(feature = "metrics")]
25extern crate snarkos_node_metrics as metrics;
26
27use snarkos_account::Account;
28use snarkos_node_bft::{
29    BFT,
30    MAX_BATCH_DELAY,
31    Primary,
32    helpers::{
33        ConsensusReceiver,
34        PrimarySender,
35        Storage as NarwhalStorage,
36        fmt_id,
37        init_consensus_channels,
38        init_primary_channels,
39    },
40    spawn_blocking,
41};
42use snarkos_node_bft_ledger_service::LedgerService;
43use snarkos_node_bft_storage_service::BFTPersistentStorage;
44use snarkos_node_sync::{BlockSync, Ping};
45use snarkos_utilities::NodeDataDir;
46
47use snarkvm::{
48    ledger::{
49        CheckBlockError,
50        block::Transaction,
51        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
52        puzzle::{Solution, SolutionID},
53    },
54    prelude::*,
55    utilities::flatten_error,
56};
57
58use aleo_std::StorageMode;
59use anyhow::{Context, Result};
60use cfg_if::cfg_if;
61use colored::Colorize;
62use indexmap::IndexMap;
63#[cfg(feature = "locktick")]
64use locktick::parking_lot::{Mutex, RwLock};
65use lru::LruCache;
66#[cfg(not(feature = "locktick"))]
67use parking_lot::{Mutex, RwLock};
68use std::{future::Future, net::SocketAddr, num::NonZeroUsize, sync::Arc};
69use tokio::{
70    sync::{Notify, oneshot},
71    task::JoinHandle,
72};
73
74#[cfg(feature = "metrics")]
75use std::collections::HashMap;
76
/// Number of slots in the inbound queue set aside for deployment transactions.
/// Note: This bounds the node's own inbound queue; Narwhal does not enforce it.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1024;
/// Number of slots in the inbound queue set aside for execution transactions.
/// Note: This bounds the node's own inbound queue; Narwhal does not enforce it.
const CAPACITY_FOR_EXECUTIONS: usize = 1024;
/// Number of slots in the inbound queue set aside for solutions.
/// Note: This bounds the node's own inbound queue; Narwhal does not enforce it.
const CAPACITY_FOR_SOLUTIONS: usize = 1024;
/// The **suggested** upper bound on deployments forwarded per processing interval.
/// Note: This limits the node's own inbound queue; Narwhal does not enforce it.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
89
90/// Wrapper around `BFT` that adds additional functionality, such as a mempool.
91///
92/// Consensus acts as a rate limiter to prevents workers in BFT from being overloaded.
93/// Each worker maintains a ready queue (which is essentially also a mempool), but verifies transactions/solutions
94/// before enqueuing them.
95/// Consensus only passes more transactions/solutions to the BFT layer if its ready queues are not already full.
96#[derive(Clone)]
97pub struct Consensus<N: Network> {
98    /// The ledger.
99    ledger: Arc<dyn LedgerService<N>>,
100    /// The BFT.
101    bft: BFT<N>,
102    /// The primary sender.
103    primary_sender: PrimarySender<N>,
104    /// The unconfirmed solutions queue.
105    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
106    /// The unconfirmed transactions queue.
107    transactions_queue: Arc<RwLock<TransactionsQueue<N>>>,
108    /// The recently-seen unconfirmed solutions.
109    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
110    /// The recently-seen unconfirmed transactions.
111    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
112    #[cfg(feature = "metrics")]
113    transmissions_tracker: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
114    /// The spawned handles.
115    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
116    /// The ping logic.
117    ping: Arc<Ping<N>>,
118    /// The block sync logic.
119    block_sync: Arc<BlockSync<N>>,
120    /// Notifies when a block is committed, and relays it to the primary.
121    block_commit_notify: Arc<Notify>,
122}
123
124impl<N: Network> Consensus<N> {
125    /// Initializes a new instance of consensus and spawn its background tasks.
126    #[allow(clippy::too_many_arguments)]
127    pub async fn new(
128        account: Account<N>,
129        ledger: Arc<dyn LedgerService<N>>,
130        block_sync: Arc<BlockSync<N>>,
131        ip: Option<SocketAddr>,
132        trusted_validators: &[SocketAddr],
133        trusted_peers_only: bool,
134        storage_mode: StorageMode,
135        node_data_dir: NodeDataDir,
136        ping: Arc<Ping<N>>,
137        dev: Option<u16>,
138    ) -> Result<Self> {
139        // Initialize the primary channels.
140        let (primary_sender, primary_receiver) = init_primary_channels::<N>();
141        // Initialize the Narwhal transmissions.
142        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone())?);
143        // Initialize the Narwhal storage.
144        let storage = NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64)
145            .with_context(|| "Failed to initialize the BFT storage")?;
146        // Initialize the BFT.
147        let bft = BFT::new(
148            account,
149            storage,
150            ledger.clone(),
151            block_sync.clone(),
152            ip,
153            trusted_validators,
154            trusted_peers_only,
155            node_data_dir,
156            dev,
157        )?;
158        // Create a new instance of Consensus.
159        let mut _self = Self {
160            ledger,
161            bft,
162            block_sync,
163            primary_sender,
164            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
165            transactions_queue: Default::default(),
166            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
167            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
168            #[cfg(feature = "metrics")]
169            transmissions_tracker: Default::default(),
170            handles: Default::default(),
171            ping: ping.clone(),
172            block_commit_notify: Arc::new(Notify::new()),
173        };
174
175        info!("Starting the consensus instance...");
176
177        // First, initialize the consensus channels.
178        let (consensus_sender, consensus_receiver) = init_consensus_channels();
179        // Then, start the consensus handlers.
180        _self.start_handlers(consensus_receiver);
181        // Lastly, also start BFTs handlers.
182        _self.bft.run(Some(ping), Some(consensus_sender), _self.primary_sender.clone(), primary_receiver).await?;
183
184        Ok(_self)
185    }
186
187    /// Returns the underlying `BFT` struct.
188    pub const fn bft(&self) -> &BFT<N> {
189        &self.bft
190    }
191
192    pub fn contains_transaction(&self, transaction_id: &N::TransactionID) -> bool {
193        self.transactions_queue.read().contains(transaction_id)
194    }
195}
196
197impl<N: Network> Consensus<N> {
198    /// Returns the number of unconfirmed transmissions in the BFT's workers (not in the mempool).
199    pub fn num_unconfirmed_transmissions(&self) -> usize {
200        self.bft.num_unconfirmed_transmissions()
201    }
202
203    /// Returns the number of unconfirmed ratifications in the BFT's workers (not in the mempool).
204    pub fn num_unconfirmed_ratifications(&self) -> usize {
205        self.bft.num_unconfirmed_ratifications()
206    }
207
208    /// Returns the number unconfirmed solutions in the BFT's workers (not in the mempool).
209    pub fn num_unconfirmed_solutions(&self) -> usize {
210        self.bft.num_unconfirmed_solutions()
211    }
212
213    /// Returns the number of unconfirmed transactions.
214    pub fn num_unconfirmed_transactions(&self) -> usize {
215        self.bft.num_unconfirmed_transactions()
216    }
217}
218
219impl<N: Network> Consensus<N> {
220    /// Returns the unconfirmed transmission IDs.
221    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
222        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
223    }
224
225    /// Returns the unconfirmed transmissions.
226    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
227        self.worker_transmissions().chain(self.inbound_transmissions())
228    }
229
230    /// Returns the unconfirmed solutions.
231    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
232        self.worker_solutions().chain(self.inbound_solutions())
233    }
234
235    /// Returns the unconfirmed transactions.
236    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
237        self.worker_transactions().chain(self.inbound_transactions())
238    }
239}
240
241impl<N: Network> Consensus<N> {
242    /// Returns the worker transmission IDs.
243    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
244        self.bft.worker_transmission_ids()
245    }
246
247    /// Returns the worker transmissions.
248    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
249        self.bft.worker_transmissions()
250    }
251
252    /// Returns the worker solutions.
253    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
254        self.bft.worker_solutions()
255    }
256
257    /// Returns the worker transactions.
258    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
259        self.bft.worker_transactions()
260    }
261}
262
263impl<N: Network> Consensus<N> {
264    /// Returns the transmission IDs in the inbound queue.
265    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
266        self.inbound_transmissions().map(|(id, _)| id)
267    }
268
269    /// Returns the transmissions in the inbound queue.
270    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
271        self.inbound_transactions()
272            .map(|(id, tx)| {
273                (
274                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
275                    Transmission::Transaction(tx),
276                )
277            })
278            .chain(self.inbound_solutions().map(|(id, solution)| {
279                (
280                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
281                    Transmission::Solution(solution),
282                )
283            }))
284    }
285
286    /// Returns the solutions in the inbound queue.
287    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
288        // Return an iterator over the solutions in the inbound queue.
289        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
290    }
291
292    /// Returns the transactions in the inbound queue.
293    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
294        // Return an iterator over the deployment and execution transactions in the inbound queue.
295        self.transactions_queue.read().transactions().map(|(id, tx)| (id, Data::Object(tx)))
296    }
297}
298
299impl<N: Network> Consensus<N> {
300    /// Adds the given unconfirmed solution to the memory pool, which will then eventually be passed
301    /// to the BFT layer for inclusion in a batch.
302    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
303        // Calculate the transmission checksum.
304        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
305        // Queue the unconfirmed solution.
306        {
307            let solution_id = solution.id();
308
309            // Check if the transaction was recently seen.
310            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
311                // If the transaction was recently seen, return early.
312                return Ok(());
313            }
314            // Check if the solution already exists in the ledger.
315            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
316                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
317            }
318            #[cfg(feature = "metrics")]
319            {
320                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
321                let timestamp = snarkos_node_bft::helpers::now();
322                self.transmissions_tracker.lock().insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
323            }
324            // Add the solution to the memory pool.
325            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
326            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
327                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
328            }
329        }
330
331        // Try to process the unconfirmed solutions in the memory pool.
332        self.process_unconfirmed_solutions().await
333    }
334
335    /// Processes unconfirmed solutions in the mempool, and passes them to the BFT layer
336    /// (if sufficient space is available).
337    async fn process_unconfirmed_solutions(&self) -> Result<()> {
338        // If the memory pool of this node is full, return early.
339        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
340        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
341        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
342            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
343        {
344            return Ok(());
345        }
346        // Retrieve the solutions.
347        let solutions = {
348            // Determine the available capacity.
349            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
350            // Acquire the lock on the queue.
351            let mut queue = self.solutions_queue.lock();
352            // Determine the number of solutions to send.
353            let num_solutions = queue.len().min(capacity);
354            // Drain the solutions from the queue.
355            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
356        };
357        // Iterate over the solutions.
358        for solution in solutions.into_iter() {
359            let solution_id = solution.id();
360            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
361            // Send the unconfirmed solution to the primary.
362            match self.primary_sender.send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
363                Ok(true) => {}
364                Ok(false) => debug!(
365                    "Unable to add unconfirmed solution '{}' to the memory pool. Already exists.",
366                    fmt_id(solution_id)
367                ),
368                Err(err) => {
369                    let err = err.context(format!(
370                        "Unable to add unconfirmed solution '{}' to the memory pool",
371                        fmt_id(solution_id)
372                    ));
373
374                    // If the node is synced and the occurs after the first 10 blocks of an epoch, log it as a warning, otherwise trace it.
375                    if self.bft.is_synced() && self.ledger.latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
376                        warn!("{}", flatten_error(err));
377                    } else {
378                        trace!("{}", flatten_error(err));
379                    }
380                }
381            }
382        }
383        Ok(())
384    }
385
386    /// Adds the given unconfirmed transaction to the memory pool, which will then eventually be passed
387    /// to the BFT layer for inclusion in a batch.
388    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
389        // Calculate the transmission checksum.
390        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
391        // Queue the unconfirmed transaction.
392        {
393            let transaction_id = transaction.id();
394
395            // Check that the transaction is not a fee transaction.
396            if transaction.is_fee() {
397                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
398            }
399            // Check if the transaction was recently seen.
400            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
401                // If the transaction was recently seen, return early.
402                return Ok(());
403            }
404            // Check if the transaction already exists in the ledger.
405            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
406                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
407            }
408            // Check that the transaction is not in the mempool.
409            if self.contains_transaction(&transaction_id) {
410                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
411            }
412            #[cfg(feature = "metrics")]
413            {
414                metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
415                let timestamp = snarkos_node_bft::helpers::now();
416                self.transmissions_tracker
417                    .lock()
418                    .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
419            }
420            // Add the transaction to the memory pool.
421            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
422            let priority_fee = transaction.priority_fee_amount()?;
423            self.transactions_queue.write().insert(transaction_id, transaction, priority_fee)?;
424        }
425
426        // Try to process the unconfirmed transactions in the memory pool.
427        self.process_unconfirmed_transactions().await
428    }
429
430    /// Processes unconfirmed transactions in the mempool, and passes them to the BFT layer
431    /// (if sufficient space is available).
432    async fn process_unconfirmed_transactions(&self) -> Result<()> {
433        // If the memory pool of this node is full, return early.
434        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
435        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
436            return Ok(());
437        }
438        // Retrieve the transactions.
439        let transactions = {
440            // Determine the available capacity.
441            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
442            // Acquire the lock on the transactions queue.
443            let mut tx_queue = self.transactions_queue.write();
444            // Determine the number of deployments to send.
445            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
446            // Determine the number of executions to send.
447            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
448            // Create an iterator which will select interleaved deployments and executions within the capacity.
449            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
450            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
451            // Drain the transactions from the queue, interleaving deployments and executions.
452            selector_iter
453                .filter_map(
454                    |select_deployment| {
455                        if select_deployment { tx_queue.deployments.pop() } else { tx_queue.executions.pop() }
456                    },
457                )
458                .map(|(_, tx)| tx)
459                .collect_vec()
460        };
461        // Iterate over the transactions.
462        for transaction in transactions.into_iter() {
463            let transaction_id = transaction.id();
464            // Determine the type of the transaction. The fee type is technically not possible here.
465            let tx_type_str = match transaction {
466                Transaction::Deploy(..) => "deployment",
467                Transaction::Execute(..) => "execution",
468                Transaction::Fee(..) => "fee",
469            };
470            trace!("Adding unconfirmed {tx_type_str} transaction '{}' to the memory pool...", fmt_id(transaction_id));
471            // Send the unconfirmed transaction to the primary.
472            match self.primary_sender.send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await {
473                Ok(true) => {}
474                Ok(false) => debug!(
475                    "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool. Already exists.",
476                    fmt_id(transaction_id)
477                ),
478                Err(err) => {
479                    // If the BFT is synced, then log the warning.
480                    if self.bft.is_synced() {
481                        let err = err.context(format!(
482                            "Unable to add unconfirmed {tx_type_str} transaction '{}' to the memory pool",
483                            fmt_id(transaction_id)
484                        ));
485                        warn!("{}", flatten_error(err));
486                    }
487                }
488            }
489        }
490        Ok(())
491    }
492}
493
494impl<N: Network> Consensus<N> {
495    /// Starts the consensus handlers.
496    ///
497    /// This is only invoked once, in the constructor.
498    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
499        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
500
501        // Process the committed subdag and transmissions from the BFT.
502        let self_ = self.clone();
503        self.spawn(async move {
504            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
505                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
506            }
507        });
508
509        // Process the unconfirmed transactions in the memory pool.
510        //
511        // This loop wakes up either when a block is committed (signaled via notify) or after a timeout.
512        // When a block is committed, the BFT workers have freed capacity for more transmissions.
513        // The timeout serves as a fallback for startup or idle periods when blocks are not being produced.
514        //
515        // TODO(kaimast): wake up the loop after a proposal is created, not only when a block commits.
516        let self_ = self.clone();
517        self.spawn(async move {
518            loop {
519                // Wait for either a block commit notification or timeout.
520                tokio::select! {
521                    _ = self_.block_commit_notify.notified() => {}
522                    _ = tokio::time::sleep(MAX_BATCH_DELAY) => {}
523                }
524                // Process the unconfirmed transactions in the memory pool.
525                if let Err(err) = self_.process_unconfirmed_transactions().await {
526                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed transactions")));
527                }
528                // Process the unconfirmed solutions in the memory pool.
529                if let Err(err) = self_.process_unconfirmed_solutions().await {
530                    warn!("{}", flatten_error(err.context("Cannot process unconfirmed solutions")));
531                }
532            }
533        });
534    }
535
536    /// Attempts to build a new block from the given subDAG, and (tries to) advance the legder to it.
537    ///
538    /// # Returns
539    /// - `Ok(true)` if the ledger was advanced to the next block.
540    /// - `Ok(false)` if the block may be valide but the ledger already advanced.
541    /// - `Err(anyhow::Error)` for incorrect blocks and any other error.
542    async fn process_bft_subdag(
543        &self,
544        subdag: Subdag<N>,
545        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
546        callback: oneshot::Sender<Result<bool>>,
547    ) {
548        // Try to advance to the next block.
549        let self_ = self.clone();
550        let transmissions_ = transmissions.clone();
551        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_).with_context(|| "Unable to advance to the next block") };
552
553        // If the block failed to advance, reinsert the transmissions into the memory pool.
554        match result {
555            Ok(true) => {
556                // On success, notify that the BFT workers have freed capacity for more transmissions.
557                self.block_commit_notify.notify_one();
558            }
559            Ok(false) | Err(_) => self.reinsert_transmissions(transmissions).await,
560        }
561
562        callback.send(result).ok();
563    }
564
565    /// Attempts to advance the ledger to the next block, and updates the metrics (if enabled) accordingly.
566    ///
567    /// # Returns
568    /// - `Ok(true)` if the ledger was advanced to the next block.
569    /// - `Ok(false)` if the block may be valid but the ledger already advanced.
570    /// - `Err(anyhow::Error)` for incorrect blocks and any other error.
571    fn try_advance_to_next_block(
572        &self,
573        subdag: Subdag<N>,
574        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
575    ) -> Result<bool> {
576        #[cfg(feature = "metrics")]
577        let start = subdag.leader_certificate().batch_header().timestamp();
578        #[cfg(feature = "metrics")]
579        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
580        #[cfg(feature = "metrics")]
581        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
582
583        // Create the candidate next block.
584        let ledger_update = self.ledger.begin_ledger_update()?;
585
586        let prepare_instant = std::time::Instant::now();
587        let block = match ledger_update.prepare_advance_to_next_quorum_block(subdag, transmissions) {
588            Ok(block) => block,
589            Err(err) => return Err(err.into_anyhow()),
590        };
591        let prepare_elapsed = prepare_instant.elapsed();
592        trace!("prepare_advance_to_next_quorum_block took {:.3}s", prepare_elapsed.as_secs_f64());
593        #[cfg(feature = "metrics")]
594        metrics::histogram(metrics::consensus::PREPARE_ADVANCE_SECS, prepare_elapsed.as_secs_f64());
595
596        let check_instant = std::time::Instant::now();
597        cfg_if! {
598            if #[cfg(feature = "test_network")] {
599                // If we are using a hotswapped dev committee, skip checking the block.
600                let result = if self.ledger.dev_committee_for_round(block.round())?.is_some() {
601                    Ok(block)
602                } else {
603                    ledger_update.check_next_block(block)
604                };
605            } else {
606                let result = ledger_update.check_next_block(block);
607            }
608        }
609
610        let block = match result {
611            Ok(block) => block,
612            Err(CheckBlockError::BlockAlreadyExists { .. }) => {
613                debug!("The given block hash already exists in the ledger");
614                return Ok(false);
615            }
616            Err(CheckBlockError::InvalidHeight { .. }) => {
617                debug!("The ledger advanced while we were constructing the next block");
618                return Ok(false);
619            }
620            Err(CheckBlockError::InvalidRound { new, previous }) => {
621                debug!("The subDAG round is too low. Expected >{previous}, got {new}");
622                return Ok(false);
623            }
624            Err(err) => return Err(err.into_anyhow()),
625        };
626
627        let check_elapsed = check_instant.elapsed();
628        trace!("check_next_block took {:.3}s", check_elapsed.as_secs_f64());
629        #[cfg(feature = "metrics")]
630        metrics::histogram(metrics::consensus::CHECK_NEXT_BLOCK_SECS, check_elapsed.as_secs_f64());
631
632        let block_height = block.height();
633
634        // Advance to the next block.
635        let advance_instant = std::time::Instant::now();
636        ledger_update.advance_to_next_block(&block)?;
637        let advance_elapsed = advance_instant.elapsed();
638        trace!("advance_to_next_block took {:.3}s", advance_elapsed.as_secs_f64());
639        #[cfg(feature = "metrics")]
640        metrics::histogram(metrics::consensus::ADVANCE_TO_NEXT_BLOCK_SECS, advance_elapsed.as_secs_f64());
641
642        #[cfg(feature = "telemetry")]
643        // Fetch the committee lookback for the latest round.
644        // Note: Do not abort here if this returns an error, because the committee is only needed for telemetry,
645        // not for block advancement itself.
646        let latest_committee = self.ledger.get_committee_lookback_for_round(self.ledger.latest_round());
647
648        // If the next block starts a new epoch, clear the existing solutions.
649        if block_height.is_multiple_of(N::NUM_BLOCKS_PER_EPOCH) {
650            // Clear the solutions queue.
651            self.solutions_queue.lock().clear();
652            // Clear the worker solutions.
653            self.bft.primary().clear_worker_solutions();
654        }
655
656        // Notify peers that we have a new block.
657        match self.block_sync.get_block_locators() {
658            Ok(locators) => self.ping.update_block_locators(locators),
659            Err(err) => error!(
660                "{}",
661                flatten_error(err.context("Failed to generate new block locators after block advancement"))
662            ),
663        }
664
665        // Make block sync aware of the new block.
666        self.block_sync.set_sync_height(block_height);
667
668        // TODO(kaimast): This should also remove any transmissions/solutions contained in the block from the mempool.
669        // Removal currently happens when Consensus eventually passes them to the worker, which then just discards them.
670
671        #[cfg(feature = "metrics")]
672        {
673            let now_utc = snarkos_node_bft::helpers::now_utc();
674            let elapsed = std::time::Duration::from_secs((now_utc.unix_timestamp() - start) as u64);
675            let next_block_timestamp = block.header().metadata().timestamp();
676            let next_block_utc = snarkos_node_bft::helpers::to_utc_datetime(next_block_timestamp);
677            let block_latency = next_block_timestamp - current_block_timestamp;
678            let block_lag = (now_utc - next_block_utc).whole_milliseconds();
679
680            let proof_target = block.header().proof_target();
681            let coinbase_target = block.header().coinbase_target();
682            let cumulative_proof_target = block.header().cumulative_proof_target();
683
684            // Calculate latency for all transmissions included in this block.
685            metrics::add_transmission_latency_metric(&self.transmissions_tracker, &block);
686
687            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
688            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
689            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
690            metrics::histogram(metrics::consensus::BLOCK_LAG, block_lag as f64);
691            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
692            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
693            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
694
695            // If telemetry is enabled, update participation scores.
696            #[cfg(feature = "telemetry")]
697            {
698                match latest_committee {
699                    Ok(latest_committee) => {
700                        // Retrieve the individual participation scores.
701                        let participation_scores = self
702                            .bft()
703                            .primary()
704                            .gateway()
705                            .validator_telemetry()
706                            .get_participation_scores(&latest_committee);
707
708                        // Export the certificate and signature participation scores as individual gauges.
709                        for (address, (certificate_score, signature_score)) in participation_scores {
710                            let address_str = address.to_string();
711                            metrics::gauge_label(
712                                metrics::consensus::VALIDATOR_CERTIFICATE_PARTICIPATION,
713                                "validator_address",
714                                address_str.clone(),
715                                certificate_score,
716                            );
717                            metrics::gauge_label(
718                                metrics::consensus::VALIDATOR_SIGNATURE_PARTICIPATION,
719                                "validator_address",
720                                address_str,
721                                signature_score,
722                            );
723                        }
724                    }
725                    Err(err) => warn!("{}", flatten_error(err.context("Failed to get latest committee for telemetry"))),
726                }
727            }
728        }
729
730        Ok(true)
731    }
732
733    /// Reinserts the given transmissions into the memory pool.
734    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
735        // Iterate over the transmissions.
736        for (transmission_id, transmission) in transmissions.into_iter() {
737            // Reinsert the transmission into the memory pool.
738            match self.reinsert_transmission(transmission_id, transmission).await {
739                Ok(true) => {}
740                Ok(false) => debug!(
741                    "Unable to reinsert transmission {}:{} into the memory pool. Already exists.",
742                    fmt_id(transmission_id),
743                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
744                ),
745                Err(err) => {
746                    let err = err.context(format!(
747                        "Unable to reinsert transmission {}.{} into the memory pool",
748                        fmt_id(transmission_id),
749                        fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
750                    ));
751                    warn!("{}", flatten_error(err));
752                }
753            }
754        }
755    }
756
757    /// Reinserts the given transmission into the memory pool.
758    ///
759    /// # Returns
760    /// - `Ok(true)` if the transmission was added to the memory pool.
761    /// - `Ok(false)` if the transmission was valid but already exists in the memory pool.
762    /// - `Err(anyhow::Error)` if the transmission was invalid.
763    async fn reinsert_transmission(
764        &self,
765        transmission_id: TransmissionID<N>,
766        transmission: Transmission<N>,
767    ) -> Result<bool> {
768        // Initialize a callback sender and receiver.
769        let (callback, callback_receiver) = oneshot::channel();
770        // Send the transmission to the primary.
771        match (transmission_id, transmission) {
772            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(true),
773            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
774                // Send the solution to the primary.
775                self.primary_sender.tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
776            }
777            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
778                // Send the transaction to the primary.
779                self.primary_sender.tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
780            }
781            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
782        }
783        // Await the callback.
784        callback_receiver.await?
785    }
786
787    /// Spawns a task with the given future; it should only be used for long-running tasks.
788    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
789        self.handles.lock().push(tokio::spawn(future));
790    }
791
792    /// Shuts down the consensus and BFT layers.
793    pub async fn shut_down(&self) {
794        info!("Shutting down consensus...");
795        // Shut down the BFT.
796        self.bft.shut_down().await;
797        // Abort the tasks.
798        self.handles.lock().iter().for_each(|handle| handle.abort());
799    }
800}