Skip to main content

snarkos_node_bft/
primary.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod proposal_task;
17pub use proposal_task::ProposalTask;
18
19use crate::{
20    Gateway,
21    MAX_BATCH_DELAY,
22    MAX_LEADER_CERTIFICATE_DELAY,
23    MAX_WORKERS,
24    MIN_BATCH_DELAY,
25    PRIMARY_PING_INTERVAL,
26    Sync,
27    Transport,
28    WORKER_PING_INTERVAL,
29    Worker,
30    events::{BatchPropose, BatchSignature, Event},
31    helpers::{
32        PrimaryReceiver,
33        PrimarySender,
34        Proposal,
35        ProposalCache,
36        SignedProposals,
37        Storage,
38        assign_to_worker,
39        assign_to_workers,
40        fmt_id,
41        init_sync_channels,
42        init_worker_channels,
43        now,
44    },
45    spawn_blocking,
46    sync::SyncCallback,
47};
48
49use snarkos_account::Account;
50use snarkos_node_bft_events::PrimaryPing;
51use snarkos_node_bft_ledger_service::LedgerService;
52#[cfg(test)]
53use snarkos_node_network::ConnectionMode;
54use snarkos_node_network::PeerPoolHandling;
55use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
56use snarkos_utilities::{CallbackHandle, NodeDataDir};
57
58use snarkvm::{
59    console::{
60        prelude::*,
61        types::{Address, Field},
62    },
63    ledger::{
64        block::Transaction,
65        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
66        puzzle::{Solution, SolutionID},
67    },
68    prelude::{Signature, committee::Committee},
69    utilities::flatten_error,
70};
71
72use anyhow::Context;
73use colored::Colorize;
74use futures::stream::{FuturesUnordered, StreamExt};
75use indexmap::{IndexMap, IndexSet};
76#[cfg(feature = "locktick")]
77use locktick::{
78    parking_lot::{Mutex, RwLock},
79    tokio::RwLock as TRwLock,
80};
81#[cfg(not(feature = "locktick"))]
82use parking_lot::{Mutex, RwLock};
83#[cfg(not(feature = "serial"))]
84use rayon::prelude::*;
85use std::{
86    collections::{HashMap, HashSet},
87    future::Future,
88    net::SocketAddr,
89    pin::Pin,
90    sync::{Arc, OnceLock},
91    time::Instant,
92};
93#[cfg(not(feature = "locktick"))]
94use tokio::sync::RwLock as TRwLock;
95use tokio::{sync::Notify, task::JoinHandle};
96
/// The state of the primary's batch proposal.
///
/// Exactly one variant is active at a time; the variants are documented individually below.
#[derive(Debug, PartialEq, Eq)]
pub enum ProposedBatchState<N: Network> {
    /// No batch is currently being proposed.
    None,
    /// A batch is being proposed and awaiting signatures.
    /// Holds the in-progress proposal itself (boxed).
    Certifying(Box<Proposal<N>>),
    /// A batch has reached quorum and is being inserted into storage.
    /// Carries the batch ID so late-arriving signatures can be recognized and silently dropped.
    Certified(Field<N>),
}
108
109impl<N: Network> Default for ProposedBatchState<N> {
110    fn default() -> Self {
111        Self::None
112    }
113}
114
115impl<N: Network> ProposedBatchState<N> {
116    /// Returns `true` if the primary has no active batch proposal.
117    pub fn is_none(&self) -> bool {
118        matches!(self, Self::None)
119    }
120
121    /// Returns `true` if a batch is currently being proposed (awaiting signatures).
122    pub fn is_proposed(&self) -> bool {
123        matches!(self, Self::Certifying(_))
124    }
125
126    /// Returns a reference to the in-progress proposal, or `None` if not in the `Certifying` state.
127    pub fn as_proposal(&self) -> Option<&Proposal<N>> {
128        match self {
129            Self::Certifying(p) => Some(p.as_ref()),
130            _ => None,
131        }
132    }
133}
134
/// A helper type to keep track of the state of the primary's batch proposal.
/// The state sits behind a read-write lock; note that `RwLock` here is the non-async lock imported
/// at the top of this file, while the tokio lock is imported under the name `TRwLock`.
pub type ProposedBatch<N> = RwLock<ProposedBatchState<N>>;
137
/// This callback trait allows listening to changes in the Primary, such as round advancement.
/// This is implemented by [`BFT`].
// Note: `std::marker::Sync` is spelled out in full below, because the bare name `Sync` in this
// file refers to the sync module type imported from the crate root.
#[async_trait::async_trait]
pub trait PrimaryCallback<N: Network>: Send + std::marker::Sync {
    /// Asks the callback whether we can move to the next round.
    ///
    /// # Arguments
    /// * `current_round` - the round the Primary is in (to avoid race conditions)
    ///
    /// # Returns
    /// `true` if we moved to the next round.
    fn try_advance_to_next_round(&self, current_round: u64) -> bool;

    /// Adds a certificate that was created by the primary or received from a peer.
    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
}
154
/// The primary logic of a node.
/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
///
/// The struct derives `Clone`; the `Arc`-wrapped fields are shared between all clones.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module enables fetching data from other validators.
    sync: Sync<N>,
    /// The gateway allows talking to other nodes in the validator set.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers. These are unset until [`Primary::run`] creates them, and can only be set once.
    workers: Arc<OnceLock<Vec<Worker<N>>>>,

    /// The primary callback (used by [`BFT`]).
    primary_callback: Arc<CallbackHandle<Arc<dyn PrimaryCallback<N>>>>,

    /// The state of the batch proposal, which tells whether the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,

    /// The instant at which the current batch was proposed (used to measure certification latency).
    /// (used for higher precision in the metrics compared to the batch timestamp)
    #[cfg(feature = "metrics")]
    batch_propose_start: Arc<Mutex<Option<Instant>>>,

    /// Holds the most recent round and timestamp that the primary proposed a batch for.
    /// This is stored as a `(round, timestamp)` pair behind an async lock.
    /// TODO(kaimast): avoid using an async lock here, so this can be merged with the `proposed_batch`,
    /// to have a unified `primary_state` field.
    latest_proposal_timestamp: Arc<TRwLock<Option<(u64, i64)>>>,

    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,

    /// The handles for all background tasks spawned by this primary.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,

    /// The node configuration directory.
    /// Among other things, the proposal cache is looked up in and loaded from this directory.
    node_data_dir: NodeDataDir,

    /// Manages proposal readiness state and drives the batch proposal loop.
    proposal_task: ProposalTask<N>,

    /// Used to wake up the dedicated round-increment task, if we may be able to advance to the next round.
    /// This is used, so the timeout for round advancement is reset on every round increment.
    round_increment_notify: Arc<Notify>,
}
202
203impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    /// This is twice the maximum number of transmissions allowed in a single batch.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
206
207    /// Initializes a new primary instance.
208    #[allow(clippy::too_many_arguments)]
209    pub fn new(
210        account: Account<N>,
211        storage: Storage<N>,
212        ledger: Arc<dyn LedgerService<N>>,
213        block_sync: Arc<BlockSync<N>>,
214        ip: Option<SocketAddr>,
215        trusted_validators: &[SocketAddr],
216        trusted_peers_only: bool,
217        node_data_dir: NodeDataDir,
218        dev: Option<u16>,
219    ) -> Result<Self> {
220        // Initialize the gateway.
221        let gateway = Gateway::new(
222            account,
223            storage.clone(),
224            ledger.clone(),
225            ip,
226            trusted_validators,
227            trusted_peers_only,
228            node_data_dir.clone(),
229            dev,
230        )?;
231        // Initialize the sync module.
232        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
233
234        // Initialize the primary instance.
235        Ok(Self {
236            sync,
237            gateway,
238            storage,
239            ledger,
240            node_data_dir,
241            workers: Default::default(),
242            primary_callback: Default::default(),
243            proposed_batch: Default::default(),
244            #[cfg(feature = "metrics")]
245            batch_propose_start: Default::default(),
246            latest_proposal_timestamp: Default::default(),
247            signed_proposals: Default::default(),
248            handles: Default::default(),
249            proposal_task: Default::default(),
250            round_increment_notify: Default::default(),
251        })
252    }
253
254    /// Load the proposal cache file and update the Primary state with the stored data.
255    async fn load_proposal_cache(&self) -> Result<()> {
256        // Fetch the signed proposals from the file system if it exists.
257        match ProposalCache::<N>::exists(&self.node_data_dir) {
258            // If the proposal cache exists, then process the proposal cache.
259            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
260                Ok(proposal_cache) => {
261                    // Extract the proposal and signed proposals.
262                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
263                        proposal_cache.into();
264
265                    *self.latest_proposal_timestamp.write().await = Some((latest_certificate_round, now()));
266                    *self.proposed_batch.write() = match proposed_batch {
267                        Some(p) => ProposedBatchState::Certifying(Box::new(p)),
268                        None => ProposedBatchState::None,
269                    };
270                    *self.signed_proposals.write() = signed_proposals;
271
272                    // Update the storage with the pending certificates.
273                    for certificate in pending_certificates {
274                        let batch_id = certificate.batch_id();
275                        // We use a dummy IP because the node should not need to request from any peers.
276                        // The storage should have stored all the transmissions. If not, we simply
277                        // skip the certificate.
278                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
279                        {
280                            let err = err.context(format!(
281                                "Failed to load stored certificate {} from proposal cache",
282                                fmt_id(batch_id)
283                            ));
284                            warn!("{}", &flatten_error(err));
285                        }
286                    }
287                    Ok(())
288                }
289                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
290            },
291            // If the proposal cache does not exist, then return early.
292            false => Ok(()),
293        }
294    }
295
296    /// Run the primary instance.
297    pub async fn run(
298        &self,
299        ping: Option<Arc<Ping<N>>>,
300        primary_callback: Option<Arc<dyn PrimaryCallback<N>>>,
301        sync_callback: Option<Arc<dyn SyncCallback<N>>>,
302        primary_sender: PrimarySender<N>,
303        primary_receiver: PrimaryReceiver<N>,
304    ) -> Result<()> {
305        info!("Starting the primary instance of the memory pool...");
306
307        // Set the BFT sender.
308        if let Some(callback) = primary_callback {
309            self.primary_callback.set(callback)?;
310        }
311
312        // Construct a map of the worker senders.
313        let mut worker_senders = IndexMap::new();
314        // Construct a map for the workers.
315        let mut workers = Vec::new();
316        // Initialize the workers.
317        for id in 0..MAX_WORKERS {
318            // Construct the worker channels.
319            let (tx_worker, rx_worker) = init_worker_channels();
320            // Construct the worker instance.
321            let worker = Worker::new(
322                id,
323                Arc::new(self.gateway.clone()),
324                self.storage.clone(),
325                self.ledger.clone(),
326                self.proposed_batch.clone(),
327            )?;
328            // Run the worker instance.
329            worker.run(rx_worker);
330            // Add the worker to the list of workers.
331            workers.push(worker);
332            // Add the worker sender to the map.
333            worker_senders.insert(id, tx_worker);
334        }
335        // Set the workers.
336        if self.workers.set(workers).is_err() {
337            bail!("Workers already set. `Primary::run` cannot be called more than once.");
338        }
339
340        // First, initialize the sync channels.
341        let (sync_sender, sync_receiver) = init_sync_channels();
342        // Next, initialize the sync module and sync the storage from ledger.
343        self.sync.initialize(sync_callback)?;
344        // Next, load and process the proposal cache before running the sync module.
345        self.load_proposal_cache().await?;
346        // Next, run the sync module.
347        self.sync.run(ping, sync_receiver).await?;
348        // Next, initialize the gateway.
349        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
350        // Lastly, start the primary handlers.
351        // Note: This ensures the primary does not start communicating before syncing is complete.
352        self.start_handlers(primary_receiver);
353
354        Ok(())
355    }
356
    /// Returns the current round, as tracked by the storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }
361
    /// Returns `true` if the primary is synced, as reported by the sync module.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }
366
    /// Returns a reference to the gateway that this primary was created with.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }
371
    /// Returns a reference to the storage that this primary operates on.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }
376
    /// Returns a reference to the (shared) ledger service.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }
381
382    /// Returns the number of workers.
383    pub fn num_workers(&self) -> u8 {
384        u8::try_from(self.workers.get().expect("Primary is not running yet").len()).expect("Too many workers")
385    }
386
387    /// Returns the workers.
388    pub fn workers(&self) -> &[Worker<N>] {
389        self.workers.get().expect("Primary is not running yet")
390    }
391}
392
393impl<N: Network> Primary<N> {
394    /// Returns the number of unconfirmed transmissions.
395    pub fn num_unconfirmed_transmissions(&self) -> usize {
396        self.workers().iter().map(|worker| worker.num_transmissions()).sum()
397    }
398
399    /// Returns the number of unconfirmed ratifications.
400    pub fn num_unconfirmed_ratifications(&self) -> usize {
401        self.workers().iter().map(|worker| worker.num_ratifications()).sum()
402    }
403
404    /// Returns the number of unconfirmed solutions.
405    pub fn num_unconfirmed_solutions(&self) -> usize {
406        self.workers().iter().map(|worker| worker.num_solutions()).sum()
407    }
408
409    /// Returns the number of unconfirmed transactions.
410    pub fn num_unconfirmed_transactions(&self) -> usize {
411        self.workers().iter().map(|worker| worker.num_transactions()).sum()
412    }
413}
414
415impl<N: Network> Primary<N> {
416    /// Returns the worker transmission IDs.
417    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
418        self.workers().iter().flat_map(|worker| worker.transmission_ids())
419    }
420
421    /// Returns the worker transmissions.
422    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
423        self.workers().iter().flat_map(|worker| worker.transmissions())
424    }
425
426    /// Returns the worker solutions.
427    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
428        self.workers().iter().flat_map(|worker| worker.solutions())
429    }
430
431    /// Returns the worker transactions.
432    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
433        self.workers().iter().flat_map(|worker| worker.transactions())
434    }
435}
436
437impl<N: Network> Primary<N> {
438    /// Clears the worker solutions.
439    pub fn clear_worker_solutions(&self) {
440        self.workers().iter().for_each(Worker::clear_solutions);
441    }
442}
443
444#[async_trait::async_trait]
445impl<N: Network> proposal_task::BatchPropose for Primary<N> {
    /// Returns the current round.
    /// This delegates to the inherent `Primary::current_round`; the explicit path makes it
    /// obvious that this is not a recursive call to this trait method.
    fn current_round(&self) -> u64 {
        Primary::current_round(self)
    }
449
    /// Forwards to the sync module's method of the same name.
    /// NOTE(review): judging by the name and return type, this presumably yields a future to await
    /// while the node is still syncing, and `None` otherwise - confirm against the sync module.
    fn wait_for_synced_if_syncing(&self) -> Option<futures::future::BoxFuture<'_, ()>> {
        self.sync.wait_for_synced_if_syncing()
    }
453
454    fn is_synced(&self) -> bool {
455        self.sync.is_synced()
456    }
457
458    /// Proposes the batch for the current round.
459    ///
460    /// This method performs the following steps:
461    /// 1. Drain the workers.
462    /// 2. Sign the batch.
463    /// 3. Set the batch proposal in the primary.
464    /// 4. Broadcast the batch header to all validators for signing.
465    ///
466    /// # Returns
467    /// - `Ok(true)` if the batch was proposed.
468    /// - `Ok(false)` if the batch was not proposed for a benign reason, e.g., the timestamp is too soon after the previous certificate.
    /// - `Err(err)` if an unexpected error occurred.
470    async fn propose_batch(&self) -> Result<bool> {
471        // Ensure there are not concurrent executions of this function.
472        //
473        // Note, in the current design, this function is only invoked from the batch proposal task, and it is technically
474        // not possible for there to be concurrent invocations of the function, but we keep this lock for now.
475        let mut lock_guard = self.latest_proposal_timestamp.write().await;
476
477        // Check if the proposed batch has expired, and clear it if it has expired.
478        if let Err(err) = self
479            .check_proposed_batch_for_expiration()
480            .with_context(|| "Failed to check the proposed batch for expiration")
481        {
482            warn!("{}", flatten_error(&err));
483            return Ok(false);
484        }
485
486        // Retrieve the current round.
487        let round = self.current_round();
488        // Compute the previous round.
489        let previous_round = round.saturating_sub(1);
490
491        // If the current round is 0, return early.
492        // This can actually never happen, because of the invariant that the current round is never 0
493        // (see [`StorageInner::current_round`]).
494        ensure!(round > 0, "Round 0 cannot have transaction batches");
495
496        // If the current storage round is below the latest proposal round, then return early.
497        if let Some((latest_round, _)) = &*lock_guard
498            && round < *latest_round
499        {
500            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {latest_round}");
501            return Ok(false);
502        }
503
504        // If there is a batch being proposed or certified already, handle accordingly.
505        match &*self.proposed_batch.read() {
506            ProposedBatchState::Certifying(proposal) => {
507                // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
508                if round < proposal.round()
509                    || proposal
510                        .batch_header()
511                        .previous_certificate_ids()
512                        .iter()
513                        .any(|id| !self.storage.contains_certificate(*id))
514                {
515                    warn!(
516                        "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
517                        proposal.round(),
518                    );
519                    return Ok(false);
520                }
521                // Construct the event.
522                // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
523                let event = Event::BatchPropose(proposal.batch_header().clone().into());
524                // Iterate through the non-signers.
525                for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
526                    // Resolve the address to the peer IP.
527                    match self.gateway.resolver().read().get_peer_ip_for_address(address) {
528                        // Resend the batch proposal to the validator for signing.
529                        Some(peer_ip) => {
530                            let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
531                            tokio::spawn(async move {
532                                debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
533                                // Resend the batch proposal to the peer.
534                                if gateway.send(peer_ip, event_).await.is_none() {
535                                    warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
536                                }
537                            });
538                        }
539                        None => continue,
540                    }
541                }
542                debug!("Proposed batch for round {} is still valid", proposal.round());
543                return Ok(false);
544            }
545            // A batch is being certified; wait until it completes before proposing another.
546            ProposedBatchState::Certified(_) => {
547                debug!("Cannot propose a batch for round {round} - a batch is currently being certified");
548                return Ok(false);
549            }
550            ProposedBatchState::None => {
551                // No batch in progress, so it is save to propose a new one.
552            }
553        }
554
555        #[cfg(feature = "metrics")]
556        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
557
558        // Ensure that the primary does not create a new proposal too quickly.
559        if let Some((_, latest_timestamp)) = &*lock_guard
560            && !self.check_own_proposal_timestamp(previous_round, *latest_timestamp, now())?
561        {
562            return Ok(false);
563        }
564
565        // Ensure the primary has not proposed a batch for this round before.
566        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
567            // If a BFT sender was provided, attempt to advance the current round.
568            if let Some(cb) = &*self.primary_callback.get_ref() {
569                match cb.try_advance_to_next_round(self.current_round()) {
570                    true => (), // continue,
571                    false => return Ok(false),
572                }
573            }
574            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
575            return Ok(false);
576        }
577
578        // Determine if the current round has been proposed.
579        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
580        // good for network reliability and should not be prevented for the already existing proposed_batch.
581        // If a certificate already exists for the current round, an attempt should be made to advance the
582        // round as early as possible.
583        if let Some((latest_round, _)) = &*lock_guard
584            && *latest_round == round
585        {
586            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
587            return Ok(false);
588        }
589
590        // Retrieve the committee to check against.
591        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
592        // Check if the primary is connected to enough validators to reach quorum threshold.
593        {
594            // Retrieve the connected validator addresses.
595            let mut connected_validators = self.gateway.connected_addresses();
596            // Append the primary to the set.
597            connected_validators.insert(self.gateway.account().address());
598            // If quorum threshold is not reached, return early.
599            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
600                debug!(
601                    "Primary is safely skipping a batch proposal for round {round} {}",
602                    "(please connect to more validators)".dimmed()
603                );
604                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
605                return Ok(false);
606            }
607        }
608
609        // Retrieve the previous certificates.
610        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
611
612        // Check if the batch is ready to be proposed.
613        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
614        let mut is_ready = previous_round == 0;
615        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
616        if previous_round > 0 {
617            // Retrieve the committee lookback for the round.
618            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
619                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
620            };
621            // Construct a set over the authors.
622            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
623            // Check if the previous certificates have reached the quorum threshold.
624            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
625                is_ready = true;
626            }
627            #[cfg(feature = "test_network")]
628            {
629                // If we are using a hotswapped dev committee, use simplified checks to more easily advance.
630                if let Some(dev_committee) = self.ledger.dev_committee_for_round(previous_round)? {
631                    if round <= dev_committee.starting_round() {
632                        is_ready = true;
633                    }
634                }
635            }
636        }
637        // If the batch is not ready to be proposed, return early.
638        if !is_ready {
639            debug!(
640                "Primary is safely skipping a batch proposal for round {round} {}",
641                format!("(previous round {previous_round} has not reached quorum)").dimmed()
642            );
643            return Ok(false);
644        }
645
646        // Initialize the map of transmissions.
647        let mut transmissions: IndexMap<_, _> = Default::default();
648        // Track the total execution costs of the batch proposal as it is being constructed.
649        let mut proposal_cost = 0u64;
650        // Note: worker draining and transaction inclusion needs to be thought
651        // through carefully when there is more than one worker. The fairness
652        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
653        debug_assert_eq!(MAX_WORKERS, 1);
654
655        'outer: for worker in self.workers().iter() {
656            let mut num_worker_transmissions = 0usize;
657
658            while let Some((id, transmission)) = worker.remove_front() {
659                // Check the selected transmissions are below the batch limit.
660                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
661                    // Reinsert the transmission into the worker.
662                    worker.insert_front(id, transmission);
663                    break 'outer;
664                }
665
666                // Check the max transmissions per worker is not exceeded.
667                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
668                    // Reinsert the transmission into the worker.
669                    worker.insert_front(id, transmission);
670                    continue 'outer;
671                }
672
673                // Check if the ledger already contains the transmission.
674                if self.ledger.contains_transmission(&id).unwrap_or(true) {
675                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
676                    continue;
677                }
678
679                // Check if the storage already contain the transmission.
680                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
681                // the primary does not propose a batch with no transmissions.
682                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
683                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
684                    continue;
685                }
686
687                // Check the transmission is still valid.
688                match (id, transmission.clone()) {
689                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
690                        // Ensure the checksum matches. If not, skip the solution.
691                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
692                        {
693                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
694                            continue;
695                        }
696                        // Check if the solution is still valid.
697                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
698                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
699                            continue;
700                        }
701                    }
702                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
703                        // Ensure the checksum matches. If not, skip the transaction.
704                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
705                        {
706                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
707                            continue;
708                        }
709
710                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
711                        let transaction = spawn_blocking!({
712                            match transaction {
713                                Data::Object(transaction) => Ok(transaction),
714                                Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(
715                                    &mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64),
716                                )?),
717                            }
718                        })?;
719
720                        // Fetch the current block height and consensus version.
721                        let current_block_height = self.ledger.latest_block_height();
722                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
723
724                        // Compute the transaction spent cost (in microcredits).
725                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
726                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
727                        else {
728                            debug!(
729                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
730                                fmt_id(transaction_id)
731                            );
732                            continue;
733                        };
734
735                        // Check if the transaction is still valid.
736                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
737                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
738                            continue;
739                        }
740
741                        // Compute the next proposal cost.
742                        // Note: We purposefully discard this transaction if the proposal cost overflows.
743                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
744                            debug!(
745                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
746                                fmt_id(transaction_id)
747                            );
748                            continue;
749                        };
750
751                        // Check if the next proposal cost exceeds the batch proposal spend limit.
752                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
753                        if next_proposal_cost > batch_spend_limit {
754                            debug!(
755                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
756                                fmt_id(transaction_id),
757                                batch_spend_limit
758                            );
759
760                            // Reinsert the transmission into the worker.
761                            worker.insert_front(id, transmission);
762                            break 'outer;
763                        }
764
765                        // Update the proposal cost.
766                        proposal_cost = next_proposal_cost;
767                    }
768
769                    // Note: We explicitly forbid including ratifications,
770                    // as the protocol currently does not support ratifications.
771                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
772                    // All other combinations are clearly invalid.
773                    _ => continue,
774                }
775
776                // If the transmission is valid, insert it into the proposal's transmission list.
777                transmissions.insert(id, transmission);
778                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
779            }
780        }
781
782        // Determine the current timestamp.
783        let current_timestamp = now();
784
785        /* Proceeding to sign & propose the batch. */
786        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
787
788        // Update the latest proposed round and timestamp.
789        *lock_guard = Some((round, current_timestamp));
790        // Retrieve the private key.
791        let private_key = *self.gateway.account().private_key();
792        // Retrieve the committee ID.
793        let committee_id = committee_lookback.id();
794        // Prepare the transmission IDs.
795        let transmission_ids = transmissions.keys().copied().collect();
796        // Prepare the previous batch certificate IDs.
797        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
798        // Sign the batch header and construct the proposal.
799        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
800            &private_key,
801            round,
802            current_timestamp,
803            committee_id,
804            transmission_ids,
805            previous_certificate_ids,
806            &mut rand::rng()
807        ))
808        .and_then(|batch_header| {
809            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
810                .map(|proposal| (batch_header, proposal))
811        })
812        .inspect_err(|_| {
813            // On error, reinsert the transmissions and then propagate the error.
814            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
815                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
816            }
817        })?;
818
819        // Broadcast the batch to all validators for signing.
820        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
821        // Store the proposal in memory.
822        *self.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
823        // Record the wall-clock time at which the batch was proposed.
824        #[cfg(feature = "metrics")]
825        {
826            *self.batch_propose_start.lock() = Some(Instant::now());
827        }
828
829        Ok(true)
830    }
831}
832
833impl<N: Network> Primary<N> {
834    /// Processes a batch propose from a peer.
835    ///
836    /// This method performs the following steps:
837    /// 1. Verify the batch.
838    /// 2. Sign the batch.
839    /// 3. Broadcast the signature back to the validator.
840    ///
841    /// If our primary is ahead of the peer, we will not sign the batch.
842    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
843    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
844        let BatchPropose { round: batch_round, batch_header } = batch_propose;
845
846        // Deserialize the batch header.
847        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
848        // Ensure the round matches in the batch header.
849        if batch_round != batch_header.round() {
850            // Proceed to disconnect the validator.
851            self.gateway.disconnect(peer_ip);
852            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
853        }
854
855        // Retrieve the batch author.
856        let batch_author = batch_header.author();
857
858        // Ensure the batch proposal is from the validator.
859        match self.gateway.resolve_to_aleo_addr(peer_ip) {
860            // If the peer is a validator, then ensure the batch proposal is from the validator.
861            Some(address) => {
862                if address != batch_author {
863                    // Proceed to disconnect the validator.
864                    self.gateway.disconnect(peer_ip);
865                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
866                }
867            }
868            None => bail!("Batch proposal from a disconnected validator"),
869        }
870        // Ensure the batch author is a current committee member.
871        if !self.gateway.is_authorized_validator_address(batch_author) {
872            // Proceed to disconnect the validator.
873            self.gateway.disconnect(peer_ip);
874            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
875        }
876        // Ensure the batch proposal is not from the current primary.
877        if self.gateway.account().address() == batch_author {
878            bail!("Invalid peer - proposed batch from myself ({batch_author})");
879        }
880
881        // Ensure that the batch proposal's committee ID matches the expected committee ID.
882        // This may happen when the network forks. A transaction referencing a
883        // state root from one side of the fork will be aborted on the other
884        // side of the fork. This leads to a different view of stake and
885        // therefore a different view of the committee ID.
886        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
887        if expected_committee_id != batch_header.committee_id() {
888            // Proceed to disconnect the validator.
889            self.gateway.disconnect(peer_ip);
890            bail!(
891                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
892                batch_header.committee_id()
893            );
894        }
895
896        // Retrieve the cached round and batch ID for this validator.
897        if let Some((signed_round, signed_batch_id, signature)) =
898            self.signed_proposals.read().get(&batch_author).copied()
899        {
900            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
901            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
902            if signed_round > batch_header.round() {
903                bail!(
904                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
905                    batch_header.round()
906                );
907            }
908
909            // If the round matches and the batch ID differs, then the validator is malicious.
910            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
911                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
912            }
913            // If the round and batch ID matches, then skip signing the batch a second time.
914            // Instead, rebroadcast the cached signature to the peer.
915            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
916                let gateway = self.gateway.clone();
917                tokio::spawn(async move {
918                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
919                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
920                    // Resend the batch signature to the peer.
921                    if gateway.send(peer_ip, event).await.is_none() {
922                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
923                    }
924                });
925                // Return early.
926                return Ok(());
927            }
928        }
929
930        // Ensure that the batch header doesn't already exist in storage.
931        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
932        if self.storage.contains_batch(batch_header.batch_id()) {
933            debug!(
934                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
935                format!("batch for round {batch_round} already exists in storage").dimmed()
936            );
937            return Ok(());
938        }
939
940        // Compute the previous round.
941        let previous_round = batch_round.saturating_sub(1);
942        // Ensure that the peer did not propose a batch too quickly.
943        if let Err(err) = self.check_peer_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
944            // Proceed to disconnect the validator.
945            self.gateway.disconnect(peer_ip);
946            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
947        }
948
949        // Ensure the batch header does not contain any ratifications.
950        if batch_header.contains(TransmissionID::Ratification) {
951            // Proceed to disconnect the validator.
952            self.gateway.disconnect(peer_ip);
953            bail!(
954                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
955            );
956        }
957
958        // If the peer is ahead, use the batch header to sync up to the peer.
959        let mut missing_transmissions =
960            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;
961
962        // Check that the transmission ids match and are not fee transactions.
963        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
964            // If the transmission is not well-formed, then return early.
965            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
966        }) {
967            let err = err.context(format!(
968                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
969            ));
970            debug!("{}", flatten_error(err));
971            return Ok(());
972        }
973
974        // Ensure the batch is for the current round.
975        // This method must be called after fetching previous certificates (above),
976        // and prior to checking the batch header (below).
977        if let Err(e) = self.ensure_is_signing_round(batch_round) {
978            // If the primary is not signing for the peer's round, then return early.
979            debug!("{e} from '{peer_ip}'");
980            return Ok(());
981        }
982
983        // Ensure the batch header from the peer is valid.
984        let (storage, header) = (self.storage.clone(), batch_header.clone());
985
986        // Check the batch header, and return early if it already exists in storage.
987        let Some(missing_transmissions) =
988            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
989        else {
990            return Ok(());
991        };
992
993        // Inserts the missing transmissions into the workers.
994        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
995
996        /* Proceeding to sign the batch. */
997
998        // Retrieve the batch ID.
999        let batch_id = batch_header.batch_id();
1000        // Sign the batch ID.
1001        let account = self.gateway.account().clone();
1002        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::rng()))?;
1003
1004        // Ensure the proposal has not already been signed.
1005        //
1006        // Note: Due to the need to sync the batch header with the peer, it is possible
1007        // for the primary to receive the same 'BatchPropose' event again, whereby only
1008        // one instance of this handler should sign the batch. This check guarantees this.
1009        match self.signed_proposals.write().0.entry(batch_author) {
1010            std::collections::hash_map::Entry::Occupied(mut entry) => {
1011                // If the validator has already signed a batch for this round, then return early,
1012                // since, if the peer still has not received the signature, they will request it again,
1013                // and the logic at the start of this function will resend the (now cached) signature
1014                // to the peer if asked to sign this batch proposal again.
1015                if entry.get().0 == batch_round {
1016                    return Ok(());
1017                }
1018                // Otherwise, cache the round, batch ID, and signature for this validator.
1019                entry.insert((batch_round, batch_id, signature));
1020            }
1021            // If the validator has not signed a batch before, then continue.
1022            std::collections::hash_map::Entry::Vacant(entry) => {
1023                // Cache the round, batch ID, and signature for this validator.
1024                entry.insert((batch_round, batch_id, signature));
1025            }
1026        };
1027
1028        // Broadcast the signature back to the validator.
1029        let self_ = self.clone();
1030        tokio::spawn(async move {
1031            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1032            // Send the batch signature to the peer.
1033            if self_.gateway.send(peer_ip, event).await.is_some() {
1034                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1035            }
1036        });
1037
1038        Ok(())
1039    }
1040
1041    /// Attempts to add a peer's `signature` for `batch_id` to the current proposal.
1042    ///
1043    /// Consumes `state` and always returns the (possibly updated) state alongside a result so that
1044    /// the caller can restore it unconditionally, keeping `proposed_batch` consistent even on the
1045    /// error path.
1046    ///
1047    /// # Returns
1048    /// * `(Ok(Some(proposal)), Certified(id))` — quorum reached; caller should certify.
1049    /// * `(Ok(None), <restored state>)` — signature accepted or silently dropped; nothing to do.
1050    /// * `(Err(e), <restored state>)` — signature rejected; caller should propagate the error.
1051    fn add_signature_to_batch(
1052        &self,
1053        state: ProposedBatchState<N>,
1054        peer_ip: SocketAddr,
1055        batch_id: Field<N>,
1056        signature: Signature<N>,
1057    ) -> (Result<Option<Proposal<N>>>, ProposedBatchState<N>) {
1058        match state {
1059            ProposedBatchState::Certifying(mut proposal) if proposal.batch_id() == batch_id => {
1060                // This signature is for our currently active proposal.
1061                // Use an inner closure to keep `?` ergonomics while returning a tuple.
1062                let inner: Result<bool> = (|| {
1063                    let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1064                    let Some(signer) = self.gateway.resolve_to_aleo_addr(peer_ip) else {
1065                        bail!("Signature is from a disconnected validator");
1066                    };
1067                    let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1068                    if new_signature {
1069                        info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1070                        Ok(proposal.is_quorum_threshold_reached(&committee_lookback))
1071                    } else {
1072                        debug!(
1073                            "Received duplicated signature from '{peer_ip}' for batch \
1074                                {batch_id} in round {round}",
1075                            round = proposal.round()
1076                        );
1077                        Ok(false)
1078                    }
1079                })();
1080                match inner {
1081                    Ok(true) => {
1082                        let certified_id = proposal.batch_id();
1083                        (Ok(Some(*proposal)), ProposedBatchState::Certified(certified_id))
1084                    }
1085                    Ok(false) => (Ok(None), ProposedBatchState::Certifying(proposal)),
1086                    Err(e) => (Err(e), ProposedBatchState::Certifying(proposal)),
1087                }
1088            }
1089            ProposedBatchState::Certifying(proposal) => {
1090                // Certifying a different proposal — check if batch_id is already in storage.
1091                if self.storage.contains_batch(batch_id) {
1092                    debug!(
1093                        "Primary is safely skipping a batch signature from {peer_ip} for \
1094                            round {} - batch is already certified",
1095                        proposal.round()
1096                    );
1097                    (Ok(None), ProposedBatchState::Certifying(proposal))
1098                } else {
1099                    let expected_id = proposal.batch_id();
1100                    let round = proposal.round();
1101                    (
1102                        Err(anyhow!("Unknown batch ID '{batch_id}', expected '{expected_id}' for round {round}")),
1103                        ProposedBatchState::Certifying(proposal),
1104                    )
1105                }
1106            }
1107            ProposedBatchState::Certified(id) if id == batch_id => {
1108                // Quorum already reached; late-arriving signature is harmless.
1109                debug!(
1110                    "Skipping batch signature from {peer_ip} for batch '{batch_id}' - \
1111                        already received sufficient signatures"
1112                );
1113                (Ok(None), ProposedBatchState::Certified(id))
1114            }
1115            ProposedBatchState::Certified(id) => {
1116                let result = if self.storage.contains_batch(batch_id) {
1117                    // This is most likely not malicious, but could indicate connectivity issues.
1118                    warn!("Received signature for an older batch {batch_id}");
1119                    Ok(None)
1120                } else {
1121                    Err(anyhow!("Unknown batch ID '{batch_id}'"))
1122                };
1123
1124                (result, ProposedBatchState::Certified(id))
1125            }
1126            ProposedBatchState::None => {
1127                let result = if self.storage.contains_batch(batch_id) {
1128                    // This is most likely not malicious, but could indicate connectivity issues.
1129                    warn!("Received signature for an older batch {batch_id}");
1130                    Ok(None)
1131                } else {
1132                    Err(anyhow!("Unknown batch ID '{batch_id}'"))
1133                };
1134
1135                (result, ProposedBatchState::None)
1136            }
1137        }
1138    }
1139
1140    /// Processes a batch signature from a peer.
1141    ///
1142    /// This method performs the following steps:
1143    /// 1. Ensure the proposed batch has not expired.
1144    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
1145    /// 3. Store the signature.
1146    /// 4. Certify the batch if enough signatures have been received.
1147    /// 5. Broadcast the batch certificate to all validators.
1148    async fn process_batch_signature_from_peer(
1149        &self,
1150        peer_ip: SocketAddr,
1151        batch_signature: BatchSignature<N>,
1152    ) -> Result<()> {
1153        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
1154        self.check_proposed_batch_for_expiration()?;
1155
1156        // Retrieve the signature and timestamp.
1157        let BatchSignature { batch_id, signature } = batch_signature;
1158
1159        // Retrieve the signer.
1160        let signer = signature.to_address();
1161
1162        // Ensure the batch signature is signed by the validator.
1163        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1164            // Proceed to disconnect the validator.
1165            self.gateway.disconnect(peer_ip);
1166            bail!("Malicious peer - batch signature is from a different validator ({signer})");
1167        }
1168        // Ensure the batch signature is not from the current primary.
1169        if self.gateway.account().address() == signer {
1170            bail!("Invalid peer - received a batch signature from myself ({signer})");
1171        }
1172
1173        let self_ = self.clone();
1174        let Some(proposal) = spawn_blocking!({
1175            // Acquire the write lock.
1176            let mut proposed_batch = self_.proposed_batch.write();
1177
1178            let (result, new_state) =
1179                self_.add_signature_to_batch(std::mem::take(&mut *proposed_batch), peer_ip, batch_id, signature);
1180            *proposed_batch = new_state;
1181            result
1182        })?
1183        else {
1184            return Ok(());
1185        };
1186
1187        /* Proceeding to certify the batch. */
1188
1189        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1190
1191        // Retrieve the committee lookback for the round.
1192        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1193        // Store the certified batch and broadcast it to all validators.
1194        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1195        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1196            // Reinsert the transmissions back into the ready queue for the next proposal.
1197            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1198            return Err(e);
1199        }
1200
1201        #[cfg(feature = "metrics")]
1202        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1203        Ok(())
1204    }
1205
1206    /// Processes a batch certificate from a peer.
1207    ///
1208    /// This method performs the following steps:
1209    /// 1. Stores the given batch certificate, after ensuring it is valid.
1210    /// 2. If there are enough certificates to reach quorum threshold for the current round,
1211    ///    then proceed to advance to the next round.
1212    async fn process_batch_certificate_from_peer(
1213        &self,
1214        peer_ip: SocketAddr,
1215        certificate: BatchCertificate<N>,
1216    ) -> Result<()> {
1217        // Ensure the batch certificate is from an authorized validator.
1218        if !self.gateway.is_authorized_validator_ip(peer_ip) {
1219            // Proceed to disconnect the validator.
1220            self.gateway.disconnect(peer_ip);
1221            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1222        }
1223        // Ensure storage does not already contain the certificate.
1224        if self.storage.contains_certificate(certificate.id()) {
1225            return Ok(());
1226        // Otherwise, ensure ephemeral storage contains the certificate.
1227        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1228            self.storage.insert_unprocessed_certificate(certificate.clone())?;
1229        }
1230
1231        // Retrieve the batch certificate author.
1232        let author = certificate.author();
1233        // Retrieve the batch certificate round.
1234        let certificate_round = certificate.round();
1235        // Retrieve the batch certificate committee ID.
1236        let committee_id = certificate.committee_id();
1237
1238        // Ensure the batch certificate is not from the current primary.
1239        if self.gateway.account().address() == author {
1240            bail!("Received a batch certificate for myself ({author})");
1241        }
1242
1243        // Ensure that the incoming certificate is valid.
1244        self.storage.check_incoming_certificate(&certificate)?;
1245
1246        // Store the certificate, after ensuring it is valid above.
1247        // The following call recursively fetches and stores
1248        // the previous certificates referenced from this certificate.
1249        // It is critical to make the following call this after validating the certificate above.
1250        // The reason is that a sequence of malformed certificates,
1251        // with references to previous certificates with non-decreasing rounds,
1252        // cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
1253        // Note that if the following call, if not returning an error, guarantees the backward closure of the DAG
1254        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
1255        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
1256        // TODO: eliminate those redundant checks
1257        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1258
1259        // If there are enough certificates to reach quorum threshold for the certificate round,
1260        // then proceed to advance to the next round.
1261
1262        // Retrieve the committee lookback.
1263        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1264
1265        // Retrieve the certificate authors.
1266        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1267        // Check if the certificates have reached the quorum threshold.
1268        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1269
1270        // Ensure that the batch certificate's committee ID matches the expected committee ID.
1271        let expected_committee_id = committee_lookback.id();
1272        if expected_committee_id != committee_id {
1273            // Proceed to disconnect the validator.
1274            self.gateway.disconnect(peer_ip);
1275            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1276        }
1277
1278        // Determine if we are currently proposing a round that is relevant.
1279        // Note: This is important, because while our peers have advanced,
1280        // they may not be proposing yet, and thus still able to sign our proposed batch.
1281        let should_advance = match &*self.latest_proposal_timestamp.read().await {
1282            // We advance if the proposal round is less than the current round that was just certified.
1283            Some((latest_round, _)) => *latest_round < certificate_round,
1284            // If there's no proposal, we consider advancing.
1285            None => true,
1286        };
1287
1288        // Retrieve the current round.
1289        let current_round = self.current_round();
1290
1291        // Determine whether to advance to the next round.
1292        if is_quorum && should_advance && certificate_round >= current_round {
1293            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
1294            self.round_increment_notify.notify_one();
1295        }
1296        Ok(())
1297    }
1298}
1299
1300impl<N: Network> Primary<N> {
1301    /// Starts the primary handlers.
1302    ///
1303    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
1304    /// that awaits new data and handles it accordingly.
1305    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that
1306    /// tries to move to the next round when triggered (e.g. after a certificate is stored) or on a timeout.
1307    ///
1308    /// This function is called exactly once, in `Self::run()`.
1309    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1310        let PrimaryReceiver {
1311            mut rx_batch_propose,
1312            mut rx_batch_signature,
1313            mut rx_batch_certified,
1314            mut rx_primary_ping,
1315            mut rx_unconfirmed_solution,
1316            mut rx_unconfirmed_transaction,
1317        } = primary_receiver;
1318
1319        // Start the primary ping sender.
1320        let self_ = self.clone();
1321        self.spawn(async move {
1322            loop {
1323                // Sleep briefly.
1324                tokio::time::sleep(PRIMARY_PING_INTERVAL).await;
1325
1326                // Retrieve the block locators.
1327                let self__ = self_.clone();
1328                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1329                    Ok(block_locators) => block_locators,
1330                    Err(e) => {
1331                        warn!("Failed to retrieve block locators - {e}");
1332                        continue;
1333                    }
1334                };
1335
1336                // Retrieve the latest certificate of the primary.
1337                let primary_certificate = {
1338                    // Retrieve the primary address.
1339                    let primary_address = self_.gateway.account().address();
1340
1341                    // Iterate backwards from the latest round to find the primary certificate.
1342                    let mut certificate = None;
1343                    let mut current_round = self_.current_round();
1344                    while certificate.is_none() {
1345                        // If the current round is 0, then break the while loop.
1346                        if current_round == 0 {
1347                            break;
1348                        }
1349                        // Retrieve the primary certificates.
1350                        if let Some(primary_certificate) =
1351                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1352                        {
1353                            certificate = Some(primary_certificate);
1354                        // If the primary certificate was not found, decrement the round.
1355                        } else {
1356                            current_round = current_round.saturating_sub(1);
1357                        }
1358                    }
1359
1360                    // Determine if the primary certificate was found.
1361                    match certificate {
1362                        Some(certificate) => certificate,
1363                        // Skip this iteration of the loop (do not send a primary ping).
1364                        None => continue,
1365                    }
1366                };
1367
1368                // Construct the primary ping.
1369                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1370                // Broadcast the event.
1371                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1372            }
1373        });
1374
1375        // Start the primary ping handler.
1376        let self_ = self.clone();
1377        self.spawn(async move {
1378            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1379                // If the primary is not synced, then do not process the primary ping.
1380                if self_.sync.is_synced() {
1381                    trace!("Processing new primary ping from '{peer_ip}'");
1382                } else {
1383                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1384                    continue;
1385                }
1386
1387                // Spawn a task to process the primary certificate.
1388                {
1389                    let self_ = self_.clone();
1390                    tokio::spawn(async move {
1391                        // Deserialize the primary certificate in the primary ping.
1392                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1393                        else {
1394                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1395                            return;
1396                        };
1397                        // Process the primary certificate.
1398                        let id = fmt_id(primary_certificate.id());
1399                        let round = primary_certificate.round();
1400                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1401                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1402                        }
1403                    });
1404                }
1405            }
1406        });
1407
1408        // Start the worker ping(s).
1409        let self_ = self.clone();
1410        self.spawn(async move {
1411            loop {
1412                tokio::time::sleep(WORKER_PING_INTERVAL).await;
1413                // If the primary is not synced, then do not broadcast the worker ping(s).
1414                if !self_.sync.is_synced() {
1415                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1416                    continue;
1417                }
1418                // Broadcast the worker ping(s).
1419                for worker in self_.workers() {
1420                    worker.broadcast_ping();
1421                }
1422            }
1423        });
1424
1425        // Start the batch proposal task.
1426        let proposal_task = self.proposal_task.clone();
1427        let self_ = self.clone();
1428        self.spawn(async move { proposal_task.run(self_).await });
1429
1430        // Start the proposed batch handler.
1431        let self_ = self.clone();
1432        self.spawn(async move {
1433            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1434                // If the primary is not synced, then do not sign the batch.
1435                if !self_.sync.is_synced() {
1436                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1437                    continue;
1438                }
1439
1440                // Spawn a task to process the proposed batch.
1441                let self_ = self_.clone();
1442                tokio::spawn(async move {
1443                    // Process the batch proposal.
1444                    let round = batch_propose.round;
1445                    if let Err(err) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1446                        let err = err.context(format!("Cannot sign a batch at round {round} from '{peer_ip}'"));
1447                        warn!("{}", flatten_error(err));
1448                    }
1449                });
1450            }
1451        });
1452
1453        // Start the batch signature handler.
1454        let self_ = self.clone();
1455        self.spawn(async move {
1456            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1457                // If the primary is not synced, then do not store the signature.
1458                if !self_.sync.is_synced() {
1459                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1460                    continue;
1461                }
1462                // Process the batch signature.
1463                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1464                // is a critical path, and we should only store the minimum required number of signatures.
1465                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1466                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1467                let id = fmt_id(batch_signature.batch_id);
1468                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1469                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
1470                    warn!("{}", flatten_error(err));
1471                }
1472            }
1473        });
1474
1475        // Start the certified batch handler.
1476        let self_ = self.clone();
1477        self.spawn(async move {
1478            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1479                // If the primary is not synced, then do not store the certificate.
1480                if !self_.sync.is_synced() {
1481                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1482                    continue;
1483                }
1484                // Spawn a task to process the batch certificate.
1485                let self_ = self_.clone();
1486                tokio::spawn(async move {
1487                    // Deserialize the batch certificate.
1488                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1489                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1490                        return;
1491                    };
1492                    // Process the batch certificate.
1493                    let id = fmt_id(batch_certificate.id());
1494                    let round = batch_certificate.round();
1495                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1496                        warn!(
1497                            "{}",
1498                            flatten_error(err.context(format!(
1499                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
1500                            )))
1501                        );
1502                    }
1503                });
1504            }
1505        });
1506
1507        // This task tries to move to the next round when triggered (e.g. after a certificate is stored)
1508        // or after a timeout, so we are not stuck on a previous round despite having quorum.
1509        let self_ = self.clone();
1510        self.spawn(async move {
1511            loop {
1512                let round_start = Instant::now();
1513                let current_round = self_.current_round();
1514
1515                // Inner loop: wait and try to increment while we're still in the same round.
1516                while self_.current_round() == current_round {
1517                    let mut futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> =
1518                        vec![Box::pin(self_.round_increment_notify.notified())];
1519
1520                    if let Some(remaining_delay) = MAX_BATCH_DELAY.checked_sub(round_start.elapsed())
1521                        && !remaining_delay.is_zero()
1522                    {
1523                        futures.push(Box::pin(tokio::time::sleep(remaining_delay)));
1524                    }
1525                    // Always ensure a wakeup no later than MAX_LEADER_CERTIFICATE_DELAY so that
1526                    // try_advance_to_next_round is called after the leader-certificate timer
1527                    // expires, even when no further certificates arrive (e.g. an even round where
1528                    // the elected leader was absent and quorum was reached without their cert).
1529                    futures.push(Box::pin(tokio::time::sleep(MAX_LEADER_CERTIFICATE_DELAY)));
1530                    if !self_.sync.is_synced() {
1531                        futures.push(Box::pin(self_.sync.wait_for_synced()));
1532                    }
1533                    let _ = futures::future::select_all(futures).await;
1534
1535                    if !self_.sync.is_synced() {
1536                        trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1537                        continue;
1538                    }
1539
1540                    let next_round = current_round.saturating_add(1);
1541                    let is_quorum_threshold_reached = {
1542                        let authors = self_.storage.get_certificate_authors_for_round(current_round);
1543                        if authors.is_empty() {
1544                            continue;
1545                        }
1546                        let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round)
1547                        else {
1548                            warn!("Failed to retrieve the committee lookback for round {current_round}");
1549                            continue;
1550                        };
1551                        committee_lookback.is_quorum_threshold_reached(&authors)
1552                    };
1553
1554                    if is_quorum_threshold_reached {
1555                        debug!("Quorum threshold reached for round {current_round}");
1556                        if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
1557                            warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
1558                        }
1559                    }
1560                }
1561            }
1562        });
1563
1564        // Start a handler to process new unconfirmed solutions.
1565        let self_ = self.clone();
1566        self.spawn(async move {
1567            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1568                // Compute the checksum for the solution.
1569                let Ok(checksum) = solution.to_checksum::<N>() else {
1570                    error!("Failed to compute the checksum for the unconfirmed solution");
1571                    continue;
1572                };
1573                // Compute the worker ID.
1574                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1575                    error!("Unable to determine the worker ID for the unconfirmed solution");
1576                    continue;
1577                };
1578                let self_ = self_.clone();
1579                tokio::spawn(async move {
1580                    // Retrieve the worker.
1581                    let worker = &self_.workers()[worker_id as usize];
1582                    // Process the unconfirmed solution.
1583                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1584                    // Send the result to the callback.
1585                    callback.send(result).ok();
1586                });
1587            }
1588        });
1589
1590        // Start a handler to process new unconfirmed transactions.
1591        let self_ = self.clone();
1592        self.spawn(async move {
1593            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1594                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1595                // Compute the checksum for the transaction.
1596                let Ok(checksum) = transaction.to_checksum::<N>() else {
1597                    error!("Failed to compute the checksum for the unconfirmed transaction");
1598                    continue;
1599                };
1600                // Compute the worker ID.
1601                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1602                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1603                    continue;
1604                };
1605                let self_ = self_.clone();
1606                tokio::spawn(async move {
1607                    // Retrieve the worker.
1608                    let worker = &self_.workers().get(worker_id as usize).expect("Invalid worker ID");
1609                    // Process the unconfirmed transaction.
1610                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1611                    // Send the result to the callback.
1612                    callback.send(result).ok();
1613                });
1614            }
1615        });
1616    }
1617
1618    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1619    fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1620        // Check if the proposed batch is timed out or stale.
1621        // A batch being certified is not considered expired.
1622        let is_expired = match &*self.proposed_batch.read() {
1623            ProposedBatchState::Certifying(proposal) => proposal.round() < self.current_round(),
1624            _ => false,
1625        };
1626        // If the batch is expired, clear the proposed batch.
1627        if is_expired {
1628            // Reset the proposed batch.
1629            let old = std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None);
1630            if let ProposedBatchState::Certifying(proposal) = old {
1631                debug!("Cleared expired proposal for round {}", proposal.round());
1632                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1633            }
1634        }
1635        Ok(())
1636    }
1637
1638    /// Increments to the next round.
1639    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1640        // If the next round is within GC range, then iterate to the penultimate round.
1641        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1642            let mut fast_forward_round = self.current_round();
1643            // Iterate until the penultimate round is reached.
1644            while fast_forward_round < next_round.saturating_sub(1) {
1645                // Update to the next round in storage.
1646                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1647                // Clear the proposed batch.
1648                *self.proposed_batch.write() = ProposedBatchState::None;
1649            }
1650        }
1651
1652        // Retrieve the current round.
1653        let current_round = self.current_round();
1654        // Attempt to advance to the next round.
1655        if current_round < next_round {
1656            // If a BFT sender was provided, send the current round to the BFT.
1657            let is_ready = if let Some(cb) = self.primary_callback.get() {
1658                cb.try_advance_to_next_round(current_round)
1659            }
1660            // Otherwise, handle the Narwhal case.
1661            else {
1662                // Update to the next round in storage.
1663                self.storage.increment_to_next_round(current_round)?;
1664                // Set 'is_ready' to 'true'.
1665                true
1666            };
1667
1668            // Notify the proposal task if the new round is ready.
1669            if is_ready && self.is_synced() {
1670                debug!("Primary is ready to propose the next round");
1671                self.proposal_task.signal();
1672            } else {
1673                debug!("Primary is not ready to propose the next round");
1674            }
1675        }
1676        Ok(())
1677    }
1678
1679    /// Ensures the primary is signing for the specified batch round.
1680    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1681    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1682    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1683        // Retrieve the current round.
1684        let current_round = self.current_round();
1685        // Ensure the batch round is within GC range of the current round.
1686        if current_round + self.storage.max_gc_rounds() <= batch_round {
1687            bail!("Round {batch_round} is too far in the future")
1688        }
1689        // Ensure the batch round is at or one before the current round.
1690        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1691        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1692        if current_round > batch_round + 1 {
1693            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1694        }
1695        // Check if the primary is still signing for the batch round.
1696        if let ProposedBatchState::Certifying(proposal) = &*self.proposed_batch.read()
1697            && proposal.round() > batch_round
1698        {
1699            bail!("Our primary at round {} is no longer signing for round {batch_round}", proposal.round())
1700        }
1701        Ok(())
1702    }
1703
1704    /// Ensure the primary is not creating batch proposals too frequently.
1705    /// This checks that the certificate timestamp for the previous round is within the expected range.
1706    fn check_peer_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1707        ensure!(author != self.gateway.account().address(), "Peer cannot propose a batch that is authored by myself");
1708
1709        // Retrieve the timestamp of the previous timestamp to check against.
1710        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1711            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` seconds ago.
1712            Some(certificate) => certificate.timestamp(),
1713            // If we do not see a previous certificate for the author, then proceed optimistically.
1714            None => return Ok(()),
1715        };
1716
1717        // Determine the elapsed time since the previous timestamp.
1718        let elapsed = timestamp
1719            .checked_sub(previous_timestamp)
1720            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1721        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` seconds ago.
1722        match elapsed < MIN_BATCH_DELAY.as_secs() as i64 {
1723            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1724            false => Ok(()),
1725        }
1726    }
1727
1728    /// Ensure the primary is not creating batch proposals too frequently.
1729    /// This checks that the certificate timestamp for the previous round is within the expected range.
1730    ///
1731    /// # Returns
1732    /// - `Ok(true)` if the timestamp allows a new proposal.
1733    /// - `Ok(false)` if the timestamp is valid but too soon after the previous proposal.
1734    /// - `Err(err)` if an unexpected error occured, such as the timestamp being before the previous certificate.
1735    fn check_own_proposal_timestamp(
1736        &self,
1737        previous_round: u64,
1738        previous_timestamp: i64,
1739        timestamp: i64,
1740    ) -> Result<bool> {
1741        // Determine the elapsed time since the previous timestamp.
1742        let elapsed = timestamp
1743            .checked_sub(previous_timestamp)
1744            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1745
1746        Ok(elapsed >= MIN_BATCH_DELAY.as_secs() as i64)
1747    }
1748
1749    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1750    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1751        // Create the batch certificate and transmissions.
1752        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1753
1754        // Convert the transmissions into a HashMap.
1755        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1756        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1757
1758        // Store some metadata about the certified batch.
1759        let round = certificate.round();
1760        let num_transmissions = certificate.transmission_ids().len();
1761
1762        // Store the certified batch.
1763        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1764        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1765        debug!("Stored a batch certificate for round {}", certificate.round());
1766        // The batch is now in storage, so late-arriving signatures can find it via contains_batch.
1767        // Transition from Certified back to None.
1768        *self.proposed_batch.write() = ProposedBatchState::None;
1769
1770        // If a BFT sender was provided, send the certificate to the BFT.
1771        if let Some(cb) = self.primary_callback.get() {
1772            // Await the callback to continue.
1773            cb.add_new_certificate(certificate.clone()).await.with_context(|| {
1774                format!("Failed to insert our newly certified batch for round {round} into the DAG")
1775            })?;
1776        }
1777        // Broadcast the certified batch to all validators.
1778        self.gateway.broadcast(Event::BatchCertified(certificate.into()));
1779
1780        // Log the certified batch.
1781        info!("Our batch with {num_transmissions} transmissions for round {round} was certified!");
1782
1783        // Record the certification latency (time from batch proposal to certification).
1784        #[cfg(feature = "metrics")]
1785        if let Some(start) = self.batch_propose_start.lock().take() {
1786            metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
1787        }
1788
1789        // Wake up the round increment task to re-check quorum.
1790        self.round_increment_notify.notify_one();
1791
1792        Ok(())
1793    }
1794
1795    /// Inserts the missing transmissions from the proposal into the workers.
1796    fn insert_missing_transmissions_into_workers(
1797        &self,
1798        peer_ip: SocketAddr,
1799        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1800    ) -> Result<()> {
1801        // Insert the transmissions into the workers.
1802        assign_to_workers(self.workers(), transmissions, |worker, transmission_id, transmission| {
1803            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1804        })
1805    }
1806
1807    /// Re-inserts the transmissions from the proposal into the workers.
1808    fn reinsert_transmissions_into_workers(
1809        &self,
1810        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1811    ) -> Result<()> {
1812        // Re-insert the transmissions into the workers.
1813        assign_to_workers(self.workers(), transmissions.into_iter(), |worker, transmission_id, transmission| {
1814            worker.reinsert(transmission_id, transmission);
1815        })
1816    }
1817
1818    /// Recursively stores a given batch certificate, after ensuring:
1819    ///   - Ensure the round matches the committee round.
1820    ///   - Ensure the address is a member of the committee.
1821    ///   - Ensure the timestamp is within range.
1822    ///   - Ensure we have all of the transmissions.
1823    ///   - Ensure we have all of the previous certificates.
1824    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1825    ///   - Ensure the previous certificates have reached the quorum threshold.
1826    ///   - Ensure we have not already signed the batch ID.
1827    #[async_recursion::async_recursion]
1828    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1829        &self,
1830        peer_ip: SocketAddr,
1831        certificate: BatchCertificate<N>,
1832    ) -> Result<()> {
1833        // Retrieve the batch header.
1834        let batch_header = certificate.batch_header();
1835        // Retrieve the batch round.
1836        let batch_round = batch_header.round();
1837
1838        // If the certificate round is outdated, do not store it.
1839        if batch_round <= self.storage.gc_round() {
1840            return Ok(());
1841        }
1842        // If the certificate already exists in storage, return early.
1843        if self.storage.contains_certificate(certificate.id()) {
1844            return Ok(());
1845        }
1846
1847        // If node is not in sync mode and the node is not synced. Then return an error.
1848        if !IS_SYNCING && !self.is_synced() {
1849            bail!(
1850                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1851                fmt_id(certificate.id())
1852            );
1853        }
1854
1855        // If the peer is ahead, use the batch header to sync up to the peer.
1856        let missing_transmissions =
1857            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;
1858
1859        // Check if the certificate needs to be stored.
1860        if !self.storage.contains_certificate(certificate.id()) {
1861            // Store the batch certificate.
1862            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1863            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1864            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1865            // If a BFT sender was provided, send the round and certificate to the BFT.
1866            if let Some(cb) = self.primary_callback.get() {
1867                cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
1868            }
1869            // Wake the round-increment task to re-check quorum.
1870            self.round_increment_notify.notify_one();
1871        }
1872        Ok(())
1873    }
1874
1875    /// Recursively syncs using the given batch header.
1876    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1877        &self,
1878        peer_ip: SocketAddr,
1879        batch_header: &BatchHeader<N>,
1880    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1881        // Retrieve the batch round.
1882        let batch_round = batch_header.round();
1883
1884        // If the certificate round is outdated, do not store it.
1885        if batch_round <= self.storage.gc_round() {
1886            bail!("Round {batch_round} is too far in the past")
1887        }
1888
1889        // If node is not in sync mode and the node is not synced, then return an error.
1890        if !IS_SYNCING && !self.is_synced() {
1891            bail!(
1892                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1893                fmt_id(batch_header.batch_id())
1894            );
1895        }
1896
1897        // Determine if quorum threshold is reached on the batch round.
1898        let is_quorum_threshold_reached = {
1899            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1900            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1901            committee_lookback.is_quorum_threshold_reached(&authors)
1902        };
1903
1904        // Check if our primary should move to the next round.
1905        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1906        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1907        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1908        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1909        // Check if our primary is far behind the peer.
1910        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1911        // If our primary is far behind the peer, update our committee to the batch round.
1912        if is_behind_schedule || is_peer_far_in_future {
1913            // If the batch round is greater than the current committee round, update the committee.
1914            self.try_increment_to_the_next_round(batch_round)
1915                .await
1916                .with_context(|| "Failed to fast forward current round")?;
1917        }
1918
1919        // Ensure the primary has all of the transmissions.
1920        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1921
1922        // Ensure the primary has all of the previous certificates.
1923        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1924
1925        // Wait for the missing transmissions and previous certificates to be fetched.
1926        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1927            missing_transmissions_handle,
1928            missing_previous_certificates_handle,
1929        ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1930
1931        // Iterate through the missing previous certificates sequentially.
1932        // This is done sequentially to avoid requesting a large number of certificates from peers all at once.
1933        // TODO (raychu86): Optimize this by parallelizing requests, but avoiding duplicated requests since certificates are likely shared across batches.
1934        for batch_certificate in missing_previous_certificates {
1935            // Check if the missing previous certificate is valid. This is only
1936            // needed if we are processing an incoming batch header from a peer.
1937            // For incoming certificates, validity is assured by checking the
1938            // root certificate in `process_batch_certificate_from_peer`.
1939            if CHECK_PREVIOUS_CERTIFICATES {
1940                self.storage.check_incoming_certificate(&batch_certificate)?;
1941            }
1942            // Store the batch certificate (recursively fetching any missing previous certificates).
1943            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1944        }
1945        Ok(missing_transmissions)
1946    }
1947
1948    /// Fetches any missing transmissions for the specified batch header.
1949    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1950    async fn fetch_missing_transmissions(
1951        &self,
1952        peer_ip: SocketAddr,
1953        batch_header: &BatchHeader<N>,
1954    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1955        // If the round is <= the GC round, return early.
1956        if batch_header.round() <= self.storage.gc_round() {
1957            return Ok(Default::default());
1958        }
1959
1960        // Ensure this batch ID is new, otherwise return early.
1961        if self.storage.contains_batch(batch_header.batch_id()) {
1962            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1963            return Ok(Default::default());
1964        }
1965
1966        // Retrieve the workers.
1967        let workers = self.workers.clone();
1968
1969        // Initialize a list for the transmissions.
1970        let mut fetch_transmissions = FuturesUnordered::new();
1971
1972        // Retrieve the number of workers.
1973        let num_workers = self.num_workers();
1974        // Iterate through the transmission IDs.
1975        for transmission_id in batch_header.transmission_ids() {
1976            // If the transmission does not exist in storage, proceed to fetch the transmission.
1977            if !self.storage.contains_transmission(*transmission_id) {
1978                // Determine the worker ID.
1979                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1980                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1981                };
1982                // Retrieve the worker.
1983                let Some(worker) = workers.get().expect("No workers set").get(worker_id as usize) else {
1984                    bail!("Unable to find worker {worker_id}")
1985                };
1986                // Push the callback onto the list.
1987                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1988            }
1989        }
1990
1991        // Initialize a set for the transmissions.
1992        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1993        // Wait for all of the transmissions to be fetched.
1994        while let Some(result) = fetch_transmissions.next().await {
1995            // Retrieve the transmission.
1996            let (transmission_id, transmission) = result?;
1997            // Insert the transmission into the set.
1998            transmissions.insert(transmission_id, transmission);
1999        }
2000        // Return the transmissions.
2001        Ok(transmissions)
2002    }
2003
2004    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
2005    async fn fetch_missing_previous_certificates(
2006        &self,
2007        peer_ip: SocketAddr,
2008        batch_header: &BatchHeader<N>,
2009    ) -> Result<HashSet<BatchCertificate<N>>> {
2010        // Retrieve the round.
2011        let round = batch_header.round();
2012        // If the previous round is 0, or is <= the GC round, return early.
2013        if round == 1 || round <= self.storage.gc_round() + 1 {
2014            return Ok(Default::default());
2015        }
2016
2017        // Fetch the missing previous certificates.
2018        let missing_previous_certificates =
2019            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
2020        if !missing_previous_certificates.is_empty() {
2021            debug!(
2022                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
2023                missing_previous_certificates.len(),
2024            );
2025        }
2026        // Return the missing previous certificates.
2027        Ok(missing_previous_certificates)
2028    }
2029
2030    /// Fetches any missing certificates for the specified batch header from the specified peer.
2031    async fn fetch_missing_certificates(
2032        &self,
2033        peer_ip: SocketAddr,
2034        round: u64,
2035        certificate_ids: &IndexSet<Field<N>>,
2036    ) -> Result<HashSet<BatchCertificate<N>>> {
2037        // Initialize a list for the missing certificates.
2038        let mut fetch_certificates = FuturesUnordered::new();
2039        // Initialize a set for the missing certificates.
2040        let mut missing_certificates = HashSet::default();
2041        // Iterate through the certificate IDs.
2042        for certificate_id in certificate_ids {
2043            // Check if the certificate already exists in the ledger.
2044            if self.ledger.contains_certificate(certificate_id)? {
2045                continue;
2046            }
2047            // Check if the certificate already exists in storage.
2048            if self.storage.contains_certificate(*certificate_id) {
2049                continue;
2050            }
2051            // If we have not fully processed the certificate yet, store it.
2052            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
2053                missing_certificates.insert(certificate);
2054            } else {
2055                // If we do not have the certificate, request it.
2056                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
2057                // TODO (howardwu): Limit the number of open requests we send to a peer.
2058                // Send an certificate request to the peer.
2059                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
2060            }
2061        }
2062
2063        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
2064        match fetch_certificates.is_empty() {
2065            true => return Ok(missing_certificates),
2066            false => trace!(
2067                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
2068                fetch_certificates.len(),
2069            ),
2070        }
2071
2072        // Wait for all of the missing certificates to be fetched.
2073        while let Some(result) = fetch_certificates.next().await {
2074            // Insert the missing certificate into the set.
2075            missing_certificates.insert(result?);
2076        }
2077        // Return the missing certificates.
2078        Ok(missing_certificates)
2079    }
2080}
2081
2082impl<N: Network> Primary<N> {
2083    /// Spawns a task with the given future; it should only be used for long-running tasks.
2084    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
2085        self.handles.lock().push(tokio::spawn(future));
2086    }
2087
2088    /// Shuts down the primary.
2089    pub async fn shut_down(&self) {
2090        info!("Shutting down the primary...");
2091        // Remove the callback.
2092        self.primary_callback.clear();
2093        // Shut down the sync service.
2094        self.sync.shut_down().await;
2095        // Shut down the workers.
2096        self.workers().iter().for_each(|worker| worker.shut_down());
2097        // Abort the tasks.
2098        self.handles.lock().drain(..).for_each(|handle| handle.abort());
2099        // Save the current proposal cache to disk.
2100        let proposal_cache = {
2101            // Only persist a Certifying batch; a Certified batch will appear in pending_certificates.
2102            // Note: it is guaranteed that there are no concurrent accesses to `proposed_batch` as all
2103            // background tasks already terminated at this point.
2104            let proposal = match std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None) {
2105                ProposedBatchState::Certifying(p) => Some(*p),
2106                _ => None,
2107            };
2108            let signed_proposals = self.signed_proposals.read().clone();
2109            let latest_round = proposal
2110                .as_ref()
2111                .map(Proposal::round)
2112                .unwrap_or(self.latest_proposal_timestamp.read().await.map(|(round, _)| round).unwrap_or(0));
2113            let pending_certificates = self.storage.get_pending_certificates();
2114            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
2115        };
2116        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
2117            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
2118        }
2119        // Close the gateway.
2120        self.gateway.shut_down().await;
2121    }
2122}
2123
2124#[cfg(test)]
2125mod tests {
2126    use super::{proposal_task::BatchPropose as _, *};
2127
2128    use snarkos_node_bft_ledger_service::MockLedgerService;
2129    use snarkos_node_bft_storage_service::BFTMemoryService;
2130    use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2131    use snarkvm::{
2132        ledger::{
2133            committee::{Committee, MIN_VALIDATOR_STAKE},
2134            test_helpers::sample_execution_transaction_with_fee,
2135        },
2136        prelude::{Address, Signature},
2137    };
2138
2139    use bytes::Bytes;
2140    use indexmap::IndexSet;
2141    use rand::RngExt;
2142
2143    type CurrentNetwork = snarkvm::prelude::MainnetV0;
2144
2145    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2146        // Create a committee containing the primary's account.
2147        const COMMITTEE_SIZE: usize = 4;
2148        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2149        let mut members = IndexMap::new();
2150
2151        for i in 0..COMMITTEE_SIZE {
2152            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2153            let account = Account::new(rng).unwrap();
2154
2155            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.random_range(0..100)));
2156            accounts.push((socket_addr, account));
2157        }
2158
2159        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2160    }
2161
2162    // Returns a primary and a list of accounts in the configured committee.
2163    fn primary_with_committee(
2164        account_index: usize,
2165        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2166        committee: Committee<CurrentNetwork>,
2167        height: u32,
2168    ) -> Primary<CurrentNetwork> {
2169        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2170        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
2171
2172        // Initialize the primary.
2173        let account = accounts[account_index].1.clone();
2174        let block_sync = Arc::new(BlockSync::new(ledger.clone(), ConnectionMode::Gateway));
2175        let primary =
2176            Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2177                .unwrap();
2178
2179        // Construct a worker instance.
2180        let worker = Worker::new(
2181            0, // id
2182            Arc::new(primary.gateway.clone()),
2183            primary.storage.clone(),
2184            primary.ledger.clone(),
2185            primary.proposed_batch.clone(),
2186        )
2187        .unwrap();
2188        let _ = primary.workers.set(vec![worker]);
2189        for a in accounts.iter().skip(account_index) {
2190            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2191        }
2192
2193        primary
2194    }
2195
2196    fn primary_without_handlers(
2197        rng: &mut TestRng,
2198    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2199        let (accounts, committee) = sample_committee(rng);
2200        let primary = primary_with_committee(
2201            0, // index of primary's account
2202            &accounts,
2203            committee,
2204            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2205        );
2206
2207        (primary, accounts)
2208    }
2209
2210    // Creates a mock solution.
2211    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2212        // Sample a random fake solution ID.
2213        let solution_id = rng.random::<u64>().into();
2214        // Vary the size of the solutions.
2215        let size = rng.random_range(1024..10 * 1024);
2216        // Sample random fake solution bytes.
2217        let vec: Vec<u8> = (0..size).map(|_| rng.random::<u8>()).collect();
2218        let solution = Data::Buffer(Bytes::from(vec));
2219        // Return the solution ID and solution.
2220        (solution_id, solution)
2221    }
2222
2223    // Samples a test transaction.
2224    fn sample_unconfirmed_transaction(
2225        rng: &mut TestRng,
2226    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2227        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2228        let id = transaction.id();
2229
2230        (id, Data::Object(transaction))
2231    }
2232
2233    // Creates a batch proposal with one solution and one transaction.
2234    fn create_test_proposal(
2235        author: &Account<CurrentNetwork>,
2236        committee: Committee<CurrentNetwork>,
2237        round: u64,
2238        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2239        timestamp: i64,
2240        num_transactions: u64,
2241        rng: &mut TestRng,
2242    ) -> Proposal<CurrentNetwork> {
2243        let mut transmission_ids = IndexSet::new();
2244        let mut transmissions = IndexMap::new();
2245
2246        // Prepare the solution and insert into the sets.
2247        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2248        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2249        let solution_transmission_id = (solution_id, solution_checksum).into();
2250        transmission_ids.insert(solution_transmission_id);
2251        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2252
2253        // Prepare the transactions and insert into the sets.
2254        for _ in 0..num_transactions {
2255            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2256            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2257            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2258            transmission_ids.insert(transaction_transmission_id);
2259            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2260        }
2261
2262        // Retrieve the private key.
2263        let private_key = author.private_key();
2264        // Sign the batch header.
2265        let batch_header = BatchHeader::new(
2266            private_key,
2267            round,
2268            timestamp,
2269            committee.id(),
2270            transmission_ids,
2271            previous_certificate_ids,
2272            rng,
2273        )
2274        .unwrap();
2275        // Construct the proposal.
2276        Proposal::new(committee, batch_header, transmissions).unwrap()
2277    }
2278
2279    // Creates a signature of the primary's current proposal for each committee member (excluding
2280    // the primary).
2281    fn peer_signatures_for_proposal(
2282        primary: &Primary<CurrentNetwork>,
2283        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2284        rng: &mut TestRng,
2285    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2286        // Each committee member signs the batch.
2287        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2288        for (socket_addr, account) in accounts {
2289            if account.address() == primary.gateway.account().address() {
2290                continue;
2291            }
2292            let batch_id = primary.proposed_batch.read().as_proposal().unwrap().batch_id();
2293            let signature = account.sign(&[batch_id], rng).unwrap();
2294            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2295        }
2296
2297        signatures
2298    }
2299
2300    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2301    fn peer_signatures_for_batch(
2302        primary_address: Address<CurrentNetwork>,
2303        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2304        batch_id: Field<CurrentNetwork>,
2305        rng: &mut TestRng,
2306    ) -> IndexSet<Signature<CurrentNetwork>> {
2307        let mut signatures = IndexSet::new();
2308        for (_, account) in accounts {
2309            if account.address() == primary_address {
2310                continue;
2311            }
2312            let signature = account.sign(&[batch_id], rng).unwrap();
2313            signatures.insert(signature);
2314        }
2315        signatures
2316    }
2317
2318    // Creates a batch certificate.
2319    fn create_batch_certificate(
2320        primary_address: Address<CurrentNetwork>,
2321        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2322        round: u64,
2323        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2324        rng: &mut TestRng,
2325    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2326        let timestamp = now();
2327
2328        let author =
2329            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2330        let private_key = author.private_key();
2331
2332        let committee_id = Field::rand(rng);
2333        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2334        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2335        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2336        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2337
2338        let solution_transmission_id = (solution_id, solution_checksum).into();
2339        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2340
2341        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2342        let transmissions = [
2343            (solution_transmission_id, Transmission::Solution(solution)),
2344            (transaction_transmission_id, Transmission::Transaction(transaction)),
2345        ]
2346        .into();
2347
2348        let batch_header = BatchHeader::new(
2349            private_key,
2350            round,
2351            timestamp,
2352            committee_id,
2353            transmission_ids,
2354            previous_certificate_ids,
2355            rng,
2356        )
2357        .unwrap();
2358        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2359        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2360        (certificate, transmissions)
2361    }
2362
2363    // Create a certificate chain up to, but not including, the specified round in the primary storage.
2364    fn store_certificate_chain(
2365        primary: &Primary<CurrentNetwork>,
2366        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2367        round: u64,
2368        rng: &mut TestRng,
2369    ) -> IndexSet<Field<CurrentNetwork>> {
2370        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2371        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2372        for cur_round in 1..round {
2373            for (_, account) in accounts.iter() {
2374                let (certificate, transmissions) = create_batch_certificate(
2375                    account.address(),
2376                    accounts,
2377                    cur_round,
2378                    previous_certificates.clone(),
2379                    rng,
2380                );
2381                next_certificates.insert(certificate.id());
2382                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2383            }
2384
2385            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2386            previous_certificates = next_certificates;
2387            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2388        }
2389
2390        previous_certificates
2391    }
2392
2393    // Insert the account socket addresses into the resolver so that
2394    // they are recognized as "connected".
2395    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2396        // First account is primary, which doesn't need to resolve.
2397        for (addr, acct) in accounts.iter().skip(1) {
2398            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2399        }
2400    }
2401
2402    #[test_log::test(tokio::test)]
2403    async fn test_propose_batch() {
2404        let mut rng = TestRng::default();
2405        let (primary, _) = primary_without_handlers(&mut rng);
2406
2407        // Check there is no batch currently proposed.
2408        assert!(primary.proposed_batch.read().is_none());
2409
2410        // Generate a solution and a transaction.
2411        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2412        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2413
2414        // Store it on one of the workers.
2415        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2416        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2417
2418        // Try to propose a batch again. This time, it should succeed.
2419        assert!(primary.propose_batch().await.is_ok());
2420        assert!(primary.proposed_batch.read().is_proposed());
2421    }
2422
2423    #[test_log::test(tokio::test)]
2424    async fn test_propose_batch_with_no_transmissions() {
2425        let mut rng = TestRng::default();
2426        let (primary, _) = primary_without_handlers(&mut rng);
2427
2428        // Check there is no batch currently proposed.
2429        assert!(primary.proposed_batch.read().is_none());
2430
2431        // Try to propose a batch with no transmissions.
2432        assert!(primary.propose_batch().await.is_ok());
2433        assert!(primary.proposed_batch.read().is_proposed());
2434    }
2435
2436    #[test_log::test(tokio::test)]
2437    async fn test_propose_batch_in_round() {
2438        let round = 3;
2439        let mut rng = TestRng::default();
2440        let (primary, accounts) = primary_without_handlers(&mut rng);
2441
2442        // Fill primary storage.
2443        store_certificate_chain(&primary, &accounts, round, &mut rng);
2444
2445        // Sleep for a while to ensure the primary is ready to propose the next round.
2446        tokio::time::sleep(MIN_BATCH_DELAY).await;
2447
2448        // Generate a solution and a transaction.
2449        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2450        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2451
2452        // Store it on one of the workers.
2453        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2454        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2455
2456        // Propose a batch again. This time, it should succeed.
2457        assert!(primary.propose_batch().await.is_ok());
2458        assert!(primary.proposed_batch.read().is_proposed());
2459    }
2460
2461    #[test_log::test(tokio::test)]
2462    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2463        let round = 3;
2464        let prev_round = round - 1;
2465        let mut rng = TestRng::default();
2466        let (primary, accounts) = primary_without_handlers(&mut rng);
2467        let peer_account = &accounts[1];
2468        let peer_ip = peer_account.0;
2469
2470        // Fill primary storage.
2471        store_certificate_chain(&primary, &accounts, round, &mut rng);
2472
2473        // Get transmissions from previous certificates.
2474        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2475
2476        // Track the number of transmissions in the previous round.
2477        let mut num_transmissions_in_previous_round = 0;
2478
2479        // Generate a solution and a transaction.
2480        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2481        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2482        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2483        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2484
2485        // Store it on one of the workers.
2486        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2487        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2488
2489        // Check that the worker has 2 transmissions.
2490        assert_eq!(primary.workers()[0].num_transmissions(), 2);
2491
2492        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
2493        for (_, account) in accounts.iter() {
2494            let (certificate, transmissions) = create_batch_certificate(
2495                account.address(),
2496                &accounts,
2497                round,
2498                previous_certificate_ids.clone(),
2499                &mut rng,
2500            );
2501
2502            // Add the transmissions to the worker.
2503            for (transmission_id, transmission) in transmissions.iter() {
2504                primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2505            }
2506
2507            // Insert the certificate to storage.
2508            num_transmissions_in_previous_round += transmissions.len();
2509            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2510        }
2511
2512        // Sleep for a while to ensure the primary is ready to propose the next round.
2513        tokio::time::sleep(MIN_BATCH_DELAY).await;
2514
2515        // Advance to the next round.
2516        assert!(primary.storage.increment_to_next_round(round).is_ok());
2517
2518        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
2519        assert_eq!(primary.workers()[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2520
2521        // Propose the batch.
2522        assert!(primary.propose_batch().await.is_ok());
2523
2524        // Check that the proposal only contains the new transmissions that were not in previous certificates.
2525        let proposed_transmissions = primary.proposed_batch.read().as_proposal().unwrap().transmissions().clone();
2526        assert_eq!(proposed_transmissions.len(), 2);
2527        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2528        assert!(
2529            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2530        );
2531    }
2532
2533    #[test_log::test(tokio::test)]
2534    async fn test_propose_batch_over_spend_limit() {
2535        let mut rng = TestRng::default();
2536
2537        // Create a primary to test spend limit backwards compatibility with V4.
2538        let (accounts, committee) = sample_committee(&mut rng);
2539        let primary = primary_with_committee(
2540            0,
2541            &accounts,
2542            committee.clone(),
2543            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2544        );
2545
2546        // Check there is no batch currently proposed.
2547        assert!(primary.proposed_batch.read().is_none());
2548        // Check the workers are empty.
2549        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2550
2551        // Generate a solution and transactions.
2552        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2553        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2554
2555        for _i in 0..5 {
2556            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2557            // Store it on one of the workers.
2558            primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2559        }
2560
2561        // Try to propose a batch again. This time, it should succeed.
2562        assert!(primary.propose_batch().await.is_ok());
2563        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2564        assert_eq!(primary.proposed_batch.read().as_proposal().unwrap().transmissions().len(), 3);
2565        // Check the transmissions were correctly drained from the workers.
2566        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2567    }
2568
2569    #[test_log::test(tokio::test)]
2570    async fn test_batch_propose_from_peer() {
2571        let mut rng = TestRng::default();
2572        let (primary, accounts) = primary_without_handlers(&mut rng);
2573
2574        // Create a valid proposal with an author that isn't the primary.
2575        let round = 1;
2576        let peer_account = &accounts[1];
2577        let peer_ip = peer_account.0;
2578        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2579        let proposal = create_test_proposal(
2580            &peer_account.1,
2581            primary.ledger.current_committee().unwrap(),
2582            round,
2583            Default::default(),
2584            timestamp,
2585            1,
2586            &mut rng,
2587        );
2588
2589        // Make sure the primary is aware of the transmissions in the proposal.
2590        for (transmission_id, transmission) in proposal.transmissions() {
2591            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2592        }
2593
2594        // The author must be known to resolver to pass propose checks.
2595        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2596
2597        // The primary will only consider itself synced if we received
2598        // block locators from a peer.
2599        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2600        primary.sync.testing_only_set_sync_height_testing_only(20);
2601
2602        // Try to process the batch proposal from the peer, should succeed.
2603        assert!(
2604            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2605        );
2606    }
2607
2608    #[test_log::test(tokio::test)]
2609    async fn test_batch_propose_from_peer_when_not_synced() {
2610        let mut rng = TestRng::default();
2611        let (primary, accounts) = primary_without_handlers(&mut rng);
2612
2613        // Create a valid proposal with an author that isn't the primary.
2614        let round = 1;
2615        let peer_account = &accounts[1];
2616        let peer_ip = peer_account.0;
2617        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2618        let proposal = create_test_proposal(
2619            &peer_account.1,
2620            primary.ledger.current_committee().unwrap(),
2621            round,
2622            Default::default(),
2623            timestamp,
2624            1,
2625            &mut rng,
2626        );
2627
2628        // Make sure the primary is aware of the transmissions in the proposal.
2629        for (transmission_id, transmission) in proposal.transmissions() {
2630            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2631        }
2632
2633        // The author must be known to resolver to pass propose checks.
2634        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2635
2636        // Add a high block locator to indicate we are not synced.
2637        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2638
2639        // Try to process the batch proposal from the peer, should fail
2640        assert!(
2641            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2642        );
2643    }
2644
2645    #[test_log::test(tokio::test)]
2646    async fn test_batch_propose_from_peer_in_round() {
2647        let round = 2;
2648        let mut rng = TestRng::default();
2649        let (primary, accounts) = primary_without_handlers(&mut rng);
2650
2651        // Generate certificates.
2652        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2653
2654        // Create a valid proposal with an author that isn't the primary.
2655        let peer_account = &accounts[1];
2656        let peer_ip = peer_account.0;
2657        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2658        let proposal = create_test_proposal(
2659            &peer_account.1,
2660            primary.ledger.current_committee().unwrap(),
2661            round,
2662            previous_certificates,
2663            timestamp,
2664            1,
2665            &mut rng,
2666        );
2667
2668        // Make sure the primary is aware of the transmissions in the proposal.
2669        for (transmission_id, transmission) in proposal.transmissions() {
2670            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2671        }
2672
2673        // The author must be known to resolver to pass propose checks.
2674        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2675
2676        // The primary will only consider itself synced if we received
2677        // block locators from a peer.
2678        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2679        primary.sync.testing_only_set_sync_height_testing_only(20);
2680
2681        // Try to process the batch proposal from the peer, should succeed.
2682        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2683    }
2684
2685    #[test_log::test(tokio::test)]
2686    async fn test_batch_propose_from_peer_wrong_round() {
2687        let mut rng = TestRng::default();
2688        let (primary, accounts) = primary_without_handlers(&mut rng);
2689
2690        // Create a valid proposal with an author that isn't the primary.
2691        let round = 1;
2692        let peer_account = &accounts[1];
2693        let peer_ip = peer_account.0;
2694        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2695        let proposal = create_test_proposal(
2696            &peer_account.1,
2697            primary.ledger.current_committee().unwrap(),
2698            round,
2699            Default::default(),
2700            timestamp,
2701            1,
2702            &mut rng,
2703        );
2704
2705        // Make sure the primary is aware of the transmissions in the proposal.
2706        for (transmission_id, transmission) in proposal.transmissions() {
2707            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2708        }
2709
2710        // The author must be known to resolver to pass propose checks.
2711        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2712        // The primary must be considered synced.
2713        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2714        primary.sync.testing_only_set_sync_height_testing_only(20);
2715
2716        // Try to process the batch proposal from the peer, should error.
2717        assert!(
2718            primary
2719                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2720                    round: round + 1,
2721                    batch_header: Data::Object(proposal.batch_header().clone())
2722                })
2723                .await
2724                .is_err()
2725        );
2726    }
2727
2728    #[test_log::test(tokio::test)]
2729    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2730        let round = 4;
2731        let mut rng = TestRng::default();
2732        let (primary, accounts) = primary_without_handlers(&mut rng);
2733
2734        // Generate certificates.
2735        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2736
2737        // Create a valid proposal with an author that isn't the primary.
2738        let peer_account = &accounts[1];
2739        let peer_ip = peer_account.0;
2740        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2741        let proposal = create_test_proposal(
2742            &peer_account.1,
2743            primary.ledger.current_committee().unwrap(),
2744            round,
2745            previous_certificates,
2746            timestamp,
2747            1,
2748            &mut rng,
2749        );
2750
2751        // Make sure the primary is aware of the transmissions in the proposal.
2752        for (transmission_id, transmission) in proposal.transmissions() {
2753            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2754        }
2755
2756        // The author must be known to resolver to pass propose checks.
2757        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2758        // The primary must be considered synced.
2759        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2760        primary.sync.testing_only_set_sync_height_testing_only(0);
2761
2762        // Try to process the batch proposal from the peer, should error.
2763        assert!(
2764            primary
2765                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2766                    round: round + 1,
2767                    batch_header: Data::Object(proposal.batch_header().clone())
2768                })
2769                .await
2770                .is_err()
2771        );
2772    }
2773
2774    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2775    #[test_log::test(tokio::test)]
2776    async fn test_batch_propose_from_peer_with_past_timestamp() {
2777        let round = 2;
2778        let mut rng = TestRng::default();
2779        let (primary, accounts) = primary_without_handlers(&mut rng);
2780
2781        // Generate certificates.
2782        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2783
2784        // Create a valid proposal with an author that isn't the primary.
2785        let peer_account = &accounts[1];
2786        let peer_ip = peer_account.0;
2787
2788        // Use a timestamp that is too early.
2789        // Set it to something that is less than the minimum batch delay
2790        // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp
2791        let last_timestamp = primary
2792            .storage
2793            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2794            .expect("No previous proposal exists")
2795            .timestamp();
2796        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY.as_secs() as i64) - 1;
2797
2798        let proposal = create_test_proposal(
2799            &peer_account.1,
2800            primary.ledger.current_committee().unwrap(),
2801            round,
2802            previous_certificates,
2803            invalid_timestamp,
2804            1,
2805            &mut rng,
2806        );
2807
2808        // Make sure the primary is aware of the transmissions in the proposal.
2809        for (transmission_id, transmission) in proposal.transmissions() {
2810            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2811        }
2812
2813        // The author must be known to resolver to pass propose checks.
2814        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2815        // The primary must be considered synced.
2816        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2817        primary.sync.testing_only_set_sync_height_testing_only(0);
2818
2819        // Try to process the batch proposal from the peer, should error.
2820        assert!(
2821            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2822        );
2823    }
2824
2825    #[test_log::test(tokio::test)]
2826    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2827        let round = 3;
2828        let mut rng = TestRng::default();
2829        let (primary, _) = primary_without_handlers(&mut rng);
2830
2831        // Check there is no batch currently proposed.
2832        assert!(primary.proposed_batch.read().is_none());
2833
2834        // Generate a solution and a transaction.
2835        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2836        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2837
2838        // Store it on one of the workers.
2839        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2840        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2841
2842        // Set the proposal lock to a round ahead of the storage.
2843        let (old_proposal_round, old_proposal_timestamp) = primary
2844            .latest_proposal_timestamp
2845            .read()
2846            .await
2847            .map(|(round, timestamp)| (round, timestamp))
2848            .unwrap_or((0, 0));
2849        *primary.latest_proposal_timestamp.write().await =
2850            Some((round + 1, old_proposal_timestamp + MIN_BATCH_DELAY.as_secs() as i64));
2851
2852        // Propose a batch and enforce that it fails.
2853        assert!(primary.propose_batch().await.is_ok());
2854        assert!(primary.proposed_batch.read().is_none());
2855
2856        // Set the proposal lock back to the old round.
2857        *primary.latest_proposal_timestamp.write().await = Some((old_proposal_round, old_proposal_timestamp));
2858
2859        // Try to propose a batch again. This time, it should succeed.
2860        assert!(primary.propose_batch().await.is_ok());
2861        assert!(primary.proposed_batch.read().is_proposed());
2862    }
2863
2864    #[test_log::test(tokio::test)]
2865    async fn test_propose_batch_with_storage_round_behind_proposal() {
2866        let round = 5;
2867        let mut rng = TestRng::default();
2868        let (primary, accounts) = primary_without_handlers(&mut rng);
2869
2870        // Generate previous certificates.
2871        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2872
2873        // Create a valid proposal.
2874        let timestamp = now();
2875        let proposal = create_test_proposal(
2876            primary.gateway.account(),
2877            primary.ledger.current_committee().unwrap(),
2878            round + 1,
2879            previous_certificates,
2880            timestamp,
2881            1,
2882            &mut rng,
2883        );
2884
2885        // Store the proposal on the primary.
2886        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2887
2888        // Try to propose a batch will terminate early because the storage is behind the proposal.
2889        assert!(primary.propose_batch().await.is_ok());
2890        assert!(primary.proposed_batch.read().is_proposed());
2891        assert!(primary.proposed_batch.read().as_proposal().unwrap().round() > primary.current_round());
2892    }
2893
2894    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2895    async fn test_batch_signature_from_peer() {
2896        let mut rng = TestRng::default();
2897        let (primary, accounts) = primary_without_handlers(&mut rng);
2898        map_account_addresses(&primary, &accounts);
2899
2900        // Create a valid proposal.
2901        let round = 1;
2902        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2903        let proposal = create_test_proposal(
2904            primary.gateway.account(),
2905            primary.ledger.current_committee().unwrap(),
2906            round,
2907            Default::default(),
2908            timestamp,
2909            1,
2910            &mut rng,
2911        );
2912
2913        // Store the proposal on the primary.
2914        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2915
2916        // Each committee member signs the batch.
2917        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2918
2919        // Have the primary process the signatures.
2920        for (socket_addr, signature) in signatures {
2921            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2922        }
2923
2924        // Check the certificate was created and stored by the primary.
2925        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2926        // Manually attempt round advancement (because the handler is not running).
2927        primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2928        // Check the round was incremented.
2929        assert_eq!(primary.current_round(), round + 1);
2930    }
2931
2932    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2933    async fn test_batch_signature_from_peer_in_round() {
2934        let round = 5;
2935        let mut rng = TestRng::default();
2936        let (primary, accounts) = primary_without_handlers(&mut rng);
2937        map_account_addresses(&primary, &accounts);
2938
2939        // Generate certificates.
2940        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2941
2942        // Create a valid proposal.
2943        let timestamp = now();
2944        let proposal = create_test_proposal(
2945            primary.gateway.account(),
2946            primary.ledger.current_committee().unwrap(),
2947            round,
2948            previous_certificates,
2949            timestamp,
2950            1,
2951            &mut rng,
2952        );
2953
2954        // Store the proposal on the primary.
2955        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2956
2957        // Each committee member signs the batch.
2958        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2959
2960        // Have the primary process the signatures.
2961        for (socket_addr, signature) in signatures {
2962            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2963        }
2964
2965        // Check the certificate was created and stored by the primary.
2966        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2967        // Manually attempt round advancement (because the handler is not running).
2968        primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2969        // Check the round was incremented.
2970        assert_eq!(primary.current_round(), round + 1);
2971    }
2972
2973    #[test_log::test(tokio::test)]
2974    async fn test_batch_signature_from_peer_no_quorum() {
2975        let mut rng = TestRng::default();
2976        let (primary, accounts) = primary_without_handlers(&mut rng);
2977        map_account_addresses(&primary, &accounts);
2978
2979        // Create a valid proposal.
2980        let round = 1;
2981        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2982        let proposal = create_test_proposal(
2983            primary.gateway.account(),
2984            primary.ledger.current_committee().unwrap(),
2985            round,
2986            Default::default(),
2987            timestamp,
2988            1,
2989            &mut rng,
2990        );
2991
2992        // Store the proposal on the primary.
2993        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2994
2995        // Each committee member signs the batch.
2996        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2997
2998        // Have the primary process only one signature, mimicking a lack of quorum.
2999        let (socket_addr, signature) = signatures.first().unwrap();
3000        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3001
3002        // Check the certificate was not created and stored by the primary.
3003        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3004        // Check the round was incremented.
3005        assert_eq!(primary.current_round(), round);
3006    }
3007
3008    #[test_log::test(tokio::test)]
3009    async fn test_batch_signature_from_peer_in_round_no_quorum() {
3010        let round = 7;
3011        let mut rng = TestRng::default();
3012        let (primary, accounts) = primary_without_handlers(&mut rng);
3013        map_account_addresses(&primary, &accounts);
3014
3015        // Generate certificates.
3016        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
3017
3018        // Create a valid proposal.
3019        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3020        let proposal = create_test_proposal(
3021            primary.gateway.account(),
3022            primary.ledger.current_committee().unwrap(),
3023            round,
3024            previous_certificates,
3025            timestamp,
3026            1,
3027            &mut rng,
3028        );
3029
3030        // Store the proposal on the primary.
3031        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
3032
3033        // Each committee member signs the batch.
3034        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3035
3036        // Have the primary process only one signature, mimicking a lack of quorum.
3037        let (socket_addr, signature) = signatures.first().unwrap();
3038        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3039
3040        // Check the certificate was not created and stored by the primary.
3041        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3042        // Check the round was incremented.
3043        assert_eq!(primary.current_round(), round);
3044    }
3045
3046    // Tests that a late-arriving signature for a batch that is currently being certified
3047    // (ProposedBatchState::Certified) is silently dropped without error.
3048    // This exercises the race condition where proposed_batch.take() has been called but
3049    // insert_certificate has not yet completed.
3050    #[test_log::test(tokio::test)]
3051    async fn test_batch_signature_from_peer_batch_being_certified() {
3052        let mut rng = TestRng::default();
3053        let (primary, accounts) = primary_without_handlers(&mut rng);
3054        map_account_addresses(&primary, &accounts);
3055
3056        // Create a valid proposal.
3057        let round = 1;
3058        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3059        let proposal = create_test_proposal(
3060            primary.gateway.account(),
3061            primary.ledger.current_committee().unwrap(),
3062            round,
3063            Default::default(),
3064            timestamp,
3065            1,
3066            &mut rng,
3067        );
3068        let batch_id = proposal.batch_id();
3069
3070        // Simulate the race: the batch has been taken for certification but not yet stored.
3071        *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id);
3072
3073        // Send a late signature for the batch being certified.
3074        let (socket_addr, account) =
3075            accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3076        let signature = account.sign(&[batch_id], &mut rng).unwrap();
3077        let batch_signature = BatchSignature::new(batch_id, signature);
3078
3079        // The signature should be accepted without error (silently dropped).
3080        assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3081        // The batch state is unchanged (still BeingCertified — no new proposal was set).
3082        assert!(matches!(&*primary.proposed_batch.read(), ProposedBatchState::Certified(id) if *id == batch_id));
3083    }
3084
3085    // Tests that a signature for a completely unknown batch ID is rejected even when another
3086    // batch is being certified. The BeingCertified state only suppresses errors for its own ID.
3087    #[test_log::test(tokio::test)]
3088    async fn test_batch_signature_from_peer_unknown_id_while_certifying() {
3089        let mut rng = TestRng::default();
3090        let (primary, accounts) = primary_without_handlers(&mut rng);
3091        map_account_addresses(&primary, &accounts);
3092
3093        // Create two proposals so we have two distinct batch IDs.
3094        let round = 1;
3095        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3096        let proposal_a = create_test_proposal(
3097            primary.gateway.account(),
3098            primary.ledger.current_committee().unwrap(),
3099            round,
3100            Default::default(),
3101            timestamp,
3102            1,
3103            &mut rng,
3104        );
3105        let proposal_b = create_test_proposal(
3106            primary.gateway.account(),
3107            primary.ledger.current_committee().unwrap(),
3108            round,
3109            Default::default(),
3110            timestamp,
3111            1,
3112            &mut rng,
3113        );
3114        let batch_id_a = proposal_a.batch_id();
3115        let batch_id_b = proposal_b.batch_id();
3116        assert_ne!(batch_id_a, batch_id_b);
3117
3118        // Simulate certifying batch A.
3119        *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id_a);
3120
3121        // Send a signature for batch B (a genuinely unknown ID).
3122        let (socket_addr, account) =
3123            accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3124        let signature = account.sign(&[batch_id_b], &mut rng).unwrap();
3125        let batch_signature = BatchSignature::new(batch_id_b, signature);
3126
3127        // The signature is for a genuinely unknown ID — should be rejected with an error.
3128        assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_err());
3129    }
3130
3131    // Tests the "already certified" path: a signature arrives after the batch is fully in
3132    // storage and the primary has moved on to a new proposal.
3133    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3134    async fn test_batch_signature_from_peer_already_certified() {
3135        let mut rng = TestRng::default();
3136        let (primary, accounts) = primary_without_handlers(&mut rng);
3137        map_account_addresses(&primary, &accounts);
3138
3139        // Create and certify a batch so it lands in storage.
3140        let round = 1;
3141        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3142        let old_proposal = create_test_proposal(
3143            primary.gateway.account(),
3144            primary.ledger.current_committee().unwrap(),
3145            round,
3146            Default::default(),
3147            timestamp,
3148            1,
3149            &mut rng,
3150        );
3151        let old_batch_id = old_proposal.batch_id();
3152        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(old_proposal));
3153        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3154        for (socket_addr, signature) in signatures {
3155            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
3156        }
3157        // The batch is now in storage.
3158        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3159
3160        // Simulate a new proposal being active.
3161        let new_proposal = create_test_proposal(
3162            primary.gateway.account(),
3163            primary.ledger.current_committee().unwrap(),
3164            round,
3165            Default::default(),
3166            timestamp,
3167            1,
3168            &mut rng,
3169        );
3170        assert_ne!(new_proposal.batch_id(), old_batch_id);
3171        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(new_proposal));
3172
3173        // Send a late signature for the already-certified old batch.
3174        let (socket_addr, account) =
3175            accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3176        let signature = account.sign(&[old_batch_id], &mut rng).unwrap();
3177        let batch_signature = BatchSignature::new(old_batch_id, signature);
3178
3179        // Should be silently accepted (already certified path).
3180        assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3181    }
3182
3183    #[test_log::test(tokio::test)]
3184    async fn test_insert_certificate_with_aborted_transmissions() {
3185        let round = 3;
3186        let prev_round = round - 1;
3187        let mut rng = TestRng::default();
3188        let (primary, accounts) = primary_without_handlers(&mut rng);
3189        let peer_account = &accounts[1];
3190        let peer_ip = peer_account.0;
3191
3192        // Fill primary storage.
3193        store_certificate_chain(&primary, &accounts, round, &mut rng);
3194
3195        // Get transmissions from previous certificates.
3196        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
3197
3198        // Generate a solution and a transaction.
3199        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
3200        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
3201
3202        // Store it on one of the workers.
3203        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
3204        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
3205
3206        // Check that the worker has 2 transmissions.
3207        assert_eq!(primary.workers()[0].num_transmissions(), 2);
3208
3209        // Create certificates for the current round.
3210        let account = accounts[0].1.clone();
3211        let (certificate, transmissions) =
3212            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
3213        let certificate_id = certificate.id();
3214
3215        // Randomly abort some of the transmissions.
3216        let mut aborted_transmissions = HashSet::new();
3217        let mut transmissions_without_aborted = HashMap::new();
3218        for (transmission_id, transmission) in transmissions.clone() {
3219            match rng.random::<bool>() || aborted_transmissions.is_empty() {
3220                true => {
3221                    // Insert the aborted transmission.
3222                    aborted_transmissions.insert(transmission_id);
3223                }
3224                false => {
3225                    // Insert the transmission without the aborted transmission.
3226                    transmissions_without_aborted.insert(transmission_id, transmission);
3227                }
3228            };
3229        }
3230
3231        // Add the non-aborted transmissions to the worker.
3232        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
3233            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
3234        }
3235
3236        // Check that inserting the transmission with missing transmissions fails.
3237        assert!(
3238            primary
3239                .storage
3240                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
3241                .is_err()
3242        );
3243        assert!(
3244            primary
3245                .storage
3246                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
3247                .is_err()
3248        );
3249
3250        // Insert the certificate to storage.
3251        primary
3252            .storage
3253            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3254            .unwrap();
3255
3256        // Ensure the certificate exists in storage.
3257        assert!(primary.storage.contains_certificate(certificate_id));
3258        // Ensure that the aborted transmission IDs exist in storage.
3259        for aborted_transmission_id in aborted_transmissions {
3260            assert!(primary.storage.contains_transmission(aborted_transmission_id));
3261            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3262        }
3263    }
3264
3265    // -----------------------------------------------------------------------
3266    // add_signature_to_batch
3267    // -----------------------------------------------------------------------
3268
3269    /// State is `None` and the batch is not in storage — returns an error, state stays `None`.
3270    #[test]
3271    fn test_add_signature_to_batch_none_state() {
3272        let mut rng = TestRng::default();
3273        let (primary, accounts) = primary_without_handlers(&mut rng);
3274
3275        let peer_ip = accounts[1].0;
3276        let batch_id = Field::rand(&mut rng);
3277        let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap();
3278
3279        let (result, new_state) =
3280            primary.add_signature_to_batch(ProposedBatchState::None, peer_ip, batch_id, signature);
3281
3282        assert!(result.is_err());
3283        assert_eq!(new_state, ProposedBatchState::None);
3284    }
3285
3286    /// State is `Certified` with a matching batch ID — silently dropped, state restored.
3287    #[test]
3288    fn test_add_signature_to_batch_certified_matching_id() {
3289        let mut rng = TestRng::default();
3290        let (primary, accounts) = primary_without_handlers(&mut rng);
3291
3292        let peer_ip = accounts[1].0;
3293        let batch_id = Field::rand(&mut rng);
3294        let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap();
3295
3296        let (result, new_state) =
3297            primary.add_signature_to_batch(ProposedBatchState::Certified(batch_id), peer_ip, batch_id, signature);
3298
3299        assert!(result.unwrap().is_none());
3300        assert_eq!(new_state, ProposedBatchState::Certified(batch_id));
3301    }
3302
3303    /// State is `Certified` with a *different* batch ID — error returned, state becomes `None`.
3304    #[test]
3305    fn test_add_signature_to_batch_certified_different_id() {
3306        let mut rng = TestRng::default();
3307        let (primary, accounts) = primary_without_handlers(&mut rng);
3308
3309        let peer_ip = accounts[1].0;
3310        let certified_id = Field::rand(&mut rng);
3311        let other_id = Field::rand(&mut rng);
3312        let signature = accounts[1].1.sign(&[other_id], &mut rng).unwrap();
3313
3314        let (result, new_state) =
3315            primary.add_signature_to_batch(ProposedBatchState::Certified(certified_id), peer_ip, other_id, signature);
3316
3317        assert!(result.is_err());
3318        assert_eq!(new_state, ProposedBatchState::Certified(certified_id));
3319    }
3320
3321    /// State is `Certifying` for a *different* batch ID that **is already in storage** — silently
3322    /// dropped, state restored.
3323    #[tokio::test(flavor = "multi_thread")]
3324    async fn test_add_signature_to_batch_certifying_different_id_in_storage() {
3325        let round = 1;
3326        let mut rng = TestRng::default();
3327        let (primary, accounts) = primary_without_handlers(&mut rng);
3328        map_account_addresses(&primary, &accounts);
3329
3330        // Create a proposal owned by the primary.
3331        let proposal = create_test_proposal(
3332            primary.gateway.account(),
3333            primary.ledger.current_committee().unwrap(),
3334            round,
3335            Default::default(),
3336            now(),
3337            0,
3338            &mut rng,
3339        );
3340        let proposal_batch_id = proposal.batch_id();
3341
3342        // Create and store a *different* certificate so `contains_batch` returns true for it.
3343        let (certificate, transmissions) =
3344            create_batch_certificate(accounts[1].1.address(), &accounts, round, Default::default(), &mut rng);
3345        let stored_batch_id = certificate.batch_id();
3346        primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
3347
3348        let peer_ip = accounts[1].0;
3349        let signature = accounts[1].1.sign(&[stored_batch_id], &mut rng).unwrap();
3350
3351        let (result, new_state) = primary.add_signature_to_batch(
3352            ProposedBatchState::Certifying(Box::new(proposal)),
3353            peer_ip,
3354            stored_batch_id,
3355            signature,
3356        );
3357
3358        assert!(result.unwrap().is_none());
3359        // State is restored with the original proposal.
3360        assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id);
3361    }
3362
3363    /// State is `Certifying` for a *different* batch ID that is **not in storage** — error
3364    /// returned, state restored.
3365    #[test]
3366    fn test_add_signature_to_batch_certifying_different_id_unknown() {
3367        let mut rng = TestRng::default();
3368        let (primary, accounts) = primary_without_handlers(&mut rng);
3369
3370        let proposal = create_test_proposal(
3371            primary.gateway.account(),
3372            primary.ledger.current_committee().unwrap(),
3373            1,
3374            Default::default(),
3375            now(),
3376            0,
3377            &mut rng,
3378        );
3379        let proposal_batch_id = proposal.batch_id();
3380
3381        let peer_ip = accounts[1].0;
3382        let unknown_id = Field::rand(&mut rng);
3383        let signature = accounts[1].1.sign(&[unknown_id], &mut rng).unwrap();
3384
3385        let (result, new_state) = primary.add_signature_to_batch(
3386            ProposedBatchState::Certifying(Box::new(proposal)),
3387            peer_ip,
3388            unknown_id,
3389            signature,
3390        );
3391
3392        assert!(result.is_err());
3393        assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id);
3394    }
3395
3396    /// Matching batch ID, valid signature, quorum **not yet** reached — state stays `Certifying`.
3397    #[test]
3398    fn test_add_signature_to_batch_certifying_matching_no_quorum() {
3399        let mut rng = TestRng::default();
3400        let (primary, accounts) = primary_without_handlers(&mut rng);
3401        map_account_addresses(&primary, &accounts);
3402
3403        let proposal = create_test_proposal(
3404            primary.gateway.account(),
3405            primary.ledger.current_committee().unwrap(),
3406            1,
3407            Default::default(),
3408            now(),
3409            0,
3410            &mut rng,
3411        );
3412        let batch_id = proposal.batch_id();
3413
3414        // Only one peer signs — not enough for quorum.
3415        let peer_ip = accounts[1].0;
3416        let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap();
3417
3418        let (result, new_state) = primary.add_signature_to_batch(
3419            ProposedBatchState::Certifying(Box::new(proposal)),
3420            peer_ip,
3421            batch_id,
3422            signature,
3423        );
3424
3425        assert!(result.unwrap().is_none());
3426        assert_eq!(new_state.as_proposal().unwrap().batch_id(), batch_id);
3427    }
3428
3429    /// Matching batch ID, all peers sign — quorum reached, proposal extracted and state becomes
3430    /// `Certified`.
3431    #[test]
3432    fn test_add_signature_to_batch_certifying_matching_quorum_reached() {
3433        let mut rng = TestRng::default();
3434        let (primary, accounts) = primary_without_handlers(&mut rng);
3435        map_account_addresses(&primary, &accounts);
3436
3437        let proposal = create_test_proposal(
3438            primary.gateway.account(),
3439            primary.ledger.current_committee().unwrap(),
3440            1,
3441            Default::default(),
3442            now(),
3443            0,
3444            &mut rng,
3445        );
3446        let batch_id = proposal.batch_id();
3447
3448        // Add all peer signatures one by one until quorum is reached.
3449        let peers: Vec<_> =
3450            accounts.iter().filter(|(_, a)| a.address() != primary.gateway.account().address()).collect();
3451        let mut state = ProposedBatchState::Certifying(Box::new(proposal));
3452        let mut final_result = None;
3453
3454        for (peer_ip, peer_account) in &peers {
3455            let signature = peer_account.sign(&[batch_id], &mut rng).unwrap();
3456            let (result, new_state) = primary.add_signature_to_batch(state, *peer_ip, batch_id, signature);
3457            state = new_state;
3458            if result.as_ref().unwrap().is_some() {
3459                final_result = Some(result);
3460                break;
3461            }
3462        }
3463
3464        // Quorum must have been reached with the committee's peers.
3465        let proposal = final_result.expect("quorum should be reached").unwrap().unwrap();
3466        assert_eq!(proposal.batch_id(), batch_id);
3467        assert_eq!(state, ProposedBatchState::Certified(batch_id));
3468    }
3469}