// snarkos_node_bft/primary.rs
// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};

use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_network::PeerPoolHandling;
use snarkos_node_sync::{BlockSync, DUMMY_SELF_IP, Ping};
use snarkos_utilities::NodeDataDir;

use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::{ConsensusVersion, committee::Committee},
    utilities::flatten_error,
};

use anyhow::Context;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
#[cfg(not(feature = "serial"))]
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};
89
90/// A helper type for an optional proposed batch.
91pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
92
93/// The primary logic of a node.
94/// AleoBFT adopts a primary-worker architecture as described in the Narwhal and Tusk paper (Section 4.2).
95#[derive(Clone)]
96pub struct Primary<N: Network> {
97    /// The sync module enables fetching data from other validators.
98    sync: Sync<N>,
99    /// The gateway allows talking to other nodes in the validator set.
100    gateway: Gateway<N>,
101    /// The storage.
102    storage: Storage<N>,
103    /// The ledger service.
104    ledger: Arc<dyn LedgerService<N>>,
105    /// The workers.
106    workers: Arc<[Worker<N>]>,
107    /// The BFT sender.
108    bft_sender: Arc<OnceCell<BFTSender<N>>>,
109    /// The batch proposal, if the primary is currently proposing a batch.
110    proposed_batch: Arc<ProposedBatch<N>>,
111    /// The timestamp of the most recent proposed batch.
112    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
113    /// The recently-signed batch proposals.
114    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
115    /// The handles for all background tasks spawned by this primary.
116    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
117    /// The lock for propose_batch.
118    propose_lock: Arc<TMutex<u64>>,
119    /// The node configuration directory.
120    node_data_dir: NodeDataDir,
121}
122
123impl<N: Network> Primary<N> {
124    /// The maximum number of unconfirmed transmissions to send to the primary.
125    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
126
127    /// Initializes a new primary instance.
128    #[allow(clippy::too_many_arguments)]
129    pub fn new(
130        account: Account<N>,
131        storage: Storage<N>,
132        ledger: Arc<dyn LedgerService<N>>,
133        block_sync: Arc<BlockSync<N>>,
134        ip: Option<SocketAddr>,
135        trusted_validators: &[SocketAddr],
136        trusted_peers_only: bool,
137        node_data_dir: NodeDataDir,
138        dev: Option<u16>,
139    ) -> Result<Self> {
140        // Initialize the gateway.
141        let gateway = Gateway::new(
142            account,
143            storage.clone(),
144            ledger.clone(),
145            ip,
146            trusted_validators,
147            trusted_peers_only,
148            node_data_dir.clone(),
149            dev,
150        )?;
151        // Initialize the sync module.
152        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone(), block_sync);
153
154        // Initialize the primary instance.
155        Ok(Self {
156            sync,
157            gateway,
158            storage,
159            ledger,
160            workers: Arc::from(vec![]),
161            bft_sender: Default::default(),
162            proposed_batch: Default::default(),
163            latest_proposed_batch_timestamp: Default::default(),
164            signed_proposals: Default::default(),
165            handles: Default::default(),
166            propose_lock: Default::default(),
167            node_data_dir,
168        })
169    }
170
171    /// Load the proposal cache file and update the Primary state with the stored data.
172    async fn load_proposal_cache(&self) -> Result<()> {
173        // Fetch the signed proposals from the file system if it exists.
174        match ProposalCache::<N>::exists(&self.node_data_dir) {
175            // If the proposal cache exists, then process the proposal cache.
176            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.node_data_dir) {
177                Ok(proposal_cache) => {
178                    // Extract the proposal and signed proposals.
179                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
180                        proposal_cache.into();
181
182                    // Write the proposed batch.
183                    *self.proposed_batch.write() = proposed_batch;
184                    // Write the signed proposals.
185                    *self.signed_proposals.write() = signed_proposals;
186                    // Writ the propose lock.
187                    *self.propose_lock.lock().await = latest_certificate_round;
188
189                    // Update the storage with the pending certificates.
190                    for certificate in pending_certificates {
191                        let batch_id = certificate.batch_id();
192                        // We use a dummy IP because the node should not need to request from any peers.
193                        // The storage should have stored all the transmissions. If not, we simply
194                        // skip the certificate.
195                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
196                        {
197                            let err = err.context(format!(
198                                "Failed to load stored certificate {} from proposal cache",
199                                fmt_id(batch_id)
200                            ));
201                            warn!("{}", &flatten_error(err));
202                        }
203                    }
204                    Ok(())
205                }
206                Err(err) => Err(err.context("Failed to read the signed proposals from the file system")),
207            },
208            // If the proposal cache does not exist, then return early.
209            false => Ok(()),
210        }
211    }
212
213    /// Run the primary instance.
214    pub async fn run(
215        &mut self,
216        ping: Option<Arc<Ping<N>>>,
217        bft_sender: Option<BFTSender<N>>,
218        primary_sender: PrimarySender<N>,
219        primary_receiver: PrimaryReceiver<N>,
220    ) -> Result<()> {
221        info!("Starting the primary instance of the memory pool...");
222
223        // Set the BFT sender.
224        if let Some(bft_sender) = &bft_sender {
225            // Set the BFT sender in the primary.
226            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
227        }
228
229        // Construct a map of the worker senders.
230        let mut worker_senders = IndexMap::new();
231        // Construct a map for the workers.
232        let mut workers = Vec::new();
233        // Initialize the workers.
234        for id in 0..MAX_WORKERS {
235            // Construct the worker channels.
236            let (tx_worker, rx_worker) = init_worker_channels();
237            // Construct the worker instance.
238            let worker = Worker::new(
239                id,
240                Arc::new(self.gateway.clone()),
241                self.storage.clone(),
242                self.ledger.clone(),
243                self.proposed_batch.clone(),
244            )?;
245            // Run the worker instance.
246            worker.run(rx_worker);
247            // Add the worker to the list of workers.
248            workers.push(worker);
249            // Add the worker sender to the map.
250            worker_senders.insert(id, tx_worker);
251        }
252        // Set the workers.
253        self.workers = Arc::from(workers);
254
255        // First, initialize the sync channels.
256        let (sync_sender, sync_receiver) = init_sync_channels();
257        // Next, initialize the sync module and sync the storage from ledger.
258        self.sync.initialize(bft_sender).await?;
259        // Next, load and process the proposal cache before running the sync module.
260        self.load_proposal_cache().await?;
261        // Next, run the sync module.
262        self.sync.run(ping, sync_receiver).await?;
263        // Next, initialize the gateway.
264        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
265        // Lastly, start the primary handlers.
266        // Note: This ensures the primary does not start communicating before syncing is complete.
267        self.start_handlers(primary_receiver);
268
269        Ok(())
270    }
271
272    /// Returns the current round.
273    pub fn current_round(&self) -> u64 {
274        self.storage.current_round()
275    }
276
277    /// Returns `true` if the primary is synced.
278    pub fn is_synced(&self) -> bool {
279        self.sync.is_synced()
280    }
281
282    /// Returns the gateway.
283    pub const fn gateway(&self) -> &Gateway<N> {
284        &self.gateway
285    }
286
287    /// Returns the storage.
288    pub const fn storage(&self) -> &Storage<N> {
289        &self.storage
290    }
291
292    /// Returns the ledger.
293    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
294        &self.ledger
295    }
296
297    /// Returns the number of workers.
298    pub fn num_workers(&self) -> u8 {
299        u8::try_from(self.workers.len()).expect("Too many workers")
300    }
301
302    /// Returns the workers.
303    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
304        &self.workers
305    }
306
307    /// Returns the batch proposal of our primary, if one currently exists.
308    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
309        &self.proposed_batch
310    }
311}
312
313impl<N: Network> Primary<N> {
314    /// Returns the number of unconfirmed transmissions.
315    pub fn num_unconfirmed_transmissions(&self) -> usize {
316        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
317    }
318
319    /// Returns the number of unconfirmed ratifications.
320    pub fn num_unconfirmed_ratifications(&self) -> usize {
321        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
322    }
323
324    /// Returns the number of unconfirmed solutions.
325    pub fn num_unconfirmed_solutions(&self) -> usize {
326        self.workers.iter().map(|worker| worker.num_solutions()).sum()
327    }
328
329    /// Returns the number of unconfirmed transactions.
330    pub fn num_unconfirmed_transactions(&self) -> usize {
331        self.workers.iter().map(|worker| worker.num_transactions()).sum()
332    }
333}
334
335impl<N: Network> Primary<N> {
336    /// Returns the worker transmission IDs.
337    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
338        self.workers.iter().flat_map(|worker| worker.transmission_ids())
339    }
340
341    /// Returns the worker transmissions.
342    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
343        self.workers.iter().flat_map(|worker| worker.transmissions())
344    }
345
346    /// Returns the worker solutions.
347    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
348        self.workers.iter().flat_map(|worker| worker.solutions())
349    }
350
351    /// Returns the worker transactions.
352    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
353        self.workers.iter().flat_map(|worker| worker.transactions())
354    }
355}
356
357impl<N: Network> Primary<N> {
358    /// Clears the worker solutions.
359    pub fn clear_worker_solutions(&self) {
360        self.workers.iter().for_each(Worker::clear_solutions);
361    }
362}
363
364impl<N: Network> Primary<N> {
365    /// Proposes the batch for the current round.
366    ///
367    /// This method performs the following steps:
368    /// 1. Drain the workers.
369    /// 2. Sign the batch.
370    /// 3. Set the batch proposal in the primary.
371    /// 4. Broadcast the batch header to all validators for signing.
372    pub async fn propose_batch(&self) -> Result<()> {
373        // This function isn't re-entrant.
374        let mut lock_guard = self.propose_lock.lock().await;
375
376        // Check if the proposed batch has expired, and clear it if it has expired.
377        if let Err(err) = self
378            .check_proposed_batch_for_expiration()
379            .await
380            .with_context(|| "Failed to check the proposed batch for expiration")
381        {
382            warn!("{}", flatten_error(&err));
383            return Ok(());
384        }
385
386        // Retrieve the current round.
387        let round = self.current_round();
388        // Compute the previous round.
389        let previous_round = round.saturating_sub(1);
390
391        // If the current round is 0, return early.
392        // This can actually never happen, because of the invariant that the current round is never 0
393        // (see [`StorageInner::current_round`]).
394        ensure!(round > 0, "Round 0 cannot have transaction batches");
395
396        // If the current storage round is below the latest proposal round, then return early.
397        if round < *lock_guard {
398            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
399            return Ok(());
400        }
401
402        // If there is a batch being proposed already,
403        // rebroadcast the batch header to the non-signers, and return early.
404        if let Some(proposal) = self.proposed_batch.read().as_ref() {
405            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
406            if round < proposal.round()
407                || proposal
408                    .batch_header()
409                    .previous_certificate_ids()
410                    .iter()
411                    .any(|id| !self.storage.contains_certificate(*id))
412            {
413                warn!(
414                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
415                    proposal.round(),
416                );
417                return Ok(());
418            }
419            // Construct the event.
420            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
421            let event = Event::BatchPropose(proposal.batch_header().clone().into());
422            // Iterate through the non-signers.
423            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
424                // Resolve the address to the peer IP.
425                match self.gateway.resolver().read().get_peer_ip_for_address(address) {
426                    // Resend the batch proposal to the validator for signing.
427                    Some(peer_ip) => {
428                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
429                        tokio::spawn(async move {
430                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
431                            // Resend the batch proposal to the peer.
432                            if gateway.send(peer_ip, event_).await.is_none() {
433                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
434                            }
435                        });
436                    }
437                    None => continue,
438                }
439            }
440            debug!("Proposed batch for round {} is still valid", proposal.round());
441            return Ok(());
442        }
443
444        #[cfg(feature = "metrics")]
445        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
446
447        // Ensure that the primary does not create a new proposal too quickly.
448        if let Err(err) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
449            debug!(
450                "{}",
451                flatten_error(err.context(format!("Primary is safely skipping a batch proposal for round {round}")))
452            );
453            return Ok(());
454        }
455
456        // Ensure the primary has not proposed a batch for this round before.
457        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
458            // If a BFT sender was provided, attempt to advance the current round.
459            if let Some(bft_sender) = self.bft_sender.get() {
460                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
461                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
462                    Ok(true) => (), // continue,
463                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
464                    Ok(false) => return Ok(()),
465                    // An error occurred while attempting to advance the current round.
466                    Err(err) => {
467                        let err = err.context("Failed to update the BFT to the next round");
468                        warn!("{}", &flatten_error(&err));
469                        return Err(err);
470                    }
471                }
472            }
473            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
474            return Ok(());
475        }
476
477        // Determine if the current round has been proposed.
478        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
479        // good for network reliability and should not be prevented for the already existing proposed_batch.
480        // If a certificate already exists for the current round, an attempt should be made to advance the
481        // round as early as possible.
482        if round == *lock_guard {
483            debug!("Primary is safely skipping a batch proposal - round {round} already proposed");
484            return Ok(());
485        }
486
487        // Retrieve the committee to check against.
488        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
489        // Check if the primary is connected to enough validators to reach quorum threshold.
490        {
491            // Retrieve the connected validator addresses.
492            let mut connected_validators = self.gateway.connected_addresses();
493            // Append the primary to the set.
494            connected_validators.insert(self.gateway.account().address());
495            // If quorum threshold is not reached, return early.
496            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
497                debug!(
498                    "Primary is safely skipping a batch proposal for round {round} {}",
499                    "(please connect to more validators)".dimmed()
500                );
501                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
502                return Ok(());
503            }
504        }
505
506        // Retrieve the previous certificates.
507        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
508
509        // Check if the batch is ready to be proposed.
510        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
511        let mut is_ready = previous_round == 0;
512        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
513        if previous_round > 0 {
514            // Retrieve the committee lookback for the round.
515            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
516                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
517            };
518            // Construct a set over the authors.
519            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
520            // Check if the previous certificates have reached the quorum threshold.
521            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
522                is_ready = true;
523            }
524        }
525        // If the batch is not ready to be proposed, return early.
526        if !is_ready {
527            debug!(
528                "Primary is safely skipping a batch proposal for round {round} {}",
529                format!("(previous round {previous_round} has not reached quorum)").dimmed()
530            );
531            return Ok(());
532        }
533
534        // Initialize the map of transmissions.
535        let mut transmissions: IndexMap<_, _> = Default::default();
536        // Track the total execution costs of the batch proposal as it is being constructed.
537        let mut proposal_cost = 0u64;
538        // Note: worker draining and transaction inclusion needs to be thought
539        // through carefully when there is more than one worker. The fairness
540        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
541        debug_assert_eq!(MAX_WORKERS, 1);
542
543        'outer: for worker in self.workers().iter() {
544            let mut num_worker_transmissions = 0usize;
545
546            while let Some((id, transmission)) = worker.remove_front() {
547                // Check the selected transmissions are below the batch limit.
548                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
549                    // Reinsert the transmission into the worker.
550                    worker.insert_front(id, transmission);
551                    break 'outer;
552                }
553
554                // Check the max transmissions per worker is not exceeded.
555                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
556                    // Reinsert the transmission into the worker.
557                    worker.insert_front(id, transmission);
558                    continue 'outer;
559                }
560
561                // Check if the ledger already contains the transmission.
562                if self.ledger.contains_transmission(&id).unwrap_or(true) {
563                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
564                    continue;
565                }
566
567                // Check if the storage already contain the transmission.
568                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
569                // the primary does not propose a batch with no transmissions.
570                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
571                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
572                    continue;
573                }
574
575                // Check the transmission is still valid.
576                match (id, transmission.clone()) {
577                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
578                        // Ensure the checksum matches. If not, skip the solution.
579                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
580                        {
581                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
582                            continue;
583                        }
584                        // Check if the solution is still valid.
585                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
586                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
587                            continue;
588                        }
589                    }
590                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
591                        // Ensure the checksum matches. If not, skip the transaction.
592                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
593                        {
594                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
595                            continue;
596                        }
597
598                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
599                        let transaction = spawn_blocking!({
600                            match transaction {
601                                Data::Object(transaction) => Ok(transaction),
602                                Data::Buffer(bytes) => {
603                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
604                                }
605                            }
606                        })?;
607
608                        // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
609                        // ConsensusVersion V8 Migration logic -
610                        // Do not include deployments in a batch proposal.
611                        let current_block_height = self.ledger.latest_block_height();
612                        let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
613                        let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
614                        let consensus_version = N::CONSENSUS_VERSION(current_block_height)?;
615                        if current_block_height > consensus_version_v7_height
616                            && current_block_height <= consensus_version_v8_height
617                            && transaction.is_deploy()
618                        {
619                            trace!(
620                                "Proposing - Skipping transaction '{}' - Deployment transactions are not allowed until Consensus V8 (block {consensus_version_v8_height})",
621                                fmt_id(transaction_id)
622                            );
623                            continue;
624                        }
625
626                        // Compute the transaction spent cost (in microcredits).
627                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
628                        let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
629                        else {
630                            debug!(
631                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
632                                fmt_id(transaction_id)
633                            );
634                            continue;
635                        };
636
637                        // Check if the transaction is still valid.
638                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
639                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
640                            continue;
641                        }
642
643                        // Compute the next proposal cost.
644                        // Note: We purposefully discard this transaction if the proposal cost overflows.
645                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
646                            debug!(
647                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
648                                fmt_id(transaction_id)
649                            );
650                            continue;
651                        };
652
653                        // Check if the next proposal cost exceeds the batch proposal spend limit.
654                        let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(current_block_height);
655                        if next_proposal_cost > batch_spend_limit {
656                            debug!(
657                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
658                                fmt_id(transaction_id),
659                                batch_spend_limit
660                            );
661
662                            // Reinsert the transmission into the worker.
663                            worker.insert_front(id, transmission);
664                            break 'outer;
665                        }
666
667                        // Update the proposal cost.
668                        proposal_cost = next_proposal_cost;
669                    }
670
671                    // Note: We explicitly forbid including ratifications,
672                    // as the protocol currently does not support ratifications.
673                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
674                    // All other combinations are clearly invalid.
675                    _ => continue,
676                }
677
678                // If the transmission is valid, insert it into the proposal's transmission list.
679                transmissions.insert(id, transmission);
680                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
681            }
682        }
683
684        // Determine the current timestamp.
685        let current_timestamp = now();
686
687        *lock_guard = round;
688
689        /* Proceeding to sign & propose the batch. */
690        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
691
692        // Retrieve the private key.
693        let private_key = *self.gateway.account().private_key();
694        // Retrieve the committee ID.
695        let committee_id = committee_lookback.id();
696        // Prepare the transmission IDs.
697        let transmission_ids = transmissions.keys().copied().collect();
698        // Prepare the previous batch certificate IDs.
699        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
700        // Sign the batch header and construct the proposal.
701        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
702            &private_key,
703            round,
704            current_timestamp,
705            committee_id,
706            transmission_ids,
707            previous_certificate_ids,
708            &mut rand::thread_rng()
709        ))
710        .and_then(|batch_header| {
711            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
712                .map(|proposal| (batch_header, proposal))
713        })
714        .inspect_err(|_| {
715            // On error, reinsert the transmissions and then propagate the error.
716            if let Err(err) = self.reinsert_transmissions_into_workers(transmissions) {
717                error!("{}", flatten_error(err.context("Failed to reinsert transmissions")));
718            }
719        })?;
720        // Broadcast the batch to all validators for signing.
721        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
722        // Set the timestamp of the latest proposed batch.
723        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
724        // Set the proposed batch.
725        *self.proposed_batch.write() = Some(proposal);
726        Ok(())
727    }
728
729    /// Processes a batch propose from a peer.
730    ///
731    /// This method performs the following steps:
732    /// 1. Verify the batch.
733    /// 2. Sign the batch.
734    /// 3. Broadcast the signature back to the validator.
735    ///
736    /// If our primary is ahead of the peer, we will not sign the batch.
737    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
738    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
739        let BatchPropose { round: batch_round, batch_header } = batch_propose;
740
741        // Deserialize the batch header.
742        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
743        // Ensure the round matches in the batch header.
744        if batch_round != batch_header.round() {
745            // Proceed to disconnect the validator.
746            self.gateway.disconnect(peer_ip);
747            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
748        }
749
750        // Retrieve the batch author.
751        let batch_author = batch_header.author();
752
753        // Ensure the batch proposal is from the validator.
754        match self.gateway.resolve_to_aleo_addr(peer_ip) {
755            // If the peer is a validator, then ensure the batch proposal is from the validator.
756            Some(address) => {
757                if address != batch_author {
758                    // Proceed to disconnect the validator.
759                    self.gateway.disconnect(peer_ip);
760                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
761                }
762            }
763            None => bail!("Batch proposal from a disconnected validator"),
764        }
765        // Ensure the batch author is a current committee member.
766        if !self.gateway.is_authorized_validator_address(batch_author) {
767            // Proceed to disconnect the validator.
768            self.gateway.disconnect(peer_ip);
769            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
770        }
771        // Ensure the batch proposal is not from the current primary.
772        if self.gateway.account().address() == batch_author {
773            bail!("Invalid peer - proposed batch from myself ({batch_author})");
774        }
775
776        // Ensure that the batch proposal's committee ID matches the expected committee ID.
777        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
778        if expected_committee_id != batch_header.committee_id() {
779            // Proceed to disconnect the validator.
780            self.gateway.disconnect(peer_ip);
781            bail!(
782                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
783                batch_header.committee_id()
784            );
785        }
786
787        // Retrieve the cached round and batch ID for this validator.
788        if let Some((signed_round, signed_batch_id, signature)) =
789            self.signed_proposals.read().get(&batch_author).copied()
790        {
791            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
792            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
793            if signed_round > batch_header.round() {
794                bail!(
795                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
796                    batch_header.round()
797                );
798            }
799
800            // If the round matches and the batch ID differs, then the validator is malicious.
801            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
802                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
803            }
804            // If the round and batch ID matches, then skip signing the batch a second time.
805            // Instead, rebroadcast the cached signature to the peer.
806            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
807                let gateway = self.gateway.clone();
808                tokio::spawn(async move {
809                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
810                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
811                    // Resend the batch signature to the peer.
812                    if gateway.send(peer_ip, event).await.is_none() {
813                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
814                    }
815                });
816                // Return early.
817                return Ok(());
818            }
819        }
820
821        // Ensure that the batch header doesn't already exist in storage.
822        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
823        if self.storage.contains_batch(batch_header.batch_id()) {
824            debug!(
825                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
826                format!("batch for round {batch_round} already exists in storage").dimmed()
827            );
828            return Ok(());
829        }
830
831        // Compute the previous round.
832        let previous_round = batch_round.saturating_sub(1);
833        // Ensure that the peer did not propose a batch too quickly.
834        if let Err(err) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
835            // Proceed to disconnect the validator.
836            self.gateway.disconnect(peer_ip);
837            return Err(err.context(format!("Malicious behavior of peer '{peer_ip}'")));
838        }
839
840        // Ensure the batch header does not contain any ratifications.
841        if batch_header.contains(TransmissionID::Ratification) {
842            // Proceed to disconnect the validator.
843            self.gateway.disconnect(peer_ip);
844            bail!(
845                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
846            );
847        }
848
849        // If the peer is ahead, use the batch header to sync up to the peer.
850        let mut missing_transmissions =
851            self.sync_with_batch_header_from_peer::<false, true>(peer_ip, &batch_header).await?;
852
853        // Check that the transmission ids match and are not fee transactions.
854        if let Err(err) = cfg_iter_mut!(&mut missing_transmissions).try_for_each(|(transmission_id, transmission)| {
855            // If the transmission is not well-formed, then return early.
856            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
857        }) {
858            let err = err.context(format!(
859                "Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission"
860            ));
861            debug!("{}", flatten_error(err));
862            return Ok(());
863        }
864
865        // Ensure the batch is for the current round.
866        // This method must be called after fetching previous certificates (above),
867        // and prior to checking the batch header (below).
868        if let Err(e) = self.ensure_is_signing_round(batch_round) {
869            // If the primary is not signing for the peer's round, then return early.
870            debug!("{e} from '{peer_ip}'");
871            return Ok(());
872        }
873
874        // Ensure the batch header from the peer is valid.
875        let (storage, header) = (self.storage.clone(), batch_header.clone());
876
877        // Check the batch header, and return early if it already exists in storage.
878        let Some(missing_transmissions) =
879            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?
880        else {
881            return Ok(());
882        };
883
884        // Inserts the missing transmissions into the workers.
885        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
886
887        // Ensure the transaction doesn't bring the proposal above the spend limit.
888        let block_height = self.ledger.latest_block_height() + 1;
889        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
890            let mut proposal_cost = 0u64;
891            for transmission_id in batch_header.transmission_ids() {
892                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
893                let Some(worker) = self.workers.get(worker_id as usize) else {
894                    debug!("Unable to find worker {worker_id}");
895                    return Ok(());
896                };
897
898                let Some(transmission) = worker.get_transmission(*transmission_id) else {
899                    debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
900                    return Ok(());
901                };
902
903                // If the transmission is a transaction, compute its execution cost.
904                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
905                    (transmission_id, transmission)
906                {
907                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
908                    let transaction = spawn_blocking!({
909                        match transaction {
910                            Data::Object(transaction) => Ok(transaction),
911                            Data::Buffer(bytes) => {
912                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
913                            }
914                        }
915                    })?;
916
917                    // TODO (raychu86): Record Commitment - Remove this logic after the next migration height is reached.
918                    // ConsensusVersion V8 Migration logic -
919                    // Do not include deployments in a batch proposal.
920                    let consensus_version_v7_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V7)?;
921                    let consensus_version_v8_height = N::CONSENSUS_HEIGHT(ConsensusVersion::V8)?;
922                    let consensus_version = N::CONSENSUS_VERSION(block_height)?;
923                    if block_height > consensus_version_v7_height
924                        && block_height <= consensus_version_v8_height
925                        && transaction.is_deploy()
926                    {
927                        bail!(
928                            "Invalid batch proposal - Batch proposals are not allowed to include deployments until Consensus V8 (block {consensus_version_v8_height})",
929                        )
930                    }
931
932                    // Compute the transaction spent cost (in microcredits).
933                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
934                    let Ok(cost) = self.ledger.transaction_spend_in_microcredits(&transaction, consensus_version)
935                    else {
936                        bail!(
937                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
938                            fmt_id(transaction_id)
939                        )
940                    };
941
942                    // Compute the next proposal cost.
943                    // Note: We purposefully discard this transaction if the proposal cost overflows.
944                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
945                        bail!(
946                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
947                            fmt_id(transaction_id)
948                        )
949                    };
950
951                    // Check if the next proposal cost exceeds the batch proposal spend limit.
952                    let batch_spend_limit = BatchHeader::<N>::batch_spend_limit(block_height);
953                    if next_proposal_cost > batch_spend_limit {
954                        bail!(
955                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
956                            fmt_id(transaction_id),
957                            batch_spend_limit
958                        );
959                    }
960
961                    // Update the proposal cost.
962                    proposal_cost = next_proposal_cost;
963                }
964            }
965        }
966
967        /* Proceeding to sign the batch. */
968
969        // Retrieve the batch ID.
970        let batch_id = batch_header.batch_id();
971        // Sign the batch ID.
972        let account = self.gateway.account().clone();
973        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
974
975        // Ensure the proposal has not already been signed.
976        //
977        // Note: Due to the need to sync the batch header with the peer, it is possible
978        // for the primary to receive the same 'BatchPropose' event again, whereby only
979        // one instance of this handler should sign the batch. This check guarantees this.
980        match self.signed_proposals.write().0.entry(batch_author) {
981            std::collections::hash_map::Entry::Occupied(mut entry) => {
982                // If the validator has already signed a batch for this round, then return early,
983                // since, if the peer still has not received the signature, they will request it again,
984                // and the logic at the start of this function will resend the (now cached) signature
985                // to the peer if asked to sign this batch proposal again.
986                if entry.get().0 == batch_round {
987                    return Ok(());
988                }
989                // Otherwise, cache the round, batch ID, and signature for this validator.
990                entry.insert((batch_round, batch_id, signature));
991            }
992            // If the validator has not signed a batch before, then continue.
993            std::collections::hash_map::Entry::Vacant(entry) => {
994                // Cache the round, batch ID, and signature for this validator.
995                entry.insert((batch_round, batch_id, signature));
996            }
997        };
998
999        // Broadcast the signature back to the validator.
1000        let self_ = self.clone();
1001        tokio::spawn(async move {
1002            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
1003            // Send the batch signature to the peer.
1004            if self_.gateway.send(peer_ip, event).await.is_some() {
1005                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
1006            }
1007        });
1008
1009        Ok(())
1010    }
1011
1012    /// Processes a batch signature from a peer.
1013    ///
1014    /// This method performs the following steps:
1015    /// 1. Ensure the proposed batch has not expired.
1016    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
1017    /// 3. Store the signature.
1018    /// 4. Certify the batch if enough signatures have been received.
1019    /// 5. Broadcast the batch certificate to all validators.
1020    async fn process_batch_signature_from_peer(
1021        &self,
1022        peer_ip: SocketAddr,
1023        batch_signature: BatchSignature<N>,
1024    ) -> Result<()> {
1025        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
1026        self.check_proposed_batch_for_expiration().await?;
1027
1028        // Retrieve the signature and timestamp.
1029        let BatchSignature { batch_id, signature } = batch_signature;
1030
1031        // Retrieve the signer.
1032        let signer = signature.to_address();
1033
1034        // Ensure the batch signature is signed by the validator.
1035        if self.gateway.resolve_to_aleo_addr(peer_ip) != Some(signer) {
1036            // Proceed to disconnect the validator.
1037            self.gateway.disconnect(peer_ip);
1038            bail!("Malicious peer - batch signature is from a different validator ({signer})");
1039        }
1040        // Ensure the batch signature is not from the current primary.
1041        if self.gateway.account().address() == signer {
1042            bail!("Invalid peer - received a batch signature from myself ({signer})");
1043        }
1044
1045        let self_ = self.clone();
1046        let Some(proposal) = spawn_blocking!({
1047            // Acquire the write lock.
1048            let mut proposed_batch = self_.proposed_batch.write();
1049            // Add the signature to the batch, and determine if the batch is ready to be certified.
1050            match proposed_batch.as_mut() {
1051                Some(proposal) => {
1052                    // Ensure the batch ID matches the currently proposed batch ID.
1053                    if proposal.batch_id() != batch_id {
1054                        match self_.storage.contains_batch(batch_id) {
1055                            // If this batch was already certified, return early.
1056                            true => {
1057                                debug!(
1058                                    "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
1059                                    proposal.round()
1060                                );
1061                                return Ok(None);
1062                            }
1063                            // If the batch ID is unknown, return an error.
1064                            false => bail!(
1065                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
1066                                proposal.batch_id(),
1067                                proposal.round()
1068                            ),
1069                        }
1070                    }
1071                    // Retrieve the committee lookback for the round.
1072                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
1073                    // Retrieve the address of the validator.
1074                    let Some(signer) = self_.gateway.resolve_to_aleo_addr(peer_ip) else {
1075                        bail!("Signature is from a disconnected validator");
1076                    };
1077                    // Add the signature to the batch.
1078                    let new_signature = proposal.add_signature(signer, signature, &committee_lookback)?;
1079
1080                    if new_signature {
1081                        info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1082                        // Check if the batch is ready to be certified.
1083                        if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1084                            // If the batch is not ready to be certified, return early.
1085                            return Ok(None);
1086                        }
1087                    } else {
1088                        debug!(
1089                            "Received duplicated signature from '{peer_ip}' for batch {batch_id} in round {round}",
1090                            round = proposal.round()
1091                        );
1092                        return Ok(None);
1093                    }
1094                }
1095                // There is no proposed batch, so return early.
1096                None => return Ok(None),
1097            };
1098            // Retrieve the batch proposal, clearing the proposed batch.
1099            match proposed_batch.take() {
1100                Some(proposal) => Ok(Some(proposal)),
1101                None => Ok(None),
1102            }
1103        })?
1104        else {
1105            return Ok(());
1106        };
1107
1108        /* Proceeding to certify the batch. */
1109
1110        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1111
1112        // Retrieve the committee lookback for the round.
1113        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1114        // Store the certified batch and broadcast it to all validators.
1115        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1116        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1117            // Reinsert the transmissions back into the ready queue for the next proposal.
1118            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1119            return Err(e);
1120        }
1121
1122        #[cfg(feature = "metrics")]
1123        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1124        Ok(())
1125    }
1126
    /// Processes a batch certificate from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Stores the given batch certificate, after ensuring it is valid.
    /// 2. If there are enough certificates to reach quorum threshold for the current round,
    ///    then proceed to advance to the next round.
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the batch certificate is from an authorized validator.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // Ensure storage does not already contain the certificate.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        // Otherwise, ensure ephemeral storage contains the certificate.
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        // Retrieve the batch certificate author.
        let author = certificate.author();
        // Retrieve the batch certificate round.
        let certificate_round = certificate.round();
        // Retrieve the batch certificate committee ID.
        let committee_id = certificate.committee_id();

        // Ensure the batch certificate is not from the current primary.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Ensure that the incoming certificate is valid.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, after ensuring it is valid above.
        // The following call recursively fetches and stores
        // the previous certificates referenced from this certificate.
        // It is critical to make the following call only after validating the certificate above.
        // The reason is that a sequence of malformed certificates,
        // with references to previous certificates with non-decreasing rounds,
        // could cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
        // Note that the following call, if not returning an error, guarantees the backward closure of the DAG
        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
        // in which case all the validity checks in [`Storage::check_certificate`] should be redundant.
        // TODO: eliminate those redundant checks
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // If there are enough certificates to reach quorum threshold for the certificate round,
        // then proceed to advance to the next round.

        // Retrieve the committee lookback.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Retrieve the certificate authors.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        // Check if the certificates have reached the quorum threshold.
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure that the batch certificate's committee ID matches the expected committee ID.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Determine if we are currently proposing a round that is relevant.
        // Note: This is important, because while our peers have advanced,
        // they may not be proposing yet, and thus still able to sign our proposed batch.
        let should_advance = match &*self.proposed_batch.read() {
            // We advance if the proposal round is less than the current round that was just certified.
            Some(proposal) => proposal.round() < certificate_round,
            // If there's no proposal, we consider advancing.
            None => true,
        };

        // Retrieve the current round.
        let current_round = self.current_round();

        // Determine whether to advance to the next round.
        if is_quorum && should_advance && certificate_round >= current_round {
            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
1219}
1220
1221impl<N: Network> Primary<N> {
1222    /// Starts the primary handlers.
1223    ///
1224    /// For each receiver in the `primary_receiver` struct, there will be a dedicated task
1225    /// that awaits new data and handles it accordingly.
1226    /// Additionally, this spawns a task that periodically issues PrimaryPings and one that periodically
1227    /// tries to move the the next round of batches.
1228    ///
1229    /// This function is called exactly once, in `Self::run()`.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Destructure the receiver into its individual channels; each channel below is
        // consumed by exactly one of the handler tasks spawned in this function.
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the primary ping sender.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                // Retrieve the block locators.
                // Note: this is offloaded via `spawn_blocking!` so the (potentially slow)
                // locator construction does not stall the async executor.
                let self__ = self_.clone();
                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                    Ok(block_locators) => block_locators,
                    Err(e) => {
                        warn!("Failed to retrieve block locators - {e}");
                        continue;
                    }
                };

                // Retrieve the latest certificate of the primary.
                let primary_certificate = {
                    // Retrieve the primary address.
                    let primary_address = self_.gateway.account().address();

                    // Iterate backwards from the latest round to find the primary certificate.
                    let mut certificate = None;
                    let mut current_round = self_.current_round();
                    while certificate.is_none() {
                        // If the current round is 0, then break the while loop.
                        if current_round == 0 {
                            break;
                        }
                        // Retrieve the primary certificates.
                        if let Some(primary_certificate) =
                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                        {
                            certificate = Some(primary_certificate);
                        // If the primary certificate was not found, decrement the round.
                        } else {
                            current_round = current_round.saturating_sub(1);
                        }
                    }

                    // Determine if the primary certificate was found.
                    match certificate {
                        Some(certificate) => certificate,
                        // Skip this iteration of the loop (do not send a primary ping).
                        None => continue,
                    }
                };

                // Construct the primary ping.
                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                // Broadcast the event.
                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
            }
        });

        // Start the primary ping handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // If the primary is not synced, then do not process the primary ping.
                if self_.sync.is_synced() {
                    trace!("Processing new primary ping from '{peer_ip}'");
                } else {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Spawn a task to process the primary certificate.
                // This keeps the receive loop responsive while certificates are processed concurrently.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialize the primary certificate in the primary ping.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        // Process the primary certificate.
                        let id = fmt_id(primary_certificate.id());
                        let round = primary_certificate.round();
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Start the worker ping(s).
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                // If the primary is not synced, then do not broadcast the worker ping(s).
                if !self_.sync.is_synced() {
                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Broadcast the worker ping(s).
                for worker in self_.workers.iter() {
                    worker.broadcast_ping();
                }
            }
        });

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs to be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Start the proposed batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // If the primary is not synced, then do not sign the batch.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the proposed batch.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Process the batch proposal.
                    let round = batch_propose.round;
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Start the batch signature handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // If the primary is not synced, then do not store the signature.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch signature.
                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
                // is a critical path, and we should only store the minimum required number of signatures.
                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
                let id = fmt_id(batch_signature.batch_id);
                if let Err(err) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    let err = err.context(format!("Cannot store a signature for batch '{id}' from '{peer_ip}'"));
                    warn!("{}", flatten_error(err));
                }
            }
        });

        // Start the certified batch handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // If the primary is not synced, then do not store the certificate.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the batch certificate.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Deserialize the batch certificate.
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    // Process the batch certificate.
                    let id = fmt_id(batch_certificate.id());
                    let round = batch_certificate.round();
                    if let Err(err) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!(
                            "{}",
                            flatten_error(err.context(format!(
                                "Cannot store a certificate '{id}' for round {round} from '{peer_ip}'"
                            )))
                        );
                    }
                });
            }
        });

        // This task periodically tries to move to the next round.
        //
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let current_round = self_.current_round();
                let next_round = current_round.saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the current round.
                    let authors = self_.storage.get_certificate_authors_for_round(current_round);
                    // If there are no certificates, then skip this check.
                    // (The `continue` skips the remainder of this loop iteration.)
                    if authors.is_empty() {
                        continue;
                    }
                    // Retrieve the committee lookback for the current round.
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(current_round) else {
                        warn!("Failed to retrieve the committee lookback for round {current_round}");
                        continue;
                    };
                    // Check if the quorum threshold is reached for the current round.
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {current_round}");
                    if let Err(err) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("{}", flatten_error(err.context("Failed to increment to the next round")));
                    }
                }
            }
        });

        // Start a handler to process new unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                // Solutions are sharded deterministically over the workers by (ID, checksum).
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback; failures to send are ignored.
                    callback.send(result).ok();
                });
            }
        });

        // Start a handler to process new unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                // Transactions are sharded deterministically over the workers by (ID, checksum).
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback; failures to send are ignored.
                    callback.send(result).ok();
                });
            }
        });
    }
1547
1548    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1549    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1550        // Check if the proposed batch is timed out or stale.
1551        let is_expired = match self.proposed_batch.read().as_ref() {
1552            Some(proposal) => proposal.round() < self.current_round(),
1553            None => false,
1554        };
1555        // If the batch is expired, clear the proposed batch.
1556        if is_expired {
1557            // Reset the proposed batch.
1558            let proposal = self.proposed_batch.write().take();
1559            if let Some(proposal) = proposal {
1560                debug!("Cleared expired proposal for round {}", proposal.round());
1561                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1562            }
1563        }
1564        Ok(())
1565    }
1566
    /// Increments to the next round.
    ///
    /// If `next_round` is within GC range, storage is first fast-forwarded round by round up to
    /// the penultimate round (clearing any proposed batch along the way). Then, if the primary is
    /// still behind `next_round`, it notifies the BFT of the round change (or, in the Narwhal-only
    /// case, updates storage directly) and, when ready, proposes a batch for the next round.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, then iterate to the penultimate round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Iterate until the penultimate round is reached.
            while fast_forward_round < next_round.saturating_sub(1) {
                // Update to the next round in storage.
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear the proposed batch.
                *self.proposed_batch.write() = None;
            }
        }

        // Retrieve the current round.
        let current_round = self.current_round();
        // Attempt to advance to the next round.
        if current_round < next_round {
            // If a BFT sender was provided, send the current round to the BFT.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    // The BFT replies with whether the primary may propose for the next round.
                    Ok(is_ready) => is_ready,
                    Err(err) => {
                        let err = err.context("Failed to update the BFT to the next round");
                        warn!("{}", flatten_error(&err));
                        return Err(err);
                    }
                }
            }
            // Otherwise, handle the Narwhal case.
            else {
                // Update to the next round in storage.
                self.storage.increment_to_next_round(current_round)?;
                // Set 'is_ready' to 'true'.
                true
            };

            // Log whether the next round is ready.
            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // If the node is ready, propose a batch for the next round.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }
1617
1618    /// Ensures the primary is signing for the specified batch round.
1619    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1620    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1621    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1622        // Retrieve the current round.
1623        let current_round = self.current_round();
1624        // Ensure the batch round is within GC range of the current round.
1625        if current_round + self.storage.max_gc_rounds() <= batch_round {
1626            bail!("Round {batch_round} is too far in the future")
1627        }
1628        // Ensure the batch round is at or one before the current round.
1629        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1630        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1631        if current_round > batch_round + 1 {
1632            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1633        }
1634        // Check if the primary is still signing for the batch round.
1635        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1636            if signing_round > batch_round {
1637                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1638            }
1639        }
1640        Ok(())
1641    }
1642
1643    /// Ensure the primary is not creating batch proposals too frequently.
1644    /// This checks that the certificate timestamp for the previous round is within the expected range.
1645    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1646        // Retrieve the timestamp of the previous timestamp to check against.
1647        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1648            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1649            Some(certificate) => certificate.timestamp(),
1650            None => match self.gateway.account().address() == author {
1651                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1652                true => *self.latest_proposed_batch_timestamp.read(),
1653                // If we do not see a previous certificate for the author, then proceed optimistically.
1654                false => return Ok(()),
1655            },
1656        };
1657
1658        // Determine the elapsed time since the previous timestamp.
1659        let elapsed = timestamp
1660            .checked_sub(previous_timestamp)
1661            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1662        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1663        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1664            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1665            false => Ok(()),
1666        }
1667    }
1668
    /// Stores the certified batch, broadcasts the certificate to all validators, and then
    /// attempts to increment to the next round.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        // Note: `block_in_place` allows this (potentially expensive) call to run
        // without stalling other tasks on the async executor thread.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                let err = err.context("Failed to update the BFT DAG from primary");
                warn!("{}", flatten_error(&err));
                // Abort before broadcasting, since the BFT could not ingest our own certificate.
                return Err(err);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }
1698
1699    /// Inserts the missing transmissions from the proposal into the workers.
1700    fn insert_missing_transmissions_into_workers(
1701        &self,
1702        peer_ip: SocketAddr,
1703        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1704    ) -> Result<()> {
1705        // Insert the transmissions into the workers.
1706        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1707            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1708        })
1709    }
1710
1711    /// Re-inserts the transmissions from the proposal into the workers.
1712    fn reinsert_transmissions_into_workers(
1713        &self,
1714        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1715    ) -> Result<()> {
1716        // Re-insert the transmissions into the workers.
1717        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1718            worker.reinsert(transmission_id, transmission);
1719        })
1720    }
1721
    /// Recursively stores a given batch certificate, after ensuring:
    ///   - Ensure the round matches the committee round.
    ///   - Ensure the address is a member of the committee.
    ///   - Ensure the timestamp is within range.
    ///   - Ensure we have all of the transmissions.
    ///   - Ensure we have all of the previous certificates.
    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
    ///   - Ensure the previous certificates have reached the quorum threshold.
    ///   - Ensure we have not already signed the batch ID.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If the caller is not in sync mode and the node is not synced, then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let missing_transmissions =
            self.sync_with_batch_header_from_peer::<IS_SYNCING, false>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored.
        // (Re-checked here, since the recursive sync above may have already stored it.)
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the round and certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Send the certificate to the BFT.
                if let Err(err) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    let err = err.context("Failed to update the BFT DAG from sync");
                    warn!("{}", &flatten_error(&err));
                    return Err(err);
                };
            }
        }
        Ok(())
    }
1781
1782    /// Recursively syncs using the given batch header.
1783    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool, const CHECK_PREVIOUS_CERTIFICATES: bool>(
1784        &self,
1785        peer_ip: SocketAddr,
1786        batch_header: &BatchHeader<N>,
1787    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1788        // Retrieve the batch round.
1789        let batch_round = batch_header.round();
1790
1791        // If the certificate round is outdated, do not store it.
1792        if batch_round <= self.storage.gc_round() {
1793            bail!("Round {batch_round} is too far in the past")
1794        }
1795
1796        // If node is not in sync mode and the node is not synced, then return an error.
1797        if !IS_SYNCING && !self.is_synced() {
1798            bail!(
1799                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1800                fmt_id(batch_header.batch_id())
1801            );
1802        }
1803
1804        // Determine if quorum threshold is reached on the batch round.
1805        let is_quorum_threshold_reached = {
1806            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1807            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1808            committee_lookback.is_quorum_threshold_reached(&authors)
1809        };
1810
1811        // Check if our primary should move to the next round.
1812        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1813        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1814        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1815        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1816        // Check if our primary is far behind the peer.
1817        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1818        // If our primary is far behind the peer, update our committee to the batch round.
1819        if is_behind_schedule || is_peer_far_in_future {
1820            // If the batch round is greater than the current committee round, update the committee.
1821            self.try_increment_to_the_next_round(batch_round).await?;
1822        }
1823
1824        // Ensure the primary has all of the transmissions.
1825        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1826
1827        // Ensure the primary has all of the previous certificates.
1828        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1829
1830        // Wait for the missing transmissions and previous certificates to be fetched.
1831        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1832            missing_transmissions_handle,
1833            missing_previous_certificates_handle,
1834        ).with_context(|| format!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}"))?;
1835
1836        // Iterate through the missing previous certificates.
1837        for batch_certificate in missing_previous_certificates {
1838            // Check if the missing previous certificate is valid. This is only
1839            // needed if we are processing an incoming batch header from a peer.
1840            // For incoming certificates, validity is assured by checking the
1841            // root certificate in `process_batch_certificate_from_peer`.
1842            if CHECK_PREVIOUS_CERTIFICATES {
1843                self.storage.check_incoming_certificate(&batch_certificate)?;
1844            }
1845            // Store the batch certificate (recursively fetching any missing previous certificates).
1846            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1847        }
1848        Ok(missing_transmissions)
1849    }
1850
1851    /// Fetches any missing transmissions for the specified batch header.
1852    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1853    async fn fetch_missing_transmissions(
1854        &self,
1855        peer_ip: SocketAddr,
1856        batch_header: &BatchHeader<N>,
1857    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1858        // If the round is <= the GC round, return early.
1859        if batch_header.round() <= self.storage.gc_round() {
1860            return Ok(Default::default());
1861        }
1862
1863        // Ensure this batch ID is new, otherwise return early.
1864        if self.storage.contains_batch(batch_header.batch_id()) {
1865            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1866            return Ok(Default::default());
1867        }
1868
1869        // Retrieve the workers.
1870        let workers = self.workers.clone();
1871
1872        // Initialize a list for the transmissions.
1873        let mut fetch_transmissions = FuturesUnordered::new();
1874
1875        // Retrieve the number of workers.
1876        let num_workers = self.num_workers();
1877        // Iterate through the transmission IDs.
1878        for transmission_id in batch_header.transmission_ids() {
1879            // If the transmission does not exist in storage, proceed to fetch the transmission.
1880            if !self.storage.contains_transmission(*transmission_id) {
1881                // Determine the worker ID.
1882                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1883                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1884                };
1885                // Retrieve the worker.
1886                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1887                // Push the callback onto the list.
1888                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1889            }
1890        }
1891
1892        // Initialize a set for the transmissions.
1893        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1894        // Wait for all of the transmissions to be fetched.
1895        while let Some(result) = fetch_transmissions.next().await {
1896            // Retrieve the transmission.
1897            let (transmission_id, transmission) = result?;
1898            // Insert the transmission into the set.
1899            transmissions.insert(transmission_id, transmission);
1900        }
1901        // Return the transmissions.
1902        Ok(transmissions)
1903    }
1904
1905    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1906    async fn fetch_missing_previous_certificates(
1907        &self,
1908        peer_ip: SocketAddr,
1909        batch_header: &BatchHeader<N>,
1910    ) -> Result<HashSet<BatchCertificate<N>>> {
1911        // Retrieve the round.
1912        let round = batch_header.round();
1913        // If the previous round is 0, or is <= the GC round, return early.
1914        if round == 1 || round <= self.storage.gc_round() + 1 {
1915            return Ok(Default::default());
1916        }
1917
1918        // Fetch the missing previous certificates.
1919        let missing_previous_certificates =
1920            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1921        if !missing_previous_certificates.is_empty() {
1922            debug!(
1923                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1924                missing_previous_certificates.len(),
1925            );
1926        }
1927        // Return the missing previous certificates.
1928        Ok(missing_previous_certificates)
1929    }
1930
1931    /// Fetches any missing certificates for the specified batch header from the specified peer.
1932    async fn fetch_missing_certificates(
1933        &self,
1934        peer_ip: SocketAddr,
1935        round: u64,
1936        certificate_ids: &IndexSet<Field<N>>,
1937    ) -> Result<HashSet<BatchCertificate<N>>> {
1938        // Initialize a list for the missing certificates.
1939        let mut fetch_certificates = FuturesUnordered::new();
1940        // Initialize a set for the missing certificates.
1941        let mut missing_certificates = HashSet::default();
1942        // Iterate through the certificate IDs.
1943        for certificate_id in certificate_ids {
1944            // Check if the certificate already exists in the ledger.
1945            if self.ledger.contains_certificate(certificate_id)? {
1946                continue;
1947            }
1948            // Check if the certificate already exists in storage.
1949            if self.storage.contains_certificate(*certificate_id) {
1950                continue;
1951            }
1952            // If we have not fully processed the certificate yet, store it.
1953            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1954                missing_certificates.insert(certificate);
1955            } else {
1956                // If we do not have the certificate, request it.
1957                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1958                // TODO (howardwu): Limit the number of open requests we send to a peer.
1959                // Send an certificate request to the peer.
1960                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1961            }
1962        }
1963
1964        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1965        match fetch_certificates.is_empty() {
1966            true => return Ok(missing_certificates),
1967            false => trace!(
1968                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1969                fetch_certificates.len(),
1970            ),
1971        }
1972
1973        // Wait for all of the missing certificates to be fetched.
1974        while let Some(result) = fetch_certificates.next().await {
1975            // Insert the missing certificate into the set.
1976            missing_certificates.insert(result?);
1977        }
1978        // Return the missing certificates.
1979        Ok(missing_certificates)
1980    }
1981}
1982
1983impl<N: Network> Primary<N> {
1984    /// Spawns a task with the given future; it should only be used for long-running tasks.
1985    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1986        self.handles.lock().push(tokio::spawn(future));
1987    }
1988
1989    /// Shuts down the primary.
1990    pub async fn shut_down(&self) {
1991        info!("Shutting down the primary...");
1992        // Shut down the workers.
1993        self.workers.iter().for_each(|worker| worker.shut_down());
1994        // Abort the tasks.
1995        self.handles.lock().iter().for_each(|handle| handle.abort());
1996        // Save the current proposal cache to disk.
1997        let proposal_cache = {
1998            let proposal = self.proposed_batch.write().take();
1999            let signed_proposals = self.signed_proposals.read().clone();
2000            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
2001            let pending_certificates = self.storage.get_pending_certificates();
2002            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
2003        };
2004        if let Err(err) = proposal_cache.store(&self.node_data_dir) {
2005            error!("{}", flatten_error(err.context("Failed to store the current proposal cache")));
2006        }
2007        // Close the gateway.
2008        self.gateway.shut_down().await;
2009    }
2010}
2011
2012#[cfg(test)]
2013mod tests {
2014    use super::*;
2015    use snarkos_node_bft_ledger_service::MockLedgerService;
2016    use snarkos_node_bft_storage_service::BFTMemoryService;
2017    use snarkos_node_sync::{BlockSync, locators::test_helpers::sample_block_locators};
2018    use snarkvm::{
2019        ledger::{
2020            committee::{Committee, MIN_VALIDATOR_STAKE},
2021            test_helpers::sample_execution_transaction_with_fee,
2022        },
2023        prelude::{Address, Signature},
2024    };
2025
2026    use bytes::Bytes;
2027    use indexmap::IndexSet;
2028    use rand::RngCore;
2029
2030    type CurrentNetwork = snarkvm::prelude::MainnetV0;
2031
2032    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
2033        // Create a committee containing the primary's account.
2034        const COMMITTEE_SIZE: usize = 4;
2035        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
2036        let mut members = IndexMap::new();
2037
2038        for i in 0..COMMITTEE_SIZE {
2039            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
2040            let account = Account::new(rng).unwrap();
2041
2042            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
2043            accounts.push((socket_addr, account));
2044        }
2045
2046        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
2047    }
2048
2049    // Returns a primary and a list of accounts in the configured committee.
2050    fn primary_with_committee(
2051        account_index: usize,
2052        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2053        committee: Committee<CurrentNetwork>,
2054        height: u32,
2055    ) -> Primary<CurrentNetwork> {
2056        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
2057        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
2058
2059        // Initialize the primary.
2060        let account = accounts[account_index].1.clone();
2061        let block_sync = Arc::new(BlockSync::new(ledger.clone()));
2062        let mut primary =
2063            Primary::new(account, storage, ledger, block_sync, None, &[], false, NodeDataDir::new_test(None), None)
2064                .unwrap();
2065
2066        // Construct a worker instance.
2067        primary.workers = Arc::from([Worker::new(
2068            0, // id
2069            Arc::new(primary.gateway.clone()),
2070            primary.storage.clone(),
2071            primary.ledger.clone(),
2072            primary.proposed_batch.clone(),
2073        )
2074        .unwrap()]);
2075        for a in accounts.iter().skip(account_index) {
2076            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
2077        }
2078
2079        primary
2080    }
2081
2082    fn primary_without_handlers(
2083        rng: &mut TestRng,
2084    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
2085        let (accounts, committee) = sample_committee(rng);
2086        let primary = primary_with_committee(
2087            0, // index of primary's account
2088            &accounts,
2089            committee,
2090            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
2091        );
2092
2093        (primary, accounts)
2094    }
2095
2096    // Creates a mock solution.
2097    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
2098        // Sample a random fake solution ID.
2099        let solution_id = rng.r#gen::<u64>().into();
2100        // Vary the size of the solutions.
2101        let size = rng.gen_range(1024..10 * 1024);
2102        // Sample random fake solution bytes.
2103        let mut vec = vec![0u8; size];
2104        rng.fill_bytes(&mut vec);
2105        let solution = Data::Buffer(Bytes::from(vec));
2106        // Return the solution ID and solution.
2107        (solution_id, solution)
2108    }
2109
2110    // Samples a test transaction.
2111    fn sample_unconfirmed_transaction(
2112        rng: &mut TestRng,
2113    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
2114        let transaction = sample_execution_transaction_with_fee(false, rng, 0);
2115        let id = transaction.id();
2116
2117        (id, Data::Object(transaction))
2118    }
2119
2120    // Creates a batch proposal with one solution and one transaction.
2121    fn create_test_proposal(
2122        author: &Account<CurrentNetwork>,
2123        committee: Committee<CurrentNetwork>,
2124        round: u64,
2125        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2126        timestamp: i64,
2127        num_transactions: u64,
2128        rng: &mut TestRng,
2129    ) -> Proposal<CurrentNetwork> {
2130        let mut transmission_ids = IndexSet::new();
2131        let mut transmissions = IndexMap::new();
2132
2133        // Prepare the solution and insert into the sets.
2134        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2135        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2136        let solution_transmission_id = (solution_id, solution_checksum).into();
2137        transmission_ids.insert(solution_transmission_id);
2138        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2139
2140        // Prepare the transactions and insert into the sets.
2141        for _ in 0..num_transactions {
2142            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2143            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2144            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2145            transmission_ids.insert(transaction_transmission_id);
2146            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2147        }
2148
2149        // Retrieve the private key.
2150        let private_key = author.private_key();
2151        // Sign the batch header.
2152        let batch_header = BatchHeader::new(
2153            private_key,
2154            round,
2155            timestamp,
2156            committee.id(),
2157            transmission_ids,
2158            previous_certificate_ids,
2159            rng,
2160        )
2161        .unwrap();
2162        // Construct the proposal.
2163        Proposal::new(committee, batch_header, transmissions).unwrap()
2164    }
2165
2166    // Creates a signature of the primary's current proposal for each committee member (excluding
2167    // the primary).
2168    fn peer_signatures_for_proposal(
2169        primary: &Primary<CurrentNetwork>,
2170        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2171        rng: &mut TestRng,
2172    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2173        // Each committee member signs the batch.
2174        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2175        for (socket_addr, account) in accounts {
2176            if account.address() == primary.gateway.account().address() {
2177                continue;
2178            }
2179            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2180            let signature = account.sign(&[batch_id], rng).unwrap();
2181            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2182        }
2183
2184        signatures
2185    }
2186
2187    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2188    fn peer_signatures_for_batch(
2189        primary_address: Address<CurrentNetwork>,
2190        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2191        batch_id: Field<CurrentNetwork>,
2192        rng: &mut TestRng,
2193    ) -> IndexSet<Signature<CurrentNetwork>> {
2194        let mut signatures = IndexSet::new();
2195        for (_, account) in accounts {
2196            if account.address() == primary_address {
2197                continue;
2198            }
2199            let signature = account.sign(&[batch_id], rng).unwrap();
2200            signatures.insert(signature);
2201        }
2202        signatures
2203    }
2204
    // Creates a batch certificate.
    //
    // Builds a certificate authored by `primary_address` for the given round, containing one
    // mock solution and one mock transaction, and signed by every other account in `accounts`.
    // Also returns the certificate's transmissions, keyed by transmission ID.
    // Panics if `primary_address` is not among `accounts`.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Locate the author's account so the header can be signed with its private key.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // Use a randomly generated committee ID for this test certificate.
        let committee_id = Field::rand(rng);
        // Sample one mock solution and one mock transaction as the certificate's payload.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the batch header as the author.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Certify the header with signatures from every other committee member.
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }
2249
2250    // Create a certificate chain up to, but not including, the specified round in the primary storage.
2251    fn store_certificate_chain(
2252        primary: &Primary<CurrentNetwork>,
2253        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2254        round: u64,
2255        rng: &mut TestRng,
2256    ) -> IndexSet<Field<CurrentNetwork>> {
2257        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2258        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2259        for cur_round in 1..round {
2260            for (_, account) in accounts.iter() {
2261                let (certificate, transmissions) = create_batch_certificate(
2262                    account.address(),
2263                    accounts,
2264                    cur_round,
2265                    previous_certificates.clone(),
2266                    rng,
2267                );
2268                next_certificates.insert(certificate.id());
2269                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2270            }
2271
2272            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2273            previous_certificates = next_certificates;
2274            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2275        }
2276
2277        previous_certificates
2278    }
2279
2280    // Insert the account socket addresses into the resolver so that
2281    // they are recognized as "connected".
2282    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2283        // First account is primary, which doesn't need to resolve.
2284        for (addr, acct) in accounts.iter().skip(1) {
2285            primary.gateway.resolver().write().insert_peer(*addr, *addr, Some(acct.address()));
2286        }
2287    }
2288
2289    #[tokio::test]
2290    async fn test_propose_batch() {
2291        let mut rng = TestRng::default();
2292        let (primary, _) = primary_without_handlers(&mut rng);
2293
2294        // Check there is no batch currently proposed.
2295        assert!(primary.proposed_batch.read().is_none());
2296
2297        // Generate a solution and a transaction.
2298        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2299        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2300
2301        // Store it on one of the workers.
2302        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2303        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2304
2305        // Try to propose a batch again. This time, it should succeed.
2306        assert!(primary.propose_batch().await.is_ok());
2307        assert!(primary.proposed_batch.read().is_some());
2308    }
2309
2310    #[tokio::test]
2311    async fn test_propose_batch_with_no_transmissions() {
2312        let mut rng = TestRng::default();
2313        let (primary, _) = primary_without_handlers(&mut rng);
2314
2315        // Check there is no batch currently proposed.
2316        assert!(primary.proposed_batch.read().is_none());
2317
2318        // Try to propose a batch with no transmissions.
2319        assert!(primary.propose_batch().await.is_ok());
2320        assert!(primary.proposed_batch.read().is_some());
2321    }
2322
2323    #[tokio::test]
2324    async fn test_propose_batch_in_round() {
2325        let round = 3;
2326        let mut rng = TestRng::default();
2327        let (primary, accounts) = primary_without_handlers(&mut rng);
2328
2329        // Fill primary storage.
2330        store_certificate_chain(&primary, &accounts, round, &mut rng);
2331
2332        // Sleep for a while to ensure the primary is ready to propose the next round.
2333        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2334
2335        // Generate a solution and a transaction.
2336        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2337        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2338
2339        // Store it on one of the workers.
2340        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2341        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2342
2343        // Propose a batch again. This time, it should succeed.
2344        assert!(primary.propose_batch().await.is_ok());
2345        assert!(primary.proposed_batch.read().is_some());
2346    }
2347
    // Verifies that a proposed batch excludes transmissions that already appear in certificates
    // from the preceding round, and includes only the newly submitted solution and transaction.
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of transmissions in the previous round.
        // NOTE(review): this counts transmissions of the certificates created below for `round`,
        // which becomes the "previous" round once storage is incremented further down.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Add the transmissions to the worker, mimicking receipt from a peer.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            // Insert the certificate to storage.
            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the new transmissions that were not in previous certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }
2419
2420    #[tokio::test]
2421    async fn test_propose_batch_over_spend_limit() {
2422        let mut rng = TestRng::default();
2423
2424        // Create a primary to test spend limit backwards compatibility with V4.
2425        let (accounts, committee) = sample_committee(&mut rng);
2426        let primary = primary_with_committee(
2427            0,
2428            &accounts,
2429            committee.clone(),
2430            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2431        );
2432
2433        // Check there is no batch currently proposed.
2434        assert!(primary.proposed_batch.read().is_none());
2435        // Check the workers are empty.
2436        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2437
2438        // Generate a solution and transactions.
2439        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2440        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2441
2442        for _i in 0..5 {
2443            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2444            // Store it on one of the workers.
2445            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2446        }
2447
2448        // Try to propose a batch again. This time, it should succeed.
2449        assert!(primary.propose_batch().await.is_ok());
2450        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2451        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2452        // Check the transmissions were correctly drained from the workers.
2453        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2454    }
2455
    // Verifies that a valid batch proposal authored by a connected, resolvable peer is
    // accepted when the primary considers itself synced.
    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        // Offset the timestamp so the proposal is not rejected for being too early.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // The primary will only consider itself synced if we received
        // block locators from a peer.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary.sync.testing_only_try_block_sync_testing_only().await;

        // Try to process the batch proposal from the peer; it should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }
2494
    // Verifies that a batch proposal from a peer is rejected when the primary does not
    // consider itself synced (the peer's block locators are far ahead).
    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        // Offset the timestamp so the proposal is not rejected for being too early.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // Add a high block locator to indicate we are not synced.
        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(20)).unwrap();

        // Try to process the batch proposal from the peer; it should fail.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }
2531
2532    #[tokio::test]
2533    async fn test_batch_propose_from_peer_in_round() {
2534        let round = 2;
2535        let mut rng = TestRng::default();
2536        let (primary, accounts) = primary_without_handlers(&mut rng);
2537
2538        // Generate certificates.
2539        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2540
2541        // Create a valid proposal with an author that isn't the primary.
2542        let peer_account = &accounts[1];
2543        let peer_ip = peer_account.0;
2544        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2545        let proposal = create_test_proposal(
2546            &peer_account.1,
2547            primary.ledger.current_committee().unwrap(),
2548            round,
2549            previous_certificates,
2550            timestamp,
2551            1,
2552            &mut rng,
2553        );
2554
2555        // Make sure the primary is aware of the transmissions in the proposal.
2556        for (transmission_id, transmission) in proposal.transmissions() {
2557            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2558        }
2559
2560        // The author must be known to resolver to pass propose checks.
2561        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2562
2563        // The primary will only consider itself synced if we received
2564        // block locators from a peer.
2565        primary.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
2566        primary.sync.testing_only_try_block_sync_testing_only().await;
2567
2568        // Try to process the batch proposal from the peer, should succeed.
2569        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2570    }
2571
2572    #[tokio::test]
2573    async fn test_batch_propose_from_peer_wrong_round() {
2574        let mut rng = TestRng::default();
2575        let (primary, accounts) = primary_without_handlers(&mut rng);
2576
2577        // Create a valid proposal with an author that isn't the primary.
2578        let round = 1;
2579        let peer_account = &accounts[1];
2580        let peer_ip = peer_account.0;
2581        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2582        let proposal = create_test_proposal(
2583            &peer_account.1,
2584            primary.ledger.current_committee().unwrap(),
2585            round,
2586            Default::default(),
2587            timestamp,
2588            1,
2589            &mut rng,
2590        );
2591
2592        // Make sure the primary is aware of the transmissions in the proposal.
2593        for (transmission_id, transmission) in proposal.transmissions() {
2594            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2595        }
2596
2597        // The author must be known to resolver to pass propose checks.
2598        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2599        // The primary must be considered synced.
2600        primary.sync.testing_only_try_block_sync_testing_only().await;
2601
2602        // Try to process the batch proposal from the peer, should error.
2603        assert!(
2604            primary
2605                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2606                    round: round + 1,
2607                    batch_header: Data::Object(proposal.batch_header().clone())
2608                })
2609                .await
2610                .is_err()
2611        );
2612    }
2613
2614    #[tokio::test]
2615    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2616        let round = 4;
2617        let mut rng = TestRng::default();
2618        let (primary, accounts) = primary_without_handlers(&mut rng);
2619
2620        // Generate certificates.
2621        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2622
2623        // Create a valid proposal with an author that isn't the primary.
2624        let peer_account = &accounts[1];
2625        let peer_ip = peer_account.0;
2626        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2627        let proposal = create_test_proposal(
2628            &peer_account.1,
2629            primary.ledger.current_committee().unwrap(),
2630            round,
2631            previous_certificates,
2632            timestamp,
2633            1,
2634            &mut rng,
2635        );
2636
2637        // Make sure the primary is aware of the transmissions in the proposal.
2638        for (transmission_id, transmission) in proposal.transmissions() {
2639            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2640        }
2641
2642        // The author must be known to resolver to pass propose checks.
2643        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2644        // The primary must be considered synced.
2645        primary.sync.testing_only_try_block_sync_testing_only().await;
2646
2647        // Try to process the batch proposal from the peer, should error.
2648        assert!(
2649            primary
2650                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2651                    round: round + 1,
2652                    batch_header: Data::Object(proposal.batch_header().clone())
2653                })
2654                .await
2655                .is_err()
2656        );
2657    }
2658
2659    /// Tests that the minimum batch delay is enforced as expected, i.e., that proposals with timestamps that are too close to the previous proposal are rejected.
2660    #[tokio::test]
2661    async fn test_batch_propose_from_peer_with_past_timestamp() {
2662        let round = 2;
2663        let mut rng = TestRng::default();
2664        let (primary, accounts) = primary_without_handlers(&mut rng);
2665
2666        // Generate certificates.
2667        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2668
2669        // Create a valid proposal with an author that isn't the primary.
2670        let peer_account = &accounts[1];
2671        let peer_ip = peer_account.0;
2672
2673        // Use a timestamp that is too early.
2674        // Set it to something that is less than the minimum batch delay
2675        // Note, that the minimum delay is currently 1, so this will be equal to the last timestamp
2676        let last_timestamp = primary
2677            .storage
2678            .get_certificate_for_round_with_author(round - 1, peer_account.1.address())
2679            .expect("No previous proposal exists")
2680            .timestamp();
2681        let invalid_timestamp = last_timestamp + (MIN_BATCH_DELAY_IN_SECS as i64) - 1;
2682
2683        let proposal = create_test_proposal(
2684            &peer_account.1,
2685            primary.ledger.current_committee().unwrap(),
2686            round,
2687            previous_certificates,
2688            invalid_timestamp,
2689            1,
2690            &mut rng,
2691        );
2692
2693        // Make sure the primary is aware of the transmissions in the proposal.
2694        for (transmission_id, transmission) in proposal.transmissions() {
2695            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2696        }
2697
2698        // The author must be known to resolver to pass propose checks.
2699        primary.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
2700        // The primary must be considered synced.
2701        primary.sync.testing_only_try_block_sync_testing_only().await;
2702
2703        // Try to process the batch proposal from the peer, should error.
2704        assert!(
2705            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2706        );
2707    }
2708
    /// Check that the transaction spend limit is enforced from consensus version V5 onwards:
    /// the same over-limit proposal is accepted by a V4 primary but rejected by a V5 primary.
    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries to test spend limit activation on V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;

        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;

        // NOTE(review): the `4` presumably sizes the proposal so it exceeds the V5 spend
        // limit — confirm against `create_test_proposal` (other tests here pass `1`).
        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to resolver to pass propose checks.
        primary_v4.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));
        primary_v5.gateway.resolver().write().insert_peer(peer_ip, peer_ip, Some(peer_account.1.address()));

        // primary v4 must be considered synced.
        primary_v4.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v4.sync.testing_only_try_block_sync_testing_only().await;

        // primary v5 must be considered synced.
        primary_v5.sync.testing_only_update_peer_locators_testing_only(peer_ip, sample_block_locators(0)).unwrap();
        primary_v5.sync.testing_only_try_block_sync_testing_only().await;

        // Check the spend limit is enforced from V5 onwards:
        // the V4 primary accepts the proposal, the V5 primary rejects it.
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );

        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }
2772
    /// Verifies that `propose_batch` is a no-op (returns Ok but produces no batch) while the
    /// propose lock is ahead of the storage round, and succeeds once the lock is restored.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers, so there is content to include in a batch.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch: the call itself returns Ok, but no batch is produced
        // because the propose lock is ahead of the storage round.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2805
    /// Verifies that `propose_batch` terminates early (without error) when an existing
    /// proposal is already ahead of the storage round, leaving that proposal in place.
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal for a round ahead of the storage (round + 1).
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Proposing a batch terminates early (returning Ok) because the storage round is
        // behind the existing proposal's round; the stored proposal is left untouched.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
2835
2836    #[tokio::test(flavor = "multi_thread")]
2837    async fn test_batch_signature_from_peer() {
2838        let mut rng = TestRng::default();
2839        let (primary, accounts) = primary_without_handlers(&mut rng);
2840        map_account_addresses(&primary, &accounts);
2841
2842        // Create a valid proposal.
2843        let round = 1;
2844        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2845        let proposal = create_test_proposal(
2846            primary.gateway.account(),
2847            primary.ledger.current_committee().unwrap(),
2848            round,
2849            Default::default(),
2850            timestamp,
2851            1,
2852            &mut rng,
2853        );
2854
2855        // Store the proposal on the primary.
2856        *primary.proposed_batch.write() = Some(proposal);
2857
2858        // Each committee member signs the batch.
2859        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2860
2861        // Have the primary process the signatures.
2862        for (socket_addr, signature) in signatures {
2863            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2864        }
2865
2866        // Check the certificate was created and stored by the primary.
2867        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2868        // Check the round was incremented.
2869        assert_eq!(primary.current_round(), round + 1);
2870    }
2871
2872    #[tokio::test(flavor = "multi_thread")]
2873    async fn test_batch_signature_from_peer_in_round() {
2874        let round = 5;
2875        let mut rng = TestRng::default();
2876        let (primary, accounts) = primary_without_handlers(&mut rng);
2877        map_account_addresses(&primary, &accounts);
2878
2879        // Generate certificates.
2880        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2881
2882        // Create a valid proposal.
2883        let timestamp = now();
2884        let proposal = create_test_proposal(
2885            primary.gateway.account(),
2886            primary.ledger.current_committee().unwrap(),
2887            round,
2888            previous_certificates,
2889            timestamp,
2890            1,
2891            &mut rng,
2892        );
2893
2894        // Store the proposal on the primary.
2895        *primary.proposed_batch.write() = Some(proposal);
2896
2897        // Each committee member signs the batch.
2898        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2899
2900        // Have the primary process the signatures.
2901        for (socket_addr, signature) in signatures {
2902            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2903        }
2904
2905        // Check the certificate was created and stored by the primary.
2906        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2907        // Check the round was incremented.
2908        assert_eq!(primary.current_round(), round + 1);
2909    }
2910
    /// Verifies that a single signature (short of quorum) neither certifies the batch
    /// nor advances the round.
    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented, since quorum was not reached.
        assert_eq!(primary.current_round(), round);
    }
2945
    /// Same as `test_batch_signature_from_peer_no_quorum`, but at a later round with a
    /// pre-existing certificate chain in storage.
    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented, since quorum was not reached.
        assert_eq!(primary.current_round(), round);
    }
2983
    /// Verifies that inserting a certificate with missing transmissions fails unless the
    /// missing ones are declared aborted, and that aborted IDs are tracked in storage
    /// without a retrievable payload.
    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly abort some of the transmissions.
        // The `|| aborted_transmissions.is_empty()` clause guarantees at least one
        // transmission is aborted, so the aborted path is always exercised.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.r#gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Insert the aborted transmission.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Insert the transmission without the aborted transmission.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Add the non-aborted transmissions to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Check that the certificate is rejected (by both validation and insertion) while
        // some of its transmissions are missing and not declared aborted.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Insert the certificate to storage, this time declaring the missing transmissions as aborted.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Ensure the certificate exists in storage.
        assert!(primary.storage.contains_certificate(certificate_id));
        // Ensure that the aborted transmission IDs exist in storage.
        // They are tracked as known IDs but carry no retrievable payload.
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
3065}