amareleo_node_consensus/
lib.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16#![forbid(unsafe_code)]
17
18#[macro_use]
19extern crate tracing;
20
21#[macro_use]
22extern crate amareleo_chain_tracing;
23
24use amareleo_chain_account::Account;
25use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
26use amareleo_node_bft::{
27    BFT,
28    MAX_BATCH_DELAY_IN_MS,
29    Primary,
30    helpers::{
31        ConsensusReceiver,
32        PrimaryReceiver,
33        PrimarySender,
34        Storage as NarwhalStorage,
35        fmt_id,
36        init_consensus_channels,
37    },
38    spawn_blocking,
39};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_bft_storage_service::BFTPersistentStorage;
42use snarkvm::{
43    ledger::{
44        block::Transaction,
45        narwhal::{BatchHeader, Data, Subdag, Transmission, TransmissionID},
46        puzzle::{Solution, SolutionID},
47    },
48    prelude::*,
49};
50
51use aleo_std::StorageMode;
52use anyhow::Result;
53use colored::Colorize;
54use indexmap::IndexMap;
55#[cfg(feature = "locktick")]
56use locktick::parking_lot::Mutex;
57use lru::LruCache;
58#[cfg(not(feature = "locktick"))]
59use parking_lot::Mutex;
60use std::{future::Future, num::NonZeroUsize, sync::Arc, time::Duration};
61use tokio::{
62    sync::{OnceCell, oneshot},
63    task::JoinHandle,
64};
65use tracing::subscriber::DefaultGuard;
66
67#[cfg(feature = "metrics")]
68use std::collections::HashMap;
69
/// The capacity of the queue reserved for deployments (`1 << 10` = 1024 entries).
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_DEPLOYMENTS: usize = 1 << 10;
/// The capacity of the queue reserved for executions (`1 << 10` = 1024 entries).
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_EXECUTIONS: usize = 1 << 10;
/// The capacity of the queue reserved for solutions (`1 << 10` = 1024 entries).
/// Note: This is an inbound queue capacity, not a Narwhal-enforced capacity.
const CAPACITY_FOR_SOLUTIONS: usize = 1 << 10;
/// The **suggested** maximum number of deployments in each interval.
/// It bounds how many deployments are drained from the inbound queue per processing pass.
/// Note: This is an inbound queue limit, not a Narwhal-enforced limit.
const MAX_DEPLOYMENTS_PER_INTERVAL: usize = 1;
82
/// Helper struct to track incoming transactions.
///
/// Deployments and all other transactions are kept in separate LRU caches,
/// so each kind has its own capacity and can be drained independently.
struct TransactionsQueue<N: Network> {
    /// The queued deployment transactions, keyed by transaction ID.
    pub deployments: LruCache<N::TransactionID, Transaction<N>>,
    /// The queued non-deployment transactions, keyed by transaction ID.
    pub executions: LruCache<N::TransactionID, Transaction<N>>,
}
88
89impl<N: Network> Default for TransactionsQueue<N> {
90    fn default() -> Self {
91        Self {
92            deployments: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_DEPLOYMENTS).unwrap()),
93            executions: LruCache::new(NonZeroUsize::new(CAPACITY_FOR_EXECUTIONS).unwrap()),
94        }
95    }
96}
97
/// The consensus instance, which sits between the inbound transmission queues, the BFT, and the ledger.
///
/// The queues, the seen-caches, the primary sender cell and the task handles are wrapped in `Arc`,
/// so clones of this struct share them.
#[derive(Clone)]
pub struct Consensus<N: Network> {
    /// The ledger.
    ledger: Arc<dyn LedgerService<N>>,
    /// The BFT.
    bft: BFT<N>,
    /// The primary sender.
    primary_sender: Arc<OnceCell<PrimarySender<N>>>,
    /// The unconfirmed solutions queue.
    solutions_queue: Arc<Mutex<LruCache<SolutionID<N>, Solution<N>>>>,
    /// The unconfirmed transactions queue.
    transactions_queue: Arc<Mutex<TransactionsQueue<N>>>,
    /// The recently-seen unconfirmed solutions.
    seen_solutions: Arc<Mutex<LruCache<SolutionID<N>, ()>>>,
    /// The recently-seen unconfirmed transactions.
    seen_transactions: Arc<Mutex<LruCache<N::TransactionID, ()>>>,
    /// The timestamps at which transmissions were queued, keyed by transmission ID (metrics only).
    #[cfg(feature = "metrics")]
    transmissions_queue_timestamps: Arc<Mutex<HashMap<TransmissionID<N>, i64>>>,
    /// Tracing handle
    tracing: Option<TracingHandler>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
}
121
122impl<N: Network> TracingHandlerGuard for Consensus<N> {
123    /// Retruns tracing guard
124    fn get_tracing_guard(&self) -> Option<DefaultGuard> {
125        self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
126    }
127}
128
129impl<N: Network> Consensus<N> {
130    /// Initializes a new instance of consensus.
131    pub fn new(
132        account: Account<N>,
133        ledger: Arc<dyn LedgerService<N>>,
134        keep_state: bool,
135        storage_mode: StorageMode,
136        tracing: Option<TracingHandler>,
137    ) -> Result<Self> {
138        // Initialize the Narwhal transmissions.
139        let transmissions = Arc::new(BFTPersistentStorage::open(storage_mode.clone(), tracing.clone())?);
140        // Initialize the Narwhal storage.
141        let storage =
142            NarwhalStorage::new(ledger.clone(), transmissions, BatchHeader::<N>::MAX_GC_ROUNDS as u64, tracing.clone());
143        // Initialize the BFT.
144        let bft = BFT::new(account, storage, keep_state, storage_mode, ledger.clone(), tracing.clone())?;
145        // Return the consensus.
146        Ok(Self {
147            ledger,
148            bft,
149            primary_sender: Default::default(),
150            solutions_queue: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(CAPACITY_FOR_SOLUTIONS).unwrap()))),
151            transactions_queue: Default::default(),
152            seen_solutions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
153            seen_transactions: Arc::new(Mutex::new(LruCache::new(NonZeroUsize::new(1 << 16).unwrap()))),
154            #[cfg(feature = "metrics")]
155            transmissions_queue_timestamps: Default::default(),
156            tracing: tracing.clone(),
157            handles: Default::default(),
158        })
159    }
160
161    /// Run the consensus instance.
162    pub async fn run(&mut self, primary_sender: PrimarySender<N>, primary_receiver: PrimaryReceiver<N>) -> Result<()> {
163        guard_info!(self, "Starting the consensus instance...");
164
165        // Set the primary sender.
166        let result = self.primary_sender.set(primary_sender.clone());
167        if result.is_err() {
168            bail!("Unexpected: Primary sender already set");
169        }
170
171        // Initialize communications between Consensus and BFT.
172        let (consensus_sender, consensus_receiver) = init_consensus_channels();
173
174        // Start the Consensus handlers.
175        self.start_handlers(consensus_receiver);
176
177        // Run the BFT.
178        let result = self.bft.run(Some(consensus_sender), primary_sender, primary_receiver).await;
179        if let Err(err) = result {
180            guard_error!(self, "Consensus failed to run the BFT instance - {err}");
181            self.shut_down().await;
182            return Err(err);
183        }
184
185        Ok(())
186    }
187
    /// Returns a reference to the shared ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }
192
    /// Returns a reference to the BFT instance.
    pub const fn bft(&self) -> &BFT<N> {
        &self.bft
    }
197
    /// Returns the primary sender.
    ///
    /// # Panics
    ///
    /// Panics if the primary sender has not been set yet; it is set by `run`.
    fn primary_sender(&self) -> &PrimarySender<N> {
        self.primary_sender.get().expect("Primary sender not set")
    }
202}
203
// Counters for unconfirmed items; every method delegates to the BFT.
impl<N: Network> Consensus<N> {
    /// Returns the number of unconfirmed transmissions, as reported by the BFT.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.bft.num_unconfirmed_transmissions()
    }

    /// Returns the number of unconfirmed ratifications, as reported by the BFT.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.bft.num_unconfirmed_ratifications()
    }

    /// Returns the number of unconfirmed solutions, as reported by the BFT.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.bft.num_unconfirmed_solutions()
    }

    /// Returns the number of unconfirmed transactions, as reported by the BFT.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.bft.num_unconfirmed_transactions()
    }
}
225
// Combined views: each iterator yields the worker items first, followed by the inbound queue items.
impl<N: Network> Consensus<N> {
    /// Returns the unconfirmed transmission IDs (worker IDs, then inbound queue IDs).
    pub fn unconfirmed_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.worker_transmission_ids().chain(self.inbound_transmission_ids())
    }

    /// Returns the unconfirmed transmissions (worker transmissions, then inbound queue transmissions).
    pub fn unconfirmed_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.worker_transmissions().chain(self.inbound_transmissions())
    }

    /// Returns the unconfirmed solutions (worker solutions, then inbound queue solutions).
    pub fn unconfirmed_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.worker_solutions().chain(self.inbound_solutions())
    }

    /// Returns the unconfirmed transactions (worker transactions, then inbound queue transactions).
    pub fn unconfirmed_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.worker_transactions().chain(self.inbound_transactions())
    }
}
247
// Worker views; every method delegates to the BFT.
impl<N: Network> Consensus<N> {
    /// Returns the worker transmission IDs, as provided by the BFT.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.bft.worker_transmission_ids()
    }

    /// Returns the worker transmissions, as provided by the BFT.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.bft.worker_transmissions()
    }

    /// Returns the worker solutions, as provided by the BFT.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.bft.worker_solutions()
    }

    /// Returns the worker transactions, as provided by the BFT.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.bft.worker_transactions()
    }
}
269
270impl<N: Network> Consensus<N> {
271    /// Returns the transmission IDs in the inbound queue.
272    pub fn inbound_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
273        self.inbound_transmissions().map(|(id, _)| id)
274    }
275
276    /// Returns the transmissions in the inbound queue.
277    pub fn inbound_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
278        self.inbound_transactions()
279            .map(|(id, tx)| {
280                (
281                    TransmissionID::Transaction(id, tx.to_checksum::<N>().unwrap_or_default()),
282                    Transmission::Transaction(tx),
283                )
284            })
285            .chain(self.inbound_solutions().map(|(id, solution)| {
286                (
287                    TransmissionID::Solution(id, solution.to_checksum::<N>().unwrap_or_default()),
288                    Transmission::Solution(solution),
289                )
290            }))
291    }
292
293    /// Returns the solutions in the inbound queue.
294    pub fn inbound_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
295        // Return an iterator over the solutions in the inbound queue.
296        self.solutions_queue.lock().clone().into_iter().map(|(id, solution)| (id, Data::Object(solution)))
297    }
298
299    /// Returns the transactions in the inbound queue.
300    pub fn inbound_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
301        // Acquire the lock on the transactions queue.
302        let tx_queue = self.transactions_queue.lock();
303        // Return an iterator over the deployment and execution transactions in the inbound queue.
304        tx_queue
305            .deployments
306            .clone()
307            .into_iter()
308            .chain(tx_queue.executions.clone())
309            .map(|(id, tx)| (id, Data::Object(tx)))
310    }
311}
312
313impl<N: Network> Consensus<N> {
314    /// Adds the given unconfirmed solution to the memory pool.
315    pub async fn add_unconfirmed_solution(&self, solution: Solution<N>) -> Result<()> {
316        // Calculate the transmission checksum.
317        let checksum = Data::<Solution<N>>::Buffer(solution.to_bytes_le()?.into()).to_checksum::<N>()?;
318        #[cfg(feature = "metrics")]
319        {
320            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_SOLUTIONS, 1f64);
321            let timestamp = amareleo_node_bft::helpers::now();
322            self.transmissions_queue_timestamps
323                .lock()
324                .insert(TransmissionID::Solution(solution.id(), checksum), timestamp);
325        }
326        // Queue the unconfirmed solution.
327        {
328            let solution_id = solution.id();
329
330            // Check if the transaction was recently seen.
331            if self.seen_solutions.lock().put(solution_id, ()).is_some() {
332                // If the transaction was recently seen, return early.
333                return Ok(());
334            }
335            // Check if the solution already exists in the ledger.
336            if self.ledger.contains_transmission(&TransmissionID::Solution(solution_id, checksum))? {
337                bail!("Solution '{}' exists in the ledger {}", fmt_id(solution_id), "(skipping)".dimmed());
338            }
339            // Add the solution to the memory pool.
340            guard_trace!(self, "Received unconfirmed solution '{}' in the queue", fmt_id(solution_id));
341            if self.solutions_queue.lock().put(solution_id, solution).is_some() {
342                bail!("Solution '{}' exists in the memory pool", fmt_id(solution_id));
343            }
344        }
345
346        // Try to process the unconfirmed solutions in the memory pool.
347        self.process_unconfirmed_solutions().await
348    }
349
350    /// Processes unconfirmed transactions in the memory pool.
351    pub async fn process_unconfirmed_solutions(&self) -> Result<()> {
352        // If the memory pool of this node is full, return early.
353        let num_unconfirmed_solutions = self.num_unconfirmed_solutions();
354        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
355        if num_unconfirmed_solutions >= N::MAX_SOLUTIONS
356            || num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE
357        {
358            return Ok(());
359        }
360        // Retrieve the solutions.
361        let solutions = {
362            // Determine the available capacity.
363            let capacity = N::MAX_SOLUTIONS.saturating_sub(num_unconfirmed_solutions);
364            // Acquire the lock on the queue.
365            let mut queue = self.solutions_queue.lock();
366            // Determine the number of solutions to send.
367            let num_solutions = queue.len().min(capacity);
368            // Drain the solutions from the queue.
369            (0..num_solutions).filter_map(|_| queue.pop_lru().map(|(_, solution)| solution)).collect::<Vec<_>>()
370        };
371        // Iterate over the solutions.
372        for solution in solutions.into_iter() {
373            let solution_id = solution.id();
374            guard_trace!(self, "Adding unconfirmed solution '{}' to the memory pool...", fmt_id(solution_id));
375            // Send the unconfirmed solution to the primary.
376            if let Err(e) = self.primary_sender().send_unconfirmed_solution(solution_id, Data::Object(solution)).await {
377                // If the BFT is synced, then log the warning.
378                if self.bft.is_synced() {
379                    // If error occurs after the first 10 blocks of the epoch, log it as a warning, otherwise ignore.
380                    if self.ledger().latest_block_height() % N::NUM_BLOCKS_PER_EPOCH > 10 {
381                        guard_warn!(
382                            self,
383                            "Failed to add unconfirmed solution '{}' to the memory pool - {e}",
384                            fmt_id(solution_id)
385                        )
386                    };
387                }
388            }
389        }
390        Ok(())
391    }
392
393    /// Adds the given unconfirmed transaction to the memory pool.
394    pub async fn add_unconfirmed_transaction(&self, transaction: Transaction<N>) -> Result<()> {
395        // Calculate the transmission checksum.
396        let checksum = Data::<Transaction<N>>::Buffer(transaction.to_bytes_le()?.into()).to_checksum::<N>()?;
397        #[cfg(feature = "metrics")]
398        {
399            metrics::increment_gauge(metrics::consensus::UNCONFIRMED_TRANSACTIONS, 1f64);
400            let timestamp = amareleo_node_bft::helpers::now();
401            self.transmissions_queue_timestamps
402                .lock()
403                .insert(TransmissionID::Transaction(transaction.id(), checksum), timestamp);
404        }
405        // Queue the unconfirmed transaction.
406        {
407            let transaction_id = transaction.id();
408
409            // Check that the transaction is not a fee transaction.
410            if transaction.is_fee() {
411                bail!("Transaction '{}' is a fee transaction {}", fmt_id(transaction_id), "(skipping)".dimmed());
412            }
413            // Check if the transaction was recently seen.
414            if self.seen_transactions.lock().put(transaction_id, ()).is_some() {
415                // If the transaction was recently seen, return early.
416                return Ok(());
417            }
418            // Check if the transaction already exists in the ledger.
419            if self.ledger.contains_transmission(&TransmissionID::Transaction(transaction_id, checksum))? {
420                bail!("Transaction '{}' exists in the ledger {}", fmt_id(transaction_id), "(skipping)".dimmed());
421            }
422            // Add the transaction to the memory pool.
423            guard_trace!(self, "Received unconfirmed transaction '{}' in the queue", fmt_id(transaction_id));
424            if transaction.is_deploy() {
425                if self.transactions_queue.lock().deployments.put(transaction_id, transaction).is_some() {
426                    bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
427                }
428            } else if self.transactions_queue.lock().executions.put(transaction_id, transaction).is_some() {
429                bail!("Transaction '{}' exists in the memory pool", fmt_id(transaction_id));
430            }
431
432            // Try to process the unconfirmed transactions in the memory pool.
433            self.process_unconfirmed_transactions().await
434        }
435    }
436
437    /// Processes unconfirmed transactions in the memory pool.
438    pub async fn process_unconfirmed_transactions(&self) -> Result<()> {
439        // If the memory pool of this node is full, return early.
440        let num_unconfirmed_transmissions = self.num_unconfirmed_transmissions();
441        if num_unconfirmed_transmissions >= Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE {
442            return Ok(());
443        }
444        // Retrieve the transactions.
445        let transactions = {
446            // Determine the available capacity.
447            let capacity = Primary::<N>::MAX_TRANSMISSIONS_TOLERANCE.saturating_sub(num_unconfirmed_transmissions);
448            // Acquire the lock on the transactions queue.
449            let mut tx_queue = self.transactions_queue.lock();
450            // Determine the number of deployments to send.
451            let num_deployments = tx_queue.deployments.len().min(capacity).min(MAX_DEPLOYMENTS_PER_INTERVAL);
452            // Determine the number of executions to send.
453            let num_executions = tx_queue.executions.len().min(capacity.saturating_sub(num_deployments));
454            // Create an iterator which will select interleaved deployments and executions within the capacity.
455            // Note: interleaving ensures we will never have consecutive invalid deployments blocking the queue.
456            let selector_iter = (0..num_deployments).map(|_| true).interleave((0..num_executions).map(|_| false));
457            // Drain the transactions from the queue, interleaving deployments and executions.
458            selector_iter
459                .filter_map(|select_deployment| {
460                    if select_deployment {
461                        tx_queue.deployments.pop_lru().map(|(_, tx)| tx)
462                    } else {
463                        tx_queue.executions.pop_lru().map(|(_, tx)| tx)
464                    }
465                })
466                .collect_vec()
467        };
468        // Iterate over the transactions.
469        for transaction in transactions.into_iter() {
470            let transaction_id = transaction.id();
471            guard_trace!(self, "Adding unconfirmed transaction '{}' to the memory pool...", fmt_id(transaction_id));
472            // Send the unconfirmed transaction to the primary.
473            if let Err(e) =
474                self.primary_sender().send_unconfirmed_transaction(transaction_id, Data::Object(transaction)).await
475            {
476                // If the BFT is synced, then log the warning.
477                if self.bft.is_synced() {
478                    guard_warn!(
479                        self,
480                        "Failed to add unconfirmed transaction '{}' to the memory pool - {e}",
481                        fmt_id(transaction_id)
482                    );
483                }
484            }
485        }
486        Ok(())
487    }
488}
489
490impl<N: Network> Consensus<N> {
491    /// Starts the consensus handlers.
492    fn start_handlers(&self, consensus_receiver: ConsensusReceiver<N>) {
493        let ConsensusReceiver { mut rx_consensus_subdag } = consensus_receiver;
494
495        // Process the committed subdag and transmissions from the BFT.
496        let self_ = self.clone();
497        self.spawn(async move {
498            while let Some((committed_subdag, transmissions, callback)) = rx_consensus_subdag.recv().await {
499                self_.process_bft_subdag(committed_subdag, transmissions, callback).await;
500            }
501        });
502
503        // Process the unconfirmed transactions in the memory pool.
504        let self_ = self.clone();
505        self.spawn(async move {
506            loop {
507                // Sleep briefly.
508                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
509                // Process the unconfirmed transactions in the memory pool.
510                if let Err(e) = self_.process_unconfirmed_transactions().await {
511                    guard_warn!(self_, "Cannot process unconfirmed transactions - {e}");
512                }
513                // Process the unconfirmed solutions in the memory pool.
514                if let Err(e) = self_.process_unconfirmed_solutions().await {
515                    guard_warn!(self_, "Cannot process unconfirmed solutions - {e}");
516                }
517            }
518        });
519    }
520
521    /// Processes the committed subdag and transmissions from the BFT.
522    async fn process_bft_subdag(
523        &self,
524        subdag: Subdag<N>,
525        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
526        callback: oneshot::Sender<Result<()>>,
527    ) {
528        // Try to advance to the next block.
529        let self_ = self.clone();
530        let transmissions_ = transmissions.clone();
531        let result = spawn_blocking! { self_.try_advance_to_next_block(subdag, transmissions_) };
532
533        // If the block failed to advance, reinsert the transmissions into the memory pool.
534        if let Err(e) = &result {
535            guard_error!(self, "Unable to advance to the next block - {e}");
536            // On failure, reinsert the transmissions into the memory pool.
537            self.reinsert_transmissions(transmissions).await;
538        }
539        // Send the callback **after** advancing to the next block.
540        // Note: We must await the block to be advanced before sending the callback.
541        callback.send(result).ok();
542    }
543
544    /// Attempts to advance to the next block.
545    fn try_advance_to_next_block(
546        &self,
547        subdag: Subdag<N>,
548        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
549    ) -> Result<()> {
550        #[cfg(feature = "metrics")]
551        let start = subdag.leader_certificate().batch_header().timestamp();
552        #[cfg(feature = "metrics")]
553        let num_committed_certificates = subdag.values().map(|c| c.len()).sum::<usize>();
554        #[cfg(feature = "metrics")]
555        let current_block_timestamp = self.ledger.latest_block().header().metadata().timestamp();
556
557        // Create the candidate next block.
558        let next_block = self.ledger.prepare_advance_to_next_quorum_block(subdag, transmissions)?;
559
560        // AlexZ: Disabling block validation since this would catch our
561        //        cheating in always assigning the same leader.
562        //        Validation is within SnarkVM which we don't want to touch.
563        // Check that the block is well-formed.
564        // self.ledger.check_next_block(&next_block)?;
565
566        // Advance to the next block.
567        self.ledger.advance_to_next_block(&next_block)?;
568
569        // If the next block starts a new epoch, clear the existing solutions.
570        if next_block.height() % N::NUM_BLOCKS_PER_EPOCH == 0 {
571            // Clear the solutions queue.
572            self.solutions_queue.lock().clear();
573            // Clear the worker solutions.
574            self.bft.primary().clear_worker_solutions();
575        }
576
577        #[cfg(feature = "metrics")]
578        {
579            let elapsed = std::time::Duration::from_secs((amareleo_node_bft::helpers::now() - start) as u64);
580            let next_block_timestamp = next_block.header().metadata().timestamp();
581            let block_latency = next_block_timestamp - current_block_timestamp;
582            let proof_target = next_block.header().proof_target();
583            let coinbase_target = next_block.header().coinbase_target();
584            let cumulative_proof_target = next_block.header().cumulative_proof_target();
585
586            metrics::add_transmission_latency_metric(&self.transmissions_queue_timestamps, &next_block);
587
588            metrics::gauge(metrics::consensus::COMMITTED_CERTIFICATES, num_committed_certificates as f64);
589            metrics::histogram(metrics::consensus::CERTIFICATE_COMMIT_LATENCY, elapsed.as_secs_f64());
590            metrics::histogram(metrics::consensus::BLOCK_LATENCY, block_latency as f64);
591            metrics::gauge(metrics::blocks::PROOF_TARGET, proof_target as f64);
592            metrics::gauge(metrics::blocks::COINBASE_TARGET, coinbase_target as f64);
593            metrics::gauge(metrics::blocks::CUMULATIVE_PROOF_TARGET, cumulative_proof_target as f64);
594        }
595        Ok(())
596    }
597
598    /// Reinserts the given transmissions into the memory pool.
599    async fn reinsert_transmissions(&self, transmissions: IndexMap<TransmissionID<N>, Transmission<N>>) {
600        // Iterate over the transmissions.
601        for (transmission_id, transmission) in transmissions.into_iter() {
602            // Reinsert the transmission into the memory pool.
603            if let Err(e) = self.reinsert_transmission(transmission_id, transmission).await {
604                guard_warn!(
605                    self,
606                    "Unable to reinsert transmission {}.{} into the memory pool - {e}",
607                    fmt_id(transmission_id),
608                    fmt_id(transmission_id.checksum().unwrap_or_default()).dimmed()
609                );
610            }
611        }
612    }
613
614    /// Reinserts the given transmission into the memory pool.
615    async fn reinsert_transmission(
616        &self,
617        transmission_id: TransmissionID<N>,
618        transmission: Transmission<N>,
619    ) -> Result<()> {
620        // Initialize a callback sender and receiver.
621        let (callback, callback_receiver) = oneshot::channel();
622        // Send the transmission to the primary.
623        match (transmission_id, transmission) {
624            (TransmissionID::Ratification, Transmission::Ratification) => return Ok(()),
625            (TransmissionID::Solution(solution_id, _), Transmission::Solution(solution)) => {
626                // Send the solution to the primary.
627                self.primary_sender().tx_unconfirmed_solution.send((solution_id, solution, callback)).await?;
628            }
629            (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) => {
630                // Send the transaction to the primary.
631                self.primary_sender().tx_unconfirmed_transaction.send((transaction_id, transaction, callback)).await?;
632            }
633            _ => bail!("Mismatching `(transmission_id, transmission)` pair in consensus"),
634        }
635        // Await the callback.
636        callback_receiver.await?
637    }
638
639    /// Spawns a task with the given future; it should only be used for long-running tasks.
640    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
641        self.handles.lock().push(tokio::spawn(future));
642    }
643
644    /// Shuts down the BFT.
645    pub async fn shut_down(&self) {
646        guard_info!(self, "Shutting down consensus...");
647
648        // Shut down the BFT.
649        self.bft.shut_down().await;
650
651        // Abort Consensus tasks.
652        let mut handles = self.handles.lock();
653        handles.iter().for_each(|handle| handle.abort());
654        handles.clear();
655    }
656}