snarkos_node_bft/
primary.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    PRIMARY_PING_IN_MS,
22    Sync,
23    Transport,
24    WORKER_PING_IN_MS,
25    Worker,
26    events::{BatchPropose, BatchSignature, Event},
27    helpers::{
28        BFTSender,
29        PrimaryReceiver,
30        PrimarySender,
31        Proposal,
32        ProposalCache,
33        SignedProposals,
34        Storage,
35        assign_to_worker,
36        assign_to_workers,
37        fmt_id,
38        init_sync_channels,
39        init_worker_channels,
40        now,
41    },
42    spawn_blocking,
43};
44use snarkos_account::Account;
45use snarkos_node_bft_events::PrimaryPing;
46use snarkos_node_bft_ledger_service::LedgerService;
47use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
48use snarkvm::{
49    console::{
50        prelude::*,
51        types::{Address, Field},
52    },
53    ledger::{
54        block::Transaction,
55        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
56        puzzle::{Solution, SolutionID},
57    },
58    prelude::{ConsensusVersion, committee::Committee},
59};
60
61use aleo_std::StorageMode;
62use colored::Colorize;
63use futures::stream::{FuturesUnordered, StreamExt};
64use indexmap::{IndexMap, IndexSet};
65#[cfg(feature = "locktick")]
66use locktick::{
67    parking_lot::{Mutex, RwLock},
68    tokio::Mutex as TMutex,
69};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72#[cfg(not(feature = "serial"))]
73use rayon::prelude::*;
74use std::{
75    collections::{HashMap, HashSet},
76    future::Future,
77    net::SocketAddr,
78    sync::Arc,
79    time::Duration,
80};
81#[cfg(not(feature = "locktick"))]
82use tokio::sync::Mutex as TMutex;
83use tokio::{sync::OnceCell, task::JoinHandle};
84
85/// A helper type for an optional proposed batch.
86pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
87
88/// The primary logic of a node.
89/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
90#[derive(Clone)]
91pub struct Primary<N: Network> {
92    /// The sync module enables fetching data from other validators.
93    sync: Sync<N>,
94    /// The gateway allows talking to other nodes in the validator set.
95    gateway: Gateway<N>,
96    /// The storage.
97    storage: Storage<N>,
98    /// The ledger service.
99    ledger: Arc<dyn LedgerService<N>>,
100    /// The workers.
101    workers: Arc<[Worker<N>]>,
102    /// The BFT sender.
103    bft_sender: Arc<OnceCell<BFTSender<N>>>,
104    /// The batch proposal, if the primary is currently proposing a batch.
105    proposed_batch: Arc<ProposedBatch<N>>,
106    /// The timestamp of the most recent proposed batch.
107    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
108    /// The recently-signed batch proposals.
109    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
110    /// The handles for all background tasks spawned by this primary.
111    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
112    /// The lock for propose_batch.
113    propose_lock: Arc<TMutex<u64>>,
114    /// The storage mode of the node.
115    storage_mode: StorageMode,
116}
117
118impl<N: Network> Primary<N> {
119    /// The maximum number of unconfirmed transmissions to send to the primary.
120    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
121
122    /// Initializes a new primary instance.
123    #[allow(clippy::too_many_arguments)]
124    pub fn new(
125        account: Account<N>,
126        storage: Storage<N>,
127        ledger: Arc<dyn LedgerService<N>>,
128        block_sync: Arc<BlockSync<N>>,
129        ip: Option<SocketAddr>,
130        trusted_validators: &[SocketAddr],
131        storage_mode: StorageMode,
132        dev: Option<u16>,
133    ) -> Result<Self> {
134        // Initialize the gateway.
135        let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
136        // Initialize the sync module.
137        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
138
139        // Initialize the primary instance.
140        Ok(Self {
141            sync,
142            gateway,
143            storage,
144            ledger,
145            workers: Arc::from(vec![]),
146            bft_sender: Default::default(),
147            proposed_batch: Default::default(),
148            latest_proposed_batch_timestamp: Default::default(),
149            signed_proposals: Default::default(),
150            handles: Default::default(),
151            propose_lock: Default::default(),
152            storage_mode,
153        })
154    }
155
156    /// Load the proposal cache file and update the Primary state with the stored data.
157    async fn load_proposal_cache(&self) -> Result<()> {
158        // Fetch the signed proposals from the file system if it exists.
159        match ProposalCache::<N>::exists(&self.storage_mode) {
160            // If the proposal cache exists, then process the proposal cache.
161            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
162                Ok(proposal_cache) => {
163                    // Extract the proposal and signed proposals.
164                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
165                        proposal_cache.into();
166
167                    // Write the proposed batch.
168                    *self.proposed_batch.write() = proposed_batch;
169                    // Write the signed proposals.
170                    *self.signed_proposals.write() = signed_proposals;
171                    // Writ the propose lock.
172                    *self.propose_lock.lock().await = latest_certificate_round;
173
174                    // Update the storage with the pending certificates.
175                    for certificate in pending_certificates {
176                        let batch_id = certificate.batch_id();
177                        // We use a dummy IP because the node should not need to request from any peers.
178                        // The storage should have stored all the transmissions. If not, we simply
179                        // skip the certificate.
180                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
181                        {
182                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
183                        }
184                    }
185                    Ok(())
186                }
187                Err(err) => {
188                    bail!("Failed to read the signed proposals from the file system - {err}.");
189                }
190            },
191            // If the proposal cache does not exist, then return early.
192            false => Ok(()),
193        }
194    }
195
196    /// Run the primary instance.
197    pub async fn run(
198        &mut self,
199        ping: Option<Arc<Ping<N>>>,
200        bft_sender: Option<BFTSender<N>>,
201        primary_sender: PrimarySender<N>,
202        primary_receiver: PrimaryReceiver<N>,
203    ) -> Result<()> {
204        info!("Starting the primary instance of the memory pool...");
205
206        // Set the BFT sender.
207        if let Some(bft_sender) = &bft_sender {
208            // Set the BFT sender in the primary.
209            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
210        }
211
212        // Construct a map of the worker senders.
213        let mut worker_senders = IndexMap::new();
214        // Construct a map for the workers.
215        let mut workers = Vec::new();
216        // Initialize the workers.
217        for id in 0..MAX_WORKERS {
218            // Construct the worker channels.
219            let (tx_worker, rx_worker) = init_worker_channels();
220            // Construct the worker instance.
221            let worker = Worker::new(
222                id,
223                Arc::new(self.gateway.clone()),
224                self.storage.clone(),
225                self.ledger.clone(),
226                self.proposed_batch.clone(),
227            )?;
228            // Run the worker instance.
229            worker.run(rx_worker);
230            // Add the worker to the list of workers.
231            workers.push(worker);
232            // Add the worker sender to the map.
233            worker_senders.insert(id, tx_worker);
234        }
235        // Set the workers.
236        self.workers = Arc::from(workers);
237
238        // First, initialize the sync channels.
239        let (sync_sender, sync_receiver) = init_sync_channels();
240        // Next, initialize the sync module and sync the storage from ledger.
241        self.sync.initialize(bft_sender).await?;
242        // Next, load and process the proposal cache before running the sync module.
243        self.load_proposal_cache().await?;
244        // Next, run the sync module.
245        self.sync.run(ping, sync_receiver).await?;
246        // Next, initialize the gateway.
247        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
248        // Lastly, start the primary handlers.
249        // Note: This ensures the primary does not start communicating before syncing is complete.
250        self.start_handlers(primary_receiver);
251
252        Ok(())
253    }
254
255    /// Returns the current round.
256    pub fn current_round(&self) -> u64 {
257        self.storage.current_round()
258    }
259
260    /// Returns `true` if the primary is synced.
261    pub fn is_synced(&self) -> bool {
262        self.sync.is_synced()
263    }
264
265    /// Returns the gateway.
266    pub const fn gateway(&self) -> &Gateway<N> {
267        &self.gateway
268    }
269
270    /// Returns the storage.
271    pub const fn storage(&self) -> &Storage<N> {
272        &self.storage
273    }
274
275    /// Returns the ledger.
276    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
277        &self.ledger
278    }
279
280    /// Returns the number of workers.
281    pub fn num_workers(&self) -> u8 {
282        u8::try_from(self.workers.len()).expect("Too many workers")
283    }
284
285    /// Returns the workers.
286    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
287        &self.workers
288    }
289
290    /// Returns the batch proposal of our primary, if one currently exists.
291    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
292        &self.proposed_batch
293    }
294}
295
296impl<N: Network> Primary<N> {
297    /// Returns the number of unconfirmed transmissions.
298    pub fn num_unconfirmed_transmissions(&self) -> usize {
299        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
300    }
301
302    /// Returns the number of unconfirmed ratifications.
303    pub fn num_unconfirmed_ratifications(&self) -> usize {
304        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
305    }
306
307    /// Returns the number of unconfirmed solutions.
308    pub fn num_unconfirmed_solutions(&self) -> usize {
309        self.workers.iter().map(|worker| worker.num_solutions()).sum()
310    }
311
312    /// Returns the number of unconfirmed transactions.
313    pub fn num_unconfirmed_transactions(&self) -> usize {
314        self.workers.iter().map(|worker| worker.num_transactions()).sum()
315    }
316}
317
318impl<N: Network> Primary<N> {
319    /// Returns the worker transmission IDs.
320    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
321        self.workers.iter().flat_map(|worker| worker.transmission_ids())
322    }
323
324    /// Returns the worker transmissions.
325    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
326        self.workers.iter().flat_map(|worker| worker.transmissions())
327    }
328
329    /// Returns the worker solutions.
330    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
331        self.workers.iter().flat_map(|worker| worker.solutions())
332    }
333
334    /// Returns the worker transactions.
335    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
336        self.workers.iter().flat_map(|worker| worker.transactions())
337    }
338}
339
340impl<N: Network> Primary<N> {
341    /// Clears the worker solutions.
342    pub fn clear_worker_solutions(&self) {
343        self.workers.iter().for_each(Worker::clear_solutions);
344    }
345}
346
347impl<N: Network> Primary<N> {
348    /// Proposes the batch for the current round.
349    ///
350    /// This method performs the following steps:
351    /// 1. Drain the workers.
352    /// 2. Sign the batch.
353    /// 3. Set the batch proposal in the primary.
354    /// 4. Broadcast the batch header to all validators for signing.
355    pub async fn propose_batch(&self) -> Result<()> {
356        // This function isn't re-entrant.
357        let mut lock_guard = self.propose_lock.lock().await;
358
359        // Check if the proposed batch has expired, and clear it if it has expired.
360        if let Err(e) = self.check_proposed_batch_for_expiration().await {
361            warn!("Failed to check the proposed batch for expiration - {e}");
362            return Ok(());
363        }
364
365        // Retrieve the current round.
366        let round = self.current_round();
367        // Compute the previous round.
368        let previous_round = round.saturating_sub(1);
369
370        // If the current round is 0, return early.
371        // This can actually never happen, because of the invariant that the current round is never 0
372        // (see [`StorageInner::current_round`]).
373        ensure!(round > 0, "Round 0 cannot have transaction batches");
374
375        // If the current storage round is below the latest proposal round, then return early.
376        if round < *lock_guard {
377            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
378            return Ok(());
379        }
380
381        // If there is a batch being proposed already,
382        // rebroadcast the batch header to the non-signers, and return early.
383        if let Some(proposal) = self.proposed_batch.read().as_ref() {
384            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
385            if round < proposal.round()
386                || proposal
387                    .batch_header()
388                    .previous_certificate_ids()
389                    .iter()
390                    .any(|id| !self.storage.contains_certificate(*id))
391            {
392                warn!(
393                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
394                    proposal.round(),
395                );
396                return Ok(());
397            }
398            // Construct the event.
399            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
400            let event = Event::BatchPropose(proposal.batch_header().clone().into());
401            // Iterate through the non-signers.
402            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
403                // Resolve the address to the peer IP.
404                match self.gateway.resolver().get_peer_ip_for_address(address) {
405                    // Resend the batch proposal to the validator for signing.
406                    Some(peer_ip) => {
407                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
408                        tokio::spawn(async move {
409                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
410                            // Resend the batch proposal to the peer.
411                            if gateway.send(peer_ip, event_).await.is_none() {
412                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
413                            }
414                        });
415                    }
416                    None => continue,
417                }
418            }
419            debug!("Proposed batch for round {} is still valid", proposal.round());
420            return Ok(());
421        }
422
423        #[cfg(feature = "metrics")]
424        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
425
426        // Ensure that the primary does not create a new proposal too quickly.
427        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
428            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
429            return Ok(());
430        }
431
432        // Ensure the primary has not proposed a batch for this round before.
433        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
434            // If a BFT sender was provided, attempt to advance the current round.
435            if let Some(bft_sender) = self.bft_sender.get() {
436                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
437                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
438                    Ok(true) => (), // continue,
439                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
440                    Ok(false) => return Ok(()),
441                    // An error occurred while attempting to advance the current round.
442                    Err(e) => {
443                        warn!("Failed to update the BFT to the next round - {e}");
444                        return Err(e);
445                    }
446                }
447            }
448            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
449            return Ok(());
450        }
451
452        // Determine if the current round has been proposed.
453        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
454        // good for network reliability and should not be prevented for the already existing proposed_batch.
455        // If a certificate already exists for the current round, an attempt should be made to advance the
456        // round as early as possible.
457        if round == *lock_guard {
458            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
459            return Ok(());
460        }
461
462        // Retrieve the committee to check against.
463        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
464        // Check if the primary is connected to enough validators to reach quorum threshold.
465        {
466            // Retrieve the connected validator addresses.
467            let mut connected_validators = self.gateway.connected_addresses();
468            // Append the primary to the set.
469            connected_validators.insert(self.gateway.account().address());
470            // If quorum threshold is not reached, return early.
471            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
472                debug!(
473                    "Primary is safely skipping a batch proposal for round {round} {}",
474                    "(please connect to more validators)".dimmed()
475                );
476                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
477                return Ok(());
478            }
479        }
480
481        // Retrieve the previous certificates.
482        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
483
484        // Check if the batch is ready to be proposed.
485        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
486        let mut is_ready = previous_round == 0;
487        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
488        if previous_round > 0 {
489            // Retrieve the committee lookback for the round.
490            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
491                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
492            };
493            // Construct a set over the authors.
494            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
495            // Check if the previous certificates have reached the quorum threshold.
496            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
497                is_ready = true;
498            }
499        }
500        // If the batch is not ready to be proposed, return early.
501        if !is_ready {
502            debug!(
503                "Primary is safely skipping a batch proposal for round {round} {}",
504                format!("(previous round {previous_round} has not reached quorum)").dimmed()
505            );
506            return Ok(());
507        }
508
509        // Initialize the map of transmissions.
510        let mut transmissions: IndexMap<_, _> = Default::default();
511        // Track the total execution costs of the batch proposal as it is being constructed.
512        let mut proposal_cost = 0u64;
513        // Note: worker draining and transaction inclusion needs to be thought
514        // through carefully when there is more than one worker. The fairness
515        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
516        debug_assert_eq!(MAX_WORKERS, 1);
517
518        'outer: for worker in self.workers().iter() {
519            let mut num_worker_transmissions = 0usize;
520
521            while let Some((id, transmission)) = worker.remove_front() {
522                // Check the selected transmissions are below the batch limit.
523                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
524                    // Reinsert the transmission into the worker.
525                    worker.insert_front(id, transmission);
526                    break 'outer;
527                }
528
529                // Check the max transmissions per worker is not exceeded.
530                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
531                    // Reinsert the transmission into the worker.
532                    worker.insert_front(id, transmission);
533                    continue 'outer;
534                }
535
536                // Check if the ledger already contains the transmission.
537                if self.ledger.contains_transmission(&id).unwrap_or(true) {
538                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
539                    continue;
540                }
541
542                // Check if the storage already contain the transmission.
543                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
544                // the primary does not propose a batch with no transmissions.
545                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
546                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
547                    continue;
548                }
549
550                // Check the transmission is still valid.
551                match (id, transmission.clone()) {
552                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
553                        // Ensure the checksum matches. If not, skip the solution.
554                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
555                        {
556                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
557                            continue;
558                        }
559                        // Check if the solution is still valid.
560                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
561                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
562                            continue;
563                        }
564                    }
565                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
566                        // Ensure the checksum matches. If not, skip the transaction.
567                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
568                        {
569                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
570                            continue;
571                        }
572
573                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
574                        let transaction = spawn_blocking!({
575                            match transaction {
576                                Data::Object(transaction) => Ok(transaction),
577                                Data::Buffer(bytes) => {
578                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
579                                }
580                            }
581                        })?;
582
583                        // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
584                        // ConsensusVersion V8 Migration logic -
585                        // Do not include deployments in a batch proposal.
586                        let current_block_height = self.ledger.latest_block_height();
587                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
588                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
589                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
590                        if current_block_height > consensus_version_v7_height
591                            && current_block_height <= consensus_version_v8_height
592                            && transaction.is_deploy()
593                        {
594                            trace!(
595                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
596                                fmt_id(transaction_id)
597                            );
598                            continue;
599                        }
600
601                        // Check if the transaction is still valid.
602                        // TODO: check if clone is cheap, otherwise fix.
603                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
604                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
605                            continue;
606                        }
607
608                        // Compute the transaction spent cost (in microcredits).
609                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
610                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(
611                            transaction_id,
612                            transaction,
613                            consensus_version,
614                        ) else {
615                            debug!(
616                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
617                                fmt_id(transaction_id)
618                            );
619                            continue;
620                        };
621
622                        // Compute the next proposal cost.
623                        // Note: We purposefully discard this transaction if the proposal cost overflows.
624                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
625                            debug!(
626                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
627                                fmt_id(transaction_id)
628                            );
629                            continue;
630                        };
631
632                        // Check if the next proposal cost exceeds the batch proposal spend limit.
633                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
634                        if next_proposal_cost > batch_spend_limit {
635                            debug!(
636                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
637                                fmt_id(transaction_id),
638                                batch_spend_limit
639                            );
640
641                            // Reinsert the transmission into the worker.
642                            worker.insert_front(id, transmission);
643                            break 'outer;
644                        }
645
646                        // Update the proposal cost.
647                        proposal_cost = next_proposal_cost;
648                    }
649
650                    // Note: We explicitly forbid including ratifications,
651                    // as the protocol currently does not support ratifications.
652                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
653                    // All other combinations are clearly invalid.
654                    _ => continue,
655                }
656
657                // If the transmission is valid, insert it into the proposal's transmission list.
658                transmissions.insert(id, transmission);
659                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
660            }
661        }
662
663        // Determine the current timestamp.
664        let current_timestamp = now();
665
666        *lock_guard = round;
667
668        /* Proceeding to sign & propose the batch. */
669        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
670
671        // Retrieve the private key.
672        let private_key = *self.gateway.account().private_key();
673        // Retrieve the committee ID.
674        let committee_id = committee_lookback.id();
675        // Prepare the transmission IDs.
676        let transmission_ids = transmissions.keys().copied().collect();
677        // Prepare the previous batch certificate IDs.
678        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
679        // Sign the batch header and construct the proposal.
680        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
681            &private_key,
682            round,
683            current_timestamp,
684            committee_id,
685            transmission_ids,
686            previous_certificate_ids,
687            &mut rand::thread_rng()
688        ))
689        .and_then(|batch_header| {
690            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
691                .map(|proposal| (batch_header, proposal))
692        })
693        .inspect_err(|_| {
694            // On error, reinsert the transmissions and then propagate the error.
695            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
696                error!("Failed to reinsert transmissions: {e:?}");
697            }
698        })?;
699        // Broadcast the batch to all validators for signing.
700        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
701        // Set the timestamp of the latest proposed batch.
702        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
703        // Set the proposed batch.
704        *self.proposed_batch.write() = Some(proposal);
705        Ok(())
706    }
707
708    /// Processes a batch propose from a peer.
709    ///
710    /// This method performs the following steps:
711    /// 1. Verify the batch.
712    /// 2. Sign the batch.
713    /// 3. Broadcast the signature back to the validator.
714    ///
715    /// If our primary is ahead of the peer, we will not sign the batch.
716    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
717    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
718        let BatchPropose { round: batch_round, batch_header } = batch_propose;
719
720        // Deserialize the batch header.
721        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
722        // Ensure the round matches in the batch header.
723        if batch_round != batch_header.round() {
724            // Proceed to disconnect the validator.
725            self.gateway.disconnect(peer_ip);
726            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
727        }
728
729        // Retrieve the batch author.
730        let batch_author = batch_header.author();
731
732        // Ensure the batch proposal is from the validator.
733        match self.gateway.resolver().get_address(peer_ip) {
734            // If the peer is a validator, then ensure the batch proposal is from the validator.
735            Some(address) => {
736                if address != batch_author {
737                    // Proceed to disconnect the validator.
738                    self.gateway.disconnect(peer_ip);
739                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
740                }
741            }
742            None => bail!("Batch proposal from a disconnected validator"),
743        }
744        // Ensure the batch author is a current committee member.
745        if !self.gateway.is_authorized_validator_address(batch_author) {
746            // Proceed to disconnect the validator.
747            self.gateway.disconnect(peer_ip);
748            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
749        }
750        // Ensure the batch proposal is not from the current primary.
751        if self.gateway.account().address() == batch_author {
752            bail!("Invalid peer - proposed batch from myself ({batch_author})");
753        }
754
755        // Ensure that the batch proposal's committee ID matches the expected committee ID.
756        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
757        if expected_committee_id != batch_header.committee_id() {
758            // Proceed to disconnect the validator.
759            self.gateway.disconnect(peer_ip);
760            bail!(
761                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
762                batch_header.committee_id()
763            );
764        }
765
766        // Retrieve the cached round and batch ID for this validator.
767        if let Some((signed_round, signed_batch_id, signature)) =
768            self.signed_proposals.read().get(&batch_author).copied()
769        {
770            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
771            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
772            if signed_round > batch_header.round() {
773                bail!(
774                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
775                    batch_header.round()
776                );
777            }
778
779            // If the round matches and the batch ID differs, then the validator is malicious.
780            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
781                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
782            }
783            // If the round and batch ID matches, then skip signing the batch a second time.
784            // Instead, rebroadcast the cached signature to the peer.
785            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
786                let gateway = self.gateway.clone();
787                tokio::spawn(async move {
788                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
789                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
790                    // Resend the batch signature to the peer.
791                    if gateway.send(peer_ip, event).await.is_none() {
792                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
793                    }
794                });
795                // Return early.
796                return Ok(());
797            }
798        }
799
800        // Ensure that the batch header doesn't already exist in storage.
801        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
802        if self.storage.contains_batch(batch_header.batch_id()) {
803            debug!(
804                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
805                format!("batch for round {batch_round} already exists in storage").dimmed()
806            );
807            return Ok(());
808        }
809
810        // Compute the previous round.
811        let previous_round = batch_round.saturating_sub(1);
812        // Ensure that the peer did not propose a batch too quickly.
813        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
814            // Proceed to disconnect the validator.
815            self.gateway.disconnect(peer_ip);
816            bail!("Malicious peer - {e} from '{peer_ip}'");
817        }
818
819        // Ensure the batch header does not contain any ratifications.
820        if batch_header.contains(TransmissionID::Ratification) {
821            // Proceed to disconnect the validator.
822            self.gateway.disconnect(peer_ip);
823            bail!(
824                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
825            );
826        }
827
828        // If the peer is ahead, use the batch header to sync up to the peer.
829        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;
830
831        // Check that the transmission ids match and are not fee transactions.
832        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
833            // If the transmission is not well-formed, then return early.
834            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
835        }) {
836            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}",);
837            return Ok(());
838        }
839
840        // Ensure the batch is for the current round.
841        // This method must be called after fetching previous certificates (above),
842        // and prior to checking the batch header (below).
843        if let Err(e) = self.ensure_is_signing_round(batch_round) {
844            // If the primary is not signing for the peer's round, then return early.
845            debug!("{e} from '{peer_ip}'");
846            return Ok(());
847        }
848
849        // Ensure the batch header from the peer is valid.
850        let (storage, header) = (self.storage.clone(), batch_header.clone());
851        let missing_transmissions =
852            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
853        // Inserts the missing transmissions into the workers.
854        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
855
856        // Ensure the transaction doesn't bring the proposal above the spend limit.
857        let block_height = self.ledger.latest_block_height() + 1;
858        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
859            let mut proposal_cost = 0u64;
860            for transmission_id in batch_header.transmission_ids() {
861                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
862                let Some(worker) = self.workers.get(worker_id as usize) else {
863                    debug!("Unable to find worker {worker_id}");
864                    return Ok(());
865                };
866
867                let Some(transmission) = worker.get_transmission(*transmission_id) else {
868                    debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
869                    return Ok(());
870                };
871
872                // If the transmission is a transaction, compute its execution cost.
873                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
874                    (transmission_id, transmission)
875                {
876                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
877                    let transaction = spawn_blocking!({
878                        match transaction {
879                            Data::Object(transaction) => Ok(transaction),
880                            Data::Buffer(bytes) => {
881                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
882                            }
883                        }
884                    })?;
885
886                    // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
887                    // ConsensusVersion V8 Migration logic -
888                    // Do not include deployments in a batch proposal.
889                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
890                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
891                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
892                    if block_height > consensus_version_v7_height
893                        && block_height <= consensus_version_v8_height
894                        && transaction.is_deploy()
895                    {
896                        bail!(
897                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
898                        )
899                    }
900
901                    // Compute the transaction spent cost (in microcredits).
902                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
903                    let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(
904                        *transaction_id,
905                        transaction,
906                        consensus_version,
907                    ) else {
908                        bail!(
909                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
910                            fmt_id(transaction_id)
911                        )
912                    };
913
914                    // Compute the next proposal cost.
915                    // Note: We purposefully discard this transaction if the proposal cost overflows.
916                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
917                        bail!(
918                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
919                            fmt_id(transaction_id)
920                        )
921                    };
922
923                    // Check if the next proposal cost exceeds the batch proposal spend limit.
924                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
925                    if next_proposal_cost > batch_spend_limit {
926                        bail!(
927                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
928                            fmt_id(transaction_id),
929                            batch_spend_limit
930                        );
931                    }
932
933                    // Update the proposal cost.
934                    proposal_cost = next_proposal_cost;
935                }
936            }
937        }
938
939        /* Proceeding to sign the batch. */
940
941        // Retrieve the batch ID.
942        let batch_id = batch_header.batch_id();
943        // Sign the batch ID.
944        let account = self.gateway.account().clone();
945        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
946
947        // Ensure the proposal has not already been signed.
948        //
949        // Note: Due to the need to sync the batch header with the peer, it is possible
950        // for the primary to receive the same 'BatchPropose' event again, whereby only
951        // one instance of this handler should sign the batch. This check guarantees this.
952        match self.signed_proposals.write().0.entry(batch_author) {
953            std::collections::hash_map::Entry::Occupied(mut entry) => {
954                // If the validator has already signed a batch for this round, then return early,
955                // since, if the peer still has not received the signature, they will request it again,
956                // and the logic at the start of this function will resend the (now cached) signature
957                // to the peer if asked to sign this batch proposal again.
958                if entry.get().0 == batch_round {
959                    return Ok(());
960                }
961                // Otherwise, cache the round, batch ID, and signature for this validator.
962                entry.insert((batch_round, batch_id, signature));
963            }
964            // If the validator has not signed a batch before, then continue.
965            std::collections::hash_map::Entry::Vacant(entry) => {
966                // Cache the round, batch ID, and signature for this validator.
967                entry.insert((batch_round, batch_id, signature));
968            }
969        };
970
971        // Broadcast the signature back to the validator.
972        let self_ = self.clone();
973        tokio::spawn(async move {
974            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
975            // Send the batch signature to the peer.
976            if self_.gateway.send(peer_ip, event).await.is_some() {
977                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
978            }
979        });
980
981        Ok(())
982    }
983
984    /// Processes a batch signature from a peer.
985    ///
986    /// This method performs the following steps:
987    /// 1. Ensure the proposed batch has not expired.
988    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
989    /// 3. Store the signature.
990    /// 4. Certify the batch if enough signatures have been received.
991    /// 5. Broadcast the batch certificate to all validators.
992    async fn process_batch_signature_from_peer(
993        &self,
994        peer_ip: SocketAddr,
995        batch_signature: BatchSignature<N>,
996    ) -> Result<()> {
997        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
998        self.check_proposed_batch_for_expiration().await?;
999
1000        // Retrieve the signature and timestamp.
1001        let BatchSignature { batch_id, signature } = batch_signature;
1002
1003        // Retrieve the signer.
1004        let signer = signature.to_address();
1005
1006        // Ensure the batch signature is signed by the validator.
1007        if self.gateway.resolver().get_address(peer_ip) != Some(signer) {
1008            // Proceed to disconnect the validator.
1009            self.gateway.disconnect(peer_ip);
1010            bail!("Malicious peer - batch signature is from a different validator ({signer})");
1011        }
1012        // Ensure the batch signature is not from the current primary.
1013        if self.gateway.account().address() == signer {
1014            bail!("Invalid peer - received a batch signature from myself ({signer})");
1015        }
1016
1017        let self_ = self.clone();
1018        let Some(proposal) = spawn_blocking!({
1019            // Acquire the write lock.
1020            let mut proposed_batch = self_.proposed_batch.write();
1021            // Add the signature to the batch, and determine if the batch is ready to be certified.
1022            match proposed_batch.as_mut() {
1023                Some(proposal) => {
1024                    // Ensure the batch ID matches the currently proposed batch ID.
1025                    if proposal.batch_id() != batch_id {
1026                        match self_.storage.contains_batch(batch_id) {
1027                            // If this batch was already certified, return early.
1028                            true => {
1029                                debug!(
1030                                    "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
1031                                    proposal.round()
1032                                );
1033                                return Ok(None);
1034                            }
1035                            // If the batch ID is unknown, return an error.
1036                            false => bail!(
1037                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
1038                                proposal.batch_id(),
1039                                proposal.round()
1040                            ),
1041                        }
1042                    }
1043                    // Retrieve the committee lookback for the round.
1044                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
1045                    // Retrieve the address of the validator.
1046                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
1047                        bail!("Signature is from a disconnected validator");
1048                    };
1049                    // Add the signature to the batch.
1050                    proposal.add_signature(signer, signature, &committee_lookback)?;
1051                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1052                    // Check if the batch is ready to be certified.
1053                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1054                        // If the batch is not ready to be certified, return early.
1055                        return Ok(None);
1056                    }
1057                }
1058                // There is no proposed batch, so return early.
1059                None => return Ok(None),
1060            };
1061            // Retrieve the batch proposal, clearing the proposed batch.
1062            match proposed_batch.take() {
1063                Some(proposal) => Ok(Some(proposal)),
1064                None => Ok(None),
1065            }
1066        })?
1067        else {
1068            return Ok(());
1069        };
1070
1071        /* Proceeding to certify the batch. */
1072
1073        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1074
1075        // Retrieve the committee lookback for the round.
1076        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1077        // Store the certified batch and broadcast it to all validators.
1078        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1079        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1080            // Reinsert the transmissions back into the ready queue for the next proposal.
1081            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1082            return Err(e);
1083        }
1084
1085        #[cfg(feature = "metrics")]
1086        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1087        Ok(())
1088    }
1089
1090    /// Processes a batch certificate from a peer.
1091    ///
1092    /// This method performs the following steps:
1093    /// 1. Stores the given batch certificate, after ensuring it is valid.
1094    /// 2. If there are enough certificates to reach quorum threshold for the current round,
1095    ///    then proceed to advance to the next round.
1096    async fn process_batch_certificate_from_peer(
1097        &self,
1098        peer_ip: SocketAddr,
1099        certificate: BatchCertificate<N>,
1100    ) -> Result<()> {
1101        // Ensure the batch certificate is from an authorized validator.
1102        if !self.gateway.is_authorized_validator_ip(peer_ip) {
1103            // Proceed to disconnect the validator.
1104            self.gateway.disconnect(peer_ip);
1105            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1106        }
1107        // Ensure storage does not already contain the certificate.
1108        if self.storage.contains_certificate(certificate.id()) {
1109            return Ok(());
1110        // Otherwise, ensure ephemeral storage contains the certificate.
1111        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1112            self.storage.insert_unprocessed_certificate(certificate.clone())?;
1113        }
1114
1115        // Retrieve the batch certificate author.
1116        let author = certificate.author();
1117        // Retrieve the batch certificate round.
1118        let certificate_round = certificate.round();
1119        // Retrieve the batch certificate committee ID.
1120        let committee_id = certificate.committee_id();
1121
1122        // Ensure the batch certificate is not from the current primary.
1123        if self.gateway.account().address() == author {
1124            bail!("Received a batch certificate for myself ({author})");
1125        }
1126
1127        // Ensure that the incoming certificate is valid.
1128        self.storage.check_incoming_certificate(&certificate)?;
1129
1130        // Store the certificate, after ensuring it is valid above.
1131        // The following call recursively fetches and stores
1132        // the previous certificates referenced from this certificate.
1133        // It is critical to make the following call this after validating the certificate above.
1134        // The reason is that a sequence of malformed certificates,
1135        // with references to previous certificates with non-decreasing rounds,
1136        // cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
1137        // Note that if the following call, if not returning an error, guarantees the backward closure of the DAG
1138        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
1139        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
1140        // TODO: eliminate those redundant checks
1141        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1142
1143        // If there are enough certificates to reach quorum threshold for the certificate round,
1144        // then proceed to advance to the next round.
1145
1146        // Retrieve the committee lookback.
1147        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1148
1149        // Retrieve the certificate authors.
1150        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1151        // Check if the certificates have reached the quorum threshold.
1152        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1153
1154        // Ensure that the batch certificate's committee ID matches the expected committee ID.
1155        let expected_committee_id = committee_lookback.id();
1156        if expected_committee_id != committee_id {
1157            // Proceed to disconnect the validator.
1158            self.gateway.disconnect(peer_ip);
1159            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1160        }
1161
1162        // Determine if we are currently proposing a round that is relevant.
1163        // Note: This is important, because while our peers have advanced,
1164        // they may not be proposing yet, and thus still able to sign our proposed batch.
1165        let should_advance = match &*self.proposed_batch.read() {
1166            // We advance if the proposal round is less than the current round that was just certified.
1167            Some(proposal) => proposal.round() < certificate_round,
1168            // If there's no proposal, we consider advancing.
1169            None => true,
1170        };
1171
1172        // Retrieve the current round.
1173        let current_round = self.current_round();
1174
1175        // Determine whether to advance to the next round.
1176        if is_quorum && should_advance && certificate_round >= current_round {
1177            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
1178            self.try_increment_to_the_next_round(current_round + 1).await?;
1179        }
1180        Ok(())
1181    }
1182}
1183
1184impl<N: Network> Primary<N> {
1185    /// Starts the primary handlers.
1186    ///
1187    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
1188    /// that awaits new data and handles it accordingly.
1189    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
1190    /// tries to move the the next round of batches.
1191    ///
1192    /// This function is called exactly once, in `Self::run()`.
1193    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1194        let PrimaryReceiver {
1195            mut rx_batch_propose,
1196            mut rx_batch_signature,
1197            mut rx_batch_certified,
1198            mut rx_primary_ping,
1199            mut rx_unconfirmed_solution,
1200            mut rx_unconfirmed_transaction,
1201        } = primary_receiver;
1202
1203        // Start the primary ping sender.
1204        let self_ = self.clone();
1205        self.spawn(async move {
1206            loop {
1207                // Sleep briefly.
1208                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
1209
1210                // Retrieve the block locators.
1211                let self__ = self_.clone();
1212                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1213                    Ok(block_locators) => block_locators,
1214                    Err(e) => {
1215                        warn!("Failed to retrieve block locators - {e}");
1216                        continue;
1217                    }
1218                };
1219
1220                // Retrieve the latest certificate of the primary.
1221                let primary_certificate = {
1222                    // Retrieve the primary address.
1223                    let primary_address = self_.gateway.account().address();
1224
1225                    // Iterate backwards from the latest round to find the primary certificate.
1226                    let mut certificate = None;
1227                    let mut current_round = self_.current_round();
1228                    while certificate.is_none() {
1229                        // If the current round is 0, then break the while loop.
1230                        if current_round == 0 {
1231                            break;
1232                        }
1233                        // Retrieve the primary certificates.
1234                        if let Some(primary_certificate) =
1235                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1236                        {
1237                            certificate = Some(primary_certificate);
1238                        // If the primary certificate was not found, decrement the round.
1239                        } else {
1240                            current_round = current_round.saturating_sub(1);
1241                        }
1242                    }
1243
1244                    // Determine if the primary certificate was found.
1245                    match certificate {
1246                        Some(certificate) => certificate,
1247                        // Skip this iteration of the loop (do not send a primary ping).
1248                        None => continue,
1249                    }
1250                };
1251
1252                // Construct the primary ping.
1253                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1254                // Broadcast the event.
1255                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1256            }
1257        });
1258
1259        // Start the primary ping handler.
1260        let self_ = self.clone();
1261        self.spawn(async move {
1262            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1263                // If the primary is not synced, then do not process the primary ping.
1264                if self_.sync.is_synced() {
1265                    trace!("Processing new primary ping from '{peer_ip}'");
1266                } else {
1267                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1268                    continue;
1269                }
1270
1271                // Spawn a task to process the primary certificate.
1272                {
1273                    let self_ = self_.clone();
1274                    tokio::spawn(async move {
1275                        // Deserialize the primary certificate in the primary ping.
1276                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1277                        else {
1278                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1279                            return;
1280                        };
1281                        // Process the primary certificate.
1282                        let id = fmt_id(primary_certificate.id());
1283                        let round = primary_certificate.round();
1284                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1285                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1286                        }
1287                    });
1288                }
1289            }
1290        });
1291
1292        // Start the worker ping(s).
1293        let self_ = self.clone();
1294        self.spawn(async move {
1295            loop {
1296                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
1297                // If the primary is not synced, then do not broadcast the worker ping(s).
1298                if !self_.sync.is_synced() {
1299                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1300                    continue;
1301                }
1302                // Broadcast the worker ping(s).
1303                for worker in self_.workers.iter() {
1304                    worker.broadcast_ping();
1305                }
1306            }
1307        });
1308
1309        // Start the batch proposer.
1310        let self_ = self.clone();
1311        self.spawn(async move {
1312            loop {
1313                // Sleep briefly, but longer than if there were no batch.
1314                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1315                let current_round = self_.current_round();
1316                // If the primary is not synced, then do not propose a batch.
1317                if !self_.sync.is_synced() {
1318                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
1319                    continue;
1320                }
1321                // A best-effort attempt to skip the scheduled batch proposal if
1322                // round progression already triggered one.
1323                if self_.propose_lock.try_lock().is_err() {
1324                    trace!(
1325                        "Skipping batch proposal for round {current_round} {}",
1326                        "(node is already proposing)".dimmed()
1327                    );
1328                    continue;
1329                };
1330                // If there is no proposed batch, attempt to propose a batch.
1331                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
1332                // and only one batch needs to be proposed at a time.
1333                if let Err(e) = self_.propose_batch().await {
1334                    warn!("Cannot propose a batch - {e}");
1335                }
1336            }
1337        });
1338
1339        // Start the proposed batch handler.
1340        let self_ = self.clone();
1341        self.spawn(async move {
1342            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1343                // If the primary is not synced, then do not sign the batch.
1344                if !self_.sync.is_synced() {
1345                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1346                    continue;
1347                }
1348                // Spawn a task to process the proposed batch.
1349                let self_ = self_.clone();
1350                tokio::spawn(async move {
1351                    // Process the batch proposal.
1352                    let round = batch_propose.round;
1353                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1354                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
1355                    }
1356                });
1357            }
1358        });
1359
1360        // Start the batch signature handler.
1361        let self_ = self.clone();
1362        self.spawn(async move {
1363            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1364                // If the primary is not synced, then do not store the signature.
1365                if !self_.sync.is_synced() {
1366                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1367                    continue;
1368                }
1369                // Process the batch signature.
1370                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1371                // is a critical path, and we should only store the minimum required number of signatures.
1372                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1373                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1374                let id = fmt_id(batch_signature.batch_id);
1375                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1376                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
1377                }
1378            }
1379        });
1380
1381        // Start the certified batch handler.
1382        let self_ = self.clone();
1383        self.spawn(async move {
1384            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1385                // If the primary is not synced, then do not store the certificate.
1386                if !self_.sync.is_synced() {
1387                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1388                    continue;
1389                }
1390                // Spawn a task to process the batch certificate.
1391                let self_ = self_.clone();
1392                tokio::spawn(async move {
1393                    // Deserialize the batch certificate.
1394                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1395                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1396                        return;
1397                    };
1398                    // Process the batch certificate.
1399                    let id = fmt_id(batch_certificate.id());
1400                    let round = batch_certificate.round();
1401                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1402                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
1403                    }
1404                });
1405            }
1406        });
1407
1408        // This task periodically tries to move to the next round.
1409        //
1410        // Note: This is necessary to ensure that the primary is not stuck on a previous round
1411        // despite having received enough certificates to advance to the next round.
1412        let self_ = self.clone();
1413        self.spawn(async move {
1414            loop {
1415                // Sleep briefly.
1416                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1417                // If the primary is not synced, then do not increment to the next round.
1418                if !self_.sync.is_synced() {
1419                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1420                    continue;
1421                }
1422                // Attempt to increment to the next round.
1423                let current_round = self_.current_round();
1424                let next_round = current_round.saturating_add(1);
1425                // Determine if the quorum threshold is reached for the current round.
1426                let is_quorum_threshold_reached = {
1427                    // Retrieve the certificate authors for the current round.
1428                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
1429                    // If there are no certificates, then skip this check.
1430                    if authors.is_empty() {
1431                        continue;
1432                    }
1433                    // Retrieve the committee lookback for the current round.
1434                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
1435                        warn!("Failed to retrieve the committee lookback for round {current_round}");
1436                        continue;
1437                    };
1438                    // Check if the quorum threshold is reached for the current round.
1439                    committee_lookback.is_quorum_threshold_reached(&authors)
1440                };
1441                // Attempt to increment to the next round if the quorum threshold is reached.
1442                if is_quorum_threshold_reached {
1443                    debug!("Quorum threshold reached for round {}", current_round);
1444                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
1445                        warn!("Failed to increment to the next round - {e}");
1446                    }
1447                }
1448            }
1449        });
1450
1451        // Start a handler to process new unconfirmed solutions.
1452        let self_ = self.clone();
1453        self.spawn(async move {
1454            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1455                // Compute the checksum for the solution.
1456                let Ok(checksum) = solution.to_checksum::<N>() else {
1457                    error!("Failed to compute the checksum for the unconfirmed solution");
1458                    continue;
1459                };
1460                // Compute the worker ID.
1461                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1462                    error!("Unable to determine the worker ID for the unconfirmed solution");
1463                    continue;
1464                };
1465                let self_ = self_.clone();
1466                tokio::spawn(async move {
1467                    // Retrieve the worker.
1468                    let worker = &self_.workers[worker_id as usize];
1469                    // Process the unconfirmed solution.
1470                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1471                    // Send the result to the callback.
1472                    callback.send(result).ok();
1473                });
1474            }
1475        });
1476
1477        // Start a handler to process new unconfirmed transactions.
1478        let self_ = self.clone();
1479        self.spawn(async move {
1480            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1481                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1482                // Compute the checksum for the transaction.
1483                let Ok(checksum) = transaction.to_checksum::<N>() else {
1484                    error!("Failed to compute the checksum for the unconfirmed transaction");
1485                    continue;
1486                };
1487                // Compute the worker ID.
1488                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1489                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1490                    continue;
1491                };
1492                let self_ = self_.clone();
1493                tokio::spawn(async move {
1494                    // Retrieve the worker.
1495                    let worker = &self_.workers[worker_id as usize];
1496                    // Process the unconfirmed transaction.
1497                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1498                    // Send the result to the callback.
1499                    callback.send(result).ok();
1500                });
1501            }
1502        });
1503    }
1504
1505    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1506    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1507        // Check if the proposed batch is timed out or stale.
1508        let is_expired = match self.proposed_batch.read().as_ref() {
1509            Some(proposal) => proposal.round() < self.current_round(),
1510            None => false,
1511        };
1512        // If the batch is expired, clear the proposed batch.
1513        if is_expired {
1514            // Reset the proposed batch.
1515            let proposal = self.proposed_batch.write().take();
1516            if let Some(proposal) = proposal {
1517                debug!("Cleared expired proposal for round {}", proposal.round());
1518                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1519            }
1520        }
1521        Ok(())
1522    }
1523
1524    /// Increments to the next round.
1525    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1526        // If the next round is within GC range, then iterate to the penultimate round.
1527        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1528            let mut fast_forward_round = self.current_round();
1529            // Iterate until the penultimate round is reached.
1530            while fast_forward_round < next_round.saturating_sub(1) {
1531                // Update to the next round in storage.
1532                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1533                // Clear the proposed batch.
1534                *self.proposed_batch.write() = None;
1535            }
1536        }
1537
1538        // Retrieve the current round.
1539        let current_round = self.current_round();
1540        // Attempt to advance to the next round.
1541        if current_round < next_round {
1542            // If a BFT sender was provided, send the current round to the BFT.
1543            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
1544                match bft_sender.send_primary_round_to_bft(current_round).await {
1545                    Ok(is_ready) => is_ready,
1546                    Err(e) => {
1547                        warn!("Failed to update the BFT to the next round - {e}");
1548                        return Err(e);
1549                    }
1550                }
1551            }
1552            // Otherwise, handle the Narwhal case.
1553            else {
1554                // Update to the next round in storage.
1555                self.storage.increment_to_next_round(current_round)?;
1556                // Set 'is_ready' to 'true'.
1557                true
1558            };
1559
1560            // Log whether the next round is ready.
1561            match is_ready {
1562                true => debug!("Primary is ready to propose the next round"),
1563                false => debug!("Primary is not ready to propose the next round"),
1564            }
1565
1566            // If the node is ready, propose a batch for the next round.
1567            if is_ready {
1568                self.propose_batch().await?;
1569            }
1570        }
1571        Ok(())
1572    }
1573
1574    /// Ensures the primary is signing for the specified batch round.
1575    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1576    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1577    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1578        // Retrieve the current round.
1579        let current_round = self.current_round();
1580        // Ensure the batch round is within GC range of the current round.
1581        if current_round + self.storage.max_gc_rounds() <= batch_round {
1582            bail!("Round {batch_round} is too far in the future")
1583        }
1584        // Ensure the batch round is at or one before the current round.
1585        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1586        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1587        if current_round > batch_round + 1 {
1588            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1589        }
1590        // Check if the primary is still signing for the batch round.
1591        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1592            if signing_round > batch_round {
1593                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1594            }
1595        }
1596        Ok(())
1597    }
1598
1599    /// Ensure the primary is not creating batch proposals too frequently.
1600    /// This checks that the certificate timestamp for the previous round is within the expected range.
1601    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1602        // Retrieve the timestamp of the previous timestamp to check against.
1603        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1604            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1605            Some(certificate) => certificate.timestamp(),
1606            None => match self.gateway.account().address() == author {
1607                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1608                true => *self.latest_proposed_batch_timestamp.read(),
1609                // If we do not see a previous certificate for the author, then proceed optimistically.
1610                false => return Ok(()),
1611            },
1612        };
1613
1614        // Determine the elapsed time since the previous timestamp.
1615        let elapsed = timestamp
1616            .checked_sub(previous_timestamp)
1617            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1618        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1619        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1620            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1621            false => Ok(()),
1622        }
1623    }
1624
1625    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1626    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1627        // Create the batch certificate and transmissions.
1628        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1629        // Convert the transmissions into a HashMap.
1630        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1631        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1632        // Store the certified batch.
1633        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1634        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1635        debug!("Stored a batch certificate for round {}", certificate.round());
1636        // If a BFT sender was provided, send the certificate to the BFT.
1637        if let Some(bft_sender) = self.bft_sender.get() {
1638            // Await the callback to continue.
1639            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1640                warn!("Failed to update the BFT DAG from primary - {e}");
1641                return Err(e);
1642            };
1643        }
1644        // Broadcast the certified batch to all validators.
1645        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1646        // Log the certified batch.
1647        let num_transmissions = certificate.transmission_ids().len();
1648        let round = certificate.round();
1649        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1650        // Increment to the next round.
1651        self.try_increment_to_the_next_round(round + 1).await
1652    }
1653
1654    /// Inserts the missing transmissions from the proposal into the workers.
1655    fn insert_missing_transmissions_into_workers(
1656        &self,
1657        peer_ip: SocketAddr,
1658        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1659    ) -> Result<()> {
1660        // Insert the transmissions into the workers.
1661        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1662            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1663        })
1664    }
1665
1666    /// Re-inserts the transmissions from the proposal into the workers.
1667    fn reinsert_transmissions_into_workers(
1668        &self,
1669        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1670    ) -> Result<()> {
1671        // Re-insert the transmissions into the workers.
1672        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1673            worker.reinsert(transmission_id, transmission);
1674        })
1675    }
1676
1677    /// Recursively stores a given batch certificate, after ensuring:
1678    ///   - Ensure the round matches the committee round.
1679    ///   - Ensure the address is a member of the committee.
1680    ///   - Ensure the timestamp is within range.
1681    ///   - Ensure we have all of the transmissions.
1682    ///   - Ensure we have all of the previous certificates.
1683    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1684    ///   - Ensure the previous certificates have reached the quorum threshold.
1685    ///   - Ensure we have not already signed the batch ID.
1686    #[async_recursion::async_recursion]
1687    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1688        &self,
1689        peer_ip: SocketAddr,
1690        certificate: BatchCertificate<N>,
1691    ) -> Result<()> {
1692        // Retrieve the batch header.
1693        let batch_header = certificate.batch_header();
1694        // Retrieve the batch round.
1695        let batch_round = batch_header.round();
1696
1697        // If the certificate round is outdated, do not store it.
1698        if batch_round <= self.storage.gc_round() {
1699            return Ok(());
1700        }
1701        // If the certificate already exists in storage, return early.
1702        if self.storage.contains_certificate(certificate.id()) {
1703            return Ok(());
1704        }
1705
1706        // If node is not in sync mode and the node is not synced. Then return an error.
1707        if !IS_SYNCING && !self.is_synced() {
1708            bail!(
1709                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1710                fmt_id(certificate.id())
1711            );
1712        }
1713
1714        // If the peer is ahead, use the batch header to sync up to the peer.
1715        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1716
1717        // Check if the certificate needs to be stored.
1718        if !self.storage.contains_certificate(certificate.id()) {
1719            // Store the batch certificate.
1720            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1721            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1722            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1723            // If a BFT sender was provided, send the round and certificate to the BFT.
1724            if let Some(bft_sender) = self.bft_sender.get() {
1725                // Send the certificate to the BFT.
1726                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1727                    warn!("Failed to update the BFT DAG from sync: {e}");
1728                    return Err(e);
1729                };
1730            }
1731        }
1732        Ok(())
1733    }
1734
1735    /// Recursively syncs using the given batch header.
1736    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1737        &self,
1738        peer_ip: SocketAddr,
1739        batch_header: &BatchHeader<N>,
1740    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1741        // Retrieve the batch round.
1742        let batch_round = batch_header.round();
1743
1744        // If the certificate round is outdated, do not store it.
1745        if batch_round <= self.storage.gc_round() {
1746            bail!("Round {batch_round} is too far in the past")
1747        }
1748
1749        // If node is not in sync mode and the node is not synced, then return an error.
1750        if !IS_SYNCING && !self.is_synced() {
1751            bail!(
1752                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1753                fmt_id(batch_header.batch_id())
1754            );
1755        }
1756
1757        // Determine if quorum threshold is reached on the batch round.
1758        let is_quorum_threshold_reached = {
1759            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1760            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1761            committee_lookback.is_quorum_threshold_reached(&authors)
1762        };
1763
1764        // Check if our primary should move to the next round.
1765        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1766        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1767        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1768        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1769        // Check if our primary is far behind the peer.
1770        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1771        // If our primary is far behind the peer, update our committee to the batch round.
1772        if is_behind_schedule || is_peer_far_in_future {
1773            // If the batch round is greater than the current committee round, update the committee.
1774            self.try_increment_to_the_next_round(batch_round).await?;
1775        }
1776
1777        // Ensure the primary has all of the transmissions.
1778        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1779
1780        // Ensure the primary has all of the previous certificates.
1781        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1782
1783        // Wait for the missing transmissions and previous certificates to be fetched.
1784        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1785            missing_transmissions_handle,
1786            missing_previous_certificates_handle,
1787        ).map_err(|e| {
1788            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
1789        })?;
1790
1791        // Iterate through the missing previous certificates.
1792        for batch_certificate in missing_previous_certificates {
1793            // Store the batch certificate (recursively fetching any missing previous certificates).
1794            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1795        }
1796        Ok(missing_transmissions)
1797    }
1798
1799    /// Fetches any missing transmissions for the specified batch header.
1800    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1801    async fn fetch_missing_transmissions(
1802        &self,
1803        peer_ip: SocketAddr,
1804        batch_header: &BatchHeader<N>,
1805    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1806        // If the round is <= the GC round, return early.
1807        if batch_header.round() <= self.storage.gc_round() {
1808            return Ok(Default::default());
1809        }
1810
1811        // Ensure this batch ID is new, otherwise return early.
1812        if self.storage.contains_batch(batch_header.batch_id()) {
1813            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1814            return Ok(Default::default());
1815        }
1816
1817        // Retrieve the workers.
1818        let workers = self.workers.clone();
1819
1820        // Initialize a list for the transmissions.
1821        let mut fetch_transmissions = FuturesUnordered::new();
1822
1823        // Retrieve the number of workers.
1824        let num_workers = self.num_workers();
1825        // Iterate through the transmission IDs.
1826        for transmission_id in batch_header.transmission_ids() {
1827            // If the transmission does not exist in storage, proceed to fetch the transmission.
1828            if !self.storage.contains_transmission(*transmission_id) {
1829                // Determine the worker ID.
1830                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1831                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1832                };
1833                // Retrieve the worker.
1834                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1835                // Push the callback onto the list.
1836                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1837            }
1838        }
1839
1840        // Initialize a set for the transmissions.
1841        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1842        // Wait for all of the transmissions to be fetched.
1843        while let Some(result) = fetch_transmissions.next().await {
1844            // Retrieve the transmission.
1845            let (transmission_id, transmission) = result?;
1846            // Insert the transmission into the set.
1847            transmissions.insert(transmission_id, transmission);
1848        }
1849        // Return the transmissions.
1850        Ok(transmissions)
1851    }
1852
1853    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1854    async fn fetch_missing_previous_certificates(
1855        &self,
1856        peer_ip: SocketAddr,
1857        batch_header: &BatchHeader<N>,
1858    ) -> Result<HashSet<BatchCertificate<N>>> {
1859        // Retrieve the round.
1860        let round = batch_header.round();
1861        // If the previous round is 0, or is <= the GC round, return early.
1862        if round == 1 || round <= self.storage.gc_round() + 1 {
1863            return Ok(Default::default());
1864        }
1865
1866        // Fetch the missing previous certificates.
1867        let missing_previous_certificates =
1868            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1869        if !missing_previous_certificates.is_empty() {
1870            debug!(
1871                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1872                missing_previous_certificates.len(),
1873            );
1874        }
1875        // Return the missing previous certificates.
1876        Ok(missing_previous_certificates)
1877    }
1878
1879    /// Fetches any missing certificates for the specified batch header from the specified peer.
1880    async fn fetch_missing_certificates(
1881        &self,
1882        peer_ip: SocketAddr,
1883        round: u64,
1884        certificate_ids: &IndexSet<Field<N>>,
1885    ) -> Result<HashSet<BatchCertificate<N>>> {
1886        // Initialize a list for the missing certificates.
1887        let mut fetch_certificates = FuturesUnordered::new();
1888        // Initialize a set for the missing certificates.
1889        let mut missing_certificates = HashSet::default();
1890        // Iterate through the certificate IDs.
1891        for certificate_id in certificate_ids {
1892            // Check if the certificate already exists in the ledger.
1893            if self.ledger.contains_certificate(certificate_id)? {
1894                continue;
1895            }
1896            // Check if the certificate already exists in storage.
1897            if self.storage.contains_certificate(*certificate_id) {
1898                continue;
1899            }
1900            // If we have not fully processed the certificate yet, store it.
1901            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1902                missing_certificates.insert(certificate);
1903            } else {
1904                // If we do not have the certificate, request it.
1905                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1906                // TODO (howardwu): Limit the number of open requests we send to a peer.
1907                // Send an certificate request to the peer.
1908                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1909            }
1910        }
1911
1912        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1913        match fetch_certificates.is_empty() {
1914            true => return Ok(missing_certificates),
1915            false => trace!(
1916                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1917                fetch_certificates.len(),
1918            ),
1919        }
1920
1921        // Wait for all of the missing certificates to be fetched.
1922        while let Some(result) = fetch_certificates.next().await {
1923            // Insert the missing certificate into the set.
1924            missing_certificates.insert(result?);
1925        }
1926        // Return the missing certificates.
1927        Ok(missing_certificates)
1928    }
1929}
1930
1931impl<N: Network> Primary<N> {
1932    /// Spawns a task with the given future; it should only be used for long-running tasks.
1933    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1934        self.handles.lock().push(tokio::spawn(future));
1935    }
1936
1937    /// Shuts down the primary.
1938    pub async fn shut_down(&self) {
1939        info!("Shutting down the primary...");
1940        // Shut down the workers.
1941        self.workers.iter().for_each(|worker| worker.shut_down());
1942        // Abort the tasks.
1943        self.handles.lock().iter().for_each(|handle| handle.abort());
1944        // Save the current proposal cache to disk.
1945        let proposal_cache = {
1946            let proposal = self.proposed_batch.write().take();
1947            let signed_proposals = self.signed_proposals.read().clone();
1948            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1949            let pending_certificates = self.storage.get_pending_certificates();
1950            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1951        };
1952        if let Err(err) = proposal_cache.store(&self.storage_mode) {
1953            error!("Failed to store the current proposal cache: {err}");
1954        }
1955        // Close the gateway.
1956        self.gateway.shut_down().await;
1957    }
1958}
1959
1960#[cfg(test)]
1961mod tests {
1962    use super::*;
1963    use snarkos_node_bft_ledger_service::MockLedgerService;
1964    use snarkos_node_bft_storage_service::BFTMemoryService;
1965    use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
1966    use snarkvm::{
1967        ledger::{
1968            committee::{Committee, MIN_VALIDATOR_STAKE},
1969            snarkvm_ledger_test_helpers::sample_execution_transaction_with_fee,
1970        },
1971        prelude::{Address, Signature},
1972    };
1973
1974    use bytes::Bytes;
1975    use indexmap::IndexSet;
1976    use rand::RngCore;
1977
1978    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1979
1980    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1981        // Create a committee containing the primary's account.
1982        const COMMITTEE_SIZE: usize = 4;
1983        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1984        let mut members = IndexMap::new();
1985
1986        for i in 0..COMMITTEE_SIZE {
1987            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1988            let account = Account::new(rng).unwrap();
1989
1990            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1991            accounts.push((socket_addr, account));
1992        }
1993
1994        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1995    }
1996
1997    // Returns a primary and a list of accounts in the configured committee.
1998    fn primary_with_committee(
1999        account_index: usize,
2000        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2001        committee: Committee<CurrentNetwork>,
2002        height: u32,
2003    ) -> Primary<CurrentNetwork> {
2004        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2005        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2006
2007        // Initialize the primary.
2008        let account = accounts[account_index].1.clone();
2009        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2010        let mut primary =
2011            Primary::new(account, storage, ledger, block_sync, None, &[], StorageMode::Test(None), None).unwrap();
2012
2013        // Construct a worker instance.
2014        primary.workers = Arc::from([Worker::new(
2015            0, // id
2016            Arc::new(primary.gateway.clone()),
2017            primary.storage.clone(),
2018            primary.ledger.clone(),
2019            primary.proposed_batch.clone(),
2020        )
2021        .unwrap()]);
2022        for a in accounts.iter().skip(account_index) {
2023            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2024        }
2025
2026        primary
2027    }
2028
2029    fn primary_without_handlers(
2030        rng: &mut TestRng,
2031    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2032        let (accounts, committee) = sample_committee(rng);
2033        let primary = primary_with_committee(
2034            0, // index of primary's account
2035            &accounts,
2036            committee,
2037            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2038        );
2039
2040        (primary, accounts)
2041    }
2042
2043    // Creates a mock solution.
2044    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2045        // Sample a random fake solution ID.
2046        let solution_id = rng.r#gen::<u64>().into();
2047        // Vary the size of the solutions.
2048        let size = rng.gen_range(1024..10 * 1024);
2049        // Sample random fake solution bytes.
2050        let mut vec = vec![0u8; size];
2051        rng.fill_bytes(&mut vec);
2052        let solution = Data::Buffer(Bytes::from(vec));
2053        // Return the solution ID and solution.
2054        (solution_id, solution)
2055    }
2056
2057    // Samples a test transaction.
2058    fn sample_unconfirmed_transaction(
2059        rng: &mut TestRng,
2060    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2061        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2062        let id = transaction.id();
2063
2064        (id, Data::Object(transaction))
2065    }
2066
2067    // Creates a batch proposal with one solution and one transaction.
2068    fn create_test_proposal(
2069        author: &Account<CurrentNetwork>,
2070        committee: Committee<CurrentNetwork>,
2071        round: u64,
2072        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2073        timestamp: i64,
2074        num_transactions: u64,
2075        rng: &mut TestRng,
2076    ) -> Proposal<CurrentNetwork> {
2077        let mut transmission_ids = IndexSet::new();
2078        let mut transmissions = IndexMap::new();
2079
2080        // Prepare the solution and insert into the sets.
2081        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2082        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2083        let solution_transmission_id = (solution_id, solution_checksum).into();
2084        transmission_ids.insert(solution_transmission_id);
2085        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2086
2087        // Prepare the transactions and insert into the sets.
2088        for _ in 0..num_transactions {
2089            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2090            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2091            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2092            transmission_ids.insert(transaction_transmission_id);
2093            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2094        }
2095
2096        // Retrieve the private key.
2097        let private_key = author.private_key();
2098        // Sign the batch header.
2099        let batch_header = BatchHeader::new(
2100            private_key,
2101            round,
2102            timestamp,
2103            committee.id(),
2104            transmission_ids,
2105            previous_certificate_ids,
2106            rng,
2107        )
2108        .unwrap();
2109        // Construct the proposal.
2110        Proposal::new(committee, batch_header, transmissions).unwrap()
2111    }
2112
2113    // Creates a signature of the primary's current proposal for each committee member (excluding
2114    // the primary).
2115    fn peer_signatures_for_proposal(
2116        primary: &Primary<CurrentNetwork>,
2117        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2118        rng: &mut TestRng,
2119    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2120        // Each committee member signs the batch.
2121        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2122        for (socket_addr, account) in accounts {
2123            if account.address() == primary.gateway.account().address() {
2124                continue;
2125            }
2126            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2127            let signature = account.sign(&[batch_id], rng).unwrap();
2128            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2129        }
2130
2131        signatures
2132    }
2133
2134    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2135    fn peer_signatures_for_batch(
2136        primary_address: Address<CurrentNetwork>,
2137        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2138        batch_id: Field<CurrentNetwork>,
2139        rng: &mut TestRng,
2140    ) -> IndexSet<Signature<CurrentNetwork>> {
2141        let mut signatures = IndexSet::new();
2142        for (_, account) in accounts {
2143            if account.address() == primary_address {
2144                continue;
2145            }
2146            let signature = account.sign(&[batch_id], rng).unwrap();
2147            signatures.insert(signature);
2148        }
2149        signatures
2150    }
2151
2152    // Creates a batch certificate.
2153    fn create_batch_certificate(
2154        primary_address: Address<CurrentNetwork>,
2155        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2156        round: u64,
2157        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2158        rng: &mut TestRng,
2159    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2160        let timestamp = now();
2161
2162        let author =
2163            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2164        let private_key = author.private_key();
2165
2166        let committee_id = Field::rand(rng);
2167        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2168        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2169        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2170        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2171
2172        let solution_transmission_id = (solution_id, solution_checksum).into();
2173        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2174
2175        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2176        let transmissions = [
2177            (solution_transmission_id, Transmission::Solution(solution)),
2178            (transaction_transmission_id, Transmission::Transaction(transaction)),
2179        ]
2180        .into();
2181
2182        let batch_header = BatchHeader::new(
2183            private_key,
2184            round,
2185            timestamp,
2186            committee_id,
2187            transmission_ids,
2188            previous_certificate_ids,
2189            rng,
2190        )
2191        .unwrap();
2192        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2193        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2194        (certificate, transmissions)
2195    }
2196
2197    // Create a certificate chain up to, but not including, the specified round in the primary storage.
2198    fn store_certificate_chain(
2199        primary: &Primary<CurrentNetwork>,
2200        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2201        round: u64,
2202        rng: &mut TestRng,
2203    ) -> IndexSet<Field<CurrentNetwork>> {
2204        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2205        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2206        for cur_round in 1..round {
2207            for (_, account) in accounts.iter() {
2208                let (certificate, transmissions) = create_batch_certificate(
2209                    account.address(),
2210                    accounts,
2211                    cur_round,
2212                    previous_certificates.clone(),
2213                    rng,
2214                );
2215                next_certificates.insert(certificate.id());
2216                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2217            }
2218
2219            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2220            previous_certificates = next_certificates;
2221            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2222        }
2223
2224        previous_certificates
2225    }
2226
2227    // Insert the account socket addresses into the resolver so that
2228    // they are recognized as "connected".
2229    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2230        // First account is primary, which doesn't need to resolve.
2231        for (addr, acct) in accounts.iter().skip(1) {
2232            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
2233        }
2234    }
2235
2236    #[tokio::test]
2237    async fn test_propose_batch() {
2238        let mut rng = TestRng::default();
2239        let (primary, _) = primary_without_handlers(&mut rng);
2240
2241        // Check there is no batch currently proposed.
2242        assert!(primary.proposed_batch.read().is_none());
2243
2244        // Generate a solution and a transaction.
2245        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2246        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2247
2248        // Store it on one of the workers.
2249        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2250        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2251
2252        // Try to propose a batch again. This time, it should succeed.
2253        assert!(primary.propose_batch().await.is_ok());
2254        assert!(primary.proposed_batch.read().is_some());
2255    }
2256
2257    #[tokio::test]
2258    async fn test_propose_batch_with_no_transmissions() {
2259        let mut rng = TestRng::default();
2260        let (primary, _) = primary_without_handlers(&mut rng);
2261
2262        // Check there is no batch currently proposed.
2263        assert!(primary.proposed_batch.read().is_none());
2264
2265        // Try to propose a batch with no transmissions.
2266        assert!(primary.propose_batch().await.is_ok());
2267        assert!(primary.proposed_batch.read().is_some());
2268    }
2269
2270    #[tokio::test]
2271    async fn test_propose_batch_in_round() {
2272        let round = 3;
2273        let mut rng = TestRng::default();
2274        let (primary, accounts) = primary_without_handlers(&mut rng);
2275
2276        // Fill primary storage.
2277        store_certificate_chain(&primary, &accounts, round, &mut rng);
2278
2279        // Sleep for a while to ensure the primary is ready to propose the next round.
2280        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2281
2282        // Generate a solution and a transaction.
2283        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2284        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2285
2286        // Store it on one of the workers.
2287        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2288        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2289
2290        // Propose a batch again. This time, it should succeed.
2291        assert!(primary.propose_batch().await.is_ok());
2292        assert!(primary.proposed_batch.read().is_some());
2293    }
2294
    // Checks that a new proposal leaves out transmissions already contained in the previous
    // round's certificates, even though the worker still holds them. Only the freshly added
    // solution and transaction should be proposed.
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage with certificates for every round below `round`.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs of `prev_round`; the certificates created below for `round` reference them.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Count the transmissions in the certificates created below for `round`. These form the
        // "previous round" once storage is advanced past `round`.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction; these are the only transmissions that should be proposed.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store them on the worker.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Add the transmissions to the worker, so it holds them when the batch is proposed.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            // Insert the certificate to storage.
            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance storage past `round`, so the certificates just inserted become the previous round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the new transmissions that were not in previous certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }
2366
    // Checks that, at the V4 consensus height, a proposal includes only part of the queued
    // transactions (asserted: 2 of 5, plus the solution). The remaining transmissions stay in the workers.
    // NOTE(review): presumably the cap comes from the V4 batch spend limit — confirm.
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create a primary to test spend limit backwards compatibility with V4.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());
        // Check the workers are empty.
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Generate one solution and five transactions, and store them on the worker.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _i in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            // Store it on one of the workers.
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Propose a batch; it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        // Of the six transmissions added, the three proposed were drained; three remain in the workers.
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }
2402
    // Checks that a valid round-1 proposal from another committee member is accepted once the
    // primary knows its transmissions and author and considers itself synced.
    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        // NOTE(review): presumably offset by the minimum batch delay so the proposal passes the timestamp checks — confirm.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        // No previous certificates (round 1) and a single transaction.
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // The primary will only consider itself synced if we received
        // block locators from a peer.
        primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.try_block_sync().await;

        // Try to process the batch proposal from the peer, should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }
2441
    // Checks that an otherwise valid proposal from a peer is rejected while the primary is not
    // synced. A peer advertises block locators at height 20, and no block sync is attempted.
    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        // No previous certificates (round 1) and a single transaction.
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // Add a high block locator to indicate we are not synced.
        primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(20)).unwrap();

        // Try to process the batch proposal from the peer, should fail
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }
2478
    // Checks that a valid round-2 proposal from a peer is accepted. The proposal references the
    // round-1 certificates that are already in the primary's storage.
    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates for every round below `round`, keeping the last round's IDs.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        // References the stored previous-round certificates and carries a single transaction.
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // The primary will only consider itself synced if we received
        // block locators from a peer.
        primary.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.try_block_sync().await;

        // Try to process the batch proposal from the peer, should succeed.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }
2518
2519    #[tokio::test]
2520    async fn test_batch_propose_from_peer_wrong_round() {
2521        let mut rng = TestRng::default();
2522        let (primary, accounts) = primary_without_handlers(&mut rng);
2523
2524        // Create a valid proposal with an author that isn't the primary.
2525        let round = 1;
2526        let peer_account = &accounts[1];
2527        let peer_ip = peer_account.0;
2528        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2529        let proposal = create_test_proposal(
2530            &peer_account.1,
2531            primary.ledger.current_committee().unwrap(),
2532            round,
2533            Default::default(),
2534            timestamp,
2535            1,
2536            &mut rng,
2537        );
2538
2539        // Make sure the primary is aware of the transmissions in the proposal.
2540        for (transmission_id, transmission) in proposal.transmissions() {
2541            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2542        }
2543
2544        // The author must be known to resolver to pass propose checks.
2545        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2546        // The primary must be considered synced.
2547        primary.sync.try_block_sync().await;
2548
2549        // Try to process the batch proposal from the peer, should error.
2550        assert!(
2551            primary
2552                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2553                    round: round + 1,
2554                    batch_header: Data::Object(proposal.batch_header().clone())
2555                })
2556                .await
2557                .is_err()
2558        );
2559    }
2560
2561    #[tokio::test]
2562    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2563        let round = 4;
2564        let mut rng = TestRng::default();
2565        let (primary, accounts) = primary_without_handlers(&mut rng);
2566
2567        // Generate certificates.
2568        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2569
2570        // Create a valid proposal with an author that isn't the primary.
2571        let peer_account = &accounts[1];
2572        let peer_ip = peer_account.0;
2573        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2574        let proposal = create_test_proposal(
2575            &peer_account.1,
2576            primary.ledger.current_committee().unwrap(),
2577            round,
2578            previous_certificates,
2579            timestamp,
2580            1,
2581            &mut rng,
2582        );
2583
2584        // Make sure the primary is aware of the transmissions in the proposal.
2585        for (transmission_id, transmission) in proposal.transmissions() {
2586            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2587        }
2588
2589        // The author must be known to resolver to pass propose checks.
2590        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2591        // The primary must be considered synced.
2592        primary.sync.try_block_sync().await;
2593
2594        // Try to process the batch proposal from the peer, should error.
2595        assert!(
2596            primary
2597                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2598                    round: round + 1,
2599                    batch_header: Data::Object(proposal.batch_header().clone())
2600                })
2601                .await
2602                .is_err()
2603        );
2604    }
2605
2606    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2607    #[tokio::test]
2608    async fn test_batch_propose_from_peer_with_past_timestamp() {
2609        let round = 2;
2610        let mut rng = TestRng::default();
2611        let (primary, accounts) = primary_without_handlers(&mut rng);
2612
2613        // Generate certificates.
2614        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2615
2616        // Create a valid proposal with an author that isn't the primary.
2617        let peer_account = &accounts[1];
2618        let peer_ip = peer_account.0;
2619
2620        // Use a timestamp that is too early.
2621        // Set it to something that is less than the minimum batch delay
2622        // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp
2623        let last_timestamp = primary
2624            .storage
2625            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2626            .expect("No previous proposal exists")
2627            .timestamp();
2628        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;
2629
2630        let proposal = create_test_proposal(
2631            &peer_account.1,
2632            primary.ledger.current_committee().unwrap(),
2633            round,
2634            previous_certificates,
2635            invalid_timestamp,
2636            1,
2637            &mut rng,
2638        );
2639
2640        // Make sure the primary is aware of the transmissions in the proposal.
2641        for (transmission_id, transmission) in proposal.transmissions() {
2642            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2643        }
2644
2645        // The author must be known to resolver to pass propose checks.
2646        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2647        // The primary must be considered synced.
2648        primary.sync.try_block_sync().await;
2649
2650        // Try to process the batch proposal from the peer, should error.
2651        assert!(
2652            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2653        );
2654    }
2655
2656    /// Check that proposals rejected that have timestamps older than the previous proposal.
2657    #[tokio::test]
2658    async fn test_batch_propose_from_peer_over_spend_limit() {
2659        let mut rng = TestRng::default();
2660
2661        // Create two primaries to test spend limit activation on V5.
2662        let (accounts, committee) = sample_committee(&mut rng);
2663        let primary_v4 = primary_with_committee(
2664            0,
2665            &accounts,
2666            committee.clone(),
2667            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2668        );
2669        let primary_v5 = primary_with_committee(
2670            1,
2671            &accounts,
2672            committee.clone(),
2673            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
2674        );
2675
2676        // Create a valid proposal with an author that isn't the primary.
2677        let round = 1;
2678        let peer_account = &accounts[2];
2679        let peer_ip = peer_account.0;
2680
2681        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2682
2683        let proposal =
2684            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);
2685
2686        // Make sure the primary is aware of the transmissions in the proposal.
2687        for (transmission_id, transmission) in proposal.transmissions() {
2688            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2689            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2690        }
2691
2692        // The author must be known to resolver to pass propose checks.
2693        primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2694        primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2695
2696        // primary v4 must be considered synced.
2697        primary_v4.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
2698        primary_v4.sync.try_block_sync().await;
2699
2700        // primary v5 must be ocnsidered synced.
2701        primary_v5.sync.test_update_peer_locators(peer_ip, sample_block_locators(0)).unwrap();
2702        primary_v5.sync.try_block_sync().await;
2703
2704        // Check the spend limit is enforced from V5 onwards.
2705        assert!(
2706            primary_v4
2707                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2708                .await
2709                .is_ok()
2710        );
2711
2712        assert!(
2713            primary_v5
2714                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2715                .await
2716                .is_err()
2717        );
2718    }
2719
2720    #[tokio::test]
2721    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2722        let round = 3;
2723        let mut rng = TestRng::default();
2724        let (primary, _) = primary_without_handlers(&mut rng);
2725
2726        // Check there is no batch currently proposed.
2727        assert!(primary.proposed_batch.read().is_none());
2728
2729        // Generate a solution and a transaction.
2730        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2731        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2732
2733        // Store it on one of the workers.
2734        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2735        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2736
2737        // Set the proposal lock to a round ahead of the storage.
2738        let old_proposal_lock_round = *primary.propose_lock.lock().await;
2739        *primary.propose_lock.lock().await = round + 1;
2740
2741        // Propose a batch and enforce that it fails.
2742        assert!(primary.propose_batch().await.is_ok());
2743        assert!(primary.proposed_batch.read().is_none());
2744
2745        // Set the proposal lock back to the old round.
2746        *primary.propose_lock.lock().await = old_proposal_lock_round;
2747
2748        // Try to propose a batch again. This time, it should succeed.
2749        assert!(primary.propose_batch().await.is_ok());
2750        assert!(primary.proposed_batch.read().is_some());
2751    }
2752
2753    #[tokio::test]
2754    async fn test_propose_batch_with_storage_round_behind_proposal() {
2755        let round = 5;
2756        let mut rng = TestRng::default();
2757        let (primary, accounts) = primary_without_handlers(&mut rng);
2758
2759        // Generate previous certificates.
2760        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2761
2762        // Create a valid proposal.
2763        let timestamp = now();
2764        let proposal = create_test_proposal(
2765            primary.gateway.account(),
2766            primary.ledger.current_committee().unwrap(),
2767            round + 1,
2768            previous_certificates,
2769            timestamp,
2770            1,
2771            &mut rng,
2772        );
2773
2774        // Store the proposal on the primary.
2775        *primary.proposed_batch.write() = Some(proposal);
2776
2777        // Try to propose a batch will terminate early because the storage is behind the proposal.
2778        assert!(primary.propose_batch().await.is_ok());
2779        assert!(primary.proposed_batch.read().is_some());
2780        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2781    }
2782
2783    #[tokio::test(flavor = "multi_thread")]
2784    async fn test_batch_signature_from_peer() {
2785        let mut rng = TestRng::default();
2786        let (primary, accounts) = primary_without_handlers(&mut rng);
2787        map_account_addresses(&primary, &accounts);
2788
2789        // Create a valid proposal.
2790        let round = 1;
2791        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2792        let proposal = create_test_proposal(
2793            primary.gateway.account(),
2794            primary.ledger.current_committee().unwrap(),
2795            round,
2796            Default::default(),
2797            timestamp,
2798            1,
2799            &mut rng,
2800        );
2801
2802        // Store the proposal on the primary.
2803        *primary.proposed_batch.write() = Some(proposal);
2804
2805        // Each committee member signs the batch.
2806        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2807
2808        // Have the primary process the signatures.
2809        for (socket_addr, signature) in signatures {
2810            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2811        }
2812
2813        // Check the certificate was created and stored by the primary.
2814        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2815        // Check the round was incremented.
2816        assert_eq!(primary.current_round(), round + 1);
2817    }
2818
2819    #[tokio::test(flavor = "multi_thread")]
2820    async fn test_batch_signature_from_peer_in_round() {
2821        let round = 5;
2822        let mut rng = TestRng::default();
2823        let (primary, accounts) = primary_without_handlers(&mut rng);
2824        map_account_addresses(&primary, &accounts);
2825
2826        // Generate certificates.
2827        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2828
2829        // Create a valid proposal.
2830        let timestamp = now();
2831        let proposal = create_test_proposal(
2832            primary.gateway.account(),
2833            primary.ledger.current_committee().unwrap(),
2834            round,
2835            previous_certificates,
2836            timestamp,
2837            1,
2838            &mut rng,
2839        );
2840
2841        // Store the proposal on the primary.
2842        *primary.proposed_batch.write() = Some(proposal);
2843
2844        // Each committee member signs the batch.
2845        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2846
2847        // Have the primary process the signatures.
2848        for (socket_addr, signature) in signatures {
2849            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2850        }
2851
2852        // Check the certificate was created and stored by the primary.
2853        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2854        // Check the round was incremented.
2855        assert_eq!(primary.current_round(), round + 1);
2856    }
2857
2858    #[tokio::test]
2859    async fn test_batch_signature_from_peer_no_quorum() {
2860        let mut rng = TestRng::default();
2861        let (primary, accounts) = primary_without_handlers(&mut rng);
2862        map_account_addresses(&primary, &accounts);
2863
2864        // Create a valid proposal.
2865        let round = 1;
2866        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2867        let proposal = create_test_proposal(
2868            primary.gateway.account(),
2869            primary.ledger.current_committee().unwrap(),
2870            round,
2871            Default::default(),
2872            timestamp,
2873            1,
2874            &mut rng,
2875        );
2876
2877        // Store the proposal on the primary.
2878        *primary.proposed_batch.write() = Some(proposal);
2879
2880        // Each committee member signs the batch.
2881        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2882
2883        // Have the primary process only one signature, mimicking a lack of quorum.
2884        let (socket_addr, signature) = signatures.first().unwrap();
2885        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2886
2887        // Check the certificate was not created and stored by the primary.
2888        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2889        // Check the round was incremented.
2890        assert_eq!(primary.current_round(), round);
2891    }
2892
2893    #[tokio::test]
2894    async fn test_batch_signature_from_peer_in_round_no_quorum() {
2895        let round = 7;
2896        let mut rng = TestRng::default();
2897        let (primary, accounts) = primary_without_handlers(&mut rng);
2898        map_account_addresses(&primary, &accounts);
2899
2900        // Generate certificates.
2901        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2902
2903        // Create a valid proposal.
2904        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2905        let proposal = create_test_proposal(
2906            primary.gateway.account(),
2907            primary.ledger.current_committee().unwrap(),
2908            round,
2909            previous_certificates,
2910            timestamp,
2911            1,
2912            &mut rng,
2913        );
2914
2915        // Store the proposal on the primary.
2916        *primary.proposed_batch.write() = Some(proposal);
2917
2918        // Each committee member signs the batch.
2919        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2920
2921        // Have the primary process only one signature, mimicking a lack of quorum.
2922        let (socket_addr, signature) = signatures.first().unwrap();
2923        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2924
2925        // Check the certificate was not created and stored by the primary.
2926        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2927        // Check the round was incremented.
2928        assert_eq!(primary.current_round(), round);
2929    }
2930
2931    #[tokio::test]
2932    async fn test_insert_certificate_with_aborted_transmissions() {
2933        let round = 3;
2934        let prev_round = round - 1;
2935        let mut rng = TestRng::default();
2936        let (primary, accounts) = primary_without_handlers(&mut rng);
2937        let peer_account = &accounts[1];
2938        let peer_ip = peer_account.0;
2939
2940        // Fill primary storage.
2941        store_certificate_chain(&primary, &accounts, round, &mut rng);
2942
2943        // Get transmissions from previous certificates.
2944        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2945
2946        // Generate a solution and a transaction.
2947        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2948        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2949
2950        // Store it on one of the workers.
2951        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2952        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2953
2954        // Check that the worker has 2 transmissions.
2955        assert_eq!(primary.workers[0].num_transmissions(), 2);
2956
2957        // Create certificates for the current round.
2958        let account = accounts[0].1.clone();
2959        let (certificate, transmissions) =
2960            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
2961        let certificate_id = certificate.id();
2962
2963        // Randomly abort some of the transmissions.
2964        let mut aborted_transmissions = HashSet::new();
2965        let mut transmissions_without_aborted = HashMap::new();
2966        for (transmission_id, transmission) in transmissions.clone() {
2967            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
2968                true => {
2969                    // Insert the aborted transmission.
2970                    aborted_transmissions.insert(transmission_id);
2971                }
2972                false => {
2973                    // Insert the transmission without the aborted transmission.
2974                    transmissions_without_aborted.insert(transmission_id, transmission);
2975                }
2976            };
2977        }
2978
2979        // Add the non-aborted transmissions to the worker.
2980        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
2981            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2982        }
2983
2984        // Check that inserting the transmission with missing transmissions fails.
2985        assert!(
2986            primary
2987                .storage
2988                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
2989                .is_err()
2990        );
2991        assert!(
2992            primary
2993                .storage
2994                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
2995                .is_err()
2996        );
2997
2998        // Insert the certificate to storage.
2999        primary
3000            .storage
3001            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3002            .unwrap();
3003
3004        // Ensure the certificate exists in storage.
3005        assert!(primary.storage.contains_certificate(certificate_id));
3006        // Ensure that the aborted transmission IDs exist in storage.
3007        for aborted_transmission_id in aborted_transmissions {
3008            assert!(primary.storage.contains_transmission(aborted_transmission_id));
3009            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3010        }
3011    }
3012}