amareleo_node_consensus/
lib.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21use amareleo_chain_account::Account;
22use amareleo_chain_tracing::TracingHandler;
23use amareleo_node_bft::{
24    BFT,
25    MAX_BATCH_DELAY_IN_MS,
26    Primary,
27    helpers::{
28        ConsensusReceiver,
29        PrimaryReceiver,
30        PrimarySender,
31        Storage as NarwhalStorage,
32        fmt_id,
33        init_consensus_channels,
34    },
35    spawn_blocking,
36};
37use amareleo_node_bft_ledger_service::LedgerService;
38use amareleo_node_bft_storage_service::BFTPersistentStorage;
39use snarkvm::{
40    ledger::{
41        block::Transaction,
42        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
43        puzzle::{Solution, SolutionID},
44    },
45    prelude::*,
46};
47
48use aleo_std::StorageMode;
49use anyhow::Result;
50use colored::Colorize;
51use indexmap::IndexMap;
52use lru::LruCache;
53use parking_lot::Mutex;
54use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
55use tokio::{
56    sync::{OnceCell, oneshot},
57    task::JoinHandle,
58};
59use tracing::subscriber::DefaultGuard;
60
61#[cfg(feature = "metrics")]
62use std::collections::HashMap;
63
/// The number of deployments the inbound queue can hold.
/// Note: This bounds the inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The number of executions the inbound queue can hold.
/// Note: This bounds the inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The number of solutions the inbound queue can hold.
/// Note: This bounds the inbound queue only; it is not a capacity enforced by Narwhal.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The **suggested** upper bound on deployments drained from the inbound queue in each interval.
/// Note: This limits the inbound queue only; it is not a limit enforced by Narwhal.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
76
77/// Helper struct to track incoming transactions.
78struct TransactionsQueue<N: Network> {
79    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
80    pub executions: LruCache<N::TransactionID, Transaction<N>>,
81}
82
83impl<N: Network> Default for TransactionsQueue<N> {
84    fn default() -> Self {
85        Self {
86            deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
87            executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
88        }
89    }
90}
91
92#[derive(Clone)]
93pub struct Consensus<N: Network> {
94    /// The ledger.
95    ledger: Arc<dyn LedgerService<N>>,
96    /// The BFT.
97    bft: BFT<N>,
98    /// The primary sender.
99    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
100    /// The unconfirmed solutions queue.
101    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
102    /// The unconfirmed transactions queue.
103    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
104    /// The recently-seen unconfirmed solutions.
105    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
106    /// The recently-seen unconfirmed transactions.
107    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
108    #[cfg(feature = "metrics")]
109    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
110    /// Tracing handle
111    tracing: Option<TracingHandler>,
112    /// The spawned handles.
113    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
114}
115
116impl<N: Network> Consensus<N> {
117    /// Initializes a new instance of consensus.
118    pub fn new(
119        account: Account<N>,
120        ledger: Arc<dyn LedgerService<N>>,
121        keep_state: bool,
122        storage_mode: StorageMode,
123        tracing: Option<TracingHandler>,
124    ) -> Result<Self> {
125        // Initialize the Narwhal transmissions.
126        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone(), tracing.clone())?);
127        // Initialize the Narwhal storage.
128        let storage =
129            NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64, tracing.clone());
130        // Initialize the BFT.
131        let bft = BFT::new(account, storage, keep_state, storage_mode, ledger.clone(), tracing.clone())?;
132        // Return the consensus.
133        Ok(Self {
134            ledger,
135            bft,
136            primary_sender: Default::default(),
137            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
138            transactions_queue: Default::default(),
139            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
140            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
141            #[cfg(feature = "metrics")]
142            transmissions_queue_timestamps: Default::default(),
143            tracing: tracing.clone(),
144            handles: Default::default(),
145        })
146    }
147
148    /// Run the consensus instance.
149    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
150        let _guard = self.get_tracing_guard();
151        info!("Starting the consensus instance...");
152        // Set the primary sender.
153        self.primary_sender.set(primary_sender.clone()).expect("Primary sender already set");
154
155        // First, initialize the consensus channels.
156        let (consensus_sender, consensus_receiver) = init_consensus_channels();
157        // Then, start the consensus handlers.
158        self.start_handlers(consensus_receiver);
159        // Lastly, the consensus.
160        self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await?;
161        Ok(())
162    }
163
164    /// Returns the ledger.
165    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
166        &self.ledger
167    }
168
169    /// Returns the BFT.
170    pub const fn bft(&self) -> &BFT<N> {
171        &self.bft
172    }
173
174    /// Returns the primary sender.
175    pub fn primary_sender(&self) -> &PrimarySender<N> {
176        self.primary_sender.get().expect("Primary sender not set")
177    }
178
179    /// Retruns tracing guard
180    pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
181        self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
182    }
183}
184
185impl<N: Network> Consensus<N> {
186    /// Returns the number of unconfirmed transmissions.
187    pub fn num_unconfirmed_transmissions(&self) -> usize {
188        self.bft.num_unconfirmed_transmissions()
189    }
190
191    /// Returns the number of unconfirmed ratifications.
192    pub fn num_unconfirmed_ratifications(&self) -> usize {
193        self.bft.num_unconfirmed_ratifications()
194    }
195
196    /// Returns the number of solutions.
197    pub fn num_unconfirmed_solutions(&self) -> usize {
198        self.bft.num_unconfirmed_solutions()
199    }
200
201    /// Returns the number of unconfirmed transactions.
202    pub fn num_unconfirmed_transactions(&self) -> usize {
203        self.bft.num_unconfirmed_transactions()
204    }
205}
206
207impl<N: Network> Consensus<N> {
208    /// Returns the unconfirmed transmission IDs.
209    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
210        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
211    }
212
213    /// Returns the unconfirmed transmissions.
214    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
215        self.worker_transmissions().chain(self.inbound_transmissions())
216    }
217
218    /// Returns the unconfirmed solutions.
219    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
220        self.worker_solutions().chain(self.inbound_solutions())
221    }
222
223    /// Returns the unconfirmed transactions.
224    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
225        self.worker_transactions().chain(self.inbound_transactions())
226    }
227}
228
229impl<N: Network> Consensus<N> {
230    /// Returns the worker transmission IDs.
231    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
232        self.bft.worker_transmission_ids()
233    }
234
235    /// Returns the worker transmissions.
236    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
237        self.bft.worker_transmissions()
238    }
239
240    /// Returns the worker solutions.
241    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
242        self.bft.worker_solutions()
243    }
244
245    /// Returns the worker transactions.
246    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
247        self.bft.worker_transactions()
248    }
249}
250
251impl<N: Network> Consensus<N> {
252    /// Returns the transmission IDs in the inbound queue.
253    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
254        self.inbound_transmissions().map(|(id, _)| id)
255    }
256
257    /// Returns the transmissions in the inbound queue.
258    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
259        self.inbound_transactions()
260            .map(|(id, tx)| {
261                (
262                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
263                    Transmission::Transaction(tx),
264                )
265            })
266            .chain(self.inbound_solutions().map(|(id, solution)| {
267                (
268                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
269                    Transmission::Solution(solution),
270                )
271            }))
272    }
273
274    /// Returns the solutions in the inbound queue.
275    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
276        // Return an iterator over the solutions in the inbound queue.
277        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
278    }
279
280    /// Returns the transactions in the inbound queue.
281    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
282        // Acquire the lock on the transactions queue.
283        let tx_queue = self.transactions_queue.lock();
284        // Return an iterator over the deployment and execution transactions in the inbound queue.
285        tx_queue
286            .deployments
287            .clone()
288            .into_iter()
289            .chain(tx_queue.executions.clone())
290            .map(|(id, tx)| (id, Data::Object(tx)))
291    }
292}
293
294impl<N: Network> Consensus<N> {
295    /// Adds the given unconfirmed solution to the memory pool.
296    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
297        let _guard = self.get_tracing_guard();
298
299        // Calculate the transmission checksum.
300        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
301        #[cfg(feature = "metrics")]
302        {
303            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
304            let timestamp = amareleo_node_bft::helpers::now();
305            self.transmissions_queue_timestamps
306                .lock()
307                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
308        }
309        // Queue the unconfirmed solution.
310        {
311            let solution_id = solution.id();
312
313            // Check if the transaction was recently seen.
314            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
315                // If the transaction was recently seen, return early.
316                return Ok(());
317            }
318            // Check if the solution already exists in the ledger.
319            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
320                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
321            }
322            // Add the solution to the memory pool.
323            trace!("Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
324            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
325                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
326            }
327        }
328
329        // Try to process the unconfirmed solutions in the memory pool.
330        self.process_unconfirmed_solutions().await
331    }
332
333    /// Processes unconfirmed transactions in the memory pool.
334    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
335        let _guard = self.get_tracing_guard();
336
337        // If the memory pool of this node is full, return early.
338        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
339        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
340        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
341            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
342        {
343            return Ok(());
344        }
345        // Retrieve the solutions.
346        let solutions = {
347            // Determine the available capacity.
348            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
349            // Acquire the lock on the queue.
350            let mut queue = self.solutions_queue.lock();
351            // Determine the number of solutions to send.
352            let num_solutions = queue.len().min(capacity);
353            // Drain the solutions from the queue.
354            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
355        };
356        // Iterate over the solutions.
357        for solution in solutions.into_iter() {
358            let solution_id = solution.id();
359            trace!("Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
360            // Send the unconfirmed solution to the primary.
361            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
362                // If the BFT is synced, then log the warning.
363                if self.bft.is_synced() {
364                    // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
365                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
366                        warn!("Failed to add unconfirmed solution '{}' to the memory pool - {e}", fmt_id(solution_id))
367                    };
368                }
369            }
370        }
371        Ok(())
372    }
373
374    /// Adds the given unconfirmed transaction to the memory pool.
375    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
376        let _guard = self.get_tracing_guard();
377
378        // Calculate the transmission checksum.
379        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
380        #[cfg(feature = "metrics")]
381        {
382            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
383            let timestamp = amareleo_node_bft::helpers::now();
384            self.transmissions_queue_timestamps
385                .lock()
386                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
387        }
388        // Queue the unconfirmed transaction.
389        {
390            let transaction_id = transaction.id();
391
392            // Check that the transaction is not a fee transaction.
393            if transaction.is_fee() {
394                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
395            }
396            // Check if the transaction was recently seen.
397            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
398                // If the transaction was recently seen, return early.
399                return Ok(());
400            }
401            // Check if the transaction already exists in the ledger.
402            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
403                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
404            }
405            // Add the transaction to the memory pool.
406            trace!("Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
407            if transaction.is_deploy() {
408                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
409                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
410                }
411            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
412                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
413            }
414
415            // Try to process the unconfirmed transactions in the memory pool.
416            self.process_unconfirmed_transactions().await
417        }
418    }
419
420    /// Processes unconfirmed transactions in the memory pool.
421    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
422        let _guard = self.get_tracing_guard();
423
424        // If the memory pool of this node is full, return early.
425        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
426        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
427            return Ok(());
428        }
429        // Retrieve the transactions.
430        let transactions = {
431            // Determine the available capacity.
432            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
433            // Acquire the lock on the transactions queue.
434            let mut tx_queue = self.transactions_queue.lock();
435            // Determine the number of deployments to send.
436            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
437            // Determine the number of executions to send.
438            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
439            // Create an iterator which will select interleaved deployments and executions within the capacity.
440            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
441            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
442            // Drain the transactions from the queue, interleaving deployments and executions.
443            selector_iter
444                .filter_map(|select_deployment| {
445                    if select_deployment {
446                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
447                    } else {
448                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
449                    }
450                })
451                .collect_vec()
452        };
453        // Iterate over the transactions.
454        for transaction in transactions.into_iter() {
455            let transaction_id = transaction.id();
456            trace!("Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
457            // Send the unconfirmed transaction to the primary.
458            if let Err(e) =
459                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
460            {
461                // If the BFT is synced, then log the warning.
462                if self.bft.is_synced() {
463                    warn!(
464                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
465                        fmt_id(transaction_id)
466                    );
467                }
468            }
469        }
470        Ok(())
471    }
472}
473
474impl<N: Network> Consensus<N> {
475    /// Starts the consensus handlers.
476    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
477        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
478
479        // Process the committed subdag and transmissions from the BFT.
480        let self_ = self.clone();
481        self.spawn(async move {
482            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
483                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
484            }
485        });
486
487        // Process the unconfirmed transactions in the memory pool.
488        let self_ = self.clone();
489        let trace_guard = self.get_tracing_guard();
490        self.spawn(async move {
491            let _guard = trace_guard;
492            loop {
493                // Sleep briefly.
494                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
495                // Process the unconfirmed transactions in the memory pool.
496                if let Err(e) = self_.process_unconfirmed_transactions().await {
497                    warn!("Cannot process unconfirmed transactions - {e}");
498                }
499                // Process the unconfirmed solutions in the memory pool.
500                if let Err(e) = self_.process_unconfirmed_solutions().await {
501                    warn!("Cannot process unconfirmed solutions - {e}");
502                }
503            }
504        });
505    }
506
507    /// Processes the committed subdag and transmissions from the BFT.
508    async fn process_bft_subdag(
509        &self,
510        subdag: Subdag<N>,
511        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
512        callback: oneshot::Sender<Result<()>>,
513    ) {
514        let _guard = self.get_tracing_guard();
515
516        // Try to advance to the next block.
517        let self_ = self.clone();
518        let transmissions_ = transmissions.clone();
519        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
520
521        // If the block failed to advance, reinsert the transmissions into the memory pool.
522        if let Err(e) = &result {
523            error!("Unable to advance to the next block - {e}");
524            // On failure, reinsert the transmissions into the memory pool.
525            self.reinsert_transmissions(transmissions).await;
526        }
527        // Send the callback **after** advancing to the next block.
528        // Note: We must await the block to be advanced before sending the callback.
529        callback.send(result).ok();
530    }
531
532    /// Attempts to advance to the next block.
533    fn try_advance_to_next_block(
534        &self,
535        subdag: Subdag<N>,
536        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
537    ) -> Result<()> {
538        #[cfg(feature = "metrics")]
539        let start = subdag.leader_certificate().batch_header().timestamp();
540        #[cfg(feature = "metrics")]
541        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
542        #[cfg(feature = "metrics")]
543        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
544
545        // Create the candidate next block.
546        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
547
548        // AlexZ: Disabling block validation since this would catch our
549        //        cheating in always assigning the same leader.
550        //        Validation is within SnarkVM which we don't want to touch.
551        // Check that the block is well-formed.
552        // self.ledger.check_next_block(&next_block)?;
553
554        // Advance to the next block.
555        self.ledger.advance_to_next_block(&next_block)?;
556
557        // If the next block starts a new epoch, clear the existing solutions.
558        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
559            // Clear the solutions queue.
560            self.solutions_queue.lock().clear();
561            // Clear the worker solutions.
562            self.bft.primary().clear_worker_solutions();
563        }
564
565        #[cfg(feature = "metrics")]
566        {
567            let elapsed = std::time::Duration::from_secs((amareleo_node_bft::helpers::now() - start) as u64);
568            let next_block_timestamp = next_block.header().metadata().timestamp();
569            let block_latency = next_block_timestamp - current_block_timestamp;
570            let proof_target = next_block.header().proof_target();
571            let coinbase_target = next_block.header().coinbase_target();
572            let cumulative_proof_target = next_block.header().cumulative_proof_target();
573
574            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);
575
576            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
577            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
578            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
579            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
580            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
581            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
582        }
583        Ok(())
584    }
585
586    /// Reinserts the given transmissions into the memory pool.
587    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
588        let _guard = self.get_tracing_guard();
589
590        // Iterate over the transmissions.
591        for (transmission_id, transmission) in transmissions.into_iter() {
592            // Reinsert the transmission into the memory pool.
593            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
594                warn!(
595                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
596                    fmt_id(transmission_id),
597                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
598                );
599            }
600        }
601    }
602
603    /// Reinserts the given transmission into the memory pool.
604    async fn reinsert_transmission(
605        &self,
606        transmission_id: TransmissionID<N>,
607        transmission: Transmission<N>,
608    ) -> Result<()> {
609        // Initialize a callback sender and receiver.
610        let (callback, callback_receiver) = oneshot::channel();
611        // Send the transmission to the primary.
612        match (transmission_id, transmission) {
613            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
614            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
615                // Send the solution to the primary.
616                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
617            }
618            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
619                // Send the transaction to the primary.
620                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
621            }
622            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
623        }
624        // Await the callback.
625        callback_receiver.await?
626    }
627
628    /// Spawns a task with the given future; it should only be used for long-running tasks.
629    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
630        self.handles.lock().push(tokio::spawn(future));
631    }
632
633    /// Shuts down the BFT.
634    pub async fn shut_down(&self) {
635        let _guard = self.get_tracing_guard();
636        info!("Shutting down consensus...");
637        // Shut down the BFT.
638        self.bft.shut_down().await;
639        // Abort the tasks.
640        self.handles.lock().iter().for_each(|handle| handle.abort());
641    }
642}