amareleo_node_consensus/
lib.rs

// Copyright 2024 Aleo Network Foundation
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21#[macro_use]
22extern crate amareleo_chain_tracing;
23
24use amareleo_chain_account::Account;
25use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
26use amareleo_node_bft::{
27    BFT,
28    MAX_BATCH_DELAY_IN_MS,
29    Primary,
30    helpers::{
31        ConsensusReceiver,
32        PrimaryReceiver,
33        PrimarySender,
34        Storage as NarwhalStorage,
35        fmt_id,
36        init_consensus_channels,
37    },
38    spawn_blocking,
39};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_bft_storage_service::BFTPersistentStorage;
42use snarkvm::{
43    ledger::{
44        block::Transaction,
45        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
46        puzzle::{Solution, SolutionID},
47    },
48    prelude::*,
49};
50
51use aleo_std::StorageMode;
52use anyhow::Result;
53use colored::Colorize;
54use indexmap::IndexMap;
55use lru::LruCache;
56use parking_lot::Mutex;
57use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
58use tokio::{
59    sync::{OnceCell, oneshot},
60    task::JoinHandle,
61};
62use tracing::subscriber::DefaultGuard;
63
64#[cfg(feature = "metrics")]
65use std::collections::HashMap;
66
67/// The capacity of the queue reserved for deployments.
68/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
69const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
70/// The capacity of the queue reserved for executions.
71/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
72const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
73/// The capacity of the queue reserved for solutions.
74/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
75const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
76/// The **suggested** maximum number of deployments in each interval.
77/// Note: This is an inbound queue limit, not a Narwhal-enforced limit.
78const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
79
80/// Helper struct to track incoming transactions.
81struct TransactionsQueue<N: Network> {
82    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
83    pub executions: LruCache<N::TransactionID, Transaction<N>>,
84}
85
86impl<N: Network> Default for TransactionsQueue<N> {
87    fn default() -> Self {
88        Self {
89            deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
90            executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
91        }
92    }
93}
94
95#[derive(Clone)]
96pub struct Consensus<N: Network> {
97    /// The ledger.
98    ledger: Arc<dyn LedgerService<N>>,
99    /// The BFT.
100    bft: BFT<N>,
101    /// The primary sender.
102    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
103    /// The unconfirmed solutions queue.
104    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
105    /// The unconfirmed transactions queue.
106    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
107    /// The recently-seen unconfirmed solutions.
108    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
109    /// The recently-seen unconfirmed transactions.
110    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
111    #[cfg(feature = "metrics")]
112    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
113    /// Tracing handle
114    tracing: Option<TracingHandler>,
115    /// The spawned handles.
116    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
117}
118
119impl<N: Network> TracingHandlerGuard for Consensus<N> {
120    /// Retruns tracing guard
121    fn get_tracing_guard(&self) -> Option<DefaultGuard> {
122        self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
123    }
124}
125
126impl<N: Network> Consensus<N> {
127    /// Initializes a new instance of consensus.
128    pub fn new(
129        account: Account<N>,
130        ledger: Arc<dyn LedgerService<N>>,
131        keep_state: bool,
132        storage_mode: StorageMode,
133        tracing: Option<TracingHandler>,
134    ) -> Result<Self> {
135        // Initialize the Narwhal transmissions.
136        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone(), tracing.clone())?);
137        // Initialize the Narwhal storage.
138        let storage =
139            NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64, tracing.clone());
140        // Initialize the BFT.
141        let bft = BFT::new(account, storage, keep_state, storage_mode, ledger.clone(), tracing.clone())?;
142        // Return the consensus.
143        Ok(Self {
144            ledger,
145            bft,
146            primary_sender: Default::default(),
147            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
148            transactions_queue: Default::default(),
149            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
150            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
151            #[cfg(feature = "metrics")]
152            transmissions_queue_timestamps: Default::default(),
153            tracing: tracing.clone(),
154            handles: Default::default(),
155        })
156    }
157
158    /// Run the consensus instance.
159    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
160        guard_info!(self, "Starting the consensus instance...");
161
162        // Set the primary sender.
163        let result = self.primary_sender.set(primary_sender.clone());
164        if result.is_err() {
165            bail!("Unexpected: Primary sender already set");
166        }
167
168        // Initialize communications between Consensus and BFT.
169        let (consensus_sender, consensus_receiver) = init_consensus_channels();
170
171        // Start the Consensus handlers.
172        self.start_handlers(consensus_receiver);
173
174        // Run the BFT.
175        let result = self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await;
176        if let Err(err) = result {
177            guard_error!(self, "Consensus failed to run the BFT instance - {err}");
178            self.shut_down().await;
179            return Err(err);
180        }
181
182        Ok(())
183    }
184
185    /// Returns the ledger.
186    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
187        &self.ledger
188    }
189
190    /// Returns the BFT.
191    pub const fn bft(&self) -> &BFT<N> {
192        &self.bft
193    }
194
195    /// Returns the primary sender.
196    fn primary_sender(&self) -> &PrimarySender<N> {
197        self.primary_sender.get().expect("Primary sender not set")
198    }
199}
200
201impl<N: Network> Consensus<N> {
202    /// Returns the number of unconfirmed transmissions.
203    pub fn num_unconfirmed_transmissions(&self) -> usize {
204        self.bft.num_unconfirmed_transmissions()
205    }
206
207    /// Returns the number of unconfirmed ratifications.
208    pub fn num_unconfirmed_ratifications(&self) -> usize {
209        self.bft.num_unconfirmed_ratifications()
210    }
211
212    /// Returns the number of solutions.
213    pub fn num_unconfirmed_solutions(&self) -> usize {
214        self.bft.num_unconfirmed_solutions()
215    }
216
217    /// Returns the number of unconfirmed transactions.
218    pub fn num_unconfirmed_transactions(&self) -> usize {
219        self.bft.num_unconfirmed_transactions()
220    }
221}
222
223impl<N: Network> Consensus<N> {
224    /// Returns the unconfirmed transmission IDs.
225    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
226        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
227    }
228
229    /// Returns the unconfirmed transmissions.
230    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
231        self.worker_transmissions().chain(self.inbound_transmissions())
232    }
233
234    /// Returns the unconfirmed solutions.
235    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
236        self.worker_solutions().chain(self.inbound_solutions())
237    }
238
239    /// Returns the unconfirmed transactions.
240    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
241        self.worker_transactions().chain(self.inbound_transactions())
242    }
243}
244
245impl<N: Network> Consensus<N> {
246    /// Returns the worker transmission IDs.
247    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
248        self.bft.worker_transmission_ids()
249    }
250
251    /// Returns the worker transmissions.
252    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
253        self.bft.worker_transmissions()
254    }
255
256    /// Returns the worker solutions.
257    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
258        self.bft.worker_solutions()
259    }
260
261    /// Returns the worker transactions.
262    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
263        self.bft.worker_transactions()
264    }
265}
266
267impl<N: Network> Consensus<N> {
268    /// Returns the transmission IDs in the inbound queue.
269    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
270        self.inbound_transmissions().map(|(id, _)| id)
271    }
272
273    /// Returns the transmissions in the inbound queue.
274    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
275        self.inbound_transactions()
276            .map(|(id, tx)| {
277                (
278                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
279                    Transmission::Transaction(tx),
280                )
281            })
282            .chain(self.inbound_solutions().map(|(id, solution)| {
283                (
284                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
285                    Transmission::Solution(solution),
286                )
287            }))
288    }
289
290    /// Returns the solutions in the inbound queue.
291    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
292        // Return an iterator over the solutions in the inbound queue.
293        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
294    }
295
296    /// Returns the transactions in the inbound queue.
297    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
298        // Acquire the lock on the transactions queue.
299        let tx_queue = self.transactions_queue.lock();
300        // Return an iterator over the deployment and execution transactions in the inbound queue.
301        tx_queue
302            .deployments
303            .clone()
304            .into_iter()
305            .chain(tx_queue.executions.clone())
306            .map(|(id, tx)| (id, Data::Object(tx)))
307    }
308}
309
310impl<N: Network> Consensus<N> {
311    /// Adds the given unconfirmed solution to the memory pool.
312    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
313        // Calculate the transmission checksum.
314        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
315        #[cfg(feature = "metrics")]
316        {
317            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
318            let timestamp = amareleo_node_bft::helpers::now();
319            self.transmissions_queue_timestamps
320                .lock()
321                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
322        }
323        // Queue the unconfirmed solution.
324        {
325            let solution_id = solution.id();
326
327            // Check if the transaction was recently seen.
328            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
329                // If the transaction was recently seen, return early.
330                return Ok(());
331            }
332            // Check if the solution already exists in the ledger.
333            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
334                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
335            }
336            // Add the solution to the memory pool.
337            guard_trace!(self, "Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
338            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
339                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
340            }
341        }
342
343        // Try to process the unconfirmed solutions in the memory pool.
344        self.process_unconfirmed_solutions().await
345    }
346
347    /// Processes unconfirmed transactions in the memory pool.
348    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
349        // If the memory pool of this node is full, return early.
350        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
351        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
352        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
353            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
354        {
355            return Ok(());
356        }
357        // Retrieve the solutions.
358        let solutions = {
359            // Determine the available capacity.
360            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
361            // Acquire the lock on the queue.
362            let mut queue = self.solutions_queue.lock();
363            // Determine the number of solutions to send.
364            let num_solutions = queue.len().min(capacity);
365            // Drain the solutions from the queue.
366            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
367        };
368        // Iterate over the solutions.
369        for solution in solutions.into_iter() {
370            let solution_id = solution.id();
371            guard_trace!(self, "Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
372            // Send the unconfirmed solution to the primary.
373            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
374                // If the BFT is synced, then log the warning.
375                if self.bft.is_synced() {
376                    // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
377                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
378                        guard_warn!(
379                            self,
380                            "Failed to add unconfirmed solution '{}' to the memory pool - {e}",
381                            fmt_id(solution_id)
382                        )
383                    };
384                }
385            }
386        }
387        Ok(())
388    }
389
390    /// Adds the given unconfirmed transaction to the memory pool.
391    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
392        // Calculate the transmission checksum.
393        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
394        #[cfg(feature = "metrics")]
395        {
396            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
397            let timestamp = amareleo_node_bft::helpers::now();
398            self.transmissions_queue_timestamps
399                .lock()
400                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
401        }
402        // Queue the unconfirmed transaction.
403        {
404            let transaction_id = transaction.id();
405
406            // Check that the transaction is not a fee transaction.
407            if transaction.is_fee() {
408                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
409            }
410            // Check if the transaction was recently seen.
411            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
412                // If the transaction was recently seen, return early.
413                return Ok(());
414            }
415            // Check if the transaction already exists in the ledger.
416            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
417                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
418            }
419            // Add the transaction to the memory pool.
420            guard_trace!(self, "Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
421            if transaction.is_deploy() {
422                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
423                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
424                }
425            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
426                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
427            }
428
429            // Try to process the unconfirmed transactions in the memory pool.
430            self.process_unconfirmed_transactions().await
431        }
432    }
433
434    /// Processes unconfirmed transactions in the memory pool.
435    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
436        // If the memory pool of this node is full, return early.
437        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
438        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
439            return Ok(());
440        }
441        // Retrieve the transactions.
442        let transactions = {
443            // Determine the available capacity.
444            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
445            // Acquire the lock on the transactions queue.
446            let mut tx_queue = self.transactions_queue.lock();
447            // Determine the number of deployments to send.
448            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
449            // Determine the number of executions to send.
450            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
451            // Create an iterator which will select interleaved deployments and executions within the capacity.
452            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
453            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
454            // Drain the transactions from the queue, interleaving deployments and executions.
455            selector_iter
456                .filter_map(|select_deployment| {
457                    if select_deployment {
458                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
459                    } else {
460                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
461                    }
462                })
463                .collect_vec()
464        };
465        // Iterate over the transactions.
466        for transaction in transactions.into_iter() {
467            let transaction_id = transaction.id();
468            guard_trace!(self, "Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
469            // Send the unconfirmed transaction to the primary.
470            if let Err(e) =
471                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
472            {
473                // If the BFT is synced, then log the warning.
474                if self.bft.is_synced() {
475                    guard_warn!(
476                        self,
477                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
478                        fmt_id(transaction_id)
479                    );
480                }
481            }
482        }
483        Ok(())
484    }
485}
486
487impl<N: Network> Consensus<N> {
488    /// Starts the consensus handlers.
489    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
490        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
491
492        // Process the committed subdag and transmissions from the BFT.
493        let self_ = self.clone();
494        self.spawn(async move {
495            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
496                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
497            }
498        });
499
500        // Process the unconfirmed transactions in the memory pool.
501        let self_ = self.clone();
502        self.spawn(async move {
503            loop {
504                // Sleep briefly.
505                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
506                // Process the unconfirmed transactions in the memory pool.
507                if let Err(e) = self_.process_unconfirmed_transactions().await {
508                    guard_warn!(self_, "Cannot process unconfirmed transactions - {e}");
509                }
510                // Process the unconfirmed solutions in the memory pool.
511                if let Err(e) = self_.process_unconfirmed_solutions().await {
512                    guard_warn!(self_, "Cannot process unconfirmed solutions - {e}");
513                }
514            }
515        });
516    }
517
518    /// Processes the committed subdag and transmissions from the BFT.
519    async fn process_bft_subdag(
520        &self,
521        subdag: Subdag<N>,
522        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
523        callback: oneshot::Sender<Result<()>>,
524    ) {
525        // Try to advance to the next block.
526        let self_ = self.clone();
527        let transmissions_ = transmissions.clone();
528        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
529
530        // If the block failed to advance, reinsert the transmissions into the memory pool.
531        if let Err(e) = &result {
532            guard_error!(self, "Unable to advance to the next block - {e}");
533            // On failure, reinsert the transmissions into the memory pool.
534            self.reinsert_transmissions(transmissions).await;
535        }
536        // Send the callback **after** advancing to the next block.
537        // Note: We must await the block to be advanced before sending the callback.
538        callback.send(result).ok();
539    }
540
541    /// Attempts to advance to the next block.
542    fn try_advance_to_next_block(
543        &self,
544        subdag: Subdag<N>,
545        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
546    ) -> Result<()> {
547        #[cfg(feature = "metrics")]
548        let start = subdag.leader_certificate().batch_header().timestamp();
549        #[cfg(feature = "metrics")]
550        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
551        #[cfg(feature = "metrics")]
552        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
553
554        // Create the candidate next block.
555        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
556
557        // AlexZ: Disabling block validation since this would catch our
558        //        cheating in always assigning the same leader.
559        //        Validation is within SnarkVM which we don't want to touch.
560        // Check that the block is well-formed.
561        // self.ledger.check_next_block(&next_block)?;
562
563        // Advance to the next block.
564        self.ledger.advance_to_next_block(&next_block)?;
565
566        // If the next block starts a new epoch, clear the existing solutions.
567        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
568            // Clear the solutions queue.
569            self.solutions_queue.lock().clear();
570            // Clear the worker solutions.
571            self.bft.primary().clear_worker_solutions();
572        }
573
574        #[cfg(feature = "metrics")]
575        {
576            let elapsed = std::time::Duration::from_secs((amareleo_node_bft::helpers::now() - start) as u64);
577            let next_block_timestamp = next_block.header().metadata().timestamp();
578            let block_latency = next_block_timestamp - current_block_timestamp;
579            let proof_target = next_block.header().proof_target();
580            let coinbase_target = next_block.header().coinbase_target();
581            let cumulative_proof_target = next_block.header().cumulative_proof_target();
582
583            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);
584
585            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
586            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
587            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
588            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
589            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
590            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
591        }
592        Ok(())
593    }
594
595    /// Reinserts the given transmissions into the memory pool.
596    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
597        // Iterate over the transmissions.
598        for (transmission_id, transmission) in transmissions.into_iter() {
599            // Reinsert the transmission into the memory pool.
600            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
601                guard_warn!(
602                    self,
603                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
604                    fmt_id(transmission_id),
605                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
606                );
607            }
608        }
609    }
610
611    /// Reinserts the given transmission into the memory pool.
612    async fn reinsert_transmission(
613        &self,
614        transmission_id: TransmissionID<N>,
615        transmission: Transmission<N>,
616    ) -> Result<()> {
617        // Initialize a callback sender and receiver.
618        let (callback, callback_receiver) = oneshot::channel();
619        // Send the transmission to the primary.
620        match (transmission_id, transmission) {
621            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
622            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
623                // Send the solution to the primary.
624                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
625            }
626            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
627                // Send the transaction to the primary.
628                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
629            }
630            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
631        }
632        // Await the callback.
633        callback_receiver.await?
634    }
635
636    /// Spawns a task with the given future; it should only be used for long-running tasks.
637    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
638        self.handles.lock().push(tokio::spawn(future));
639    }
640
641    /// Shuts down the BFT.
642    pub async fn shut_down(&self) {
643        guard_info!(self, "Shutting down consensus...");
644
645        // Shut down the BFT.
646        self.bft.shut_down().await;
647
648        // Abort Consensus tasks.
649        let mut handles = self.handles.lock();
650        handles.iter().for_each(|handle| handle.abort());
651        handles.clear();
652    }
653}