// snarkos_node_bft/primary.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    PRIMARY_PING_IN_MS,
22    Sync,
23    Transport,
24    WORKER_PING_IN_MS,
25    Worker,
26    events::{BatchPropose, BatchSignature, Event},
27    helpers::{
28        BFTSender,
29        PrimaryReceiver,
30        PrimarySender,
31        Proposal,
32        ProposalCache,
33        SignedProposals,
34        Storage,
35        assign_to_worker,
36        assign_to_workers,
37        fmt_id,
38        init_sync_channels,
39        init_worker_channels,
40        now,
41    },
42    spawn_blocking,
43};
44
45use snarkos_account::Account;
46use snarkos_node_bft_events::PrimaryPing;
47use snarkos_node_bft_ledger_service::LedgerService;
48use snarkos_node_network::PeerPoolHandling;
49use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
50use snarkos_utilities::NodeDataDir;
51
52use snarkvm::{
53    console::{
54        prelude::*,
55        types::{Address, Field},
56    },
57    ledger::{
58        block::Transaction,
59        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
60        puzzle::{Solution, SolutionID},
61    },
62    prelude::{ConsensusVersion, committee::Committee},
63    utilities::flatten_error,
64};
65
66use anyhow::Context;
67use colored::Colorize;
68use futures::stream::{FuturesUnordered, StreamExt};
69use indexmap::{IndexMap, IndexSet};
70#[cfg(feature = "locktick")]
71use locktick::{
72    parking_lot::{Mutex, RwLock},
73    tokio::Mutex as TMutex,
74};
75#[cfg(not(feature = "locktick"))]
76use parking_lot::{Mutex, RwLock};
77#[cfg(not(feature = "serial"))]
78use rayon::prelude::*;
79#[cfg(feature = "metrics")]
80use std::time::Instant;
81use std::{
82    collections::{HashMap, HashSet},
83    future::Future,
84    net::SocketAddr,
85    sync::Arc,
86    time::Duration,
87};
88#[cfg(not(feature = "locktick"))]
89use tokio::sync::Mutex as TMutex;
90use tokio::{sync::OnceCell, task::JoinHandle};
91
/// A helper type for an optional proposed batch.
///
/// `None` means the primary is not currently proposing a batch. The alias is
/// wrapped in an `Arc` by `Primary` and shared with its workers (see
/// `Primary::run`), so access is mediated by the `RwLock`.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
94
/// The primary logic of a node.
/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
///
/// The struct is `Clone`; all mutable/shared state is behind `Arc`s, so clones
/// observe the same underlying state.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module enables fetching data from other validators.
    sync: Sync<N>,
    /// The gateway allows talking to other nodes in the validator set.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers. Empty after `new`; populated once in `run`.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender. Set at most once (in `run`, when a sender is provided).
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The instant at which the current batch was proposed (used to measure certification latency).
    /// (used for higher precision in the metrics compared to the batch timestamp)
    #[cfg(feature = "metrics")]
    batch_propose_start: Arc<Mutex<Option<Instant>>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The handles for all background tasks spawned by this primary.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for `propose_batch` (prevents re-entry); the inner `u64` is the
    /// most recent round a proposal was made for, or restored from the proposal cache.
    propose_lock: Arc<TMutex<u64>>,
    /// The node configuration directory.
    node_data_dir: NodeDataDir,
}
128
impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    /// Twice the per-batch cap, to leave headroom beyond a single batch.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// This only constructs the gateway and sync module; no workers are created
    /// and no tasks are spawned until [`Self::run`] is called.
    ///
    /// # Errors
    ///
    /// Returns an error if the gateway fails to initialize.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        block_sync: Arc<BlockSync<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        trusted_peers_only: bool,
        node_data_dir: NodeDataDir,
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway = Gateway::new(
            account,
            storage.clone(),
            ledger.clone(),
            ip,
            trusted_validators,
            trusted_peers_only,
            node_data_dir.clone(),
            dev,
        )?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);

        // Initialize the primary instance.
        // Note: `workers` starts empty; it is populated in `run`.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            #[cfg(feature = "metrics")]
            batch_propose_start: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            node_data_dir,
        })
    }

    /// Load the proposal cache file and update the Primary state with the stored data.
    ///
    /// Restores the in-flight proposal, the signed proposals, and the propose-lock
    /// round, then replays any pending certificates into storage. Returns `Ok(())`
    /// if no cache file exists.
    ///
    /// # Errors
    ///
    /// Returns an error if the cache file exists but cannot be loaded. Failures to
    /// replay individual certificates are logged and skipped, not propagated.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(&self.node_data_dir) {
            // If the proposal cache exists, then process the proposal cache.
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
                Ok(proposal_cache) => {
                    // Extract the proposal and signed proposals.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Write the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Write the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Write the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Update the storage with the pending certificates.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // We use a dummy IP because the node should not need to request from any peers.
                        // The storage should have stored all the transmissions. If not, we simply
                        // skip the certificate.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            let err = err.context(format!(
                                "Failed to load stored certificate {} from proposal cache",
                                fmt_id(batch_id)
                            ));
                            warn!("{}", &flatten_error(err));
                        }
                    }
                    Ok(())
                }
                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
            },
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
    ///
    /// Spawns the workers, then starts the subsystems in a deliberate order:
    /// sync initialization, proposal-cache restore, sync run, gateway run, and
    /// finally the primary handlers — do not reorder these steps.
    ///
    /// # Panics
    ///
    /// Panics if a BFT sender was already set (i.e. `run` is called twice with
    /// `bft_sender`).
    pub async fn run(
        &mut self,
        ping: Option<Arc<Ping<N>>>,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct a map for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            // Add the worker to the list of workers.
            workers.push(worker);
            // Add the worker sender to the map.
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // First, initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Next, initialize the sync module and sync the storage from ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run(ping, sync_receiver).await?;
        // Next, initialize the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    ///
    /// # Panics
    ///
    /// Panics if the number of workers exceeds `u8::MAX`; `MAX_WORKERS` is
    /// expected to keep the count well below that.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
320
321impl<N: Network> Primary<N> {
322    /// Returns the number of unconfirmed transmissions.
323    pub fn num_unconfirmed_transmissions(&self) -> usize {
324        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
325    }
326
327    /// Returns the number of unconfirmed ratifications.
328    pub fn num_unconfirmed_ratifications(&self) -> usize {
329        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
330    }
331
332    /// Returns the number of unconfirmed solutions.
333    pub fn num_unconfirmed_solutions(&self) -> usize {
334        self.workers.iter().map(|worker| worker.num_solutions()).sum()
335    }
336
337    /// Returns the number of unconfirmed transactions.
338    pub fn num_unconfirmed_transactions(&self) -> usize {
339        self.workers.iter().map(|worker| worker.num_transactions()).sum()
340    }
341}
342
343impl<N: Network> Primary<N> {
344    /// Returns the worker transmission IDs.
345    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
346        self.workers.iter().flat_map(|worker| worker.transmission_ids())
347    }
348
349    /// Returns the worker transmissions.
350    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
351        self.workers.iter().flat_map(|worker| worker.transmissions())
352    }
353
354    /// Returns the worker solutions.
355    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
356        self.workers.iter().flat_map(|worker| worker.solutions())
357    }
358
359    /// Returns the worker transactions.
360    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
361        self.workers.iter().flat_map(|worker| worker.transactions())
362    }
363}
364
365impl<N: Network> Primary<N> {
366    /// Clears the worker solutions.
367    pub fn clear_worker_solutions(&self) {
368        self.workers.iter().for_each(Worker::clear_solutions);
369    }
370}
371
372impl<N: Network> Primary<N> {
373    /// Proposes the batch for the current round.
374    ///
375    /// This method performs the following steps:
376    /// 1. Drain the workers.
377    /// 2. Sign the batch.
378    /// 3. Set the batch proposal in the primary.
379    /// 4. Broadcast the batch header to all validators for signing.
380    pub async fn propose_batch(&self) -> Result<()> {
381        // This function isn't re-entrant.
382        let mut lock_guard = self.propose_lock.lock().await;
383
384        // Check if the proposed batch has expired, and clear it if it has expired.
385        if let Err(err) = self
386            .check_proposed_batch_for_expiration()
387            .await
388            .with_context(|| "Failed to check the proposed batch for expiration")
389        {
390            warn!("{}", flatten_error(&err));
391            return Ok(());
392        }
393
394        // Retrieve the current round.
395        let round = self.current_round();
396        // Compute the previous round.
397        let previous_round = round.saturating_sub(1);
398
399        // If the current round is 0, return early.
400        // This can actually never happen, because of the invariant that the current round is never 0
401        // (see [`StorageInner::current_round`]).
402        ensure!(round > 0, "Round 0 cannot have transaction batches");
403
404        // If the current storage round is below the latest proposal round, then return early.
405        if round < *lock_guard {
406            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
407            return Ok(());
408        }
409
410        // If there is a batch being proposed already,
411        // rebroadcast the batch header to the non-signers, and return early.
412        if let Some(proposal) = self.proposed_batch.read().as_ref() {
413            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
414            if round < proposal.round()
415                || proposal
416                    .batch_header()
417                    .previous_certificate_ids()
418                    .iter()
419                    .any(|id| !self.storage.contains_certificate(*id))
420            {
421                warn!(
422                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
423                    proposal.round(),
424                );
425                return Ok(());
426            }
427            // Construct the event.
428            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
429            let event = Event::BatchPropose(proposal.batch_header().clone().into());
430            // Iterate through the non-signers.
431            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
432                // Resolve the address to the peer IP.
433                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
434                    // Resend the batch proposal to the validator for signing.
435                    Some(peer_ip) => {
436                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
437                        tokio::spawn(async move {
438                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
439                            // Resend the batch proposal to the peer.
440                            if gateway.send(peer_ip, event_).await.is_none() {
441                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
442                            }
443                        });
444                    }
445                    None => continue,
446                }
447            }
448            debug!("Proposed batch for round {} is still valid", proposal.round());
449            return Ok(());
450        }
451
452        #[cfg(feature = "metrics")]
453        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
454
455        // Ensure that the primary does not create a new proposal too quickly.
456        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
457            debug!(
458                "{}",
459                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
460            );
461            return Ok(());
462        }
463
464        // Ensure the primary has not proposed a batch for this round before.
465        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
466            // If a BFT sender was provided, attempt to advance the current round.
467            if let Some(bft_sender) = self.bft_sender.get() {
468                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
469                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
470                    Ok(true) => (), // continue,
471                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
472                    Ok(false) => return Ok(()),
473                    // An error occurred while attempting to advance the current round.
474                    Err(err) => {
475                        let err = err.context("Failed to update the BFT to the next round");
476                        warn!("{}", &flatten_error(&err));
477                        return Err(err);
478                    }
479                }
480            }
481            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
482            return Ok(());
483        }
484
485        // Determine if the current round has been proposed.
486        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
487        // good for network reliability and should not be prevented for the already existing proposed_batch.
488        // If a certificate already exists for the current round, an attempt should be made to advance the
489        // round as early as possible.
490        if round == *lock_guard {
491            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
492            return Ok(());
493        }
494
495        // Retrieve the committee to check against.
496        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
497        // Check if the primary is connected to enough validators to reach quorum threshold.
498        {
499            // Retrieve the connected validator addresses.
500            let mut connected_validators = self.gateway.connected_addresses();
501            // Append the primary to the set.
502            connected_validators.insert(self.gateway.account().address());
503            // If quorum threshold is not reached, return early.
504            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
505                debug!(
506                    "Primary is safely skipping a batch proposal for round {round} {}",
507                    "(please connect to more validators)".dimmed()
508                );
509                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
510                return Ok(());
511            }
512        }
513
514        // Retrieve the previous certificates.
515        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
516
517        // Check if the batch is ready to be proposed.
518        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
519        let mut is_ready = previous_round == 0;
520        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
521        if previous_round > 0 {
522            // Retrieve the committee lookback for the round.
523            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
524                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
525            };
526            // Construct a set over the authors.
527            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
528            // Check if the previous certificates have reached the quorum threshold.
529            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
530                is_ready = true;
531            }
532        }
533        // If the batch is not ready to be proposed, return early.
534        if !is_ready {
535            debug!(
536                "Primary is safely skipping a batch proposal for round {round} {}",
537                format!("(previous round {previous_round} has not reached quorum)").dimmed()
538            );
539            return Ok(());
540        }
541
542        // Initialize the map of transmissions.
543        let mut transmissions: IndexMap<_, _> = Default::default();
544        // Track the total execution costs of the batch proposal as it is being constructed.
545        let mut proposal_cost = 0u64;
546        // Note: worker draining and transaction inclusion needs to be thought
547        // through carefully when there is more than one worker. The fairness
548        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
549        debug_assert_eq!(MAX_WORKERS, 1);
550
551        'outer: for worker in self.workers().iter() {
552            let mut num_worker_transmissions = 0usize;
553
554            while let Some((id, transmission)) = worker.remove_front() {
555                // Check the selected transmissions are below the batch limit.
556                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
557                    // Reinsert the transmission into the worker.
558                    worker.insert_front(id, transmission);
559                    break 'outer;
560                }
561
562                // Check the max transmissions per worker is not exceeded.
563                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
564                    // Reinsert the transmission into the worker.
565                    worker.insert_front(id, transmission);
566                    continue 'outer;
567                }
568
569                // Check if the ledger already contains the transmission.
570                if self.ledger.contains_transmission(&id).unwrap_or(true) {
571                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
572                    continue;
573                }
574
575                // Check if the storage already contain the transmission.
576                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
577                // the primary does not propose a batch with no transmissions.
578                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
579                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
580                    continue;
581                }
582
583                // Check the transmission is still valid.
584                match (id, transmission.clone()) {
585                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
586                        // Ensure the checksum matches. If not, skip the solution.
587                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
588                        {
589                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
590                            continue;
591                        }
592                        // Check if the solution is still valid.
593                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
594                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
595                            continue;
596                        }
597                    }
598                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
599                        // Ensure the checksum matches. If not, skip the transaction.
600                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
601                        {
602                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
603                            continue;
604                        }
605
606                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
607                        let transaction = spawn_blocking!({
608                            match transaction {
609                                Data::Object(transaction) => Ok(transaction),
610                                Data::Buffer(bytes) => {
611                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
612                                }
613                            }
614                        })?;
615
616                        // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
617                        // ConsensusVersion V8 Migration logic -
618                        // Do not include deployments in a batch proposal.
619                        let current_block_height = self.ledger.latest_block_height();
620                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
621                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
622                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
623                        if current_block_height > consensus_version_v7_height
624                            && current_block_height <= consensus_version_v8_height
625                            && transaction.is_deploy()
626                        {
627                            trace!(
628                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
629                                fmt_id(transaction_id)
630                            );
631                            continue;
632                        }
633
634                        // Compute the transaction spent cost (in microcredits).
635                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
636                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
637                        else {
638                            debug!(
639                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
640                                fmt_id(transaction_id)
641                            );
642                            continue;
643                        };
644
645                        // Check if the transaction is still valid.
646                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
647                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
648                            continue;
649                        }
650
651                        // Compute the next proposal cost.
652                        // Note: We purposefully discard this transaction if the proposal cost overflows.
653                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
654                            debug!(
655                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
656                                fmt_id(transaction_id)
657                            );
658                            continue;
659                        };
660
661                        // Check if the next proposal cost exceeds the batch proposal spend limit.
662                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
663                        if next_proposal_cost > batch_spend_limit {
664                            debug!(
665                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
666                                fmt_id(transaction_id),
667                                batch_spend_limit
668                            );
669
670                            // Reinsert the transmission into the worker.
671                            worker.insert_front(id, transmission);
672                            break 'outer;
673                        }
674
675                        // Update the proposal cost.
676                        proposal_cost = next_proposal_cost;
677                    }
678
679                    // Note: We explicitly forbid including ratifications,
680                    // as the protocol currently does not support ratifications.
681                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
682                    // All other combinations are clearly invalid.
683                    _ => continue,
684                }
685
686                // If the transmission is valid, insert it into the proposal's transmission list.
687                transmissions.insert(id, transmission);
688                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
689            }
690        }
691
692        // Determine the current timestamp.
693        let current_timestamp = now();
694
695        *lock_guard = round;
696
697        /* Proceeding to sign & propose the batch. */
698        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
699
700        // Retrieve the private key.
701        let private_key = *self.gateway.account().private_key();
702        // Retrieve the committee ID.
703        let committee_id = committee_lookback.id();
704        // Prepare the transmission IDs.
705        let transmission_ids = transmissions.keys().copied().collect();
706        // Prepare the previous batch certificate IDs.
707        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
708        // Sign the batch header and construct the proposal.
709        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
710            &private_key,
711            round,
712            current_timestamp,
713            committee_id,
714            transmission_ids,
715            previous_certificate_ids,
716            &mut rand::thread_rng()
717        ))
718        .and_then(|batch_header| {
719            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
720                .map(|proposal| (batch_header, proposal))
721        })
722        .inspect_err(|_| {
723            // On error, reinsert the transmissions and then propagate the error.
724            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
725                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
726            }
727        })?;
728        // Broadcast the batch to all validators for signing.
729        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
730        // Set the timestamp of the latest proposed batch.
731        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
732        // Record the wall-clock time at which the batch was proposed.
733        #[cfg(feature = "metrics")]
734        {
735            *self.batch_propose_start.lock() = Some(Instant::now());
736        }
737        // Set the proposed batch.
738        *self.proposed_batch.write() = Some(proposal);
739        Ok(())
740    }
741
742    /// Processes a batch propose from a peer.
743    ///
744    /// This method performs the following steps:
745    /// 1. Verify the batch.
746    /// 2. Sign the batch.
747    /// 3. Broadcast the signature back to the validator.
748    ///
749    /// If our primary is ahead of the peer, we will not sign the batch.
750    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
751    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
752        let BatchPropose { round: batch_round, batch_header } = batch_propose;
753
754        // Deserialize the batch header.
755        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
756        // Ensure the round matches in the batch header.
757        if batch_round != batch_header.round() {
758            // Proceed to disconnect the validator.
759            self.gateway.disconnect(peer_ip);
760            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
761        }
762
763        // Retrieve the batch author.
764        let batch_author = batch_header.author();
765
766        // Ensure the batch proposal is from the validator.
767        match self.gateway.resolve_to_aleo_addr(peer_ip) {
768            // If the peer is a validator, then ensure the batch proposal is from the validator.
769            Some(address) => {
770                if address != batch_author {
771                    // Proceed to disconnect the validator.
772                    self.gateway.disconnect(peer_ip);
773                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
774                }
775            }
776            None => bail!("Batch proposal from a disconnected validator"),
777        }
778        // Ensure the batch author is a current committee member.
779        if !self.gateway.is_authorized_validator_address(batch_author) {
780            // Proceed to disconnect the validator.
781            self.gateway.disconnect(peer_ip);
782            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
783        }
784        // Ensure the batch proposal is not from the current primary.
785        if self.gateway.account().address() == batch_author {
786            bail!("Invalid peer - proposed batch from myself ({batch_author})");
787        }
788
789        // Ensure that the batch proposal's committee ID matches the expected committee ID.
790        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
791        if expected_committee_id != batch_header.committee_id() {
792            // Proceed to disconnect the validator.
793            self.gateway.disconnect(peer_ip);
794            bail!(
795                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
796                batch_header.committee_id()
797            );
798        }
799
800        // Retrieve the cached round and batch ID for this validator.
801        if let Some((signed_round, signed_batch_id, signature)) =
802            self.signed_proposals.read().get(&batch_author).copied()
803        {
804            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
805            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
806            if signed_round > batch_header.round() {
807                bail!(
808                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
809                    batch_header.round()
810                );
811            }
812
813            // If the round matches and the batch ID differs, then the validator is malicious.
814            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
815                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
816            }
817            // If the round and batch ID matches, then skip signing the batch a second time.
818            // Instead, rebroadcast the cached signature to the peer.
819            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
820                let gateway = self.gateway.clone();
821                tokio::spawn(async move {
822                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
823                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
824                    // Resend the batch signature to the peer.
825                    if gateway.send(peer_ip, event).await.is_none() {
826                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
827                    }
828                });
829                // Return early.
830                return Ok(());
831            }
832        }
833
834        // Ensure that the batch header doesn't already exist in storage.
835        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
836        if self.storage.contains_batch(batch_header.batch_id()) {
837            debug!(
838                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
839                format!("batch for round {batch_round} already exists in storage").dimmed()
840            );
841            return Ok(());
842        }
843
844        // Compute the previous round.
845        let previous_round = batch_round.saturating_sub(1);
846        // Ensure that the peer did not propose a batch too quickly.
847        if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
848            // Proceed to disconnect the validator.
849            self.gateway.disconnect(peer_ip);
850            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
851        }
852
853        // Ensure the batch header does not contain any ratifications.
854        if batch_header.contains(TransmissionID::Ratification) {
855            // Proceed to disconnect the validator.
856            self.gateway.disconnect(peer_ip);
857            bail!(
858                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
859            );
860        }
861
862        // If the peer is ahead, use the batch header to sync up to the peer.
863        let mut missing_transmissions =
864            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;
865
866        // Check that the transmission ids match and are not fee transactions.
867        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
868            // If the transmission is not well-formed, then return early.
869            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
870        }) {
871            let err = err.context(format!(
872                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
873            ));
874            debug!("{}", flatten_error(err));
875            return Ok(());
876        }
877
878        // Ensure the batch is for the current round.
879        // This method must be called after fetching previous certificates (above),
880        // and prior to checking the batch header (below).
881        if let Err(e) = self.ensure_is_signing_round(batch_round) {
882            // If the primary is not signing for the peer's round, then return early.
883            debug!("{e} from '{peer_ip}'");
884            return Ok(());
885        }
886
887        // Ensure the batch header from the peer is valid.
888        let (storage, header) = (self.storage.clone(), batch_header.clone());
889
890        // Check the batch header, and return early if it already exists in storage.
891        let Some(missing_transmissions) =
892            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
893        else {
894            return Ok(());
895        };
896
897        // Inserts the missing transmissions into the workers.
898        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
899
900        // Ensure the transaction doesn't bring the proposal above the spend limit.
901        let block_height = self.ledger.latest_block_height() + 1;
902        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
903            let mut proposal_cost = 0u64;
904            for transmission_id in batch_header.transmission_ids() {
905                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
906                let Some(worker) = self.workers.get(worker_id as usize) else {
907                    debug!("Unable to find worker {worker_id}");
908                    return Ok(());
909                };
910
911                let Some(transmission) = worker.get_transmission(*transmission_id) else {
912                    debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
913                    return Ok(());
914                };
915
916                // If the transmission is a transaction, compute its execution cost.
917                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
918                    (transmission_id, transmission)
919                {
920                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
921                    let transaction = spawn_blocking!({
922                        match transaction {
923                            Data::Object(transaction) => Ok(transaction),
924                            Data::Buffer(bytes) => {
925                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
926                            }
927                        }
928                    })?;
929
930                    // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
931                    // ConsensusVersion V8 Migration logic -
932                    // Do not include deployments in a batch proposal.
933                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
934                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
935                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
936                    if block_height > consensus_version_v7_height
937                        && block_height <= consensus_version_v8_height
938                        && transaction.is_deploy()
939                    {
940                        bail!(
941                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
942                        )
943                    }
944
945                    // Compute the transaction spent cost (in microcredits).
946                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
947                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
948                    else {
949                        bail!(
950                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
951                            fmt_id(transaction_id)
952                        )
953                    };
954
955                    // Compute the next proposal cost.
956                    // Note: We purposefully discard this transaction if the proposal cost overflows.
957                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
958                        bail!(
959                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
960                            fmt_id(transaction_id)
961                        )
962                    };
963
964                    // Check if the next proposal cost exceeds the batch proposal spend limit.
965                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
966                    if next_proposal_cost > batch_spend_limit {
967                        bail!(
968                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
969                            fmt_id(transaction_id),
970                            batch_spend_limit
971                        );
972                    }
973
974                    // Update the proposal cost.
975                    proposal_cost = next_proposal_cost;
976                }
977            }
978        }
979
980        /* Proceeding to sign the batch. */
981
982        // Retrieve the batch ID.
983        let batch_id = batch_header.batch_id();
984        // Sign the batch ID.
985        let account = self.gateway.account().clone();
986        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
987
988        // Ensure the proposal has not already been signed.
989        //
990        // Note: Due to the need to sync the batch header with the peer, it is possible
991        // for the primary to receive the same 'BatchPropose' event again, whereby only
992        // one instance of this handler should sign the batch. This check guarantees this.
993        match self.signed_proposals.write().0.entry(batch_author) {
994            std::collections::hash_map::Entry::Occupied(mut entry) => {
995                // If the validator has already signed a batch for this round, then return early,
996                // since, if the peer still has not received the signature, they will request it again,
997                // and the logic at the start of this function will resend the (now cached) signature
998                // to the peer if asked to sign this batch proposal again.
999                if entry.get().0 == batch_round {
1000                    return Ok(());
1001                }
1002                // Otherwise, cache the round, batch ID, and signature for this validator.
1003                entry.insert((batch_round, batch_id, signature));
1004            }
1005            // If the validator has not signed a batch before, then continue.
1006            std::collections::hash_map::Entry::Vacant(entry) => {
1007                // Cache the round, batch ID, and signature for this validator.
1008                entry.insert((batch_round, batch_id, signature));
1009            }
1010        };
1011
1012        // Broadcast the signature back to the validator.
1013        let self_ = self.clone();
1014        tokio::spawn(async move {
1015            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1016            // Send the batch signature to the peer.
1017            if self_.gateway.send(peer_ip, event).await.is_some() {
1018                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1019            }
1020        });
1021
1022        Ok(())
1023    }
1024
1025    /// Processes a batch signature from a peer.
1026    ///
1027    /// This method performs the following steps:
1028    /// 1. Ensure the proposed batch has not expired.
1029    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
1030    /// 3. Store the signature.
1031    /// 4. Certify the batch if enough signatures have been received.
1032    /// 5. Broadcast the batch certificate to all validators.
1033    async fn process_batch_signature_from_peer(
1034        &self,
1035        peer_ip: SocketAddr,
1036        batch_signature: BatchSignature<N>,
1037    ) -> Result<()> {
1038        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
1039        self.check_proposed_batch_for_expiration().await?;
1040
1041        // Retrieve the signature and timestamp.
1042        let BatchSignature { batch_id, signature } = batch_signature;
1043
1044        // Retrieve the signer.
1045        let signer = signature.to_address();
1046
1047        // Ensure the batch signature is signed by the validator.
1048        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1049            // Proceed to disconnect the validator.
1050            self.gateway.disconnect(peer_ip);
1051            bail!("Malicious peer - batch signature is from a different validator ({signer})");
1052        }
1053        // Ensure the batch signature is not from the current primary.
1054        if self.gateway.account().address() == signer {
1055            bail!("Invalid peer - received a batch signature from myself ({signer})");
1056        }
1057
1058        let self_ = self.clone();
1059        let Some(proposal) = spawn_blocking!({
1060            // Acquire the write lock.
1061            let mut proposed_batch = self_.proposed_batch.write();
1062            // Add the signature to the batch, and determine if the batch is ready to be certified.
1063            match proposed_batch.as_mut() {
1064                Some(proposal) => {
1065                    // Ensure the batch ID matches the currently proposed batch ID.
1066                    if proposal.batch_id() != batch_id {
1067                        match self_.storage.contains_batch(batch_id) {
1068                            // If this batch was already certified, return early.
1069                            true => {
1070                                debug!(
1071                                    "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
1072                                    proposal.round()
1073                                );
1074                                return Ok(None);
1075                            }
1076                            // If the batch ID is unknown, return an error.
1077                            false => bail!(
1078                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
1079                                proposal.batch_id(),
1080                                proposal.round()
1081                            ),
1082                        }
1083                    }
1084                    // Retrieve the committee lookback for the round.
1085                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
1086                    // Retrieve the address of the validator.
1087                    let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
1088                        bail!("Signature is from a disconnected validator");
1089                    };
1090                    // Add the signature to the batch.
1091                    let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1092
1093                    if new_signature {
1094                        info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1095                        // Check if the batch is ready to be certified.
1096                        if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1097                            // If the batch is not ready to be certified, return early.
1098                            return Ok(None);
1099                        }
1100                    } else {
1101                        debug!(
1102                            "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}",
1103                            round = proposal.round()
1104                        );
1105                        return Ok(None);
1106                    }
1107                }
1108                // There is no proposed batch, so return early.
1109                None => return Ok(None),
1110            };
1111            // Retrieve the batch proposal, clearing the proposed batch.
1112            match proposed_batch.take() {
1113                Some(proposal) => Ok(Some(proposal)),
1114                None => Ok(None),
1115            }
1116        })?
1117        else {
1118            return Ok(());
1119        };
1120
1121        /* Proceeding to certify the batch. */
1122
1123        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1124
1125        // Retrieve the committee lookback for the round.
1126        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1127        // Store the certified batch and broadcast it to all validators.
1128        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1129        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1130            // Reinsert the transmissions back into the ready queue for the next proposal.
1131            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1132            return Err(e);
1133        }
1134
1135        #[cfg(feature = "metrics")]
1136        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1137        Ok(())
1138    }
1139
    /// Processes a batch certificate from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Stores the given batch certificate, after ensuring it is valid.
    /// 2. If there are enough certificates to reach quorum threshold for the current round,
    ///    then proceed to advance to the next round.
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the batch certificate is from an authorized validator.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // Ensure storage does not already contain the certificate; if it does, there is nothing to do.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        // Otherwise, ensure ephemeral storage contains the certificate.
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        // Retrieve the batch certificate author.
        let author = certificate.author();
        // Retrieve the batch certificate round.
        let certificate_round = certificate.round();
        // Retrieve the batch certificate committee ID.
        let committee_id = certificate.committee_id();

        // Ensure the batch certificate is not from the current primary.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Ensure that the incoming certificate is valid.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, after ensuring it is valid above.
        // The following call recursively fetches and stores
        // the previous certificates referenced from this certificate.
        // It is critical to make the following call after validating the certificate above.
        // The reason is that a sequence of malformed certificates,
        // with references to previous certificates with non-decreasing rounds,
        // can cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
        // Note that the following call, if not returning an error, guarantees the backward closure of the DAG
        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
        // so all the validity checks in [`Storage::check_certificate`] should be redundant.
        // TODO: eliminate those redundant checks
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // If there are enough certificates to reach quorum threshold for the certificate round,
        // then proceed to advance to the next round.

        // Retrieve the committee lookback.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Retrieve the certificate authors.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        // Check if the certificates have reached the quorum threshold.
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure that the batch certificate's committee ID matches the expected committee ID.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Determine if we are currently proposing a round that is relevant.
        // Note: This is important, because while our peers have advanced,
        // they may not be proposing yet, and thus still able to sign our proposed batch.
        let should_advance = match &*self.proposed_batch.read() {
            // We advance if the proposal round is less than the current round that was just certified.
            Some(proposal) => proposal.round() < certificate_round,
            // If there's no proposal, we consider advancing.
            None => true,
        };

        // Retrieve the current round.
        let current_round = self.current_round();

        // Determine whether to advance to the next round.
        if is_quorum && should_advance && certificate_round >= current_round {
            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
1232}
1233
1234impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
    /// that awaits new data and handles it accordingly.
    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
    /// tries to move to the next round of batches.
    ///
    /// This function is called exactly once, in `Self::run()`.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Destructure the receiver struct into its individual message channels.
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the primary ping sender.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                // Note: offloaded via `spawn_blocking!` — presumably this touches storage; confirm before inlining.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve the latest certificate of the primary.
                let primary_certificate = {
                    // Retrieve the primary address.
                    let primary_address = self_.gateway.account().address();

                    // Iterate backwards from the latest round to find the primary certificate.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // If the current round is 0, then break the while loop.
                        if current_round == 0 {
                            break;
                        }
                        // Retrieve the primary certificates.
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        // If the primary certificate was not found, decrement the round.
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    // Determine if the primary certificate was found.
                    match certificate {
                        Some(certificate) => certificate,
                        // Skip this iteration of the loop (do not send a primary ping).
                        None => continue,
                    }
                };

                // Construct the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                // Broadcast the event.
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Start the primary ping handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // If the primary is not synced, then do not process the primary ping.
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Spawn a task to process the primary certificate.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialize the primary certificate in the primary ping.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        // Process the primary certificate.
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Start the worker ping(s).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                // If the primary is not synced, then do not broadcast the worker ping(s).
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Broadcast the worker ping(s).
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs to be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Start the proposed batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // If the primary is not synced, then do not sign the batch.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the proposed batch.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Process the batch proposal.
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Start the batch signature handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // If the primary is not synced, then do not store the signature.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch signature.
                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
                // is a critical path, and we should only store the minimum required number of signatures.
                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Start the certified batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // If the primary is not synced, then do not store the certificate.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the batch certificate.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Deserialize the batch certificate.
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    // Process the batch certificate.
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // This task periodically tries to move to the next round.
        //
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the current round.
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    // Retrieve the committee lookback for the current round.
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    // Check if the quorum threshold is reached for the current round.
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Start a handler to process new unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Start a handler to process new unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });
    }
1560
    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
    ///
    /// A proposal is considered expired once its round falls behind the primary's current round.
    /// The transmissions of a cleared proposal are handed back to the workers, so they can be
    /// included in a future batch rather than dropped.
    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
        // Check if the proposed batch is timed out or stale.
        let is_expired = match self.proposed_batch.read().as_ref() {
            Some(proposal) => proposal.round() < self.current_round(),
            None => false,
        };
        // If the batch is expired, clear the proposed batch.
        // NOTE(review): the read above and the write below are two separate lock acquisitions, so a
        // proposal swapped in between them would be cleared without re-checking its round — presumably
        // benign since this node is the only proposer of its own batch, but worth confirming.
        if is_expired {
            // Reset the proposed batch.
            let proposal = self.proposed_batch.write().take();
            if let Some(proposal) = proposal {
                debug!("Cleared expired proposal for round {}", proposal.round());
                // Return the proposal's transmissions to the workers for a future batch.
                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            }
        }
        Ok(())
    }
1579
    /// Increments to the next round.
    ///
    /// If `next_round` is within GC range of the current round, storage is first fast-forwarded
    /// round-by-round up to the round preceding `next_round`, clearing any proposed batch along
    /// the way. Then, if a BFT sender is present, the BFT decides whether the primary is ready to
    /// propose; otherwise (Narwhal mode), the round is incremented in storage and the primary is
    /// always considered ready. When ready, a batch is proposed for the new round.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, then iterate to the penultimate round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Iterate until the penultimate round is reached.
            while fast_forward_round < next_round.saturating_sub(1) {
                // Update to the next round in storage.
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear the proposed batch, as it now targets a stale round.
                *self.proposed_batch.write() = None;
            }
        }

        // Retrieve the current round.
        let current_round = self.current_round();
        // Attempt to advance to the next round.
        if current_round < next_round {
            // If a BFT sender was provided, send the current round to the BFT.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    // The BFT replies with whether the primary is ready to propose the next round.
                    Ok(is_ready) => is_ready,
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            // Otherwise, handle the Narwhal case.
            else {
                // Update to the next round in storage.
                self.storage.increment_to_next_round(current_round)?;
                // Set 'is_ready' to 'true'.
                true
            };

            // Log whether the next round is ready.
            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // If the node is ready, propose a batch for the next round.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }
1630
1631    /// Ensures the primary is signing for the specified batch round.
1632    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1633    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1634    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1635        // Retrieve the current round.
1636        let current_round = self.current_round();
1637        // Ensure the batch round is within GC range of the current round.
1638        if current_round + self.storage.max_gc_rounds() <= batch_round {
1639            bail!("Round {batch_round} is too far in the future")
1640        }
1641        // Ensure the batch round is at or one before the current round.
1642        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1643        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1644        if current_round > batch_round + 1 {
1645            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1646        }
1647        // Check if the primary is still signing for the batch round.
1648        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1649            if signing_round > batch_round {
1650                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1651            }
1652        }
1653        Ok(())
1654    }
1655
1656    /// Ensure the primary is not creating batch proposals too frequently.
1657    /// This checks that the certificate timestamp for the previous round is within the expected range.
1658    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1659        // Retrieve the timestamp of the previous timestamp to check against.
1660        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1661            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1662            Some(certificate) => certificate.timestamp(),
1663            None => match self.gateway.account().address() == author {
1664                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1665                true => *self.latest_proposed_batch_timestamp.read(),
1666                // If we do not see a previous certificate for the author, then proceed optimistically.
1667                false => return Ok(()),
1668            },
1669        };
1670
1671        // Determine the elapsed time since the previous timestamp.
1672        let elapsed = timestamp
1673            .checked_sub(previous_timestamp)
1674            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1675        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1676        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1677            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1678            false => Ok(()),
1679        }
1680    }
1681
    /// Stores the certified batch and broadcasts it to all validators.
    ///
    /// Creates the batch certificate from the proposal (using the given committee), persists it
    /// together with its transmissions to storage, forwards it to the BFT (when enabled),
    /// broadcasts it to all validators, and finally attempts to advance to the next round.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        // Note: `block_in_place` is used as certificate creation is CPU-bound work.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                let err = err.context("Failed to update the BFT DAG from primary");
                warn!("{}", flatten_error(&err));
                return Err(err);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Record the certification latency (time from batch proposal to certification).
        #[cfg(feature = "metrics")]
        if let Some(start) = self.batch_propose_start.lock().take() {
            metrics::histogram(metrics::bft::BATCH_CERTIFICATION_LATENCY, start.elapsed().as_secs_f64());
        }
        // Increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }
1716
1717    /// Inserts the missing transmissions from the proposal into the workers.
1718    fn insert_missing_transmissions_into_workers(
1719        &self,
1720        peer_ip: SocketAddr,
1721        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1722    ) -> Result<()> {
1723        // Insert the transmissions into the workers.
1724        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1725            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1726        })
1727    }
1728
1729    /// Re-inserts the transmissions from the proposal into the workers.
1730    fn reinsert_transmissions_into_workers(
1731        &self,
1732        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1733    ) -> Result<()> {
1734        // Re-insert the transmissions into the workers.
1735        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1736            worker.reinsert(transmission_id, transmission);
1737        })
1738    }
1739
    /// Recursively stores a given batch certificate, after ensuring:
    ///   - Ensure the round matches the committee round.
    ///   - Ensure the address is a member of the committee.
    ///   - Ensure the timestamp is within range.
    ///   - Ensure we have all of the transmissions.
    ///   - Ensure we have all of the previous certificates.
    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
    ///   - Ensure the previous certificates have reached the quorum threshold.
    ///   - Ensure we have not already signed the batch ID.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If the node is not in sync mode and the node is not synced, then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        // This fetches any missing transmissions and (recursively) missing previous certificates.
        let missing_transmissions =
            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored.
        // Note: the recursive sync above may have already stored this certificate.
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the round and certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Send the certificate to the BFT.
                if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    let err = err.context("Failed to update the BFT DAG from sync");
                    warn!("{}", &flatten_error(&err));
                    return Err(err);
                };
            }
        }
        Ok(())
    }
1799
1800    /// Recursively syncs using the given batch header.
1801    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1802        &self,
1803        peer_ip: SocketAddr,
1804        batch_header: &BatchHeader<N>,
1805    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1806        // Retrieve the batch round.
1807        let batch_round = batch_header.round();
1808
1809        // If the certificate round is outdated, do not store it.
1810        if batch_round <= self.storage.gc_round() {
1811            bail!("Round {batch_round} is too far in the past")
1812        }
1813
1814        // If node is not in sync mode and the node is not synced, then return an error.
1815        if !IS_SYNCING && !self.is_synced() {
1816            bail!(
1817                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1818                fmt_id(batch_header.batch_id())
1819            );
1820        }
1821
1822        // Determine if quorum threshold is reached on the batch round.
1823        let is_quorum_threshold_reached = {
1824            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1825            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1826            committee_lookback.is_quorum_threshold_reached(&authors)
1827        };
1828
1829        // Check if our primary should move to the next round.
1830        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1831        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1832        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1833        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1834        // Check if our primary is far behind the peer.
1835        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1836        // If our primary is far behind the peer, update our committee to the batch round.
1837        if is_behind_schedule || is_peer_far_in_future {
1838            // If the batch round is greater than the current committee round, update the committee.
1839            self.try_increment_to_the_next_round(batch_round).await?;
1840        }
1841
1842        // Ensure the primary has all of the transmissions.
1843        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1844
1845        // Ensure the primary has all of the previous certificates.
1846        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1847
1848        // Wait for the missing transmissions and previous certificates to be fetched.
1849        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1850            missing_transmissions_handle,
1851            missing_previous_certificates_handle,
1852        ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1853
1854        // Iterate through the missing previous certificates.
1855        for batch_certificate in missing_previous_certificates {
1856            // Check if the missing previous certificate is valid. This is only
1857            // needed if we are processing an incoming batch header from a peer.
1858            // For incoming certificates, validity is assured by checking the
1859            // root certificate in `process_batch_certificate_from_peer`.
1860            if CHECK_PREVIOUS_CERTIFICATES {
1861                self.storage.check_incoming_certificate(&batch_certificate)?;
1862            }
1863            // Store the batch certificate (recursively fetching any missing previous certificates).
1864            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1865        }
1866        Ok(missing_transmissions)
1867    }
1868
1869    /// Fetches any missing transmissions for the specified batch header.
1870    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1871    async fn fetch_missing_transmissions(
1872        &self,
1873        peer_ip: SocketAddr,
1874        batch_header: &BatchHeader<N>,
1875    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1876        // If the round is <= the GC round, return early.
1877        if batch_header.round() <= self.storage.gc_round() {
1878            return Ok(Default::default());
1879        }
1880
1881        // Ensure this batch ID is new, otherwise return early.
1882        if self.storage.contains_batch(batch_header.batch_id()) {
1883            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1884            return Ok(Default::default());
1885        }
1886
1887        // Retrieve the workers.
1888        let workers = self.workers.clone();
1889
1890        // Initialize a list for the transmissions.
1891        let mut fetch_transmissions = FuturesUnordered::new();
1892
1893        // Retrieve the number of workers.
1894        let num_workers = self.num_workers();
1895        // Iterate through the transmission IDs.
1896        for transmission_id in batch_header.transmission_ids() {
1897            // If the transmission does not exist in storage, proceed to fetch the transmission.
1898            if !self.storage.contains_transmission(*transmission_id) {
1899                // Determine the worker ID.
1900                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1901                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1902                };
1903                // Retrieve the worker.
1904                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1905                // Push the callback onto the list.
1906                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1907            }
1908        }
1909
1910        // Initialize a set for the transmissions.
1911        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1912        // Wait for all of the transmissions to be fetched.
1913        while let Some(result) = fetch_transmissions.next().await {
1914            // Retrieve the transmission.
1915            let (transmission_id, transmission) = result?;
1916            // Insert the transmission into the set.
1917            transmissions.insert(transmission_id, transmission);
1918        }
1919        // Return the transmissions.
1920        Ok(transmissions)
1921    }
1922
1923    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1924    async fn fetch_missing_previous_certificates(
1925        &self,
1926        peer_ip: SocketAddr,
1927        batch_header: &BatchHeader<N>,
1928    ) -> Result<HashSet<BatchCertificate<N>>> {
1929        // Retrieve the round.
1930        let round = batch_header.round();
1931        // If the previous round is 0, or is <= the GC round, return early.
1932        if round == 1 || round <= self.storage.gc_round() + 1 {
1933            return Ok(Default::default());
1934        }
1935
1936        // Fetch the missing previous certificates.
1937        let missing_previous_certificates =
1938            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1939        if !missing_previous_certificates.is_empty() {
1940            debug!(
1941                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1942                missing_previous_certificates.len(),
1943            );
1944        }
1945        // Return the missing previous certificates.
1946        Ok(missing_previous_certificates)
1947    }
1948
1949    /// Fetches any missing certificates for the specified batch header from the specified peer.
1950    async fn fetch_missing_certificates(
1951        &self,
1952        peer_ip: SocketAddr,
1953        round: u64,
1954        certificate_ids: &IndexSet<Field<N>>,
1955    ) -> Result<HashSet<BatchCertificate<N>>> {
1956        // Initialize a list for the missing certificates.
1957        let mut fetch_certificates = FuturesUnordered::new();
1958        // Initialize a set for the missing certificates.
1959        let mut missing_certificates = HashSet::default();
1960        // Iterate through the certificate IDs.
1961        for certificate_id in certificate_ids {
1962            // Check if the certificate already exists in the ledger.
1963            if self.ledger.contains_certificate(certificate_id)? {
1964                continue;
1965            }
1966            // Check if the certificate already exists in storage.
1967            if self.storage.contains_certificate(*certificate_id) {
1968                continue;
1969            }
1970            // If we have not fully processed the certificate yet, store it.
1971            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1972                missing_certificates.insert(certificate);
1973            } else {
1974                // If we do not have the certificate, request it.
1975                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1976                // TODO (howardwu): Limit the number of open requests we send to a peer.
1977                // Send an certificate request to the peer.
1978                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1979            }
1980        }
1981
1982        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1983        match fetch_certificates.is_empty() {
1984            true => return Ok(missing_certificates),
1985            false => trace!(
1986                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1987                fetch_certificates.len(),
1988            ),
1989        }
1990
1991        // Wait for all of the missing certificates to be fetched.
1992        while let Some(result) = fetch_certificates.next().await {
1993            // Insert the missing certificate into the set.
1994            missing_certificates.insert(result?);
1995        }
1996        // Return the missing certificates.
1997        Ok(missing_certificates)
1998    }
1999}
2000
2001impl<N: Network> Primary<N> {
2002    /// Spawns a task with the given future; it should only be used for long-running tasks.
2003    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
2004        self.handles.lock().push(tokio::spawn(future));
2005    }
2006
2007    /// Shuts down the primary.
2008    pub async fn shut_down(&self) {
2009        info!("Shutting down the primary...");
2010        // Shut down the workers.
2011        self.workers.iter().for_each(|worker| worker.shut_down());
2012        // Abort the tasks.
2013        self.handles.lock().iter().for_each(|handle| handle.abort());
2014        // Save the current proposal cache to disk.
2015        let proposal_cache = {
2016            let proposal = self.proposed_batch.write().take();
2017            let signed_proposals = self.signed_proposals.read().clone();
2018            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
2019            let pending_certificates = self.storage.get_pending_certificates();
2020            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
2021        };
2022        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
2023            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
2024        }
2025        // Close the gateway.
2026        self.gateway.shut_down().await;
2027    }
2028}
2029
2030#[cfg(test)]
2031mod tests {
2032    use super::*;
2033    use snarkos_node_bft_ledger_service::MockLedgerService;
2034    use snarkos_node_bft_storage_service::BFTMemoryService;
2035    use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2036    use snarkvm::{
2037        ledger::{
2038            committee::{Committee, MIN_VALIDATOR_STAKE},
2039            test_helpers::sample_execution_transaction_with_fee,
2040        },
2041        prelude::{Address, Signature},
2042    };
2043
2044    use bytes::Bytes;
2045    use indexmap::IndexSet;
2046    use rand::RngCore;
2047
2048    type CurrentNetwork = snarkvm::prelude::MainnetV0;
2049
2050    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2051        // Create a committee containing the primary's account.
2052        const COMMITTEE_SIZE: usize = 4;
2053        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2054        let mut members = IndexMap::new();
2055
2056        for i in 0..COMMITTEE_SIZE {
2057            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2058            let account = Account::new(rng).unwrap();
2059
2060            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2061            accounts.push((socket_addr, account));
2062        }
2063
2064        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2065    }
2066
2067    // Returns a primary and a list of accounts in the configured committee.
2068    fn primary_with_committee(
2069        account_index: usize,
2070        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2071        committee: Committee<CurrentNetwork>,
2072        height: u32,
2073    ) -> Primary<CurrentNetwork> {
2074        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2075        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2076
2077        // Initialize the primary.
2078        let account = accounts[account_index].1.clone();
2079        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2080        let mut primary =
2081            Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2082                .unwrap();
2083
2084        // Construct a worker instance.
2085        primary.workers = Arc::from([Worker::new(
2086            0, // id
2087            Arc::new(primary.gateway.clone()),
2088            primary.storage.clone(),
2089            primary.ledger.clone(),
2090            primary.proposed_batch.clone(),
2091        )
2092        .unwrap()]);
2093        for a in accounts.iter().skip(account_index) {
2094            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2095        }
2096
2097        primary
2098    }
2099
2100    fn primary_without_handlers(
2101        rng: &mut TestRng,
2102    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2103        let (accounts, committee) = sample_committee(rng);
2104        let primary = primary_with_committee(
2105            0, // index of primary's account
2106            &accounts,
2107            committee,
2108            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2109        );
2110
2111        (primary, accounts)
2112    }
2113
2114    // Creates a mock solution.
2115    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2116        // Sample a random fake solution ID.
2117        let solution_id = rng.r#gen::<u64>().into();
2118        // Vary the size of the solutions.
2119        let size = rng.gen_range(1024..10 * 1024);
2120        // Sample random fake solution bytes.
2121        let mut vec = vec![0u8; size];
2122        rng.fill_bytes(&mut vec);
2123        let solution = Data::Buffer(Bytes::from(vec));
2124        // Return the solution ID and solution.
2125        (solution_id, solution)
2126    }
2127
2128    // Samples a test transaction.
2129    fn sample_unconfirmed_transaction(
2130        rng: &mut TestRng,
2131    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2132        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2133        let id = transaction.id();
2134
2135        (id, Data::Object(transaction))
2136    }
2137
2138    // Creates a batch proposal with one solution and one transaction.
2139    fn create_test_proposal(
2140        author: &Account<CurrentNetwork>,
2141        committee: Committee<CurrentNetwork>,
2142        round: u64,
2143        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2144        timestamp: i64,
2145        num_transactions: u64,
2146        rng: &mut TestRng,
2147    ) -> Proposal<CurrentNetwork> {
2148        let mut transmission_ids = IndexSet::new();
2149        let mut transmissions = IndexMap::new();
2150
2151        // Prepare the solution and insert into the sets.
2152        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2153        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2154        let solution_transmission_id = (solution_id, solution_checksum).into();
2155        transmission_ids.insert(solution_transmission_id);
2156        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2157
2158        // Prepare the transactions and insert into the sets.
2159        for _ in 0..num_transactions {
2160            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2161            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2162            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2163            transmission_ids.insert(transaction_transmission_id);
2164            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2165        }
2166
2167        // Retrieve the private key.
2168        let private_key = author.private_key();
2169        // Sign the batch header.
2170        let batch_header = BatchHeader::new(
2171            private_key,
2172            round,
2173            timestamp,
2174            committee.id(),
2175            transmission_ids,
2176            previous_certificate_ids,
2177            rng,
2178        )
2179        .unwrap();
2180        // Construct the proposal.
2181        Proposal::new(committee, batch_header, transmissions).unwrap()
2182    }
2183
2184    // Creates a signature of the primary's current proposal for each committee member (excluding
2185    // the primary).
2186    fn peer_signatures_for_proposal(
2187        primary: &Primary<CurrentNetwork>,
2188        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2189        rng: &mut TestRng,
2190    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2191        // Each committee member signs the batch.
2192        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2193        for (socket_addr, account) in accounts {
2194            if account.address() == primary.gateway.account().address() {
2195                continue;
2196            }
2197            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2198            let signature = account.sign(&[batch_id], rng).unwrap();
2199            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2200        }
2201
2202        signatures
2203    }
2204
2205    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2206    fn peer_signatures_for_batch(
2207        primary_address: Address<CurrentNetwork>,
2208        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2209        batch_id: Field<CurrentNetwork>,
2210        rng: &mut TestRng,
2211    ) -> IndexSet<Signature<CurrentNetwork>> {
2212        let mut signatures = IndexSet::new();
2213        for (_, account) in accounts {
2214            if account.address() == primary_address {
2215                continue;
2216            }
2217            let signature = account.sign(&[batch_id], rng).unwrap();
2218            signatures.insert(signature);
2219        }
2220        signatures
2221    }
2222
2223    // Creates a batch certificate.
2224    fn create_batch_certificate(
2225        primary_address: Address<CurrentNetwork>,
2226        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2227        round: u64,
2228        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2229        rng: &mut TestRng,
2230    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2231        let timestamp = now();
2232
2233        let author =
2234            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2235        let private_key = author.private_key();
2236
2237        let committee_id = Field::rand(rng);
2238        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2239        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2240        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2241        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2242
2243        let solution_transmission_id = (solution_id, solution_checksum).into();
2244        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2245
2246        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2247        let transmissions = [
2248            (solution_transmission_id, Transmission::Solution(solution)),
2249            (transaction_transmission_id, Transmission::Transaction(transaction)),
2250        ]
2251        .into();
2252
2253        let batch_header = BatchHeader::new(
2254            private_key,
2255            round,
2256            timestamp,
2257            committee_id,
2258            transmission_ids,
2259            previous_certificate_ids,
2260            rng,
2261        )
2262        .unwrap();
2263        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2264        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2265        (certificate, transmissions)
2266    }
2267
2268    // Create a certificate chain up to, but not including, the specified round in the primary storage.
2269    fn store_certificate_chain(
2270        primary: &Primary<CurrentNetwork>,
2271        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2272        round: u64,
2273        rng: &mut TestRng,
2274    ) -> IndexSet<Field<CurrentNetwork>> {
2275        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2276        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2277        for cur_round in 1..round {
2278            for (_, account) in accounts.iter() {
2279                let (certificate, transmissions) = create_batch_certificate(
2280                    account.address(),
2281                    accounts,
2282                    cur_round,
2283                    previous_certificates.clone(),
2284                    rng,
2285                );
2286                next_certificates.insert(certificate.id());
2287                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2288            }
2289
2290            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2291            previous_certificates = next_certificates;
2292            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2293        }
2294
2295        previous_certificates
2296    }
2297
2298    // Insert the account socket addresses into the resolver so that
2299    // they are recognized as "connected".
2300    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2301        // First account is primary, which doesn't need to resolve.
2302        for (addr, acct) in accounts.iter().skip(1) {
2303            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2304        }
2305    }
2306
2307    #[tokio::test]
2308    async fn test_propose_batch() {
2309        let mut rng = TestRng::default();
2310        let (primary, _) = primary_without_handlers(&mut rng);
2311
2312        // Check there is no batch currently proposed.
2313        assert!(primary.proposed_batch.read().is_none());
2314
2315        // Generate a solution and a transaction.
2316        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2317        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2318
2319        // Store it on one of the workers.
2320        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2321        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2322
2323        // Try to propose a batch again. This time, it should succeed.
2324        assert!(primary.propose_batch().await.is_ok());
2325        assert!(primary.proposed_batch.read().is_some());
2326    }
2327
2328    #[tokio::test]
2329    async fn test_propose_batch_with_no_transmissions() {
2330        let mut rng = TestRng::default();
2331        let (primary, _) = primary_without_handlers(&mut rng);
2332
2333        // Check there is no batch currently proposed.
2334        assert!(primary.proposed_batch.read().is_none());
2335
2336        // Try to propose a batch with no transmissions.
2337        assert!(primary.propose_batch().await.is_ok());
2338        assert!(primary.proposed_batch.read().is_some());
2339    }
2340
2341    #[tokio::test]
2342    async fn test_propose_batch_in_round() {
2343        let round = 3;
2344        let mut rng = TestRng::default();
2345        let (primary, accounts) = primary_without_handlers(&mut rng);
2346
2347        // Fill primary storage.
2348        store_certificate_chain(&primary, &accounts, round, &mut rng);
2349
2350        // Sleep for a while to ensure the primary is ready to propose the next round.
2351        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2352
2353        // Generate a solution and a transaction.
2354        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2355        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2356
2357        // Store it on one of the workers.
2358        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2359        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2360
2361        // Propose a batch again. This time, it should succeed.
2362        assert!(primary.propose_batch().await.is_ok());
2363        assert!(primary.proposed_batch.read().is_some());
2364    }
2365
    // Verifies that a proposed batch includes only genuinely new transmissions,
    // skipping any transmission already contained in certificates from the round
    // being advanced past.
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        // NOTE(review): this counter accumulates transmissions from the certificates
        // created below for `round` (the current round), despite the variable name.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Add the transmissions to the worker.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            // Insert the certificate to storage.
            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the new transmissions that were not in previous certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }
2437
2438    #[tokio::test]
2439    async fn test_propose_batch_over_spend_limit() {
2440        let mut rng = TestRng::default();
2441
2442        // Create a primary to test spend limit backwards compatibility with V4.
2443        let (accounts, committee) = sample_committee(&mut rng);
2444        let primary = primary_with_committee(
2445            0,
2446            &accounts,
2447            committee.clone(),
2448            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2449        );
2450
2451        // Check there is no batch currently proposed.
2452        assert!(primary.proposed_batch.read().is_none());
2453        // Check the workers are empty.
2454        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2455
2456        // Generate a solution and transactions.
2457        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2458        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2459
2460        for _i in 0..5 {
2461            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2462            // Store it on one of the workers.
2463            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2464        }
2465
2466        // Try to propose a batch again. This time, it should succeed.
2467        assert!(primary.propose_batch().await.is_ok());
2468        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2469        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2470        // Check the transmissions were correctly drained from the workers.
2471        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2472    }
2473
2474    #[tokio::test]
2475    async fn test_batch_propose_from_peer() {
2476        let mut rng = TestRng::default();
2477        let (primary, accounts) = primary_without_handlers(&mut rng);
2478
2479        // Create a valid proposal with an author that isn't the primary.
2480        let round = 1;
2481        let peer_account = &accounts[1];
2482        let peer_ip = peer_account.0;
2483        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2484        let proposal = create_test_proposal(
2485            &peer_account.1,
2486            primary.ledger.current_committee().unwrap(),
2487            round,
2488            Default::default(),
2489            timestamp,
2490            1,
2491            &mut rng,
2492        );
2493
2494        // Make sure the primary is aware of the transmissions in the proposal.
2495        for (transmission_id, transmission) in proposal.transmissions() {
2496            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2497        }
2498
2499        // The author must be known to resolver to pass propose checks.
2500        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2501
2502        // The primary will only consider itself synced if we received
2503        // block locators from a peer.
2504        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2505        primary.sync.testing_only_try_block_sync_testing_only().await;
2506
2507        // Try to process the batch proposal from the peer, should succeed.
2508        assert!(
2509            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2510        );
2511    }
2512
2513    #[tokio::test]
2514    async fn test_batch_propose_from_peer_when_not_synced() {
2515        let mut rng = TestRng::default();
2516        let (primary, accounts) = primary_without_handlers(&mut rng);
2517
2518        // Create a valid proposal with an author that isn't the primary.
2519        let round = 1;
2520        let peer_account = &accounts[1];
2521        let peer_ip = peer_account.0;
2522        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2523        let proposal = create_test_proposal(
2524            &peer_account.1,
2525            primary.ledger.current_committee().unwrap(),
2526            round,
2527            Default::default(),
2528            timestamp,
2529            1,
2530            &mut rng,
2531        );
2532
2533        // Make sure the primary is aware of the transmissions in the proposal.
2534        for (transmission_id, transmission) in proposal.transmissions() {
2535            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2536        }
2537
2538        // The author must be known to resolver to pass propose checks.
2539        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2540
2541        // Add a high block locator to indicate we are not synced.
2542        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();
2543
2544        // Try to process the batch proposal from the peer, should fail
2545        assert!(
2546            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2547        );
2548    }
2549
2550    #[tokio::test]
2551    async fn test_batch_propose_from_peer_in_round() {
2552        let round = 2;
2553        let mut rng = TestRng::default();
2554        let (primary, accounts) = primary_without_handlers(&mut rng);
2555
2556        // Generate certificates.
2557        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2558
2559        // Create a valid proposal with an author that isn't the primary.
2560        let peer_account = &accounts[1];
2561        let peer_ip = peer_account.0;
2562        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2563        let proposal = create_test_proposal(
2564            &peer_account.1,
2565            primary.ledger.current_committee().unwrap(),
2566            round,
2567            previous_certificates,
2568            timestamp,
2569            1,
2570            &mut rng,
2571        );
2572
2573        // Make sure the primary is aware of the transmissions in the proposal.
2574        for (transmission_id, transmission) in proposal.transmissions() {
2575            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2576        }
2577
2578        // The author must be known to resolver to pass propose checks.
2579        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2580
2581        // The primary will only consider itself synced if we received
2582        // block locators from a peer.
2583        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2584        primary.sync.testing_only_try_block_sync_testing_only().await;
2585
2586        // Try to process the batch proposal from the peer, should succeed.
2587        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2588    }
2589
2590    #[tokio::test]
2591    async fn test_batch_propose_from_peer_wrong_round() {
2592        let mut rng = TestRng::default();
2593        let (primary, accounts) = primary_without_handlers(&mut rng);
2594
2595        // Create a valid proposal with an author that isn't the primary.
2596        let round = 1;
2597        let peer_account = &accounts[1];
2598        let peer_ip = peer_account.0;
2599        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2600        let proposal = create_test_proposal(
2601            &peer_account.1,
2602            primary.ledger.current_committee().unwrap(),
2603            round,
2604            Default::default(),
2605            timestamp,
2606            1,
2607            &mut rng,
2608        );
2609
2610        // Make sure the primary is aware of the transmissions in the proposal.
2611        for (transmission_id, transmission) in proposal.transmissions() {
2612            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2613        }
2614
2615        // The author must be known to resolver to pass propose checks.
2616        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2617        // The primary must be considered synced.
2618        primary.sync.testing_only_try_block_sync_testing_only().await;
2619
2620        // Try to process the batch proposal from the peer, should error.
2621        assert!(
2622            primary
2623                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2624                    round: round + 1,
2625                    batch_header: Data::Object(proposal.batch_header().clone())
2626                })
2627                .await
2628                .is_err()
2629        );
2630    }
2631
2632    #[tokio::test]
2633    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2634        let round = 4;
2635        let mut rng = TestRng::default();
2636        let (primary, accounts) = primary_without_handlers(&mut rng);
2637
2638        // Generate certificates.
2639        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2640
2641        // Create a valid proposal with an author that isn't the primary.
2642        let peer_account = &accounts[1];
2643        let peer_ip = peer_account.0;
2644        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2645        let proposal = create_test_proposal(
2646            &peer_account.1,
2647            primary.ledger.current_committee().unwrap(),
2648            round,
2649            previous_certificates,
2650            timestamp,
2651            1,
2652            &mut rng,
2653        );
2654
2655        // Make sure the primary is aware of the transmissions in the proposal.
2656        for (transmission_id, transmission) in proposal.transmissions() {
2657            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2658        }
2659
2660        // The author must be known to resolver to pass propose checks.
2661        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2662        // The primary must be considered synced.
2663        primary.sync.testing_only_try_block_sync_testing_only().await;
2664
2665        // Try to process the batch proposal from the peer, should error.
2666        assert!(
2667            primary
2668                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2669                    round: round + 1,
2670                    batch_header: Data::Object(proposal.batch_header().clone())
2671                })
2672                .await
2673                .is_err()
2674        );
2675    }
2676
    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Use a timestamp that is too early: one second less than the minimum batch delay
        // after the author's previous certificate. Note that the minimum delay is currently
        // 1 second, so this will be equal to the previous certificate's timestamp.
        let last_timestamp = primary
            .storage
            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
            .expect("No previous proposal exists")
            .timestamp();
        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;

        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        // The primary must be considered synced.
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer; it should error due to the early timestamp.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }
2726
    /// Checks that a proposal exceeding the spend limit is rejected from consensus version V5 onwards,
    /// while still being accepted under V4.
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries to test spend limit activation on V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        // NOTE(review): 4 transmissions here (vs. 1 in the other tests) — presumably enough
        // to push the proposal over the V5 spend limit; confirm against `create_test_proposal`.
        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make sure both primaries are aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to each resolver to pass propose checks.
        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // primary v4 must be considered synced.
        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        // primary v5 must be considered synced.
        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        // Check the spend limit is enforced from V5 onwards: V4 accepts the proposal...
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        // ...while V5 rejects it.
        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }
2790
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch: the call itself returns Ok, but no batch is produced
        // because the proposal lock is ahead of the storage round.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, a batch should actually be proposed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2823
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal for a round ahead of the primary's storage.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Trying to propose a batch terminates early (returning Ok) because the storage round
        // is behind the existing proposal's round; the stored proposal is left untouched.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
2853
2854    #[tokio::test(flavor = "multi_thread")]
2855    async fn test_batch_signature_from_peer() {
2856        let mut rng = TestRng::default();
2857        let (primary, accounts) = primary_without_handlers(&mut rng);
2858        map_account_addresses(&primary, &accounts);
2859
2860        // Create a valid proposal.
2861        let round = 1;
2862        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2863        let proposal = create_test_proposal(
2864            primary.gateway.account(),
2865            primary.ledger.current_committee().unwrap(),
2866            round,
2867            Default::default(),
2868            timestamp,
2869            1,
2870            &mut rng,
2871        );
2872
2873        // Store the proposal on the primary.
2874        *primary.proposed_batch.write() = Some(proposal);
2875
2876        // Each committee member signs the batch.
2877        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2878
2879        // Have the primary process the signatures.
2880        for (socket_addr, signature) in signatures {
2881            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2882        }
2883
2884        // Check the certificate was created and stored by the primary.
2885        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2886        // Check the round was incremented.
2887        assert_eq!(primary.current_round(), round + 1);
2888    }
2889
2890    #[tokio::test(flavor = "multi_thread")]
2891    async fn test_batch_signature_from_peer_in_round() {
2892        let round = 5;
2893        let mut rng = TestRng::default();
2894        let (primary, accounts) = primary_without_handlers(&mut rng);
2895        map_account_addresses(&primary, &accounts);
2896
2897        // Generate certificates.
2898        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2899
2900        // Create a valid proposal.
2901        let timestamp = now();
2902        let proposal = create_test_proposal(
2903            primary.gateway.account(),
2904            primary.ledger.current_committee().unwrap(),
2905            round,
2906            previous_certificates,
2907            timestamp,
2908            1,
2909            &mut rng,
2910        );
2911
2912        // Store the proposal on the primary.
2913        *primary.proposed_batch.write() = Some(proposal);
2914
2915        // Each committee member signs the batch.
2916        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2917
2918        // Have the primary process the signatures.
2919        for (socket_addr, signature) in signatures {
2920            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2921        }
2922
2923        // Check the certificate was created and stored by the primary.
2924        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2925        // Check the round was incremented.
2926        assert_eq!(primary.current_round(), round + 1);
2927    }
2928
    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was NOT incremented, since quorum was not reached.
        assert_eq!(primary.current_round(), round);
    }
2963
    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was NOT incremented, since quorum was not reached.
        assert_eq!(primary.current_round(), round);
    }
3001
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get the certificate IDs from the previous round.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create a certificate for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly abort some of the transmissions.
        // The `|| aborted_transmissions.is_empty()` clause guarantees at least one transmission
        // is aborted: the first iteration always lands in the aborted set.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Insert the aborted transmission.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Insert the transmission without the aborted transmission.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Add the non-aborted transmissions to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Check that both verifying and inserting the certificate fail while the aborted
        // transmissions are missing and not declared as aborted.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Insert the certificate to storage, this time declaring the aborted transmission IDs.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Ensure the certificate exists in storage.
        assert!(primary.storage.contains_certificate(certificate_id));
        // Ensure that the aborted transmission IDs exist in storage,
        // but carry no retrievable transmission data.
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
3083}