// Copyright (c) 2019-2025 Provable Inc.
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use crate::{
    Gateway,
    MAX_BATCH_DELAY_IN_MS,
    MAX_WORKERS,
    MIN_BATCH_DELAY_IN_SECS,
    PRIMARY_PING_IN_MS,
    Sync,
    Transport,
    WORKER_PING_IN_MS,
    Worker,
    events::{BatchPropose, BatchSignature, Event},
    helpers::{
        BFTSender,
        PrimaryReceiver,
        PrimarySender,
        Proposal,
        ProposalCache,
        SignedProposals,
        Storage,
        assign_to_worker,
        assign_to_workers,
        fmt_id,
        init_sync_channels,
        init_worker_channels,
        now,
    },
    spawn_blocking,
};
use snarkos_account::Account;
use snarkos_node_bft_events::PrimaryPing;
use snarkos_node_bft_ledger_service::LedgerService;
use snarkos_node_sync::DUMMY_SELF_IP;
use snarkvm::{
    console::{
        prelude::*,
        types::{Address, Field},
    },
    ledger::{
        block::Transaction,
        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
        puzzle::{Solution, SolutionID},
    },
    prelude::committee::Committee,
};

use aleo_std::StorageMode;
use colored::Colorize;
use futures::stream::{FuturesUnordered, StreamExt};
use indexmap::{IndexMap, IndexSet};
#[cfg(feature = "locktick")]
use locktick::{
    parking_lot::{Mutex, RwLock},
    tokio::Mutex as TMutex,
};
#[cfg(not(feature = "locktick"))]
use parking_lot::{Mutex, RwLock};
use rayon::prelude::*;
use std::{
    collections::{HashMap, HashSet},
    future::Future,
    net::SocketAddr,
    sync::Arc,
    time::Duration,
};
#[cfg(not(feature = "locktick"))]
use tokio::sync::Mutex as TMutex;
use tokio::{sync::OnceCell, task::JoinHandle};

/// A helper type for an optional proposed batch.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;

#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for propose_batch.
    propose_lock: Arc<TMutex<u64>>,
    /// The storage mode of the node.
    storage_mode: StorageMode,
}

impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        storage_mode: StorageMode,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway =
            Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, storage_mode.dev())?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());

        // Initialize the primary instance.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
            storage_mode,
        })
    }
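
    // A minimal caller-side sketch of constructing and running a primary
    // (illustrative only; `init_primary_channels` and the BFT sender shown
    // here are assumed to be provided by the caller, not by this file):
    //
    //     let mut primary = Primary::new(account, storage, ledger, None, &[], storage_mode)?;
    //     let (primary_sender, primary_receiver) = init_primary_channels();
    //     primary.run(Some(bft_sender), primary_sender, primary_receiver).await?;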

    /// Load the proposal cache file and update the Primary state with the stored data.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            // If the proposal cache exists, then process the proposal cache.
            true => match ProposalCache::<N>::load(self.gateway.account().address(), &self.storage_mode) {
                Ok(proposal_cache) => {
                    // Extract the proposal and signed proposals.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Write the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Write the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Write the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Update the storage with the pending certificates.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // We use a dummy IP because the node should not need to request from any peers.
                        // The storage should have stored all the transmissions. If not, we simply
                        // skip the certificate.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct a list for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            // Add the worker to the list of workers.
            workers.push(worker);
            // Add the worker sender to the map.
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // First, initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Next, initialize the sync module and sync the storage from the ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run(sync_receiver).await?;
        // Next, initialize the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }
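
    // Illustrative post-startup checks (assumed caller-side code): once `run`
    // returns, the primary exposes its state through the accessors below.
    //
    //     assert_eq!(primary.num_workers() as usize, MAX_WORKERS);
    //     let round = primary.current_round();
    //     let synced = primary.is_synced();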

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}

impl<N: Network> Primary<N> {
    /// Returns the number of unconfirmed transmissions.
    pub fn num_unconfirmed_transmissions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
    }

    /// Returns the number of unconfirmed ratifications.
    pub fn num_unconfirmed_ratifications(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
    }

    /// Returns the number of unconfirmed solutions.
    pub fn num_unconfirmed_solutions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_solutions()).sum()
    }

    /// Returns the number of unconfirmed transactions.
    pub fn num_unconfirmed_transactions(&self) -> usize {
        self.workers.iter().map(|worker| worker.num_transactions()).sum()
    }
}

impl<N: Network> Primary<N> {
    /// Returns the worker transmission IDs.
    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
        self.workers.iter().flat_map(|worker| worker.transmission_ids())
    }

    /// Returns the worker transmissions.
    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
        self.workers.iter().flat_map(|worker| worker.transmissions())
    }

    /// Returns the worker solutions.
    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
        self.workers.iter().flat_map(|worker| worker.solutions())
    }

    /// Returns the worker transactions.
    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
        self.workers.iter().flat_map(|worker| worker.transactions())
    }
}
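
// Illustrative aggregation (assumed caller-side code): the per-worker iterators
// above can be collected into a single map, mirroring how a batch proposal
// gathers transmissions across all workers.
//
//     let all: indexmap::IndexMap<TransmissionID<N>, Transmission<N>> =
//         primary.worker_transmissions().collect();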

impl<N: Network> Primary<N> {
    /// Clears the worker solutions.
    pub fn clear_worker_solutions(&self) {
        self.workers.iter().for_each(Worker::clear_solutions);
    }
}

impl<N: Network> Primary<N> {
    /// Proposes the batch for the current round.
    ///
    /// This method performs the following steps:
    /// 1. Drain the workers.
    /// 2. Sign the batch.
    /// 3. Set the batch proposal in the primary.
    /// 4. Broadcast the batch header to all validators for signing.
    pub async fn propose_batch(&self) -> Result<()> {
        // This function isn't re-entrant.
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if it has expired.
        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is a batch being proposed already,
        // rebroadcast the batch header to the non-signers, and return early.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Construct the event.
            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            // Iterate through the non-signers.
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                // Resolve the address to the peer IP.
                match self.gateway.resolver().get_peer_ip_for_address(address) {
                    // Resend the batch proposal to the validator for signing.
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            // Resend the batch proposal to the peer.
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(());
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (),
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(()),
                    // An error occurred while attempting to advance the current round.
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Determine if the current round has already been proposed.
        // Note: Do NOT perform this check earlier, before the rebroadcast and round-update logic above.
        // Rebroadcasting benefits network reliability and should not be skipped just because a
        // proposed_batch already exists. Likewise, if a certificate already exists for the current
        // round, the round should be advanced as early as possible.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators = self.gateway.connected_addresses();
            // Append the primary to the set.
            connected_validators.insert(self.gateway.account().address());
            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Track the total execution costs of the batch proposal as it is being constructed.
        let mut proposal_cost = 0u64;
        // Note: worker draining and transaction inclusion needs to be thought
        // through carefully when there is more than one worker. The fairness
        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
        debug_assert_eq!(MAX_WORKERS, 1);

        'outer: for worker in self.workers().iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Check the selected transmissions are below the batch limit.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    break 'outer;
                }

                // Check the max transmissions per worker is not exceeded.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    continue 'outer;
                }

                // Check if the ledger already contains the transmission.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Check if the storage already contains the transmission.
                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                // the primary does not propose a batch with no transmissions.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Check the transmission is still valid.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches. If not, skip the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            trace!("Proposing - Skipping solution '{}' - Checksum mismatch", fmt_id(solution_id));
                            continue;
                        }
                        // Check if the solution is still valid.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches. If not, skip the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum)
                        {
                            trace!("Proposing - Skipping transaction '{}' - Checksum mismatch", fmt_id(transaction_id));
                            continue;
                        }

                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // Check if the transaction is still valid.
                        // TODO: check if clone is cheap, otherwise fix.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
                            trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the transaction spent cost (in microcredits).
                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(transaction_id, transaction)
                        else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Compute the next proposal cost.
                        // Note: We purposefully discard this transaction if the proposal cost overflows.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            debug!(
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the next proposal cost exceeds the batch proposal spend limit.
                        if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                            trace!(
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                BatchHeader::<N>::BATCH_SPEND_LIMIT
                            );

                            // Reinsert the transmission into the worker.
                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        // Update the proposal cost.
                        proposal_cost = next_proposal_cost;
                    }

                    // Note: We explicitly forbid including ratifications,
                    // as the protocol currently does not support ratifications.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // All other combinations are clearly invalid.
                    _ => continue,
                }

                // If the transmission is valid, insert it into the proposal's transmission list.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.gateway.account().private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Broadcast the batch to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        // Set the proposed batch.
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }
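
    // A self-contained sketch of the spend-limit accounting used above, with
    // made-up costs (the real code uses `transaction_spent_cost_in_microcredits`
    // and `BatchHeader::<N>::BATCH_SPEND_LIMIT`):
    //
    //     let (limit, mut proposal_cost) = (50u64, 0u64);
    //     for cost in [10u64, 20, 30] {
    //         let Some(next) = proposal_cost.checked_add(cost) else { break };
    //         if next > limit { break; } // 10 + 20 + 30 > 50: stop before the third
    //         proposal_cost = next;
    //     }
    //     assert_eq!(proposal_cost, 30);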

    /// Processes a batch propose from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Verify the batch.
    /// 2. Sign the batch.
    /// 3. Broadcast the signature back to the validator.
    ///
    /// If our primary is ahead of the peer, we will not sign the batch.
    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
        let BatchPropose { round: batch_round, batch_header } = batch_propose;

        // Deserialize the batch header.
        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
        // Ensure the round matches in the batch header.
        if batch_round != batch_header.round() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
        }

        // Retrieve the batch author.
        let batch_author = batch_header.author();

        // Ensure the batch proposal is from the validator.
        match self.gateway.resolver().get_address(peer_ip) {
            // If the peer is a validator, then ensure the batch proposal is from the validator.
            Some(address) => {
                if address != batch_author {
                    // Proceed to disconnect the validator.
                    self.gateway.disconnect(peer_ip);
                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
                }
            }
            None => bail!("Batch proposal from a disconnected validator"),
        }
        // Ensure the batch author is a current committee member.
        if !self.gateway.is_authorized_validator_address(batch_author) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
        }
        // Ensure the batch proposal is not from the current primary.
        if self.gateway.account().address() == batch_author {
            bail!("Invalid peer - proposed batch from myself ({batch_author})");
        }

        // Ensure that the batch proposal's committee ID matches the expected committee ID.
        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
        if expected_committee_id != batch_header.committee_id() {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!(
                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
                batch_header.committee_id()
            );
        }

        // Retrieve the cached round and batch ID for this validator.
        if let Some((signed_round, signed_batch_id, signature)) =
            self.signed_proposals.read().get(&batch_author).copied()
        {
            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
            // Note: while this may be valid behaviour, additional formal analysis and testing will need to be done before allowing it.
            if signed_round > batch_header.round() {
                bail!(
                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
                    batch_header.round()
                );
            }

            // If the round matches and the batch ID differs, then the validator is malicious.
            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
            }
            // If the round and batch ID match, then skip signing the batch a second time.
            // Instead, rebroadcast the cached signature to the peer.
            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
                let gateway = self.gateway.clone();
                tokio::spawn(async move {
                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
                    // Resend the batch signature to the peer.
                    if gateway.send(peer_ip, event).await.is_none() {
                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
                    }
                });
                // Return early.
                return Ok(());
            }
        }

        // Ensure that the batch header doesn't already exist in storage.
        // Note: this is already checked in `check_batch_header`; however, we can return early here without creating a blocking task.
        if self.storage.contains_batch(batch_header.batch_id()) {
            debug!(
                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
                format!("batch for round {batch_round} already exists in storage").dimmed()
            );
            return Ok(());
        }

        // Compute the previous round.
        let previous_round = batch_round.saturating_sub(1);
        // Ensure that the peer did not propose a batch too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - {e} from '{peer_ip}'");
        }

        // Ensure the batch header does not contain any ratifications.
        if batch_header.contains(TransmissionID::Ratification) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - proposed batch contains an unsupported ratification transmission ID from '{peer_ip}'");
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;

        // Check that the transmission IDs match and are not fee transactions.
        if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
            // If the transmission is not well-formed, then return early.
            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
        }) {
            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}");
            return Ok(());
        }

        // Ensure the batch is for the current round.
        // This method must be called after fetching previous certificates (above),
        // and prior to checking the batch header (below).
        if let Err(e) = self.ensure_is_signing_round(batch_round) {
            // If the primary is not signing for the peer's round, then return early.
            debug!("{e} from '{peer_ip}'");
            return Ok(());
        }

        // Ensure the batch header from the peer is valid.
        let (storage, header) = (self.storage.clone(), batch_header.clone());
        let missing_transmissions =
            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
        // Insert the missing transmissions into the workers.
        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;

        // Ensure the transactions do not push the proposal above the spend limit.
        let block_height = self.ledger.latest_block_height() + 1;
        if N::CONSENSUS_VERSION(block_height)? >= ConsensusVersion::V5 {
            let mut proposal_cost = 0u64;
            for transmission_id in batch_header.transmission_ids() {
                let worker_id = assign_to_worker(*transmission_id, self.num_workers())?;
                let Some(worker) = self.workers.get(worker_id as usize) else {
                    debug!("Unable to find worker {worker_id}");
                    return Ok(());
                };

                let Some(transmission) = worker.get_transmission(*transmission_id) else {
                    debug!("Unable to find transmission '{}' in worker '{worker_id}'", fmt_id(transmission_id));
                    return Ok(());
                };

                // If the transmission is a transaction, compute its execution cost.
                if let (TransmissionID::Transaction(transaction_id, _), Transmission::Transaction(transaction)) =
                    (transmission_id, transmission)
                {
                    // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                    let transaction = spawn_blocking!({
                        match transaction {
                            Data::Object(transaction) => Ok(transaction),
                            Data::Buffer(bytes) => {
                                Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                            }
                        }
                    })?;

                    // Compute the transaction spent cost (in microcredits).
                    // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                    let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(*transaction_id, transaction)
                    else {
                        bail!(
                            "Invalid batch proposal - Unable to compute transaction spent cost on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Compute the next proposal cost.
                    // Note: We purposefully discard this transaction if the proposal cost overflows.
                    let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                        bail!(
                            "Invalid batch proposal - Batch proposal overflowed on transaction '{}'",
                            fmt_id(transaction_id)
                        )
                    };

                    // Check if the next proposal cost exceeds the batch proposal spend limit.
                    if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                        bail!(
                            "Malicious peer - Batch proposal from '{peer_ip}' exceeds the spend limit on transaction '{}' ({next_proposal_cost} > {})",
                            fmt_id(transaction_id),
                            BatchHeader::<N>::BATCH_SPEND_LIMIT
                        );
                    }

                    // Update the proposal cost.
                    proposal_cost = next_proposal_cost;
                }
            }
        }

        /* Proceeding to sign the batch. */

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();
        // Sign the batch ID.
        let account = self.gateway.account().clone();
        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;

        // Ensure the proposal has not already been signed.
        //
        // Note: Due to the need to sync the batch header with the peer, it is possible
        // for the primary to receive the same 'BatchPropose' event again, in which case only
        // one instance of this handler should sign the batch. This check guarantees that.
        match self.signed_proposals.write().0.entry(batch_author) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If the validator has already signed a batch for this round, then return early,
                // since, if the peer still has not received the signature, they will request it again,
                // and the logic at the start of this function will resend the (now cached) signature
                // to the peer if asked to sign this batch proposal again.
                if entry.get().0 == batch_round {
                    return Ok(());
                }
                // Otherwise, cache the round, batch ID, and signature for this validator.
                entry.insert((batch_round, batch_id, signature));
            }
            // If the validator has not signed a batch before, then continue.
            std::collections::hash_map::Entry::Vacant(entry) => {
                // Cache the round, batch ID, and signature for this validator.
                entry.insert((batch_round, batch_id, signature));
            }
        };

        // Broadcast the signature back to the validator.
        let self_ = self.clone();
        tokio::spawn(async move {
            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
            // Send the batch signature to the peer.
            if self_.gateway.send(peer_ip, event).await.is_some() {
                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
            }
        });

        Ok(())
    }
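
    // The `signed_proposals` cache consulted above maps each author to
    // (round, batch ID, signature). Its decision table, sketched informally:
    //
    //     cached round > proposed round                      => reject (stale)
    //     cached round == proposed round, different batch ID => reject (equivocation)
    //     cached round == proposed round, same batch ID      => resend cached signature
    //     otherwise                                          => sign and cache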

    /// Processes a batch signature from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Ensure the proposed batch has not expired.
    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
    /// 3. Store the signature.
    /// 4. Certify the batch if enough signatures have been received.
    /// 5. Broadcast the batch certificate to all validators.
    async fn process_batch_signature_from_peer(
        &self,
        peer_ip: SocketAddr,
        batch_signature: BatchSignature<N>,
    ) -> Result<()> {
        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
        self.check_proposed_batch_for_expiration().await?;

        // Retrieve the signature and timestamp.
        let BatchSignature { batch_id, signature } = batch_signature;

        // Retrieve the signer.
        let signer = signature.to_address();

        // Ensure the batch signature is signed by the validator.
        if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - batch signature is from a different validator ({signer})");
        }
        // Ensure the batch signature is not from the current primary.
        if self.gateway.account().address() == signer {
            bail!("Invalid peer - received a batch signature from myself ({signer})");
        }

        let self_ = self.clone();
        let Some(proposal) = spawn_blocking!({
            // Acquire the write lock.
            let mut proposed_batch = self_.proposed_batch.write();
            // Add the signature to the batch, and determine if the batch is ready to be certified.
            match proposed_batch.as_mut() {
                Some(proposal) => {
                    // Ensure the batch ID matches the currently proposed batch ID.
                    if proposal.batch_id() != batch_id {
                        match self_.storage.contains_batch(batch_id) {
                            // If this batch was already certified, return early.
                            true => {
                                debug!(
                                    "Primary is safely skipping a batch signature from {peer_ip} for round {} - batch is already certified",
                                    proposal.round()
                                );
                                return Ok(None);
                            }
                            // If the batch ID is unknown, return an error.
                            false => bail!(
                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
                                proposal.batch_id(),
                                proposal.round()
                            ),
                        }
                    }
                    // Retrieve the committee lookback for the round.
                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
                    // Retrieve the address of the validator.
                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
                        bail!("Signature is from a disconnected validator");
                    };
                    // Add the signature to the batch.
                    proposal.add_signature(signer, signature, &committee_lookback)?;
                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
                    // Check if the batch is ready to be certified.
                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
                        // If the batch is not ready to be certified, return early.
                        return Ok(None);
                    }
                }
                // There is no proposed batch, so return early.
                None => return Ok(None),
            };
            // Retrieve the batch proposal, clearing the proposed batch.
            match proposed_batch.take() {
                Some(proposal) => Ok(Some(proposal)),
                None => Ok(None),
            }
        })?
        else {
            return Ok(());
        };

        /* Proceeding to certify the batch. */

        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());

        // Retrieve the committee lookback for the round.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
        // Store the certified batch and broadcast it to all validators.
        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
            // Reinsert the transmissions back into the ready queue for the next proposal.
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(())
    }
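
    // Note on the flow above: signatures accumulate on the proposal under the
    // `proposed_batch` write lock, and the proposal is only `take`n out once
    // `is_quorum_threshold_reached` returns true, so a batch is certified and
    // broadcast at most once.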

    /// Processes a batch certificate from a peer.
    ///
    /// This method performs the following steps:
    /// 1. Stores the given batch certificate, after ensuring it is valid.
    /// 2. If there are enough certificates to reach quorum threshold for the current round,
    ///     then proceed to advance to the next round.
    async fn process_batch_certificate_from_peer(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Ensure the batch certificate is from an authorized validator.
        if !self.gateway.is_authorized_validator_ip(peer_ip) {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
        }
        // Ensure storage does not already contain the certificate.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        // Otherwise, ensure ephemeral storage contains the certificate.
        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
            self.storage.insert_unprocessed_certificate(certificate.clone())?;
        }

        // Retrieve the batch certificate author.
        let author = certificate.author();
        // Retrieve the batch certificate round.
        let certificate_round = certificate.round();
        // Retrieve the batch certificate committee ID.
        let committee_id = certificate.committee_id();

        // Ensure the batch certificate is not from the current primary.
        if self.gateway.account().address() == author {
            bail!("Received a batch certificate for myself ({author})");
        }

        // Ensure that the incoming certificate is valid.
        self.storage.check_incoming_certificate(&certificate)?;

        // Store the certificate, after ensuring it is valid above.
        // The following call recursively fetches and stores
        // the previous certificates referenced from this certificate.
        // It is critical to make the following call after validating the certificate above.
        // The reason is that a sequence of malformed certificates,
        // with references to previous certificates with non-decreasing rounds,
        // could cause the recursive fetching of certificates to crash the validator due to resource exhaustion.
        // Note that if the following call, when it does not return an error, guarantees the backward closure of the DAG
        // (i.e. that all the referenced previous certificates are in the DAG before storing this one),
        // then all the validity checks in [`Storage::check_certificate`] should be redundant.
        // TODO: eliminate those redundant checks.
        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;

        // If there are enough certificates to reach quorum threshold for the certificate round,
        // then proceed to advance to the next round.

        // Retrieve the committee lookback.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;

        // Retrieve the certificate authors.
        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
        // Check if the certificates have reached the quorum threshold.
        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);

        // Ensure that the batch certificate's committee ID matches the expected committee ID.
        let expected_committee_id = committee_lookback.id();
        if expected_committee_id != committee_id {
            // Proceed to disconnect the validator.
            self.gateway.disconnect(peer_ip);
            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
        }

        // Determine if we are currently proposing a round that is relevant.
        // Note: This is important, because while our peers have advanced,
        // they may not be proposing yet, and thus still able to sign our proposed batch.
        let should_advance = match &*self.proposed_batch.read() {
            // We advance if the proposal round is less than the current round that was just certified.
            Some(proposal) => proposal.round() < certificate_round,
            // If there's no proposal, we consider advancing.
            None => true,
        };

        // Retrieve the current round.
        let current_round = self.current_round();

        // Determine whether to advance to the next round.
        if is_quorum && should_advance && certificate_round >= current_round {
            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
            self.try_increment_to_the_next_round(current_round + 1).await?;
        }
        Ok(())
    }
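
    // The round-advance condition above, condensed (illustrative):
    //
    //     advance iff quorum reached at `certificate_round`
    //             and (no local proposal, or its round < `certificate_round`)
    //             and `certificate_round` >= `current_round`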
1129}
1130
1131impl<N: Network> Primary<N> {
1132    /// Starts the primary handlers.
1133    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1134        let PrimaryReceiver {
1135            mut rx_batch_propose,
1136            mut rx_batch_signature,
1137            mut rx_batch_certified,
1138            mut rx_primary_ping,
1139            mut rx_unconfirmed_solution,
1140            mut rx_unconfirmed_transaction,
1141        } = primary_receiver;
1142
1143        // Start the primary ping.
1144        let self_ = self.clone();
1145        self.spawn(async move {
1146            loop {
1147                // Sleep briefly.
1148                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
1149
1150                // Retrieve the block locators.
1151                let self__ = self_.clone();
1152                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1153                    Ok(block_locators) => block_locators,
1154                    Err(e) => {
1155                        warn!("Failed to retrieve block locators - {e}");
1156                        continue;
1157                    }
1158                };
1159
1160                // Retrieve the latest certificate of the primary.
1161                let primary_certificate = {
1162                    // Retrieve the primary address.
1163                    let primary_address = self_.gateway.account().address();
1164
1165                    // Iterate backwards from the latest round to find the primary certificate.
1166                    let mut certificate = None;
1167                    let mut current_round = self_.current_round();
1168                    while certificate.is_none() {
1169                        // If the current round is 0, then break the while loop.
1170                        if current_round == 0 {
1171                            break;
1172                        }
1173                        // Retrieve the primary certificate for this round, if one exists.
1174                        if let Some(primary_certificate) =
1175                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1176                        {
1177                            certificate = Some(primary_certificate);
1178                        // If the primary certificate was not found, decrement the round.
1179                        } else {
1180                            current_round = current_round.saturating_sub(1);
1181                        }
1182                    }
1183
1184                    // Determine if the primary certificate was found.
1185                    match certificate {
1186                        Some(certificate) => certificate,
1187                        // Skip this iteration of the loop (do not send a primary ping).
1188                        None => continue,
1189                    }
1190                };
1191
1192                // Construct the primary ping.
1193                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1194                // Broadcast the event.
1195                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1196            }
1197        });
1198
1199        // Start the primary ping handler.
1200        let self_ = self.clone();
1201        self.spawn(async move {
1202            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1203                // If the primary is not synced, then do not process the primary ping.
1204                if !self_.sync.is_synced() {
1205                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1206                    continue;
1207                }
1208
1209                // Spawn a task to process the primary certificate.
1210                {
1211                    let self_ = self_.clone();
1212                    tokio::spawn(async move {
1213                        // Deserialize the primary certificate in the primary ping.
1214                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1215                        else {
1216                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1217                            return;
1218                        };
1219                        // Process the primary certificate.
1220                        let id = fmt_id(primary_certificate.id());
1221                        let round = primary_certificate.round();
1222                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1223                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1224                        }
1225                    });
1226                }
1227            }
1228        });
1229
1230        // Start the worker ping(s).
1231        let self_ = self.clone();
1232        self.spawn(async move {
1233            loop {
1234                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
1235                // If the primary is not synced, then do not broadcast the worker ping(s).
1236                if !self_.sync.is_synced() {
1237                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1238                    continue;
1239                }
1240                // Broadcast the worker ping(s).
1241                for worker in self_.workers.iter() {
1242                    worker.broadcast_ping();
1243                }
1244            }
1245        });
1246
1247        // Start the batch proposer.
1248        let self_ = self.clone();
1249        self.spawn(async move {
1250            loop {
1251                // Sleep for the maximum batch delay between proposal attempts.
1252                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1253                let current_round = self_.current_round();
1254                // If the primary is not synced, then do not propose a batch.
1255                if !self_.sync.is_synced() {
1256                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
1257                    continue;
1258                }
1259                // A best-effort attempt to skip the scheduled batch proposal if
1260                // round progression already triggered one.
1261                if self_.propose_lock.try_lock().is_err() {
1262                    trace!(
1263                        "Skipping batch proposal for round {current_round} {}",
1264                        "(node is already proposing)".dimmed()
1265                    );
1266                    continue;
1267                };
1268                // If there is no proposed batch, attempt to propose a batch.
1269                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
1270                // and only one batch needs to be proposed at a time.
1271                if let Err(e) = self_.propose_batch().await {
1272                    warn!("Cannot propose a batch - {e}");
1273                }
1274            }
1275        });
1276
1277        // Process the proposed batch.
1278        let self_ = self.clone();
1279        self.spawn(async move {
1280            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1281                // If the primary is not synced, then do not sign the batch.
1282                if !self_.sync.is_synced() {
1283                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1284                    continue;
1285                }
1286                // Spawn a task to process the proposed batch.
1287                let self_ = self_.clone();
1288                tokio::spawn(async move {
1289                    // Process the batch proposal.
1290                    let round = batch_propose.round;
1291                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1292                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
1293                    }
1294                });
1295            }
1296        });
1297
1298        // Process the batch signature.
1299        let self_ = self.clone();
1300        self.spawn(async move {
1301            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1302                // If the primary is not synced, then do not store the signature.
1303                if !self_.sync.is_synced() {
1304                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1305                    continue;
1306                }
1307                // Process the batch signature.
1308                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1309                // is a critical path, and we should only store the minimum required number of signatures.
1310                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1311                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1312                let id = fmt_id(batch_signature.batch_id);
1313                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1314                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
1315                }
1316            }
1317        });
1318
1319        // Process the certified batch.
1320        let self_ = self.clone();
1321        self.spawn(async move {
1322            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1323                // If the primary is not synced, then do not store the certificate.
1324                if !self_.sync.is_synced() {
1325                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1326                    continue;
1327                }
1328                // Spawn a task to process the batch certificate.
1329                let self_ = self_.clone();
1330                tokio::spawn(async move {
1331                    // Deserialize the batch certificate.
1332                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1333                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1334                        return;
1335                    };
1336                    // Process the batch certificate.
1337                    let id = fmt_id(batch_certificate.id());
1338                    let round = batch_certificate.round();
1339                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1340                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
1341                    }
1342                });
1343            }
1344        });
1345
1346        // Periodically try to increment to the next round.
1347        // Note: This is necessary to ensure that the primary is not stuck on a previous round
1348        // despite having received enough certificates to advance to the next round.
1349        let self_ = self.clone();
1350        self.spawn(async move {
1351            loop {
1352                // Sleep briefly.
1353                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1354                // If the primary is not synced, then do not increment to the next round.
1355                if !self_.sync.is_synced() {
1356                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1357                    continue;
1358                }
1359                // Attempt to increment to the next round.
1360                let next_round = self_.current_round().saturating_add(1);
1361                // Determine if the quorum threshold is reached for the current round.
1362                let is_quorum_threshold_reached = {
1363                    // Retrieve the certificate authors for the next round.
1364                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
1365                    // If there are no certificates, then skip this check.
1366                    if authors.is_empty() {
1367                        continue;
1368                    }
1369                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
1370                        warn!("Failed to retrieve the committee lookback for round {next_round}");
1371                        continue;
1372                    };
1373                    committee_lookback.is_quorum_threshold_reached(&authors)
1374                };
1375                // Attempt to increment to the next round if the quorum threshold is reached.
1376                if is_quorum_threshold_reached {
1377                    debug!("Quorum threshold reached for round {}", next_round);
1378                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
1379                        warn!("Failed to increment to the next round - {e}");
1380                    }
1381                }
1382            }
1383        });
1384
1385        // Process the unconfirmed solutions.
1386        let self_ = self.clone();
1387        self.spawn(async move {
1388            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1389                // Compute the checksum for the solution.
1390                let Ok(checksum) = solution.to_checksum::<N>() else {
1391                    error!("Failed to compute the checksum for the unconfirmed solution");
1392                    continue;
1393                };
1394                // Compute the worker ID.
1395                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1396                    error!("Unable to determine the worker ID for the unconfirmed solution");
1397                    continue;
1398                };
1399                let self_ = self_.clone();
1400                tokio::spawn(async move {
1401                    // Retrieve the worker.
1402                    let worker = &self_.workers[worker_id as usize];
1403                    // Process the unconfirmed solution.
1404                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1405                    // Send the result to the callback.
1406                    callback.send(result).ok();
1407                });
1408            }
1409        });
1410
1411        // Process the unconfirmed transactions.
1412        let self_ = self.clone();
1413        self.spawn(async move {
1414            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1415                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1416                // Compute the checksum for the transaction.
1417                let Ok(checksum) = transaction.to_checksum::<N>() else {
1418                    error!("Failed to compute the checksum for the unconfirmed transaction");
1419                    continue;
1420                };
1421                // Compute the worker ID.
1422                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1423                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1424                    continue;
1425                };
1426                let self_ = self_.clone();
1427                tokio::spawn(async move {
1428                    // Retrieve the worker.
1429                    let worker = &self_.workers[worker_id as usize];
1430                    // Process the unconfirmed transaction.
1431                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1432                    // Send the result to the callback.
1433                    callback.send(result).ok();
1434                });
1435            }
1436        });
1437    }
1438
1439    /// Checks whether the proposed batch has expired, and clears it if so.
1440    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1441        // Check if the proposed batch is timed out or stale.
1442        let is_expired = match self.proposed_batch.read().as_ref() {
1443            Some(proposal) => proposal.round() < self.current_round(),
1444            None => false,
1445        };
1446        // If the batch is expired, clear the proposed batch.
1447        if is_expired {
1448            // Reset the proposed batch.
1449            let proposal = self.proposed_batch.write().take();
1450            if let Some(proposal) = proposal {
1451                debug!("Cleared expired proposal for round {}", proposal.round());
1452                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1453            }
1454        }
1455        Ok(())
1456    }
1457
1458    /// Attempts to increment to the next round.
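    ///
    /// A hedged walk-through (round numbers hypothetical): at `current_round = 5` with
    /// `next_round = 9` inside the GC range, storage is fast-forwarded through rounds 6..=8,
    /// and the final step to round 9 is driven by the BFT (or taken directly in storage in
    /// the Narwhal-only case).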
1459    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1460        // If the next round is within GC range, then iterate to the penultimate round.
1461        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1462            let mut fast_forward_round = self.current_round();
1463            // Iterate until the penultimate round is reached.
1464            while fast_forward_round < next_round.saturating_sub(1) {
1465                // Update to the next round in storage.
1466                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1467                // Clear the proposed batch.
1468                *self.proposed_batch.write() = None;
1469            }
1470        }
1471
1472        // Retrieve the current round.
1473        let current_round = self.current_round();
1474        // Attempt to advance to the next round.
1475        if current_round < next_round {
1476            // If a BFT sender was provided, send the current round to the BFT.
1477            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
1478                match bft_sender.send_primary_round_to_bft(current_round).await {
1479                    Ok(is_ready) => is_ready,
1480                    Err(e) => {
1481                        warn!("Failed to update the BFT to the next round - {e}");
1482                        return Err(e);
1483                    }
1484                }
1485            }
1486            // Otherwise, handle the Narwhal case.
1487            else {
1488                // Update to the next round in storage.
1489                self.storage.increment_to_next_round(current_round)?;
1490                // Set 'is_ready' to 'true'.
1491                true
1492            };
1493
1494            // Log whether the next round is ready.
1495            match is_ready {
1496                true => debug!("Primary is ready to propose the next round"),
1497                false => debug!("Primary is not ready to propose the next round"),
1498            }
1499
1500            // If the node is ready, propose a batch for the next round.
1501            if is_ready {
1502                self.propose_batch().await?;
1503            }
1504        }
1505        Ok(())
1506    }
1507
1508    /// Ensures the primary is signing for the specified batch round.
1509    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1510    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
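    ///
    /// A hedged example (rounds hypothetical): with `current_round = 10` and `max_gc_rounds = 50`,
    /// a batch for round 60 or later is too far in the future; a batch for round 8 or earlier
    /// fails the `current_round > batch_round + 1` check; and a batch for round 9 is rejected
    /// only if we are already proposing for round 10 or later.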
1511    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1512        // Retrieve the current round.
1513        let current_round = self.current_round();
1514        // Ensure the batch round is within GC range of the current round.
1515        if current_round + self.storage.max_gc_rounds() <= batch_round {
1516            bail!("Round {batch_round} is too far in the future")
1517        }
1518        // Ensure the batch round is no more than one round behind the current round.
1519        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1520        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1521        if current_round > batch_round + 1 {
1522            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1523        }
1524        // Check if the primary is still signing for the batch round.
1525        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1526            if signing_round > batch_round {
1527                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1528            }
1529        }
1530        Ok(())
1531    }
1532
1533    /// Ensures the primary is not creating batch proposals too frequently.
1534    /// This checks that the certificate timestamp for the previous round is within the expected range.
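    ///
    /// A minimal sketch of the rule (hypothetical values; `primary`, `author`, and
    /// `previous_round` are assumed to be in scope):
    /// ```ignore
    /// // Hypothetical: storage holds the author's certificate for `previous_round`, stamped `prev`.
    /// let prev: i64 = 1_700_000_000;
    /// let min = MIN_BATCH_DELAY_IN_SECS as i64;
    /// // Stamped `min` seconds (or more) later: accepted.
    /// assert!(primary.check_proposal_timestamp(previous_round, author, prev + min).is_ok());
    /// // Stamped at the same instant: rejected as too soon.
    /// assert!(primary.check_proposal_timestamp(previous_round, author, prev).is_err());
    /// ```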
1535    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1536        // Retrieve the previous timestamp to check against.
1537        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1538            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1539            Some(certificate) => certificate.timestamp(),
1540            None => match self.gateway.account().address() == author {
1541                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1542                true => *self.latest_proposed_batch_timestamp.read(),
1543                // If we do not see a previous certificate for the author, then proceed optimistically.
1544                false => return Ok(()),
1545            },
1546        };
1547
1548        // Determine the elapsed time since the previous timestamp.
1549        let elapsed = timestamp
1550            .checked_sub(previous_timestamp)
1551            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1552        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
1553        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1554            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1555            false => Ok(()),
1556        }
1557    }
1558
1559    /// Stores the certified batch and broadcasts it to all validators.
1560    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1561        // Create the batch certificate and transmissions.
1562        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1563        // Convert the transmissions into a HashMap.
1564        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1565        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1566        // Store the certified batch.
1567        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1568        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1569        debug!("Stored a batch certificate for round {}", certificate.round());
1570        // If a BFT sender was provided, send the certificate to the BFT.
1571        if let Some(bft_sender) = self.bft_sender.get() {
1572            // Await the callback to continue.
1573            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1574                warn!("Failed to update the BFT DAG from primary - {e}");
1575                return Err(e);
1576            };
1577        }
1578        // Broadcast the certified batch to all validators.
1579        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1580        // Log the certified batch.
1581        let num_transmissions = certificate.transmission_ids().len();
1582        let round = certificate.round();
1583        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1584        // Increment to the next round.
1585        self.try_increment_to_the_next_round(round + 1).await
1586    }
1587
1588    /// Inserts the missing transmissions from the proposal into the workers.
1589    fn insert_missing_transmissions_into_workers(
1590        &self,
1591        peer_ip: SocketAddr,
1592        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1593    ) -> Result<()> {
1594        // Insert the transmissions into the workers.
1595        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1596            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1597        })
1598    }
1599
1600    /// Re-inserts the transmissions from the proposal into the workers.
1601    fn reinsert_transmissions_into_workers(
1602        &self,
1603        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1604    ) -> Result<()> {
1605        // Re-insert the transmissions into the workers.
1606        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1607            worker.reinsert(transmission_id, transmission);
1608        })
1609    }
1610
1611    /// Recursively stores a given batch certificate, after ensuring that:
1612    ///   - The round matches the committee round.
1613    ///   - The address is a member of the committee.
1614    ///   - The timestamp is within range.
1615    ///   - We have all of the transmissions.
1616    ///   - We have all of the previous certificates.
1617    ///   - The previous certificates are for the previous round (i.e. round - 1).
1618    ///   - The previous certificates have reached the quorum threshold.
1619    ///   - We have not already signed the batch ID.
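    ///
    /// The `IS_SYNCING` const generic gates the "node is synced" check below: gossip ingestion
    /// calls this with `false` (see `process_batch_certificate_from_peer`), while a caller that
    /// is itself driving block sync would presumably pass `true`.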
1620    #[async_recursion::async_recursion]
1621    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1622        &self,
1623        peer_ip: SocketAddr,
1624        certificate: BatchCertificate<N>,
1625    ) -> Result<()> {
1626        // Retrieve the batch header.
1627        let batch_header = certificate.batch_header();
1628        // Retrieve the batch round.
1629        let batch_round = batch_header.round();
1630
1631        // If the certificate round is outdated, do not store it.
1632        if batch_round <= self.storage.gc_round() {
1633            return Ok(());
1634        }
1635        // If the certificate already exists in storage, return early.
1636        if self.storage.contains_certificate(certificate.id()) {
1637            return Ok(());
1638        }
1639
1640        // If the node is not in sync mode and is not synced, then return an error.
1641        if !IS_SYNCING && !self.is_synced() {
1642            bail!(
1643                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1644                fmt_id(certificate.id())
1645            );
1646        }
1647
1648        // If the peer is ahead, use the batch header to sync up to the peer.
1649        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1650
1651        // Check if the certificate needs to be stored.
1652        if !self.storage.contains_certificate(certificate.id()) {
1653            // Store the batch certificate.
1654            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1655            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1656            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1657            // If a BFT sender was provided, send the round and certificate to the BFT.
1658            if let Some(bft_sender) = self.bft_sender.get() {
1659                // Send the certificate to the BFT.
1660                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1661                    warn!("Failed to update the BFT DAG from sync: {e}");
1662                    return Err(e);
1663                };
1664            }
1665        }
1666        Ok(())
1667    }
1668
1669    /// Recursively syncs using the given batch header.
1670    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1671        &self,
1672        peer_ip: SocketAddr,
1673        batch_header: &BatchHeader<N>,
1674    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1675        // Retrieve the batch round.
1676        let batch_round = batch_header.round();
1677
1678        // If the certificate round is outdated, do not store it.
1679        if batch_round <= self.storage.gc_round() {
1680            bail!("Round {batch_round} is too far in the past")
1681        }
1682
1683        // If the node is not in sync mode and is not synced, then return an error.
1684        if !IS_SYNCING && !self.is_synced() {
1685            bail!(
1686                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1687                fmt_id(batch_header.batch_id())
1688            );
1689        }
1690
1691        // Determine if quorum threshold is reached on the batch round.
1692        let is_quorum_threshold_reached = {
1693            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1694            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1695            committee_lookback.is_quorum_threshold_reached(&authors)
1696        };
1697
1698        // Check if our primary should move to the next round.
1699        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1700        // whereby Narwhal requires N-f certificates while the BFT only requires f+1. Without this check,
1701        // the primary will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
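        // Concretely (assumed committee of N = 4 validators tolerating f = 1 fault):
        // Narwhal's quorum needs N - f = 3 certificate authors, while the BFT could proceed
        // with f + 1 = 2; advancing here on just 2 could leave the round unable to certify.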
1702        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1703        // Check if our primary is far behind the peer.
1704        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1705        // If our primary is behind schedule or far behind the peer, update our round to the batch round.
1706        if is_behind_schedule || is_peer_far_in_future {
1707            // If the batch round is greater than the current committee round, update the committee.
1708            self.try_increment_to_the_next_round(batch_round).await?;
1709        }
1710
1711        // Ensure the primary has all of the transmissions.
1712        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1713
1714        // Ensure the primary has all of the previous certificates.
1715        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1716
1717        // Wait for the missing transmissions and previous certificates to be fetched.
1718        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1719            missing_transmissions_handle,
1720            missing_previous_certificates_handle,
1721        ).map_err(|e| {
1722            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
1723        })?;
1724
1725        // Iterate through the missing previous certificates.
1726        for batch_certificate in missing_previous_certificates {
1727            // Store the batch certificate (recursively fetching any missing previous certificates).
1728            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1729        }
1730        Ok(missing_transmissions)
1731    }
1732
1733    /// Fetches any missing transmissions for the specified batch header.
1734    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1735    async fn fetch_missing_transmissions(
1736        &self,
1737        peer_ip: SocketAddr,
1738        batch_header: &BatchHeader<N>,
1739    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1740        // If the round is <= the GC round, return early.
1741        if batch_header.round() <= self.storage.gc_round() {
1742            return Ok(Default::default());
1743        }
1744
1745        // Ensure this batch ID is new, otherwise return early.
1746        if self.storage.contains_batch(batch_header.batch_id()) {
1747            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1748            return Ok(Default::default());
1749        }
1750
1751        // Retrieve the workers.
1752        let workers = self.workers.clone();
1753
1754        // Initialize a list of futures that fetch the missing transmissions.
1755        let mut fetch_transmissions = FuturesUnordered::new();
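        // (Editorial note: `FuturesUnordered` polls every queued fetch concurrently and yields
        // results in completion order, so one slow peer response does not serialize the rest.)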
1756
1757        // Retrieve the number of workers.
1758        let num_workers = self.num_workers();
1759        // Iterate through the transmission IDs.
1760        for transmission_id in batch_header.transmission_ids() {
1761            // If the transmission does not exist in storage, proceed to fetch the transmission.
1762            if !self.storage.contains_transmission(*transmission_id) {
1763                // Determine the worker ID.
1764                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1765                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1766                };
1767                // Retrieve the worker.
1768                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1769                // Push the fetch future onto the list.
1770                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1771            }
1772        }
1773
1774        // Initialize a map for the fetched transmissions.
1775        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1776        // Wait for all of the transmissions to be fetched.
1777        while let Some(result) = fetch_transmissions.next().await {
1778            // Retrieve the transmission.
1779            let (transmission_id, transmission) = result?;
1780            // Insert the transmission into the map.
1781            transmissions.insert(transmission_id, transmission);
1782        }
1783        // Return the transmissions.
1784        Ok(transmissions)
1785    }
1786
1787    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1788    async fn fetch_missing_previous_certificates(
1789        &self,
1790        peer_ip: SocketAddr,
1791        batch_header: &BatchHeader<N>,
1792    ) -> Result<HashSet<BatchCertificate<N>>> {
1793        // Retrieve the round.
1794        let round = batch_header.round();
1795        // If the previous round is 0, or is <= the GC round, return early.
1796        if round == 1 || round <= self.storage.gc_round() + 1 {
1797            return Ok(Default::default());
1798        }
1799
1800        // Fetch the missing previous certificates.
1801        let missing_previous_certificates =
1802            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1803        if !missing_previous_certificates.is_empty() {
1804            debug!(
1805                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1806                missing_previous_certificates.len(),
1807            );
1808        }
1809        // Return the missing previous certificates.
1810        Ok(missing_previous_certificates)
1811    }
1812
1813    /// Fetches any missing certificates among the given certificate IDs from the specified peer.
1814    async fn fetch_missing_certificates(
1815        &self,
1816        peer_ip: SocketAddr,
1817        round: u64,
1818        certificate_ids: &IndexSet<Field<N>>,
1819    ) -> Result<HashSet<BatchCertificate<N>>> {
1820        // Initialize a list of futures that fetch the missing certificates.
1821        let mut fetch_certificates = FuturesUnordered::new();
1822        // Initialize a set for the missing certificates.
1823        let mut missing_certificates = HashSet::default();
1824        // Iterate through the certificate IDs.
1825        for certificate_id in certificate_ids {
1826            // Check if the certificate already exists in the ledger.
1827            if self.ledger.contains_certificate(certificate_id)? {
1828                continue;
1829            }
1830            // Check if the certificate already exists in storage.
1831            if self.storage.contains_certificate(*certificate_id) {
1832                continue;
1833            }
1834            // If we hold the certificate but have not fully processed it yet, include it directly.
1835            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1836                missing_certificates.insert(certificate);
1837            } else {
1838                // If we do not have the certificate, request it.
1839                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1840                // TODO (howardwu): Limit the number of open requests we send to a peer.
1841                // Send a certificate request to the peer.
1842                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1843            }
1844        }
1845
1846        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1847        match fetch_certificates.is_empty() {
1848            true => return Ok(missing_certificates),
1849            false => trace!(
1850                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1851                fetch_certificates.len(),
1852            ),
1853        }
1854
1855        // Wait for all of the missing certificates to be fetched.
1856        while let Some(result) = fetch_certificates.next().await {
1857            // Insert the missing certificate into the set.
1858            missing_certificates.insert(result?);
1859        }
1860        // Return the missing certificates.
1861        Ok(missing_certificates)
1862    }
1863}
1864
1865impl<N: Network> Primary<N> {
1866    /// Spawns a task with the given future; it should only be used for long-running tasks.
1867    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1868        self.handles.lock().push(tokio::spawn(future));
1869    }
1870
1871    /// Shuts down the primary.
1872    pub async fn shut_down(&self) {
1873        info!("Shutting down the primary...");
1874        // Shut down the workers.
1875        self.workers.iter().for_each(|worker| worker.shut_down());
1876        // Abort the tasks.
1877        self.handles.lock().iter().for_each(|handle| handle.abort());
1878        // Save the current proposal cache to disk.
1879        let proposal_cache = {
1880            let proposal = self.proposed_batch.write().take();
1881            let signed_proposals = self.signed_proposals.read().clone();
1882            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1883            let pending_certificates = self.storage.get_pending_certificates();
1884            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1885        };
1886        if let Err(err) = proposal_cache.store(&self.storage_mode) {
1887            error!("Failed to store the current proposal cache: {err}");
1888        }
1889        // Close the gateway.
1890        self.gateway.shut_down().await;
1891    }
1892}
1893
1894#[cfg(test)]
1895mod tests {
1896    use super::*;
1897    use snarkos_node_bft_ledger_service::MockLedgerService;
1898    use snarkos_node_bft_storage_service::BFTMemoryService;
1899    use snarkvm::{
1900        ledger::{
1901            committee::{Committee, MIN_VALIDATOR_STAKE},
1902            ledger_test_helpers::sample_execution_transaction_with_fee,
1903        },
1904        prelude::{Address, Signature},
1905    };
1906
1907    use bytes::Bytes;
1908    use indexmap::IndexSet;
1909    use rand::RngCore;
1910
1911    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1912
1913    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1914        // Create a committee containing the primary's account.
1915        const COMMITTEE_SIZE: usize = 4;
1916        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1917        let mut members = IndexMap::new();
1918
1919        for i in 0..COMMITTEE_SIZE {
1920            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1921            let account = Account::new(rng).unwrap();
1922
1923            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1924            accounts.push((socket_addr, account));
1925        }
1926
1927        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1928    }
1929
1930    // Returns a primary whose account is at `account_index` in the configured committee.
1931    fn primary_with_committee(
1932        account_index: usize,
1933        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1934        committee: Committee<CurrentNetwork>,
1935        height: u32,
1936    ) -> Primary<CurrentNetwork> {
1937        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
1938        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1939
1940        // Initialize the primary.
1941        let account = accounts[account_index].1.clone();
1942        let mut primary = Primary::new(account, storage, ledger, None, &[], StorageMode::new_test(None)).unwrap();
1943
1944        // Construct a worker instance.
1945        primary.workers = Arc::from([Worker::new(
1946            0, // id
1947            Arc::new(primary.gateway.clone()),
1948            primary.storage.clone(),
1949            primary.ledger.clone(),
1950            primary.proposed_batch.clone(),
1951        )
1952        .unwrap()]);
1953        for a in accounts.iter().skip(account_index) {
1954            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
1955        }
1956
1957        primary
1958    }
1959
1960    fn primary_without_handlers(
1961        rng: &mut TestRng,
1962    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1963        let (accounts, committee) = sample_committee(rng);
1964        let primary = primary_with_committee(
1965            0, // index of primary's account
1966            &accounts,
1967            committee,
1968            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
1969        );
1970
1971        (primary, accounts)
1972    }
1973
1974    // Creates a mock solution.
1975    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1976        // Sample a random fake solution ID.
1977        let solution_id = rng.gen::<u64>().into();
1978        // Vary the size of the solutions.
1979        let size = rng.gen_range(1024..10 * 1024);
1980        // Sample random fake solution bytes.
1981        let mut vec = vec![0u8; size];
1982        rng.fill_bytes(&mut vec);
1983        let solution = Data::Buffer(Bytes::from(vec));
1984        // Return the solution ID and solution.
1985        (solution_id, solution)
1986    }
1987
1988    // Samples a test transaction.
1989    fn sample_unconfirmed_transaction(
1990        rng: &mut TestRng,
1991    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1992        let transaction = sample_execution_transaction_with_fee(false, rng);
1993        let id = transaction.id();
1994
1995        (id, Data::Object(transaction))
1996    }
1997
1998    // Creates a batch proposal with one solution and the given number of transactions.
1999    fn create_test_proposal(
2000        author: &Account<CurrentNetwork>,
2001        committee: Committee<CurrentNetwork>,
2002        round: u64,
2003        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2004        timestamp: i64,
2005        num_transactions: u64,
2006        rng: &mut TestRng,
2007    ) -> Proposal<CurrentNetwork> {
2008        let mut transmission_ids = IndexSet::new();
2009        let mut transmissions = IndexMap::new();
2010
2011        // Prepare the solution and insert into the sets.
2012        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2013        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2014        let solution_transmission_id = (solution_id, solution_checksum).into();
2015        transmission_ids.insert(solution_transmission_id);
2016        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));
2017
2018        // Prepare the transactions and insert into the sets.
2019        for _ in 0..num_transactions {
2020            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2021            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2022            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2023            transmission_ids.insert(transaction_transmission_id);
2024            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
2025        }
2026
2027        // Retrieve the private key.
2028        let private_key = author.private_key();
2029        // Sign the batch header.
2030        let batch_header = BatchHeader::new(
2031            private_key,
2032            round,
2033            timestamp,
2034            committee.id(),
2035            transmission_ids,
2036            previous_certificate_ids,
2037            rng,
2038        )
2039        .unwrap();
2040        // Construct the proposal.
2041        Proposal::new(committee, batch_header, transmissions).unwrap()
2042    }
2043
2044    // Creates a signature of the primary's current proposal for each committee member (excluding
2045    // the primary).
2046    fn peer_signatures_for_proposal(
2047        primary: &Primary<CurrentNetwork>,
2048        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2049        rng: &mut TestRng,
2050    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
2051        // Each committee member signs the batch.
2052        let mut signatures = Vec::with_capacity(accounts.len() - 1);
2053        for (socket_addr, account) in accounts {
2054            if account.address() == primary.gateway.account().address() {
2055                continue;
2056            }
2057            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
2058            let signature = account.sign(&[batch_id], rng).unwrap();
2059            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
2060        }
2061
2062        signatures
2063    }
2064
2065    /// Creates a signature of the batch ID for each committee member (excluding the primary).
2066    fn peer_signatures_for_batch(
2067        primary_address: Address<CurrentNetwork>,
2068        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2069        batch_id: Field<CurrentNetwork>,
2070        rng: &mut TestRng,
2071    ) -> IndexSet<Signature<CurrentNetwork>> {
2072        let mut signatures = IndexSet::new();
2073        for (_, account) in accounts {
2074            if account.address() == primary_address {
2075                continue;
2076            }
2077            let signature = account.sign(&[batch_id], rng).unwrap();
2078            signatures.insert(signature);
2079        }
2080        signatures
2081    }
2082
2083    // Creates a batch certificate.
2084    fn create_batch_certificate(
2085        primary_address: Address<CurrentNetwork>,
2086        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2087        round: u64,
2088        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
2089        rng: &mut TestRng,
2090    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
2091        let timestamp = now();
2092
2093        let author =
2094            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
2095        let private_key = author.private_key();
2096
2097        let committee_id = Field::rand(rng);
2098        let (solution_id, solution) = sample_unconfirmed_solution(rng);
2099        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
2100        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2101        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2102
2103        let solution_transmission_id = (solution_id, solution_checksum).into();
2104        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
2105
2106        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
2107        let transmissions = [
2108            (solution_transmission_id, Transmission::Solution(solution)),
2109            (transaction_transmission_id, Transmission::Transaction(transaction)),
2110        ]
2111        .into();
2112
2113        let batch_header = BatchHeader::new(
2114            private_key,
2115            round,
2116            timestamp,
2117            committee_id,
2118            transmission_ids,
2119            previous_certificate_ids,
2120            rng,
2121        )
2122        .unwrap();
2123        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
2124        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
2125        (certificate, transmissions)
2126    }
2127
2128    // Create a certificate chain up to round in primary storage.
2129    fn store_certificate_chain(
2130        primary: &Primary<CurrentNetwork>,
2131        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
2132        round: u64,
2133        rng: &mut TestRng,
2134    ) -> IndexSet<Field<CurrentNetwork>> {
2135        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2136        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2137        for cur_round in 1..round {
2138            for (_, account) in accounts.iter() {
2139                let (certificate, transmissions) = create_batch_certificate(
2140                    account.address(),
2141                    accounts,
2142                    cur_round,
2143                    previous_certificates.clone(),
2144                    rng,
2145                );
2146                next_certificates.insert(certificate.id());
2147                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
2148            }
2149
2150            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2151            previous_certificates = next_certificates;
2152            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2153        }
2154
2155        previous_certificates
2156    }
2157
2158    // Insert the account socket addresses into the resolver so that
2159    // they are recognized as "connected".
2160    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2161        // First account is primary, which doesn't need to resolve.
2162        for (addr, acct) in accounts.iter().skip(1) {
2163            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
2164        }
2165    }
2166
2167    #[tokio::test]
2168    async fn test_propose_batch() {
2169        let mut rng = TestRng::default();
2170        let (primary, _) = primary_without_handlers(&mut rng);
2171
2172        // Check there is no batch currently proposed.
2173        assert!(primary.proposed_batch.read().is_none());
2174
2175        // Generate a solution and a transaction.
2176        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2177        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2178
2179        // Store them on one of the workers.
2180        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2181        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2182
2183        // Propose a batch. It should succeed.
2184        assert!(primary.propose_batch().await.is_ok());
2185        assert!(primary.proposed_batch.read().is_some());
2186    }
2187
2188    #[tokio::test]
2189    async fn test_propose_batch_with_no_transmissions() {
2190        let mut rng = TestRng::default();
2191        let (primary, _) = primary_without_handlers(&mut rng);
2192
2193        // Check there is no batch currently proposed.
2194        assert!(primary.proposed_batch.read().is_none());
2195
2196        // Try to propose a batch with no transmissions.
2197        assert!(primary.propose_batch().await.is_ok());
2198        assert!(primary.proposed_batch.read().is_some());
2199    }
2200
2201    #[tokio::test]
2202    async fn test_propose_batch_in_round() {
2203        let round = 3;
2204        let mut rng = TestRng::default();
2205        let (primary, accounts) = primary_without_handlers(&mut rng);
2206
2207        // Fill primary storage.
2208        store_certificate_chain(&primary, &accounts, round, &mut rng);
2209
2210        // Sleep for a while to ensure the primary is ready to propose the next round.
2211        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2212
2213        // Generate a solution and a transaction.
2214        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2215        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2216
2217        // Store them on one of the workers.
2218        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2219        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2220
2221        // Propose a batch. It should succeed.
2222        assert!(primary.propose_batch().await.is_ok());
2223        assert!(primary.proposed_batch.read().is_some());
2224    }
2225
2226    #[tokio::test]
2227    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2228        let round = 3;
2229        let prev_round = round - 1;
2230        let mut rng = TestRng::default();
2231        let (primary, accounts) = primary_without_handlers(&mut rng);
2232        let peer_account = &accounts[1];
2233        let peer_ip = peer_account.0;
2234
2235        // Fill primary storage.
2236        store_certificate_chain(&primary, &accounts, round, &mut rng);
2237
2238        // Get the certificate IDs from the previous round.
2239        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2240
2241        // Track the number of transmissions in the previous round.
2242        let mut num_transmissions_in_previous_round = 0;
2243
2244        // Generate a solution and a transaction.
2245        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2246        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2247        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2248        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2249
2250        // Store it on one of the workers.
2251        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2252        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2253
2254        // Check that the worker has 2 transmissions.
2255        assert_eq!(primary.workers[0].num_transmissions(), 2);
2256
2257        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
2258        for (_, account) in accounts.iter() {
2259            let (certificate, transmissions) = create_batch_certificate(
2260                account.address(),
2261                &accounts,
2262                round,
2263                previous_certificate_ids.clone(),
2264                &mut rng,
2265            );
2266
2267            // Add the transmissions to the worker.
2268            for (transmission_id, transmission) in transmissions.iter() {
2269                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2270            }
2271
2272            // Insert the certificate to storage.
2273            num_transmissions_in_previous_round += transmissions.len();
2274            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2275        }
2276
2277        // Sleep for a while to ensure the primary is ready to propose the next round.
2278        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2279
2280        // Advance to the next round.
2281        assert!(primary.storage.increment_to_next_round(round).is_ok());
2282
2283        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
2284        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2285
2286        // Propose the batch.
2287        assert!(primary.propose_batch().await.is_ok());
2288
2289        // Check that the proposal only contains the new transmissions that were not in previous certificates.
2290        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
2291        assert_eq!(proposed_transmissions.len(), 2);
2292        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2293        assert!(
2294            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2295        );
2296    }
2297
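    // Note on the behavior exercised above: when assembling a proposal, the
    // primary drains its workers but skips any transmission that already
    // appears in a certificate from an earlier round, so only the two freshly
    // submitted transmissions make it into the proposal.
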
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create a primary to test spend limit backwards compatibility with V4.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());
        // Check the workers are empty.
        primary.workers().iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Generate a solution and store it on one of the workers.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _ in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            // Store it on one of the workers.
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Propose a batch; it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        // Expect 2 of the 5 transactions to be included in the proposal, in addition to the solution.
        assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);
        // Check the transmissions were correctly drained from the workers: the 3 excluded transactions remain queued.
        assert_eq!(primary.workers().iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }

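    // Read together with `test_batch_propose_from_peer_over_spend_limit` below,
    // the test above suggests the proposer caps its own batches by the spend
    // limit even at V4, whereas peer proposals exceeding the limit are only
    // rejected from V5 onwards.
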
    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer; it should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }

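    // In these tests, `(*proposal.batch_header()).clone().into()` builds the
    // `BatchPropose` event from the proposal's batch header; the wrong-round
    // tests below construct `BatchPropose` explicitly in order to override the
    // round field.
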
    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // The sync step is deliberately skipped, so processing the batch proposal from the peer should fail.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer; it should succeed.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer with a mismatched round; it should error.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer with a mismatched round; it should error.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let past_timestamp = now() - 100; // Use a timestamp that is in the past.
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            past_timestamp,
            1,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to the resolver to pass the propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal with a stale timestamp from the peer; it should error.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }

    #[tokio::test]
    async fn test_batch_propose_from_peer_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create two primaries to test spend limit activation on V5.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary_v4 = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );
        let primary_v5 = primary_with_committee(
            1,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V5).unwrap(),
        );

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[2];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal =
            create_test_proposal(&peer_account.1, committee, round, Default::default(), timestamp, 4, &mut rng);

        // Make sure both primaries are aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary_v4.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            primary_v5.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // The author must be known to the resolver to pass the propose checks.
        primary_v4.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        primary_v5.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primaries must be considered synced.
        primary_v4.sync.block_sync().try_block_sync(&primary_v4.gateway.clone()).await;
        primary_v5.sync.block_sync().try_block_sync(&primary_v5.gateway.clone()).await;

        // Check the spend limit is enforced from V5 onwards.
        assert!(
            primary_v4
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_ok()
        );
        assert!(
            primary_v5
                .process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into())
                .await
                .is_err()
        );
    }

    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Attempt to propose a batch; the call returns `Ok`, but no proposal is created.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }

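    // `propose_lock` appears to track the last round proposed for: as the test
    // above shows, `propose_batch` returns `Ok` without creating a proposal
    // while the lock is ahead of the storage round, which guards against
    // proposing for a stale round.
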
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Trying to propose a batch terminates early, because the storage round is behind the existing proposal.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }

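    // The two tests above collect signatures from every committee member, which
    // is enough for the primary to certify the batch and advance the round; the
    // no-quorum variants below feed in a single signature and assert that
    // neither happens.
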
    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created or stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created or stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was not incremented.
        assert_eq!(primary.current_round(), round);
    }

    #[tokio::test]
    async fn test_insert_certificate_with_aborted_transmissions() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store them on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create a certificate for the current round.
        let account = accounts[0].1.clone();
        let (certificate, transmissions) =
            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
        let certificate_id = certificate.id();

        // Randomly abort some of the transmissions, ensuring at least one is aborted.
        let mut aborted_transmissions = HashSet::new();
        let mut transmissions_without_aborted = HashMap::new();
        for (transmission_id, transmission) in transmissions.clone() {
            match rng.gen::<bool>() || aborted_transmissions.is_empty() {
                true => {
                    // Mark the transmission as aborted.
                    aborted_transmissions.insert(transmission_id);
                }
                false => {
                    // Keep the transmission among the non-aborted transmissions.
                    transmissions_without_aborted.insert(transmission_id, transmission);
                }
            };
        }

        // Add the non-aborted transmissions to the worker.
        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
        }

        // Check that inserting the certificate with missing transmissions fails.
        assert!(
            primary
                .storage
                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );
        assert!(
            primary
                .storage
                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
                .is_err()
        );

        // Insert the certificate into storage, declaring the aborted transmissions.
        primary
            .storage
            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
            .unwrap();

        // Ensure the certificate exists in storage.
        assert!(primary.storage.contains_certificate(certificate_id));
        // Ensure the aborted transmission IDs are tracked in storage, without any transmission data.
        for aborted_transmission_id in aborted_transmissions {
            assert!(primary.storage.contains_transmission(aborted_transmission_id));
            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
        }
    }
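
    // As the assertions above show, aborted transmission IDs are recorded by
    // storage (`contains_transmission` returns `true`) without any backing
    // data (`get_transmission` returns `None`), so a certificate referencing
    // them can be inserted even though their payloads are unavailable.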
}