Skip to main content

snarkos_node_bft/
primary.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16mod proposal_task;
17pub use proposal_task::ProposalTask;
18
19use crate::{
20    Gateway,
21    MAX_BATCH_DELAY,
22    MAX_LEADER_CERTIFICATE_DELAY,
23    MAX_WORKERS,
24    MIN_BATCH_DELAY,
25    PRIMARY_PING_INTERVAL,
26    Sync,
27    Transport,
28    WORKER_PING_INTERVAL,
29    Worker,
30    events::{BatchPropose, BatchSignature, Event},
31    helpers::{
32        PrimaryReceiver,
33        PrimarySender,
34        Proposal,
35        ProposalCache,
36        SignedProposals,
37        Storage,
38        assign_to_worker,
39        assign_to_workers,
40        fmt_id,
41        init_sync_channels,
42        init_worker_channels,
43        now,
44    },
45    spawn_blocking,
46    sync::SyncCallback,
47};
48
49use snarkos_account::Account;
50use snarkos_node_bft_events::PrimaryPing;
51use snarkos_node_bft_ledger_service::LedgerService;
52#[cfg(test)]
53use snarkos_node_network::ConnectionMode;
54use snarkos_node_network::PeerPoolHandling;
55use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
56use snarkos_utilities::{CallbackHandle, NodeDataDir};
57
58use snarkvm::{
59    console::{
60        prelude::*,
61        types::{Address, Field},
62    },
63    ledger::{
64        block::Transaction,
65        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
66        puzzle::{Solution, SolutionID},
67    },
68    prelude::{Signature, committee::Committee},
69    utilities::flatten_error,
70};
71
72use anyhow::Context;
73use colored::Colorize;
74use futures::{
75    future::join_all,
76    stream::{FuturesUnordered, StreamExt},
77};
78use indexmap::{IndexMap, IndexSet};
79#[cfg(feature = "locktick")]
80use locktick::{
81    parking_lot::{Mutex, RwLock},
82    tokio::RwLock as TRwLock,
83};
84#[cfg(not(feature = "locktick"))]
85use parking_lot::{Mutex, RwLock};
86#[cfg(not(feature = "serial"))]
87use rayon::prelude::*;
88use std::{
89    collections::{HashMap, HashSet},
90    future::Future,
91    net::SocketAddr,
92    pin::Pin,
93    sync::{Arc, OnceLock},
94    time::Instant,
95};
96#[cfg(not(feature = "locktick"))]
97use tokio::sync::RwLock as TRwLock;
98use tokio::{sync::Notify, task::JoinHandle};
99
100/// The state of the primary's batch proposal.
101#[derive(Debug, PartialEq, Eq)]
102pub enum ProposedBatchState<N: Network> {
103    /// No batch is currently being proposed.
104    None,
105    /// A batch is being proposed and awaiting signatures.
106    Certifying(Box<Proposal<N>>),
107    /// A batch has reached quorum and is being inserted into storage.
108    /// Carries the batch ID so late-arriving signatures can be recognized and silently dropped.
109    Certified(Field<N>),
110}
111
112impl<N: Network> Default for ProposedBatchState<N> {
113    fn default() -> Self {
114        Self::None
115    }
116}
117
118impl<N: Network> ProposedBatchState<N> {
119    /// Returns `true` if the primary has no active batch proposal.
120    pub fn is_none(&self) -> bool {
121        matches!(self, Self::None)
122    }
123
124    /// Returns `true` if a batch is currently being proposed (awaiting signatures).
125    pub fn is_proposed(&self) -> bool {
126        matches!(self, Self::Certifying(_))
127    }
128
129    /// Returns a reference to the in-progress proposal, or `None` if not in the `Certifying` state.
130    pub fn as_proposal(&self) -> Option<&Proposal<N>> {
131        match self {
132            Self::Certifying(p) => Some(p.as_ref()),
133            _ => None,
134        }
135    }
136}
137
138/// A helper type to keep track of the state of the primary's batch proposal.
139pub type ProposedBatch<N> = RwLock<ProposedBatchState<N>>;
140
141/// This callback trait allows listening to changes in the Primary, such as round advancement.
142/// This is implemented by [`BFT`].
143#[async_trait::async_trait]
144pub trait PrimaryCallback<N: Network>: Send + std::marker::Sync {
145    /// Asks the callback to if we can move to the next round.
146    ///
147    /// # Arguments
148    /// * `current_round` - the round the Primary is in (to avoid race conditions)
149    ///
150    /// # Returns
151    /// `true` if we moved to the next round.
152    fn try_advance_to_next_round(&self, current_round: u64) -> bool;
153
154    /// Add a certificated that was created by the primary or received from a peer.
155    async fn add_new_certificate(&self, certificate: BatchCertificate<N>) -> Result<()>;
156}
157
158/// The primary logic of a node.
159/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
160#[derive(Clone)]
161pub struct Primary<N: Network> {
162    /// The sync module enables fetching data from other validators.
163    sync: Sync<N>,
164    /// The gateway allows talking to other nodes in the validator set.
165    gateway: Gateway<N>,
166    /// The storage.
167    storage: Storage<N>,
168    /// The ledger service.
169    ledger: Arc<dyn LedgerService<N>>,
170    /// The workers.
171    workers: Arc<OnceLock<Vec<Worker<N>>>>,
172
173    /// The primary callback (used by [`BFT`]).
174    primary_callback: Arc<CallbackHandle<Arc<dyn PrimaryCallback<N>>>>,
175
176    /// The batch proposal, if the primary is currently proposing a batch.
177    proposed_batch: Arc<ProposedBatch<N>>,
178
179    /// The instant at which the current batch was proposed (used to measure certification latency).
180    /// (used for higher precision in the metrics compared to the batch timestamp)
181    #[cfg(feature = "metrics")]
182    batch_propose_start: Arc<Mutex<Option<Instant>>>,
183
184    /// Holds the most recent round and timestamp that the primary proposed a batch for.
185    /// TODO(kaimast): avoiding using an async lock here, so this can be merged with the `proposed_batch`,
186    /// to have a unified `primary_state` field.
187    latest_proposal_timestamp: Arc<TRwLock<Option<(u64, i64)>>>,
188
189    /// The recently-signed batch proposals.
190    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
191
192    /// The handles for all background tasks spawned by this primary.
193    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
194
195    /// The node configuration directory.
196    node_data_dir: NodeDataDir,
197
198    /// Manages proposal readiness state and drives the batch proposal loop.
199    proposal_task: ProposalTask<N>,
200
201    /// Used to wake up a the dedicated round-increment task, if we may be able to advance to the next round.
202    /// This is used, so the timeout for round advancement is reset on every round increment.
203    round_increment_notify: Arc<Notify>,
204}
205
206impl<N: Network> Primary<N> {
207    /// The maximum number of unconfirmed transmissions to send to the primary.
208    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
209
210    /// Initializes a new primary instance.
211    #[allow(clippy::too_many_arguments)]
212    pub fn new(
213        account: Account<N>,
214        storage: Storage<N>,
215        ledger: Arc<dyn LedgerService<N>>,
216        block_sync: Arc<BlockSync<N>>,
217        ip: Option<SocketAddr>,
218        trusted_validators: &[SocketAddr],
219        trusted_peers_only: bool,
220        node_data_dir: NodeDataDir,
221        dev: Option<u16>,
222    ) -> Result<Self> {
223        // Initialize the gateway.
224        let gateway = Gateway::new(
225            account,
226            storage.clone(),
227            ledger.clone(),
228            ip,
229            trusted_validators,
230            trusted_peers_only,
231            node_data_dir.clone(),
232            dev,
233        )?;
234        // Initialize the sync module.
235        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
236
237        // Initialize the primary instance.
238        Ok(Self {
239            sync,
240            gateway,
241            storage,
242            ledger,
243            node_data_dir,
244            workers: Default::default(),
245            primary_callback: Default::default(),
246            proposed_batch: Default::default(),
247            #[cfg(feature = "metrics")]
248            batch_propose_start: Default::default(),
249            latest_proposal_timestamp: Default::default(),
250            signed_proposals: Default::default(),
251            handles: Default::default(),
252            proposal_task: Default::default(),
253            round_increment_notify: Default::default(),
254        })
255    }
256
257    /// Load the proposal cache file and update the Primary state with the stored data.
258    async fn load_proposal_cache(&self) -> Result<()> {
259        // Fetch the signed proposals from the file system if it exists.
260        match ProposalCache::<N>::exists(&self.node_data_dir) {
261            // If the proposal cache exists, then process the proposal cache.
262            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
263                Ok(proposal_cache) => {
264                    // Extract the proposal and signed proposals.
265                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
266                        proposal_cache.into();
267
268                    *self.latest_proposal_timestamp.write().await = Some((latest_certificate_round, now()));
269                    *self.proposed_batch.write() = match proposed_batch {
270                        Some(p) => ProposedBatchState::Certifying(Box::new(p)),
271                        None => ProposedBatchState::None,
272                    };
273                    *self.signed_proposals.write() = signed_proposals;
274
275                    // Update the storage with the pending certificates.
276                    for certificate in pending_certificates {
277                        let batch_id = certificate.batch_id();
278                        // We use a dummy IP because the node should not need to request from any peers.
279                        // The storage should have stored all the transmissions. If not, we simply
280                        // skip the certificate.
281                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
282                        {
283                            let err = err.context(format!(
284                                "Failed to load stored certificate {} from proposal cache",
285                                fmt_id(batch_id)
286                            ));
287                            warn!("{}", &flatten_error(err));
288                        }
289                    }
290                    Ok(())
291                }
292                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
293            },
294            // If the proposal cache does not exist, then return early.
295            false => Ok(()),
296        }
297    }
298
299    /// Run the primary instance.
300    pub async fn run(
301        &self,
302        ping: Option<Arc<Ping<N>>>,
303        primary_callback: Option<Arc<dyn PrimaryCallback<N>>>,
304        sync_callback: Option<Arc<dyn SyncCallback<N>>>,
305        primary_sender: PrimarySender<N>,
306        primary_receiver: PrimaryReceiver<N>,
307    ) -> Result<()> {
308        info!("Starting the primary instance of the memory pool...");
309
310        // Set the BFT sender.
311        if let Some(callback) = primary_callback {
312            self.primary_callback.set(callback)?;
313        }
314
315        // Construct a map of the worker senders.
316        let mut worker_senders = IndexMap::new();
317        // Construct a map for the workers.
318        let mut workers = Vec::new();
319        // Initialize the workers.
320        for id in 0..MAX_WORKERS {
321            // Construct the worker channels.
322            let (tx_worker, rx_worker) = init_worker_channels();
323            // Construct the worker instance.
324            let worker = Worker::new(
325                id,
326                Arc::new(self.gateway.clone()),
327                self.storage.clone(),
328                self.ledger.clone(),
329                self.proposed_batch.clone(),
330            )?;
331            // Run the worker instance.
332            worker.run(rx_worker);
333            // Add the worker to the list of workers.
334            workers.push(worker);
335            // Add the worker sender to the map.
336            worker_senders.insert(id, tx_worker);
337        }
338        // Set the workers.
339        if self.workers.set(workers).is_err() {
340            bail!("Workers already set. `Primary::run` cannot be called more than once.");
341        }
342
343        // First, initialize the sync channels.
344        let (sync_sender, sync_receiver) = init_sync_channels();
345        // Next, initialize the sync module and sync the storage from ledger.
346        self.sync.initialize(sync_callback)?;
347        // Next, load and process the proposal cache before running the sync module.
348        self.load_proposal_cache().await?;
349        // Next, run the sync module.
350        self.sync.run(ping, sync_receiver).await?;
351        // Next, initialize the gateway.
352        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
353        // Lastly, start the primary handlers.
354        // Note: This ensures the primary does not start communicating before syncing is complete.
355        self.start_handlers(primary_receiver);
356
357        Ok(())
358    }
359
360    /// Returns the current round.
361    pub fn current_round(&self) -> u64 {
362        self.storage.current_round()
363    }
364
365    /// Returns `true` if the primary is synced.
366    pub fn is_synced(&self) -> bool {
367        self.sync.is_synced()
368    }
369
370    /// Returns the gateway.
371    pub const fn gateway(&self) -> &Gateway<N> {
372        &self.gateway
373    }
374
375    /// Returns the storage.
376    pub const fn storage(&self) -> &Storage<N> {
377        &self.storage
378    }
379
380    /// Returns the ledger.
381    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
382        &self.ledger
383    }
384
385    /// Returns the number of workers.
386    pub fn num_workers(&self) -> u8 {
387        u8::try_from(self.workers.get().expect("Primary is not running yet").len()).expect("Too many workers")
388    }
389
390    /// Returns the workers.
391    pub fn workers(&self) -> &[Worker<N>] {
392        self.workers.get().expect("Primary is not running yet")
393    }
394}
395
396impl<N: Network> Primary<N> {
397    /// Returns the number of unconfirmed transmissions.
398    pub fn num_unconfirmed_transmissions(&self) -> usize {
399        self.workers().iter().map(|worker| worker.num_transmissions()).sum()
400    }
401
402    /// Returns the number of unconfirmed ratifications.
403    pub fn num_unconfirmed_ratifications(&self) -> usize {
404        self.workers().iter().map(|worker| worker.num_ratifications()).sum()
405    }
406
407    /// Returns the number of unconfirmed solutions.
408    pub fn num_unconfirmed_solutions(&self) -> usize {
409        self.workers().iter().map(|worker| worker.num_solutions()).sum()
410    }
411
412    /// Returns the number of unconfirmed transactions.
413    pub fn num_unconfirmed_transactions(&self) -> usize {
414        self.workers().iter().map(|worker| worker.num_transactions()).sum()
415    }
416}
417
418impl<N: Network> Primary<N> {
419    /// Returns the worker transmission IDs.
420    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
421        self.workers().iter().flat_map(|worker| worker.transmission_ids())
422    }
423
424    /// Returns the worker transmissions.
425    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
426        self.workers().iter().flat_map(|worker| worker.transmissions())
427    }
428
429    /// Returns the worker solutions.
430    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
431        self.workers().iter().flat_map(|worker| worker.solutions())
432    }
433
434    /// Returns the worker transactions.
435    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
436        self.workers().iter().flat_map(|worker| worker.transactions())
437    }
438}
439
440impl<N: Network> Primary<N> {
441    /// Clears the worker solutions.
442    pub fn clear_worker_solutions(&self) {
443        self.workers().iter().for_each(Worker::clear_solutions);
444    }
445}
446
447#[async_trait::async_trait]
448impl<N: Network> proposal_task::BatchPropose for Primary<N> {
449    fn current_round(&self) -> u64 {
450        Primary::current_round(self)
451    }
452
453    fn wait_for_synced_if_syncing(&self) -> Option<futures::future::BoxFuture<'_, ()>> {
454        self.sync.wait_for_synced_if_syncing()
455    }
456
457    fn is_synced(&self) -> bool {
458        self.sync.is_synced()
459    }
460
461    /// Proposes the batch for the current round.
462    ///
463    /// This method performs the following steps:
464    /// 1. Drain the workers.
465    /// 2. Sign the batch.
466    /// 3. Set the batch proposal in the primary.
467    /// 4. Broadcast the batch header to all validators for signing.
468    ///
469    /// # Returns
470    /// - `Ok(true)` if the batch was proposed.
471    /// - `Ok(false)` if the batch was not proposed for a benign reason, e.g., the timestamp is too soon after the previous certificate.
472    /// - `Err(err)` if an unexpected error occured.
473    async fn propose_batch(&self) -> Result<bool> {
474        // Ensure there are not concurrent executions of this function.
475        //
476        // Note, in the current design, this function is only invoked from the batch proposal task, and it is technically
477        // not possible for there to be concurrent invocations of the function, but we keep this lock for now.
478        let mut lock_guard = self.latest_proposal_timestamp.write().await;
479
480        // Check if the proposed batch has expired, and clear it if it has expired.
481        if let Err(err) = self
482            .check_proposed_batch_for_expiration()
483            .with_context(|| "Failed to check the proposed batch for expiration")
484        {
485            warn!("{}", flatten_error(&err));
486            return Ok(false);
487        }
488
489        // Retrieve the current round.
490        let round = self.current_round();
491        // Compute the previous round.
492        let previous_round = round.saturating_sub(1);
493
494        // If the current round is 0, return early.
495        // This can actually never happen, because of the invariant that the current round is never 0
496        // (see [`StorageInner::current_round`]).
497        ensure!(round > 0, "Round 0 cannot have transaction batches");
498
499        // If the current storage round is below the latest proposal round, then return early.
500        if let Some((latest_round, _)) = &*lock_guard
501            && round < *latest_round
502        {
503            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {latest_round}");
504            return Ok(false);
505        }
506
507        // If there is a batch being proposed or certified already, handle accordingly.
508        match &*self.proposed_batch.read() {
509            ProposedBatchState::Certifying(proposal) => {
510                // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
511                if round < proposal.round()
512                    || proposal
513                        .batch_header()
514                        .previous_certificate_ids()
515                        .iter()
516                        .any(|id| !self.storage.contains_certificate(*id))
517                {
518                    warn!(
519                        "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
520                        proposal.round(),
521                    );
522                    return Ok(false);
523                }
524                // Construct the event.
525                // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
526                let event = Event::BatchPropose(proposal.batch_header().clone().into());
527                // Iterate through the non-signers.
528                for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
529                    // Resolve the address to the peer IP.
530                    match self.gateway.resolver().read().get_peer_ip_for_address(address) {
531                        // Resend the batch proposal to the validator for signing.
532                        Some(peer_ip) => {
533                            let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
534                            tokio::spawn(async move {
535                                debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
536                                // Resend the batch proposal to the peer.
537                                if gateway.send(peer_ip, event_).await.is_none() {
538                                    warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
539                                }
540                            });
541                        }
542                        None => continue,
543                    }
544                }
545                debug!("Proposed batch for round {} is still valid", proposal.round());
546                return Ok(false);
547            }
548            // A batch is being certified; wait until it completes before proposing another.
549            ProposedBatchState::Certified(_) => {
550                debug!("Cannot propose a batch for round {round} - a batch is currently being certified");
551                return Ok(false);
552            }
553            ProposedBatchState::None => {
554                // No batch in progress, so it is save to propose a new one.
555            }
556        }
557
558        #[cfg(feature = "metrics")]
559        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
560
561        // Ensure that the primary does not create a new proposal too quickly.
562        if let Some((_, latest_timestamp)) = &*lock_guard
563            && !self.check_own_proposal_timestamp(previous_round, *latest_timestamp, now())?
564        {
565            return Ok(false);
566        }
567
568        // Ensure the primary has not proposed a batch for this round before.
569        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
570            // If a BFT sender was provided, attempt to advance the current round.
571            if let Some(cb) = &*self.primary_callback.get_ref() {
572                match cb.try_advance_to_next_round(self.current_round()) {
573                    true => (), // continue,
574                    false => return Ok(false),
575                }
576            }
577            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
578            return Ok(false);
579        }
580
581        // Determine if the current round has been proposed.
582        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
583        // good for network reliability and should not be prevented for the already existing proposed_batch.
584        // If a certificate already exists for the current round, an attempt should be made to advance the
585        // round as early as possible.
586        if let Some((latest_round, _)) = &*lock_guard
587            && *latest_round == round
588        {
589            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
590            return Ok(false);
591        }
592
593        // Retrieve the committee to check against.
594        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
595        // Check if the primary is connected to enough validators to reach quorum threshold.
596        {
597            // Retrieve the connected validator addresses.
598            let mut connected_validators = self.gateway.connected_addresses();
599            // Append the primary to the set.
600            connected_validators.insert(self.gateway.account().address());
601            // If quorum threshold is not reached, return early.
602            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
603                debug!(
604                    "Primary is safely skipping a batch proposal for round {round} {}",
605                    "(please connect to more validators)".dimmed()
606                );
607                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
608                return Ok(false);
609            }
610        }
611
612        // Retrieve the previous certificates.
613        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
614
615        // Check if the batch is ready to be proposed.
616        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
617        let mut is_ready = previous_round == 0;
618        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
619        if previous_round > 0 {
620            // Retrieve the committee lookback for the round.
621            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
622                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
623            };
624            // Construct a set over the authors.
625            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
626            // Check if the previous certificates have reached the quorum threshold.
627            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
628                is_ready = true;
629            }
630            #[cfg(feature = "test_network")]
631            {
632                // If we are using a hotswapped dev committee, use simplified checks to more easily advance.
633                if let Some(dev_committee) = self.ledger.dev_committee_for_round(previous_round)? {
634                    if round <= dev_committee.starting_round() {
635                        is_ready = true;
636                    }
637                }
638            }
639        }
640        // If the batch is not ready to be proposed, return early.
641        if !is_ready {
642            debug!(
643                "Primary is safely skipping a batch proposal for round {round} {}",
644                format!("(previous round {previous_round} has not reached quorum)").dimmed()
645            );
646            return Ok(false);
647        }
648
649        // Initialize the map of transmissions.
650        let mut transmissions: IndexMap<_, _> = Default::default();
651        // Track the total execution costs of the batch proposal as it is being constructed.
652        let mut proposal_cost = 0u64;
653        // Note: worker draining and transaction inclusion needs to be thought
654        // through carefully when there is more than one worker. The fairness
655        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
656        debug_assert_eq!(MAX_WORKERS, 1);
657
658        'outer: for worker in self.workers().iter() {
659            let mut num_worker_transmissions = 0usize;
660
661            while let Some((id, transmission)) = worker.remove_front() {
662                // Check the selected transmissions are below the batch limit.
663                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
664                    // Reinsert the transmission into the worker.
665                    worker.insert_front(id, transmission);
666                    break 'outer;
667                }
668
669                // Check the max transmissions per worker is not exceeded.
670                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
671                    // Reinsert the transmission into the worker.
672                    worker.insert_front(id, transmission);
673                    continue 'outer;
674                }
675
676                // Check if the ledger already contains the transmission.
677                if self.ledger.contains_transmission(&id).unwrap_or(true) {
678                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
679                    continue;
680                }
681
682                // Check if the storage already contain the transmission.
683                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
684                // the primary does not propose a batch with no transmissions.
685                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
686                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
687                    continue;
688                }
689
690                // Check the transmission is still valid.
691                match (id, transmission.clone()) {
692                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
693                        // Ensure the checksum matches. If not, skip the solution.
694                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
695                        {
696                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
697                            continue;
698                        }
699                        // Check if the solution is still valid.
700                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
701                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
702                            continue;
703                        }
704                    }
705                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
706                        // Ensure the checksum matches. If not, skip the transaction.
707                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
708                        {
709                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
710                            continue;
711                        }
712
713                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
714                        let transaction = spawn_blocking!({
715                            match transaction {
716                                Data::Object(transaction) => Ok(transaction),
717                                Data::Buffer(bytes) => Ok(Transaction::<N>::read_le(
718                                    &mut bytes.take(N::LATEST_MAX_TRANSACTION_SIZE() as u64),
719                                )?),
720                            }
721                        })?;
722
723                        // Fetch the current block height and consensus version.
724                        let current_block_height = self.ledger.latest_block_height();
725                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
726
727                        // Compute the transaction spent cost (in microcredits).
728                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
729                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
730                        else {
731                            debug!(
732                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
733                                fmt_id(transaction_id)
734                            );
735                            continue;
736                        };
737
738                        // Check if the transaction is still valid.
739                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
740                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
741                            continue;
742                        }
743
744                        // Compute the next proposal cost.
745                        // Note: We purposefully discard this transaction if the proposal cost overflows.
746                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
747                            debug!(
748                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
749                                fmt_id(transaction_id)
750                            );
751                            continue;
752                        };
753
754                        // Check if the next proposal cost exceeds the batch proposal spend limit.
755                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
756                        if next_proposal_cost > batch_spend_limit {
757                            debug!(
758                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
759                                fmt_id(transaction_id),
760                                batch_spend_limit
761                            );
762
763                            // Reinsert the transmission into the worker.
764                            worker.insert_front(id, transmission);
765                            break 'outer;
766                        }
767
768                        // Update the proposal cost.
769                        proposal_cost = next_proposal_cost;
770                    }
771
772                    // Note: We explicitly forbid including ratifications,
773                    // as the protocol currently does not support ratifications.
774                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
775                    // All other combinations are clearly invalid.
776                    _ => continue,
777                }
778
779                // If the transmission is valid, insert it into the proposal's transmission list.
780                transmissions.insert(id, transmission);
781                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
782            }
783        }
784
785        // Determine the current timestamp.
786        let current_timestamp = now();
787
788        /* Proceeding to sign & propose the batch. */
789        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
790
791        // Update the latest proposed round and timestamp.
792        *lock_guard = Some((round, current_timestamp));
793        // Retrieve the private key.
794        let private_key = *self.gateway.account().private_key();
795        // Retrieve the committee ID.
796        let committee_id = committee_lookback.id();
797        // Prepare the transmission IDs.
798        let transmission_ids = transmissions.keys().copied().collect();
799        // Prepare the previous batch certificate IDs.
800        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
801        // Sign the batch header and construct the proposal.
802        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
803            &private_key,
804            round,
805            current_timestamp,
806            committee_id,
807            transmission_ids,
808            previous_certificate_ids,
809            &mut rand::rng()
810        ))
811        .and_then(|batch_header| {
812            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
813                .map(|proposal| (batch_header, proposal))
814        })
815        .inspect_err(|_| {
816            // On error, reinsert the transmissions and then propagate the error.
817            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
818                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
819            }
820        })?;
821
822        // Broadcast the batch to all validators for signing.
823        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
824        // Store the proposal in memory.
825        *self.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
826        // Record the wall-clock time at which the batch was proposed.
827        #[cfg(feature = "metrics")]
828        {
829            *self.batch_propose_start.lock() = Some(Instant::now());
830        }
831
832        Ok(true)
833    }
834}
835
836impl<N: Network> Primary<N> {
837    /// Processes a batch propose from a peer.
838    ///
839    /// This method performs the following steps:
840    /// 1. Verify the batch.
841    /// 2. Sign the batch.
842    /// 3. Broadcast the signature back to the validator.
843    ///
844    /// If our primary is ahead of the peer, we will not sign the batch.
845    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
846    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
847        let BatchPropose { round: batch_round, batch_header } = batch_propose;
848
849        // Deserialize the batch header.
850        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
851        // Ensure the round matches in the batch header.
852        if batch_round != batch_header.round() {
853            // Proceed to disconnect the validator.
854            self.gateway.disconnect(peer_ip);
855            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
856        }
857
858        // Retrieve the batch author.
859        let batch_author = batch_header.author();
860
861        // Ensure the batch proposal is from the validator.
862        match self.gateway.resolve_to_aleo_addr(peer_ip) {
863            // If the peer is a validator, then ensure the batch proposal is from the validator.
864            Some(address) => {
865                if address != batch_author {
866                    // Proceed to disconnect the validator.
867                    self.gateway.disconnect(peer_ip);
868                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
869                }
870            }
871            None => bail!("Batch proposal from a disconnected validator"),
872        }
873        // Ensure the batch author is a current committee member.
874        if !self.gateway.is_authorized_validator_address(batch_author) {
875            // Proceed to disconnect the validator.
876            self.gateway.disconnect(peer_ip);
877            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
878        }
879        // Ensure the batch proposal is not from the current primary.
880        if self.gateway.account().address() == batch_author {
881            bail!("Invalid peer - proposed batch from myself ({batch_author})");
882        }
883
884        // Ensure that the batch proposal's committee ID matches the expected committee ID.
885        // This may happen when the network forks. A transaction referencing a
886        // state root from one side of the fork will be aborted on the other
887        // side of the fork. This leads to a different view of stake and
888        // therefore a different view of the committee ID.
889        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
890        if expected_committee_id != batch_header.committee_id() {
891            // Proceed to disconnect the validator.
892            self.gateway.disconnect(peer_ip);
893            bail!(
894                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
895                batch_header.committee_id()
896            );
897        }
898
899        // Retrieve the cached round and batch ID for this validator.
900        if let Some((signed_round, signed_batch_id, signature)) =
901            self.signed_proposals.read().get(&batch_author).copied()
902        {
903            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
904            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
905            if signed_round > batch_header.round() {
906                bail!(
907                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
908                    batch_header.round()
909                );
910            }
911
912            // If the round matches and the batch ID differs, then the validator is malicious.
913            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
914                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
915            }
916            // If the round and batch ID matches, then skip signing the batch a second time.
917            // Instead, rebroadcast the cached signature to the peer.
918            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
919                let gateway = self.gateway.clone();
920                tokio::spawn(async move {
921                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
922                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
923                    // Resend the batch signature to the peer.
924                    if gateway.send(peer_ip, event).await.is_none() {
925                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
926                    }
927                });
928                // Return early.
929                return Ok(());
930            }
931        }
932
933        // Ensure that the batch header doesn't already exist in storage.
934        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
935        if self.storage.contains_batch(batch_header.batch_id()) {
936            debug!(
937                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
938                format!("batch for round {batch_round} already exists in storage").dimmed()
939            );
940            return Ok(());
941        }
942
943        // Compute the previous round.
944        let previous_round = batch_round.saturating_sub(1);
945        // Ensure that the peer did not propose a batch too quickly.
946        if let Err(err) = self.check_peer_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
947            // Proceed to disconnect the validator.
948            self.gateway.disconnect(peer_ip);
949            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
950        }
951
952        // Ensure the batch header does not contain any ratifications.
953        if batch_header.contains(TransmissionID::Ratification) {
954            // Proceed to disconnect the validator.
955            self.gateway.disconnect(peer_ip);
956            bail!(
957                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
958            );
959        }
960
961        // If the peer is ahead, use the batch header to sync up to the peer.
962        let mut missing_transmissions =
963            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;
964
965        // Check that the transmission ids match and are not fee transactions.
966        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
967            // If the transmission is not well-formed, then return early.
968            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
969        }) {
970            let err = err.context(format!(
971                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
972            ));
973            debug!("{}", flatten_error(err));
974            return Ok(());
975        }
976
977        // Ensure the batch is for the current round.
978        // This method must be called after fetching previous certificates (above),
979        // and prior to checking the batch header (below).
980        if let Err(e) = self.ensure_is_signing_round(batch_round) {
981            // If the primary is not signing for the peer's round, then return early.
982            debug!("{e} from '{peer_ip}'");
983            return Ok(());
984        }
985
986        // Ensure the batch header from the peer is valid.
987        let (storage, header) = (self.storage.clone(), batch_header.clone());
988
989        // Check the batch header, and return early if it already exists in storage.
990        let Some(missing_transmissions) =
991            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
992        else {
993            return Ok(());
994        };
995
996        // Inserts the missing transmissions into the workers.
997        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
998
999        /* Proceeding to sign the batch. */
1000
1001        // Retrieve the batch ID.
1002        let batch_id = batch_header.batch_id();
1003        // Sign the batch ID.
1004        let account = self.gateway.account().clone();
1005        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::rng()))?;
1006
1007        // Ensure the proposal has not already been signed.
1008        //
1009        // Note: Due to the need to sync the batch header with the peer, it is possible
1010        // for the primary to receive the same 'BatchPropose' event again, whereby only
1011        // one instance of this handler should sign the batch. This check guarantees this.
1012        match self.signed_proposals.write().0.entry(batch_author) {
1013            std::collections::hash_map::Entry::Occupied(mut entry) => {
1014                // If the validator has already signed a batch for this round, then return early,
1015                // since, if the peer still has not received the signature, they will request it again,
1016                // and the logic at the start of this function will resend the (now cached) signature
1017                // to the peer if asked to sign this batch proposal again.
1018                if entry.get().0 == batch_round {
1019                    return Ok(());
1020                }
1021                // Otherwise, cache the round, batch ID, and signature for this validator.
1022                entry.insert((batch_round, batch_id, signature));
1023            }
1024            // If the validator has not signed a batch before, then continue.
1025            std::collections::hash_map::Entry::Vacant(entry) => {
1026                // Cache the round, batch ID, and signature for this validator.
1027                entry.insert((batch_round, batch_id, signature));
1028            }
1029        };
1030
1031        // Broadcast the signature back to the validator.
1032        let self_ = self.clone();
1033        tokio::spawn(async move {
1034            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1035            // Send the batch signature to the peer.
1036            if self_.gateway.send(peer_ip, event).await.is_some() {
1037                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1038            }
1039        });
1040
1041        Ok(())
1042    }
1043
1044    /// Attempts to add a peer's `signature` for `batch_id` to the current proposal.
1045    ///
1046    /// Consumes `state` and always returns the (possibly updated) state alongside a result so that
1047    /// the caller can restore it unconditionally, keeping `proposed_batch` consistent even on the
1048    /// error path.
1049    ///
1050    /// # Returns
1051    /// * `(Ok(Some(proposal)), Certified(id))` — quorum reached; caller should certify.
1052    /// * `(Ok(None), <restored state>)` — signature accepted or silently dropped; nothing to do.
1053    /// * `(Err(e), <restored state>)` — signature rejected; caller should propagate the error.
1054    fn add_signature_to_batch(
1055        &self,
1056        state: ProposedBatchState<N>,
1057        peer_ip: SocketAddr,
1058        batch_id: Field<N>,
1059        signature: Signature<N>,
1060    ) -> (Result<Option<Proposal<N>>>, ProposedBatchState<N>) {
1061        match state {
1062            ProposedBatchState::Certifying(mut proposal) if proposal.batch_id() == batch_id => {
1063                // This signature is for our currently active proposal.
1064                // Use an inner closure to keep `?` ergonomics while returning a tuple.
1065                let inner: Result<bool> = (|| {
1066                    let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1067                    let Some(signer) = self.gateway.resolve_to_aleo_addr(peer_ip) else {
1068                        bail!("Signature is from a disconnected validator");
1069                    };
1070                    let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1071                    if new_signature {
1072                        info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1073                        Ok(proposal.is_quorum_threshold_reached(&committee_lookback))
1074                    } else {
1075                        debug!(
1076                            "Received duplicated signature from '{peer_ip}' for batch \
1077                                {batch_id} in round {round}",
1078                            round = proposal.round()
1079                        );
1080                        Ok(false)
1081                    }
1082                })();
1083                match inner {
1084                    Ok(true) => {
1085                        let certified_id = proposal.batch_id();
1086                        (Ok(Some(*proposal)), ProposedBatchState::Certified(certified_id))
1087                    }
1088                    Ok(false) => (Ok(None), ProposedBatchState::Certifying(proposal)),
1089                    Err(e) => (Err(e), ProposedBatchState::Certifying(proposal)),
1090                }
1091            }
1092            ProposedBatchState::Certifying(proposal) => {
1093                // Certifying a different proposal — check if batch_id is already in storage.
1094                if self.storage.contains_batch(batch_id) {
1095                    debug!(
1096                        "Primary is safely skipping a batch signature from {peer_ip} for \
1097                            round {} - batch is already certified",
1098                        proposal.round()
1099                    );
1100                    (Ok(None), ProposedBatchState::Certifying(proposal))
1101                } else {
1102                    let expected_id = proposal.batch_id();
1103                    let round = proposal.round();
1104                    (
1105                        Err(anyhow!("Unknown batch ID '{batch_id}', expected '{expected_id}' for round {round}")),
1106                        ProposedBatchState::Certifying(proposal),
1107                    )
1108                }
1109            }
1110            ProposedBatchState::Certified(id) if id == batch_id => {
1111                // Quorum already reached; late-arriving signature is harmless.
1112                debug!(
1113                    "Skipping batch signature from {peer_ip} for batch '{batch_id}' - \
1114                        already received sufficient signatures"
1115                );
1116                (Ok(None), ProposedBatchState::Certified(id))
1117            }
1118            ProposedBatchState::Certified(id) => {
1119                let result = if self.storage.contains_batch(batch_id) {
1120                    // This is most likely not malicious, but could indicate connectivity issues.
1121                    warn!("Received signature for an older batch {batch_id}");
1122                    Ok(None)
1123                } else {
1124                    Err(anyhow!("Unknown batch ID '{batch_id}'"))
1125                };
1126
1127                (result, ProposedBatchState::Certified(id))
1128            }
1129            ProposedBatchState::None => {
1130                let result = if self.storage.contains_batch(batch_id) {
1131                    // This is most likely not malicious, but could indicate connectivity issues.
1132                    warn!("Received signature for an older batch {batch_id}");
1133                    Ok(None)
1134                } else {
1135                    Err(anyhow!("Unknown batch ID '{batch_id}'"))
1136                };
1137
1138                (result, ProposedBatchState::None)
1139            }
1140        }
1141    }
1142
1143    /// Processes a batch signature from a peer.
1144    ///
1145    /// This method performs the following steps:
1146    /// 1. Ensure the proposed batch has not expired.
1147    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
1148    /// 3. Store the signature.
1149    /// 4. Certify the batch if enough signatures have been received.
1150    /// 5. Broadcast the batch certificate to all validators.
1151    async fn process_batch_signature_from_peer(
1152        &self,
1153        peer_ip: SocketAddr,
1154        batch_signature: BatchSignature<N>,
1155    ) -> Result<()> {
1156        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
1157        self.check_proposed_batch_for_expiration()?;
1158
1159        // Retrieve the signature and timestamp.
1160        let BatchSignature { batch_id, signature } = batch_signature;
1161
1162        // Retrieve the signer.
1163        let signer = signature.to_address();
1164
1165        // Ensure the batch signature is signed by the validator.
1166        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1167            // Proceed to disconnect the validator.
1168            self.gateway.disconnect(peer_ip);
1169            bail!("Malicious peer - batch signature is from a different validator ({signer})");
1170        }
1171        // Ensure the batch signature is not from the current primary.
1172        if self.gateway.account().address() == signer {
1173            bail!("Invalid peer - received a batch signature from myself ({signer})");
1174        }
1175
1176        let self_ = self.clone();
1177        let Some(proposal) = spawn_blocking!({
1178            // Acquire the write lock.
1179            let mut proposed_batch = self_.proposed_batch.write();
1180
1181            let (result, new_state) =
1182                self_.add_signature_to_batch(std::mem::take(&mut *proposed_batch), peer_ip, batch_id, signature);
1183            *proposed_batch = new_state;
1184            result
1185        })?
1186        else {
1187            return Ok(());
1188        };
1189
1190        /* Proceeding to certify the batch. */
1191
1192        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1193
1194        // Retrieve the committee lookback for the round.
1195        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1196        // Store the certified batch and broadcast it to all validators.
1197        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1198        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1199            // Reinsert the transmissions back into the ready queue for the next proposal.
1200            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1201            return Err(e);
1202        }
1203
1204        #[cfg(feature = "metrics")]
1205        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1206        Ok(())
1207    }
1208
1209    /// Processes a batch certificate from a peer.
1210    ///
1211    /// This method performs the following steps:
1212    /// 1. Stores the given batch certificate, after ensuring it is valid.
1213    /// 2. If there are enough certificates to reach quorum threshold for the current round,
1214    ///    then proceed to advance to the next round.
1215    async fn process_batch_certificate_from_peer(
1216        &self,
1217        peer_ip: SocketAddr,
1218        certificate: BatchCertificate<N>,
1219    ) -> Result<()> {
1220        // Ensure the batch certificate is from an authorized validator.
1221        if !self.gateway.is_authorized_validator_ip(peer_ip) {
1222            // Proceed to disconnect the validator.
1223            self.gateway.disconnect(peer_ip);
1224            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1225        }
1226        // Ensure storage does not already contain the certificate.
1227        if self.storage.contains_certificate(certificate.id()) {
1228            return Ok(());
1229        // Otherwise, ensure ephemeral storage contains the certificate.
1230        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1231            self.storage.insert_unprocessed_certificate(certificate.clone())?;
1232        }
1233
1234        // Retrieve the batch certificate author.
1235        let author = certificate.author();
1236        // Retrieve the batch certificate round.
1237        let certificate_round = certificate.round();
1238        // Retrieve the batch certificate committee ID.
1239        let committee_id = certificate.committee_id();
1240
1241        // Ensure the batch certificate is not from the current primary.
1242        if self.gateway.account().address() == author {
1243            bail!("Received a batch certificate for myself ({author})");
1244        }
1245
1246        // Ensure that the incoming certificate is valid.
1247        self.storage.check_incoming_certificate(&certificate)?;
1248
1249        // Store the certificate, after ensuring it is valid above.
1250        // The following call recursively fetches and stores
1251        // the previous certificates referenced from this certificate.
1252        // It is critical to make the following call this after validating the certificate above.
1253        // The reason is that a sequence of malformed certificates,
1254        // with references to previous certificates with non-decreasing rounds,
1255        // cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
1256        // Note that if the following call, if not returning an error, guarantees the backward closure of the DAG
1257        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
1258        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
1259        // TODO: eliminate those redundant checks
1260        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1261
1262        // If there are enough certificates to reach quorum threshold for the certificate round,
1263        // then proceed to advance to the next round.
1264
1265        // Retrieve the committee lookback.
1266        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1267
1268        // Retrieve the certificate authors.
1269        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1270        // Check if the certificates have reached the quorum threshold.
1271        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1272
1273        // Ensure that the batch certificate's committee ID matches the expected committee ID.
1274        let expected_committee_id = committee_lookback.id();
1275        if expected_committee_id != committee_id {
1276            // Proceed to disconnect the validator.
1277            self.gateway.disconnect(peer_ip);
1278            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1279        }
1280
1281        // Determine if we are currently proposing a round that is relevant.
1282        // Note: This is important, because while our peers have advanced,
1283        // they may not be proposing yet, and thus still able to sign our proposed batch.
1284        let should_advance = match &*self.latest_proposal_timestamp.read().await {
1285            // We advance if the proposal round is less than the current round that was just certified.
1286            Some((latest_round, _)) => *latest_round < certificate_round,
1287            // If there's no proposal, we consider advancing.
1288            None => true,
1289        };
1290
1291        // Retrieve the current round.
1292        let current_round = self.current_round();
1293
1294        // Determine whether to advance to the next round.
1295        if is_quorum && should_advance && certificate_round >= current_round {
1296            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
1297            self.round_increment_notify.notify_one();
1298        }
1299        Ok(())
1300    }
1301}
1302
1303impl<N: Network> Primary<N> {
1304    /// Starts the primary handlers.
1305    ///
1306    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
1307    /// that awaits new data and handles it accordingly.
1308    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that
1309    /// tries to move to the next round when triggered (e.g. after a certificate is stored) or on a timeout.
1310    ///
1311    /// This function is called exactly once, in `Self::run()`.
1312    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1313        let PrimaryReceiver {
1314            mut rx_batch_propose,
1315            mut rx_batch_signature,
1316            mut rx_batch_certified,
1317            mut rx_primary_ping,
1318            mut rx_unconfirmed_solution,
1319            mut rx_unconfirmed_transaction,
1320        } = primary_receiver;
1321
1322        // Start the primary ping sender.
1323        let self_ = self.clone();
1324        self.spawn(async move {
1325            loop {
1326                // Sleep briefly.
1327                tokio::time::sleep(PRIMARY_PING_INTERVAL).await;
1328
1329                // Retrieve the block locators.
1330                let self__ = self_.clone();
1331                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1332                    Ok(block_locators) => block_locators,
1333                    Err(e) => {
1334                        warn!("Failed to retrieve block locators - {e}");
1335                        continue;
1336                    }
1337                };
1338
1339                // Retrieve the latest certificate of the primary.
1340                let primary_certificate = {
1341                    // Retrieve the primary address.
1342                    let primary_address = self_.gateway.account().address();
1343
1344                    // Iterate backwards from the latest round to find the primary certificate.
1345                    let mut certificate = None;
1346                    let mut current_round = self_.current_round();
1347                    while certificate.is_none() {
1348                        // If the current round is 0, then break the while loop.
1349                        if current_round == 0 {
1350                            break;
1351                        }
1352                        // Retrieve the primary certificates.
1353                        if let Some(primary_certificate) =
1354                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1355                        {
1356                            certificate = Some(primary_certificate);
1357                        // If the primary certificate was not found, decrement the round.
1358                        } else {
1359                            current_round = current_round.saturating_sub(1);
1360                        }
1361                    }
1362
1363                    // Determine if the primary certificate was found.
1364                    match certificate {
1365                        Some(certificate) => certificate,
1366                        // Skip this iteration of the loop (do not send a primary ping).
1367                        None => continue,
1368                    }
1369                };
1370
1371                // Construct the primary ping.
1372                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1373                // Broadcast the event.
1374                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1375            }
1376        });
1377
1378        // Start the primary ping handler.
1379        let self_ = self.clone();
1380        self.spawn(async move {
1381            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1382                // If the primary is not synced, then do not process the primary ping.
1383                if self_.sync.is_synced() {
1384                    trace!("Processing new primary ping from '{peer_ip}'");
1385                } else {
1386                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1387                    continue;
1388                }
1389
1390                // Spawn a task to process the primary certificate.
1391                {
1392                    let self_ = self_.clone();
1393                    tokio::spawn(async move {
1394                        // Deserialize the primary certificate in the primary ping.
1395                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1396                        else {
1397                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1398                            return;
1399                        };
1400                        // Process the primary certificate.
1401                        let id = fmt_id(primary_certificate.id());
1402                        let round = primary_certificate.round();
1403                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1404                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1405                        }
1406                    });
1407                }
1408            }
1409        });
1410
1411        // Start the worker ping(s).
1412        let self_ = self.clone();
1413        self.spawn(async move {
1414            loop {
1415                tokio::time::sleep(WORKER_PING_INTERVAL).await;
1416                // If the primary is not synced, then do not broadcast the worker ping(s).
1417                if !self_.sync.is_synced() {
1418                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1419                    continue;
1420                }
1421                // Broadcast the worker ping(s).
1422                for worker in self_.workers() {
1423                    worker.broadcast_ping();
1424                }
1425            }
1426        });
1427
1428        // Start the batch proposal task.
1429        let proposal_task = self.proposal_task.clone();
1430        let self_ = self.clone();
1431        self.spawn(async move { proposal_task.run(self_).await });
1432
1433        // Start the proposed batch handler.
1434        let self_ = self.clone();
1435        self.spawn(async move {
1436            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1437                // If the primary is not synced, then do not sign the batch.
1438                if !self_.sync.is_synced() {
1439                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1440                    continue;
1441                }
1442
1443                // Spawn a task to process the proposed batch.
1444                let self_ = self_.clone();
1445                tokio::spawn(async move {
1446                    // Process the batch proposal.
1447                    let round = batch_propose.round;
1448                    if let Err(err) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1449                        let err = err.context(format!("Cannot sign a batch at round {round} from '{peer_ip}'"));
1450                        warn!("{}", flatten_error(err));
1451                    }
1452                });
1453            }
1454        });
1455
1456        // Start the batch signature handler.
1457        let self_ = self.clone();
1458        self.spawn(async move {
1459            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1460                // If the primary is not synced, then do not store the signature.
1461                if !self_.sync.is_synced() {
1462                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1463                    continue;
1464                }
1465                // Process the batch signature.
1466                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1467                // is a critical path, and we should only store the minimum required number of signatures.
1468                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1469                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1470                let id = fmt_id(batch_signature.batch_id);
1471                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1472                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
1473                    warn!("{}", flatten_error(err));
1474                }
1475            }
1476        });
1477
1478        // Start the certified batch handler.
1479        let self_ = self.clone();
1480        self.spawn(async move {
1481            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1482                // If the primary is not synced, then do not store the certificate.
1483                if !self_.sync.is_synced() {
1484                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1485                    continue;
1486                }
1487                // Spawn a task to process the batch certificate.
1488                let self_ = self_.clone();
1489                tokio::spawn(async move {
1490                    // Deserialize the batch certificate.
1491                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1492                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1493                        return;
1494                    };
1495                    // Process the batch certificate.
1496                    let id = fmt_id(batch_certificate.id());
1497                    let round = batch_certificate.round();
1498                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1499                        warn!(
1500                            "{}",
1501                            flatten_error(err.context(format!(
1502                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
1503                            )))
1504                        );
1505                    }
1506                });
1507            }
1508        });
1509
1510        // This task tries to move to the next round when triggered (e.g. after a certificate is stored)
1511        // or after a timeout, so we are not stuck on a previous round despite having quorum.
1512        let self_ = self.clone();
1513        self.spawn(async move {
1514            loop {
1515                let round_start = Instant::now();
1516                let current_round = self_.current_round();
1517
1518                // Inner loop: wait and try to increment while we're still in the same round.
1519                while self_.current_round() == current_round {
1520                    let mut futures: Vec<Pin<Box<dyn Future<Output = ()> + Send>>> =
1521                        vec![Box::pin(self_.round_increment_notify.notified())];
1522
1523                    if let Some(remaining_delay) = MAX_BATCH_DELAY.checked_sub(round_start.elapsed())
1524                        && !remaining_delay.is_zero()
1525                    {
1526                        futures.push(Box::pin(tokio::time::sleep(remaining_delay)));
1527                    }
1528                    // Always ensure a wakeup no later than MAX_LEADER_CERTIFICATE_DELAY so that
1529                    // try_advance_to_next_round is called after the leader-certificate timer
1530                    // expires, even when no further certificates arrive (e.g. an even round where
1531                    // the elected leader was absent and quorum was reached without their cert).
1532                    futures.push(Box::pin(tokio::time::sleep(MAX_LEADER_CERTIFICATE_DELAY)));
1533                    if !self_.sync.is_synced() {
1534                        futures.push(Box::pin(self_.sync.wait_for_synced()));
1535                    }
1536                    let _ = futures::future::select_all(futures).await;
1537
1538                    if !self_.sync.is_synced() {
1539                        trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1540                        continue;
1541                    }
1542
1543                    let next_round = current_round.saturating_add(1);
1544                    let is_quorum_threshold_reached = {
1545                        let authors = self_.storage.get_certificate_authors_for_round(current_round);
1546                        if authors.is_empty() {
1547                            continue;
1548                        }
1549                        let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round)
1550                        else {
1551                            warn!("Failed to retrieve the committee lookback for round {current_round}");
1552                            continue;
1553                        };
1554                        committee_lookback.is_quorum_threshold_reached(&authors)
1555                    };
1556
1557                    if is_quorum_threshold_reached {
1558                        debug!("Quorum threshold reached for round {current_round}");
1559                        if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
1560                            warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
1561                        }
1562                    }
1563                }
1564            }
1565        });
1566
1567        // Start a handler to process new unconfirmed solutions.
1568        let self_ = self.clone();
1569        self.spawn(async move {
1570            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1571                // Compute the checksum for the solution.
1572                let Ok(checksum) = solution.to_checksum::<N>() else {
1573                    error!("Failed to compute the checksum for the unconfirmed solution");
1574                    continue;
1575                };
1576                // Compute the worker ID.
1577                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1578                    error!("Unable to determine the worker ID for the unconfirmed solution");
1579                    continue;
1580                };
1581                let self_ = self_.clone();
1582                tokio::spawn(async move {
1583                    // Retrieve the worker.
1584                    let worker = &self_.workers()[worker_id as usize];
1585                    // Process the unconfirmed solution.
1586                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1587                    // Send the result to the callback.
1588                    callback.send(result).ok();
1589                });
1590            }
1591        });
1592
1593        // Start a handler to process new unconfirmed transactions.
1594        let self_ = self.clone();
1595        self.spawn(async move {
1596            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1597                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1598                // Compute the checksum for the transaction.
1599                let Ok(checksum) = transaction.to_checksum::<N>() else {
1600                    error!("Failed to compute the checksum for the unconfirmed transaction");
1601                    continue;
1602                };
1603                // Compute the worker ID.
1604                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1605                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1606                    continue;
1607                };
1608                let self_ = self_.clone();
1609                tokio::spawn(async move {
1610                    // Retrieve the worker.
1611                    let worker = &self_.workers().get(worker_id as usize).expect("Invalid worker ID");
1612                    // Process the unconfirmed transaction.
1613                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1614                    // Send the result to the callback.
1615                    callback.send(result).ok();
1616                });
1617            }
1618        });
1619    }
1620
1621    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1622    fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1623        // Check if the proposed batch is timed out or stale.
1624        // A batch being certified is not considered expired.
1625        let is_expired = match &*self.proposed_batch.read() {
1626            ProposedBatchState::Certifying(proposal) => proposal.round() < self.current_round(),
1627            _ => false,
1628        };
1629        // If the batch is expired, clear the proposed batch.
1630        if is_expired {
1631            // Reset the proposed batch.
1632            let old = std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None);
1633            if let ProposedBatchState::Certifying(proposal) = old {
1634                debug!("Cleared expired proposal for round {}", proposal.round());
1635                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1636            }
1637        }
1638        Ok(())
1639    }
1640
1641    /// Increments to the next round.
1642    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1643        // If the next round is within GC range, then iterate to the penultimate round.
1644        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1645            let mut fast_forward_round = self.current_round();
1646            // Iterate until the penultimate round is reached.
1647            while fast_forward_round < next_round.saturating_sub(1) {
1648                // Update to the next round in storage.
1649                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1650                // Clear the proposed batch.
1651                *self.proposed_batch.write() = ProposedBatchState::None;
1652            }
1653        }
1654
1655        // Retrieve the current round.
1656        let current_round = self.current_round();
1657        // Attempt to advance to the next round.
1658        if current_round < next_round {
1659            // If a BFT sender was provided, send the current round to the BFT.
1660            let is_ready = if let Some(cb) = self.primary_callback.get() {
1661                cb.try_advance_to_next_round(current_round)
1662            }
1663            // Otherwise, handle the Narwhal case.
1664            else {
1665                // Update to the next round in storage.
1666                self.storage.increment_to_next_round(current_round)?;
1667                // Set 'is_ready' to 'true'.
1668                true
1669            };
1670
1671            // Notify the proposal task if the new round is ready.
1672            if is_ready && self.is_synced() {
1673                debug!("Primary is ready to propose the next round");
1674                self.proposal_task.signal();
1675            } else {
1676                debug!("Primary is not ready to propose the next round");
1677            }
1678        }
1679        Ok(())
1680    }
1681
1682    /// Ensures the primary is signing for the specified batch round.
1683    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1684    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1685    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1686        // Retrieve the current round.
1687        let current_round = self.current_round();
1688        // Ensure the batch round is within GC range of the current round.
1689        if current_round + self.storage.max_gc_rounds() <= batch_round {
1690            bail!("Round {batch_round} is too far in the future")
1691        }
1692        // Ensure the batch round is at or one before the current round.
1693        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1694        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1695        if current_round > batch_round + 1 {
1696            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1697        }
1698        // Check if the primary is still signing for the batch round.
1699        if let ProposedBatchState::Certifying(proposal) = &*self.proposed_batch.read()
1700            && proposal.round() > batch_round
1701        {
1702            bail!("Our primary at round {} is no longer signing for round {batch_round}", proposal.round())
1703        }
1704        Ok(())
1705    }
1706
1707    /// Ensure the primary is not creating batch proposals too frequently.
1708    /// This checks that the certificate timestamp for the previous round is within the expected range.
1709    fn check_peer_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1710        ensure!(author != self.gateway.account().address(), "Peer cannot propose a batch that is authored by myself");
1711
1712        // Retrieve the timestamp of the previous timestamp to check against.
1713        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1714            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` seconds ago.
1715            Some(certificate) => certificate.timestamp(),
1716            // If we do not see a previous certificate for the author, then proceed optimistically.
1717            None => return Ok(()),
1718        };
1719
1720        // Determine the elapsed time since the previous timestamp.
1721        let elapsed = timestamp
1722            .checked_sub(previous_timestamp)
1723            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1724        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY` seconds ago.
1725        match elapsed < MIN_BATCH_DELAY.as_secs() as i64 {
1726            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1727            false => Ok(()),
1728        }
1729    }
1730
1731    /// Ensure the primary is not creating batch proposals too frequently.
1732    /// This checks that the certificate timestamp for the previous round is within the expected range.
1733    ///
1734    /// # Returns
1735    /// - `Ok(true)` if the timestamp allows a new proposal.
1736    /// - `Ok(false)` if the timestamp is valid but too soon after the previous proposal.
1737    /// - `Err(err)` if an unexpected error occured, such as the timestamp being before the previous certificate.
1738    fn check_own_proposal_timestamp(
1739        &self,
1740        previous_round: u64,
1741        previous_timestamp: i64,
1742        timestamp: i64,
1743    ) -> Result<bool> {
1744        // Determine the elapsed time since the previous timestamp.
1745        let elapsed = timestamp
1746            .checked_sub(previous_timestamp)
1747            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1748
1749        Ok(elapsed >= MIN_BATCH_DELAY.as_secs() as i64)
1750    }
1751
1752    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1753    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1754        // Create the batch certificate and transmissions.
1755        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1756
1757        // Convert the transmissions into a HashMap.
1758        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1759        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1760
1761        // Store some metadata about the certified batch.
1762        let round = certificate.round();
1763        let num_transmissions = certificate.transmission_ids().len();
1764
1765        // Store the certified batch.
1766        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1767        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1768        debug!("Stored a batch certificate for round {}", certificate.round());
1769        // The batch is now in storage, so late-arriving signatures can find it via contains_batch.
1770        // Transition from Certified back to None.
1771        *self.proposed_batch.write() = ProposedBatchState::None;
1772
1773        // If a BFT sender was provided, send the certificate to the BFT.
1774        if let Some(cb) = self.primary_callback.get() {
1775            // Await the callback to continue.
1776            cb.add_new_certificate(certificate.clone()).await.with_context(|| {
1777                format!("Failed to insert our newly certified batch for round {round} into the DAG")
1778            })?;
1779        }
1780        // Broadcast the certified batch to all validators.
1781        self.gateway.broadcast(Event::BatchCertified(certificate.into()));
1782
1783        // Log the certified batch.
1784        info!("Our batch with {num_transmissions} transmissions for round {round} was certified!");
1785
1786        // Record the certification latency (time from batch proposal to certification).
1787        #[cfg(feature = "metrics")]
1788        if let Some(start) = self.batch_propose_start.lock().take() {
1789            metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
1790        }
1791
1792        // Wake up the round increment task to re-check quorum.
1793        self.round_increment_notify.notify_one();
1794
1795        Ok(())
1796    }
1797
1798    /// Inserts the missing transmissions from the proposal into the workers.
1799    fn insert_missing_transmissions_into_workers(
1800        &self,
1801        peer_ip: SocketAddr,
1802        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1803    ) -> Result<()> {
1804        // Insert the transmissions into the workers.
1805        assign_to_workers(self.workers(), transmissions, |worker, transmission_id, transmission| {
1806            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1807        })
1808    }
1809
1810    /// Re-inserts the transmissions from the proposal into the workers.
1811    fn reinsert_transmissions_into_workers(
1812        &self,
1813        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1814    ) -> Result<()> {
1815        // Re-insert the transmissions into the workers.
1816        assign_to_workers(self.workers(), transmissions.into_iter(), |worker, transmission_id, transmission| {
1817            worker.reinsert(transmission_id, transmission);
1818        })
1819    }
1820
1821    /// Recursively stores a given batch certificate, after ensuring:
1822    ///   - Ensure the round matches the committee round.
1823    ///   - Ensure the address is a member of the committee.
1824    ///   - Ensure the timestamp is within range.
1825    ///   - Ensure we have all of the transmissions.
1826    ///   - Ensure we have all of the previous certificates.
1827    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1828    ///   - Ensure the previous certificates have reached the quorum threshold.
1829    ///   - Ensure we have not already signed the batch ID.
1830    #[async_recursion::async_recursion]
1831    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1832        &self,
1833        peer_ip: SocketAddr,
1834        certificate: BatchCertificate<N>,
1835    ) -> Result<()> {
1836        // Retrieve the batch header.
1837        let batch_header = certificate.batch_header();
1838        // Retrieve the batch round.
1839        let batch_round = batch_header.round();
1840
1841        // If the certificate round is outdated, do not store it.
1842        if batch_round <= self.storage.gc_round() {
1843            return Ok(());
1844        }
1845        // If the certificate already exists in storage, return early.
1846        if self.storage.contains_certificate(certificate.id()) {
1847            return Ok(());
1848        }
1849
1850        // If node is not in sync mode and the node is not synced. Then return an error.
1851        if !IS_SYNCING && !self.is_synced() {
1852            bail!(
1853                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1854                fmt_id(certificate.id())
1855            );
1856        }
1857
1858        // If the peer is ahead, use the batch header to sync up to the peer.
1859        let missing_transmissions =
1860            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;
1861
1862        // Check if the certificate needs to be stored.
1863        if !self.storage.contains_certificate(certificate.id()) {
1864            // Store the batch certificate.
1865            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1866            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1867            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1868            // If a BFT sender was provided, send the round and certificate to the BFT.
1869            if let Some(cb) = self.primary_callback.get() {
1870                cb.add_new_certificate(certificate).await.with_context(|| "Failed to update the DAG from sync")?;
1871            }
1872            // Wake the round-increment task to re-check quorum.
1873            self.round_increment_notify.notify_one();
1874        }
1875        Ok(())
1876    }
1877
1878    /// Recursively syncs using the given batch header.
1879    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1880        &self,
1881        peer_ip: SocketAddr,
1882        batch_header: &BatchHeader<N>,
1883    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1884        // Retrieve the batch round.
1885        let batch_round = batch_header.round();
1886
1887        // If the certificate round is outdated, do not store it.
1888        if batch_round <= self.storage.gc_round() {
1889            bail!("Round {batch_round} is too far in the past")
1890        }
1891
1892        // If node is not in sync mode and the node is not synced, then return an error.
1893        if !IS_SYNCING && !self.is_synced() {
1894            bail!(
1895                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1896                fmt_id(batch_header.batch_id())
1897            );
1898        }
1899
1900        // Determine if quorum threshold is reached on the batch round.
1901        let is_quorum_threshold_reached = {
1902            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1903            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1904            committee_lookback.is_quorum_threshold_reached(&authors)
1905        };
1906
1907        // Check if our primary should move to the next round.
1908        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1909        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1910        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1911        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1912        // Check if our primary is far behind the peer.
1913        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1914        // If our primary is far behind the peer, update our committee to the batch round.
1915        if is_behind_schedule || is_peer_far_in_future {
1916            // If the batch round is greater than the current committee round, update the committee.
1917            self.try_increment_to_the_next_round(batch_round)
1918                .await
1919                .with_context(|| "Failed to fast forward current round")?;
1920        }
1921
1922        // Ensure the primary has all of the transmissions.
1923        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1924
1925        // Ensure the primary has all of the previous certificates.
1926        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1927
1928        // Wait for the missing transmissions and previous certificates to be fetched.
1929        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1930            missing_transmissions_handle,
1931            missing_previous_certificates_handle,
1932        ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1933
1934        // Process all missing previous certificates in parallel, so that their
1935        // recursive fetches (for level R-2, R-3, ...) all fire concurrently
1936        // rather than being serialized one certificate at a time.
1937        let futures = missing_previous_certificates.into_iter().map(|batch_certificate| {
1938            let self_ = self.clone();
1939            async move {
1940                // Check if the missing previous certificate is valid. This is only
1941                // needed if we are processing an incoming batch header from a peer.
1942                // For incoming certificates, validity is assured by checking the
1943                // root certificate in `process_batch_certificate_from_peer`.
1944                if CHECK_PREVIOUS_CERTIFICATES {
1945                    self_.storage.check_incoming_certificate(&batch_certificate)?;
1946                }
1947                // Store the batch certificate (recursively fetching any missing previous certificates).
1948                self_.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await
1949            }
1950        });
1951
1952        // Check if any of the fetches failed. If so, return an error.
1953        let mut latest_error = None;
1954        for result in join_all(futures).await.into_iter() {
1955            if let Err(err) = result {
1956                let err = err.context("Failed to fetch previous certificate");
1957                error!("{}", flatten_error(&err));
1958                latest_error = Some(err);
1959            }
1960        }
1961
1962        if let Some(err) = latest_error { Err(err) } else { Ok(missing_transmissions) }
1963    }
1964
1965    /// Fetches any missing transmissions for the specified batch header.
1966    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1967    async fn fetch_missing_transmissions(
1968        &self,
1969        peer_ip: SocketAddr,
1970        batch_header: &BatchHeader<N>,
1971    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1972        // If the round is <= the GC round, return early.
1973        if batch_header.round() <= self.storage.gc_round() {
1974            return Ok(Default::default());
1975        }
1976
1977        // Ensure this batch ID is new, otherwise return early.
1978        if self.storage.contains_batch(batch_header.batch_id()) {
1979            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1980            return Ok(Default::default());
1981        }
1982
1983        // Retrieve the workers.
1984        let workers = self.workers.clone();
1985
1986        // Initialize a list for the transmissions.
1987        let mut fetch_transmissions = FuturesUnordered::new();
1988
1989        // Retrieve the number of workers.
1990        let num_workers = self.num_workers();
1991        // Iterate through the transmission IDs.
1992        for transmission_id in batch_header.transmission_ids() {
1993            // If the transmission does not exist in storage, proceed to fetch the transmission.
1994            if !self.storage.contains_transmission(*transmission_id) {
1995                // Determine the worker ID.
1996                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1997                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1998                };
1999                // Retrieve the worker.
2000                let Some(worker) = workers.get().expect("No workers set").get(worker_id as usize) else {
2001                    bail!("Unable to find worker {worker_id}")
2002                };
2003                // Push the callback onto the list.
2004                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
2005            }
2006        }
2007
2008        // Initialize a set for the transmissions.
2009        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
2010        // Wait for all of the transmissions to be fetched.
2011        while let Some(result) = fetch_transmissions.next().await {
2012            // Retrieve the transmission.
2013            let (transmission_id, transmission) = result?;
2014            // Insert the transmission into the set.
2015            transmissions.insert(transmission_id, transmission);
2016        }
2017        // Return the transmissions.
2018        Ok(transmissions)
2019    }
2020
2021    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
2022    async fn fetch_missing_previous_certificates(
2023        &self,
2024        peer_ip: SocketAddr,
2025        batch_header: &BatchHeader<N>,
2026    ) -> Result<HashSet<BatchCertificate<N>>> {
2027        // Retrieve the round.
2028        let round = batch_header.round();
2029        // If the previous round is 0, or is <= the GC round, return early.
2030        if round == 1 || round <= self.storage.gc_round() + 1 {
2031            return Ok(Default::default());
2032        }
2033
2034        // Fetch the missing previous certificates.
2035        let missing_previous_certificates =
2036            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
2037        if !missing_previous_certificates.is_empty() {
2038            debug!(
2039                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
2040                missing_previous_certificates.len(),
2041            );
2042        }
2043        // Return the missing previous certificates.
2044        Ok(missing_previous_certificates)
2045    }
2046
2047    /// Fetches any missing certificates for the specified batch header from the specified peer.
2048    async fn fetch_missing_certificates(
2049        &self,
2050        peer_ip: SocketAddr,
2051        round: u64,
2052        certificate_ids: &IndexSet<Field<N>>,
2053    ) -> Result<HashSet<BatchCertificate<N>>> {
2054        // Initialize a list for the missing certificates.
2055        let mut fetch_certificates = FuturesUnordered::new();
2056        // Initialize a set for the missing certificates.
2057        let mut missing_certificates = HashSet::default();
2058        // Iterate through the certificate IDs.
2059        for certificate_id in certificate_ids {
2060            // Check if the certificate already exists in the ledger.
2061            if self.ledger.contains_certificate(certificate_id)? {
2062                continue;
2063            }
2064            // Check if the certificate already exists in storage.
2065            if self.storage.contains_certificate(*certificate_id) {
2066                continue;
2067            }
2068            // If we have not fully processed the certificate yet, store it.
2069            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
2070                missing_certificates.insert(certificate);
2071            } else {
2072                // If we do not have the certificate, request it.
2073                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
2074                // TODO (howardwu): Limit the number of open requests we send to a peer.
2075                // Send an certificate request to the peer.
2076                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
2077            }
2078        }
2079
2080        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
2081        match fetch_certificates.is_empty() {
2082            true => return Ok(missing_certificates),
2083            false => trace!(
2084                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
2085                fetch_certificates.len(),
2086            ),
2087        }
2088
2089        // Wait for all of the missing certificates to be fetched.
2090        while let Some(result) = fetch_certificates.next().await {
2091            // Insert the missing certificate into the set.
2092            missing_certificates.insert(result?);
2093        }
2094        // Return the missing certificates.
2095        Ok(missing_certificates)
2096    }
2097}
2098
2099impl<N: Network> Primary<N> {
2100    /// Spawns a task with the given future; it should only be used for long-running tasks.
2101    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
2102        self.handles.lock().push(tokio::spawn(future));
2103    }
2104
2105    /// Shuts down the primary.
2106    pub async fn shut_down(&self) {
2107        info!("Shutting down the primary...");
2108        // Remove the callback.
2109        self.primary_callback.clear();
2110        // Shut down the sync service.
2111        self.sync.shut_down().await;
2112        // Shut down the workers.
2113        self.workers().iter().for_each(|worker| worker.shut_down());
2114        // Abort the tasks.
2115        self.handles.lock().drain(..).for_each(|handle| handle.abort());
2116        // Save the current proposal cache to disk.
2117        let proposal_cache = {
2118            // Only persist a Certifying batch; a Certified batch will appear in pending_certificates.
2119            // Note: it is guaranteed that there are no concurrent accesses to `proposed_batch` as all
2120            // background tasks already terminated at this point.
2121            let proposal = match std::mem::replace(&mut *self.proposed_batch.write(), ProposedBatchState::None) {
2122                ProposedBatchState::Certifying(p) => Some(*p),
2123                _ => None,
2124            };
2125            let signed_proposals = self.signed_proposals.read().clone();
2126            let latest_round = proposal
2127                .as_ref()
2128                .map(Proposal::round)
2129                .unwrap_or(self.latest_proposal_timestamp.read().await.map(|(round, _)| round).unwrap_or(0));
2130            let pending_certificates = self.storage.get_pending_certificates();
2131            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
2132        };
2133        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
2134            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
2135        }
2136        // Close the gateway.
2137        self.gateway.shut_down().await;
2138    }
2139}
2140
2141#[cfg(test)]
2142mod tests {
2143    use super::{proposal_task::BatchPropose as _, *};
2144
2145    use snarkos_node_bft_ledger_service::MockLedgerService;
2146    use snarkos_node_bft_storage_service::BFTMemoryService;
2147    use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2148    use snarkvm::{
2149        ledger::{
2150            committee::{Committee, MIN_VALIDATOR_STAKE},
2151            test_helpers::sample_execution_transaction_with_fee,
2152        },
2153        prelude::{Address, Signature},
2154    };
2155
2156    use bytes::Bytes;
2157    use indexmap::IndexSet;
2158    use rand::RngExt;
2159
2160    type CurrentNetwork = snarkvm::prelude::MainnetV0;
2161
2162    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2163        // Create a committee containing the primary's account.
2164        const COMMITTEE_SIZE: usize = 4;
2165        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2166        let mut members = IndexMap::new();
2167
2168        for i in 0..COMMITTEE_SIZE {
2169            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2170            let account = Account::new(rng).unwrap();
2171
2172            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.random_range(0..100)));
2173            accounts.push((socket_addr, account));
2174        }
2175
2176        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2177    }
2178
2179    // Returns a primary and a list of accounts in the configured committee.
2180    fn primary_with_committee(
2181        account_index: usize,
2182        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2183        committee: Committee<CurrentNetwork>,
2184        height: u32,
2185    ) -> Primary<CurrentNetwork> {
2186        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2187        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10).unwrap();
2188
2189        // Initialize the primary.
2190        let account = accounts[account_index].1.clone();
2191        let block_sync = Arc::new(BlockSync::new(ledger.clone(), ConnectionMode::Gateway));
2192        let primary =
2193            Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2194                .unwrap();
2195
2196        // Construct a worker instance.
2197        let worker = Worker::new(
2198            0, // id
2199            Arc::new(primary.gateway.clone()),
2200            primary.storage.clone(),
2201            primary.ledger.clone(),
2202            primary.proposed_batch.clone(),
2203        )
2204        .unwrap();
2205        let _ = primary.workers.set(vec![worker]);
2206        for a in accounts.iter().skip(account_index) {
2207            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2208        }
2209
2210        primary
2211    }
2212
2213    fn primary_without_handlers(
2214        rng: &mut TestRng,
2215    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2216        let (accounts, committee) = sample_committee(rng);
2217        let primary = primary_with_committee(
2218            0, // index of primary's account
2219            &accounts,
2220            committee,
2221            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2222        );
2223
2224        (primary, accounts)
2225    }
2226
2227    // Creates a mock solution.
2228    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2229        // Sample a random fake solution ID.
2230        let solution_id = rng.random::<u64>().into();
2231        // Vary the size of the solutions.
2232        let size = rng.random_range(1024..10 * 1024);
2233        // Sample random fake solution bytes.
2234        let vec: Vec<u8> = (0..size).map(|_| rng.random::<u8>()).collect();
2235        let solution = Data::Buffer(Bytes::from(vec));
2236        // Return the solution ID and solution.
2237        (solution_id, solution)
2238    }
2239
2240    // Samples a test transaction.
2241    fn sample_unconfirmed_transaction(
2242        rng: &mut TestRng,
2243    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2244        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2245        let id = transaction.id();
2246
2247        (id, Data::Object(transaction))
2248    }
2249
2250    // Creates a batch proposal with one solution and one transaction.
2251    fn create_test_proposal(
2252        author: &Account<CurrentNetwork>,
2253        committee: Committee<CurrentNetwork>,
2254        round: u64,
2255        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2256        timestamp: i64,
2257        num_transactions: u64,
2258        rng: &mut TestRng,
2259    ) -> Proposal<CurrentNetwork> {
2260        let mut transmission_ids = IndexSet::new();
2261        let mut transmissions = IndexMap::new();
2262
2263        // Prepare the solution and insert into the sets.
2264        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2265        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2266        let solution_transmission_id = (solution_id, solution_checksum).into();
2267        transmission_ids.insert(solution_transmission_id);
2268        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2269
2270        // Prepare the transactions and insert into the sets.
2271        for _ in 0..num_transactions {
2272            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2273            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2274            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2275            transmission_ids.insert(transaction_transmission_id);
2276            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2277        }
2278
2279        // Retrieve the private key.
2280        let private_key = author.private_key();
2281        // Sign the batch header.
2282        let batch_header = BatchHeader::new(
2283            private_key,
2284            round,
2285            timestamp,
2286            committee.id(),
2287            transmission_ids,
2288            previous_certificate_ids,
2289            rng,
2290        )
2291        .unwrap();
2292        // Construct the proposal.
2293        Proposal::new(committee, batch_header, transmissions).unwrap()
2294    }
2295
2296    // Creates a signature of the primary's current proposal for each committee member (excluding
2297    // the primary).
2298    fn peer_signatures_for_proposal(
2299        primary: &Primary<CurrentNetwork>,
2300        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2301        rng: &mut TestRng,
2302    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2303        // Each committee member signs the batch.
2304        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2305        for (socket_addr, account) in accounts {
2306            if account.address() == primary.gateway.account().address() {
2307                continue;
2308            }
2309            let batch_id = primary.proposed_batch.read().as_proposal().unwrap().batch_id();
2310            let signature = account.sign(&[batch_id], rng).unwrap();
2311            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2312        }
2313
2314        signatures
2315    }
2316
2317    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2318    fn peer_signatures_for_batch(
2319        primary_address: Address<CurrentNetwork>,
2320        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2321        batch_id: Field<CurrentNetwork>,
2322        rng: &mut TestRng,
2323    ) -> IndexSet<Signature<CurrentNetwork>> {
2324        let mut signatures = IndexSet::new();
2325        for (_, account) in accounts {
2326            if account.address() == primary_address {
2327                continue;
2328            }
2329            let signature = account.sign(&[batch_id], rng).unwrap();
2330            signatures.insert(signature);
2331        }
2332        signatures
2333    }
2334
2335    // Creates a batch certificate.
2336    fn create_batch_certificate(
2337        primary_address: Address<CurrentNetwork>,
2338        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2339        round: u64,
2340        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2341        rng: &mut TestRng,
2342    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2343        let timestamp = now();
2344
2345        let author =
2346            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2347        let private_key = author.private_key();
2348
2349        let committee_id = Field::rand(rng);
2350        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2351        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2352        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2353        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2354
2355        let solution_transmission_id = (solution_id, solution_checksum).into();
2356        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2357
2358        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2359        let transmissions = [
2360            (solution_transmission_id, Transmission::Solution(solution)),
2361            (transaction_transmission_id, Transmission::Transaction(transaction)),
2362        ]
2363        .into();
2364
2365        let batch_header = BatchHeader::new(
2366            private_key,
2367            round,
2368            timestamp,
2369            committee_id,
2370            transmission_ids,
2371            previous_certificate_ids,
2372            rng,
2373        )
2374        .unwrap();
2375        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2376        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2377        (certificate, transmissions)
2378    }
2379
2380    // Create a certificate chain up to, but not including, the specified round in the primary storage.
2381    fn store_certificate_chain(
2382        primary: &Primary<CurrentNetwork>,
2383        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2384        round: u64,
2385        rng: &mut TestRng,
2386    ) -> IndexSet<Field<CurrentNetwork>> {
2387        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2388        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2389        for cur_round in 1..round {
2390            for (_, account) in accounts.iter() {
2391                let (certificate, transmissions) = create_batch_certificate(
2392                    account.address(),
2393                    accounts,
2394                    cur_round,
2395                    previous_certificates.clone(),
2396                    rng,
2397                );
2398                next_certificates.insert(certificate.id());
2399                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2400            }
2401
2402            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2403            previous_certificates = next_certificates;
2404            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2405        }
2406
2407        previous_certificates
2408    }
2409
2410    // Insert the account socket addresses into the resolver so that
2411    // they are recognized as "connected".
2412    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2413        // First account is primary, which doesn't need to resolve.
2414        for (addr, acct) in accounts.iter().skip(1) {
2415            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2416        }
2417    }
2418
2419    #[test_log::test(tokio::test)]
2420    async fn test_propose_batch() {
2421        let mut rng = TestRng::default();
2422        let (primary, _) = primary_without_handlers(&mut rng);
2423
2424        // Check there is no batch currently proposed.
2425        assert!(primary.proposed_batch.read().is_none());
2426
2427        // Generate a solution and a transaction.
2428        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2429        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2430
2431        // Store it on one of the workers.
2432        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2433        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2434
2435        // Try to propose a batch again. This time, it should succeed.
2436        assert!(primary.propose_batch().await.is_ok());
2437        assert!(primary.proposed_batch.read().is_proposed());
2438    }
2439
2440    #[test_log::test(tokio::test)]
2441    async fn test_propose_batch_with_no_transmissions() {
2442        let mut rng = TestRng::default();
2443        let (primary, _) = primary_without_handlers(&mut rng);
2444
2445        // Check there is no batch currently proposed.
2446        assert!(primary.proposed_batch.read().is_none());
2447
2448        // Try to propose a batch with no transmissions.
2449        assert!(primary.propose_batch().await.is_ok());
2450        assert!(primary.proposed_batch.read().is_proposed());
2451    }
2452
2453    #[test_log::test(tokio::test)]
2454    async fn test_propose_batch_in_round() {
2455        let round = 3;
2456        let mut rng = TestRng::default();
2457        let (primary, accounts) = primary_without_handlers(&mut rng);
2458
2459        // Fill primary storage.
2460        store_certificate_chain(&primary, &accounts, round, &mut rng);
2461
2462        // Sleep for a while to ensure the primary is ready to propose the next round.
2463        tokio::time::sleep(MIN_BATCH_DELAY).await;
2464
2465        // Generate a solution and a transaction.
2466        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2467        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2468
2469        // Store it on one of the workers.
2470        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2471        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2472
2473        // Propose a batch again. This time, it should succeed.
2474        assert!(primary.propose_batch().await.is_ok());
2475        assert!(primary.proposed_batch.read().is_proposed());
2476    }
2477
2478    #[test_log::test(tokio::test)]
2479    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2480        let round = 3;
2481        let prev_round = round - 1;
2482        let mut rng = TestRng::default();
2483        let (primary, accounts) = primary_without_handlers(&mut rng);
2484        let peer_account = &accounts[1];
2485        let peer_ip = peer_account.0;
2486
2487        // Fill primary storage.
2488        store_certificate_chain(&primary, &accounts, round, &mut rng);
2489
2490        // Get transmissions from previous certificates.
2491        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2492
2493        // Track the number of transmissions in the previous round.
2494        let mut num_transmissions_in_previous_round = 0;
2495
2496        // Generate a solution and a transaction.
2497        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2498        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2499        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2500        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2501
2502        // Store it on one of the workers.
2503        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2504        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2505
2506        // Check that the worker has 2 transmissions.
2507        assert_eq!(primary.workers()[0].num_transmissions(), 2);
2508
2509        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
2510        for (_, account) in accounts.iter() {
2511            let (certificate, transmissions) = create_batch_certificate(
2512                account.address(),
2513                &accounts,
2514                round,
2515                previous_certificate_ids.clone(),
2516                &mut rng,
2517            );
2518
2519            // Add the transmissions to the worker.
2520            for (transmission_id, transmission) in transmissions.iter() {
2521                primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2522            }
2523
2524            // Insert the certificate to storage.
2525            num_transmissions_in_previous_round += transmissions.len();
2526            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2527        }
2528
2529        // Sleep for a while to ensure the primary is ready to propose the next round.
2530        tokio::time::sleep(MIN_BATCH_DELAY).await;
2531
2532        // Advance to the next round.
2533        assert!(primary.storage.increment_to_next_round(round).is_ok());
2534
2535        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
2536        assert_eq!(primary.workers()[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2537
2538        // Propose the batch.
2539        assert!(primary.propose_batch().await.is_ok());
2540
2541        // Check that the proposal only contains the new transmissions that were not in previous certificates.
2542        let proposed_transmissions = primary.proposed_batch.read().as_proposal().unwrap().transmissions().clone();
2543        assert_eq!(proposed_transmissions.len(), 2);
2544        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2545        assert!(
2546            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2547        );
2548    }
2549
2550    #[test_log::test(tokio::test)]
2551    async fn test_propose_batch_over_spend_limit() {
2552        let mut rng = TestRng::default();
2553
2554        // Create a primary to test spend limit backwards compatibility with V4.
2555        let (accounts, committee) = sample_committee(&mut rng);
2556        let primary = primary_with_committee(
2557            0,
2558            &accounts,
2559            committee.clone(),
2560            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2561        );
2562
2563        // Check there is no batch currently proposed.
2564        assert!(primary.proposed_batch.read().is_none());
2565        // Check the workers are empty.
2566        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2567
2568        // Generate a solution and transactions.
2569        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2570        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2571
2572        for _i in 0..5 {
2573            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2574            // Store it on one of the workers.
2575            primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2576        }
2577
2578        // Try to propose a batch again. This time, it should succeed.
2579        assert!(primary.propose_batch().await.is_ok());
2580        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2581        assert_eq!(primary.proposed_batch.read().as_proposal().unwrap().transmissions().len(), 3);
2582        // Check the transmissions were correctly drained from the workers.
2583        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2584    }
2585
2586    #[test_log::test(tokio::test)]
2587    async fn test_batch_propose_from_peer() {
2588        let mut rng = TestRng::default();
2589        let (primary, accounts) = primary_without_handlers(&mut rng);
2590
2591        // Create a valid proposal with an author that isn't the primary.
2592        let round = 1;
2593        let peer_account = &accounts[1];
2594        let peer_ip = peer_account.0;
2595        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2596        let proposal = create_test_proposal(
2597            &peer_account.1,
2598            primary.ledger.current_committee().unwrap(),
2599            round,
2600            Default::default(),
2601            timestamp,
2602            1,
2603            &mut rng,
2604        );
2605
2606        // Make sure the primary is aware of the transmissions in the proposal.
2607        for (transmission_id, transmission) in proposal.transmissions() {
2608            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2609        }
2610
2611        // The author must be known to resolver to pass propose checks.
2612        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2613
2614        // The primary will only consider itself synced if we received
2615        // block locators from a peer.
2616        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2617        primary.sync.testing_only_set_sync_height_testing_only(20);
2618
2619        // Try to process the batch proposal from the peer, should succeed.
2620        assert!(
2621            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2622        );
2623    }
2624
2625    #[test_log::test(tokio::test)]
2626    async fn test_batch_propose_from_peer_when_not_synced() {
2627        let mut rng = TestRng::default();
2628        let (primary, accounts) = primary_without_handlers(&mut rng);
2629
2630        // Create a valid proposal with an author that isn't the primary.
2631        let round = 1;
2632        let peer_account = &accounts[1];
2633        let peer_ip = peer_account.0;
2634        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2635        let proposal = create_test_proposal(
2636            &peer_account.1,
2637            primary.ledger.current_committee().unwrap(),
2638            round,
2639            Default::default(),
2640            timestamp,
2641            1,
2642            &mut rng,
2643        );
2644
2645        // Make sure the primary is aware of the transmissions in the proposal.
2646        for (transmission_id, transmission) in proposal.transmissions() {
2647            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2648        }
2649
2650        // The author must be known to resolver to pass propose checks.
2651        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2652
2653        // Add a high block locator to indicate we are not synced.
2654        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2655
2656        // Try to process the batch proposal from the peer, should fail
2657        assert!(
2658            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2659        );
2660    }
2661
2662    #[test_log::test(tokio::test)]
2663    async fn test_batch_propose_from_peer_in_round() {
2664        let round = 2;
2665        let mut rng = TestRng::default();
2666        let (primary, accounts) = primary_without_handlers(&mut rng);
2667
2668        // Generate certificates.
2669        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2670
2671        // Create a valid proposal with an author that isn't the primary.
2672        let peer_account = &accounts[1];
2673        let peer_ip = peer_account.0;
2674        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2675        let proposal = create_test_proposal(
2676            &peer_account.1,
2677            primary.ledger.current_committee().unwrap(),
2678            round,
2679            previous_certificates,
2680            timestamp,
2681            1,
2682            &mut rng,
2683        );
2684
2685        // Make sure the primary is aware of the transmissions in the proposal.
2686        for (transmission_id, transmission) in proposal.transmissions() {
2687            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2688        }
2689
2690        // The author must be known to resolver to pass propose checks.
2691        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2692
2693        // The primary will only consider itself synced if we received
2694        // block locators from a peer.
2695        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2696        primary.sync.testing_only_set_sync_height_testing_only(20);
2697
2698        // Try to process the batch proposal from the peer, should succeed.
2699        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2700    }
2701
2702    #[test_log::test(tokio::test)]
2703    async fn test_batch_propose_from_peer_wrong_round() {
2704        let mut rng = TestRng::default();
2705        let (primary, accounts) = primary_without_handlers(&mut rng);
2706
2707        // Create a valid proposal with an author that isn't the primary.
2708        let round = 1;
2709        let peer_account = &accounts[1];
2710        let peer_ip = peer_account.0;
2711        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2712        let proposal = create_test_proposal(
2713            &peer_account.1,
2714            primary.ledger.current_committee().unwrap(),
2715            round,
2716            Default::default(),
2717            timestamp,
2718            1,
2719            &mut rng,
2720        );
2721
2722        // Make sure the primary is aware of the transmissions in the proposal.
2723        for (transmission_id, transmission) in proposal.transmissions() {
2724            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2725        }
2726
2727        // The author must be known to resolver to pass propose checks.
2728        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2729        // The primary must be considered synced.
2730        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2731        primary.sync.testing_only_set_sync_height_testing_only(20);
2732
2733        // Try to process the batch proposal from the peer, should error.
2734        assert!(
2735            primary
2736                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2737                    round: round + 1,
2738                    batch_header: Data::Object(proposal.batch_header().clone())
2739                })
2740                .await
2741                .is_err()
2742        );
2743    }
2744
2745    #[test_log::test(tokio::test)]
2746    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2747        let round = 4;
2748        let mut rng = TestRng::default();
2749        let (primary, accounts) = primary_without_handlers(&mut rng);
2750
2751        // Generate certificates.
2752        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2753
2754        // Create a valid proposal with an author that isn't the primary.
2755        let peer_account = &accounts[1];
2756        let peer_ip = peer_account.0;
2757        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2758        let proposal = create_test_proposal(
2759            &peer_account.1,
2760            primary.ledger.current_committee().unwrap(),
2761            round,
2762            previous_certificates,
2763            timestamp,
2764            1,
2765            &mut rng,
2766        );
2767
2768        // Make sure the primary is aware of the transmissions in the proposal.
2769        for (transmission_id, transmission) in proposal.transmissions() {
2770            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2771        }
2772
2773        // The author must be known to resolver to pass propose checks.
2774        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2775        // The primary must be considered synced.
2776        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2777        primary.sync.testing_only_set_sync_height_testing_only(0);
2778
2779        // Try to process the batch proposal from the peer, should error.
2780        assert!(
2781            primary
2782                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2783                    round: round + 1,
2784                    batch_header: Data::Object(proposal.batch_header().clone())
2785                })
2786                .await
2787                .is_err()
2788        );
2789    }
2790
2791    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2792    #[test_log::test(tokio::test)]
2793    async fn test_batch_propose_from_peer_with_past_timestamp() {
2794        let round = 2;
2795        let mut rng = TestRng::default();
2796        let (primary, accounts) = primary_without_handlers(&mut rng);
2797
2798        // Generate certificates.
2799        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2800
2801        // Create a valid proposal with an author that isn't the primary.
2802        let peer_account = &accounts[1];
2803        let peer_ip = peer_account.0;
2804
2805        // Use a timestamp that is too early.
2806        // Set it to something that is less than the minimum batch delay
2807        // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp
2808        let last_timestamp = primary
2809            .storage
2810            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2811            .expect("No previous proposal exists")
2812            .timestamp();
2813        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY.as_secs() as i64) - 1;
2814
2815        let proposal = create_test_proposal(
2816            &peer_account.1,
2817            primary.ledger.current_committee().unwrap(),
2818            round,
2819            previous_certificates,
2820            invalid_timestamp,
2821            1,
2822            &mut rng,
2823        );
2824
2825        // Make sure the primary is aware of the transmissions in the proposal.
2826        for (transmission_id, transmission) in proposal.transmissions() {
2827            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2828        }
2829
2830        // The author must be known to resolver to pass propose checks.
2831        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2832        // The primary must be considered synced.
2833        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2834        primary.sync.testing_only_set_sync_height_testing_only(0);
2835
2836        // Try to process the batch proposal from the peer, should error.
2837        assert!(
2838            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2839        );
2840    }
2841
2842    #[test_log::test(tokio::test)]
2843    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2844        let round = 3;
2845        let mut rng = TestRng::default();
2846        let (primary, _) = primary_without_handlers(&mut rng);
2847
2848        // Check there is no batch currently proposed.
2849        assert!(primary.proposed_batch.read().is_none());
2850
2851        // Generate a solution and a transaction.
2852        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2853        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2854
2855        // Store it on one of the workers.
2856        primary.workers()[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2857        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2858
2859        // Set the proposal lock to a round ahead of the storage.
2860        let (old_proposal_round, old_proposal_timestamp) = primary
2861            .latest_proposal_timestamp
2862            .read()
2863            .await
2864            .map(|(round, timestamp)| (round, timestamp))
2865            .unwrap_or((0, 0));
2866        *primary.latest_proposal_timestamp.write().await =
2867            Some((round + 1, old_proposal_timestamp + MIN_BATCH_DELAY.as_secs() as i64));
2868
2869        // Propose a batch and enforce that it fails.
2870        assert!(primary.propose_batch().await.is_ok());
2871        assert!(primary.proposed_batch.read().is_none());
2872
2873        // Set the proposal lock back to the old round.
2874        *primary.latest_proposal_timestamp.write().await = Some((old_proposal_round, old_proposal_timestamp));
2875
2876        // Try to propose a batch again. This time, it should succeed.
2877        assert!(primary.propose_batch().await.is_ok());
2878        assert!(primary.proposed_batch.read().is_proposed());
2879    }
2880
2881    #[test_log::test(tokio::test)]
2882    async fn test_propose_batch_with_storage_round_behind_proposal() {
2883        let round = 5;
2884        let mut rng = TestRng::default();
2885        let (primary, accounts) = primary_without_handlers(&mut rng);
2886
2887        // Generate previous certificates.
2888        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2889
2890        // Create a valid proposal.
2891        let timestamp = now();
2892        let proposal = create_test_proposal(
2893            primary.gateway.account(),
2894            primary.ledger.current_committee().unwrap(),
2895            round + 1,
2896            previous_certificates,
2897            timestamp,
2898            1,
2899            &mut rng,
2900        );
2901
2902        // Store the proposal on the primary.
2903        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2904
2905        // Try to propose a batch will terminate early because the storage is behind the proposal.
2906        assert!(primary.propose_batch().await.is_ok());
2907        assert!(primary.proposed_batch.read().is_proposed());
2908        assert!(primary.proposed_batch.read().as_proposal().unwrap().round() > primary.current_round());
2909    }
2910
2911    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2912    async fn test_batch_signature_from_peer() {
2913        let mut rng = TestRng::default();
2914        let (primary, accounts) = primary_without_handlers(&mut rng);
2915        map_account_addresses(&primary, &accounts);
2916
2917        // Create a valid proposal.
2918        let round = 1;
2919        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2920        let proposal = create_test_proposal(
2921            primary.gateway.account(),
2922            primary.ledger.current_committee().unwrap(),
2923            round,
2924            Default::default(),
2925            timestamp,
2926            1,
2927            &mut rng,
2928        );
2929
2930        // Store the proposal on the primary.
2931        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2932
2933        // Each committee member signs the batch.
2934        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2935
2936        // Have the primary process the signatures.
2937        for (socket_addr, signature) in signatures {
2938            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2939        }
2940
2941        // Check the certificate was created and stored by the primary.
2942        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2943        // Manually attempt round advancement (because the handler is not running).
2944        primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2945        // Check the round was incremented.
2946        assert_eq!(primary.current_round(), round + 1);
2947    }
2948
2949    #[test_log::test(tokio::test(flavor = "multi_thread"))]
2950    async fn test_batch_signature_from_peer_in_round() {
2951        let round = 5;
2952        let mut rng = TestRng::default();
2953        let (primary, accounts) = primary_without_handlers(&mut rng);
2954        map_account_addresses(&primary, &accounts);
2955
2956        // Generate certificates.
2957        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2958
2959        // Create a valid proposal.
2960        let timestamp = now();
2961        let proposal = create_test_proposal(
2962            primary.gateway.account(),
2963            primary.ledger.current_committee().unwrap(),
2964            round,
2965            previous_certificates,
2966            timestamp,
2967            1,
2968            &mut rng,
2969        );
2970
2971        // Store the proposal on the primary.
2972        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
2973
2974        // Each committee member signs the batch.
2975        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2976
2977        // Have the primary process the signatures.
2978        for (socket_addr, signature) in signatures {
2979            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2980        }
2981
2982        // Check the certificate was created and stored by the primary.
2983        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2984        // Manually attempt round advancement (because the handler is not running).
2985        primary.try_increment_to_the_next_round(round + 1).await.unwrap();
2986        // Check the round was incremented.
2987        assert_eq!(primary.current_round(), round + 1);
2988    }
2989
2990    #[test_log::test(tokio::test)]
2991    async fn test_batch_signature_from_peer_no_quorum() {
2992        let mut rng = TestRng::default();
2993        let (primary, accounts) = primary_without_handlers(&mut rng);
2994        map_account_addresses(&primary, &accounts);
2995
2996        // Create a valid proposal.
2997        let round = 1;
2998        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
2999        let proposal = create_test_proposal(
3000            primary.gateway.account(),
3001            primary.ledger.current_committee().unwrap(),
3002            round,
3003            Default::default(),
3004            timestamp,
3005            1,
3006            &mut rng,
3007        );
3008
3009        // Store the proposal on the primary.
3010        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
3011
3012        // Each committee member signs the batch.
3013        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3014
3015        // Have the primary process only one signature, mimicking a lack of quorum.
3016        let (socket_addr, signature) = signatures.first().unwrap();
3017        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3018
3019        // Check the certificate was not created and stored by the primary.
3020        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3021        // Check the round was incremented.
3022        assert_eq!(primary.current_round(), round);
3023    }
3024
3025    #[test_log::test(tokio::test)]
3026    async fn test_batch_signature_from_peer_in_round_no_quorum() {
3027        let round = 7;
3028        let mut rng = TestRng::default();
3029        let (primary, accounts) = primary_without_handlers(&mut rng);
3030        map_account_addresses(&primary, &accounts);
3031
3032        // Generate certificates.
3033        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
3034
3035        // Create a valid proposal.
3036        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3037        let proposal = create_test_proposal(
3038            primary.gateway.account(),
3039            primary.ledger.current_committee().unwrap(),
3040            round,
3041            previous_certificates,
3042            timestamp,
3043            1,
3044            &mut rng,
3045        );
3046
3047        // Store the proposal on the primary.
3048        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(proposal));
3049
3050        // Each committee member signs the batch.
3051        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3052
3053        // Have the primary process only one signature, mimicking a lack of quorum.
3054        let (socket_addr, signature) = signatures.first().unwrap();
3055        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
3056
3057        // Check the certificate was not created and stored by the primary.
3058        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3059        // Check the round was incremented.
3060        assert_eq!(primary.current_round(), round);
3061    }
3062
3063    // Tests that a late-arriving signature for a batch that is currently being certified
3064    // (ProposedBatchState::Certified) is silently dropped without error.
3065    // This exercises the race condition where proposed_batch.take() has been called but
3066    // insert_certificate has not yet completed.
3067    #[test_log::test(tokio::test)]
3068    async fn test_batch_signature_from_peer_batch_being_certified() {
3069        let mut rng = TestRng::default();
3070        let (primary, accounts) = primary_without_handlers(&mut rng);
3071        map_account_addresses(&primary, &accounts);
3072
3073        // Create a valid proposal.
3074        let round = 1;
3075        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3076        let proposal = create_test_proposal(
3077            primary.gateway.account(),
3078            primary.ledger.current_committee().unwrap(),
3079            round,
3080            Default::default(),
3081            timestamp,
3082            1,
3083            &mut rng,
3084        );
3085        let batch_id = proposal.batch_id();
3086
3087        // Simulate the race: the batch has been taken for certification but not yet stored.
3088        *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id);
3089
3090        // Send a late signature for the batch being certified.
3091        let (socket_addr, account) =
3092            accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3093        let signature = account.sign(&[batch_id], &mut rng).unwrap();
3094        let batch_signature = BatchSignature::new(batch_id, signature);
3095
3096        // The signature should be accepted without error (silently dropped).
3097        assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3098        // The batch state is unchanged (still BeingCertified — no new proposal was set).
3099        assert!(matches!(&*primary.proposed_batch.read(), ProposedBatchState::Certified(id) if *id == batch_id));
3100    }
3101
3102    // Tests that a signature for a completely unknown batch ID is rejected even when another
3103    // batch is being certified. The BeingCertified state only suppresses errors for its own ID.
3104    #[test_log::test(tokio::test)]
3105    async fn test_batch_signature_from_peer_unknown_id_while_certifying() {
3106        let mut rng = TestRng::default();
3107        let (primary, accounts) = primary_without_handlers(&mut rng);
3108        map_account_addresses(&primary, &accounts);
3109
3110        // Create two proposals so we have two distinct batch IDs.
3111        let round = 1;
3112        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3113        let proposal_a = create_test_proposal(
3114            primary.gateway.account(),
3115            primary.ledger.current_committee().unwrap(),
3116            round,
3117            Default::default(),
3118            timestamp,
3119            1,
3120            &mut rng,
3121        );
3122        let proposal_b = create_test_proposal(
3123            primary.gateway.account(),
3124            primary.ledger.current_committee().unwrap(),
3125            round,
3126            Default::default(),
3127            timestamp,
3128            1,
3129            &mut rng,
3130        );
3131        let batch_id_a = proposal_a.batch_id();
3132        let batch_id_b = proposal_b.batch_id();
3133        assert_ne!(batch_id_a, batch_id_b);
3134
3135        // Simulate certifying batch A.
3136        *primary.proposed_batch.write() = ProposedBatchState::Certified(batch_id_a);
3137
3138        // Send a signature for batch B (a genuinely unknown ID).
3139        let (socket_addr, account) =
3140            accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3141        let signature = account.sign(&[batch_id_b], &mut rng).unwrap();
3142        let batch_signature = BatchSignature::new(batch_id_b, signature);
3143
3144        // The signature is for a genuinely unknown ID — should be rejected with an error.
3145        assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_err());
3146    }
3147
3148    // Tests the "already certified" path: a signature arrives after the batch is fully in
3149    // storage and the primary has moved on to a new proposal.
3150    #[test_log::test(tokio::test(flavor = "multi_thread"))]
3151    async fn test_batch_signature_from_peer_already_certified() {
3152        let mut rng = TestRng::default();
3153        let (primary, accounts) = primary_without_handlers(&mut rng);
3154        map_account_addresses(&primary, &accounts);
3155
3156        // Create and certify a batch so it lands in storage.
3157        let round = 1;
3158        let timestamp = now() + MIN_BATCH_DELAY.as_secs() as i64;
3159        let old_proposal = create_test_proposal(
3160            primary.gateway.account(),
3161            primary.ledger.current_committee().unwrap(),
3162            round,
3163            Default::default(),
3164            timestamp,
3165            1,
3166            &mut rng,
3167        );
3168        let old_batch_id = old_proposal.batch_id();
3169        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(old_proposal));
3170        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
3171        for (socket_addr, signature) in signatures {
3172            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
3173        }
3174        // The batch is now in storage.
3175        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
3176
3177        // Simulate a new proposal being active.
3178        let new_proposal = create_test_proposal(
3179            primary.gateway.account(),
3180            primary.ledger.current_committee().unwrap(),
3181            round,
3182            Default::default(),
3183            timestamp,
3184            1,
3185            &mut rng,
3186        );
3187        assert_ne!(new_proposal.batch_id(), old_batch_id);
3188        *primary.proposed_batch.write() = ProposedBatchState::Certifying(Box::new(new_proposal));
3189
3190        // Send a late signature for the already-certified old batch.
3191        let (socket_addr, account) =
3192            accounts.iter().find(|(_, a)| a.address() != primary.gateway.account().address()).unwrap();
3193        let signature = account.sign(&[old_batch_id], &mut rng).unwrap();
3194        let batch_signature = BatchSignature::new(old_batch_id, signature);
3195
3196        // Should be silently accepted (already certified path).
3197        assert!(primary.process_batch_signature_from_peer(*socket_addr, batch_signature).await.is_ok());
3198    }
3199
3200    #[test_log::test(tokio::test)]
3201    async fn test_insert_certificate_with_aborted_transmissions() {
3202        let round = 3;
3203        let prev_round = round - 1;
3204        let mut rng = TestRng::default();
3205        let (primary, accounts) = primary_without_handlers(&mut rng);
3206        let peer_account = &accounts[1];
3207        let peer_ip = peer_account.0;
3208
3209        // Fill primary storage.
3210        store_certificate_chain(&primary, &accounts, round, &mut rng);
3211
3212        // Get transmissions from previous certificates.
3213        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
3214
3215        // Generate a solution and a transaction.
3216        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
3217        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
3218
3219        // Store it on one of the workers.
3220        primary.workers()[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
3221        primary.workers()[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
3222
3223        // Check that the worker has 2 transmissions.
3224        assert_eq!(primary.workers()[0].num_transmissions(), 2);
3225
3226        // Create certificates for the current round.
3227        let account = accounts[0].1.clone();
3228        let (certificate, transmissions) =
3229            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
3230        let certificate_id = certificate.id();
3231
3232        // Randomly abort some of the transmissions.
3233        let mut aborted_transmissions = HashSet::new();
3234        let mut transmissions_without_aborted = HashMap::new();
3235        for (transmission_id, transmission) in transmissions.clone() {
3236            match rng.random::<bool>() || aborted_transmissions.is_empty() {
3237                true => {
3238                    // Insert the aborted transmission.
3239                    aborted_transmissions.insert(transmission_id);
3240                }
3241                false => {
3242                    // Insert the transmission without the aborted transmission.
3243                    transmissions_without_aborted.insert(transmission_id, transmission);
3244                }
3245            };
3246        }
3247
3248        // Add the non-aborted transmissions to the worker.
3249        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
3250            primary.workers()[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
3251        }
3252
3253        // Check that inserting the transmission with missing transmissions fails.
3254        assert!(
3255            primary
3256                .storage
3257                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
3258                .is_err()
3259        );
3260        assert!(
3261            primary
3262                .storage
3263                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
3264                .is_err()
3265        );
3266
3267        // Insert the certificate to storage.
3268        primary
3269            .storage
3270            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
3271            .unwrap();
3272
3273        // Ensure the certificate exists in storage.
3274        assert!(primary.storage.contains_certificate(certificate_id));
3275        // Ensure that the aborted transmission IDs exist in storage.
3276        for aborted_transmission_id in aborted_transmissions {
3277            assert!(primary.storage.contains_transmission(aborted_transmission_id));
3278            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
3279        }
3280    }
3281
3282    // -----------------------------------------------------------------------
3283    // add_signature_to_batch
3284    // -----------------------------------------------------------------------
3285
3286    /// State is `None` and the batch is not in storage — returns an error, state stays `None`.
3287    #[test]
3288    fn test_add_signature_to_batch_none_state() {
3289        let mut rng = TestRng::default();
3290        let (primary, accounts) = primary_without_handlers(&mut rng);
3291
3292        let peer_ip = accounts[1].0;
3293        let batch_id = Field::rand(&mut rng);
3294        let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap();
3295
3296        let (result, new_state) =
3297            primary.add_signature_to_batch(ProposedBatchState::None, peer_ip, batch_id, signature);
3298
3299        assert!(result.is_err());
3300        assert_eq!(new_state, ProposedBatchState::None);
3301    }
3302
3303    /// State is `Certified` with a matching batch ID — silently dropped, state restored.
3304    #[test]
3305    fn test_add_signature_to_batch_certified_matching_id() {
3306        let mut rng = TestRng::default();
3307        let (primary, accounts) = primary_without_handlers(&mut rng);
3308
3309        let peer_ip = accounts[1].0;
3310        let batch_id = Field::rand(&mut rng);
3311        let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap();
3312
3313        let (result, new_state) =
3314            primary.add_signature_to_batch(ProposedBatchState::Certified(batch_id), peer_ip, batch_id, signature);
3315
3316        assert!(result.unwrap().is_none());
3317        assert_eq!(new_state, ProposedBatchState::Certified(batch_id));
3318    }
3319
3320    /// State is `Certified` with a *different* batch ID — error returned, state becomes `None`.
3321    #[test]
3322    fn test_add_signature_to_batch_certified_different_id() {
3323        let mut rng = TestRng::default();
3324        let (primary, accounts) = primary_without_handlers(&mut rng);
3325
3326        let peer_ip = accounts[1].0;
3327        let certified_id = Field::rand(&mut rng);
3328        let other_id = Field::rand(&mut rng);
3329        let signature = accounts[1].1.sign(&[other_id], &mut rng).unwrap();
3330
3331        let (result, new_state) =
3332            primary.add_signature_to_batch(ProposedBatchState::Certified(certified_id), peer_ip, other_id, signature);
3333
3334        assert!(result.is_err());
3335        assert_eq!(new_state, ProposedBatchState::Certified(certified_id));
3336    }
3337
3338    /// State is `Certifying` for a *different* batch ID that **is already in storage** — silently
3339    /// dropped, state restored.
3340    #[tokio::test(flavor = "multi_thread")]
3341    async fn test_add_signature_to_batch_certifying_different_id_in_storage() {
3342        let round = 1;
3343        let mut rng = TestRng::default();
3344        let (primary, accounts) = primary_without_handlers(&mut rng);
3345        map_account_addresses(&primary, &accounts);
3346
3347        // Create a proposal owned by the primary.
3348        let proposal = create_test_proposal(
3349            primary.gateway.account(),
3350            primary.ledger.current_committee().unwrap(),
3351            round,
3352            Default::default(),
3353            now(),
3354            0,
3355            &mut rng,
3356        );
3357        let proposal_batch_id = proposal.batch_id();
3358
3359        // Create and store a *different* certificate so `contains_batch` returns true for it.
3360        let (certificate, transmissions) =
3361            create_batch_certificate(accounts[1].1.address(), &accounts, round, Default::default(), &mut rng);
3362        let stored_batch_id = certificate.batch_id();
3363        primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
3364
3365        let peer_ip = accounts[1].0;
3366        let signature = accounts[1].1.sign(&[stored_batch_id], &mut rng).unwrap();
3367
3368        let (result, new_state) = primary.add_signature_to_batch(
3369            ProposedBatchState::Certifying(Box::new(proposal)),
3370            peer_ip,
3371            stored_batch_id,
3372            signature,
3373        );
3374
3375        assert!(result.unwrap().is_none());
3376        // State is restored with the original proposal.
3377        assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id);
3378    }
3379
3380    /// State is `Certifying` for a *different* batch ID that is **not in storage** — error
3381    /// returned, state restored.
3382    #[test]
3383    fn test_add_signature_to_batch_certifying_different_id_unknown() {
3384        let mut rng = TestRng::default();
3385        let (primary, accounts) = primary_without_handlers(&mut rng);
3386
3387        let proposal = create_test_proposal(
3388            primary.gateway.account(),
3389            primary.ledger.current_committee().unwrap(),
3390            1,
3391            Default::default(),
3392            now(),
3393            0,
3394            &mut rng,
3395        );
3396        let proposal_batch_id = proposal.batch_id();
3397
3398        let peer_ip = accounts[1].0;
3399        let unknown_id = Field::rand(&mut rng);
3400        let signature = accounts[1].1.sign(&[unknown_id], &mut rng).unwrap();
3401
3402        let (result, new_state) = primary.add_signature_to_batch(
3403            ProposedBatchState::Certifying(Box::new(proposal)),
3404            peer_ip,
3405            unknown_id,
3406            signature,
3407        );
3408
3409        assert!(result.is_err());
3410        assert_eq!(new_state.as_proposal().unwrap().batch_id(), proposal_batch_id);
3411    }
3412
3413    /// Matching batch ID, valid signature, quorum **not yet** reached — state stays `Certifying`.
3414    #[test]
3415    fn test_add_signature_to_batch_certifying_matching_no_quorum() {
3416        let mut rng = TestRng::default();
3417        let (primary, accounts) = primary_without_handlers(&mut rng);
3418        map_account_addresses(&primary, &accounts);
3419
3420        let proposal = create_test_proposal(
3421            primary.gateway.account(),
3422            primary.ledger.current_committee().unwrap(),
3423            1,
3424            Default::default(),
3425            now(),
3426            0,
3427            &mut rng,
3428        );
3429        let batch_id = proposal.batch_id();
3430
3431        // Only one peer signs — not enough for quorum.
3432        let peer_ip = accounts[1].0;
3433        let signature = accounts[1].1.sign(&[batch_id], &mut rng).unwrap();
3434
3435        let (result, new_state) = primary.add_signature_to_batch(
3436            ProposedBatchState::Certifying(Box::new(proposal)),
3437            peer_ip,
3438            batch_id,
3439            signature,
3440        );
3441
3442        assert!(result.unwrap().is_none());
3443        assert_eq!(new_state.as_proposal().unwrap().batch_id(), batch_id);
3444    }
3445
3446    /// Matching batch ID, all peers sign — quorum reached, proposal extracted and state becomes
3447    /// `Certified`.
3448    #[test]
3449    fn test_add_signature_to_batch_certifying_matching_quorum_reached() {
3450        let mut rng = TestRng::default();
3451        let (primary, accounts) = primary_without_handlers(&mut rng);
3452        map_account_addresses(&primary, &accounts);
3453
3454        let proposal = create_test_proposal(
3455            primary.gateway.account(),
3456            primary.ledger.current_committee().unwrap(),
3457            1,
3458            Default::default(),
3459            now(),
3460            0,
3461            &mut rng,
3462        );
3463        let batch_id = proposal.batch_id();
3464
3465        // Add all peer signatures one by one until quorum is reached.
3466        let peers: Vec<_> =
3467            accounts.iter().filter(|(_, a)| a.address() != primary.gateway.account().address()).collect();
3468        let mut state = ProposedBatchState::Certifying(Box::new(proposal));
3469        let mut final_result = None;
3470
3471        for (peer_ip, peer_account) in &peers {
3472            let signature = peer_account.sign(&[batch_id], &mut rng).unwrap();
3473            let (result, new_state) = primary.add_signature_to_batch(state, *peer_ip, batch_id, signature);
3474            state = new_state;
3475            if result.as_ref().unwrap().is_some() {
3476                final_result = Some(result);
3477                break;
3478            }
3479        }
3480
3481        // Quorum must have been reached with the committee's peers.
3482        let proposal = final_result.expect("quorum should be reached").unwrap().unwrap();
3483        assert_eq!(proposal.batch_id(), batch_id);
3484        assert_eq!(state, ProposedBatchState::Certified(batch_id));
3485    }
3486}