snarkos_node_bft/
primary.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    PRIMARY_PING_IN_MS,
22    Sync,
23    Transport,
24    WORKER_PING_IN_MS,
25    Worker,
26    events::{BatchPropose, BatchSignature, Event},
27    helpers::{
28        BFTSender,
29        PrimaryReceiver,
30        PrimarySender,
31        Proposal,
32        ProposalCache,
33        SignedProposals,
34        Storage,
35        assign_to_worker,
36        assign_to_workers,
37        fmt_id,
38        init_sync_channels,
39        init_worker_channels,
40        now,
41    },
42    spawn_blocking,
43};
44use snarkos_account::Account;
45use snarkos_node_bft_events::PrimaryPing;
46use snarkos_node_bft_ledger_service::LedgerService;
47use snarkos_node_sync::DUMMY_SELF_IP;
48use snarkvm::{
49    console::{
50        prelude::*,
51        types::{Address, Field},
52    },
53    ledger::{
54        block::Transaction,
55        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
56        puzzle::{Solution, SolutionID},
57    },
58    prelude::committee::Committee,
59};
60
61use aleo_std::StorageMode;
62use colored::Colorize;
63use futures::stream::{FuturesUnordered, StreamExt};
64use indexmap::{IndexMap, IndexSet};
65#[cfg(feature = "locktick")]
66use locktick::{
67    parking_lot::{Mutex, RwLock},
68    tokio::Mutex as TMutex,
69};
70#[cfg(not(feature = "locktick"))]
71use parking_lot::{Mutex, RwLock};
72use rayon::prelude::*;
73use std::{
74    collections::{HashMap, HashSet},
75    future::Future,
76    net::SocketAddr,
77    sync::Arc,
78    time::Duration,
79};
80#[cfg(not(feature = "locktick"))]
81use tokio::sync::Mutex as TMutex;
82use tokio::{sync::OnceCell, task::JoinHandle};
83
84/// A helper type for an optional proposed batch.
85pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
86
87#[derive(Clone)]
88pub struct Primary<N: Network> {
89    /// The sync module.
90    sync: Sync<N>,
91    /// The gateway.
92    gateway: Gateway<N>,
93    /// The storage.
94    storage: Storage<N>,
95    /// The ledger service.
96    ledger: Arc<dyn LedgerService<N>>,
97    /// The workers.
98    workers: Arc<[Worker<N>]>,
99    /// The BFT sender.
100    bft_sender: Arc<OnceCell<BFTSender<N>>>,
101    /// The batch proposal, if the primary is currently proposing a batch.
102    proposed_batch: Arc<ProposedBatch<N>>,
103    /// The timestamp of the most recent proposed batch.
104    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
105    /// The recently-signed batch proposals.
106    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
107    /// The spawned handles.
108    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
109    /// The lock for propose_batch.
110    propose_lock: Arc<TMutex<u64>>,
111    /// The storage mode of the node.
112    storage_mode: StorageMode,
113}
114
115impl<N: Network> Primary<N> {
116    /// The maximum number of unconfirmed transmissions to send to the primary.
117    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
118
119    /// Initializes a new primary instance.
120    pub fn new(
121        account: Account<N>,
122        storage: Storage<N>,
123        ledger: Arc<dyn LedgerService<N>>,
124        ip: Option<SocketAddr>,
125        trusted_validators: &[SocketAddr],
126        storage_mode: StorageMode,
127    ) -> Result<Self> {
128        // Initialize the gateway.
129        let gateway =
130            Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.dev())?;
131        // Initialize the sync module.
132        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());
133
134        // Initialize the primary instance.
135        Ok(Self {
136            sync,
137            gateway,
138            storage,
139            ledger,
140            workers: Arc::from(vec![]),
141            bft_sender: Default::default(),
142            proposed_batch: Default::default(),
143            latest_proposed_batch_timestamp: Default::default(),
144            signed_proposals: Default::default(),
145            handles: Default::default(),
146            propose_lock: Default::default(),
147            storage_mode,
148        })
149    }
150
151    /// Load the proposal cache file and update the Primary state with the stored data.
152    async fn load_proposal_cache(&self) -> Result<()> {
153        // Fetch the signed proposals from the file system if it exists.
154        match ProposalCache::<N>::exists(&self.storage_mode) {
155            // If the proposal cache exists, then process the proposal cache.
156            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
157                Ok(proposal_cache) => {
158                    // Extract the proposal and signed proposals.
159                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
160                        proposal_cache.into();
161
162                    // Write the proposed batch.
163                    *self.proposed_batch.write() = proposed_batch;
164                    // Write the signed proposals.
165                    *self.signed_proposals.write() = signed_proposals;
166                    // Writ the propose lock.
167                    *self.propose_lock.lock().await = latest_certificate_round;
168
169                    // Update the storage with the pending certificates.
170                    for certificate in pending_certificates {
171                        let batch_id = certificate.batch_id();
172                        // We use a dummy IP because the node should not need to request from any peers.
173                        // The storage should have stored all the transmissions. If not, we simply
174                        // skip the certificate.
175                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
176                        {
177                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
178                        }
179                    }
180                    Ok(())
181                }
182                Err(err) => {
183                    bail!("Failed to read the signed proposals from the file system - {err}.");
184                }
185            },
186            // If the proposal cache does not exist, then return early.
187            false => Ok(()),
188        }
189    }
190
191    /// Run the primary instance.
192    pub async fn run(
193        &mut self,
194        bft_sender: Option<BFTSender<N>>,
195        primary_sender: PrimarySender<N>,
196        primary_receiver: PrimaryReceiver<N>,
197    ) -> Result<()> {
198        info!("Starting the primary instance of the memory pool...");
199
200        // Set the BFT sender.
201        if let Some(bft_sender) = &bft_sender {
202            // Set the BFT sender in the primary.
203            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
204        }
205
206        // Construct a map of the worker senders.
207        let mut worker_senders = IndexMap::new();
208        // Construct a map for the workers.
209        let mut workers = Vec::new();
210        // Initialize the workers.
211        for id in 0..MAX_WORKERS {
212            // Construct the worker channels.
213            let (tx_worker, rx_worker) = init_worker_channels();
214            // Construct the worker instance.
215            let worker = Worker::new(
216                id,
217                Arc::new(self.gateway.clone()),
218                self.storage.clone(),
219                self.ledger.clone(),
220                self.proposed_batch.clone(),
221            )?;
222            // Run the worker instance.
223            worker.run(rx_worker);
224            // Add the worker to the list of workers.
225            workers.push(worker);
226            // Add the worker sender to the map.
227            worker_senders.insert(id, tx_worker);
228        }
229        // Set the workers.
230        self.workers = Arc::from(workers);
231
232        // First, initialize the sync channels.
233        let (sync_sender, sync_receiver) = init_sync_channels();
234        // Next, initialize the sync module and sync the storage from ledger.
235        self.sync.initialize(bft_sender).await?;
236        // Next, load and process the proposal cache before running the sync module.
237        self.load_proposal_cache().await?;
238        // Next, run the sync module.
239        self.sync.run(sync_receiver).await?;
240        // Next, initialize the gateway.
241        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
242        // Lastly, start the primary handlers.
243        // Note: This ensures the primary does not start communicating before syncing is complete.
244        self.start_handlers(primary_receiver);
245
246        Ok(())
247    }
248
249    /// Returns the current round.
250    pub fn current_round(&self) -> u64 {
251        self.storage.current_round()
252    }
253
254    /// Returns `true` if the primary is synced.
255    pub fn is_synced(&self) -> bool {
256        self.sync.is_synced()
257    }
258
259    /// Returns the gateway.
260    pub const fn gateway(&self) -> &Gateway<N> {
261        &self.gateway
262    }
263
264    /// Returns the storage.
265    pub const fn storage(&self) -> &Storage<N> {
266        &self.storage
267    }
268
269    /// Returns the ledger.
270    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
271        &self.ledger
272    }
273
274    /// Returns the number of workers.
275    pub fn num_workers(&self) -> u8 {
276        u8::try_from(self.workers.len()).expect("Too many workers")
277    }
278
279    /// Returns the workers.
280    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
281        &self.workers
282    }
283
284    /// Returns the batch proposal of our primary, if one currently exists.
285    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
286        &self.proposed_batch
287    }
288}
289
290impl<N: Network> Primary<N> {
291    /// Returns the number of unconfirmed transmissions.
292    pub fn num_unconfirmed_transmissions(&self) -> usize {
293        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
294    }
295
296    /// Returns the number of unconfirmed ratifications.
297    pub fn num_unconfirmed_ratifications(&self) -> usize {
298        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
299    }
300
301    /// Returns the number of unconfirmed solutions.
302    pub fn num_unconfirmed_solutions(&self) -> usize {
303        self.workers.iter().map(|worker| worker.num_solutions()).sum()
304    }
305
306    /// Returns the number of unconfirmed transactions.
307    pub fn num_unconfirmed_transactions(&self) -> usize {
308        self.workers.iter().map(|worker| worker.num_transactions()).sum()
309    }
310}
311
312impl<N: Network> Primary<N> {
313    /// Returns the worker transmission IDs.
314    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
315        self.workers.iter().flat_map(|worker| worker.transmission_ids())
316    }
317
318    /// Returns the worker transmissions.
319    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
320        self.workers.iter().flat_map(|worker| worker.transmissions())
321    }
322
323    /// Returns the worker solutions.
324    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
325        self.workers.iter().flat_map(|worker| worker.solutions())
326    }
327
328    /// Returns the worker transactions.
329    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
330        self.workers.iter().flat_map(|worker| worker.transactions())
331    }
332}
333
334impl<N: Network> Primary<N> {
335    /// Clears the worker solutions.
336    pub fn clear_worker_solutions(&self) {
337        self.workers.iter().for_each(Worker::clear_solutions);
338    }
339}
340
341impl<N: Network> Primary<N> {
342    /// Proposes the batch for the current round.
343    ///
344    /// This method performs the following steps:
345    /// 1. Drain the workers.
346    /// 2. Sign the batch.
347    /// 3. Set the batch proposal in the primary.
348    /// 4. Broadcast the batch header to all validators for signing.
349    pub async fn propose_batch(&self) -> Result<()> {
350        // This function isn't re-entrant.
351        let mut lock_guard = self.propose_lock.lock().await;
352
353        // Check if the proposed batch has expired, and clear it if it has expired.
354        if let Err(e) = self.check_proposed_batch_for_expiration().await {
355            warn!("Failed to check the proposed batch for expiration - {e}");
356            return Ok(());
357        }
358
359        // Retrieve the current round.
360        let round = self.current_round();
361        // Compute the previous round.
362        let previous_round = round.saturating_sub(1);
363
364        // If the current round is 0, return early.
365        // This can actually never happen, because of the invariant that the current round is never 0
366        // (see [`StorageInner::current_round`]).
367        ensure!(round > 0, "Round 0 cannot have transaction batches");
368
369        // If the current storage round is below the latest proposal round, then return early.
370        if round < *lock_guard {
371            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
372            return Ok(());
373        }
374
375        // If there is a batch being proposed already,
376        // rebroadcast the batch header to the non-signers, and return early.
377        if let Some(proposal) = self.proposed_batch.read().as_ref() {
378            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
379            if round < proposal.round()
380                || proposal
381                    .batch_header()
382                    .previous_certificate_ids()
383                    .iter()
384                    .any(|id| !self.storage.contains_certificate(*id))
385            {
386                warn!(
387                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
388                    proposal.round(),
389                );
390                return Ok(());
391            }
392            // Construct the event.
393            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
394            let event = Event::BatchPropose(proposal.batch_header().clone().into());
395            // Iterate through the non-signers.
396            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
397                // Resolve the address to the peer IP.
398                match self.gateway.resolver().get_peer_ip_for_address(address) {
399                    // Resend the batch proposal to the validator for signing.
400                    Some(peer_ip) => {
401                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
402                        tokio::spawn(async move {
403                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
404                            // Resend the batch proposal to the peer.
405                            if gateway.send(peer_ip, event_).await.is_none() {
406                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
407                            }
408                        });
409                    }
410                    None => continue,
411                }
412            }
413            debug!("Proposed batch for round {} is still valid", proposal.round());
414            return Ok(());
415        }
416
417        #[cfg(feature = "metrics")]
418        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
419
420        // Ensure that the primary does not create a new proposal too quickly.
421        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
422            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
423            return Ok(());
424        }
425
426        // Ensure the primary has not proposed a batch for this round before.
427        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
428            // If a BFT sender was provided, attempt to advance the current round.
429            if let Some(bft_sender) = self.bft_sender.get() {
430                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
431                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
432                    Ok(true) => (), // continue,
433                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
434                    Ok(false) => return Ok(()),
435                    // An error occurred while attempting to advance the current round.
436                    Err(e) => {
437                        warn!("Failed to update the BFT to the next round - {e}");
438                        return Err(e);
439                    }
440                }
441            }
442            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
443            return Ok(());
444        }
445
446        // Determine if the current round has been proposed.
447        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
448        // good for network reliability and should not be prevented for the already existing proposed_batch.
449        // If a certificate already exists for the current round, an attempt should be made to advance the
450        // round as early as possible.
451        if round == *lock_guard {
452            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
453            return Ok(());
454        }
455
456        // Retrieve the committee to check against.
457        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
458        // Check if the primary is connected to enough validators to reach quorum threshold.
459        {
460            // Retrieve the connected validator addresses.
461            let mut connected_validators = self.gateway.connected_addresses();
462            // Append the primary to the set.
463            connected_validators.insert(self.gateway.account().address());
464            // If quorum threshold is not reached, return early.
465            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
466                debug!(
467                    "Primary is safely skipping a batch proposal for round {round} {}",
468                    "(please connect to more validators)".dimmed()
469                );
470                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
471                return Ok(());
472            }
473        }
474
475        // Retrieve the previous certificates.
476        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
477
478        // Check if the batch is ready to be proposed.
479        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
480        let mut is_ready = previous_round == 0;
481        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
482        if previous_round > 0 {
483            // Retrieve the committee lookback for the round.
484            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
485                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
486            };
487            // Construct a set over the authors.
488            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
489            // Check if the previous certificates have reached the quorum threshold.
490            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
491                is_ready = true;
492            }
493        }
494        // If the batch is not ready to be proposed, return early.
495        if !is_ready {
496            debug!(
497                "Primary is safely skipping a batch proposal for round {round} {}",
498                format!("(previous round {previous_round} has not reached quorum)").dimmed()
499            );
500            return Ok(());
501        }
502
503        // Initialize the map of transmissions.
504        let mut transmissions: IndexMap<_, _> = Default::default();
505        // Track the total execution costs of the batch proposal as it is being constructed.
506        let mut proposal_cost = 0u64;
507        // Note: worker draining and transaction inclusion needs to be thought
508        // through carefully when there is more than one worker. The fairness
509        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
510        debug_assert_eq!(MAX_WORKERS, 1);
511
512        'outer: for worker in self.workers().iter() {
513            let mut num_worker_transmissions = 0usize;
514
515            while let Some((id, transmission)) = worker.remove_front() {
516                // Check the selected transmissions are below the batch limit.
517                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
518                    break 'outer;
519                }
520
521                // Check the max transmissions per worker is not exceeded.
522                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
523                    continue 'outer;
524                }
525
526                // Check if the ledger already contains the transmission.
527                if self.ledger.contains_transmission(&id).unwrap_or(true) {
528                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
529                    continue;
530                }
531
532                // Check if the storage already contain the transmission.
533                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
534                // the primary does not propose a batch with no transmissions.
535                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
536                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
537                    continue;
538                }
539
540                // Check the transmission is still valid.
541                match (id, transmission.clone()) {
542                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
543                        // Ensure the checksum matches. If not, skip the solution.
544                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
545                        {
546                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
547                            continue;
548                        }
549                        // Check if the solution is still valid.
550                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
551                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
552                            continue;
553                        }
554                    }
555                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
556                        // Ensure the checksum matches. If not, skip the transaction.
557                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
558                        {
559                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
560                            continue;
561                        }
562
563                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
564                        let transaction = spawn_blocking!({
565                            match transaction {
566                                Data::Object(transaction) => Ok(transaction),
567                                Data::Buffer(bytes) => {
568                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
569                                }
570                            }
571                        })?;
572
573                        // Check if the transaction is still valid.
574                        // TODO: check if clone is cheap, otherwise fix.
575                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
576                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
577                            continue;
578                        }
579
580                        // Compute the transaction spent cost (in microcredits).
581                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
582                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(transaction_id, transaction)
583                        else {
584                            debug!(
585                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
586                                fmt_id(transaction_id)
587                            );
588                            continue;
589                        };
590
591                        // Compute the next proposal cost.
592                        // Note: We purposefully discard this transaction if the proposal cost overflows.
593                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
594                            debug!(
595                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
596                                fmt_id(transaction_id)
597                            );
598                            continue;
599                        };
600
601                        // Check if the next proposal cost exceeds the batch proposal spend limit.
602                        if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
603                            trace!(
604                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
605                                fmt_id(transaction_id),
606                                BatchHeader::<N>::BATCH_SPEND_LIMIT
607                            );
608
609                            // Reinsert the transmission into the worker.
610                            worker.insert_front(id, transmission);
611                            break 'outer;
612                        }
613
614                        // Update the proposal cost.
615                        proposal_cost = next_proposal_cost;
616                    }
617
618                    // Note: We explicitly forbid including ratifications,
619                    // as the protocol currently does not support ratifications.
620                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
621                    // All other combinations are clearly invalid.
622                    _ => continue,
623                }
624
625                // If the transmission is valid, insert it into the proposal's transmission list.
626                transmissions.insert(id, transmission);
627                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
628            }
629        }
630
631        // Determine the current timestamp.
632        let current_timestamp = now();
633
634        *lock_guard = round;
635
636        /* Proceeding to sign & propose the batch. */
637        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
638
639        // Retrieve the private key.
640        let private_key = *self.gateway.account().private_key();
641        // Retrieve the committee ID.
642        let committee_id = committee_lookback.id();
643        // Prepare the transmission IDs.
644        let transmission_ids = transmissions.keys().copied().collect();
645        // Prepare the previous batch certificate IDs.
646        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
647        // Sign the batch header and construct the proposal.
648        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
649            &private_key,
650            round,
651            current_timestamp,
652            committee_id,
653            transmission_ids,
654            previous_certificate_ids,
655            &mut rand::thread_rng()
656        ))
657        .and_then(|batch_header| {
658            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
659                .map(|proposal| (batch_header, proposal))
660        })
661        .inspect_err(|_| {
662            // On error, reinsert the transmissions and then propagate the error.
663            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
664                error!("Failed to reinsert transmissions: {e:?}");
665            }
666        })?;
667        // Broadcast the batch to all validators for signing.
668        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
669        // Set the timestamp of the latest proposed batch.
670        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
671        // Set the proposed batch.
672        *self.proposed_batch.write() = Some(proposal);
673        Ok(())
674    }
675
676    /// Processes a batch propose from a peer.
677    ///
678    /// This method performs the following steps:
679    /// 1. Verify the batch.
680    /// 2. Sign the batch.
681    /// 3. Broadcast the signature back to the validator.
682    ///
683    /// If our primary is ahead of the peer, we will not sign the batch.
684    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
685    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
686        let BatchPropose { round: batch_round, batch_header } = batch_propose;
687
688        // Deserialize the batch header.
689        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
690        // Ensure the round matches in the batch header.
691        if batch_round != batch_header.round() {
692            // Proceed to disconnect the validator.
693            self.gateway.disconnect(peer_ip);
694            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
695        }
696
697        // Retrieve the batch author.
698        let batch_author = batch_header.author();
699
700        // Ensure the batch proposal is from the validator.
701        match self.gateway.resolver().get_address(peer_ip) {
702            // If the peer is a validator, then ensure the batch proposal is from the validator.
703            Some(address) => {
704                if address != batch_author {
705                    // Proceed to disconnect the validator.
706                    self.gateway.disconnect(peer_ip);
707                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
708                }
709            }
710            None => bail!("Batch proposal from a disconnected validator"),
711        }
712        // Ensure the batch author is a current committee member.
713        if !self.gateway.is_authorized_validator_address(batch_author) {
714            // Proceed to disconnect the validator.
715            self.gateway.disconnect(peer_ip);
716            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
717        }
718        // Ensure the batch proposal is not from the current primary.
719        if self.gateway.account().address() == batch_author {
720            bail!("Invalid peer - proposed batch from myself ({batch_author})");
721        }
722
723        // Ensure that the batch proposal's committee ID matches the expected committee ID.
724        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
725        if expected_committee_id != batch_header.committee_id() {
726            // Proceed to disconnect the validator.
727            self.gateway.disconnect(peer_ip);
728            bail!(
729                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
730                batch_header.committee_id()
731            );
732        }
733
734        // Retrieve the cached round and batch ID for this validator.
735        if let Some((signed_round, signed_batch_id, signature)) =
736            self.signed_proposals.read().get(&batch_author).copied()
737        {
738            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
739            // Note: while this may be valid behavior, additional formal analysis and testing will need to be done before allowing it.
740            if signed_round > batch_header.round() {
741                bail!(
742                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
743                    batch_header.round()
744                );
745            }
746
747            // If the round matches and the batch ID differs, then the validator is malicious.
748            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
749                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
750            }
751            // If the round and batch ID matches, then skip signing the batch a second time.
752            // Instead, rebroadcast the cached signature to the peer.
753            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
754                let gateway = self.gateway.clone();
755                tokio::spawn(async move {
756                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
757                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
758                    // Resend the batch signature to the peer.
759                    if gateway.send(peer_ip, event).await.is_none() {
760                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
761                    }
762                });
763                // Return early.
764                return Ok(());
765            }
766        }
767
768        // Ensure that the batch header doesn't already exist in storage.
769        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
770        if self.storage.contains_batch(batch_header.batch_id()) {
771            debug!(
772                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
773                format!("batch for round {batch_round} already exists in storage").dimmed()
774            );
775            return Ok(());
776        }
777
778        // Compute the previous round.
779        let previous_round = batch_round.saturating_sub(1);
780        // Ensure that the peer did not propose a batch too quickly.
781        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
782            // Proceed to disconnect the validator.
783            self.gateway.disconnect(peer_ip);
784            bail!("Malicious peer - {e} from '{peer_ip}'");
785        }
786
787        // Ensure the batch header does not contain any ratifications.
788        if batch_header.contains(TransmissionID::Ratification) {
789            // Proceed to disconnect the validator.
790            self.gateway.disconnect(peer_ip);
791            bail!(
792                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
793            );
794        }
795
796        // If the peer is ahead, use the batch header to sync up to the peer.
797        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;
798
799        // Check that the transmission ids match and are not fee transactions.
800        if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
801            // If the transmission is not well-formed, then return early.
802            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
803        }) {
804            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}",);
805            return Ok(());
806        }
807
808        // Ensure the batch is for the current round.
809        // This method must be called after fetching previous certificates (above),
810        // and prior to checking the batch header (below).
811        if let Err(e) = self.ensure_is_signing_round(batch_round) {
812            // If the primary is not signing for the peer's round, then return early.
813            debug!("{e} from '{peer_ip}'");
814            return Ok(());
815        }
816
817        // Ensure the batch header from the peer is valid.
818        let (storage, header) = (self.storage.clone(), batch_header.clone());
819        let missing_transmissions =
820            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
821        // Inserts the missing transmissions into the workers.
822        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
823
824        // Ensure the transaction doesn't bring the proposal above the spend limit.
825        let block_height = self.ledger.latest_block_height() + 1;
826        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
827            let mut proposal_cost = 0u64;
828            for transmission_id in batch_header.transmission_ids() {
829                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
830                let Some(worker) = self.workers.get(worker_id as usize) else {
831                    debug!("Unable to find worker {worker_id}");
832                    return Ok(());
833                };
834
835                let Some(transmission) = worker.get_transmission(*transmission_id) else {
836                    debug!("Unable to find transmission '{}' in worker '{worker_id}", fmt_id(transmission_id));
837                    return Ok(());
838                };
839
840                // If the transmission is a transaction, compute its execution cost.
841                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
842                    (transmission_id, transmission)
843                {
844                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
845                    let transaction = spawn_blocking!({
846                        match transaction {
847                            Data::Object(transaction) => Ok(transaction),
848                            Data::Buffer(bytes) => {
849                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
850                            }
851                        }
852                    })?;
853
854                    // Compute the transaction spent cost (in microcredits).
855                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
856                    let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(*transaction_id, transaction)
857                    else {
858                        bail!(
859                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
860                            fmt_id(transaction_id)
861                        )
862                    };
863
864                    // Compute the next proposal cost.
865                    // Note: We purposefully discard this transaction if the proposal cost overflows.
866                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
867                        bail!(
868                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
869                            fmt_id(transaction_id)
870                        )
871                    };
872
873                    // Check if the next proposal cost exceeds the batch proposal spend limit.
874                    if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
875                        bail!(
876                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
877                            fmt_id(transaction_id),
878                            BatchHeader::<N>::BATCH_SPEND_LIMIT
879                        );
880                    }
881
882                    // Update the proposal cost.
883                    proposal_cost = next_proposal_cost;
884                }
885            }
886        }
887
888        /* Proceeding to sign the batch. */
889
890        // Retrieve the batch ID.
891        let batch_id = batch_header.batch_id();
892        // Sign the batch ID.
893        let account = self.gateway.account().clone();
894        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
895
896        // Ensure the proposal has not already been signed.
897        //
898        // Note: Due to the need to sync the batch header with the peer, it is possible
899        // for the primary to receive the same 'BatchPropose' event again, whereby only
900        // one instance of this handler should sign the batch. This check guarantees this.
901        match self.signed_proposals.write().0.entry(batch_author) {
902            std::collections::hash_map::Entry::Occupied(mut entry) => {
903                // If the validator has already signed a batch for this round, then return early,
904                // since, if the peer still has not received the signature, they will request it again,
905                // and the logic at the start of this function will resend the (now cached) signature
906                // to the peer if asked to sign this batch proposal again.
907                if entry.get().0 == batch_round {
908                    return Ok(());
909                }
910                // Otherwise, cache the round, batch ID, and signature for this validator.
911                entry.insert((batch_round, batch_id, signature));
912            }
913            // If the validator has not signed a batch before, then continue.
914            std::collections::hash_map::Entry::Vacant(entry) => {
915                // Cache the round, batch ID, and signature for this validator.
916                entry.insert((batch_round, batch_id, signature));
917            }
918        };
919
920        // Broadcast the signature back to the validator.
921        let self_ = self.clone();
922        tokio::spawn(async move {
923            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
924            // Send the batch signature to the peer.
925            if self_.gateway.send(peer_ip, event).await.is_some() {
926                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
927            }
928        });
929
930        Ok(())
931    }
932
933    /// Processes a batch signature from a peer.
934    ///
935    /// This method performs the following steps:
936    /// 1. Ensure the proposed batch has not expired.
937    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
938    /// 3. Store the signature.
939    /// 4. Certify the batch if enough signatures have been received.
940    /// 5. Broadcast the batch certificate to all validators.
941    async fn process_batch_signature_from_peer(
942        &self,
943        peer_ip: SocketAddr,
944        batch_signature: BatchSignature<N>,
945    ) -> Result<()> {
946        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
947        self.check_proposed_batch_for_expiration().await?;
948
949        // Retrieve the signature and timestamp.
950        let BatchSignature { batch_id, signature } = batch_signature;
951
952        // Retrieve the signer.
953        let signer = signature.to_address();
954
955        // Ensure the batch signature is signed by the validator.
956        if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
957            // Proceed to disconnect the validator.
958            self.gateway.disconnect(peer_ip);
959            bail!("Malicious peer - batch signature is from a different validator ({signer})");
960        }
961        // Ensure the batch signature is not from the current primary.
962        if self.gateway.account().address() == signer {
963            bail!("Invalid peer - received a batch signature from myself ({signer})");
964        }
965
966        let self_ = self.clone();
967        let Some(proposal) = spawn_blocking!({
968            // Acquire the write lock.
969            let mut proposed_batch = self_.proposed_batch.write();
970            // Add the signature to the batch, and determine if the batch is ready to be certified.
971            match proposed_batch.as_mut() {
972                Some(proposal) => {
973                    // Ensure the batch ID matches the currently proposed batch ID.
974                    if proposal.batch_id() != batch_id {
975                        match self_.storage.contains_batch(batch_id) {
976                            // If this batch was already certified, return early.
977                            true => {
978                                debug!(
979                                    "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
980                                    proposal.round()
981                                );
982                                return Ok(None);
983                            }
984                            // If the batch ID is unknown, return an error.
985                            false => bail!(
986                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
987                                proposal.batch_id(),
988                                proposal.round()
989                            ),
990                        }
991                    }
992                    // Retrieve the committee lookback for the round.
993                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
994                    // Retrieve the address of the validator.
995                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
996                        bail!("Signature is from a disconnected validator");
997                    };
998                    // Add the signature to the batch.
999                    proposal.add_signature(signer, signature, &committee_lookback)?;
1000                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
1001                    // Check if the batch is ready to be certified.
1002                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
1003                        // If the batch is not ready to be certified, return early.
1004                        return Ok(None);
1005                    }
1006                }
1007                // There is no proposed batch, so return early.
1008                None => return Ok(None),
1009            };
1010            // Retrieve the batch proposal, clearing the proposed batch.
1011            match proposed_batch.take() {
1012                Some(proposal) => Ok(Some(proposal)),
1013                None => Ok(None),
1014            }
1015        })?
1016        else {
1017            return Ok(());
1018        };
1019
1020        /* Proceeding to certify the batch. */
1021
1022        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
1023
1024        // Retrieve the committee lookback for the round.
1025        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
1026        // Store the certified batch and broadcast it to all validators.
1027        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
1028        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
1029            // Reinsert the transmissions back into the ready queue for the next proposal.
1030            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1031            return Err(e);
1032        }
1033
1034        #[cfg(feature = "metrics")]
1035        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
1036        Ok(())
1037    }
1038
1039    /// Processes a batch certificate from a peer.
1040    ///
1041    /// This method performs the following steps:
1042    /// 1. Stores the given batch certificate, after ensuring it is valid.
1043    /// 2. If there are enough certificates to reach quorum threshold for the current round,
1044    ///     then proceed to advance to the next round.
1045    async fn process_batch_certificate_from_peer(
1046        &self,
1047        peer_ip: SocketAddr,
1048        certificate: BatchCertificate<N>,
1049    ) -> Result<()> {
1050        // Ensure the batch certificate is from an authorized validator.
1051        if !self.gateway.is_authorized_validator_ip(peer_ip) {
1052            // Proceed to disconnect the validator.
1053            self.gateway.disconnect(peer_ip);
1054            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
1055        }
1056        // Ensure storage does not already contain the certificate.
1057        if self.storage.contains_certificate(certificate.id()) {
1058            return Ok(());
1059        // Otherwise, ensure ephemeral storage contains the certificate.
1060        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
1061            self.storage.insert_unprocessed_certificate(certificate.clone())?;
1062        }
1063
1064        // Retrieve the batch certificate author.
1065        let author = certificate.author();
1066        // Retrieve the batch certificate round.
1067        let certificate_round = certificate.round();
1068        // Retrieve the batch certificate committee ID.
1069        let committee_id = certificate.committee_id();
1070
1071        // Ensure the batch certificate is not from the current primary.
1072        if self.gateway.account().address() == author {
1073            bail!("Received a batch certificate for myself ({author})");
1074        }
1075
1076        // Ensure that the incoming certificate is valid.
1077        self.storage.check_incoming_certificate(&certificate)?;
1078
1079        // Store the certificate, after ensuring it is valid above.
1080        // The following call recursively fetches and stores
1081        // the previous certificates referenced from this certificate.
1082        // It is critical to make the following call this after validating the certificate above.
1083        // The reason is that a sequence of malformed certificates,
1084        // with references to previous certificates with non-decreasing rounds,
1085        // cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
1086        // Note that if the following call, if not returning an error, guarantees the backward closure of the DAG
1087        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
1088        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
1089        // TODO: eliminate those redundant checks
1090        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
1091
1092        // If there are enough certificates to reach quorum threshold for the certificate round,
1093        // then proceed to advance to the next round.
1094
1095        // Retrieve the committee lookback.
1096        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
1097
1098        // Retrieve the certificate authors.
1099        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
1100        // Check if the certificates have reached the quorum threshold.
1101        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
1102
1103        // Ensure that the batch certificate's committee ID matches the expected committee ID.
1104        let expected_committee_id = committee_lookback.id();
1105        if expected_committee_id != committee_id {
1106            // Proceed to disconnect the validator.
1107            self.gateway.disconnect(peer_ip);
1108            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
1109        }
1110
1111        // Determine if we are currently proposing a round that is relevant.
1112        // Note: This is important, because while our peers have advanced,
1113        // they may not be proposing yet, and thus still able to sign our proposed batch.
1114        let should_advance = match &*self.proposed_batch.read() {
1115            // We advance if the proposal round is less than the current round that was just certified.
1116            Some(proposal) => proposal.round() < certificate_round,
1117            // If there's no proposal, we consider advancing.
1118            None => true,
1119        };
1120
1121        // Retrieve the current round.
1122        let current_round = self.current_round();
1123
1124        // Determine whether to advance to the next round.
1125        if is_quorum && should_advance && certificate_round >= current_round {
1126            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
1127            self.try_increment_to_the_next_round(current_round + 1).await?;
1128        }
1129        Ok(())
1130    }
1131}
1132
1133impl<N: Network> Primary<N> {
1134    /// Starts the primary handlers.
1135    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1136        let PrimaryReceiver {
1137            mut rx_batch_propose,
1138            mut rx_batch_signature,
1139            mut rx_batch_certified,
1140            mut rx_primary_ping,
1141            mut rx_unconfirmed_solution,
1142            mut rx_unconfirmed_transaction,
1143        } = primary_receiver;
1144
1145        // Start the primary ping.
1146        let self_ = self.clone();
1147        self.spawn(async move {
1148            loop {
1149                // Sleep briefly.
1150                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
1151
1152                // Retrieve the block locators.
1153                let self__ = self_.clone();
1154                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1155                    Ok(block_locators) => block_locators,
1156                    Err(e) => {
1157                        warn!("Failed to retrieve block locators - {e}");
1158                        continue;
1159                    }
1160                };
1161
1162                // Retrieve the latest certificate of the primary.
1163                let primary_certificate = {
1164                    // Retrieve the primary address.
1165                    let primary_address = self_.gateway.account().address();
1166
1167                    // Iterate backwards from the latest round to find the primary certificate.
1168                    let mut certificate = None;
1169                    let mut current_round = self_.current_round();
1170                    while certificate.is_none() {
1171                        // If the current round is 0, then break the while loop.
1172                        if current_round == 0 {
1173                            break;
1174                        }
1175                        // Retrieve the primary certificates.
1176                        if let Some(primary_certificate) =
1177                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1178                        {
1179                            certificate = Some(primary_certificate);
1180                        // If the primary certificate was not found, decrement the round.
1181                        } else {
1182                            current_round = current_round.saturating_sub(1);
1183                        }
1184                    }
1185
1186                    // Determine if the primary certificate was found.
1187                    match certificate {
1188                        Some(certificate) => certificate,
1189                        // Skip this iteration of the loop (do not send a primary ping).
1190                        None => continue,
1191                    }
1192                };
1193
1194                // Construct the primary ping.
1195                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1196                // Broadcast the event.
1197                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1198            }
1199        });
1200
1201        // Start the primary ping handler.
1202        let self_ = self.clone();
1203        self.spawn(async move {
1204            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1205                // If the primary is not synced, then do not process the primary ping.
1206                if !self_.sync.is_synced() {
1207                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1208                    continue;
1209                }
1210
1211                // Spawn a task to process the primary certificate.
1212                {
1213                    let self_ = self_.clone();
1214                    tokio::spawn(async move {
1215                        // Deserialize the primary certificate in the primary ping.
1216                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1217                        else {
1218                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1219                            return;
1220                        };
1221                        // Process the primary certificate.
1222                        let id = fmt_id(primary_certificate.id());
1223                        let round = primary_certificate.round();
1224                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1225                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1226                        }
1227                    });
1228                }
1229            }
1230        });
1231
1232        // Start the worker ping(s).
1233        let self_ = self.clone();
1234        self.spawn(async move {
1235            loop {
1236                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
1237                // If the primary is not synced, then do not broadcast the worker ping(s).
1238                if !self_.sync.is_synced() {
1239                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1240                    continue;
1241                }
1242                // Broadcast the worker ping(s).
1243                for worker in self_.workers.iter() {
1244                    worker.broadcast_ping();
1245                }
1246            }
1247        });
1248
1249        // Start the batch proposer.
1250        let self_ = self.clone();
1251        self.spawn(async move {
1252            loop {
1253                // Sleep briefly, but longer than if there were no batch.
1254                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1255                let current_round = self_.current_round();
1256                // If the primary is not synced, then do not propose a batch.
1257                if !self_.sync.is_synced() {
1258                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
1259                    continue;
1260                }
1261                // A best-effort attempt to skip the scheduled batch proposal if
1262                // round progression already triggered one.
1263                if self_.propose_lock.try_lock().is_err() {
1264                    trace!(
1265                        "Skipping batch proposal for round {current_round} {}",
1266                        "(node is already proposing)".dimmed()
1267                    );
1268                    continue;
1269                };
1270                // If there is no proposed batch, attempt to propose a batch.
1271                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
1272                // and only one batch needs to be proposed at a time.
1273                if let Err(e) = self_.propose_batch().await {
1274                    warn!("Cannot propose a batch - {e}");
1275                }
1276            }
1277        });
1278
1279        // Process the proposed batch.
1280        let self_ = self.clone();
1281        self.spawn(async move {
1282            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1283                // If the primary is not synced, then do not sign the batch.
1284                if !self_.sync.is_synced() {
1285                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1286                    continue;
1287                }
1288                // Spawn a task to process the proposed batch.
1289                let self_ = self_.clone();
1290                tokio::spawn(async move {
1291                    // Process the batch proposal.
1292                    let round = batch_propose.round;
1293                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1294                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
1295                    }
1296                });
1297            }
1298        });
1299
1300        // Process the batch signature.
1301        let self_ = self.clone();
1302        self.spawn(async move {
1303            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1304                // If the primary is not synced, then do not store the signature.
1305                if !self_.sync.is_synced() {
1306                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1307                    continue;
1308                }
1309                // Process the batch signature.
1310                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1311                // is a critical path, and we should only store the minimum required number of signatures.
1312                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1313                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1314                let id = fmt_id(batch_signature.batch_id);
1315                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1316                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
1317                }
1318            }
1319        });
1320
1321        // Process the certified batch.
1322        let self_ = self.clone();
1323        self.spawn(async move {
1324            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1325                // If the primary is not synced, then do not store the certificate.
1326                if !self_.sync.is_synced() {
1327                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1328                    continue;
1329                }
1330                // Spawn a task to process the batch certificate.
1331                let self_ = self_.clone();
1332                tokio::spawn(async move {
1333                    // Deserialize the batch certificate.
1334                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1335                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1336                        return;
1337                    };
1338                    // Process the batch certificate.
1339                    let id = fmt_id(batch_certificate.id());
1340                    let round = batch_certificate.round();
1341                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1342                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
1343                    }
1344                });
1345            }
1346        });
1347
1348        // Periodically try to increment to the next round.
1349        // Note: This is necessary to ensure that the primary is not stuck on a previous round
1350        // despite having received enough certificates to advance to the next round.
1351        let self_ = self.clone();
1352        self.spawn(async move {
1353            loop {
1354                // Sleep briefly.
1355                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1356                // If the primary is not synced, then do not increment to the next round.
1357                if !self_.sync.is_synced() {
1358                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1359                    continue;
1360                }
1361                // Attempt to increment to the next round.
1362                let next_round = self_.current_round().saturating_add(1);
1363                // Determine if the quorum threshold is reached for the current round.
1364                let is_quorum_threshold_reached = {
1365                    // Retrieve the certificate authors for the next round.
1366                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
1367                    // If there are no certificates, then skip this check.
1368                    if authors.is_empty() {
1369                        continue;
1370                    }
1371                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
1372                        warn!("Failed to retrieve the committee lookback for round {next_round}");
1373                        continue;
1374                    };
1375                    committee_lookback.is_quorum_threshold_reached(&authors)
1376                };
1377                // Attempt to increment to the next round if the quorum threshold is reached.
1378                if is_quorum_threshold_reached {
1379                    debug!("Quorum threshold reached for round {}", next_round);
1380                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
1381                        warn!("Failed to increment to the next round - {e}");
1382                    }
1383                }
1384            }
1385        });
1386
1387        // Process the unconfirmed solutions.
1388        let self_ = self.clone();
1389        self.spawn(async move {
1390            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1391                // Compute the checksum for the solution.
1392                let Ok(checksum) = solution.to_checksum::<N>() else {
1393                    error!("Failed to compute the checksum for the unconfirmed solution");
1394                    continue;
1395                };
1396                // Compute the worker ID.
1397                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1398                    error!("Unable to determine the worker ID for the unconfirmed solution");
1399                    continue;
1400                };
1401                let self_ = self_.clone();
1402                tokio::spawn(async move {
1403                    // Retrieve the worker.
1404                    let worker = &self_.workers[worker_id as usize];
1405                    // Process the unconfirmed solution.
1406                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1407                    // Send the result to the callback.
1408                    callback.send(result).ok();
1409                });
1410            }
1411        });
1412
1413        // Process the unconfirmed transactions.
1414        let self_ = self.clone();
1415        self.spawn(async move {
1416            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1417                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1418                // Compute the checksum for the transaction.
1419                let Ok(checksum) = transaction.to_checksum::<N>() else {
1420                    error!("Failed to compute the checksum for the unconfirmed transaction");
1421                    continue;
1422                };
1423                // Compute the worker ID.
1424                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1425                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1426                    continue;
1427                };
1428                let self_ = self_.clone();
1429                tokio::spawn(async move {
1430                    // Retrieve the worker.
1431                    let worker = &self_.workers[worker_id as usize];
1432                    // Process the unconfirmed transaction.
1433                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1434                    // Send the result to the callback.
1435                    callback.send(result).ok();
1436                });
1437            }
1438        });
1439    }
1440
1441    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1442    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1443        // Check if the proposed batch is timed out or stale.
1444        let is_expired = match self.proposed_batch.read().as_ref() {
1445            Some(proposal) => proposal.round() < self.current_round(),
1446            None => false,
1447        };
1448        // If the batch is expired, clear the proposed batch.
1449        if is_expired {
1450            // Reset the proposed batch.
1451            let proposal = self.proposed_batch.write().take();
1452            if let Some(proposal) = proposal {
1453                debug!("Cleared expired proposal for round {}", proposal.round());
1454                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1455            }
1456        }
1457        Ok(())
1458    }
1459
1460    /// Increments to the next round.
1461    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1462        // If the next round is within GC range, then iterate to the penultimate round.
1463        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1464            let mut fast_forward_round = self.current_round();
1465            // Iterate until the penultimate round is reached.
1466            while fast_forward_round < next_round.saturating_sub(1) {
1467                // Update to the next round in storage.
1468                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1469                // Clear the proposed batch.
1470                *self.proposed_batch.write() = None;
1471            }
1472        }
1473
1474        // Retrieve the current round.
1475        let current_round = self.current_round();
1476        // Attempt to advance to the next round.
1477        if current_round < next_round {
1478            // If a BFT sender was provided, send the current round to the BFT.
1479            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
1480                match bft_sender.send_primary_round_to_bft(current_round).await {
1481                    Ok(is_ready) => is_ready,
1482                    Err(e) => {
1483                        warn!("Failed to update the BFT to the next round - {e}");
1484                        return Err(e);
1485                    }
1486                }
1487            }
1488            // Otherwise, handle the Narwhal case.
1489            else {
1490                // Update to the next round in storage.
1491                self.storage.increment_to_next_round(current_round)?;
1492                // Set 'is_ready' to 'true'.
1493                true
1494            };
1495
1496            // Log whether the next round is ready.
1497            match is_ready {
1498                true => debug!("Primary is ready to propose the next round"),
1499                false => debug!("Primary is not ready to propose the next round"),
1500            }
1501
1502            // If the node is ready, propose a batch for the next round.
1503            if is_ready {
1504                self.propose_batch().await?;
1505            }
1506        }
1507        Ok(())
1508    }
1509
1510    /// Ensures the primary is signing for the specified batch round.
1511    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1512    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1513    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1514        // Retrieve the current round.
1515        let current_round = self.current_round();
1516        // Ensure the batch round is within GC range of the current round.
1517        if current_round + self.storage.max_gc_rounds() <= batch_round {
1518            bail!("Round {batch_round} is too far in the future")
1519        }
1520        // Ensure the batch round is at or one before the current round.
1521        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1522        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1523        if current_round > batch_round + 1 {
1524            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1525        }
1526        // Check if the primary is still signing for the batch round.
1527        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1528            if signing_round > batch_round {
1529                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1530            }
1531        }
1532        Ok(())
1533    }
1534
1535    /// Ensure the primary is not creating batch proposals too frequently.
1536    /// This checks that the certificate timestamp for the previous round is within the expected range.
1537    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1538        // Retrieve the timestamp of the previous timestamp to check against.
1539        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1540            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
1541            Some(certificate) => certificate.timestamp(),
1542            None => match self.gateway.account().address() == author {
1543                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
1544                true => *self.latest_proposed_batch_timestamp.read(),
1545                // If we do not see a previous certificate for the author, then proceed optimistically.
1546                false => return Ok(()),
1547            },
1548        };
1549
1550        // Determine the elapsed time since the previous timestamp.
1551        let elapsed = timestamp
1552            .checked_sub(previous_timestamp)
1553            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1554        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
1555        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1556            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1557            false => Ok(()),
1558        }
1559    }
1560
1561    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1562    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1563        // Create the batch certificate and transmissions.
1564        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1565        // Convert the transmissions into a HashMap.
1566        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1567        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1568        // Store the certified batch.
1569        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1570        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1571        debug!("Stored a batch certificate for round {}", certificate.round());
1572        // If a BFT sender was provided, send the certificate to the BFT.
1573        if let Some(bft_sender) = self.bft_sender.get() {
1574            // Await the callback to continue.
1575            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1576                warn!("Failed to update the BFT DAG from primary - {e}");
1577                return Err(e);
1578            };
1579        }
1580        // Broadcast the certified batch to all validators.
1581        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1582        // Log the certified batch.
1583        let num_transmissions = certificate.transmission_ids().len();
1584        let round = certificate.round();
1585        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1586        // Increment to the next round.
1587        self.try_increment_to_the_next_round(round + 1).await
1588    }
1589
1590    /// Inserts the missing transmissions from the proposal into the workers.
1591    fn insert_missing_transmissions_into_workers(
1592        &self,
1593        peer_ip: SocketAddr,
1594        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1595    ) -> Result<()> {
1596        // Insert the transmissions into the workers.
1597        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1598            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1599        })
1600    }
1601
1602    /// Re-inserts the transmissions from the proposal into the workers.
1603    fn reinsert_transmissions_into_workers(
1604        &self,
1605        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1606    ) -> Result<()> {
1607        // Re-insert the transmissions into the workers.
1608        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1609            worker.reinsert(transmission_id, transmission);
1610        })
1611    }
1612
1613    /// Recursively stores a given batch certificate, after ensuring:
1614    ///   - Ensure the round matches the committee round.
1615    ///   - Ensure the address is a member of the committee.
1616    ///   - Ensure the timestamp is within range.
1617    ///   - Ensure we have all of the transmissions.
1618    ///   - Ensure we have all of the previous certificates.
1619    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1620    ///   - Ensure the previous certificates have reached the quorum threshold.
1621    ///   - Ensure we have not already signed the batch ID.
1622    #[async_recursion::async_recursion]
1623    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1624        &self,
1625        peer_ip: SocketAddr,
1626        certificate: BatchCertificate<N>,
1627    ) -> Result<()> {
1628        // Retrieve the batch header.
1629        let batch_header = certificate.batch_header();
1630        // Retrieve the batch round.
1631        let batch_round = batch_header.round();
1632
1633        // If the certificate round is outdated, do not store it.
1634        if batch_round <= self.storage.gc_round() {
1635            return Ok(());
1636        }
1637        // If the certificate already exists in storage, return early.
1638        if self.storage.contains_certificate(certificate.id()) {
1639            return Ok(());
1640        }
1641
1642        // If node is not in sync mode and the node is not synced. Then return an error.
1643        if !IS_SYNCING && !self.is_synced() {
1644            bail!(
1645                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1646                fmt_id(certificate.id())
1647            );
1648        }
1649
1650        // If the peer is ahead, use the batch header to sync up to the peer.
1651        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1652
1653        // Check if the certificate needs to be stored.
1654        if !self.storage.contains_certificate(certificate.id()) {
1655            // Store the batch certificate.
1656            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1657            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1658            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1659            // If a BFT sender was provided, send the round and certificate to the BFT.
1660            if let Some(bft_sender) = self.bft_sender.get() {
1661                // Send the certificate to the BFT.
1662                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1663                    warn!("Failed to update the BFT DAG from sync: {e}");
1664                    return Err(e);
1665                };
1666            }
1667        }
1668        Ok(())
1669    }
1670
    /// Recursively syncs using the given batch header from `peer_ip`.
    ///
    /// Steps, in order:
    /// 1. Rejects headers whose round is at or below the GC round.
    /// 2. When `IS_SYNCING` is false, rejects headers while the node is not synced.
    /// 3. Checks whether our stored certificates for the batch round already reach the quorum
    ///    threshold of that round's lookback committee.
    /// 4. Tries to advance to the batch round if either:
    ///    - quorum is reached and the round is ahead of ours; or
    ///    - the round is more than `max_gc_rounds` ahead of ours.
    /// 5. Concurrently fetches the header's missing transmissions and missing previous certificates.
    /// 6. Recursively stores each missing previous certificate.
    ///
    /// Returns the transmissions that were missing for this header, for the caller to store
    /// alongside the certificate.
    ///
    /// # Errors
    /// - if the round is outdated;
    /// - if the node is not synced while not in sync mode;
    /// - if the committee lookback cannot be retrieved;
    /// - if advancing rounds fails;
    /// - if either fetch fails;
    /// - if storing a previous certificate fails.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the batch round is outdated, reject the header.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // If node is not in sync mode and the node is not synced, then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine if quorum threshold is reached on the batch round, based on the certificate
        // authors we currently have in storage for that round.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Check if our primary should move to the next round.
        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        // Check if our primary is far behind the peer (beyond the GC window).
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        // If we are behind schedule, or far behind the peer, try to advance to the batch round.
        if is_behind_schedule || is_peer_far_in_future {
            // Note: this may also propose a batch if the primary becomes ready (see `try_increment_to_the_next_round`).
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Ensure the primary has all of the transmissions.
        // Note: this only creates the future; nothing runs until it is polled by `try_join!` below.
        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);

        // Ensure the primary has all of the previous certificates.
        // Note: likewise a lazy future, driven concurrently with the one above.
        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);

        // Wait for the missing transmissions and previous certificates to be fetched.
        // `try_join!` polls both futures concurrently and returns as soon as either one fails.
        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
            missing_transmissions_handle,
            missing_previous_certificates_handle,
        ).map_err(|e| {
            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        // Iterate through the missing previous certificates.
        for batch_certificate in missing_previous_certificates {
            // Store the batch certificate (recursively fetching any missing previous certificates).
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }
1734
1735    /// Fetches any missing transmissions for the specified batch header.
1736    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1737    async fn fetch_missing_transmissions(
1738        &self,
1739        peer_ip: SocketAddr,
1740        batch_header: &BatchHeader<N>,
1741    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1742        // If the round is <= the GC round, return early.
1743        if batch_header.round() <= self.storage.gc_round() {
1744            return Ok(Default::default());
1745        }
1746
1747        // Ensure this batch ID is new, otherwise return early.
1748        if self.storage.contains_batch(batch_header.batch_id()) {
1749            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1750            return Ok(Default::default());
1751        }
1752
1753        // Retrieve the workers.
1754        let workers = self.workers.clone();
1755
1756        // Initialize a list for the transmissions.
1757        let mut fetch_transmissions = FuturesUnordered::new();
1758
1759        // Retrieve the number of workers.
1760        let num_workers = self.num_workers();
1761        // Iterate through the transmission IDs.
1762        for transmission_id in batch_header.transmission_ids() {
1763            // If the transmission does not exist in storage, proceed to fetch the transmission.
1764            if !self.storage.contains_transmission(*transmission_id) {
1765                // Determine the worker ID.
1766                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1767                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1768                };
1769                // Retrieve the worker.
1770                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1771                // Push the callback onto the list.
1772                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1773            }
1774        }
1775
1776        // Initialize a set for the transmissions.
1777        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1778        // Wait for all of the transmissions to be fetched.
1779        while let Some(result) = fetch_transmissions.next().await {
1780            // Retrieve the transmission.
1781            let (transmission_id, transmission) = result?;
1782            // Insert the transmission into the set.
1783            transmissions.insert(transmission_id, transmission);
1784        }
1785        // Return the transmissions.
1786        Ok(transmissions)
1787    }
1788
1789    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1790    async fn fetch_missing_previous_certificates(
1791        &self,
1792        peer_ip: SocketAddr,
1793        batch_header: &BatchHeader<N>,
1794    ) -> Result<HashSet<BatchCertificate<N>>> {
1795        // Retrieve the round.
1796        let round = batch_header.round();
1797        // If the previous round is 0, or is <= the GC round, return early.
1798        if round == 1 || round <= self.storage.gc_round() + 1 {
1799            return Ok(Default::default());
1800        }
1801
1802        // Fetch the missing previous certificates.
1803        let missing_previous_certificates =
1804            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1805        if !missing_previous_certificates.is_empty() {
1806            debug!(
1807                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1808                missing_previous_certificates.len(),
1809            );
1810        }
1811        // Return the missing previous certificates.
1812        Ok(missing_previous_certificates)
1813    }
1814
1815    /// Fetches any missing certificates for the specified batch header from the specified peer.
1816    async fn fetch_missing_certificates(
1817        &self,
1818        peer_ip: SocketAddr,
1819        round: u64,
1820        certificate_ids: &IndexSet<Field<N>>,
1821    ) -> Result<HashSet<BatchCertificate<N>>> {
1822        // Initialize a list for the missing certificates.
1823        let mut fetch_certificates = FuturesUnordered::new();
1824        // Initialize a set for the missing certificates.
1825        let mut missing_certificates = HashSet::default();
1826        // Iterate through the certificate IDs.
1827        for certificate_id in certificate_ids {
1828            // Check if the certificate already exists in the ledger.
1829            if self.ledger.contains_certificate(certificate_id)? {
1830                continue;
1831            }
1832            // Check if the certificate already exists in storage.
1833            if self.storage.contains_certificate(*certificate_id) {
1834                continue;
1835            }
1836            // If we have not fully processed the certificate yet, store it.
1837            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1838                missing_certificates.insert(certificate);
1839            } else {
1840                // If we do not have the certificate, request it.
1841                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1842                // TODO (howardwu): Limit the number of open requests we send to a peer.
1843                // Send an certificate request to the peer.
1844                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1845            }
1846        }
1847
1848        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1849        match fetch_certificates.is_empty() {
1850            true => return Ok(missing_certificates),
1851            false => trace!(
1852                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1853                fetch_certificates.len(),
1854            ),
1855        }
1856
1857        // Wait for all of the missing certificates to be fetched.
1858        while let Some(result) = fetch_certificates.next().await {
1859            // Insert the missing certificate into the set.
1860            missing_certificates.insert(result?);
1861        }
1862        // Return the missing certificates.
1863        Ok(missing_certificates)
1864    }
1865}
1866
1867impl<N: Network> Primary<N> {
1868    /// Spawns a task with the given future; it should only be used for long-running tasks.
1869    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1870        self.handles.lock().push(tokio::spawn(future));
1871    }
1872
1873    /// Shuts down the primary.
1874    pub async fn shut_down(&self) {
1875        info!("Shutting down the primary...");
1876        // Shut down the workers.
1877        self.workers.iter().for_each(|worker| worker.shut_down());
1878        // Abort the tasks.
1879        self.handles.lock().iter().for_each(|handle| handle.abort());
1880        // Save the current proposal cache to disk.
1881        let proposal_cache = {
1882            let proposal = self.proposed_batch.write().take();
1883            let signed_proposals = self.signed_proposals.read().clone();
1884            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1885            let pending_certificates = self.storage.get_pending_certificates();
1886            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1887        };
1888        if let Err(err) = proposal_cache.store(&self.storage_mode) {
1889            error!("Failed to store the current proposal cache: {err}");
1890        }
1891        // Close the gateway.
1892        self.gateway.shut_down().await;
1893    }
1894}
1895
1896#[cfg(test)]
1897mod tests {
1898    use super::*;
1899    use snarkos_node_bft_ledger_service::MockLedgerService;
1900    use snarkos_node_bft_storage_service::BFTMemoryService;
1901    use snarkvm::{
1902        ledger::{
1903            committee::{Committee, MIN_VALIDATOR_STAKE},
1904            ledger_test_helpers::sample_execution_transaction_with_fee,
1905        },
1906        prelude::{Address, Signature},
1907    };
1908
1909    use bytes::Bytes;
1910    use indexmap::IndexSet;
1911    use rand::RngCore;
1912
    /// The concrete network type that the test helpers and tests below are instantiated with.
    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1914
1915    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1916        // Create a committee containing the primary's account.
1917        const COMMITTEE_SIZE: usize = 4;
1918        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1919        let mut members = IndexMap::new();
1920
1921        for i in 0..COMMITTEE_SIZE {
1922            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1923            let account = Account::new(rng).unwrap();
1924
1925            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1926            accounts.push((socket_addr, account));
1927        }
1928
1929        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1930    }
1931
1932    // Returns a primary and a list of accounts in the configured committee.
1933    fn primary_with_committee(
1934        account_index: usize,
1935        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1936        committee: Committee<CurrentNetwork>,
1937        height: u32,
1938    ) -> Primary<CurrentNetwork> {
1939        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
1940        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1941
1942        // Initialize the primary.
1943        let account = accounts[account_index].1.clone();
1944        let mut primary = Primary::new(account, storage, ledger, None, &[], StorageMode::new_test(None)).unwrap();
1945
1946        // Construct a worker instance.
1947        primary.workers = Arc::from([Worker::new(
1948            0, // id
1949            Arc::new(primary.gateway.clone()),
1950            primary.storage.clone(),
1951            primary.ledger.clone(),
1952            primary.proposed_batch.clone(),
1953        )
1954        .unwrap()]);
1955        for a in accounts.iter().skip(account_index) {
1956            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
1957        }
1958
1959        primary
1960    }
1961
1962    fn primary_without_handlers(
1963        rng: &mut TestRng,
1964    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1965        let (accounts, committee) = sample_committee(rng);
1966        let primary = primary_with_committee(
1967            0, // index of primary's account
1968            &accounts,
1969            committee,
1970            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
1971        );
1972
1973        (primary, accounts)
1974    }
1975
1976    // Creates a mock solution.
1977    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1978        // Sample a random fake solution ID.
1979        let solution_id = rng.gen::<u64>().into();
1980        // Vary the size of the solutions.
1981        let size = rng.gen_range(1024..10 * 1024);
1982        // Sample random fake solution bytes.
1983        let mut vec = vec![0u8; size];
1984        rng.fill_bytes(&mut vec);
1985        let solution = Data::Buffer(Bytes::from(vec));
1986        // Return the solution ID and solution.
1987        (solution_id, solution)
1988    }
1989
1990    // Samples a test transaction.
1991    fn sample_unconfirmed_transaction(
1992        rng: &mut TestRng,
1993    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1994        let transaction = sample_execution_transaction_with_fee(false, rng);
1995        let id = transaction.id();
1996
1997        (id, Data::Object(transaction))
1998    }
1999
2000    // Creates a batch proposal with one solution and one transaction.
2001    fn create_test_proposal(
2002        author: &Account<CurrentNetwork>,
2003        committee: Committee<CurrentNetwork>,
2004        round: u64,
2005        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2006        timestamp: i64,
2007        num_transactions: u64,
2008        rng: &mut TestRng,
2009    ) -> Proposal<CurrentNetwork> {
2010        let mut transmission_ids = IndexSet::new();
2011        let mut transmissions = IndexMap::new();
2012
2013        // Prepare the solution and insert into the sets.
2014        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2015        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2016        let solution_transmission_id = (solution_id, solution_checksum).into();
2017        transmission_ids.insert(solution_transmission_id);
2018        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2019
2020        // Prepare the transactions and insert into the sets.
2021        for _ in 0..num_transactions {
2022            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2023            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2024            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2025            transmission_ids.insert(transaction_transmission_id);
2026            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2027        }
2028
2029        // Retrieve the private key.
2030        let private_key = author.private_key();
2031        // Sign the batch header.
2032        let batch_header = BatchHeader::new(
2033            private_key,
2034            round,
2035            timestamp,
2036            committee.id(),
2037            transmission_ids,
2038            previous_certificate_ids,
2039            rng,
2040        )
2041        .unwrap();
2042        // Construct the proposal.
2043        Proposal::new(committee, batch_header, transmissions).unwrap()
2044    }
2045
2046    // Creates a signature of the primary's current proposal for each committee member (excluding
2047    // the primary).
2048    fn peer_signatures_for_proposal(
2049        primary: &Primary<CurrentNetwork>,
2050        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2051        rng: &mut TestRng,
2052    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2053        // Each committee member signs the batch.
2054        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2055        for (socket_addr, account) in accounts {
2056            if account.address() == primary.gateway.account().address() {
2057                continue;
2058            }
2059            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2060            let signature = account.sign(&[batch_id], rng).unwrap();
2061            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2062        }
2063
2064        signatures
2065    }
2066
2067    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2068    fn peer_signatures_for_batch(
2069        primary_address: Address<CurrentNetwork>,
2070        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2071        batch_id: Field<CurrentNetwork>,
2072        rng: &mut TestRng,
2073    ) -> IndexSet<Signature<CurrentNetwork>> {
2074        let mut signatures = IndexSet::new();
2075        for (_, account) in accounts {
2076            if account.address() == primary_address {
2077                continue;
2078            }
2079            let signature = account.sign(&[batch_id], rng).unwrap();
2080            signatures.insert(signature);
2081        }
2082        signatures
2083    }
2084
2085    // Creates a batch certificate.
2086    fn create_batch_certificate(
2087        primary_address: Address<CurrentNetwork>,
2088        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2089        round: u64,
2090        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2091        rng: &mut TestRng,
2092    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2093        let timestamp = now();
2094
2095        let author =
2096            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2097        let private_key = author.private_key();
2098
2099        let committee_id = Field::rand(rng);
2100        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2101        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2102        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2103        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2104
2105        let solution_transmission_id = (solution_id, solution_checksum).into();
2106        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2107
2108        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2109        let transmissions = [
2110            (solution_transmission_id, Transmission::Solution(solution)),
2111            (transaction_transmission_id, Transmission::Transaction(transaction)),
2112        ]
2113        .into();
2114
2115        let batch_header = BatchHeader::new(
2116            private_key,
2117            round,
2118            timestamp,
2119            committee_id,
2120            transmission_ids,
2121            previous_certificate_ids,
2122            rng,
2123        )
2124        .unwrap();
2125        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2126        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2127        (certificate, transmissions)
2128    }
2129
2130    // Create a certificate chain up to round in primary storage.
2131    fn store_certificate_chain(
2132        primary: &Primary<CurrentNetwork>,
2133        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2134        round: u64,
2135        rng: &mut TestRng,
2136    ) -> IndexSet<Field<CurrentNetwork>> {
2137        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2138        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2139        for cur_round in 1..round {
2140            for (_, account) in accounts.iter() {
2141                let (certificate, transmissions) = create_batch_certificate(
2142                    account.address(),
2143                    accounts,
2144                    cur_round,
2145                    previous_certificates.clone(),
2146                    rng,
2147                );
2148                next_certificates.insert(certificate.id());
2149                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2150            }
2151
2152            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2153            previous_certificates = next_certificates;
2154            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2155        }
2156
2157        previous_certificates
2158    }
2159
2160    // Insert the account socket addresses into the resolver so that
2161    // they are recognized as "connected".
2162    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2163        // First account is primary, which doesn't need to resolve.
2164        for (addr, acct) in accounts.iter().skip(1) {
2165            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
2166        }
2167    }
2168
2169    #[tokio::test]
2170    async fn test_propose_batch() {
2171        let mut rng = TestRng::default();
2172        let (primary, _) = primary_without_handlers(&mut rng);
2173
2174        // Check there is no batch currently proposed.
2175        assert!(primary.proposed_batch.read().is_none());
2176
2177        // Generate a solution and a transaction.
2178        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2179        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2180
2181        // Store it on one of the workers.
2182        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2183        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2184
2185        // Try to propose a batch again. This time, it should succeed.
2186        assert!(primary.propose_batch().await.is_ok());
2187        assert!(primary.proposed_batch.read().is_some());
2188    }
2189
2190    #[tokio::test]
2191    async fn test_propose_batch_with_no_transmissions() {
2192        let mut rng = TestRng::default();
2193        let (primary, _) = primary_without_handlers(&mut rng);
2194
2195        // Check there is no batch currently proposed.
2196        assert!(primary.proposed_batch.read().is_none());
2197
2198        // Try to propose a batch with no transmissions.
2199        assert!(primary.propose_batch().await.is_ok());
2200        assert!(primary.proposed_batch.read().is_some());
2201    }
2202
2203    #[tokio::test]
2204    async fn test_propose_batch_in_round() {
2205        let round = 3;
2206        let mut rng = TestRng::default();
2207        let (primary, accounts) = primary_without_handlers(&mut rng);
2208
2209        // Fill primary storage.
2210        store_certificate_chain(&primary, &accounts, round, &mut rng);
2211
2212        // Sleep for a while to ensure the primary is ready to propose the next round.
2213        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2214
2215        // Generate a solution and a transaction.
2216        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2217        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2218
2219        // Store it on one of the workers.
2220        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2221        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2222
2223        // Propose a batch again. This time, it should succeed.
2224        assert!(primary.propose_batch().await.is_ok());
2225        assert!(primary.proposed_batch.read().is_some());
2226    }
2227
2228    #[tokio::test]
2229    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2230        let round = 3;
2231        let prev_round = round - 1;
2232        let mut rng = TestRng::default();
2233        let (primary, accounts) = primary_without_handlers(&mut rng);
2234        let peer_account = &accounts[1];
2235        let peer_ip = peer_account.0;
2236
2237        // Fill primary storage.
2238        store_certificate_chain(&primary, &accounts, round, &mut rng);
2239
2240        // Get transmissions from previous certificates.
2241        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2242
2243        // Track the number of transmissions in the previous round.
2244        let mut num_transmissions_in_previous_round = 0;
2245
2246        // Generate a solution and a transaction.
2247        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2248        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2249        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2250        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2251
2252        // Store it on one of the workers.
2253        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2254        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2255
2256        // Check that the worker has 2 transmissions.
2257        assert_eq!(primary.workers[0].num_transmissions(), 2);
2258
2259        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
2260        for (_, account) in accounts.iter() {
2261            let (certificate, transmissions) = create_batch_certificate(
2262                account.address(),
2263                &accounts,
2264                round,
2265                previous_certificate_ids.clone(),
2266                &mut rng,
2267            );
2268
2269            // Add the transmissions to the worker.
2270            for (transmission_id, transmission) in transmissions.iter() {
2271                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2272            }
2273
2274            // Insert the certificate to storage.
2275            num_transmissions_in_previous_round += transmissions.len();
2276            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2277        }
2278
2279        // Sleep for a while to ensure the primary is ready to propose the next round.
2280        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2281
2282        // Advance to the next round.
2283        assert!(primary.storage.increment_to_next_round(round).is_ok());
2284
2285        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
2286        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2287
2288        // Propose the batch.
2289        assert!(primary.propose_batch().await.is_ok());
2290
2291        // Check that the proposal only contains the new transmissions that were not in previous certificates.
2292        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
2293        assert_eq!(proposed_transmissions.len(), 2);
2294        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2295        assert!(
2296            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2297        );
2298    }
2299
2300    #[tokio::test]
2301    async fn test_propose_batch_over_spend_limit() {
2302        let mut rng = TestRng::default();
2303
2304        // Create a primary to test spend limit backwards compatibility with V4.
2305        let (accounts, committee) = sample_committee(&mut rng);
2306        let primary = primary_with_committee(
2307            0,
2308            &accounts,
2309            committee.clone(),
2310            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2311        );
2312
2313        // Check there is no batch currently proposed.
2314        assert!(primary.proposed_batch.read().is_none());
2315        // Check the workers are empty.
2316        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));
2317
2318        // Generate a solution and a transaction.
2319        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2320        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2321
2322        for _i in 0..5 {
2323            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2324            // Store it on one of the workers.
2325            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2326        }
2327
2328        // Try to propose a batch again. This time, it should succeed.
2329        assert!(primary.propose_batch().await.is_ok());
2330        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
2331        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
2332        // Check the transmissions were correctly drained from the workers.
2333        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
2334    }
2335
2336    #[tokio::test]
2337    async fn test_batch_propose_from_peer() {
2338        let mut rng = TestRng::default();
2339        let (primary, accounts) = primary_without_handlers(&mut rng);
2340
2341        // Create a valid proposal with an author that isn't the primary.
2342        let round = 1;
2343        let peer_account = &accounts[1];
2344        let peer_ip = peer_account.0;
2345        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2346        let proposal = create_test_proposal(
2347            &peer_account.1,
2348            primary.ledger.current_committee().unwrap(),
2349            round,
2350            Default::default(),
2351            timestamp,
2352            1,
2353            &mut rng,
2354        );
2355
2356        // Make sure the primary is aware of the transmissions in the proposal.
2357        for (transmission_id, transmission) in proposal.transmissions() {
2358            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2359        }
2360
2361        // The author must be known to resolver to pass propose checks.
2362        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2363        // The primary must be considered synced.
2364        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2365
2366        // Try to process the batch proposal from the peer, should succeed.
2367        assert!(
2368            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2369        );
2370    }
2371
2372    #[tokio::test]
2373    async fn test_batch_propose_from_peer_when_not_synced() {
2374        let mut rng = TestRng::default();
2375        let (primary, accounts) = primary_without_handlers(&mut rng);
2376
2377        // Create a valid proposal with an author that isn't the primary.
2378        let round = 1;
2379        let peer_account = &accounts[1];
2380        let peer_ip = peer_account.0;
2381        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2382        let proposal = create_test_proposal(
2383            &peer_account.1,
2384            primary.ledger.current_committee().unwrap(),
2385            round,
2386            Default::default(),
2387            timestamp,
2388            1,
2389            &mut rng,
2390        );
2391
2392        // Make sure the primary is aware of the transmissions in the proposal.
2393        for (transmission_id, transmission) in proposal.transmissions() {
2394            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2395        }
2396
2397        // The author must be known to resolver to pass propose checks.
2398        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2399
2400        // Try to process the batch proposal from the peer, should fail.
2401        assert!(
2402            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2403        );
2404    }
2405
2406    #[tokio::test]
2407    async fn test_batch_propose_from_peer_in_round() {
2408        let round = 2;
2409        let mut rng = TestRng::default();
2410        let (primary, accounts) = primary_without_handlers(&mut rng);
2411
2412        // Generate certificates.
2413        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2414
2415        // Create a valid proposal with an author that isn't the primary.
2416        let peer_account = &accounts[1];
2417        let peer_ip = peer_account.0;
2418        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2419        let proposal = create_test_proposal(
2420            &peer_account.1,
2421            primary.ledger.current_committee().unwrap(),
2422            round,
2423            previous_certificates,
2424            timestamp,
2425            1,
2426            &mut rng,
2427        );
2428
2429        // Make sure the primary is aware of the transmissions in the proposal.
2430        for (transmission_id, transmission) in proposal.transmissions() {
2431            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2432        }
2433
2434        // The author must be known to resolver to pass propose checks.
2435        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2436        // The primary must be considered synced.
2437        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2438
2439        // Try to process the batch proposal from the peer, should succeed.
2440        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2441    }
2442
2443    #[tokio::test]
2444    async fn test_batch_propose_from_peer_wrong_round() {
2445        let mut rng = TestRng::default();
2446        let (primary, accounts) = primary_without_handlers(&mut rng);
2447
2448        // Create a valid proposal with an author that isn't the primary.
2449        let round = 1;
2450        let peer_account = &accounts[1];
2451        let peer_ip = peer_account.0;
2452        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2453        let proposal = create_test_proposal(
2454            &peer_account.1,
2455            primary.ledger.current_committee().unwrap(),
2456            round,
2457            Default::default(),
2458            timestamp,
2459            1,
2460            &mut rng,
2461        );
2462
2463        // Make sure the primary is aware of the transmissions in the proposal.
2464        for (transmission_id, transmission) in proposal.transmissions() {
2465            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2466        }
2467
2468        // The author must be known to resolver to pass propose checks.
2469        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2470        // The primary must be considered synced.
2471        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2472
2473        // Try to process the batch proposal from the peer, should error.
2474        assert!(
2475            primary
2476                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2477                    round: round + 1,
2478                    batch_header: Data::Object(proposal.batch_header().clone())
2479                })
2480                .await
2481                .is_err()
2482        );
2483    }
2484
2485    #[tokio::test]
2486    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2487        let round = 4;
2488        let mut rng = TestRng::default();
2489        let (primary, accounts) = primary_without_handlers(&mut rng);
2490
2491        // Generate certificates.
2492        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2493
2494        // Create a valid proposal with an author that isn't the primary.
2495        let peer_account = &accounts[1];
2496        let peer_ip = peer_account.0;
2497        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2498        let proposal = create_test_proposal(
2499            &peer_account.1,
2500            primary.ledger.current_committee().unwrap(),
2501            round,
2502            previous_certificates,
2503            timestamp,
2504            1,
2505            &mut rng,
2506        );
2507
2508        // Make sure the primary is aware of the transmissions in the proposal.
2509        for (transmission_id, transmission) in proposal.transmissions() {
2510            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2511        }
2512
2513        // The author must be known to resolver to pass propose checks.
2514        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2515        // The primary must be considered synced.
2516        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2517
2518        // Try to process the batch proposal from the peer, should error.
2519        assert!(
2520            primary
2521                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2522                    round: round + 1,
2523                    batch_header: Data::Object(proposal.batch_header().clone())
2524                })
2525                .await
2526                .is_err()
2527        );
2528    }
2529
2530    #[tokio::test]
2531    async fn test_batch_propose_from_peer_with_past_timestamp() {
2532        let round = 2;
2533        let mut rng = TestRng::default();
2534        let (primary, accounts) = primary_without_handlers(&mut rng);
2535
2536        // Generate certificates.
2537        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2538
2539        // Create a valid proposal with an author that isn't the primary.
2540        let peer_account = &accounts[1];
2541        let peer_ip = peer_account.0;
2542        let past_timestamp = now() - 100; // Use a timestamp that is in the past.
2543        let proposal = create_test_proposal(
2544            &peer_account.1,
2545            primary.ledger.current_committee().unwrap(),
2546            round,
2547            previous_certificates,
2548            past_timestamp,
2549            1,
2550            &mut rng,
2551        );
2552
2553        // Make sure the primary is aware of the transmissions in the proposal.
2554        for (transmission_id, transmission) in proposal.transmissions() {
2555            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2556        }
2557
2558        // The author must be known to resolver to pass propose checks.
2559        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2560        // The primary must be considered synced.
2561        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2562
2563        // Try to process the batch proposal from the peer, should error.
2564        assert!(
2565            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2566        );
2567    }
2568
2569    #[tokio::test]
2570    async fn test_batch_propose_from_peer_over_spend_limit() {
2571        let mut rng = TestRng::default();
2572
2573        // Create two primaries to test spend limit activation on V5.
2574        let (accounts, committee) = sample_committee(&mut rng);
2575        let primary_v4 = primary_with_committee(
2576            0,
2577            &accounts,
2578            committee.clone(),
2579            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
2580        );
2581        let primary_v5 = primary_with_committee(
2582            1,
2583            &accounts,
2584            committee.clone(),
2585            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
2586        );
2587
2588        // Create a valid proposal with an author that isn't the primary.
2589        let round = 1;
2590        let peer_account = &accounts[2];
2591        let peer_ip = peer_account.0;
2592        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2593        let proposal =
2594            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);
2595
2596        // Make sure the primary is aware of the transmissions in the proposal.
2597        for (transmission_id, transmission) in proposal.transmissions() {
2598            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2599            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2600        }
2601
2602        // The author must be known to resolver to pass propose checks.
2603        primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2604        primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2605        // The primary must be considered synced.
2606        primary_v4.sync.block_sync().try_block_sync(&primary_v4.gateway.clone()).await;
2607        primary_v5.sync.block_sync().try_block_sync(&primary_v5.gateway.clone()).await;
2608
2609        // Check the spend limit is enforced from V5 onwards.
2610        assert!(
2611            primary_v4
2612                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2613                .await
2614                .is_ok()
2615        );
2616        assert!(
2617            primary_v5
2618                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
2619                .await
2620                .is_err()
2621        );
2622    }
2623
2624    #[tokio::test]
2625    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2626        let round = 3;
2627        let mut rng = TestRng::default();
2628        let (primary, _) = primary_without_handlers(&mut rng);
2629
2630        // Check there is no batch currently proposed.
2631        assert!(primary.proposed_batch.read().is_none());
2632
2633        // Generate a solution and a transaction.
2634        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2635        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2636
2637        // Store it on one of the workers.
2638        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2639        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2640
2641        // Set the proposal lock to a round ahead of the storage.
2642        let old_proposal_lock_round = *primary.propose_lock.lock().await;
2643        *primary.propose_lock.lock().await = round + 1;
2644
2645        // Propose a batch and enforce that it fails.
2646        assert!(primary.propose_batch().await.is_ok());
2647        assert!(primary.proposed_batch.read().is_none());
2648
2649        // Set the proposal lock back to the old round.
2650        *primary.propose_lock.lock().await = old_proposal_lock_round;
2651
2652        // Try to propose a batch again. This time, it should succeed.
2653        assert!(primary.propose_batch().await.is_ok());
2654        assert!(primary.proposed_batch.read().is_some());
2655    }
2656
2657    #[tokio::test]
2658    async fn test_propose_batch_with_storage_round_behind_proposal() {
2659        let round = 5;
2660        let mut rng = TestRng::default();
2661        let (primary, accounts) = primary_without_handlers(&mut rng);
2662
2663        // Generate previous certificates.
2664        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2665
2666        // Create a valid proposal.
2667        let timestamp = now();
2668        let proposal = create_test_proposal(
2669            primary.gateway.account(),
2670            primary.ledger.current_committee().unwrap(),
2671            round + 1,
2672            previous_certificates,
2673            timestamp,
2674            1,
2675            &mut rng,
2676        );
2677
2678        // Store the proposal on the primary.
2679        *primary.proposed_batch.write() = Some(proposal);
2680
2681        // Try to propose a batch will terminate early because the storage is behind the proposal.
2682        assert!(primary.propose_batch().await.is_ok());
2683        assert!(primary.proposed_batch.read().is_some());
2684        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2685    }
2686
2687    #[tokio::test(flavor = "multi_thread")]
2688    async fn test_batch_signature_from_peer() {
2689        let mut rng = TestRng::default();
2690        let (primary, accounts) = primary_without_handlers(&mut rng);
2691        map_account_addresses(&primary, &accounts);
2692
2693        // Create a valid proposal.
2694        let round = 1;
2695        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2696        let proposal = create_test_proposal(
2697            primary.gateway.account(),
2698            primary.ledger.current_committee().unwrap(),
2699            round,
2700            Default::default(),
2701            timestamp,
2702            1,
2703            &mut rng,
2704        );
2705
2706        // Store the proposal on the primary.
2707        *primary.proposed_batch.write() = Some(proposal);
2708
2709        // Each committee member signs the batch.
2710        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2711
2712        // Have the primary process the signatures.
2713        for (socket_addr, signature) in signatures {
2714            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2715        }
2716
2717        // Check the certificate was created and stored by the primary.
2718        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2719        // Check the round was incremented.
2720        assert_eq!(primary.current_round(), round + 1);
2721    }
2722
2723    #[tokio::test(flavor = "multi_thread")]
2724    async fn test_batch_signature_from_peer_in_round() {
2725        let round = 5;
2726        let mut rng = TestRng::default();
2727        let (primary, accounts) = primary_without_handlers(&mut rng);
2728        map_account_addresses(&primary, &accounts);
2729
2730        // Generate certificates.
2731        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2732
2733        // Create a valid proposal.
2734        let timestamp = now();
2735        let proposal = create_test_proposal(
2736            primary.gateway.account(),
2737            primary.ledger.current_committee().unwrap(),
2738            round,
2739            previous_certificates,
2740            timestamp,
2741            1,
2742            &mut rng,
2743        );
2744
2745        // Store the proposal on the primary.
2746        *primary.proposed_batch.write() = Some(proposal);
2747
2748        // Each committee member signs the batch.
2749        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2750
2751        // Have the primary process the signatures.
2752        for (socket_addr, signature) in signatures {
2753            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2754        }
2755
2756        // Check the certificate was created and stored by the primary.
2757        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2758        // Check the round was incremented.
2759        assert_eq!(primary.current_round(), round + 1);
2760    }
2761
2762    #[tokio::test]
2763    async fn test_batch_signature_from_peer_no_quorum() {
2764        let mut rng = TestRng::default();
2765        let (primary, accounts) = primary_without_handlers(&mut rng);
2766        map_account_addresses(&primary, &accounts);
2767
2768        // Create a valid proposal.
2769        let round = 1;
2770        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2771        let proposal = create_test_proposal(
2772            primary.gateway.account(),
2773            primary.ledger.current_committee().unwrap(),
2774            round,
2775            Default::default(),
2776            timestamp,
2777            1,
2778            &mut rng,
2779        );
2780
2781        // Store the proposal on the primary.
2782        *primary.proposed_batch.write() = Some(proposal);
2783
2784        // Each committee member signs the batch.
2785        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2786
2787        // Have the primary process only one signature, mimicking a lack of quorum.
2788        let (socket_addr, signature) = signatures.first().unwrap();
2789        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2790
2791        // Check the certificate was not created and stored by the primary.
2792        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2793        // Check the round was incremented.
2794        assert_eq!(primary.current_round(), round);
2795    }
2796
2797    #[tokio::test]
2798    async fn test_batch_signature_from_peer_in_round_no_quorum() {
2799        let round = 7;
2800        let mut rng = TestRng::default();
2801        let (primary, accounts) = primary_without_handlers(&mut rng);
2802        map_account_addresses(&primary, &accounts);
2803
2804        // Generate certificates.
2805        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2806
2807        // Create a valid proposal.
2808        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2809        let proposal = create_test_proposal(
2810            primary.gateway.account(),
2811            primary.ledger.current_committee().unwrap(),
2812            round,
2813            previous_certificates,
2814            timestamp,
2815            1,
2816            &mut rng,
2817        );
2818
2819        // Store the proposal on the primary.
2820        *primary.proposed_batch.write() = Some(proposal);
2821
2822        // Each committee member signs the batch.
2823        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2824
2825        // Have the primary process only one signature, mimicking a lack of quorum.
2826        let (socket_addr, signature) = signatures.first().unwrap();
2827        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2828
2829        // Check the certificate was not created and stored by the primary.
2830        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2831        // Check the round was incremented.
2832        assert_eq!(primary.current_round(), round);
2833    }
2834
2835    #[tokio::test]
2836    async fn test_insert_certificate_with_aborted_transmissions() {
2837        let round = 3;
2838        let prev_round = round - 1;
2839        let mut rng = TestRng::default();
2840        let (primary, accounts) = primary_without_handlers(&mut rng);
2841        let peer_account = &accounts[1];
2842        let peer_ip = peer_account.0;
2843
2844        // Fill primary storage.
2845        store_certificate_chain(&primary, &accounts, round, &mut rng);
2846
2847        // Get transmissions from previous certificates.
2848        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2849
2850        // Generate a solution and a transaction.
2851        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2852        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2853
2854        // Store it on one of the workers.
2855        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2856        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2857
2858        // Check that the worker has 2 transmissions.
2859        assert_eq!(primary.workers[0].num_transmissions(), 2);
2860
2861        // Create certificates for the current round.
2862        let account = accounts[0].1.clone();
2863        let (certificate, transmissions) =
2864            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
2865        let certificate_id = certificate.id();
2866
2867        // Randomly abort some of the transmissions.
2868        let mut aborted_transmissions = HashSet::new();
2869        let mut transmissions_without_aborted = HashMap::new();
2870        for (transmission_id, transmission) in transmissions.clone() {
2871            match rng.gen::<bool>() || aborted_transmissions.is_empty() {
2872                true => {
2873                    // Insert the aborted transmission.
2874                    aborted_transmissions.insert(transmission_id);
2875                }
2876                false => {
2877                    // Insert the transmission without the aborted transmission.
2878                    transmissions_without_aborted.insert(transmission_id, transmission);
2879                }
2880            };
2881        }
2882
2883        // Add the non-aborted transmissions to the worker.
2884        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
2885            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2886        }
2887
2888        // Check that inserting the transmission with missing transmissions fails.
2889        assert!(
2890            primary
2891                .storage
2892                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
2893                .is_err()
2894        );
2895        assert!(
2896            primary
2897                .storage
2898                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
2899                .is_err()
2900        );
2901
2902        // Insert the certificate to storage.
2903        primary
2904            .storage
2905            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
2906            .unwrap();
2907
2908        // Ensure the certificate exists in storage.
2909        assert!(primary.storage.contains_certificate(certificate_id));
2910        // Ensure that the aborted transmission IDs exist in storage.
2911        for aborted_transmission_id in aborted_transmissions {
2912            assert!(primary.storage.contains_transmission(aborted_transmission_id));
2913            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
2914        }
2915    }
2916}