snarkos_node_bft/primary.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    PRIMARY_PING_IN_MS,
22    Sync,
23    Transport,
24    WORKER_PING_IN_MS,
25    Worker,
26    events::{BatchPropose, BatchSignature, Event},
27    helpers::{
28        PrimaryReceiver,
29        PrimarySender,
30        Proposal,
31        ProposalCache,
32        SignedProposals,
33        Storage,
34        assign_to_worker,
35        assign_to_workers,
36        fmt_id,
37        init_sync_channels,
38        init_worker_channels,
39        now,
40    },
41    spawn_blocking,
42    sync::SyncCallback,
43};
44
45use snarkos_account::Account;
46use snarkos_node_bft_events::PrimaryPing;
47use snarkos_node_bft_ledger_service::LedgerService;
48use snarkos_node_network::PeerPoolHandling;
49use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
50use snarkos_utilities::{CallbackHandle, NodeDataDir};
51
52use snarkvm::{
53    console::{
54        prelude::*,
55        types::{Address, Field},
56    },
57    ledger::{
58        block::Transaction,
59        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
60        puzzle::{Solution, SolutionID},
61    },
62    prelude::{ConsensusVersion, committee::Committee},
63    utilities::flatten_error,
64};
65
66use anyhow::Context;
67use colored::Colorize;
68use futures::stream::{FuturesUnordered, StreamExt};
69use indexmap::{IndexMap, IndexSet};
70#[cfg(feature = "locktick")]
71use locktick::{
72    parking_lot::{Mutex, RwLock},
73    tokio::Mutex as TMutex,
74};
75#[cfg(not(feature = "locktick"))]
76use parking_lot::{Mutex, RwLock};
77#[cfg(not(feature = "serial"))]
78use rayon::prelude::*;
79#[cfg(feature = "metrics")]
80use std::time::Instant;
81use std::{
82    collections::{HashMap, HashSet},
83    future::Future,
84    net::SocketAddr,
85    sync::{Arc, OnceLock},
86    time::Duration,
87};
88#[cfg(not(feature = "locktick"))]
89use tokio::sync::Mutex as TMutex;
90use tokio::task::JoinHandle;
91
92/// A helper type for an optional proposed batch.
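/// It is `None` whenever the primary is not currently proposing a batch.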
93pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
94
95/// This callback trait allows listening to changes in the Primary, such as round advancement.
96/// This is currently used by the BFT.
97#[async_trait::async_trait]
98pub trait PrimaryCallback<N: Network>: Send + std::marker::Sync {
99    /// Notifies that a new round has started.
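    /// Returns `true` if the callback (currently the BFT) is ready for the primary to propose a batch for the next round.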
100    fn update_to_next_round(&self, current_round: u64) -> bool;
101
102    /// Sends a new certificate.
103    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
104}
105
106/// The primary logic of a node.
107/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
108#[derive(Clone)]
109pub struct Primary<N: Network> {
110    /// The sync module enables fetching data from other validators.
111    sync: Sync<N>,
112    /// The gateway allows talking to other nodes in the validator set.
113    gateway: Gateway<N>,
114    /// The storage.
115    storage: Storage<N>,
116    /// The ledger service.
117    ledger: Arc<dyn LedgerService<N>>,
118    /// The workers.
119    workers: Arc<OnceLock<Vec<Worker<N>>>>,
120    /// The primary callback (used by [`BFT`]).
121    primary_callback: Arc<CallbackHandle<Arc<dyn PrimaryCallback<N>>>>,
122    /// The batch proposal, if the primary is currently proposing a batch.
123    proposed_batch: Arc<ProposedBatch<N>>,
124    /// The timestamp of the most recent proposed batch.
125    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
126    /// The instant at which the current batch was proposed (used to measure certification latency).
127    /// (this provides higher precision for the metrics than the batch timestamp)
128    #[cfg(feature = "metrics")]
129    batch_propose_start: Arc<Mutex<Option<Instant>>>,
130    /// The recently-signed batch proposals.
131    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
132    /// The handles for all background tasks spawned by this primary.
133    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
134    /// The lock for propose_batch.
135    propose_lock: Arc<TMutex<u64>>,
136    /// The node configuration directory.
137    node_data_dir: NodeDataDir,
138}
139
140impl<N: Network> Primary<N> {
141    /// The maximum number of unconfirmed transmissions to send to the primary.
142    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
143
144    /// Initializes a new primary instance.
145    #[allow(clippy::too_many_arguments)]
146    pub fn new(
147        account: Account<N>,
148        storage: Storage<N>,
149        ledger: Arc<dyn LedgerService<N>>,
150        block_sync: Arc<BlockSync<N>>,
151        ip: Option<SocketAddr>,
152        trusted_validators: &[SocketAddr],
153        trusted_peers_only: bool,
154        node_data_dir: NodeDataDir,
155        dev: Option<u16>,
156    ) -> Result<Self> {
157        // Initialize the gateway.
158        let gateway = Gateway::new(
159            account,
160            storage.clone(),
161            ledger.clone(),
162            ip,
163            trusted_validators,
164            trusted_peers_only,
165            node_data_dir.clone(),
166            dev,
167        )?;
168        // Initialize the sync module.
169        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
170
171        // Initialize the primary instance.
172        Ok(Self {
173            sync,
174            gateway,
175            storage,
176            ledger,
177            workers: Default::default(),
178            primary_callback: Default::default(),
179            proposed_batch: Default::default(),
180            latest_proposed_batch_timestamp: Default::default(),
181            #[cfg(feature = "metrics")]
182            batch_propose_start: Default::default(),
183            signed_proposals: Default::default(),
184            handles: Default::default(),
185            propose_lock: Default::default(),
186            node_data_dir,
187        })
188    }
189
190    /// Load the proposal cache file and update the Primary state with the stored data.
191    async fn load_proposal_cache(&self) -> Result<()> {
192        // Fetch the signed proposals from the file system if it exists.
193        match ProposalCache::<N>::exists(&self.node_data_dir) {
194            // If the proposal cache exists, then process the proposal cache.
195            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
196                Ok(proposal_cache) => {
197                    // Extract the proposal and signed proposals.
198                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
199                        proposal_cache.into();
200
201                    // Verify that the proposal cache is not too far ahead of the ledger.
202                    // If the cache round exceeds the ledger round by more than MAX_GC_ROUNDS, the ledger
203                    // snapshot is too old to recover from the cached state. The operator must restore a
204                    // more recent ledger snapshot before restarting the node.
205                    let ledger_round = self.ledger.latest_round();
206                    let max_gc_rounds = BatchHeader::<N>::MAX_GC_ROUNDS as u64;
207                    if latest_certificate_round > ledger_round.saturating_add(max_gc_rounds) {
208                        bail!(
209                            "The proposal cache (round {latest_certificate_round}) is more than {max_gc_rounds} \
210                             rounds ahead of the ledger (round {ledger_round}). \
211                             Please restore a more recent ledger snapshot before restarting the node."
212                        );
213                    }
214
215                    // Write the proposed batch.
216                    *self.proposed_batch.write() = proposed_batch;
217                    // Write the signed proposals.
218                    *self.signed_proposals.write() = signed_proposals;
219                    // Write the propose lock.
220                    *self.propose_lock.lock().await = latest_certificate_round;
221
222                    // Update the storage with the pending certificates.
223                    for certificate in pending_certificates {
224                        let batch_id = certificate.batch_id();
225                        // We use a dummy IP because the node should not need to request from any peers.
226                        // The storage should have stored all the transmissions. If not, we simply
227                        // skip the certificate.
228                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
229                        {
230                            let err = err.context(format!(
231                                "Failed to load stored certificate {} from proposal cache",
232                                fmt_id(batch_id)
233                            ));
234                            warn!("{}", &flatten_error(err));
235                        }
236                    }
237                    Ok(())
238                }
239                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
240            },
241            // If the proposal cache does not exist, then return early.
242            false => Ok(()),
243        }
244    }
245
246    /// Run the primary instance.
247    pub async fn run(
248        &self,
249        ping: Option<Arc<Ping<N>>>,
250        primary_callback: Option<Arc<dyn PrimaryCallback<N>>>,
251        sync_callback: Option<Arc<dyn SyncCallback<N>>>,
252        primary_sender: PrimarySender<N>,
253        primary_receiver: PrimaryReceiver<N>,
254    ) -> Result<()> {
255        info!("Starting the primary instance of the memory pool...");
256
257        // Set the primary callback (used by the BFT).
258        if let Some(callback) = primary_callback {
259            self.primary_callback.set(callback)?;
260        }
261
262        // Construct a map of the worker senders.
263        let mut worker_senders = IndexMap::new();
264        // Construct a map for the workers.
265        let mut workers = Vec::new();
266        // Initialize the workers.
267        for id in 0..MAX_WORKERS {
268            // Construct the worker channels.
269            let (tx_worker, rx_worker) = init_worker_channels();
270            // Construct the worker instance.
271            let worker = Worker::new(
272                id,
273                Arc::new(self.gateway.clone()),
274                self.storage.clone(),
275                self.ledger.clone(),
276                self.proposed_batch.clone(),
277            )?;
278            // Run the worker instance.
279            worker.run(rx_worker);
280            // Add the worker to the list of workers.
281            workers.push(worker);
282            // Add the worker sender to the map.
283            worker_senders.insert(id, tx_worker);
284        }
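        // Note: `MAX_WORKERS` is currently 1 (see the `debug_assert_eq!` in `propose_batch`), so this loop creates a single worker.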
285        // Set the workers.
286        if self.workers.set(workers).is_err() {
287            bail!("Workers already set. `Primary::run` cannot be called more than once.");
288        }
289
290        // First, initialize the sync channels.
291        let (sync_sender, sync_receiver) = init_sync_channels();
292        // Next, initialize the sync module and sync the storage from ledger.
293        self.sync.initialize(sync_callback).await?;
294        // Next, load and process the proposal cache before running the sync module.
295        self.load_proposal_cache().await?;
296        // Next, run the sync module.
297        self.sync.run(ping, sync_receiver).await?;
298        // Next, initialize the gateway.
299        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
300        // Lastly, start the primary handlers.
301        // Note: This ensures the primary does not start communicating before syncing is complete.
302        self.start_handlers(primary_receiver);
303
304        Ok(())
305    }
306
307    /// Returns the current round.
308    pub fn current_round(&self) -> u64 {
309        self.storage.current_round()
310    }
311
312    /// Returns `true` if the primary is synced.
313    pub fn is_synced(&self) -> bool {
314        self.sync.is_synced()
315    }
316
317    /// Returns the gateway.
318    pub const fn gateway(&self) -> &Gateway<N> {
319        &self.gateway
320    }
321
322    /// Returns the storage.
323    pub const fn storage(&self) -> &Storage<N> {
324        &self.storage
325    }
326
327    /// Returns the ledger.
328    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
329        &self.ledger
330    }
331
332    /// Returns the number of workers.
333    pub fn num_workers(&self) -> u8 {
334        u8::try_from(self.workers.get().expect("Primary is not running yet").len()).expect("Too many workers")
335    }
336
337    /// Returns the workers.
338    pub fn workers(&self) -> &[Worker<N>] {
339        self.workers.get().expect("Primary is not running yet")
340    }
341
342    /// Returns the batch proposal of our primary, if one currently exists.
343    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
344        &self.proposed_batch
345    }
346}
347
348impl<N: Network> Primary<N> {
349    /// Returns the number of unconfirmed transmissions.
350    pub fn num_unconfirmed_transmissions(&self) -> usize {
351        self.workers().iter().map(|worker| worker.num_transmissions()).sum()
352    }
353
354    /// Returns the number of unconfirmed ratifications.
355    pub fn num_unconfirmed_ratifications(&self) -> usize {
356        self.workers().iter().map(|worker| worker.num_ratifications()).sum()
357    }
358
359    /// Returns the number of unconfirmed solutions.
360    pub fn num_unconfirmed_solutions(&self) -> usize {
361        self.workers().iter().map(|worker| worker.num_solutions()).sum()
362    }
363
364    /// Returns the number of unconfirmed transactions.
365    pub fn num_unconfirmed_transactions(&self) -> usize {
366        self.workers().iter().map(|worker| worker.num_transactions()).sum()
367    }
368}
369
370impl<N: Network> Primary<N> {
371    /// Returns the worker transmission IDs.
372    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
373        self.workers().iter().flat_map(|worker| worker.transmission_ids())
374    }
375
376    /// Returns the worker transmissions.
377    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
378        self.workers().iter().flat_map(|worker| worker.transmissions())
379    }
380
381    /// Returns the worker solutions.
382    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
383        self.workers().iter().flat_map(|worker| worker.solutions())
384    }
385
386    /// Returns the worker transactions.
387    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
388        self.workers().iter().flat_map(|worker| worker.transactions())
389    }
390}
391
392impl<N: Network> Primary<N> {
393    /// Clears the worker solutions.
394    pub fn clear_worker_solutions(&self) {
395        self.workers().iter().for_each(Worker::clear_solutions);
396    }
397}
398
399impl<N: Network> Primary<N> {
400    /// Proposes the batch for the current round.
401    ///
402    /// This method performs the following steps:
403    /// 1. Drain the workers.
404    /// 2. Sign the batch.
405    /// 3. Set the batch proposal in the primary.
406    /// 4. Broadcast the batch header to all validators for signing.
407    pub async fn propose_batch(&self) -> Result<()> {
408        // This function isn't re-entrant.
409        let mut lock_guard = self.propose_lock.lock().await;
410
411        // Check if the proposed batch has expired, and clear it if it has expired.
412        if let Err(err) = self
413            .check_proposed_batch_for_expiration()
414            .with_context(|| "Failed to check the proposed batch for expiration")
415        {
416            warn!("{}", flatten_error(&err));
417            return Ok(());
418        }
419
420        // Retrieve the current round.
421        let round = self.current_round();
422        // Compute the previous round.
423        let previous_round = round.saturating_sub(1);
424
425        // If the current round is 0, return early.
426        // This can actually never happen, because of the invariant that the current round is never 0
427        // (see [`StorageInner::current_round`]).
428        ensure!(round > 0, "Round 0 cannot have transaction batches");
429
430        // If the current storage round is below the latest proposal round, then return early.
431        if round < *lock_guard {
432            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
433            return Ok(());
434        }
435
436        // If there is a batch being proposed already,
437        // rebroadcast the batch header to the non-signers, and return early.
438        if let Some(proposal) = self.proposed_batch.read().as_ref() {
439            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
440            if round < proposal.round()
441                || proposal
442                    .batch_header()
443                    .previous_certificate_ids()
444                    .iter()
445                    .any(|id| !self.storage.contains_certificate(*id))
446            {
447                warn!(
448                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
449                    proposal.round(),
450                );
451                return Ok(());
452            }
453            // Construct the event.
454            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
455            let event = Event::BatchPropose(proposal.batch_header().clone().into());
456            // Iterate through the non-signers.
457            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
458                // Resolve the address to the peer IP.
459                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
460                    // Resend the batch proposal to the validator for signing.
461                    Some(peer_ip) => {
462                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
463                        tokio::spawn(async move {
464                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
465                            // Resend the batch proposal to the peer.
466                            if gateway.send(peer_ip, event_).await.is_none() {
467                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
468                            }
469                        });
470                    }
471                    None => continue,
472                }
473            }
474            debug!("Proposed batch for round {} is still valid", proposal.round());
475            return Ok(());
476        }
477
478        #[cfg(feature = "metrics")]
479        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
480
481        // Ensure that the primary does not create a new proposal too quickly.
482        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
483            debug!(
484                "{}",
485                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
486            );
487            return Ok(());
488        }
489
490        // Ensure the primary has not proposed a batch for this round before.
491        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
492            // If a primary callback was provided, attempt to advance the current round.
493            if let Some(cb) = &*self.primary_callback.get_ref() {
494                match cb.update_to_next_round(self.current_round()) {
495                    // `true` indicates the primary is ready to propose a batch for the next round, so continue.
496                    true => (),
497                    // `false` indicates the primary is not ready to propose a batch for the next round, so return early.
498                    false => return Ok(()),
499                }
500            }
501            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
502            return Ok(());
503        }
504
505        // Determine if the current round has been proposed.
506        // Note: Do NOT perform this check before the rebroadcast and round-advancement logic above. Rebroadcasting is
507        // good for network reliability and should not be skipped when a proposed_batch already exists.
508        // If a certificate already exists for the current round, an attempt should be made to advance the
509        // round as early as possible.
510        if round == *lock_guard {
511            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
512            return Ok(());
513        }
514
515        // Retrieve the committee to check against.
516        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
517        // Check if the primary is connected to enough validators to reach quorum threshold.
518        {
519            // Retrieve the connected validator addresses.
520            let mut connected_validators = self.gateway.connected_addresses();
521            // Append the primary to the set.
522            connected_validators.insert(self.gateway.account().address());
523            // If quorum threshold is not reached, return early.
524            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
525                debug!(
526                    "Primary is safely skipping a batch proposal for round {round} {}",
527                    "(please connect to more validators)".dimmed()
528                );
529                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
530                return Ok(());
531            }
532        }
533
534        // Retrieve the previous certificates.
535        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
536
537        // Check if the batch is ready to be proposed.
538        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
539        let mut is_ready = previous_round == 0;
540        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
541        if previous_round > 0 {
542            // Retrieve the committee lookback for the round.
543            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
544                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
545            };
546            // Construct a set over the authors.
547            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
548            // Check if the previous certificates have reached the quorum threshold.
549            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
550                is_ready = true;
551            }
552        }
553        // If the batch is not ready to be proposed, return early.
554        if !is_ready {
555            debug!(
556                "Primary is safely skipping a batch proposal for round {round} {}",
557                format!("(previous round {previous_round} has not reached quorum)").dimmed()
558            );
559            return Ok(());
560        }
561
562        // Initialize the map of transmissions.
563        let mut transmissions: IndexMap<_, _> = Default::default();
564        // Track the total execution costs of the batch proposal as it is being constructed.
565        let mut proposal_cost = 0u64;
566        // Note: worker draining and transaction inclusion needs to be thought
567        // through carefully when there is more than one worker. The fairness
568        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
569        debug_assert_eq!(MAX_WORKERS, 1);
570
571        'outer: for worker in self.workers().iter() {
572            let mut num_worker_transmissions = 0usize;
573
574            while let Some((id, transmission)) = worker.remove_front() {
575                // Check the selected transmissions are below the batch limit.
576                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
577                    // Reinsert the transmission into the worker.
578                    worker.insert_front(id, transmission);
579                    break 'outer;
580                }
581
582                // Check the max transmissions per worker is not exceeded.
583                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
584                    // Reinsert the transmission into the worker.
585                    worker.insert_front(id, transmission);
586                    continue 'outer;
587                }
588
589                // Check if the ledger already contains the transmission.
590                if self.ledger.contains_transmission(&id).unwrap_or(true) {
591                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
592                    continue;
593                }
594
595                // Check if the storage already contains the transmission.
596                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
597                // the primary does not propose a batch with no transmissions.
598                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
599                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
600                    continue;
601                }
602
603                // Check the transmission is still valid.
604                match (id, transmission.clone()) {
605                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
606                        // Ensure the checksum matches. If not, skip the solution.
607                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
608                        {
609                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
610                            continue;
611                        }
612                        // Check if the solution is still valid.
613                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
614                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
615                            continue;
616                        }
617                    }
618                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
619                        // Ensure the checksum matches. If not, skip the transaction.
620                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
621                        {
622                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
623                            continue;
624                        }
625
626                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
627                        let transaction = spawn_blocking!({
628                            match transaction {
629                                Data::Object(transaction) => Ok(transaction),
630                                Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(
631                                    &mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64),
632                                )?),
633                            }
634                        })?;
635
636                        // Fetch the current block height and consensus version.
637                        let current_block_height = self.ledger.latest_block_height();
638                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
639
640                        // Compute the transaction spent cost (in microcredits).
641                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
642                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
643                        else {
644                            debug!(
645                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
646                                fmt_id(transaction_id)
647                            );
648                            continue;
649                        };
650
651                        // Check if the transaction is still valid.
652                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
653                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
654                            continue;
655                        }
656
657                        // Compute the next proposal cost.
658                        // Note: We purposefully discard this transaction if the proposal cost overflows.
659                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
660                            debug!(
661                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
662                                fmt_id(transaction_id)
663                            );
664                            continue;
665                        };
666
667                        // Check if the next proposal cost exceeds the batch proposal spend limit.
668                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
669                        if next_proposal_cost > batch_spend_limit {
670                            debug!(
671                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
672                                fmt_id(transaction_id),
673                                batch_spend_limit
674                            );
675
676                            // Reinsert the transmission into the worker.
677                            worker.insert_front(id, transmission);
678                            break 'outer;
679                        }
680
681                        // Update the proposal cost.
682                        proposal_cost = next_proposal_cost;
683                    }
684
685                    // Note: We explicitly forbid including ratifications,
686                    // as the protocol currently does not support ratifications.
687                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
688                    // All other combinations are clearly invalid.
689                    _ => continue,
690                }
691
692                // If the transmission is valid, insert it into the proposal's transmission list.
693                transmissions.insert(id, transmission);
694                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
695            }
696        }
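        // At this point, `transmissions` holds the transmissions selected for this batch (drained FIFO from the worker),
        // and `proposal_cost` is the total transaction spend of the batch in microcredits.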
697
698        // Determine the current timestamp.
699        let current_timestamp = now();
700
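        // Record this round in the propose lock, so a later call to `propose_batch` for the same round returns early
        // (see the `round == *lock_guard` check above).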
701        *lock_guard = round;
702
703        /* Proceeding to sign & propose the batch. */
704        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
705
706        // Retrieve the private key.
707        let private_key = *self.gateway.account().private_key();
708        // Retrieve the committee ID.
709        let committee_id = committee_lookback.id();
710        // Prepare the transmission IDs.
711        let transmission_ids = transmissions.keys().copied().collect();
712        // Prepare the previous batch certificate IDs.
713        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
714        // Sign the batch header and construct the proposal.
715        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
716            &private_key,
717            round,
718            current_timestamp,
719            committee_id,
720            transmission_ids,
721            previous_certificate_ids,
722            &mut rand::thread_rng()
723        ))
724        .and_then(|batch_header| {
725            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
726                .map(|proposal| (batch_header, proposal))
727        })
728        .inspect_err(|_| {
729            // On error, reinsert the transmissions and then propagate the error.
730            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
731                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
732            }
733        })?;
734        // Broadcast the batch to all validators for signing.
735        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
736        // Set the timestamp of the latest proposed batch.
737        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
738        // Record the wall-clock time at which the batch was proposed.
739        #[cfg(feature = "metrics")]
740        {
741            *self.batch_propose_start.lock() = Some(Instant::now());
742        }
743        // Set the proposed batch.
744        *self.proposed_batch.write() = Some(proposal);
745        Ok(())
746    }
747
748    /// Processes a batch propose from a peer.
749    ///
750    /// This method performs the following steps:
751    /// 1. Verify the batch.
752    /// 2. Sign the batch.
753    /// 3. Broadcast the signature back to the validator.
754    ///
755    /// If our primary is ahead of the peer, we will not sign the batch.
756    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
757    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
758        let BatchPropose { round: batch_round, batch_header } = batch_propose;
759
760        // Deserialize the batch header.
761        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
762        // Ensure the round matches in the batch header.
763        if batch_round != batch_header.round() {
764            // Proceed to disconnect the validator.
765            self.gateway.disconnect(peer_ip);
766            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
767        }
768
769        // Retrieve the batch author.
770        let batch_author = batch_header.author();
771
772        // Ensure the batch proposal is from the validator.
773        match self.gateway.resolve_to_aleo_addr(peer_ip) {
774            // If the peer is a validator, then ensure the batch proposal is from the validator.
775            Some(address) => {
776                if address != batch_author {
777                    // Proceed to disconnect the validator.
778                    self.gateway.disconnect(peer_ip);
779                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
780                }
781            }
782            None => bail!("Batch proposal from a disconnected validator"),
783        }
784        // Ensure the batch author is a current committee member.
785        if !self.gateway.is_authorized_validator_address(batch_author) {
786            // Proceed to disconnect the validator.
787            self.gateway.disconnect(peer_ip);
788            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
789        }
790        // Ensure the batch proposal is not from the current primary.
791        if self.gateway.account().address() == batch_author {
792            bail!("Invalid peer - proposed batch from myself ({batch_author})");
793        }
794
795        // Ensure that the batch proposal's committee ID matches the expected committee ID.
796        // This may happen when the network forks. A transaction referencing a
797        // state root from one side of the fork will be aborted on the other
798        // side of the fork. This leads to a different view of stake and
799        // therefore a different view of the committee ID.
800        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
801        if expected_committee_id != batch_header.committee_id() {
802            // Proceed to disconnect the validator.
803            self.gateway.disconnect(peer_ip);
804            bail!(
805                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
806                batch_header.committee_id()
807            );
808        }
809
810        // Retrieve the cached round and batch ID for this validator.
811        if let Some((signed_round, signed_batch_id, signature)) =
812            self.signed_proposals.read().get(&batch_author).copied()
813        {
814            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
815            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
816            if signed_round > batch_header.round() {
817                bail!(
818                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
819                    batch_header.round()
820                );
821            }
822
823            // If the round matches and the batch ID differs, then the validator is malicious.
824            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
825                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
826            }
827            // If the round and batch ID matches, then skip signing the batch a second time.
828            // Instead, rebroadcast the cached signature to the peer.
829            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
830                let gateway = self.gateway.clone();
831                tokio::spawn(async move {
832                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
833                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
834                    // Resend the batch signature to the peer.
835                    if gateway.send(peer_ip, event).await.is_none() {
836                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
837                    }
838                });
839                // Return early.
840                return Ok(());
841            }
842        }
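        // Reaching this point means we have not previously signed a proposal from this author for this batch round.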
843
844        // Ensure that the batch header doesn't already exist in storage.
845        // Note: this is already checked in `check_batch_header`, but returning early here avoids spawning a blocking task.
846        if self.storage.contains_batch(batch_header.batch_id()) {
847            debug!(
848                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
849                format!("batch for round {batch_round} already exists in storage").dimmed()
850            );
851            return Ok(());
852        }
853
854        // Compute the previous round.
855        let previous_round = batch_round.saturating_sub(1);
856        // Ensure that the peer did not propose a batch too quickly.
857        if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
858            // Proceed to disconnect the validator.
859            self.gateway.disconnect(peer_ip);
860            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
861        }
862
863        // Ensure the batch header does not contain any ratifications.
864        if batch_header.contains(TransmissionID::Ratification) {
865            // Proceed to disconnect the validator.
866            self.gateway.disconnect(peer_ip);
867            bail!(
868                "Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'",
869            );
870        }
871
872        // If the peer is ahead, use the batch header to sync up to the peer.
873        let mut missing_transmissions =
874            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;
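        // The returned `missing_transmissions` map the transmission IDs referenced by the proposed batch that were not
        // already held locally to the transmissions obtained from the peer.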
875
876        // Check that the transmission ids match and are not fee transactions.
877        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
878            // If the transmission is not well-formed, then return early.
879            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
880        }) {
881            let err = err.context(format!(
882                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
883            ));
884            debug!("{}", flatten_error(err));
885            return Ok(());
886        }
887
888        // Ensure the batch is for the current round.
889        // This method must be called after fetching previous certificates (above),
890        // and prior to checking the batch header (below).
891        if let Err(e) = self.ensure_is_signing_round(batch_round) {
892            // If the primary is not signing for the peer's round, then return early.
893            debug!("{e} from '{peer_ip}'");
894            return Ok(());
895        }
896
897        // Ensure the batch header from the peer is valid.
898        let (storage, header) = (self.storage.clone(), batch_header.clone());
899
900        // Check the batch header, and return early if it already exists in storage.
901        let Some(missing_transmissions) =
902            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
903        else {
904            return Ok(());
905        };
906
907        // Inserts the missing transmissions into the workers.
908        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
909
910        // Ensure the transactions in the proposal do not exceed the batch spend limit.
911        let block_height = self.ledger.latest_block_height() + 1;
912        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
913            let mut proposal_cost = 0u64;
914            for transmission_id in batch_header.transmission_ids() {
915                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
916                let Some(worker) = self.workers().get(worker_id as usize) else {
917                    debug!("Unable to find worker {worker_id}");
918                    return Ok(());
919                };
920
921                let Some(transmission) = worker.get_transmission(*transmission_id) else {
922                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
923                    return Ok(());
924                };
925
926                // If the transmission is a transaction, compute its execution cost.
927                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
928                    (transmission_id, transmission)
929                {
930                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
931                    let transaction = spawn_blocking!({
932                        match transaction {
933                            Data::Object(transaction) => Ok(transaction),
934                            Data::Buffer(bytes) => {
935                                // Note: If `LATEST_MAX_TRANSACTION_SIZE` ever decreases, then we need to allow the previous larger
936                                // size to be deserialized here, until the nodes are properly past the migration height.
937                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64))?)
938                            }
939                        }
940                    })?;
941
942                    // Fetch the current consensus version.
943                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
944                    // TODO (raychu86): Remove this logic after the next migration height is reached.
945                    // Enforce maximum transaction size.
946                    if consensus_version < ConsensusVersion::V14 {
947                        // Retrieve the maximum transaction size for the consensus version.
948                        if let Some(max_tx_size) = consensus_config_value!(N, MAX_TRANSACTION_SIZE, block_height) {
949                            // Ensure the transaction does not exceed the maximum size.
950                            if transaction.to_bytes_le()?.len() > max_tx_size {
951                                trace!(
952                                    "Invalid batch proposal - Batch proposal transaction '{}' - Exceeds maximum transaction size of {max_tx_size} bytes",
953                                    fmt_id(transaction_id),
954                                );
955                                continue;
956                            }
957                        }
958                    }
959
960                    // Compute the transaction spent cost (in microcredits).
961                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
962                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
963                    else {
964                        bail!(
965                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
966                            fmt_id(transaction_id)
967                        )
968                    };
969
970                    // Compute the next proposal cost.
971                    // Note: We purposefully discard this transaction if the proposal cost overflows.
972                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
973                        bail!(
974                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
975                            fmt_id(transaction_id)
976                        )
977                    };
978
979                    // Check if the next proposal cost exceeds the batch proposal spend limit.
980                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
981                    if next_proposal_cost > batch_spend_limit {
982                        bail!(
983                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
984                            fmt_id(transaction_id),
985                            batch_spend_limit
986                        );
987                    }
988
989                    // Update the proposal cost.
990                    proposal_cost = next_proposal_cost;
991                }
992            }
993        }
994
995        /* Proceeding to sign the batch. */
996
997        // Retrieve the batch ID.
998        let batch_id = batch_header.batch_id();
999        // Sign the batch ID.
1000        let account = self.gateway.account().clone();
1001        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
1002
1003        // Ensure the proposal has not already been signed.
1004        //
1005        // Note: Due to the need to sync the batch header with the peer, it is possible
1006        // for the primary to receive the same 'BatchPropose' event again, whereby only
1007        // one instance of this handler should sign the batch. This check guarantees this.
1008        match self.signed_proposals.write().0.entry(batch_author) {
1009            std::collections::hash_map::Entry::Occupied(mut entry) => {
1010                // If the validator has already signed a batch for this round, then return early,
1011                // since, if the peer still has not received the signature, they will request it again,
1012                // and the logic at the start of this function will resend the (now cached) signature
1013                // to the peer if asked to sign this batch proposal again.
1014                if entry.get().0 == batch_round {
1015                    return Ok(());
1016                }
1017                // Otherwise, cache the round, batch ID, and signature for this validator.
1018                entry.insert((batch_round, batch_id, signature));
1019            }
1020            // If the validator has not signed a batch before, then continue.
1021            std::collections::hash_map::Entry::Vacant(entry) => {
1022                // Cache the round, batch ID, and signature for this validator.
1023                entry.insert((batch_round, batch_id, signature));
1024            }
1025        };
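        // The (round, batch ID, signature) entry is now cached, so if this proposal is received again,
        // the cached signature is resent by the logic at the start of this function.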
1026
1027        // Broadcast the signature back to the validator.
1028        let self_ = self.clone();
1029        tokio::spawn(async move {
1030            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1031            // Send the batch signature to the peer.
1032            if self_.gateway.send(peer_ip, event).await.is_some() {
1033                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1034            }
1035        });
1036
1037        Ok(())
1038    }
1039
1040    /// Processes a batch signature from a peer.
1041    ///
1042    /// This method performs the following steps:
1043    /// 1. Ensure the proposed batch has not expired.
1044    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
1045    /// 3. Store the signature.
1046    /// 4. Certify the batch if enough signatures have been received.
1047    /// 5. Broadcast the batch certificate to all validators.
1048    async fn process_batch_signature_from_peer(
1049        &self,
1050        peer_ip: SocketAddr,
1051        batch_signature: BatchSignature<N>,
1052    ) -> Result<()> {
1053        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
1054        self.check_proposed_batch_for_expiration()?;
1055
1056        // Retrieve the signature and timestamp.
1057        let BatchSignature { batch_id, signature } = batch_signature;
1058
1059        // Retrieve the signer.
1060        let signer = signature.to_address();
1061
1062        // Ensure the batch signature is signed by the validator.
1063        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1064            // Proceed to disconnect the validator.
1065            self.gateway.disconnect(peer_ip);
1066            bail!("Malicious peer - batch signature is from a different validator ({signer})");
1067        }
1068        // Ensure the batch signature is not from the current primary.
1069        if self.gateway.account().address() == signer {
1070            bail!("Invalid peer - received a batch signature from myself ({signer})");
1071        }
1072
1073        let self_ = self.clone();
1074        let Some(proposal) = spawn_blocking!({
1075            // Acquire the write lock.
1076            let mut proposed_batch = self_.proposed_batch.write();
1077            // Add the signature to the batch, and determine if the batch is ready to be certified.
1078            match proposed_batch.as_mut() {
1079                Some(proposal) => {
1080                    // Ensure the batch ID matches the currently proposed batch ID.
1081                    if proposal.batch_id() != batch_id {
1082                        match self_.storage.contains_batch(batch_id) {
1083                            // If this batch was already certified, return early.
1084                            true => {
1085                                debug!(
1086                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
1087                                    proposal.round()
1088                                );
1089                                return Ok(None);
1090                            }
1091                            // If the batch ID is unknown, return an error.
1092                            false => bail!(
1093                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
1094                                proposal.batch_id(),
1095                                proposal.round()
1096                            ),
1097                        }
1098                    }
1099                    // Retrieve the committee lookback for the round.
1100                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
1101                    // Retrieve the address of the validator.
1102                    let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
1103                        bail!("Signature is from a disconnected validator");
1104                    };
1105                    // Add the signature to the batch.
1106                    let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1107
1108                    if new_signature {
1109                        info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1110                        // Check if the batch is ready to be certified.
1111                        if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1112                            // If the batch is not ready to be certified, return early.
1113                            return Ok(None);
1114                        }
1115                    } else {
1116                        debug!(
1117                            "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}",
1118                            round = proposal.round()
1119                        );
1120                        return Ok(None);
1121                    }
1122                }
1123                // There is no proposed batch, so return early.
1124                None => return Ok(None),
1125            };
1126            // Retrieve the batch proposal, clearing the proposed batch.
1127            match proposed_batch.take() {
1128                Some(proposal) => Ok(Some(proposal)),
1129                None => Ok(None),
1130            }
1131        })?
1132        else {
1133            return Ok(());
1134        };
1135
1136        /* Proceeding to certify the batch. */
1137
1138        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1139
1140        // Retrieve the committee lookback for the round.
1141        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1142        // Store the certified batch and broadcast it to all validators.
1143        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1144        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1145            // Reinsert the transmissions back into the ready queue for the next proposal.
1146            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1147            return Err(e);
1148        }
1149
1150        #[cfg(feature = "metrics")]
1151        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1152        Ok(())
1153    }
1154
1155    /// Processes a batch certificate from a peer.
1156    ///
1157    /// This method performs the following steps:
1158    /// 1. Stores the given batch certificate, after ensuring it is valid.
1159    /// 2. If there are enough certificates to reach quorum threshold for the current round,
1160    ///    then proceed to advance to the next round.
1161    async fn process_batch_certificate_from_peer(
1162        &self,
1163        peer_ip: SocketAddr,
1164        certificate: BatchCertificate<N>,
1165    ) -> Result<()> {
1166        // Ensure the batch certificate is from an authorized validator.
1167        if !self.gateway.is_authorized_validator_ip(peer_ip) {
1168            // Proceed to disconnect the validator.
1169            self.gateway.disconnect(peer_ip);
1170            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1171        }
1172        // Ensure storage does not already contain the certificate.
1173        if self.storage.contains_certificate(certificate.id()) {
1174            return Ok(());
1175        // Otherwise, if ephemeral storage does not yet contain the certificate, insert it.
1176        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1177            self.storage.insert_unprocessed_certificate(certificate.clone())?;
1178        }
1179
1180        // Retrieve the batch certificate author.
1181        let author = certificate.author();
1182        // Retrieve the batch certificate round.
1183        let certificate_round = certificate.round();
1184        // Retrieve the batch certificate committee ID.
1185        let committee_id = certificate.committee_id();
1186
1187        // Ensure the batch certificate is not from the current primary.
1188        if self.gateway.account().address() == author {
1189            bail!("Received a batch certificate for myself ({author})");
1190        }
1191
1192        // Ensure that the incoming certificate is valid.
1193        self.storage.check_incoming_certificate(&certificate)?;
1194
1195        // Store the certificate, after ensuring it is valid above.
1196        // The following call recursively fetches and stores
1197        // the previous certificates referenced from this certificate.
1198        // It is critical to make the following call only after validating the certificate above.
1199        // The reason is that a sequence of malformed certificates,
1200        // with references to previous certificates with non-decreasing rounds,
1201        // could cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
1202        // Note that if the following call, when it does not return an error, guarantees the backward closure of the DAG
1203        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
1204        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
1205        // TODO: eliminate those redundant checks
1206        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1207
1208        // If there are enough certificates to reach quorum threshold for the certificate round,
1209        // then proceed to advance to the next round.
1210
1211        // Retrieve the committee lookback.
1212        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
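        // Note: the committee 'lookback' is the committee from a fixed number of rounds earlier, so that all
        // validators evaluate quorum against the same committee even if their ledgers differ slightly in height.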
1213
1214        // Retrieve the certificate authors.
1215        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1216        // Check if the certificates have reached the quorum threshold.
1217        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1218
1219        // Ensure that the batch certificate's committee ID matches the expected committee ID.
1220        let expected_committee_id = committee_lookback.id();
1221        if expected_committee_id != committee_id {
1222            // Proceed to disconnect the validator.
1223            self.gateway.disconnect(peer_ip);
1224            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1225        }
1226
1227        // Determine if we are currently proposing a round that is relevant.
1228        // Note: This is important, because while our peers have advanced,
1229        // they may not be proposing yet, and are thus still able to sign our proposed batch.
1230        let should_advance = match &*self.proposed_batch.read() {
1231            // We advance if the proposal round is less than the current round that was just certified.
1232            Some(proposal) => proposal.round() < certificate_round,
1233            // If there's no proposal, we consider advancing.
1234            None => true,
1235        };
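        // For example, if we are still collecting signatures for our own round-7 proposal and a round-7
        // certificate arrives, `should_advance` is false so that peers can still sign our batch.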
1236
1237        // Retrieve the current round.
1238        let current_round = self.current_round();
1239
1240        // Determine whether to advance to the next round.
1241        if is_quorum && should_advance && certificate_round >= current_round {
1242            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
1243            self.try_increment_to_the_next_round(current_round + 1).await?;
1244        }
1245        Ok(())
1246    }
1247}
1248
1249impl<N: Network> Primary<N> {
1250    /// Starts the primary handlers.
1251    ///
1252    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
1253    /// that awaits new data and handles it accordingly.
1254    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
1255    /// tries to move to the next round of batches.
1256    ///
1257    /// This function is called exactly once, in `Self::run()`.
1258    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1259        let PrimaryReceiver {
1260            mut rx_batch_propose,
1261            mut rx_batch_signature,
1262            mut rx_batch_certified,
1263            mut rx_primary_ping,
1264            mut rx_unconfirmed_solution,
1265            mut rx_unconfirmed_transaction,
1266        } = primary_receiver;
1267
1268        // Start the primary ping sender.
1269        let self_ = self.clone();
1270        self.spawn(async move {
1271            loop {
1272                // Sleep briefly.
1273                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
1274
1275                // Retrieve the block locators.
1276                let self__ = self_.clone();
1277                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1278                    Ok(block_locators) => block_locators,
1279                    Err(e) => {
1280                        warn!("Failed to retrieve block locators - {e}");
1281                        continue;
1282                    }
1283                };
1284
1285                // Retrieve the latest certificate of the primary.
1286                let primary_certificate = {
1287                    // Retrieve the primary address.
1288                    let primary_address = self_.gateway.account().address();
1289
1290                    // Iterate backwards from the latest round to find the primary certificate.
1291                    let mut certificate = None;
1292                    let mut current_round = self_.current_round();
1293                    while certificate.is_none() {
1294                        // If the current round is 0, then break the while loop.
1295                        if current_round == 0 {
1296                            break;
1297                        }
1298                        // Retrieve the primary certificates.
1299                        if let Some(primary_certificate) =
1300                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1301                        {
1302                            certificate = Some(primary_certificate);
1303                        // If the primary certificate was not found, decrement the round.
1304                        } else {
1305                            current_round = current_round.saturating_sub(1);
1306                        }
1307                    }
1308
1309                    // Determine if the primary certificate was found.
1310                    match certificate {
1311                        Some(certificate) => certificate,
1312                        // Skip this iteration of the loop (do not send a primary ping).
1313                        None => continue,
1314                    }
1315                };
1316
1317                // Construct the primary ping.
1318                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1319                // Broadcast the event.
1320                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1321            }
1322        });
1323
1324        // Start the primary ping handler.
1325        let self_ = self.clone();
1326        self.spawn(async move {
1327            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1328                // If the primary is not synced, then do not process the primary ping.
1329                if self_.sync.is_synced() {
1330                    trace!("Processing new primary ping from '{peer_ip}'");
1331                } else {
1332                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1333                    continue;
1334                }
1335
1336                // Spawn a task to process the primary certificate.
1337                {
1338                    let self_ = self_.clone();
1339                    tokio::spawn(async move {
1340                        // Deserialize the primary certificate in the primary ping.
1341                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1342                        else {
1343                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1344                            return;
1345                        };
1346                        // Process the primary certificate.
1347                        let id = fmt_id(primary_certificate.id());
1348                        let round = primary_certificate.round();
1349                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1350                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1351                        }
1352                    });
1353                }
1354            }
1355        });
1356
1357        // Start the worker ping(s).
1358        let self_ = self.clone();
1359        self.spawn(async move {
1360            loop {
1361                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
1362                // If the primary is not synced, then do not broadcast the worker ping(s).
1363                if !self_.sync.is_synced() {
1364                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1365                    continue;
1366                }
1367                // Broadcast the worker ping(s).
1368                for worker in self_.workers() {
1369                    worker.broadcast_ping();
1370                }
1371            }
1372        });
1373
1374        // Start the batch proposer.
1375        let self_ = self.clone();
1376        self.spawn(async move {
1377            loop {
1378                // Sleep for the maximum batch delay before attempting to propose a batch.
1379                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1380                let current_round = self_.current_round();
1381                // If the primary is not synced, then do not propose a batch.
1382                if !self_.sync.is_synced() {
1383                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
1384                    continue;
1385                }
1386                // A best-effort attempt to skip the scheduled batch proposal if
1387                // round progression already triggered one.
1388                if self_.propose_lock.try_lock().is_err() {
1389                    trace!(
1390                        "Skipping batch proposal for round {current_round} {}",
1391                        "(node is already proposing)".dimmed()
1392                    );
1393                    continue;
1394                };
1395                // If there is no proposed batch, attempt to propose a batch.
1396                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
1397                // and only one batch needs to be proposed at a time.
1398                if let Err(e) = self_.propose_batch().await {
1399                    warn!("Cannot propose a batch - {e}");
1400                }
1401            }
1402        });
1403
1404        // Start the proposed batch handler.
1405        let self_ = self.clone();
1406        self.spawn(async move {
1407            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1408                // If the primary is not synced, then do not sign the batch.
1409                if !self_.sync.is_synced() {
1410                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1411                    continue;
1412                }
1413                // Spawn a task to process the proposed batch.
1414                let self_ = self_.clone();
1415                tokio::spawn(async move {
1416                    // Process the batch proposal.
1417                    let round = batch_propose.round;
1418                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1419                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
1420                    }
1421                });
1422            }
1423        });
1424
1425        // Start the batch signature handler.
1426        let self_ = self.clone();
1427        self.spawn(async move {
1428            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1429                // If the primary is not synced, then do not store the signature.
1430                if !self_.sync.is_synced() {
1431                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1432                    continue;
1433                }
1434                // Process the batch signature.
1435                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1436                // is a critical path, and we should only store the minimum required number of signatures.
1437                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1438                // which means the RwLock for the proposed batch would need to become a 'tokio::sync' RwLock to be safe.
1439                let id = fmt_id(batch_signature.batch_id);
1440                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1441                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
1442                    warn!("{}", flatten_error(err));
1443                }
1444            }
1445        });
1446
1447        // Start the certified batch handler.
1448        let self_ = self.clone();
1449        self.spawn(async move {
1450            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1451                // If the primary is not synced, then do not store the certificate.
1452                if !self_.sync.is_synced() {
1453                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1454                    continue;
1455                }
1456                // Spawn a task to process the batch certificate.
1457                let self_ = self_.clone();
1458                tokio::spawn(async move {
1459                    // Deserialize the batch certificate.
1460                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1461                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1462                        return;
1463                    };
1464                    // Process the batch certificate.
1465                    let id = fmt_id(batch_certificate.id());
1466                    let round = batch_certificate.round();
1467                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1468                        warn!(
1469                            "{}",
1470                            flatten_error(err.context(format!(
1471                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
1472                            )))
1473                        );
1474                    }
1475                });
1476            }
1477        });
1478
1479        // This task periodically tries to move to the next round.
1480        //
1481        // Note: This is necessary to ensure that the primary is not stuck on a previous round
1482        // despite having received enough certificates to advance to the next round.
1483        let self_ = self.clone();
1484        self.spawn(async move {
1485            loop {
1486                // Sleep briefly.
1487                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1488                // If the primary is not synced, then do not increment to the next round.
1489                if !self_.sync.is_synced() {
1490                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1491                    continue;
1492                }
1493                // Attempt to increment to the next round.
1494                let current_round = self_.current_round();
1495                let next_round = current_round.saturating_add(1);
1496                // Determine if the quorum threshold is reached for the current round.
1497                let is_quorum_threshold_reached = {
1498                    // Retrieve the certificate authors for the current round.
1499                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
1500                    // If there are no certificates, then skip this check.
1501                    if authors.is_empty() {
1502                        continue;
1503                    }
1504                    // Retrieve the committee lookback for the current round.
1505                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
1506                        warn!("Failed to retrieve the committee lookback for round {current_round}");
1507                        continue;
1508                    };
1509                    // Check if the quorum threshold is reached for the current round.
1510                    committee_lookback.is_quorum_threshold_reached(&authors)
1511                };
1512                // Attempt to increment to the next round if the quorum threshold is reached.
1513                if is_quorum_threshold_reached {
1514                    debug!("Quorum threshold reached for round {current_round}");
1515                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
1516                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
1517                    }
1518                }
1519            }
1520        });
1521
1522        // Start a handler to process new unconfirmed solutions.
1523        let self_ = self.clone();
1524        self.spawn(async move {
1525            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1526                // Compute the checksum for the solution.
1527                let Ok(checksum) = solution.to_checksum::<N>() else {
1528                    error!("Failed to compute the checksum for the unconfirmed solution");
1529                    continue;
1530                };
1531                // Compute the worker ID.
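                // Note: `assign_to_worker` maps the (solution ID, checksum) pair to a worker index
                // deterministically, so the same solution is always routed to the same worker.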
1532                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1533                    error!("Unable to determine the worker ID for the unconfirmed solution");
1534                    continue;
1535                };
1536                let self_ = self_.clone();
1537                tokio::spawn(async move {
1538                    // Retrieve the worker.
1539                    let worker = &self_.workers()[worker_id as usize];
1540                    // Process the unconfirmed solution.
1541                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1542                    // Send the result to the callback.
1543                    callback.send(result).ok();
1544                });
1545            }
1546        });
1547
1548        // Start a handler to process new unconfirmed transactions.
1549        let self_ = self.clone();
1550        self.spawn(async move {
1551            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1552                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1553                // Compute the checksum for the transaction.
1554                let Ok(checksum) = transaction.to_checksum::<N>() else {
1555                    error!("Failed to compute the checksum for the unconfirmed transaction");
1556                    continue;
1557                };
1558                // Compute the worker ID.
1559                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1560                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1561                    continue;
1562                };
1563                let self_ = self_.clone();
1564                tokio::spawn(async move {
1565                    // Retrieve the worker.
1566                    let worker = &self_.workers().get(worker_id as usize).expect("Invalid worker ID");
1567                    // Process the unconfirmed transaction.
1568                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1569                    // Send the result to the callback.
1570                    callback.send(result).ok();
1571                });
1572            }
1573        });
1574    }
1575
1576    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1577    fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1578        // Check if the proposed batch is timed out or stale.
1579        let is_expired = match self.proposed_batch.read().as_ref() {
1580            Some(proposal) => proposal.round() < self.current_round(),
1581            None => false,
1582        };
1583        // If the batch is expired, clear the proposed batch.
1584        if is_expired {
1585            // Reset the proposed batch.
1586            let proposal = self.proposed_batch.write().take();
1587            if let Some(proposal) = proposal {
1588                debug!("Cleared expired proposal for round {}", proposal.round());
1589                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1590            }
1591        }
1592        Ok(())
1593    }
1594
1595    /// Increments to the next round.
1596    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1597        // If the next round is within GC range, then iterate to the penultimate round.
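        // For example, if the current round is 5 and `next_round` is 9 (and 9 is within GC range),
        // storage is fast-forwarded round by round to 8, and the regular advancement below performs the final step.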
1598        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1599            let mut fast_forward_round = self.current_round();
1600            // Iterate until the penultimate round is reached.
1601            while fast_forward_round < next_round.saturating_sub(1) {
1602                // Update to the next round in storage.
1603                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1604                // Clear the proposed batch.
1605                *self.proposed_batch.write() = None;
1606            }
1607        }
1608
1609        // Retrieve the current round.
1610        let current_round = self.current_round();
1611        // Attempt to advance to the next round.
1612        if current_round < next_round {
1613            // If a BFT sender was provided, send the current round to the BFT.
1614            let is_ready = if let Some(cb) = self.primary_callback.get() {
1615                cb.update_to_next_round(current_round)
1616            }
1617            // Otherwise, handle the Narwhal case.
1618            else {
1619                // Update to the next round in storage.
1620                self.storage.increment_to_next_round(current_round)?;
1621                // Set 'is_ready' to 'true'.
1622                true
1623            };
1624
1625            // Log whether the next round is ready.
1626            match is_ready {
1627                true => debug!("Primary is ready to propose the next round"),
1628                false => debug!("Primary is not ready to propose the next round"),
1629            }
1630
1631            // If the node is ready, propose a batch for the next round.
1632            if is_ready {
1633                self.propose_batch().await?;
1634            }
1635        }
1636        Ok(())
1637    }
1638
1639    /// Ensures the primary is signing for the specified batch round.
1640    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1641    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1642    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1643        // Retrieve the current round.
1644        let current_round = self.current_round();
1645        // Ensure the batch round is within GC range of the current round.
1646        if current_round + self.storage.max_gc_rounds() <= batch_round {
1647            bail!("Round {batch_round} is too far in the future")
1648        }
1649        // Ensure the batch round is at or one before the current round.
1650        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1651        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
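        // For example, a primary at round 10 that has not yet proposed may still sign a round-9 batch,
        // but a round-8 batch is rejected by the check below.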
1652        if current_round > batch_round + 1 {
1653            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1654        }
1655        // Check if the primary is still signing for the batch round.
1656        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round())
1657            && signing_round > batch_round
1658        {
1659            bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1660        }
1661        Ok(())
1662    }
1663
1664    /// Ensure the primary is not creating batch proposals too frequently.
1665    /// This checks that the certificate timestamp for the previous round is within the expected range.
1666    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1667        // Retrieve the previous timestamp to check against.
1668        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1669            // If a certificate exists for the previous round, check against its timestamp.
1670            Some(certificate) => certificate.timestamp(),
1671            None => match self.gateway.account().address() == author {
1672                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1673                true => *self.latest_proposed_batch_timestamp.read(),
1674                // If we do not see a previous certificate for the author, then proceed optimistically.
1675                false => return Ok(()),
1676            },
1677        };
1678
1679        // Determine the elapsed time since the previous timestamp.
1680        let elapsed = timestamp
1681            .checked_sub(previous_timestamp)
1682            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1683        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
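        // For example, if `MIN_BATCH_DELAY_IN_SECS` were 5 and the previous timestamp were t = 100,
        // any proposal timestamped earlier than t = 105 would be rejected here.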
1684        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1685            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1686            false => Ok(()),
1687        }
1688    }
1689
1690    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1691    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1692        // Create the batch certificate and transmissions.
1693        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
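        // Note: `block_in_place` lets this synchronous certificate construction run on the current worker
        // thread while tokio moves other scheduled tasks off of it, so they are not stalled.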
1694        // Convert the transmissions into a HashMap.
1695        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1696        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1697        // Store the certified batch.
1698        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1699        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1700        debug!("Stored a batch certificate for round {}", certificate.round());
1701        // If a BFT sender was provided, send the certificate to the BFT.
1702        if let Some(cb) = self.primary_callback.get() {
1703            // Await the callback to continue.
1704            cb.add_new_certificate(certificate.clone())
1705                .await
1706                .with_context(|| "Failed to add new certificate from primary")?;
1707        }
1708        // Broadcast the certified batch to all validators.
1709        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1710        // Log the certified batch.
1711        let num_transmissions = certificate.transmission_ids().len();
1712        let round = certificate.round();
1713        info!("Our batch with {num_transmissions} transmissions for round {round} was certified!");
1714        // Record the certification latency (time from batch proposal to certification).
1715        #[cfg(feature = "metrics")]
1716        if let Some(start) = self.batch_propose_start.lock().take() {
1717            metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
1718        }
1719        // Increment to the next round.
1720        self.try_increment_to_the_next_round(round + 1).await
1721    }
1722
1723    /// Inserts the missing transmissions from the proposal into the workers.
1724    fn insert_missing_transmissions_into_workers(
1725        &self,
1726        peer_ip: SocketAddr,
1727        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1728    ) -> Result<()> {
1729        // Insert the transmissions into the workers.
1730        assign_to_workers(self.workers(), transmissions, |worker, transmission_id, transmission| {
1731            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1732        })
1733    }
1734
1735    /// Re-inserts the transmissions from the proposal into the workers.
1736    fn reinsert_transmissions_into_workers(
1737        &self,
1738        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1739    ) -> Result<()> {
1740        // Re-insert the transmissions into the workers.
1741        assign_to_workers(self.workers(), transmissions.into_iter(), |worker, transmission_id, transmission| {
1742            worker.reinsert(transmission_id, transmission);
1743        })
1744    }
1745
1746    /// Recursively stores a given batch certificate, after ensuring that:
1747    ///   - the round matches the committee round,
1748    ///   - the address is a member of the committee,
1749    ///   - the timestamp is within range,
1750    ///   - we have all of the transmissions,
1751    ///   - we have all of the previous certificates,
1752    ///   - the previous certificates are for the previous round (i.e. round - 1),
1753    ///   - the previous certificates have reached the quorum threshold,
1754    ///   - we have not already signed the batch ID.
1755    #[async_recursion::async_recursion]
1756    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1757        &self,
1758        peer_ip: SocketAddr,
1759        certificate: BatchCertificate<N>,
1760    ) -> Result<()> {
1761        // Retrieve the batch header.
1762        let batch_header = certificate.batch_header();
1763        // Retrieve the batch round.
1764        let batch_round = batch_header.round();
1765
1766        // If the certificate round is outdated, do not store it.
1767        if batch_round <= self.storage.gc_round() {
1768            return Ok(());
1769        }
1770        // If the certificate already exists in storage, return early.
1771        if self.storage.contains_certificate(certificate.id()) {
1772            return Ok(());
1773        }
1774
1775        // If the node is not in sync mode and is not synced, then return an error.
1776        if !IS_SYNCING && !self.is_synced() {
1777            bail!(
1778                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1779                fmt_id(certificate.id())
1780            );
1781        }
1782
1783        // If the peer is ahead, use the batch header to sync up to the peer.
1784        let missing_transmissions =
1785            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;
1786
1787        // Check if the certificate needs to be stored.
1788        if !self.storage.contains_certificate(certificate.id()) {
1789            // Store the batch certificate.
1790            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1791            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1792            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1793            // If a BFT sender was provided, send the round and certificate to the BFT.
1794            if let Some(cb) = self.primary_callback.get() {
1795                cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
1796            }
1797        }
1798        Ok(())
1799    }
1800
1801    /// Recursively syncs using the given batch header.
1802    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1803        &self,
1804        peer_ip: SocketAddr,
1805        batch_header: &BatchHeader<N>,
1806    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1807        // Retrieve the batch round.
1808        let batch_round = batch_header.round();
1809
1810        // If the certificate round is outdated, do not store it.
1811        if batch_round <= self.storage.gc_round() {
1812            bail!("Round {batch_round} is too far in the past")
1813        }
1814
1815        // If the node is not in sync mode and is not synced, then return an error.
1816        if !IS_SYNCING && !self.is_synced() {
1817            bail!(
1818                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1819                fmt_id(batch_header.batch_id())
1820            );
1821        }
1822
1823        // Determine if quorum threshold is reached on the batch round.
1824        let is_quorum_threshold_reached = {
1825            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1826            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1827            committee_lookback.is_quorum_threshold_reached(&authors)
1828        };
1829
1830        // Check if our primary should move to the next round.
1831        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1832        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1833        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
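        // For example, with a committee of 4 validators and f = 1, the BFT may advance with f + 1 = 2
        // certificates, whereas proposing for the next round requires N - f = 3.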
1834        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1835        // Check if our primary is far behind the peer.
1836        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1837        // If our primary is behind schedule or far behind the peer, advance our round to the batch round.
1838        if is_behind_schedule || is_peer_far_in_future {
1839            // If the batch round is greater than the current committee round, update the committee.
1840            self.try_increment_to_the_next_round(batch_round).await?;
1841        }
1842
1843        // Ensure the primary has all of the transmissions.
1844        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1845
1846        // Ensure the primary has all of the previous certificates.
1847        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1848
1849        // Wait for the missing transmissions and previous certificates to be fetched.
1850        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1851            missing_transmissions_handle,
1852            missing_previous_certificates_handle,
1853        ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}'"))?;
1854
1855        // Iterate through the missing previous certificates.
1856        for batch_certificate in missing_previous_certificates {
1857            // Check if the missing previous certificate is valid. This is only
1858            // needed if we are processing an incoming batch header from a peer.
1859            // For incoming certificates, validity is assured by checking the
1860            // root certificate in `process_batch_certificate_from_peer`.
1861            if CHECK_PREVIOUS_CERTIFICATES {
1862                self.storage.check_incoming_certificate(&batch_certificate)?;
1863            }
1864            // Store the batch certificate (recursively fetching any missing previous certificates).
1865            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1866        }
1867        Ok(missing_transmissions)
1868    }
1869
1870    /// Fetches any missing transmissions for the specified batch header.
1871    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1872    async fn fetch_missing_transmissions(
1873        &self,
1874        peer_ip: SocketAddr,
1875        batch_header: &BatchHeader<N>,
1876    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1877        // If the round is <= the GC round, return early.
1878        if batch_header.round() <= self.storage.gc_round() {
1879            return Ok(Default::default());
1880        }
1881
1882        // Ensure this batch ID is new, otherwise return early.
1883        if self.storage.contains_batch(batch_header.batch_id()) {
1884            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1885            return Ok(Default::default());
1886        }
1887
1888        // Retrieve the workers.
1889        let workers = self.workers.clone();
1890
1891        // Initialize a list for the transmissions.
1892        let mut fetch_transmissions = FuturesUnordered::new();
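        // Note: `FuturesUnordered` polls all pending fetches concurrently and yields each result
        // as soon as it completes, regardless of insertion order.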
1893
1894        // Retrieve the number of workers.
1895        let num_workers = self.num_workers();
1896        // Iterate through the transmission IDs.
1897        for transmission_id in batch_header.transmission_ids() {
1898            // If the transmission does not exist in storage, proceed to fetch the transmission.
1899            if !self.storage.contains_transmission(*transmission_id) {
1900                // Determine the worker ID.
1901                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1902                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1903                };
1904                // Retrieve the worker.
1905                let Some(worker) = workers.get().expect("No workers set").get(worker_id as usize) else {
1906                    bail!("Unable to find worker {worker_id}")
1907                };
1908                // Push the callback onto the list.
1909                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1910            }
1911        }
1912
1913        // Initialize a set for the transmissions.
1914        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1915        // Wait for all of the transmissions to be fetched.
1916        while let Some(result) = fetch_transmissions.next().await {
1917            // Retrieve the transmission.
1918            let (transmission_id, transmission) = result?;
1919            // Insert the transmission into the set.
1920            transmissions.insert(transmission_id, transmission);
1921        }
1922        // Return the transmissions.
1923        Ok(transmissions)
1924    }
1925
1926    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1927    async fn fetch_missing_previous_certificates(
1928        &self,
1929        peer_ip: SocketAddr,
1930        batch_header: &BatchHeader<N>,
1931    ) -> Result<HashSet<BatchCertificate<N>>> {
1932        // Retrieve the round.
1933        let round = batch_header.round();
1934        // If the previous round is 0, or is <= the GC round, return early.
1935        if round == 1 || round <= self.storage.gc_round() + 1 {
1936            return Ok(Default::default());
1937        }
1938
1939        // Fetch the missing previous certificates.
1940        let missing_previous_certificates =
1941            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1942        if !missing_previous_certificates.is_empty() {
1943            debug!(
1944                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1945                missing_previous_certificates.len(),
1946            );
1947        }
1948        // Return the missing previous certificates.
1949        Ok(missing_previous_certificates)
1950    }
1951
1952    /// Fetches any missing certificates for the specified batch header from the specified peer.
1953    async fn fetch_missing_certificates(
1954        &self,
1955        peer_ip: SocketAddr,
1956        round: u64,
1957        certificate_ids: &IndexSet<Field<N>>,
1958    ) -> Result<HashSet<BatchCertificate<N>>> {
1959        // Initialize a list for the missing certificates.
1960        let mut fetch_certificates = FuturesUnordered::new();
1961        // Initialize a set for the missing certificates.
1962        let mut missing_certificates = HashSet::default();
1963        // Iterate through the certificate IDs.
1964        for certificate_id in certificate_ids {
1965            // Check if the certificate already exists in the ledger.
1966            if self.ledger.contains_certificate(certificate_id)? {
1967                continue;
1968            }
1969            // Check if the certificate already exists in storage.
1970            if self.storage.contains_certificate(*certificate_id) {
1971                continue;
1972            }
1973            // If we have not fully processed the certificate yet, store it.
1974            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1975                missing_certificates.insert(certificate);
1976            } else {
1977                // If we do not have the certificate, request it.
1978                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1979                // TODO (howardwu): Limit the number of open requests we send to a peer.
1980                // Send a certificate request to the peer.
1981                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1982            }
1983        }
1984
1985        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1986        match fetch_certificates.is_empty() {
1987            true => return Ok(missing_certificates),
1988            false => trace!(
1989                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1990                fetch_certificates.len(),
1991            ),
1992        }
1993
1994        // Wait for all of the missing certificates to be fetched.
1995        while let Some(result) = fetch_certificates.next().await {
1996            // Insert the missing certificate into the set.
1997            missing_certificates.insert(result?);
1998        }
1999        // Return the missing certificates.
2000        Ok(missing_certificates)
2001    }
2002}
2003
2004impl<N: Network> Primary<N> {
2005    /// Spawns a task with the given future; it should only be used for long-running tasks.
2006    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
2007        self.handles.lock().push(tokio::spawn(future));
2008    }
2009
2010    /// Shuts down the primary.
2011    pub async fn shut_down(&self) {
2012        info!("Shutting down the primary...");
2013        // Remove the callback.
2014        self.primary_callback.clear();
2015        // Shut down the workers.
2016        self.workers().iter().for_each(|worker| worker.shut_down());
2017        // Abort the tasks.
2018        self.handles.lock().drain(..).for_each(|handle| handle.abort());
2019        // Save the current proposal cache to disk.
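        // Note: persisting the proposal cache allows the primary, on restart, to restore its in-flight
        // proposal, the proposals it has already signed, and any pending certificates.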
2020        let proposal_cache = {
2021            let proposal = self.proposed_batch.write().take();
2022            let signed_proposals = self.signed_proposals.read().clone();
2023            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
2024            let pending_certificates = self.storage.get_pending_certificates();
2025            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
2026        };
2027        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
2028            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
2029        }
2030        // Close the gateway.
2031        self.gateway.shut_down().await;
2032    }
2033}
2034
2035#[cfg(test)]
2036mod tests {
2037    use super::*;
2038    use snarkos_node_bft_ledger_service::MockLedgerService;
2039    use snarkos_node_bft_storage_service::BFTMemoryService;
2040    use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2041    use snarkvm::{
2042        ledger::{
2043            committee::{Committee, MIN_VALIDATOR_STAKE},
2044            test_helpers::sample_execution_transaction_with_fee,
2045        },
2046        prelude::{Address, Signature},
2047    };
2048
2049    use bytes::Bytes;
2050    use indexmap::IndexSet;
2051    use rand::RngCore;
2052
2053    type CurrentNetwork = snarkvm::prelude::MainnetV0;
2054
2055    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2056        // Create a committee containing the primary's account.
2057        const COMMITTEE_SIZE: usize = 4;
2058        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2059        let mut members = IndexMap::new();
2060
2061        for i in 0..COMMITTEE_SIZE {
2062            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2063            let account = Account::new(rng).unwrap();
2064
2065            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2066            accounts.push((socket_addr, account));
2067        }
2068
2069        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2070    }
2071
2072    // Returns a primary configured with the given committee, using the account at `account_index`.
2073    fn primary_with_committee(
2074        account_index: usize,
2075        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2076        committee: Committee<CurrentNetwork>,
2077        height: u32,
2078    ) -> Primary<CurrentNetwork> {
2079        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2080        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2081
2082        // Initialize the primary.
2083        let account = accounts[account_index].1.clone();
2084        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2085        let primary =
2086            Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2087                .unwrap();
2088
2089        // Construct a worker instance.
2090        let worker = Worker::new(
2091            0, // id
2092            Arc::new(primary.gateway.clone()),
2093            primary.storage.clone(),
2094            primary.ledger.clone(),
2095            primary.proposed_batch.clone(),
2096        )
2097        .unwrap();
2098        let _ = primary.workers.set(vec![worker]);
2099        for a in accounts.iter().skip(account_index) {
2100            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2101        }
2102
2103        primary
2104    }
2105
2106    fn primary_without_handlers(
2107        rng: &mut TestRng,
2108    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2109        let (accounts, committee) = sample_committee(rng);
2110        let primary = primary_with_committee(
2111            0, // index of primary's account
2112            &accounts,
2113            committee,
2114            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2115        );
2116
2117        (primary, accounts)
2118    }
2119
2120    // Creates a mock solution.
2121    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2122        // Sample a random fake solution ID.
2123        let solution_id = rng.r#gen::<u64>().into();
2124        // Vary the size of the solutions.
2125        let size = rng.gen_range(1024..10 * 1024);
2126        // Sample random fake solution bytes.
2127        let mut vec = vec![0u8; size];
2128        rng.fill_bytes(&mut vec);
2129        let solution = Data::Buffer(Bytes::from(vec));
2130        // Return the solution ID and solution.
2131        (solution_id, solution)
2132    }
2133
2134    // Samples a test transaction.
2135    fn sample_unconfirmed_transaction(
2136        rng: &mut TestRng,
2137    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2138        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2139        let id = transaction.id();
2140
2141        (id, Data::Object(transaction))
2142    }
2143
2144    // Creates a batch proposal with one solution and one transaction.
2145    fn create_test_proposal(
2146        author: &Account<CurrentNetwork>,
2147        committee: Committee<CurrentNetwork>,
2148        round: u64,
2149        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2150        timestamp: i64,
2151        num_transactions: u64,
2152        rng: &mut TestRng,
2153    ) -> Proposal<CurrentNetwork> {
2154        let mut transmission_ids = IndexSet::new();
2155        let mut transmissions = IndexMap::new();
2156
2157        // Prepare the solution and insert into the sets.
2158        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2159        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2160        let solution_transmission_id = (solution_id, solution_checksum).into();
2161        transmission_ids.insert(solution_transmission_id);
2162        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2163
2164        // Prepare the transactions and insert into the sets.
2165        for _ in 0..num_transactions {
2166            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2167            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2168            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2169            transmission_ids.insert(transaction_transmission_id);
2170            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2171        }
2172
2173        // Retrieve the private key.
2174        let private_key = author.private_key();
2175        // Sign the batch header.
2176        let batch_header = BatchHeader::new(
2177            private_key,
2178            round,
2179            timestamp,
2180            committee.id(),
2181            transmission_ids,
2182            previous_certificate_ids,
2183            rng,
2184        )
2185        .unwrap();
2186        // Construct the proposal.
2187        Proposal::new(committee, batch_header, transmissions).unwrap()
2188    }
2189
2190    // Creates a signature of the primary's current proposal for each committee member (excluding
2191    // the primary).
2192    fn peer_signatures_for_proposal(
2193        primary: &Primary<CurrentNetwork>,
2194        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2195        rng: &mut TestRng,
2196    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2197        // Each committee member signs the batch.
2198        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2199        for (socket_addr, account) in accounts {
2200            if account.address() == primary.gateway.account().address() {
2201                continue;
2202            }
2203            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2204            let signature = account.sign(&[batch_id], rng).unwrap();
2205            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2206        }
2207
2208        signatures
2209    }
2210
2211    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2212    fn peer_signatures_for_batch(
2213        primary_address: Address<CurrentNetwork>,
2214        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2215        batch_id: Field<CurrentNetwork>,
2216        rng: &mut TestRng,
2217    ) -> IndexSet<Signature<CurrentNetwork>> {
2218        let mut signatures = IndexSet::new();
2219        for (_, account) in accounts {
2220            if account.address() == primary_address {
2221                continue;
2222            }
2223            let signature = account.sign(&[batch_id], rng).unwrap();
2224            signatures.insert(signature);
2225        }
2226        signatures
2227    }
2228
2229    // Creates a batch certificate.
2230    fn create_batch_certificate(
2231        primary_address: Address<CurrentNetwork>,
2232        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2233        round: u64,
2234        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2235        rng: &mut TestRng,
2236    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2237        let timestamp = now();
2238
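        // Select the committee account matching the given primary address as the batch author.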
2239        let author =
2240            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2241        let private_key = author.private_key();
2242
2243        let committee_id = Field::rand(rng);
2244        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2245        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2246        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2247        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2248
2249        let solution_transmission_id = (solution_id, solution_checksum).into();
2250        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2251
2252        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2253        let transmissions = [
2254            (solution_transmission_id, Transmission::Solution(solution)),
2255            (transaction_transmission_id, Transmission::Transaction(transaction)),
2256        ]
2257        .into();
2258
2259        let batch_header = BatchHeader::new(
2260            private_key,
2261            round,
2262            timestamp,
2263            committee_id,
2264            transmission_ids,
2265            previous_certificate_ids,
2266            rng,
2267        )
2268        .unwrap();
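        // Collect signatures from the other committee members and assemble the certificate.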
2269        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2270        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2271        (certificate, transmissions)
2272    }
2273
2274    // Create a certificate chain up to, but not including, the specified round in the primary storage.
2275    fn store_certificate_chain(
2276        primary: &Primary<CurrentNetwork>,
2277        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2278        round: u64,
2279        rng: &mut TestRng,
2280    ) -> IndexSet<Field<CurrentNetwork>> {
2281        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2282        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
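        // For each round below the target, every committee member produces one certificate referencing the
        // previous round's certificate IDs; the storage then advances by one round.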
2283        for cur_round in 1..round {
2284            for (_, account) in accounts.iter() {
2285                let (certificate, transmissions) = create_batch_certificate(
2286                    account.address(),
2287                    accounts,
2288                    cur_round,
2289                    previous_certificates.clone(),
2290                    rng,
2291                );
2292                next_certificates.insert(certificate.id());
2293                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2294            }
2295
2296            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2297            previous_certificates = next_certificates;
2298            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2299        }
2300
2301        previous_certificates
2302    }
2303
2304    // Insert the account socket addresses into the resolver so that
2305    // they are recognized as "connected".
2306    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2307        // First account is primary, which doesn't need to resolve.
2308        for (addr, acct) in accounts.iter().skip(1) {
2309            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2310        }
2311    }
2312
2313    #[tokio::test]
2314    async fn test_propose_batch() {
2315        let mut rng = TestRng::default();
2316        let (primary, _) = primary_without_handlers(&mut rng);
2317
2318        // Check there is no batch currently proposed.
2319        assert!(primary.proposed_batch.read().is_none());
2320
2321        // Generate a solution and a transaction.
2322        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2323        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2324
2325        // Store them on one of the workers.
2326        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2327        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2328
2329        // Propose a batch; it should succeed.
2330        assert!(primary.propose_batch().await.is_ok());
2331        assert!(primary.proposed_batch.read().is_some());
2332    }
2333
2334    #[tokio::test]
2335    async fn test_propose_batch_with_no_transmissions() {
2336        let mut rng = TestRng::default();
2337        let (primary, _) = primary_without_handlers(&mut rng);
2338
2339        // Check there is no batch currently proposed.
2340        assert!(primary.proposed_batch.read().is_none());
2341
2342        // Try to propose a batch with no transmissions.
2343        assert!(primary.propose_batch().await.is_ok());
2344        assert!(primary.proposed_batch.read().is_some());
2345    }
2346
2347    #[tokio::test]
2348    async fn test_propose_batch_in_round() {
2349        let round = 3;
2350        let mut rng = TestRng::default();
2351        let (primary, accounts) = primary_without_handlers(&mut rng);
2352
2353        // Fill primary storage.
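        // Note: `store_certificate_chain` advances the primary's storage to `round`.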
2354        store_certificate_chain(&primary, &accounts, round, &mut rng);
2355
2356        // Sleep for a while to ensure the primary is ready to propose the next round.
2357        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2358
2359        // Generate a solution and a transaction.
2360        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2361        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2362
2363        // Store them on one of the workers.
2364        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2365        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2366
2367        // Propose a batch; it should succeed.
2368        assert!(primary.propose_batch().await.is_ok());
2369        assert!(primary.proposed_batch.read().is_some());
2370    }
2371
2372    #[tokio::test]
2373    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2374        let round = 3;
2375        let prev_round = round - 1;
2376        let mut rng = TestRng::default();
2377        let (primary, accounts) = primary_without_handlers(&mut rng);
2378        let peer_account = &accounts[1];
2379        let peer_ip = peer_account.0;
2380
2381        // Fill primary storage.
2382        store_certificate_chain(&primary, &accounts, round, &mut rng);
2383
2384        // Get transmissions from previous certificates.
2385        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2386
2387        // Track the number of transmissions in the previous round.
2388        let mut num_transmissions_in_previous_round = 0;
2389
2390        // Generate a solution and a transaction.
2391        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2392        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2393        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2394        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2395
2396        // Store them on one of the workers.
2397        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2398        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2399
2400        // Check that the worker has 2 transmissions.
2401        assert_eq!(primary.workers()[0].num_transmissions(), 2);
2402
2403        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate into storage.
2404        for (_, account) in accounts.iter() {
2405            let (certificate, transmissions) = create_batch_certificate(
2406                account.address(),
2407                &accounts,
2408                round,
2409                previous_certificate_ids.clone(),
2410                &mut rng,
2411            );
2412
2413            // Add the transmissions to the worker.
2414            for (transmission_id, transmission) in transmissions.iter() {
2415                primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2416            }
2417
2418            // Insert the certificate into storage.
2419            num_transmissions_in_previous_round += transmissions.len();
2420            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2421        }
2422
2423        // Sleep for a while to ensure the primary is ready to propose the next round.
2424        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2425
2426        // Advance to the next round.
2427        assert!(primary.storage.increment_to_next_round(round).is_ok());
2428
2429        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
2430        assert_eq!(primary.workers()[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2431
2432        // Propose the batch.
2433        assert!(primary.propose_batch().await.is_ok());
2434
2435        // Check that the proposal only contains the new transmissions that were not in previous certificates.
2436        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
2437        assert_eq!(proposed_transmissions.len(), 2);
2438        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2439        assert!(
2440            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2441        );
2442    }
2443
2444    #[tokio::test]
2445    async fn test_propose_batch_over_spend_limit() {
2446        let mut rng = TestRng::default();
2447
2448        // Create a primary to test spend limit backwards compatibility with V4.
2449        let (accounts, committee) = sample_committee(&mut rng);
2450        let primary = primary_with_committee(
2451            0,
2452            &accounts,
2453            committee.clone(),
2454            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2455        );
2456
2457        // Check there is no batch currently proposed.
2458        assert!(primary.proposed_batch.read().is_none());
2459        // Check the workers are empty.
2460        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2461
2462        // Generate a solution and transactions.
2463        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2464        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2465
2466        for _i in 0..5 {
2467            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2468            // Store it on one of the workers.
2469            primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2470        }
2471
2472        // Propose a batch; it should succeed.
2473        assert!(primary.propose_batch().await.is_ok());
2474        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2475        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2476        // Check the transmissions were correctly drained from the workers.
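        // Of the 6 transmissions inserted above, 3 were drained into the proposal, so 3 remain in the workers.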
2477        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2478    }
2479
2480    #[tokio::test]
2481    async fn test_batch_propose_from_peer() {
2482        let mut rng = TestRng::default();
2483        let (primary, accounts) = primary_without_handlers(&mut rng);
2484
2485        // Create a valid proposal with an author that isn't the primary.
2486        let round = 1;
2487        let peer_account = &accounts[1];
2488        let peer_ip = peer_account.0;
2489        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2490        let proposal = create_test_proposal(
2491            &peer_account.1,
2492            primary.ledger.current_committee().unwrap(),
2493            round,
2494            Default::default(),
2495            timestamp,
2496            1,
2497            &mut rng,
2498        );
2499
2500        // Make sure the primary is aware of the transmissions in the proposal.
2501        for (transmission_id, transmission) in proposal.transmissions() {
2502            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2503        }
2504
2505        // The author must be known to the resolver to pass the propose checks.
2506        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2507
2508        // The primary only considers itself synced once it has received
2509        // block locators from a peer.
2510        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2511        primary.sync.testing_only_try_block_sync_testing_only().await;
2512
2513        // Try to process the batch proposal from the peer; it should succeed.
2514        assert!(
2515            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2516        );
2517    }
2518
2519    #[tokio::test]
2520    async fn test_batch_propose_from_peer_when_not_synced() {
2521        let mut rng = TestRng::default();
2522        let (primary, accounts) = primary_without_handlers(&mut rng);
2523
2524        // Create a valid proposal with an author that isn't the primary.
2525        let round = 1;
2526        let peer_account = &accounts[1];
2527        let peer_ip = peer_account.0;
2528        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2529        let proposal = create_test_proposal(
2530            &peer_account.1,
2531            primary.ledger.current_committee().unwrap(),
2532            round,
2533            Default::default(),
2534            timestamp,
2535            1,
2536            &mut rng,
2537        );
2538
2539        // Make sure the primary is aware of the transmissions in the proposal.
2540        for (transmission_id, transmission) in proposal.transmissions() {
2541            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2542        }
2543
2544        // The author must be known to the resolver to pass the propose checks.
2545        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2546
2547        // Add block locators at a high height to indicate the primary is not synced.
2548        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2549
2550        // Try to process the batch proposal from the peer; it should fail.
2551        assert!(
2552            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2553        );
2554    }
2555
2556    #[tokio::test]
2557    async fn test_batch_propose_from_peer_in_round() {
2558        let round = 2;
2559        let mut rng = TestRng::default();
2560        let (primary, accounts) = primary_without_handlers(&mut rng);
2561
2562        // Generate certificates.
2563        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2564
2565        // Create a valid proposal with an author that isn't the primary.
2566        let peer_account = &accounts[1];
2567        let peer_ip = peer_account.0;
2568        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2569        let proposal = create_test_proposal(
2570            &peer_account.1,
2571            primary.ledger.current_committee().unwrap(),
2572            round,
2573            previous_certificates,
2574            timestamp,
2575            1,
2576            &mut rng,
2577        );
2578
2579        // Make sure the primary is aware of the transmissions in the proposal.
2580        for (transmission_id, transmission) in proposal.transmissions() {
2581            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2582        }
2583
2584        // The author must be known to the resolver to pass the propose checks.
2585        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2586
2587        // The primary only considers itself synced once it has received
2588        // block locators from a peer.
2589        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2590        primary.sync.testing_only_try_block_sync_testing_only().await;
2591
2592        // Try to process the batch proposal from the peer; it should succeed.
2593        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2594    }
2595
2596    #[tokio::test]
2597    async fn test_batch_propose_from_peer_wrong_round() {
2598        let mut rng = TestRng::default();
2599        let (primary, accounts) = primary_without_handlers(&mut rng);
2600
2601        // Create a valid proposal with an author that isn't the primary.
2602        let round = 1;
2603        let peer_account = &accounts[1];
2604        let peer_ip = peer_account.0;
2605        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2606        let proposal = create_test_proposal(
2607            &peer_account.1,
2608            primary.ledger.current_committee().unwrap(),
2609            round,
2610            Default::default(),
2611            timestamp,
2612            1,
2613            &mut rng,
2614        );
2615
2616        // Make sure the primary is aware of the transmissions in the proposal.
2617        for (transmission_id, transmission) in proposal.transmissions() {
2618            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2619        }
2620
2621        // The author must be known to the resolver to pass the propose checks.
2622        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2623        // The primary must be considered synced.
2624        primary.sync.testing_only_try_block_sync_testing_only().await;
2625
2626        // Try to process the batch proposal from the peer; it should error.
2627        assert!(
2628            primary
2629                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2630                    round: round + 1,
2631                    batch_header: Data::Object(proposal.batch_header().clone())
2632                })
2633                .await
2634                .is_err()
2635        );
2636    }
2637
2638    #[tokio::test]
2639    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2640        let round = 4;
2641        let mut rng = TestRng::default();
2642        let (primary, accounts) = primary_without_handlers(&mut rng);
2643
2644        // Generate certificates.
2645        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2646
2647        // Create a valid proposal with an author that isn't the primary.
2648        let peer_account = &accounts[1];
2649        let peer_ip = peer_account.0;
2650        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2651        let proposal = create_test_proposal(
2652            &peer_account.1,
2653            primary.ledger.current_committee().unwrap(),
2654            round,
2655            previous_certificates,
2656            timestamp,
2657            1,
2658            &mut rng,
2659        );
2660
2661        // Make sure the primary is aware of the transmissions in the proposal.
2662        for (transmission_id, transmission) in proposal.transmissions() {
2663            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2664        }
2665
2666        // The author must be known to the resolver to pass the propose checks.
2667        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2668        // The primary must be considered synced.
2669        primary.sync.testing_only_try_block_sync_testing_only().await;
2670
2671        // Try to process the batch proposal from the peer; it should error.
2672        assert!(
2673            primary
2674                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2675                    round: round + 1,
2676                    batch_header: Data::Object(proposal.batch_header().clone())
2677                })
2678                .await
2679                .is_err()
2680        );
2681    }
2682
2683    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2684    #[tokio::test]
2685    async fn test_batch_propose_from_peer_with_past_timestamp() {
2686        let round = 2;
2687        let mut rng = TestRng::default();
2688        let (primary, accounts) = primary_without_handlers(&mut rng);
2689
2690        // Generate certificates.
2691        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2692
2693        // Create a valid proposal with an author that isn't the primary.
2694        let peer_account = &accounts[1];
2695        let peer_ip = peer_account.0;
2696
2697        // Use a timestamp that is too early.
2698        // Set it to less than the minimum batch delay after the last timestamp.
2699        // Note that the minimum delay is currently 1 second, so this equals the last timestamp.
2700        let last_timestamp = primary
2701            .storage
2702            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2703            .expect("No previous proposal exists")
2704            .timestamp();
2705        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;
2706
2707        let proposal = create_test_proposal(
2708            &peer_account.1,
2709            primary.ledger.current_committee().unwrap(),
2710            round,
2711            previous_certificates,
2712            invalid_timestamp,
2713            1,
2714            &mut rng,
2715        );
2716
2717        // Make sure the primary is aware of the transmissions in the proposal.
2718        for (transmission_id, transmission) in proposal.transmissions() {
2719            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2720        }
2721
2722        // The author must be known to the resolver to pass the propose checks.
2723        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2724        // The primary must be considered synced.
2725        primary.sync.testing_only_try_block_sync_testing_only().await;
2726
2727        // Try to process the batch proposal from the peer; it should error.
2728        assert!(
2729            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2730        );
2731    }
2732
2733    /// Check that proposals exceeding the spend limit are rejected from consensus version V5 onwards.
2734    #[tokio::test]
2735    async fn test_batch_propose_from_peer_over_spend_limit() {
2736        let mut rng = TestRng::default();
2737
2738        // Create two primaries to test spend limit activation on V5.
2739        let (accounts, committee) = sample_committee(&mut rng);
2740        let primary_v4 = primary_with_committee(
2741            0,
2742            &accounts,
2743            committee.clone(),
2744            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2745        );
2746        let primary_v5 = primary_with_committee(
2747            1,
2748            &accounts,
2749            committee.clone(),
2750            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
2751        );
2752
2753        // Create a valid proposal with an author that isn't the primary.
2754        let round = 1;
2755        let peer_account = &accounts[2];
2756        let peer_ip = peer_account.0;
2757
2758        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2759
2760        let proposal =
2761            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);
2762
2763        // Make sure the primary is aware of the transmissions in the proposal.
2764        for (transmission_id, transmission) in proposal.transmissions() {
2765            primary_v4.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2766            primary_v5.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2767        }
2768
2769        // The author must be known to the resolver to pass the propose checks.
2770        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2771        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2772
2773        // Primary V4 must be considered synced.
2774        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2775        primary_v4.sync.testing_only_try_block_sync_testing_only().await;
2776
2777        // Primary V5 must be considered synced.
2778        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2779        primary_v5.sync.testing_only_try_block_sync_testing_only().await;
2780
2781        // Check the spend limit is enforced from V5 onwards.
2782        assert!(
2783            primary_v4
2784                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2785                .await
2786                .is_ok()
2787        );
2788
2789        assert!(
2790            primary_v5
2791                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2792                .await
2793                .is_err()
2794        );
2795    }
2796
2797    #[tokio::test]
2798    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2799        let round = 3;
2800        let mut rng = TestRng::default();
2801        let (primary, _) = primary_without_handlers(&mut rng);
2802
2803        // Check there is no batch currently proposed.
2804        assert!(primary.proposed_batch.read().is_none());
2805
2806        // Generate a solution and a transaction.
2807        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2808        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2809
2810        // Store them on one of the workers.
2811        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2812        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2813
2814        // Set the proposal lock to a round ahead of the storage.
2815        let old_proposal_lock_round = *primary.propose_lock.lock().await;
2816        *primary.propose_lock.lock().await = round + 1;
2817
2818        // Propose a batch; the call returns Ok, but no batch is stored because the lock is ahead of the storage round.
2819        assert!(primary.propose_batch().await.is_ok());
2820        assert!(primary.proposed_batch.read().is_none());
2821
2822        // Set the proposal lock back to the old round.
2823        *primary.propose_lock.lock().await = old_proposal_lock_round;
2824
2825        // Try to propose a batch again. This time, it should succeed.
2826        assert!(primary.propose_batch().await.is_ok());
2827        assert!(primary.proposed_batch.read().is_some());
2828    }
2829
2830    #[tokio::test]
2831    async fn test_propose_batch_with_storage_round_behind_proposal() {
2832        let round = 5;
2833        let mut rng = TestRng::default();
2834        let (primary, accounts) = primary_without_handlers(&mut rng);
2835
2836        // Generate previous certificates.
2837        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2838
2839        // Create a valid proposal.
2840        let timestamp = now();
2841        let proposal = create_test_proposal(
2842            primary.gateway.account(),
2843            primary.ledger.current_committee().unwrap(),
2844            round + 1,
2845            previous_certificates,
2846            timestamp,
2847            1,
2848            &mut rng,
2849        );
2850
2851        // Store the proposal on the primary.
2852        *primary.proposed_batch.write() = Some(proposal);
2853
2854        // Proposing a batch terminates early because the storage round is behind the proposal.
2855        assert!(primary.propose_batch().await.is_ok());
2856        assert!(primary.proposed_batch.read().is_some());
2857        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2858    }
2859
2860    #[tokio::test(flavor = "multi_thread")]
2861    async fn test_batch_signature_from_peer() {
2862        let mut rng = TestRng::default();
2863        let (primary, accounts) = primary_without_handlers(&mut rng);
2864        map_account_addresses(&primary, &accounts);
2865
2866        // Create a valid proposal.
2867        let round = 1;
2868        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2869        let proposal = create_test_proposal(
2870            primary.gateway.account(),
2871            primary.ledger.current_committee().unwrap(),
2872            round,
2873            Default::default(),
2874            timestamp,
2875            1,
2876            &mut rng,
2877        );
2878
2879        // Store the proposal on the primary.
2880        *primary.proposed_batch.write() = Some(proposal);
2881
2882        // Each committee member signs the batch.
2883        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2884
2885        // Have the primary process the signatures.
2886        for (socket_addr, signature) in signatures {
2887            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2888        }
2889
2890        // Check the certificate was created and stored by the primary.
2891        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2892        // Check the round was incremented.
2893        assert_eq!(primary.current_round(), round + 1);
2894    }
2895
2896    #[tokio::test(flavor = "multi_thread")]
2897    async fn test_batch_signature_from_peer_in_round() {
2898        let round = 5;
2899        let mut rng = TestRng::default();
2900        let (primary, accounts) = primary_without_handlers(&mut rng);
2901        map_account_addresses(&primary, &accounts);
2902
2903        // Generate certificates.
2904        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2905
2906        // Create a valid proposal.
2907        let timestamp = now();
2908        let proposal = create_test_proposal(
2909            primary.gateway.account(),
2910            primary.ledger.current_committee().unwrap(),
2911            round,
2912            previous_certificates,
2913            timestamp,
2914            1,
2915            &mut rng,
2916        );
2917
2918        // Store the proposal on the primary.
2919        *primary.proposed_batch.write() = Some(proposal);
2920
2921        // Each committee member signs the batch.
2922        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2923
2924        // Have the primary process the signatures.
2925        for (socket_addr, signature) in signatures {
2926            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2927        }
2928
2929        // Check the certificate was created and stored by the primary.
2930        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2931        // Check the round was incremented.
2932        assert_eq!(primary.current_round(), round + 1);
2933    }
2934
2935    #[tokio::test]
2936    async fn test_batch_signature_from_peer_no_quorum() {
2937        let mut rng = TestRng::default();
2938        let (primary, accounts) = primary_without_handlers(&mut rng);
2939        map_account_addresses(&primary, &accounts);
2940
2941        // Create a valid proposal.
2942        let round = 1;
2943        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2944        let proposal = create_test_proposal(
2945            primary.gateway.account(),
2946            primary.ledger.current_committee().unwrap(),
2947            round,
2948            Default::default(),
2949            timestamp,
2950            1,
2951            &mut rng,
2952        );
2953
2954        // Store the proposal on the primary.
2955        *primary.proposed_batch.write() = Some(proposal);
2956
2957        // Each committee member signs the batch.
2958        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2959
2960        // Have the primary process only one signature, mimicking a lack of quorum.
2961        let (socket_addr, signature) = signatures.first().unwrap();
2962        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2963
2964        // Check the certificate was not created and stored by the primary.
2965        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2966        // Check the round was not incremented.
2967        assert_eq!(primary.current_round(), round);
2968    }
2969
2970    #[tokio::test]
2971    async fn test_batch_signature_from_peer_in_round_no_quorum() {
2972        let round = 7;
2973        let mut rng = TestRng::default();
2974        let (primary, accounts) = primary_without_handlers(&mut rng);
2975        map_account_addresses(&primary, &accounts);
2976
2977        // Generate certificates.
2978        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2979
2980        // Create a valid proposal.
2981        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2982        let proposal = create_test_proposal(
2983            primary.gateway.account(),
2984            primary.ledger.current_committee().unwrap(),
2985            round,
2986            previous_certificates,
2987            timestamp,
2988            1,
2989            &mut rng,
2990        );
2991
2992        // Store the proposal on the primary.
2993        *primary.proposed_batch.write() = Some(proposal);
2994
2995        // Each committee member signs the batch.
2996        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2997
2998        // Have the primary process only one signature, mimicking a lack of quorum.
2999        let (socket_addr, signature) = signatures.first().unwrap();
3000        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3001
3002        // Check the certificate was not created and stored by the primary.
3003        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3004        // Check the round was not incremented.
3005        assert_eq!(primary.current_round(), round);
3006    }
3007
3008    #[tokio::test]
3009    async fn test_insert_certificate_with_aborted_transmissions() {
3010        let round = 3;
3011        let prev_round = round - 1;
3012        let mut rng = TestRng::default();
3013        let (primary, accounts) = primary_without_handlers(&mut rng);
3014        let peer_account = &accounts[1];
3015        let peer_ip = peer_account.0;
3016
3017        // Fill primary storage.
3018        store_certificate_chain(&primary, &accounts, round, &mut rng);
3019
3020        // Get transmissions from previous certificates.
3021        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
3022
3023        // Generate a solution and a transaction.
3024        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
3025        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
3026
3027        // Store them on one of the workers.
3028        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
3029        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
3030
3031        // Check that the worker has 2 transmissions.
3032        assert_eq!(primary.workers()[0].num_transmissions(), 2);
3033
3034        // Create certificates for the current round.
3035        let account = accounts[0].1.clone();
3036        let (certificate, transmissions) =
3037            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
3038        let certificate_id = certificate.id();
3039
3040        // Randomly abort some of the transmissions.
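        // The `is_empty` check in the match condition guarantees at least one transmission is aborted.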
3041        let mut aborted_transmissions = HashSet::new();
3042        let mut transmissions_without_aborted = HashMap::new();
3043        for (transmission_id, transmission) in transmissions.clone() {
3044            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
3045                true => {
3046                    // Insert the aborted transmission.
3047                    aborted_transmissions.insert(transmission_id);
3048                }
3049                false => {
3050                    // Insert the transmission without the aborted transmission.
3051                    transmissions_without_aborted.insert(transmission_id, transmission);
3052                }
3053            };
3054        }
3055
3056        // Add the non-aborted transmissions to the worker.
3057        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
3058            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
3059        }
3060
3061        // Check that inserting the transmission with missing transmissions fails.
3062        assert!(
3063            primary
3064                .storage
3065                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
3066                .is_err()
3067        );
3068        assert!(
3069            primary
3070                .storage
3071                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
3072                .is_err()
3073        );
3074
3075        // Insert the certificate into storage.
3076        primary
3077            .storage
3078            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3079            .unwrap();
3080
3081        // Ensure the certificate exists in storage.
3082        assert!(primary.storage.contains_certificate(certificate_id));
3083        // Ensure that the aborted transmission IDs exist in storage.
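        // Aborted transmissions are tracked by ID only; no transmission payload is stored for them.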
3084        for aborted_transmission_id in aborted_transmissions {
3085            assert!(primary.storage.contains_transmission(aborted_transmission_id));
3086            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3087        }
3088    }
3089}