snarkos_node_bft/primary.rs

// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};
use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::{ConsensusVersion, committee::Committee},
};

use aleo_std::StorageMode;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

/// A helper type for an optional proposed batch.
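/// It is `None` when the primary is not currently proposing a batch, and holds the
/// in-flight [`Proposal`] from the moment a batch is proposed until it is certified,
/// replaced, or expires.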
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

/// The primary logic of a node.
/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module enables fetching data from other validators.
    sync: Sync<N>,
    /// The gateway allows talking to other nodes in the validator set.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles for all background tasks spawned by this primary.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for propose_batch.
    propose_lock: Arc<TMutex<u64>>,
    /// The storage mode of the node.
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
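    /// This equals twice `MAX_TRANSMISSIONS_PER_BATCH`, i.e. room for one full
    /// batch on top of a full pending queue.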
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
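    ///
    /// A minimal construction sketch, assuming an `account`, `storage`, `ledger`, and
    /// `block_sync` have already been initialized elsewhere (not a complete setup):
    ///
    /// ```ignore
    /// let primary = Primary::new(
    ///     account,
    ///     storage,
    ///     ledger,
    ///     block_sync,
    ///     None,                     // let the gateway pick its listening IP
    ///     &[],                      // no trusted validators
    ///     false,                    // do not restrict to trusted peers only
    ///     StorageMode::Production,
    ///     None,                     // not running in dev mode
    /// )?;
    /// ```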
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        storage_mode: StorageMode,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            storage_mode.clone(),
            dev,
        )?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        // Initialize the primary instance.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }

    /// Load the proposal cache file and update the Primary state with the stored data.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the proposal cache from the file system, if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            // If the proposal cache exists, then process the proposal cache.
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    // Extract the proposal and signed proposals.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Write the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Write the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Write the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Update the storage with the pending certificates.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // We use a dummy IP because the node should not need to request from any peers.
                        // The storage should have stored all the transmissions. If not, we simply
                        // skip the certificate.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
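    ///
    /// A wiring sketch, assuming the primary channel pair is constructed by a helper
    /// analogous to `init_sync_channels` (the exact constructor name is an assumption):
    ///
    /// ```ignore
    /// // Hypothetical channel constructor from the `helpers` module.
    /// let (primary_sender, primary_receiver) = init_primary_channels();
    /// // Run without an external ping service or BFT sender.
    /// primary.run(None, None, primary_sender, primary_receiver).await?;
    /// ```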
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct a list for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            // Add the worker to the list of workers.
            workers.push(worker);
            // Add the worker sender to the map.
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // First, initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Next, initialize the sync module and sync the storage from the ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run(ping, sync_receiver).await?;
        // Next, initialize the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    /// Returns the number of unconfirmed transmissions.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    /// Returns the number of unconfirmed ratifications.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    /// Returns the number of unconfirmed solutions.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    /// Returns the number of unconfirmed transactions.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    /// Returns the worker transmission IDs.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns the worker transmissions.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns the worker solutions.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}

impl<N: Network> Primary<N> {
    /// Clears the worker solutions.
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
    /// Proposes the batch for the current round.
    ///
    /// This method performs the following steps:
    /// 1. Drain the workers.
    /// 2. Sign the batch.
    /// 3. Set the batch proposal in the primary.
    /// 4. Broadcast the batch header to all validators for signing.
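    ///
    /// A hypothetical driver loop, for illustration only; the actual scheduling
    /// lives in `start_handlers`:
    ///
    /// ```ignore
    /// loop {
    ///     // Wait out the batch delay before attempting the next proposal.
    ///     tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
    ///     primary.propose_batch().await?;
    /// }
    /// ```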
    pub async fn propose_batch(&self) -> Result<()> {
        // This function isn't re-entrant.
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if it has expired.
        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        // This can actually never happen, because of the invariant that the current round is never 0
        // (see [`StorageInner::current_round`]).
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is a batch being proposed already,
        // rebroadcast the batch header to the non-signers, and return early.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Construct the event.
            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            // Iterate through the non-signers.
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                // Resolve the address to the peer IP.
                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
                    // Resend the batch proposal to the validator for signing.
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            // Resend the batch proposal to the peer.
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(());
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (),
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(()),
                    // An error occurred while attempting to advance the current round.
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance, before the rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach the quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators = self.gateway.connected_addresses();
            // Append the primary to the set.
            connected_validators.insert(self.gateway.account().address());
            // If the quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Track the total execution cost of the batch proposal as it is being constructed.
        let mut proposal_cost = 0u64;
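        // Note: the spend limit this cost is checked against is height-dependent;
        // see `BatchHeader::batch_spend_limit` below.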
        // Note: worker draining and transaction inclusion needs to be thought
        // through carefully when there is more than one worker. The fairness
        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Check the selected transmissions are below the batch limit.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    // Reinsert the transmission into the worker.
                    worker.insert_front(id, transmission);
                    break 'outer;
                }

                // Check the max transmissions per worker is not exceeded.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    // Reinsert the transmission into the worker.
                    worker.insert_front(id, transmission);
                    continue 'outer;
                }

                // Check if the ledger already contains the transmission.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Check if the storage already contains the transmission.
                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                // the primary does not propose a batch with no transmissions.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Check the transmission is still valid.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches. If not, skip the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        // Check if the solution is still valid.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches. If not, skip the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
                        // ConsensusVersion V8 migration logic -
                        // Do not include deployments in a batch proposal.
                        let current_block_height = self.ledger.latest_block_height();
                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
                        if current_block_height > consensus_version_v7_height
                            && current_block_height <= consensus_version_v8_height
                            && transaction.is_deploy()
                        {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Compute the transaction spent cost (in microcredits).
                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the transaction is still valid.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the next proposal cost.
                        // Note: We purposefully discard this transaction if the proposal cost overflows.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the next proposal cost exceeds the batch proposal spend limit.
                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
                        if next_proposal_cost > batch_spend_limit {
                            debug!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                batch_spend_limit
                            );

                            // Reinsert the transmission into the worker.
                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        // Update the proposal cost.
                        proposal_cost = next_proposal_cost;
                    }

                    // Note: We explicitly forbid including ratifications,
                    // as the protocol currently does not support ratifications.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // All other combinations are clearly invalid.
                    _ => continue,
                }

                // If the transmission is valid, insert it into the proposal's transmission list.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.gateway.account().private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Broadcast the batch to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        // Set the proposed batch.
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }

    /// Processes a batch propose from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Verify the batch.
    /// 2. Sign the batch.
    /// 3. Broadcast the signature back to the validator.
    ///
    /// If our primary is ahead of the peer, we will not sign the batch.
    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
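    /// If the peer proposes a conflicting batch for a round we have already signed,
    /// the peer is treated as malicious and the proposal is rejected.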
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        // Ensure the round matches in the batch header.
        if batch_round != batch_header.round() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        // Retrieve the batch author.
        let batch_author = batch_header.author();

        // Ensure the batch proposal is from the validator.
        match self.gateway.resolve_to_aleo_addr(peer_ip) {
            // If the peer is a validator, then ensure the batch proposal is from the validator.
            Some(address) => {
                if address != batch_author {
                    // Proceed to disconnect the validator.
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // Ensure the batch author is a current committee member.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // Ensure the batch proposal is not from the current primary.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // Ensure that the batch proposal's committee ID matches the expected committee ID.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // Retrieve the cached round and batch ID for this validator.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // If the round matches and the batch ID differs, then the validator is malicious.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // If the round and batch ID match, then skip signing the batch a second time.
            // Instead, rebroadcast the cached signature to the peer.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    // Resend the batch signature to the peer.
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                // Return early.
                return Ok(());
            }
        }

        // Ensure that the batch header doesn't already exist in storage.
        // Note: this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Compute the previous round.
        let previous_round = batch_round.saturating_sub(1);
        // Ensure that the peer did not propose a batch too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - {e} from '{peer_ip}'");
        }

        // Ensure the batch header does not contain any ratifications.
        if batch_header.contains(TransmissionID::Ratification) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'",
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        // Check that the transmission IDs match and are not fee transactions.
        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            // If the transmission is not well-formed, then return early.
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}");
            return Ok(());
        }

        // Ensure the batch is for the current round.
        // This method must be called after fetching previous certificates (above),
        // and prior to checking the batch header (below).
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            // If the primary is not signing for the peer's round, then return early.
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Ensure the batch header from the peer is valid.
        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let missing_transmissions =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
        // Insert the missing transmissions into the workers.
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // Ensure the transactions don't bring the proposal above the spend limit.
        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                // If the transmission is a transaction, compute its execution cost.
                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
                    // ConsensusVersion V8 migration logic -
                    // Do not include deployments in a batch proposal.
                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
                    if block_height > consensus_version_v7_height
                        && block_height <= consensus_version_v8_height
                        && transaction.is_deploy()
                    {
                        bail!(
                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
                        )
                    }

                    // Compute the transaction spent cost (in microcredits).
                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Compute the next proposal cost.
                    // Note: We purposefully discard this transaction if the proposal cost overflows.
                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Check if the next proposal cost exceeds the batch proposal spend limit.
                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
                    if next_proposal_cost > batch_spend_limit {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            batch_spend_limit
                        );
                    }

                    // Update the proposal cost.
                    proposal_cost = next_proposal_cost;
                }
            }
        }

        /* Proceeding to sign the batch. */

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();
        // Sign the batch ID.
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        // Ensure the proposal has not already been signed.
        //
        // Note: Due to the need to sync the batch header with the peer, it is possible
        // for the primary to receive the same 'BatchPropose' event again, whereby only
        // one instance of this handler should sign the batch. This check guarantees this.
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If the validator has already signed a batch for this round, then return early,
                // since, if the peer still has not received the signature, they will request it again,
                // and the logic at the start of this function will resend the (now cached) signature
                // to the peer if asked to sign this batch proposal again.
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                // Otherwise, cache the round, batch ID, and signature for this validator.
                entry.insert((batch_round, batch_id, signature));
            }
            // If the validator has not signed a batch before, then continue.
            std::collections::hash_map::Entry::Vacant(entry) => {
                // Cache the round, batch ID, and signature for this validator.
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Broadcast the signature back to the validator.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            // Send the batch signature to the peer.
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }

    /// Processes a batch signature from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Ensure the proposed batch has not expired.
    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
    /// 3. Store the signature.
    /// 4. Certify the batch if enough signatures have been received.
    /// 5. Broadcast the batch certificate to all validators.
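    ///
    /// The batch is certified once the accumulated signatures reach the quorum
    /// threshold of the committee lookback for the proposal round.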
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
        self.check_proposed_batch_for_expiration().await?;

        // Retrieve the batch ID and signature.
        let BatchSignature { batch_id, signature } = batch_signature;

        // Retrieve the signer.
        let signer = signature.to_address();

        // Ensure the batch signature is signed by the validator.
        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        // Ensure the batch signature is not from the current primary.
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

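        // Note: the signature is recorded inside a blocking task, so the synchronous
        // write lock on the proposed batch is never held across an await point on the
        // async executor.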
        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            // Acquire the write lock.
            let mut proposed_batch = self_.proposed_batch.write();
            // Add the signature to the batch, and determine if the batch is ready to be certified.
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    // Ensure the batch ID matches the currently proposed batch ID.
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            // If this batch was already certified, return early.
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            // If the batch ID is unknown, return an error.
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    // Retrieve the committee lookback for the round.
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    // Retrieve the address of the validator.
                    let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    // Add the signature to the batch.
                    proposal.add_signature(signer, signature, &committee_lookback)?;
                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                    // Check if the batch is ready to be certified.
                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                        // If the batch is not ready to be certified, return early.
                        return Ok(None);
                    }
                }
                // There is no proposed batch, so return early.
                None => return Ok(None),
            };
            // Retrieve the batch proposal, clearing the proposed batch.
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        /* Proceeding to certify the batch. */

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        // Retrieve the committee lookback for the round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        // Store the certified batch and broadcast it to all validators.
        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            // Reinsert the transmissions back into the ready queue for the next proposal.
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }

    /// Processes a batch certificate from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Stores the given batch certificate, after ensuring it is valid.
    /// 2. If there are enough certificates to reach quorum threshold for the current round,
    ///    then proceed to advance to the next round.
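    ///
    /// Storing the certificate may recursively fetch and store any missing previous
    /// certificates it references (see `sync_with_certificate_from_peer`).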
1100    async fn process_batch_certificate_from_peer(
1101        &self,
1102        peer_ip: SocketAddr,
1103        certificate: BatchCertificate<N>,
1104    ) -> Result<()> {
1105        // Ensure the batch certificate is from an authorized validator.
1106        if !self.gateway.is_authorized_validator_ip(peer_ip) {
1107            // Proceed to disconnect the validator.
1108            self.gateway.disconnect(peer_ip);
1109            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1110        }
1111        // Ensure storage does not already contain the certificate.
1112        if self.storage.contains_certificate(certificate.id()) {
1113            return Ok(());
1114        // Otherwise, ensure ephemeral storage contains the certificate.
1115        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1116            self.storage.insert_unprocessed_certificate(certificate.clone())?;
1117        }
1118
1119        // Retrieve the batch certificate author.
1120        let author = certificate.author();
1121        // Retrieve the batch certificate round.
1122        let certificate_round = certificate.round();
1123        // Retrieve the batch certificate committee ID.
1124        let committee_id = certificate.committee_id();
1125
1126        // Ensure the batch certificate is not from the current primary.
1127        if self.gateway.account().address() == author {
1128            bail!("Received a batch certificate for myself ({author})");
1129        }
1130
1131        // Ensure that the incoming certificate is valid.
1132        self.storage.check_incoming_certificate(&certificate)?;
1133
1134        // Store the certificate, after ensuring it is valid above.
1135        // The following call recursively fetches and stores
1136        // the previous certificates referenced from this certificate.
1137        // It is critical to make the following call only after validating the certificate above.
1138        // The reason is that a sequence of malformed certificates,
1139        // with references to previous certificates with non-decreasing rounds,
1140        // could cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
1141        // Note that, if the following call (when not returning an error) guarantees the backward closure of the DAG
1142        // (i.e. that all the referenced previous certificates are in the DAG before this one is stored),
1143        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
1144        // TODO: eliminate those redundant checks
1145        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1146
1147        // If there are enough certificates to reach quorum threshold for the certificate round,
1148        // then proceed to advance to the next round.
1149
1150        // Retrieve the committee lookback.
1151        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1152
1153        // Retrieve the certificate authors.
1154        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1155        // Check if the certificates have reached the quorum threshold.
1156        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1157
1158        // Ensure that the batch certificate's committee ID matches the expected committee ID.
1159        let expected_committee_id = committee_lookback.id();
1160        if expected_committee_id != committee_id {
1161            // Proceed to disconnect the validator.
1162            self.gateway.disconnect(peer_ip);
1163            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1164        }
1165
1166        // Determine if we are currently proposing a round that is relevant.
1167        // Note: This is important, because while our peers may have advanced,
1168        // they may not be proposing yet, and thus may still be able to sign our proposed batch.
1169        let should_advance = match &*self.proposed_batch.read() {
1170            // We advance if the proposal round is less than the current round that was just certified.
1171            Some(proposal) => proposal.round() < certificate_round,
1172            // If there's no proposal, we consider advancing.
1173            None => true,
1174        };
1175
1176        // Retrieve the current round.
1177        let current_round = self.current_round();
1178
1179        // Determine whether to advance to the next round.
1180        if is_quorum && should_advance && certificate_round >= current_round {
1181            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
1182            self.try_increment_to_the_next_round(current_round + 1).await?;
1183        }
1184        Ok(())
1185    }
1186}
1187
1188impl<N: Network> Primary<N> {
1189    /// Starts the primary handlers.
1190    ///
1191    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
1192    /// that awaits new data and handles it accordingly.
1193    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
1194    /// tries to move to the next round of batches.
1195    ///
1196    /// This function is called exactly once, in `Self::run()`.
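    ///
    /// Each handler below follows the same pattern, sketched here with a hypothetical
    /// `receiver` for illustration:
    ///
    /// ```ignore
    /// let self_ = self.clone();
    /// self.spawn(async move {
    ///     while let Some(message) = receiver.recv().await {
    ///         // Skip the message if the node is syncing; otherwise, process it
    ///         // (possibly on a dedicated task via `tokio::spawn`).
    ///     }
    /// });
    /// ```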
1197    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1198        let PrimaryReceiver {
1199            mut rx_batch_propose,
1200            mut rx_batch_signature,
1201            mut rx_batch_certified,
1202            mut rx_primary_ping,
1203            mut rx_unconfirmed_solution,
1204            mut rx_unconfirmed_transaction,
1205        } = primary_receiver;
1206
1207        // Start the primary ping sender.
1208        let self_ = self.clone();
1209        self.spawn(async move {
1210            loop {
1211                // Sleep briefly.
1212                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
1213
1214                // Retrieve the block locators.
1215                let self__ = self_.clone();
1216                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1217                    Ok(block_locators) => block_locators,
1218                    Err(e) => {
1219                        warn!("Failed to retrieve block locators - {e}");
1220                        continue;
1221                    }
1222                };
1223
1224                // Retrieve the latest certificate of the primary.
1225                let primary_certificate = {
1226                    // Retrieve the primary address.
1227                    let primary_address = self_.gateway.account().address();
1228
1229                    // Iterate backwards from the latest round to find the primary certificate.
1230                    let mut certificate = None;
1231                    let mut current_round = self_.current_round();
1232                    while certificate.is_none() {
1233                        // If the current round is 0, then break the while loop.
1234                        if current_round == 0 {
1235                            break;
1236                        }
1237                        // Retrieve the primary certificates.
1238                        if let Some(primary_certificate) =
1239                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1240                        {
1241                            certificate = Some(primary_certificate);
1242                        // If the primary certificate was not found, decrement the round.
1243                        } else {
1244                            current_round = current_round.saturating_sub(1);
1245                        }
1246                    }
1247
1248                    // Determine if the primary certificate was found.
1249                    match certificate {
1250                        Some(certificate) => certificate,
1251                        // Skip this iteration of the loop (do not send a primary ping).
1252                        None => continue,
1253                    }
1254                };
1255
1256                // Construct the primary ping.
1257                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1258                // Broadcast the event.
1259                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1260            }
1261        });
1262
1263        // Start the primary ping handler.
1264        let self_ = self.clone();
1265        self.spawn(async move {
1266            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1267                // If the primary is not synced, then do not process the primary ping.
1268                if self_.sync.is_synced() {
1269                    trace!("Processing new primary ping from '{peer_ip}'");
1270                } else {
1271                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1272                    continue;
1273                }
1274
1275                // Spawn a task to process the primary certificate.
1276                {
1277                    let self_ = self_.clone();
1278                    tokio::spawn(async move {
1279                        // Deserialize the primary certificate in the primary ping.
1280                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1281                        else {
1282                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1283                            return;
1284                        };
1285                        // Process the primary certificate.
1286                        let id = fmt_id(primary_certificate.id());
1287                        let round = primary_certificate.round();
1288                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1289                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1290                        }
1291                    });
1292                }
1293            }
1294        });
1295
1296        // Start the worker ping(s).
1297        let self_ = self.clone();
1298        self.spawn(async move {
1299            loop {
1300                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
1301                // If the primary is not synced, then do not broadcast the worker ping(s).
1302                if !self_.sync.is_synced() {
1303                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1304                    continue;
1305                }
1306                // Broadcast the worker ping(s).
1307                for worker in self_.workers.iter() {
1308                    worker.broadcast_ping();
1309                }
1310            }
1311        });
1312
1313        // Start the batch proposer.
1314        let self_ = self.clone();
1315        self.spawn(async move {
1316            loop {
1317                // Sleep briefly, but longer than if there were no batch.
1318                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1319                let current_round = self_.current_round();
1320                // If the primary is not synced, then do not propose a batch.
1321                if !self_.sync.is_synced() {
1322                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
1323                    continue;
1324                }
1325                // A best-effort attempt to skip the scheduled batch proposal if
1326                // round progression already triggered one.
1327                if self_.propose_lock.try_lock().is_err() {
1328                    trace!(
1329                        "Skipping batch proposal for round {current_round} {}",
1330                        "(node is already proposing)".dimmed()
1331                    );
1332                    continue;
1333                };
1334                // If there is no proposed batch, attempt to propose a batch.
1335                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
1336                // and only one batch needs to be proposed at a time.
1337                if let Err(e) = self_.propose_batch().await {
1338                    warn!("Cannot propose a batch - {e}");
1339                }
1340            }
1341        });
1342
1343        // Start the proposed batch handler.
1344        let self_ = self.clone();
1345        self.spawn(async move {
1346            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1347                // If the primary is not synced, then do not sign the batch.
1348                if !self_.sync.is_synced() {
1349                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1350                    continue;
1351                }
1352                // Spawn a task to process the proposed batch.
1353                let self_ = self_.clone();
1354                tokio::spawn(async move {
1355                    // Process the batch proposal.
1356                    let round = batch_propose.round;
1357                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1358                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
1359                    }
1360                });
1361            }
1362        });
1363
1364        // Start the batch signature handler.
1365        let self_ = self.clone();
1366        self.spawn(async move {
1367            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1368                // If the primary is not synced, then do not store the signature.
1369                if !self_.sync.is_synced() {
1370                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1371                    continue;
1372                }
1373                // Process the batch signature.
1374                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1375                // is a critical path, and we should only store the minimum required number of signatures.
1376                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1377                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1378                let id = fmt_id(batch_signature.batch_id);
1379                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1380                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
1381                }
1382            }
1383        });
1384
1385        // Start the certified batch handler.
1386        let self_ = self.clone();
1387        self.spawn(async move {
1388            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1389                // If the primary is not synced, then do not store the certificate.
1390                if !self_.sync.is_synced() {
1391                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1392                    continue;
1393                }
1394                // Spawn a task to process the batch certificate.
1395                let self_ = self_.clone();
1396                tokio::spawn(async move {
1397                    // Deserialize the batch certificate.
1398                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1399                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1400                        return;
1401                    };
1402                    // Process the batch certificate.
1403                    let id = fmt_id(batch_certificate.id());
1404                    let round = batch_certificate.round();
1405                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1406                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
1407                    }
1408                });
1409            }
1410        });
1411
1412        // This task periodically tries to move to the next round.
1413        //
1414        // Note: This is necessary to ensure that the primary is not stuck on a previous round
1415        // despite having received enough certificates to advance to the next round.
1416        let self_ = self.clone();
1417        self.spawn(async move {
1418            loop {
1419                // Sleep briefly.
1420                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1421                // If the primary is not synced, then do not increment to the next round.
1422                if !self_.sync.is_synced() {
1423                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1424                    continue;
1425                }
1426                // Attempt to increment to the next round.
1427                let current_round = self_.current_round();
1428                let next_round = current_round.saturating_add(1);
1429                // Determine if the quorum threshold is reached for the current round.
1430                let is_quorum_threshold_reached = {
1431                    // Retrieve the certificate authors for the current round.
1432                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
1433                    // If there are no certificates, then skip this check.
1434                    if authors.is_empty() {
1435                        continue;
1436                    }
1437                    // Retrieve the committee lookback for the current round.
1438                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
1439                        warn!("Failed to retrieve the committee lookback for round {current_round}");
1440                        continue;
1441                    };
1442                    // Check if the quorum threshold is reached for the current round.
1443                    committee_lookback.is_quorum_threshold_reached(&authors)
1444                };
1445                // Attempt to increment to the next round if the quorum threshold is reached.
1446                if is_quorum_threshold_reached {
1447                    debug!("Quorum threshold reached for round {current_round}");
1448                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
1449                        warn!("Failed to increment to the next round - {e}");
1450                    }
1451                }
1452            }
1453        });
1454
1455        // Start a handler to process new unconfirmed solutions.
1456        let self_ = self.clone();
1457        self.spawn(async move {
1458            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1459                // Compute the checksum for the solution.
1460                let Ok(checksum) = solution.to_checksum::<N>() else {
1461                    error!("Failed to compute the checksum for the unconfirmed solution");
1462                    continue;
1463                };
1464                // Compute the worker ID.
1465                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1466                    error!("Unable to determine the worker ID for the unconfirmed solution");
1467                    continue;
1468                };
1469                let self_ = self_.clone();
1470                tokio::spawn(async move {
1471                    // Retrieve the worker.
1472                    let worker = &self_.workers[worker_id as usize];
1473                    // Process the unconfirmed solution.
1474                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1475                    // Send the result to the callback.
1476                    callback.send(result).ok();
1477                });
1478            }
1479        });
1480
1481        // Start a handler to process new unconfirmed transactions.
1482        let self_ = self.clone();
1483        self.spawn(async move {
1484            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1485                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1486                // Compute the checksum for the transaction.
1487                let Ok(checksum) = transaction.to_checksum::<N>() else {
1488                    error!("Failed to compute the checksum for the unconfirmed transaction");
1489                    continue;
1490                };
1491                // Compute the worker ID.
1492                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1493                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1494                    continue;
1495                };
1496                let self_ = self_.clone();
1497                tokio::spawn(async move {
1498                    // Retrieve the worker.
1499                    let worker = &self_.workers[worker_id as usize];
1500                    // Process the unconfirmed transaction.
1501                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1502                    // Send the result to the callback.
1503                    callback.send(result).ok();
1504                });
1505            }
1506        });
1507    }
1508
1509    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
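    ///
    /// A proposal is considered expired once the primary has moved past its round, e.g.:
    ///
    /// ```ignore
    /// // proposal.round() = 7, current_round() = 8  -> expired: the proposal is cleared and
    /// //                                               its transmissions are reinserted into the workers.
    /// // proposal.round() = 8, current_round() = 8  -> still live: nothing is cleared.
    /// ```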
1510    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1511        // Check if the proposed batch is timed out or stale.
1512        let is_expired = match self.proposed_batch.read().as_ref() {
1513            Some(proposal) => proposal.round() < self.current_round(),
1514            None => false,
1515        };
1516        // If the batch is expired, clear the proposed batch.
1517        if is_expired {
1518            // Reset the proposed batch.
1519            let proposal = self.proposed_batch.write().take();
1520            if let Some(proposal) = proposal {
1521                debug!("Cleared expired proposal for round {}", proposal.round());
1522                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1523            }
1524        }
1525        Ok(())
1526    }
1527
1528    /// Increments to the next round.
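    ///
    /// A typical call-site sketch, mirroring the certificate handlers above:
    ///
    /// ```ignore
    /// let next_round = self.current_round().saturating_add(1);
    /// self.try_increment_to_the_next_round(next_round).await?;
    /// ```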
1529    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1530        // If the next round is within GC range, then iterate to the penultimate round.
1531        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1532            let mut fast_forward_round = self.current_round();
1533            // Iterate until the penultimate round is reached.
1534            while fast_forward_round < next_round.saturating_sub(1) {
1535                // Update to the next round in storage.
1536                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1537                // Clear the proposed batch.
1538                *self.proposed_batch.write() = None;
1539            }
1540        }
1541
1542        // Retrieve the current round.
1543        let current_round = self.current_round();
1544        // Attempt to advance to the next round.
1545        if current_round < next_round {
1546            // If a BFT sender was provided, send the current round to the BFT.
1547            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
1548                match bft_sender.send_primary_round_to_bft(current_round).await {
1549                    Ok(is_ready) => is_ready,
1550                    Err(e) => {
1551                        warn!("Failed to update the BFT to the next round - {e}");
1552                        return Err(e);
1553                    }
1554                }
1555            }
1556            // Otherwise, handle the Narwhal case.
1557            else {
1558                // Update to the next round in storage.
1559                self.storage.increment_to_next_round(current_round)?;
1560                // Set 'is_ready' to 'true'.
1561                true
1562            };
1563
1564            // Log whether the next round is ready.
1565            match is_ready {
1566                true => debug!("Primary is ready to propose the next round"),
1567                false => debug!("Primary is not ready to propose the next round"),
1568            }
1569
1570            // If the node is ready, propose a batch for the next round.
1571            if is_ready {
1572                self.propose_batch().await?;
1573            }
1574        }
1575        Ok(())
1576    }
1577
1578    /// Ensures the primary is signing for the specified batch round.
1579    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1580    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
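    ///
    /// A worked example of the accepted round window, assuming a hypothetical
    /// `max_gc_rounds()` of 50:
    ///
    /// ```ignore
    /// // With current_round = 10 and max_gc_rounds = 50:
    /// //   batch_round >= 60 fails the first check (too far in the future),
    /// //   batch_round <= 8  fails the second check (current_round > batch_round + 1),
    /// //   so rounds 9..=59 remain eligible, subject to the proposed-batch check below.
    /// ```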
1581    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1582        // Retrieve the current round.
1583        let current_round = self.current_round();
1584        // Ensure the batch round is within GC range of the current round.
1585        if current_round + self.storage.max_gc_rounds() <= batch_round {
1586            bail!("Round {batch_round} is too far in the future")
1587        }
1588        // Ensure the batch round is at or one before the current round.
1589        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1590        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1591        if current_round > batch_round + 1 {
1592            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1593        }
1594        // Check if the primary is still signing for the batch round.
1595        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1596            if signing_round > batch_round {
1597                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1598            }
1599        }
1600        Ok(())
1601    }
1602
1603    /// Ensures the primary is not creating batch proposals too frequently.
1604    /// This checks that the certificate timestamp for the previous round is within the expected range.
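    ///
    /// A worked example, assuming a hypothetical `MIN_BATCH_DELAY_IN_SECS` of 1:
    ///
    /// ```ignore
    /// // With previous_timestamp = 1_000_000:
    /// //   timestamp = 1_000_000 -> elapsed = 0  -> rejected (too soon),
    /// //   timestamp =   999_999 -> elapsed = -1 -> rejected (too soon),
    /// //   timestamp = 1_000_001 -> elapsed = 1  -> accepted.
    /// ```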
1605    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1606        // Retrieve the previous timestamp to check against.
1607        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1608            // Use the timestamp of the author's certificate from the previous round.
1609            Some(certificate) => certificate.timestamp(),
1610            None => match self.gateway.account().address() == author {
1611                // If we are the author, use the timestamp of our latest proposed batch.
1612                true => *self.latest_proposed_batch_timestamp.read(),
1613                // If we do not see a previous certificate for the author, then proceed optimistically.
1614                false => return Ok(()),
1615            },
1616        };
1617
1618        // Determine the elapsed time since the previous timestamp.
1619        let elapsed = timestamp
1620            .checked_sub(previous_timestamp)
1621            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1622        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1623        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1624            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1625            false => Ok(()),
1626        }
1627    }
1628
1629    /// Stores the certified batch and broadcasts it to all validators.
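    ///
    /// Error-handling sketch, mirroring the certification path above: if storing or
    /// broadcasting fails, the caller reinserts the proposal's transmissions.
    ///
    /// ```ignore
    /// if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
    ///     self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
    ///     return Err(e);
    /// }
    /// ```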
1630    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1631        // Create the batch certificate and transmissions.
1632        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1633        // Convert the transmissions into a HashMap.
1634        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1635        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1636        // Store the certified batch.
1637        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1638        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1639        debug!("Stored a batch certificate for round {}", certificate.round());
1640        // If a BFT sender was provided, send the certificate to the BFT.
1641        if let Some(bft_sender) = self.bft_sender.get() {
1642            // Await the callback to continue.
1643            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1644                warn!("Failed to update the BFT DAG from primary - {e}");
1645                return Err(e);
1646            };
1647        }
1648        // Broadcast the certified batch to all validators.
1649        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1650        // Log the certified batch.
1651        let num_transmissions = certificate.transmission_ids().len();
1652        let round = certificate.round();
1653        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1654        // Increment to the next round.
1655        self.try_increment_to_the_next_round(round + 1).await
1656    }
1657
1658    /// Inserts the missing transmissions from the proposal into the workers.
1659    fn insert_missing_transmissions_into_workers(
1660        &self,
1661        peer_ip: SocketAddr,
1662        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1663    ) -> Result<()> {
1664        // Insert the transmissions into the workers.
1665        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1666            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1667        })
1668    }
1669
1670    /// Re-inserts the transmissions from the proposal into the workers.
1671    fn reinsert_transmissions_into_workers(
1672        &self,
1673        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1674    ) -> Result<()> {
1675        // Re-insert the transmissions into the workers.
1676        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1677            worker.reinsert(transmission_id, transmission);
1678        })
1679    }
1680
1681    /// Recursively stores a given batch certificate, after ensuring:
1682    ///   - Ensure the round matches the committee round.
1683    ///   - Ensure the address is a member of the committee.
1684    ///   - Ensure the timestamp is within range.
1685    ///   - Ensure we have all of the transmissions.
1686    ///   - Ensure we have all of the previous certificates.
1687    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1688    ///   - Ensure the previous certificates have reached the quorum threshold.
1689    ///   - Ensure we have not already signed the batch ID.
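    ///
    /// A minimal usage sketch, mirroring the call site in `process_batch_certificate_from_peer`
    /// (with `IS_SYNCING = false` for live operation):
    ///
    /// ```ignore
    /// self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
    /// ```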
1690    #[async_recursion::async_recursion]
1691    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1692        &self,
1693        peer_ip: SocketAddr,
1694        certificate: BatchCertificate<N>,
1695    ) -> Result<()> {
1696        // Retrieve the batch header.
1697        let batch_header = certificate.batch_header();
1698        // Retrieve the batch round.
1699        let batch_round = batch_header.round();
1700
1701        // If the certificate round is outdated, do not store it.
1702        if batch_round <= self.storage.gc_round() {
1703            return Ok(());
1704        }
1705        // If the certificate already exists in storage, return early.
1706        if self.storage.contains_certificate(certificate.id()) {
1707            return Ok(());
1708        }
1709
1710        // If the node is not in sync mode and is not synced, then return an error.
1711        if !IS_SYNCING && !self.is_synced() {
1712            bail!(
1713                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1714                fmt_id(certificate.id())
1715            );
1716        }
1717
1718        // If the peer is ahead, use the batch header to sync up to the peer.
1719        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1720
1721        // Check if the certificate needs to be stored.
1722        if !self.storage.contains_certificate(certificate.id()) {
1723            // Store the batch certificate.
1724            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1725            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1726            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1727            // If a BFT sender was provided, send the round and certificate to the BFT.
1728            if let Some(bft_sender) = self.bft_sender.get() {
1729                // Send the certificate to the BFT.
1730                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1731                    warn!("Failed to update the BFT DAG from sync: {e}");
1732                    return Err(e);
1733                };
1734            }
1735        }
1736        Ok(())
1737    }
1738
1739    /// Recursively syncs using the given batch header.
1740    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1741        &self,
1742        peer_ip: SocketAddr,
1743        batch_header: &BatchHeader<N>,
1744    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1745        // Retrieve the batch round.
1746        let batch_round = batch_header.round();
1747
1748        // If the batch round is outdated, return an error.
1749        if batch_round <= self.storage.gc_round() {
1750            bail!("Round {batch_round} is too far in the past")
1751        }
1752
1753        // If the node is not in sync mode and is not synced, then return an error.
1754        if !IS_SYNCING && !self.is_synced() {
1755            bail!(
1756                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1757                fmt_id(batch_header.batch_id())
1758            );
1759        }
1760
1761        // Determine if quorum threshold is reached on the batch round.
1762        let is_quorum_threshold_reached = {
1763            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1764            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1765            committee_lookback.is_quorum_threshold_reached(&authors)
1766        };
1767
1768        // Check if our primary should move to the next round.
1769        // Note: Checking that the quorum threshold is reached is important for mitigating a race condition:
1770        // Narwhal requires N-f certificates to advance, whereas the BFT only requires f+1. Without this check,
1771        // the primary could advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1772        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1773        // Check if our primary is far behind the peer.
1774        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1775        // If our primary is behind schedule or far behind the peer, advance to the batch round.
1776        if is_behind_schedule || is_peer_far_in_future {
1777            // If the batch round is greater than the current committee round, update the committee.
1778            self.try_increment_to_the_next_round(batch_round).await?;
1779        }
1780
1781        // Ensure the primary has all of the transmissions.
1782        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1783
1784        // Ensure the primary has all of the previous certificates.
1785        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1786
1787        // Wait for the missing transmissions and previous certificates to be fetched.
1788        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1789            missing_transmissions_handle,
1790            missing_previous_certificates_handle,
1791        ).map_err(|e| {
1792            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
1793        })?;
1794
1795        // Iterate through the missing previous certificates.
1796        for batch_certificate in missing_previous_certificates {
1797            // Store the batch certificate (recursively fetching any missing previous certificates).
1798            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1799        }
1800        Ok(missing_transmissions)
1801    }
1802
1803    /// Fetches any missing transmissions for the specified batch header.
1804    /// If a transmission does not exist, it will be fetched from the specified peer IP.
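    ///
    /// A sketch of the per-transmission routing performed below:
    ///
    /// ```ignore
    /// // Route the transmission ID to its worker, then fetch the transmission if it is missing.
    /// let worker_id = assign_to_worker(transmission_id, self.num_workers())?;
    /// let worker = &self.workers[worker_id as usize];
    /// let (id, transmission) = worker.get_or_fetch_transmission(peer_ip, transmission_id).await?;
    /// ```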
1805    async fn fetch_missing_transmissions(
1806        &self,
1807        peer_ip: SocketAddr,
1808        batch_header: &BatchHeader<N>,
1809    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1810        // If the round is <= the GC round, return early.
1811        if batch_header.round() <= self.storage.gc_round() {
1812            return Ok(Default::default());
1813        }
1814
1815        // Ensure this batch ID is new, otherwise return early.
1816        if self.storage.contains_batch(batch_header.batch_id()) {
1817            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1818            return Ok(Default::default());
1819        }
1820
1821        // Retrieve the workers.
1822        let workers = self.workers.clone();
1823
1824        // Initialize a list for the transmissions.
1825        let mut fetch_transmissions = FuturesUnordered::new();
1826
1827        // Retrieve the number of workers.
1828        let num_workers = self.num_workers();
1829        // Iterate through the transmission IDs.
1830        for transmission_id in batch_header.transmission_ids() {
1831            // If the transmission does not exist in storage, proceed to fetch the transmission.
1832            if !self.storage.contains_transmission(*transmission_id) {
1833                // Determine the worker ID.
1834                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1835                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1836                };
1837                // Retrieve the worker.
1838                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1839                // Push the callback onto the list.
1840                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1841            }
1842        }
1843
1844        // Initialize a map for the transmissions.
1845        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1846        // Wait for all of the transmissions to be fetched.
1847        while let Some(result) = fetch_transmissions.next().await {
1848            // Retrieve the transmission.
1849            let (transmission_id, transmission) = result?;
1850            // Insert the transmission into the map.
1851            transmissions.insert(transmission_id, transmission);
1852        }
1853        // Return the transmissions.
1854        Ok(transmissions)
1855    }
1856
1857    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1858    async fn fetch_missing_previous_certificates(
1859        &self,
1860        peer_ip: SocketAddr,
1861        batch_header: &BatchHeader<N>,
1862    ) -> Result<HashSet<BatchCertificate<N>>> {
1863        // Retrieve the round.
1864        let round = batch_header.round();
1865        // If the previous round is 0, or is <= the GC round, return early.
1866        if round == 1 || round <= self.storage.gc_round() + 1 {
1867            return Ok(Default::default());
1868        }
1869
1870        // Fetch the missing previous certificates.
1871        let missing_previous_certificates =
1872            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1873        if !missing_previous_certificates.is_empty() {
1874            debug!(
1875                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1876                missing_previous_certificates.len(),
1877            );
1878        }
1879        // Return the missing previous certificates.
1880        Ok(missing_previous_certificates)
1881    }
1882
1883    /// Fetches any missing certificates for the specified batch header from the specified peer.
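    ///
    /// A sketch of a single certificate request, as issued in the loop below:
    ///
    /// ```ignore
    /// let certificate = self.sync.send_certificate_request(peer_ip, certificate_id).await?;
    /// ```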
1884    async fn fetch_missing_certificates(
1885        &self,
1886        peer_ip: SocketAddr,
1887        round: u64,
1888        certificate_ids: &IndexSet<Field<N>>,
1889    ) -> Result<HashSet<BatchCertificate<N>>> {
1890        // Initialize a list for the missing certificates.
1891        let mut fetch_certificates = FuturesUnordered::new();
1892        // Initialize a set for the missing certificates.
1893        let mut missing_certificates = HashSet::default();
1894        // Iterate through the certificate IDs.
1895        for certificate_id in certificate_ids {
1896            // Check if the certificate already exists in the ledger.
1897            if self.ledger.contains_certificate(certificate_id)? {
1898                continue;
1899            }
1900            // Check if the certificate already exists in storage.
1901            if self.storage.contains_certificate(*certificate_id) {
1902                continue;
1903            }
1904            // If we have not fully processed the certificate yet, store it.
1905            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1906                missing_certificates.insert(certificate);
1907            } else {
1908                // If we do not have the certificate, request it.
1909                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1910                // TODO (howardwu): Limit the number of open requests we send to a peer.
1911                // Send a certificate request to the peer.
1912                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1913            }
1914        }
1915
1916        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1917        match fetch_certificates.is_empty() {
1918            true => return Ok(missing_certificates),
1919            false => trace!(
1920                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1921                fetch_certificates.len(),
1922            ),
1923        }
1924
1925        // Wait for all of the missing certificates to be fetched.
1926        while let Some(result) = fetch_certificates.next().await {
1927            // Insert the missing certificate into the set.
1928            missing_certificates.insert(result?);
1929        }
1930        // Return the missing certificates.
1931        Ok(missing_certificates)
1932    }
1933}
1934
1935impl<N: Network> Primary<N> {
1936    /// Spawns a task with the given future; it should only be used for long-running tasks.
1937    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1938        self.handles.lock().push(tokio::spawn(future));
1939    }
1940
1941    /// Shuts down the primary.
1942    pub async fn shut_down(&self) {
1943        info!("Shutting down the primary...");
1944        // Shut down the workers.
1945        self.workers.iter().for_each(|worker| worker.shut_down());
1946        // Abort the tasks.
1947        self.handles.lock().iter().for_each(|handle| handle.abort());
1948        // Save the current proposal cache to disk.
1949        let proposal_cache = {
1950            let proposal = self.proposed_batch.write().take();
1951            let signed_proposals = self.signed_proposals.read().clone();
1952            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1953            let pending_certificates = self.storage.get_pending_certificates();
1954            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1955        };
1956        if let Err(err) = proposal_cache.store(&self.storage_mode) {
1957            error!("Failed to store the current proposal cache: {err}");
1958        }
1959        // Close the gateway.
1960        self.gateway.shut_down().await;
1961    }
1962}
1963
1964#[cfg(test)]
1965mod tests {
1966    use super::*;
1967    use snarkos_node_bft_ledger_service::MockLedgerService;
1968    use snarkos_node_bft_storage_service::BFTMemoryService;
1969    use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
1970    use snarkvm::{
1971        ledger::{
1972            committee::{Committee, MIN_VALIDATOR_STAKE},
1973            test_helpers::sample_execution_transaction_with_fee,
1974        },
1975        prelude::{Address, Signature},
1976    };
1977
1978    use bytes::Bytes;
1979    use indexmap::IndexSet;
1980    use rand::RngCore;
1981
1982    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1983
1984    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1985        // Create a committee containing the primary's account.
1986        const COMMITTEE_SIZE: usize = 4;
1987        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1988        let mut members = IndexMap::new();
1989
1990        for i in 0..COMMITTEE_SIZE {
1991            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1992            let account = Account::new(rng).unwrap();
1993
1994            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1995            accounts.push((socket_addr, account));
1996        }
1997
1998        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1999    }
2000
2001    // Returns a primary using the account at `account_index` in the configured committee.
2002    fn primary_with_committee(
2003        account_index: usize,
2004        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2005        committee: Committee<CurrentNetwork>,
2006        height: u32,
2007    ) -> Primary<CurrentNetwork> {
2008        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2009        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2010
2011        // Initialize the primary.
2012        let account = accounts[account_index].1.clone();
2013        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2014        let mut primary =
2015            Primary::new(account, storage, ledger, block_sync, None, &[], false, StorageMode::new_test(None), None)
2016                .unwrap();
2017
2018        // Construct a worker instance.
2019        primary.workers = Arc::from([Worker::new(
2020            0, // id
2021            Arc::new(primary.gateway.clone()),
2022            primary.storage.clone(),
2023            primary.ledger.clone(),
2024            primary.proposed_batch.clone(),
2025        )
2026        .unwrap()]);
2027        for a in accounts.iter().skip(account_index) {
2028            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2029        }
2030
2031        primary
2032    }
2033
2034    fn primary_without_handlers(
2035        rng: &mut TestRng,
2036    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2037        let (accounts, committee) = sample_committee(rng);
2038        let primary = primary_with_committee(
2039            0, // index of primary's account
2040            &accounts,
2041            committee,
2042            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2043        );
2044
2045        (primary, accounts)
2046    }
2047
2048    // Creates a mock solution.
2049    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2050        // Sample a random fake solution ID.
2051        let solution_id = rng.r#gen::<u64>().into();
2052        // Vary the size of the solutions.
2053        let size = rng.gen_range(1024..10 * 1024);
2054        // Sample random fake solution bytes.
2055        let mut vec = vec![0u8; size];
2056        rng.fill_bytes(&mut vec);
2057        let solution = Data::Buffer(Bytes::from(vec));
2058        // Return the solution ID and solution.
2059        (solution_id, solution)
2060    }
2061
2062    // Samples a test transaction.
2063    fn sample_unconfirmed_transaction(
2064        rng: &mut TestRng,
2065    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2066        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2067        let id = transaction.id();
2068
2069        (id, Data::Object(transaction))
2070    }
2071
2072    // Creates a batch proposal with one solution and the given number of transactions.
2073    fn create_test_proposal(
2074        author: &Account<CurrentNetwork>,
2075        committee: Committee<CurrentNetwork>,
2076        round: u64,
2077        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2078        timestamp: i64,
2079        num_transactions: u64,
2080        rng: &mut TestRng,
2081    ) -> Proposal<CurrentNetwork> {
2082        let mut transmission_ids = IndexSet::new();
2083        let mut transmissions = IndexMap::new();
2084
2085        // Prepare the solution and insert into the sets.
2086        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2087        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2088        let solution_transmission_id = (solution_id, solution_checksum).into();
2089        transmission_ids.insert(solution_transmission_id);
2090        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2091
2092        // Prepare the transactions and insert into the sets.
2093        for _ in 0..num_transactions {
2094            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2095            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2096            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2097            transmission_ids.insert(transaction_transmission_id);
2098            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2099        }
2100
2101        // Retrieve the private key.
2102        let private_key = author.private_key();
2103        // Sign the batch header.
2104        let batch_header = BatchHeader::new(
2105            private_key,
2106            round,
2107            timestamp,
2108            committee.id(),
2109            transmission_ids,
2110            previous_certificate_ids,
2111            rng,
2112        )
2113        .unwrap();
2114        // Construct the proposal.
2115        Proposal::new(committee, batch_header, transmissions).unwrap()
2116    }
2117
2118    // Creates a signature of the primary's current proposal for each committee member (excluding
2119    // the primary).
2120    fn peer_signatures_for_proposal(
2121        primary: &Primary<CurrentNetwork>,
2122        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2123        rng: &mut TestRng,
2124    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2125        // Each committee member signs the batch.
2126        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2127        for (socket_addr, account) in accounts {
2128            if account.address() == primary.gateway.account().address() {
2129                continue;
2130            }
2131            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2132            let signature = account.sign(&[batch_id], rng).unwrap();
2133            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2134        }
2135
2136        signatures
2137    }
2138
2139    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2140    fn peer_signatures_for_batch(
2141        primary_address: Address<CurrentNetwork>,
2142        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2143        batch_id: Field<CurrentNetwork>,
2144        rng: &mut TestRng,
2145    ) -> IndexSet<Signature<CurrentNetwork>> {
2146        let mut signatures = IndexSet::new();
2147        for (_, account) in accounts {
2148            if account.address() == primary_address {
2149                continue;
2150            }
2151            let signature = account.sign(&[batch_id], rng).unwrap();
2152            signatures.insert(signature);
2153        }
2154        signatures
2155    }
2156
2157    // Creates a batch certificate.
2158    fn create_batch_certificate(
2159        primary_address: Address<CurrentNetwork>,
2160        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2161        round: u64,
2162        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2163        rng: &mut TestRng,
2164    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2165        let timestamp = now();
2166
2167        let author =
2168            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2169        let private_key = author.private_key();
2170
2171        let committee_id = Field::rand(rng);
2172        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2173        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2174        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2175        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2176
2177        let solution_transmission_id = (solution_id, solution_checksum).into();
2178        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2179
2180        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2181        let transmissions = [
2182            (solution_transmission_id, Transmission::Solution(solution)),
2183            (transaction_transmission_id, Transmission::Transaction(transaction)),
2184        ]
2185        .into();
2186
2187        let batch_header = BatchHeader::new(
2188            private_key,
2189            round,
2190            timestamp,
2191            committee_id,
2192            transmission_ids,
2193            previous_certificate_ids,
2194            rng,
2195        )
2196        .unwrap();
2197        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2198        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2199        (certificate, transmissions)
2200    }
2201
2202    // Creates a certificate chain up to, but not including, the specified round in the primary storage.
    fn store_certificate_chain(
        primary: &Primary<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        rng: &mut TestRng,
    ) -> IndexSet<Field<CurrentNetwork>> {
        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        for cur_round in 1..round {
            for (_, account) in accounts.iter() {
                let (certificate, transmissions) = create_batch_certificate(
                    account.address(),
                    accounts,
                    cur_round,
                    previous_certificates.clone(),
                    rng,
                );
                next_certificates.insert(certificate.id());
                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
            }

            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
            previous_certificates = next_certificates;
            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
        }

        previous_certificates
    }

    // Insert the account socket addresses into the resolver so that
    // they are recognized as "connected".
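    // The resolver maps both the listening address and the connected address
    // (identical in these tests) to the peer's Aleo address; the propose and
    // signature checks below rely on this mapping to identify authors.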
    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
        // The first account is the primary itself, which doesn't need to be resolved.
        for (addr, acct) in accounts.iter().skip(1) {
            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
        }
    }

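    /// Checks that `propose_batch` succeeds and records a proposal once a worker
    /// holds pending transmissions.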
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; it should succeed now that the worker holds transmissions.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Try to propose a batch with no transmissions.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

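    /// Checks that `propose_batch` also succeeds at a later round, after the
    /// storage has been filled with a certificate chain.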
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

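    /// Checks that a proposal never re-includes transmissions that were already
    /// certified in earlier rounds: only the genuinely new transmissions survive.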
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs from the previous round.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate into storage.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Add the transmissions to the worker.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            // Insert the certificate into storage.
            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the new transmissions that were not in previous certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_id, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }

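    /// Checks that the proposer bounds its own batch by the transaction spend limit
    /// even when running under consensus V4: only two of the five stored
    /// transactions are expected to fit alongside the solution.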
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create a primary to test spend limit backwards compatibility with V4.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());
        // Check the workers are empty.
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Generate a solution and transactions.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _ in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            // Store it on one of the workers.
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Propose a batch; it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        // Expect 2/5 transactions to be included in the proposal, in addition to the solution.
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        // Check the transmissions were correctly drained from the workers.
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

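    /// Checks that a well-formed proposal from a peer is accepted. Three
    /// preconditions must hold: the workers know the proposal's transmissions, the
    /// resolver can map the peer to its Aleo address, and the primary considers
    /// itself synced.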
    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // The primary will only consider itself synced if it has received
        // block locators from a peer.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer; it should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

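    /// Checks that a proposal is rejected while the primary is still syncing: peer
    /// block locators far ahead of the local ledger mark the node as not synced.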
    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Add block locators at a greater height to indicate we are not synced.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();

        // Try to process the batch proposal from the peer; it should fail.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // The primary will only consider itself synced if it has received
        // block locators from a peer.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer; it should succeed.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

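    /// Checks that a `BatchPropose` whose `round` field disagrees with the round
    /// inside the batch header is rejected.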
    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        // The primary must be considered synced.
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer; it should error.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        // The primary must be considered synced.
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer; it should error.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals
    /// with timestamps too close to the author's previous certificate are rejected.
    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Use a timestamp that is too early: less than the minimum batch delay
        // after the author's previous certificate. Note: since the minimum delay
        // is currently 1 second, this timestamp equals the last timestamp.
        let last_timestamp = primary
            .storage
            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
            .expect("No previous proposal exists")
            .timestamp();
        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;

        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        // The primary must be considered synced.
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer; it should error.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    /// Checks that a proposal exceeding the transaction spend limit is accepted
    /// under consensus V4 but rejected from V5 onwards.
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries to test spend limit activation on V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a valid proposal with an author that isn't either primary.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make sure both primaries are aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to each primary's resolver to pass the propose checks.
        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Primary v4 must be considered synced.
        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        // Primary v5 must be considered synced.
        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        // Check the spend limit is enforced from V5 onwards.
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

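    /// Checks that `propose_batch` is a no-op while the proposal lock is ahead of
    /// the storage round, and succeeds again once the lock is restored.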
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch; the call returns Ok, but no proposal is produced.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

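    /// Checks that `propose_batch` terminates early when an existing proposal is
    /// already ahead of the storage round, leaving that proposal untouched.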
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Proposing a batch terminates early because the storage round is behind the proposal.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

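    /// Exercises the happy path of certification: the primary stores its own
    /// proposal, gathers a signature from every other committee member, and, once
    /// quorum is reached, creates and stores the certificate and advances the round.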
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each of the other committee members signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each of the other committee members signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }

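    /// Checks that a single signature is not enough to reach quorum: no certificate
    /// is created and the round does not advance.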
    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each of the other committee members signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check that no certificate was created or stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each of the other committee members signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check that no certificate was created or stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }

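    /// Checks that a certificate with missing transmissions can only be inserted
    /// when those transmissions are explicitly declared as aborted; the aborted IDs
    /// are then tracked in storage without any underlying transmission data.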
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs from the previous round.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create a certificate for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly abort some of the transmissions, ensuring at least one is aborted.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Insert the aborted transmission.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Keep the transmission that was not aborted.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Add the non-aborted transmissions to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Check that inserting the certificate with missing transmissions fails.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Insert the certificate into storage, declaring the aborted transmissions.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Ensure the certificate exists in storage.
        assert!(primary.storage.contains_certificate(certificate_id));
        // Ensure the aborted transmission IDs are tracked in storage, without underlying data.
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
}