snarkos_node_bft/
primary.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    PRIMARY_PING_IN_MS,
22    Sync,
23    Transport,
24    WORKER_PING_IN_MS,
25    Worker,
26    events::{BatchPropose, BatchSignature, Event},
27    helpers::{
28        BFTSender,
29        PrimaryReceiver,
30        PrimarySender,
31        Proposal,
32        ProposalCache,
33        SignedProposals,
34        Storage,
35        assign_to_worker,
36        assign_to_workers,
37        fmt_id,
38        init_sync_channels,
39        init_worker_channels,
40        now,
41    },
42    spawn_blocking,
43};
44use snarkos_account::Account;
45use snarkos_node_bft_events::PrimaryPing;
46use snarkos_node_bft_ledger_service::LedgerService;
47use snarkos_node_sync::DUMMY_SELF_IP;
48use snarkvm::{
49    console::{
50        prelude::*,
51        types::{Address, Field},
52    },
53    ledger::{
54        block::Transaction,
55        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
56        puzzle::{Solution, SolutionID},
57    },
58    prelude::committee::Committee,
59};
60
61use colored::Colorize;
62use futures::stream::{FuturesUnordered, StreamExt};
63use indexmap::{IndexMap, IndexSet};
64use parking_lot::{Mutex, RwLock};
65use rayon::prelude::*;
66use std::{
67    collections::{HashMap, HashSet},
68    future::Future,
69    net::SocketAddr,
70    sync::Arc,
71    time::Duration,
72};
73use tokio::{
74    sync::{Mutex as TMutex, OnceCell},
75    task::JoinHandle,
76};
77
/// A helper type for an optional proposed batch.
///
/// The inner `Option` is `None` while the primary has no batch proposal set,
/// and `Some(proposal)` once `propose_batch` has stored one.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
80
/// The primary: it owns the workers and is responsible for proposing a batch for each round.
///
/// Note: The `Arc`-wrapped fields are shared between clones of the primary.
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The gateway.
    gateway: Gateway<N>,
    /// The storage.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers. This list is empty until `run` is called.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender. This is set (at most once) in `run`, if a BFT sender is provided.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for propose_batch.
    /// The guarded value is the latest round for which a batch was proposed;
    /// it is seeded from the proposal cache when the cache is loaded.
    propose_lock: Arc<TMutex<u64>>,
}
106
107impl<N: Network> Primary<N> {
108    /// The maximum number of unconfirmed transmissions to send to the primary.
109    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
110
111    /// Initializes a new primary instance.
112    pub fn new(
113        account: Account<N>,
114        storage: Storage<N>,
115        ledger: Arc<dyn LedgerService<N>>,
116        ip: Option<SocketAddr>,
117        trusted_validators: &[SocketAddr],
118        dev: Option<u16>,
119    ) -> Result<Self> {
120        // Initialize the gateway.
121        let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
122        // Initialize the sync module.
123        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());
124
125        // Initialize the primary instance.
126        Ok(Self {
127            sync,
128            gateway,
129            storage,
130            ledger,
131            workers: Arc::from(vec![]),
132            bft_sender: Default::default(),
133            proposed_batch: Default::default(),
134            latest_proposed_batch_timestamp: Default::default(),
135            signed_proposals: Default::default(),
136            handles: Default::default(),
137            propose_lock: Default::default(),
138        })
139    }
140
141    /// Load the proposal cache file and update the Primary state with the stored data.
142    async fn load_proposal_cache(&self) -> Result<()> {
143        // Fetch the signed proposals from the file system if it exists.
144        match ProposalCache::<N>::exists(self.gateway.dev()) {
145            // If the proposal cache exists, then process the proposal cache.
146            true => match ProposalCache::<N>::load(self.gateway.account().address(), self.gateway.dev()) {
147                Ok(proposal_cache) => {
148                    // Extract the proposal and signed proposals.
149                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
150                        proposal_cache.into();
151
152                    // Write the proposed batch.
153                    *self.proposed_batch.write() = proposed_batch;
154                    // Write the signed proposals.
155                    *self.signed_proposals.write() = signed_proposals;
156                    // Writ the propose lock.
157                    *self.propose_lock.lock().await = latest_certificate_round;
158
159                    // Update the storage with the pending certificates.
160                    for certificate in pending_certificates {
161                        let batch_id = certificate.batch_id();
162                        // We use a dummy IP because the node should not need to request from any peers.
163                        // The storage should have stored all the transmissions. If not, we simply
164                        // skip the certificate.
165                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
166                        {
167                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
168                        }
169                    }
170                    Ok(())
171                }
172                Err(err) => {
173                    bail!("Failed to read the signed proposals from the file system - {err}.");
174                }
175            },
176            // If the proposal cache does not exist, then return early.
177            false => Ok(()),
178        }
179    }
180
181    /// Run the primary instance.
182    pub async fn run(
183        &mut self,
184        bft_sender: Option<BFTSender<N>>,
185        primary_sender: PrimarySender<N>,
186        primary_receiver: PrimaryReceiver<N>,
187    ) -> Result<()> {
188        info!("Starting the primary instance of the memory pool...");
189
190        // Set the BFT sender.
191        if let Some(bft_sender) = &bft_sender {
192            // Set the BFT sender in the primary.
193            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
194        }
195
196        // Construct a map of the worker senders.
197        let mut worker_senders = IndexMap::new();
198        // Construct a map for the workers.
199        let mut workers = Vec::new();
200        // Initialize the workers.
201        for id in 0..MAX_WORKERS {
202            // Construct the worker channels.
203            let (tx_worker, rx_worker) = init_worker_channels();
204            // Construct the worker instance.
205            let worker = Worker::new(
206                id,
207                Arc::new(self.gateway.clone()),
208                self.storage.clone(),
209                self.ledger.clone(),
210                self.proposed_batch.clone(),
211            )?;
212            // Run the worker instance.
213            worker.run(rx_worker);
214            // Add the worker to the list of workers.
215            workers.push(worker);
216            // Add the worker sender to the map.
217            worker_senders.insert(id, tx_worker);
218        }
219        // Set the workers.
220        self.workers = Arc::from(workers);
221
222        // First, initialize the sync channels.
223        let (sync_sender, sync_receiver) = init_sync_channels();
224        // Next, initialize the sync module and sync the storage from ledger.
225        self.sync.initialize(bft_sender).await?;
226        // Next, load and process the proposal cache before running the sync module.
227        self.load_proposal_cache().await?;
228        // Next, run the sync module.
229        self.sync.run(sync_receiver).await?;
230        // Next, initialize the gateway.
231        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
232        // Lastly, start the primary handlers.
233        // Note: This ensures the primary does not start communicating before syncing is complete.
234        self.start_handlers(primary_receiver);
235
236        Ok(())
237    }
238
239    /// Returns the current round.
240    pub fn current_round(&self) -> u64 {
241        self.storage.current_round()
242    }
243
244    /// Returns `true` if the primary is synced.
245    pub fn is_synced(&self) -> bool {
246        self.sync.is_synced()
247    }
248
249    /// Returns the gateway.
250    pub const fn gateway(&self) -> &Gateway<N> {
251        &self.gateway
252    }
253
254    /// Returns the storage.
255    pub const fn storage(&self) -> &Storage<N> {
256        &self.storage
257    }
258
259    /// Returns the ledger.
260    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
261        &self.ledger
262    }
263
264    /// Returns the number of workers.
265    pub fn num_workers(&self) -> u8 {
266        u8::try_from(self.workers.len()).expect("Too many workers")
267    }
268
269    /// Returns the workers.
270    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
271        &self.workers
272    }
273
274    /// Returns the batch proposal of our primary, if one currently exists.
275    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
276        &self.proposed_batch
277    }
278}
279
280impl<N: Network> Primary<N> {
281    /// Returns the number of unconfirmed transmissions.
282    pub fn num_unconfirmed_transmissions(&self) -> usize {
283        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
284    }
285
286    /// Returns the number of unconfirmed ratifications.
287    pub fn num_unconfirmed_ratifications(&self) -> usize {
288        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
289    }
290
291    /// Returns the number of solutions.
292    pub fn num_unconfirmed_solutions(&self) -> usize {
293        self.workers.iter().map(|worker| worker.num_solutions()).sum()
294    }
295
296    /// Returns the number of unconfirmed transactions.
297    pub fn num_unconfirmed_transactions(&self) -> usize {
298        self.workers.iter().map(|worker| worker.num_transactions()).sum()
299    }
300}
301
302impl<N: Network> Primary<N> {
303    /// Returns the worker transmission IDs.
304    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
305        self.workers.iter().flat_map(|worker| worker.transmission_ids())
306    }
307
308    /// Returns the worker transmissions.
309    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
310        self.workers.iter().flat_map(|worker| worker.transmissions())
311    }
312
313    /// Returns the worker solutions.
314    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
315        self.workers.iter().flat_map(|worker| worker.solutions())
316    }
317
318    /// Returns the worker transactions.
319    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
320        self.workers.iter().flat_map(|worker| worker.transactions())
321    }
322}
323
324impl<N: Network> Primary<N> {
325    /// Clears the worker solutions.
326    pub fn clear_worker_solutions(&self) {
327        self.workers.iter().for_each(Worker::clear_solutions);
328    }
329}
330
331impl<N: Network> Primary<N> {
332    /// Proposes the batch for the current round.
333    ///
334    /// This method performs the following steps:
335    /// 1. Drain the workers.
336    /// 2. Sign the batch.
337    /// 3. Set the batch proposal in the primary.
338    /// 4. Broadcast the batch header to all validators for signing.
339    pub async fn propose_batch(&self) -> Result<()> {
340        // This function isn't re-entrant.
341        let mut lock_guard = self.propose_lock.lock().await;
342
343        // Check if the proposed batch has expired, and clear it if it has expired.
344        if let Err(e) = self.check_proposed_batch_for_expiration().await {
345            warn!("Failed to check the proposed batch for expiration - {e}");
346            return Ok(());
347        }
348
349        // Retrieve the current round.
350        let round = self.current_round();
351        // Compute the previous round.
352        let previous_round = round.saturating_sub(1);
353
354        // If the current round is 0, return early.
355        ensure!(round > 0, "Round 0 cannot have transaction batches");
356
357        // If the current storage round is below the latest proposal round, then return early.
358        if round < *lock_guard {
359            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
360            return Ok(());
361        }
362
363        // If there is a batch being proposed already,
364        // rebroadcast the batch header to the non-signers, and return early.
365        if let Some(proposal) = self.proposed_batch.read().as_ref() {
366            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
367            if round < proposal.round()
368                || proposal
369                    .batch_header()
370                    .previous_certificate_ids()
371                    .iter()
372                    .any(|id| !self.storage.contains_certificate(*id))
373            {
374                warn!(
375                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
376                    proposal.round(),
377                );
378                return Ok(());
379            }
380            // Construct the event.
381            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
382            let event = Event::BatchPropose(proposal.batch_header().clone().into());
383            // Iterate through the non-signers.
384            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
385                // Resolve the address to the peer IP.
386                match self.gateway.resolver().get_peer_ip_for_address(address) {
387                    // Resend the batch proposal to the validator for signing.
388                    Some(peer_ip) => {
389                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
390                        tokio::spawn(async move {
391                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
392                            // Resend the batch proposal to the peer.
393                            if gateway.send(peer_ip, event_).await.is_none() {
394                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
395                            }
396                        });
397                    }
398                    None => continue,
399                }
400            }
401            debug!("Proposed batch for round {} is still valid", proposal.round());
402            return Ok(());
403        }
404
405        #[cfg(feature = "metrics")]
406        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);
407
408        // Ensure that the primary does not create a new proposal too quickly.
409        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
410            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
411            return Ok(());
412        }
413
414        // Ensure the primary has not proposed a batch for this round before.
415        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
416            // If a BFT sender was provided, attempt to advance the current round.
417            if let Some(bft_sender) = self.bft_sender.get() {
418                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
419                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
420                    Ok(true) => (), // continue,
421                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
422                    Ok(false) => return Ok(()),
423                    // An error occurred while attempting to advance the current round.
424                    Err(e) => {
425                        warn!("Failed to update the BFT to the next round - {e}");
426                        return Err(e);
427                    }
428                }
429            }
430            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
431            return Ok(());
432        }
433
434        // Determine if the current round has been proposed.
435        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
436        // good for network reliability and should not be prevented for the already existing proposed_batch.
437        // If a certificate already exists for the current round, an attempt should be made to advance the
438        // round as early as possible.
439        if round == *lock_guard {
440            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
441            return Ok(());
442        }
443
444        // Retrieve the committee to check against.
445        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
446        // Check if the primary is connected to enough validators to reach quorum threshold.
447        {
448            // Retrieve the connected validator addresses.
449            let mut connected_validators = self.gateway.connected_addresses();
450            // Append the primary to the set.
451            connected_validators.insert(self.gateway.account().address());
452            // If quorum threshold is not reached, return early.
453            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
454                debug!(
455                    "Primary is safely skipping a batch proposal for round {round} {}",
456                    "(please connect to more validators)".dimmed()
457                );
458                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
459                return Ok(());
460            }
461        }
462
463        // Retrieve the previous certificates.
464        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
465
466        // Check if the batch is ready to be proposed.
467        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
468        let mut is_ready = previous_round == 0;
469        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
470        if previous_round > 0 {
471            // Retrieve the committee lookback for the round.
472            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
473                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
474            };
475            // Construct a set over the authors.
476            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
477            // Check if the previous certificates have reached the quorum threshold.
478            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
479                is_ready = true;
480            }
481        }
482        // If the batch is not ready to be proposed, return early.
483        if !is_ready {
484            debug!(
485                "Primary is safely skipping a batch proposal for round {round} {}",
486                format!("(previous round {previous_round} has not reached quorum)").dimmed()
487            );
488            return Ok(());
489        }
490
491        // Determined the required number of transmissions per worker.
492        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
493        // Initialize the map of transmissions.
494        let mut transmissions: IndexMap<_, _> = Default::default();
495        // Take the transmissions from the workers.
496        for worker in self.workers.iter() {
497            // Initialize a tracker for included transmissions for the current worker.
498            let mut num_transmissions_included_for_worker = 0;
499            // Keep draining the worker until the desired number of transmissions is reached or the worker is empty.
500            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
501                // Determine the number of remaining transmissions for the worker.
502                let num_remaining_transmissions =
503                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
504                // Drain the worker.
505                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
506                // If the worker is empty, break early.
507                if worker_transmissions.peek().is_none() {
508                    break 'outer;
509                }
510                // Iterate through the worker transmissions.
511                'inner: for (id, transmission) in worker_transmissions {
512                    // Check if the ledger already contains the transmission.
513                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
514                        trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
515                        continue 'inner;
516                    }
517                    // Check if the storage already contain the transmission.
518                    // Note: We do not skip if this is the first transmission in the proposal, to ensure that
519                    // the primary does not propose a batch with no transmissions.
520                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
521                        trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
522                        continue 'inner;
523                    }
524                    // Check the transmission is still valid.
525                    match (id, transmission.clone()) {
526                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
527                            // Ensure the checksum matches.
528                            match solution.to_checksum::<N>() {
529                                Ok(solution_checksum) if solution_checksum == checksum => (),
530                                _ => {
531                                    trace!(
532                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
533                                        fmt_id(solution_id)
534                                    );
535                                    continue 'inner;
536                                }
537                            }
538                            // Check if the solution is still valid.
539                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
540                                trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
541                                continue 'inner;
542                            }
543                        }
544                        (
545                            TransmissionID::Transaction(transaction_id, checksum),
546                            Transmission::Transaction(transaction),
547                        ) => {
548                            // Ensure the checksum matches.
549                            match transaction.to_checksum::<N>() {
550                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
551                                _ => {
552                                    trace!(
553                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
554                                        fmt_id(transaction_id)
555                                    );
556                                    continue 'inner;
557                                }
558                            }
559                            // Check if the transaction is still valid.
560                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
561                                trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
562                                continue 'inner;
563                            }
564                        }
565                        // Note: We explicitly forbid including ratifications,
566                        // as the protocol currently does not support ratifications.
567                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
568                        // All other combinations are clearly invalid.
569                        _ => continue 'inner,
570                    }
571                    // Insert the transmission into the map.
572                    transmissions.insert(id, transmission);
573                    num_transmissions_included_for_worker += 1;
574                }
575            }
576        }
577
578        // Determine the current timestamp.
579        let current_timestamp = now();
580
581        *lock_guard = round;
582
583        /* Proceeding to sign & propose the batch. */
584        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());
585
586        // Retrieve the private key.
587        let private_key = *self.gateway.account().private_key();
588        // Retrieve the committee ID.
589        let committee_id = committee_lookback.id();
590        // Prepare the transmission IDs.
591        let transmission_ids = transmissions.keys().copied().collect();
592        // Prepare the previous batch certificate IDs.
593        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
594        // Sign the batch header and construct the proposal.
595        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
596            &private_key,
597            round,
598            current_timestamp,
599            committee_id,
600            transmission_ids,
601            previous_certificate_ids,
602            &mut rand::thread_rng()
603        ))
604        .and_then(|batch_header| {
605            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
606                .map(|proposal| (batch_header, proposal))
607        })
608        .inspect_err(|_| {
609            // On error, reinsert the transmissions and then propagate the error.
610            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
611                error!("Failed to reinsert transmissions: {e:?}");
612            }
613        })?;
614        // Broadcast the batch to all validators for signing.
615        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
616        // Set the timestamp of the latest proposed batch.
617        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
618        // Set the proposed batch.
619        *self.proposed_batch.write() = Some(proposal);
620        Ok(())
621    }
622
623    /// Processes a batch propose from a peer.
624    ///
625    /// This method performs the following steps:
626    /// 1. Verify the batch.
627    /// 2. Sign the batch.
628    /// 3. Broadcast the signature back to the validator.
629    ///
630    /// If our primary is ahead of the peer, we will not sign the batch.
631    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
632    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
633        let BatchPropose { round: batch_round, batch_header } = batch_propose;
634
635        // Deserialize the batch header.
636        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
637        // Ensure the round matches in the batch header.
638        if batch_round != batch_header.round() {
639            // Proceed to disconnect the validator.
640            self.gateway.disconnect(peer_ip);
641            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
642        }
643
644        // Retrieve the batch author.
645        let batch_author = batch_header.author();
646
647        // Ensure the batch proposal is from the validator.
648        match self.gateway.resolver().get_address(peer_ip) {
649            // If the peer is a validator, then ensure the batch proposal is from the validator.
650            Some(address) => {
651                if address != batch_author {
652                    // Proceed to disconnect the validator.
653                    self.gateway.disconnect(peer_ip);
654                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
655                }
656            }
657            None => bail!("Batch proposal from a disconnected validator"),
658        }
659        // Ensure the batch author is a current committee member.
660        if !self.gateway.is_authorized_validator_address(batch_author) {
661            // Proceed to disconnect the validator.
662            self.gateway.disconnect(peer_ip);
663            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
664        }
665        // Ensure the batch proposal is not from the current primary.
666        if self.gateway.account().address() == batch_author {
667            bail!("Invalid peer - proposed batch from myself ({batch_author})");
668        }
669
670        // Ensure that the batch proposal's committee ID matches the expected committee ID.
671        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
672        if expected_committee_id != batch_header.committee_id() {
673            // Proceed to disconnect the validator.
674            self.gateway.disconnect(peer_ip);
675            bail!(
676                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
677                batch_header.committee_id()
678            );
679        }
680
681        // Retrieve the cached round and batch ID for this validator.
682        if let Some((signed_round, signed_batch_id, signature)) =
683            self.signed_proposals.read().get(&batch_author).copied()
684        {
685            // If the signed round is ahead of the peer's batch round, do not sign the proposal.
686            // Note: while this may be valid behaviour, additional formal analysis and testing will need to be done before allowing it.
687            if signed_round > batch_header.round() {
688                bail!(
689                    "Peer ({batch_author}) proposed a batch for a previous round ({}), latest signed round: {signed_round}",
690                    batch_header.round()
691                );
692            }
693
694            // If the round matches and the batch ID differs, then the validator is malicious.
695            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
696                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
697            }
698            // If the round and batch ID matches, then skip signing the batch a second time.
699            // Instead, rebroadcast the cached signature to the peer.
700            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
701                let gateway = self.gateway.clone();
702                tokio::spawn(async move {
703                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
704                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
705                    // Resend the batch signature to the peer.
706                    if gateway.send(peer_ip, event).await.is_none() {
707                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
708                    }
709                });
710                // Return early.
711                return Ok(());
712            }
713        }
714
715        // Ensure that the batch header doesn't already exist in storage.
716        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
717        if self.storage.contains_batch(batch_header.batch_id()) {
718            debug!(
719                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
720                format!("batch for round {batch_round} already exists in storage").dimmed()
721            );
722            return Ok(());
723        }
724
725        // Compute the previous round.
726        let previous_round = batch_round.saturating_sub(1);
727        // Ensure that the peer did not propose a batch too quickly.
728        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
729            // Proceed to disconnect the validator.
730            self.gateway.disconnect(peer_ip);
731            bail!("Malicious peer - {e} from '{peer_ip}'");
732        }
733
734        // Ensure the batch header does not contain any ratifications.
735        if batch_header.contains(TransmissionID::Ratification) {
736            // Proceed to disconnect the validator.
737            self.gateway.disconnect(peer_ip);
738            bail!(
739                "Malicious peer - proposed batch contains an unsupported ratification transmissionID from '{peer_ip}'",
740            );
741        }
742
743        // If the peer is ahead, use the batch header to sync up to the peer.
744        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;
745
746        // Check that the transmission ids match and are not fee transactions.
747        if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
748            // If the transmission is not well-formed, then return early.
749            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
750        }) {
751            debug!("Batch propose at round {batch_round} from '{peer_ip}' contains an invalid transmission - {err}",);
752            return Ok(());
753        }
754
755        // Ensure the batch is for the current round.
756        // This method must be called after fetching previous certificates (above),
757        // and prior to checking the batch header (below).
758        if let Err(e) = self.ensure_is_signing_round(batch_round) {
759            // If the primary is not signing for the peer's round, then return early.
760            debug!("{e} from '{peer_ip}'");
761            return Ok(());
762        }
763
764        // Ensure the batch header from the peer is valid.
765        let (storage, header) = (self.storage.clone(), batch_header.clone());
766        let missing_transmissions =
767            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
768        // Inserts the missing transmissions into the workers.
769        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
770
771        /* Proceeding to sign the batch. */
772
773        // Retrieve the batch ID.
774        let batch_id = batch_header.batch_id();
775        // Sign the batch ID.
776        let account = self.gateway.account().clone();
777        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
778
779        // Ensure the proposal has not already been signed.
780        //
781        // Note: Due to the need to sync the batch header with the peer, it is possible
782        // for the primary to receive the same 'BatchPropose' event again, whereby only
783        // one instance of this handler should sign the batch. This check guarantees this.
784        match self.signed_proposals.write().0.entry(batch_author) {
785            std::collections::hash_map::Entry::Occupied(mut entry) => {
786                // If the validator has already signed a batch for this round, then return early,
787                // since, if the peer still has not received the signature, they will request it again,
788                // and the logic at the start of this function will resend the (now cached) signature
789                // to the peer if asked to sign this batch proposal again.
790                if entry.get().0 == batch_round {
791                    return Ok(());
792                }
793                // Otherwise, cache the round, batch ID, and signature for this validator.
794                entry.insert((batch_round, batch_id, signature));
795            }
796            // If the validator has not signed a batch before, then continue.
797            std::collections::hash_map::Entry::Vacant(entry) => {
798                // Cache the round, batch ID, and signature for this validator.
799                entry.insert((batch_round, batch_id, signature));
800            }
801        };
802
803        // Broadcast the signature back to the validator.
804        let self_ = self.clone();
805        tokio::spawn(async move {
806            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
807            // Send the batch signature to the peer.
808            if self_.gateway.send(peer_ip, event).await.is_some() {
809                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
810            }
811        });
812        Ok(())
813    }
814
815    /// Processes a batch signature from a peer.
816    ///
817    /// This method performs the following steps:
818    /// 1. Ensure the proposed batch has not expired.
819    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
820    /// 3. Store the signature.
821    /// 4. Certify the batch if enough signatures have been received.
822    /// 5. Broadcast the batch certificate to all validators.
823    async fn process_batch_signature_from_peer(
824        &self,
825        peer_ip: SocketAddr,
826        batch_signature: BatchSignature<N>,
827    ) -> Result<()> {
828        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
829        self.check_proposed_batch_for_expiration().await?;
830
831        // Retrieve the signature and timestamp.
832        let BatchSignature { batch_id, signature } = batch_signature;
833
834        // Retrieve the signer.
835        let signer = signature.to_address();
836
837        // Ensure the batch signature is signed by the validator.
838        if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
839            // Proceed to disconnect the validator.
840            self.gateway.disconnect(peer_ip);
841            bail!("Malicious peer - batch signature is from a different validator ({signer})");
842        }
843        // Ensure the batch signature is not from the current primary.
844        if self.gateway.account().address() == signer {
845            bail!("Invalid peer - received a batch signature from myself ({signer})");
846        }
847
848        let self_ = self.clone();
849        let Some(proposal) = spawn_blocking!({
850            // Acquire the write lock.
851            let mut proposed_batch = self_.proposed_batch.write();
852            // Add the signature to the batch, and determine if the batch is ready to be certified.
853            match proposed_batch.as_mut() {
854                Some(proposal) => {
855                    // Ensure the batch ID matches the currently proposed batch ID.
856                    if proposal.batch_id() != batch_id {
857                        match self_.storage.contains_batch(batch_id) {
858                            // If this batch was already certified, return early.
859                            true => {
860                                debug!(
861                                    "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
862                                    proposal.round()
863                                );
864                                return Ok(None);
865                            }
866                            // If the batch ID is unknown, return an error.
867                            false => bail!(
868                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
869                                proposal.batch_id(),
870                                proposal.round()
871                            ),
872                        }
873                    }
874                    // Retrieve the committee lookback for the round.
875                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
876                    // Retrieve the address of the validator.
877                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
878                        bail!("Signature is from a disconnected validator");
879                    };
880                    // Add the signature to the batch.
881                    proposal.add_signature(signer, signature, &committee_lookback)?;
882                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
883                    // Check if the batch is ready to be certified.
884                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
885                        // If the batch is not ready to be certified, return early.
886                        return Ok(None);
887                    }
888                }
889                // There is no proposed batch, so return early.
890                None => return Ok(None),
891            };
892            // Retrieve the batch proposal, clearing the proposed batch.
893            match proposed_batch.take() {
894                Some(proposal) => Ok(Some(proposal)),
895                None => Ok(None),
896            }
897        })?
898        else {
899            return Ok(());
900        };
901
902        /* Proceeding to certify the batch. */
903
904        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
905
906        // Retrieve the committee lookback for the round.
907        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
908        // Store the certified batch and broadcast it to all validators.
909        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
910        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
911            // Reinsert the transmissions back into the ready queue for the next proposal.
912            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
913            return Err(e);
914        }
915
916        #[cfg(feature = "metrics")]
917        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
918        Ok(())
919    }
920
921    /// Processes a batch certificate from a peer.
922    ///
923    /// This method performs the following steps:
924    /// 1. Stores the given batch certificate, after ensuring it is valid.
925    /// 2. If there are enough certificates to reach quorum threshold for the current round,
926    ///     then proceed to advance to the next round.
927    async fn process_batch_certificate_from_peer(
928        &self,
929        peer_ip: SocketAddr,
930        certificate: BatchCertificate<N>,
931    ) -> Result<()> {
932        // Ensure the batch certificate is from an authorized validator.
933        if !self.gateway.is_authorized_validator_ip(peer_ip) {
934            // Proceed to disconnect the validator.
935            self.gateway.disconnect(peer_ip);
936            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
937        }
938        // Ensure storage does not already contain the certificate.
939        if self.storage.contains_certificate(certificate.id()) {
940            return Ok(());
941        // Otherwise, ensure ephemeral storage contains the certificate.
942        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
943            self.storage.insert_unprocessed_certificate(certificate.clone())?;
944        }
945
946        // Retrieve the batch certificate author.
947        let author = certificate.author();
948        // Retrieve the batch certificate round.
949        let certificate_round = certificate.round();
950        // Retrieve the batch certificate committee ID.
951        let committee_id = certificate.committee_id();
952
953        // Ensure the batch certificate is not from the current primary.
954        if self.gateway.account().address() == author {
955            bail!("Received a batch certificate for myself ({author})");
956        }
957
958        // Store the certificate, after ensuring it is valid.
959        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
960
961        // If there are enough certificates to reach quorum threshold for the certificate round,
962        // then proceed to advance to the next round.
963
964        // Retrieve the committee lookback.
965        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
966        // Retrieve the certificate authors.
967        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
968        // Check if the certificates have reached the quorum threshold.
969        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
970
971        // Ensure that the batch certificate's committee ID matches the expected committee ID.
972        let expected_committee_id = committee_lookback.id();
973        if expected_committee_id != committee_id {
974            // Proceed to disconnect the validator.
975            self.gateway.disconnect(peer_ip);
976            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
977        }
978
979        // Determine if we are currently proposing a round that is relevant.
980        // Note: This is important, because while our peers have advanced,
981        // they may not be proposing yet, and thus still able to sign our proposed batch.
982        let should_advance = match &*self.proposed_batch.read() {
983            // We advance if the proposal round is less than the current round that was just certified.
984            Some(proposal) => proposal.round() < certificate_round,
985            // If there's no proposal, we consider advancing.
986            None => true,
987        };
988
989        // Retrieve the current round.
990        let current_round = self.current_round();
991
992        // Determine whether to advance to the next round.
993        if is_quorum && should_advance && certificate_round >= current_round {
994            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
995            self.try_increment_to_the_next_round(current_round + 1).await?;
996        }
997        Ok(())
998    }
999}
1000
1001impl<N: Network> Primary<N> {
1002    /// Starts the primary handlers.
1003    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
1004        let PrimaryReceiver {
1005            mut rx_batch_propose,
1006            mut rx_batch_signature,
1007            mut rx_batch_certified,
1008            mut rx_primary_ping,
1009            mut rx_unconfirmed_solution,
1010            mut rx_unconfirmed_transaction,
1011        } = primary_receiver;
1012
1013        // Start the primary ping.
1014        let self_ = self.clone();
1015        self.spawn(async move {
1016            loop {
1017                // Sleep briefly.
1018                tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;
1019
1020                // Retrieve the block locators.
1021                let self__ = self_.clone();
1022                let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
1023                    Ok(block_locators) => block_locators,
1024                    Err(e) => {
1025                        warn!("Failed to retrieve block locators - {e}");
1026                        continue;
1027                    }
1028                };
1029
1030                // Retrieve the latest certificate of the primary.
1031                let primary_certificate = {
1032                    // Retrieve the primary address.
1033                    let primary_address = self_.gateway.account().address();
1034
1035                    // Iterate backwards from the latest round to find the primary certificate.
1036                    let mut certificate = None;
1037                    let mut current_round = self_.current_round();
1038                    while certificate.is_none() {
1039                        // If the current round is 0, then break the while loop.
1040                        if current_round == 0 {
1041                            break;
1042                        }
1043                        // Retrieve the primary certificates.
1044                        if let Some(primary_certificate) =
1045                            self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
1046                        {
1047                            certificate = Some(primary_certificate);
1048                        // If the primary certificate was not found, decrement the round.
1049                        } else {
1050                            current_round = current_round.saturating_sub(1);
1051                        }
1052                    }
1053
1054                    // Determine if the primary certificate was found.
1055                    match certificate {
1056                        Some(certificate) => certificate,
1057                        // Skip this iteration of the loop (do not send a primary ping).
1058                        None => continue,
1059                    }
1060                };
1061
1062                // Construct the primary ping.
1063                let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
1064                // Broadcast the event.
1065                self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
1066            }
1067        });
1068
1069        // Start the primary ping handler.
1070        let self_ = self.clone();
1071        self.spawn(async move {
1072            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
1073                // If the primary is not synced, then do not process the primary ping.
1074                if !self_.sync.is_synced() {
1075                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
1076                    continue;
1077                }
1078
1079                // Spawn a task to process the primary certificate.
1080                {
1081                    let self_ = self_.clone();
1082                    tokio::spawn(async move {
1083                        // Deserialize the primary certificate in the primary ping.
1084                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
1085                        else {
1086                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
1087                            return;
1088                        };
1089                        // Process the primary certificate.
1090                        let id = fmt_id(primary_certificate.id());
1091                        let round = primary_certificate.round();
1092                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
1093                            warn!("Cannot process a primary certificate '{id}' at round {round} in a 'PrimaryPing' from '{peer_ip}' - {e}");
1094                        }
1095                    });
1096                }
1097            }
1098        });
1099
1100        // Start the worker ping(s).
1101        let self_ = self.clone();
1102        self.spawn(async move {
1103            loop {
1104                tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
1105                // If the primary is not synced, then do not broadcast the worker ping(s).
1106                if !self_.sync.is_synced() {
1107                    trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
1108                    continue;
1109                }
1110                // Broadcast the worker ping(s).
1111                for worker in self_.workers.iter() {
1112                    worker.broadcast_ping();
1113                }
1114            }
1115        });
1116
1117        // Start the batch proposer.
1118        let self_ = self.clone();
1119        self.spawn(async move {
1120            loop {
1121                // Sleep briefly, but longer than if there were no batch.
1122                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1123                let current_round = self_.current_round();
1124                // If the primary is not synced, then do not propose a batch.
1125                if !self_.sync.is_synced() {
1126                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
1127                    continue;
1128                }
1129                // A best-effort attempt to skip the scheduled batch proposal if
1130                // round progression already triggered one.
1131                if self_.propose_lock.try_lock().is_err() {
1132                    trace!(
1133                        "Skipping batch proposal for round {current_round} {}",
1134                        "(node is already proposing)".dimmed()
1135                    );
1136                    continue;
1137                };
1138                // If there is no proposed batch, attempt to propose a batch.
1139                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
1140                // and only one batch needs be proposed at a time.
1141                if let Err(e) = self_.propose_batch().await {
1142                    warn!("Cannot propose a batch - {e}");
1143                }
1144            }
1145        });
1146
1147        // Process the proposed batch.
1148        let self_ = self.clone();
1149        self.spawn(async move {
1150            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
1151                // If the primary is not synced, then do not sign the batch.
1152                if !self_.sync.is_synced() {
1153                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
1154                    continue;
1155                }
1156                // Spawn a task to process the proposed batch.
1157                let self_ = self_.clone();
1158                tokio::spawn(async move {
1159                    // Process the batch proposal.
1160                    let round = batch_propose.round;
1161                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
1162                        warn!("Cannot sign a batch at round {round} from '{peer_ip}' - {e}");
1163                    }
1164                });
1165            }
1166        });
1167
1168        // Process the batch signature.
1169        let self_ = self.clone();
1170        self.spawn(async move {
1171            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
1172                // If the primary is not synced, then do not store the signature.
1173                if !self_.sync.is_synced() {
1174                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
1175                    continue;
1176                }
1177                // Process the batch signature.
1178                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
1179                // is a critical path, and we should only store the minimum required number of signatures.
1180                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
1181                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
1182                let id = fmt_id(batch_signature.batch_id);
1183                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
1184                    warn!("Cannot store a signature for batch '{id}' from '{peer_ip}' - {e}");
1185                }
1186            }
1187        });
1188
1189        // Process the certified batch.
1190        let self_ = self.clone();
1191        self.spawn(async move {
1192            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
1193                // If the primary is not synced, then do not store the certificate.
1194                if !self_.sync.is_synced() {
1195                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
1196                    continue;
1197                }
1198                // Spawn a task to process the batch certificate.
1199                let self_ = self_.clone();
1200                tokio::spawn(async move {
1201                    // Deserialize the batch certificate.
1202                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
1203                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
1204                        return;
1205                    };
1206                    // Process the batch certificate.
1207                    let id = fmt_id(batch_certificate.id());
1208                    let round = batch_certificate.round();
1209                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
1210                        warn!("Cannot store a certificate '{id}' for round {round} from '{peer_ip}' - {e}");
1211                    }
1212                });
1213            }
1214        });
1215
1216        // Periodically try to increment to the next round.
1217        // Note: This is necessary to ensure that the primary is not stuck on a previous round
1218        // despite having received enough certificates to advance to the next round.
1219        let self_ = self.clone();
1220        self.spawn(async move {
1221            loop {
1222                // Sleep briefly.
1223                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
1224                // If the primary is not synced, then do not increment to the next round.
1225                if !self_.sync.is_synced() {
1226                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
1227                    continue;
1228                }
1229                // Attempt to increment to the next round.
1230                let next_round = self_.current_round().saturating_add(1);
1231                // Determine if the quorum threshold is reached for the current round.
1232                let is_quorum_threshold_reached = {
1233                    // Retrieve the certificate authors for the next round.
1234                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
1235                    // If there are no certificates, then skip this check.
1236                    if authors.is_empty() {
1237                        continue;
1238                    }
1239                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
1240                        warn!("Failed to retrieve the committee lookback for round {next_round}");
1241                        continue;
1242                    };
1243                    committee_lookback.is_quorum_threshold_reached(&authors)
1244                };
1245                // Attempt to increment to the next round if the quorum threshold is reached.
1246                if is_quorum_threshold_reached {
1247                    debug!("Quorum threshold reached for round {}", next_round);
1248                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
1249                        warn!("Failed to increment to the next round - {e}");
1250                    }
1251                }
1252            }
1253        });
1254
1255        // Process the unconfirmed solutions.
1256        let self_ = self.clone();
1257        self.spawn(async move {
1258            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
1259                // Compute the checksum for the solution.
1260                let Ok(checksum) = solution.to_checksum::<N>() else {
1261                    error!("Failed to compute the checksum for the unconfirmed solution");
1262                    continue;
1263                };
1264                // Compute the worker ID.
1265                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
1266                    error!("Unable to determine the worker ID for the unconfirmed solution");
1267                    continue;
1268                };
1269                let self_ = self_.clone();
1270                tokio::spawn(async move {
1271                    // Retrieve the worker.
1272                    let worker = &self_.workers[worker_id as usize];
1273                    // Process the unconfirmed solution.
1274                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
1275                    // Send the result to the callback.
1276                    callback.send(result).ok();
1277                });
1278            }
1279        });
1280
1281        // Process the unconfirmed transactions.
1282        let self_ = self.clone();
1283        self.spawn(async move {
1284            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
1285                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
1286                // Compute the checksum for the transaction.
1287                let Ok(checksum) = transaction.to_checksum::<N>() else {
1288                    error!("Failed to compute the checksum for the unconfirmed transaction");
1289                    continue;
1290                };
1291                // Compute the worker ID.
1292                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
1293                    error!("Unable to determine the worker ID for the unconfirmed transaction");
1294                    continue;
1295                };
1296                let self_ = self_.clone();
1297                tokio::spawn(async move {
1298                    // Retrieve the worker.
1299                    let worker = &self_.workers[worker_id as usize];
1300                    // Process the unconfirmed transaction.
1301                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
1302                    // Send the result to the callback.
1303                    callback.send(result).ok();
1304                });
1305            }
1306        });
1307    }
1308
1309    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1310    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1311        // Check if the proposed batch is timed out or stale.
1312        let is_expired = match self.proposed_batch.read().as_ref() {
1313            Some(proposal) => proposal.round() < self.current_round(),
1314            None => false,
1315        };
1316        // If the batch is expired, clear the proposed batch.
1317        if is_expired {
1318            // Reset the proposed batch.
1319            let proposal = self.proposed_batch.write().take();
1320            if let Some(proposal) = proposal {
1321                debug!("Cleared expired proposal for round {}", proposal.round());
1322                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1323            }
1324        }
1325        Ok(())
1326    }
1327
1328    /// Increments to the next round.
1329    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
1330        // If the next round is within GC range, then iterate to the penultimate round.
1331        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
1332            let mut fast_forward_round = self.current_round();
1333            // Iterate until the penultimate round is reached.
1334            while fast_forward_round < next_round.saturating_sub(1) {
1335                // Update to the next round in storage.
1336                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
1337                // Clear the proposed batch.
1338                *self.proposed_batch.write() = None;
1339            }
1340        }
1341
1342        // Retrieve the current round.
1343        let current_round = self.current_round();
1344        // Attempt to advance to the next round.
1345        if current_round < next_round {
1346            // If a BFT sender was provided, send the current round to the BFT.
1347            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
1348                match bft_sender.send_primary_round_to_bft(current_round).await {
1349                    Ok(is_ready) => is_ready,
1350                    Err(e) => {
1351                        warn!("Failed to update the BFT to the next round - {e}");
1352                        return Err(e);
1353                    }
1354                }
1355            }
1356            // Otherwise, handle the Narwhal case.
1357            else {
1358                // Update to the next round in storage.
1359                self.storage.increment_to_next_round(current_round)?;
1360                // Set 'is_ready' to 'true'.
1361                true
1362            };
1363
1364            // Log whether the next round is ready.
1365            match is_ready {
1366                true => debug!("Primary is ready to propose the next round"),
1367                false => debug!("Primary is not ready to propose the next round"),
1368            }
1369
1370            // If the node is ready, propose a batch for the next round.
1371            if is_ready {
1372                self.propose_batch().await?;
1373            }
1374        }
1375        Ok(())
1376    }
1377
1378    /// Ensures the primary is signing for the specified batch round.
1379    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1380    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1381    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1382        // Retrieve the current round.
1383        let current_round = self.current_round();
1384        // Ensure the batch round is within GC range of the current round.
1385        if current_round + self.storage.max_gc_rounds() <= batch_round {
1386            bail!("Round {batch_round} is too far in the future")
1387        }
1388        // Ensure the batch round is at or one before the current round.
1389        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1390        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1391        if current_round > batch_round + 1 {
1392            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1393        }
1394        // Check if the primary is still signing for the batch round.
1395        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1396            if signing_round > batch_round {
1397                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1398            }
1399        }
1400        Ok(())
1401    }
1402
1403    /// Ensure the primary is not creating batch proposals too frequently.
1404    /// This checks that the certificate timestamp for the previous round is within the expected range.
1405    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
1406        // Retrieve the timestamp of the previous timestamp to check against.
1407        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
1408            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
1409            Some(certificate) => certificate.timestamp(),
1410            None => match self.gateway.account().address() == author {
1411                // If we are the author, then ensure the previous proposal was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
1412                true => *self.latest_proposed_batch_timestamp.read(),
1413                // If we do not see a previous certificate for the author, then proceed optimistically.
1414                false => return Ok(()),
1415            },
1416        };
1417
1418        // Determine the elapsed time since the previous timestamp.
1419        let elapsed = timestamp
1420            .checked_sub(previous_timestamp)
1421            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1422        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
1423        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1424            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1425            false => Ok(()),
1426        }
1427    }
1428
1429    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1430    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
1431        // Create the batch certificate and transmissions.
1432        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1433        // Convert the transmissions into a HashMap.
1434        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1435        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1436        // Store the certified batch.
1437        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1438        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1439        debug!("Stored a batch certificate for round {}", certificate.round());
1440        // If a BFT sender was provided, send the certificate to the BFT.
1441        if let Some(bft_sender) = self.bft_sender.get() {
1442            // Await the callback to continue.
1443            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1444                warn!("Failed to update the BFT DAG from primary - {e}");
1445                return Err(e);
1446            };
1447        }
1448        // Broadcast the certified batch to all validators.
1449        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
1450        // Log the certified batch.
1451        let num_transmissions = certificate.transmission_ids().len();
1452        let round = certificate.round();
1453        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1454        // Increment to the next round.
1455        self.try_increment_to_the_next_round(round + 1).await
1456    }
1457
1458    /// Inserts the missing transmissions from the proposal into the workers.
1459    fn insert_missing_transmissions_into_workers(
1460        &self,
1461        peer_ip: SocketAddr,
1462        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1463    ) -> Result<()> {
1464        // Insert the transmissions into the workers.
1465        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1466            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1467        })
1468    }
1469
1470    /// Re-inserts the transmissions from the proposal into the workers.
1471    fn reinsert_transmissions_into_workers(
1472        &self,
1473        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1474    ) -> Result<()> {
1475        // Re-insert the transmissions into the workers.
1476        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1477            worker.reinsert(transmission_id, transmission);
1478        })
1479    }
1480
1481    /// Recursively stores a given batch certificate, after ensuring:
1482    ///   - Ensure the round matches the committee round.
1483    ///   - Ensure the address is a member of the committee.
1484    ///   - Ensure the timestamp is within range.
1485    ///   - Ensure we have all of the transmissions.
1486    ///   - Ensure we have all of the previous certificates.
1487    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1488    ///   - Ensure the previous certificates have reached the quorum threshold.
1489    ///   - Ensure we have not already signed the batch ID.
1490    #[async_recursion::async_recursion]
1491    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1492        &self,
1493        peer_ip: SocketAddr,
1494        certificate: BatchCertificate<N>,
1495    ) -> Result<()> {
1496        // Retrieve the batch header.
1497        let batch_header = certificate.batch_header();
1498        // Retrieve the batch round.
1499        let batch_round = batch_header.round();
1500
1501        // If the certificate round is outdated, do not store it.
1502        if batch_round <= self.storage.gc_round() {
1503            return Ok(());
1504        }
1505        // If the certificate already exists in storage, return early.
1506        if self.storage.contains_certificate(certificate.id()) {
1507            return Ok(());
1508        }
1509
1510        // If node is not in sync mode and the node is not synced. Then return an error.
1511        if !IS_SYNCING && !self.is_synced() {
1512            bail!(
1513                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1514                fmt_id(certificate.id())
1515            );
1516        }
1517
1518        // If the peer is ahead, use the batch header to sync up to the peer.
1519        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1520
1521        // Check if the certificate needs to be stored.
1522        if !self.storage.contains_certificate(certificate.id()) {
1523            // Store the batch certificate.
1524            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1525            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1526            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1527            // If a BFT sender was provided, send the round and certificate to the BFT.
1528            if let Some(bft_sender) = self.bft_sender.get() {
1529                // Send the certificate to the BFT.
1530                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1531                    warn!("Failed to update the BFT DAG from sync: {e}");
1532                    return Err(e);
1533                };
1534            }
1535        }
1536        Ok(())
1537    }
1538
1539    /// Recursively syncs using the given batch header.
1540    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1541        &self,
1542        peer_ip: SocketAddr,
1543        batch_header: &BatchHeader<N>,
1544    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1545        // Retrieve the batch round.
1546        let batch_round = batch_header.round();
1547
1548        // If the certificate round is outdated, do not store it.
1549        if batch_round <= self.storage.gc_round() {
1550            bail!("Round {batch_round} is too far in the past")
1551        }
1552
1553        // If node is not in sync mode and the node is not synced. Then return an error.
1554        if !IS_SYNCING && !self.is_synced() {
1555            bail!(
1556                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1557                fmt_id(batch_header.batch_id())
1558            );
1559        }
1560
1561        // Determine if quorum threshold is reached on the batch round.
1562        let is_quorum_threshold_reached = {
1563            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1564            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1565            committee_lookback.is_quorum_threshold_reached(&authors)
1566        };
1567
1568        // Check if our primary should move to the next round.
1569        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1570        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1571        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1572        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1573        // Check if our primary is far behind the peer.
1574        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1575        // If our primary is far behind the peer, update our committee to the batch round.
1576        if is_behind_schedule || is_peer_far_in_future {
1577            // If the batch round is greater than the current committee round, update the committee.
1578            self.try_increment_to_the_next_round(batch_round).await?;
1579        }
1580
1581        // Ensure the primary has all of the transmissions.
1582        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);
1583
1584        // Ensure the primary has all of the previous certificates.
1585        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);
1586
1587        // Wait for the missing transmissions and previous certificates to be fetched.
1588        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
1589            missing_transmissions_handle,
1590            missing_previous_certificates_handle,
1591        ).map_err(|e| {
1592            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
1593        })?;
1594
1595        // Iterate through the missing previous certificates.
1596        for batch_certificate in missing_previous_certificates {
1597            // Store the batch certificate (recursively fetching any missing previous certificates).
1598            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
1599        }
1600        Ok(missing_transmissions)
1601    }
1602
1603    /// Fetches any missing transmissions for the specified batch header.
1604    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1605    async fn fetch_missing_transmissions(
1606        &self,
1607        peer_ip: SocketAddr,
1608        batch_header: &BatchHeader<N>,
1609    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1610        // If the round is <= the GC round, return early.
1611        if batch_header.round() <= self.storage.gc_round() {
1612            return Ok(Default::default());
1613        }
1614
1615        // Ensure this batch ID is new, otherwise return early.
1616        if self.storage.contains_batch(batch_header.batch_id()) {
1617            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1618            return Ok(Default::default());
1619        }
1620
1621        // Retrieve the workers.
1622        let workers = self.workers.clone();
1623
1624        // Initialize a list for the transmissions.
1625        let mut fetch_transmissions = FuturesUnordered::new();
1626
1627        // Retrieve the number of workers.
1628        let num_workers = self.num_workers();
1629        // Iterate through the transmission IDs.
1630        for transmission_id in batch_header.transmission_ids() {
1631            // If the transmission does not exist in storage, proceed to fetch the transmission.
1632            if !self.storage.contains_transmission(*transmission_id) {
1633                // Determine the worker ID.
1634                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1635                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1636                };
1637                // Retrieve the worker.
1638                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1639                // Push the callback onto the list.
1640                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1641            }
1642        }
1643
1644        // Initialize a set for the transmissions.
1645        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1646        // Wait for all of the transmissions to be fetched.
1647        while let Some(result) = fetch_transmissions.next().await {
1648            // Retrieve the transmission.
1649            let (transmission_id, transmission) = result?;
1650            // Insert the transmission into the set.
1651            transmissions.insert(transmission_id, transmission);
1652        }
1653        // Return the transmissions.
1654        Ok(transmissions)
1655    }
1656
1657    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1658    async fn fetch_missing_previous_certificates(
1659        &self,
1660        peer_ip: SocketAddr,
1661        batch_header: &BatchHeader<N>,
1662    ) -> Result<HashSet<BatchCertificate<N>>> {
1663        // Retrieve the round.
1664        let round = batch_header.round();
1665        // If the previous round is 0, or is <= the GC round, return early.
1666        if round == 1 || round <= self.storage.gc_round() + 1 {
1667            return Ok(Default::default());
1668        }
1669
1670        // Fetch the missing previous certificates.
1671        let missing_previous_certificates =
1672            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1673        if !missing_previous_certificates.is_empty() {
1674            debug!(
1675                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1676                missing_previous_certificates.len(),
1677            );
1678        }
1679        // Return the missing previous certificates.
1680        Ok(missing_previous_certificates)
1681    }
1682
1683    /// Fetches any missing certificates for the specified batch header from the specified peer.
1684    async fn fetch_missing_certificates(
1685        &self,
1686        peer_ip: SocketAddr,
1687        round: u64,
1688        certificate_ids: &IndexSet<Field<N>>,
1689    ) -> Result<HashSet<BatchCertificate<N>>> {
1690        // Initialize a list for the missing certificates.
1691        let mut fetch_certificates = FuturesUnordered::new();
1692        // Initialize a set for the missing certificates.
1693        let mut missing_certificates = HashSet::default();
1694        // Iterate through the certificate IDs.
1695        for certificate_id in certificate_ids {
1696            // Check if the certificate already exists in the ledger.
1697            if self.ledger.contains_certificate(certificate_id)? {
1698                continue;
1699            }
1700            // Check if the certificate already exists in storage.
1701            if self.storage.contains_certificate(*certificate_id) {
1702                continue;
1703            }
1704            // If we have not fully processed the certificate yet, store it.
1705            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
1706                missing_certificates.insert(certificate);
1707            } else {
1708                // If we do not have the certificate, request it.
1709                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
1710                // TODO (howardwu): Limit the number of open requests we send to a peer.
1711                // Send an certificate request to the peer.
1712                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
1713            }
1714        }
1715
1716        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
1717        match fetch_certificates.is_empty() {
1718            true => return Ok(missing_certificates),
1719            false => trace!(
1720                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
1721                fetch_certificates.len(),
1722            ),
1723        }
1724
1725        // Wait for all of the missing certificates to be fetched.
1726        while let Some(result) = fetch_certificates.next().await {
1727            // Insert the missing certificate into the set.
1728            missing_certificates.insert(result?);
1729        }
1730        // Return the missing certificates.
1731        Ok(missing_certificates)
1732    }
1733}
1734
1735impl<N: Network> Primary<N> {
1736    /// Spawns a task with the given future; it should only be used for long-running tasks.
1737    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1738        self.handles.lock().push(tokio::spawn(future));
1739    }
1740
1741    /// Shuts down the primary.
1742    pub async fn shut_down(&self) {
1743        info!("Shutting down the primary...");
1744        // Shut down the workers.
1745        self.workers.iter().for_each(|worker| worker.shut_down());
1746        // Abort the tasks.
1747        self.handles.lock().iter().for_each(|handle| handle.abort());
1748        // Save the current proposal cache to disk.
1749        let proposal_cache = {
1750            let proposal = self.proposed_batch.write().take();
1751            let signed_proposals = self.signed_proposals.read().clone();
1752            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1753            let pending_certificates = self.storage.get_pending_certificates();
1754            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1755        };
1756        if let Err(err) = proposal_cache.store(self.gateway.dev()) {
1757            error!("Failed to store the current proposal cache: {err}");
1758        }
1759        // Close the gateway.
1760        self.gateway.shut_down().await;
1761    }
1762}
1763
1764#[cfg(test)]
1765mod tests {
1766    use super::*;
1767    use snarkos_node_bft_ledger_service::MockLedgerService;
1768    use snarkos_node_bft_storage_service::BFTMemoryService;
1769    use snarkvm::{
1770        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1771        prelude::{Address, Signature},
1772    };
1773
1774    use bytes::Bytes;
1775    use indexmap::IndexSet;
1776    use rand::RngCore;
1777
1778    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1779
1780    // Returns a primary and a list of accounts in the configured committee.
1781    async fn primary_without_handlers(
1782        rng: &mut TestRng,
1783    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1784        // Create a committee containing the primary's account.
1785        let (accounts, committee) = {
1786            const COMMITTEE_SIZE: usize = 4;
1787            let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1788            let mut members = IndexMap::new();
1789
1790            for i in 0..COMMITTEE_SIZE {
1791                let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1792                let account = Account::new(rng).unwrap();
1793                members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1794                accounts.push((socket_addr, account));
1795            }
1796
1797            (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1798        };
1799
1800        let account = accounts.first().unwrap().1.clone();
1801        let ledger = Arc::new(MockLedgerService::new(committee));
1802        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1803
1804        // Initialize the primary.
1805        let mut primary = Primary::new(account, storage, ledger, None, &[], None).unwrap();
1806
1807        // Construct a worker instance.
1808        primary.workers = Arc::from([Worker::new(
1809            0, // id
1810            Arc::new(primary.gateway.clone()),
1811            primary.storage.clone(),
1812            primary.ledger.clone(),
1813            primary.proposed_batch.clone(),
1814        )
1815        .unwrap()]);
1816        for a in accounts.iter() {
1817            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
1818        }
1819
1820        (primary, accounts)
1821    }
1822
1823    // Creates a mock solution.
1824    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1825        // Sample a random fake solution ID.
1826        let solution_id = rng.gen::<u64>().into();
1827        // Vary the size of the solutions.
1828        let size = rng.gen_range(1024..10 * 1024);
1829        // Sample random fake solution bytes.
1830        let mut vec = vec![0u8; size];
1831        rng.fill_bytes(&mut vec);
1832        let solution = Data::Buffer(Bytes::from(vec));
1833        // Return the solution ID and solution.
1834        (solution_id, solution)
1835    }
1836
1837    // Creates a mock transaction.
1838    fn sample_unconfirmed_transaction(
1839        rng: &mut TestRng,
1840    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1841        // Sample a random fake transaction ID.
1842        let id = Field::<CurrentNetwork>::rand(rng).into();
1843        // Vary the size of the transactions.
1844        let size = rng.gen_range(1024..10 * 1024);
1845        // Sample random fake transaction bytes.
1846        let mut vec = vec![0u8; size];
1847        rng.fill_bytes(&mut vec);
1848        let transaction = Data::Buffer(Bytes::from(vec));
1849        // Return the ID and transaction.
1850        (id, transaction)
1851    }
1852
1853    // Creates a batch proposal with one solution and one transaction.
1854    fn create_test_proposal(
1855        author: &Account<CurrentNetwork>,
1856        committee: Committee<CurrentNetwork>,
1857        round: u64,
1858        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1859        timestamp: i64,
1860        rng: &mut TestRng,
1861    ) -> Proposal<CurrentNetwork> {
1862        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1863        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1864        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1865        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1866
1867        let solution_transmission_id = (solution_id, solution_checksum).into();
1868        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1869
1870        // Retrieve the private key.
1871        let private_key = author.private_key();
1872        // Prepare the transmission IDs.
1873        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1874        let transmissions = [
1875            (solution_transmission_id, Transmission::Solution(solution)),
1876            (transaction_transmission_id, Transmission::Transaction(transaction)),
1877        ]
1878        .into();
1879        // Sign the batch header.
1880        let batch_header = BatchHeader::new(
1881            private_key,
1882            round,
1883            timestamp,
1884            committee.id(),
1885            transmission_ids,
1886            previous_certificate_ids,
1887            rng,
1888        )
1889        .unwrap();
1890        // Construct the proposal.
1891        Proposal::new(committee, batch_header, transmissions).unwrap()
1892    }
1893
1894    // Creates a signature of the primary's current proposal for each committee member (excluding
1895    // the primary).
1896    fn peer_signatures_for_proposal(
1897        primary: &Primary<CurrentNetwork>,
1898        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1899        rng: &mut TestRng,
1900    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
1901        // Each committee member signs the batch.
1902        let mut signatures = Vec::with_capacity(accounts.len() - 1);
1903        for (socket_addr, account) in accounts {
1904            if account.address() == primary.gateway.account().address() {
1905                continue;
1906            }
1907            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
1908            let signature = account.sign(&[batch_id], rng).unwrap();
1909            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
1910        }
1911
1912        signatures
1913    }
1914
1915    /// Creates a signature of the batch ID for each committee member (excluding the primary).
1916    fn peer_signatures_for_batch(
1917        primary_address: Address<CurrentNetwork>,
1918        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1919        batch_id: Field<CurrentNetwork>,
1920        rng: &mut TestRng,
1921    ) -> IndexSet<Signature<CurrentNetwork>> {
1922        let mut signatures = IndexSet::new();
1923        for (_, account) in accounts {
1924            if account.address() == primary_address {
1925                continue;
1926            }
1927            let signature = account.sign(&[batch_id], rng).unwrap();
1928            signatures.insert(signature);
1929        }
1930        signatures
1931    }
1932
1933    // Creates a batch certificate.
1934    fn create_batch_certificate(
1935        primary_address: Address<CurrentNetwork>,
1936        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1937        round: u64,
1938        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1939        rng: &mut TestRng,
1940    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
1941        let timestamp = now();
1942
1943        let author =
1944            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
1945        let private_key = author.private_key();
1946
1947        let committee_id = Field::rand(rng);
1948        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1949        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1950        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1951        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1952
1953        let solution_transmission_id = (solution_id, solution_checksum).into();
1954        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1955
1956        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1957        let transmissions = [
1958            (solution_transmission_id, Transmission::Solution(solution)),
1959            (transaction_transmission_id, Transmission::Transaction(transaction)),
1960        ]
1961        .into();
1962
1963        let batch_header = BatchHeader::new(
1964            private_key,
1965            round,
1966            timestamp,
1967            committee_id,
1968            transmission_ids,
1969            previous_certificate_ids,
1970            rng,
1971        )
1972        .unwrap();
1973        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
1974        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
1975        (certificate, transmissions)
1976    }
1977
1978    // Create a certificate chain up to round in primary storage.
1979    fn store_certificate_chain(
1980        primary: &Primary<CurrentNetwork>,
1981        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1982        round: u64,
1983        rng: &mut TestRng,
1984    ) -> IndexSet<Field<CurrentNetwork>> {
1985        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1986        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1987        for cur_round in 1..round {
1988            for (_, account) in accounts.iter() {
1989                let (certificate, transmissions) = create_batch_certificate(
1990                    account.address(),
1991                    accounts,
1992                    cur_round,
1993                    previous_certificates.clone(),
1994                    rng,
1995                );
1996                next_certificates.insert(certificate.id());
1997                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1998            }
1999
2000            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
2001            previous_certificates = next_certificates;
2002            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
2003        }
2004
2005        previous_certificates
2006    }
2007
2008    // Insert the account socket addresses into the resolver so that
2009    // they are recognized as "connected".
2010    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
2011        // First account is primary, which doesn't need to resolve.
2012        for (addr, acct) in accounts.iter().skip(1) {
2013            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
2014        }
2015    }
2016
2017    #[tokio::test]
2018    async fn test_propose_batch() {
2019        let mut rng = TestRng::default();
2020        let (primary, _) = primary_without_handlers(&mut rng).await;
2021
2022        // Check there is no batch currently proposed.
2023        assert!(primary.proposed_batch.read().is_none());
2024
2025        // Generate a solution and a transaction.
2026        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2027        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2028
2029        // Store it on one of the workers.
2030        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2031        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2032
2033        // Try to propose a batch again. This time, it should succeed.
2034        assert!(primary.propose_batch().await.is_ok());
2035        assert!(primary.proposed_batch.read().is_some());
2036    }
2037
2038    #[tokio::test]
2039    async fn test_propose_batch_with_no_transmissions() {
2040        let mut rng = TestRng::default();
2041        let (primary, _) = primary_without_handlers(&mut rng).await;
2042
2043        // Check there is no batch currently proposed.
2044        assert!(primary.proposed_batch.read().is_none());
2045
2046        // Try to propose a batch with no transmissions.
2047        assert!(primary.propose_batch().await.is_ok());
2048        assert!(primary.proposed_batch.read().is_some());
2049    }
2050
2051    #[tokio::test]
2052    async fn test_propose_batch_in_round() {
2053        let round = 3;
2054        let mut rng = TestRng::default();
2055        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2056
2057        // Fill primary storage.
2058        store_certificate_chain(&primary, &accounts, round, &mut rng);
2059
2060        // Sleep for a while to ensure the primary is ready to propose the next round.
2061        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2062
2063        // Generate a solution and a transaction.
2064        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2065        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2066
2067        // Store it on one of the workers.
2068        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2069        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2070
2071        // Propose a batch again. This time, it should succeed.
2072        assert!(primary.propose_batch().await.is_ok());
2073        assert!(primary.proposed_batch.read().is_some());
2074    }
2075
2076    #[tokio::test]
2077    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
2078        let round = 3;
2079        let prev_round = round - 1;
2080        let mut rng = TestRng::default();
2081        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2082        let peer_account = &accounts[1];
2083        let peer_ip = peer_account.0;
2084
2085        // Fill primary storage.
2086        store_certificate_chain(&primary, &accounts, round, &mut rng);
2087
2088        // Get transmissions from previous certificates.
2089        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2090
2091        // Track the number of transmissions in the previous round.
2092        let mut num_transmissions_in_previous_round = 0;
2093
2094        // Generate a solution and a transaction.
2095        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2096        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2097        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
2098        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
2099
2100        // Store it on one of the workers.
2101        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2102        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2103
2104        // Check that the worker has 2 transmissions.
2105        assert_eq!(primary.workers[0].num_transmissions(), 2);
2106
2107        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
2108        for (_, account) in accounts.iter() {
2109            let (certificate, transmissions) = create_batch_certificate(
2110                account.address(),
2111                &accounts,
2112                round,
2113                previous_certificate_ids.clone(),
2114                &mut rng,
2115            );
2116
2117            // Add the transmissions to the worker.
2118            for (transmission_id, transmission) in transmissions.iter() {
2119                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2120            }
2121
2122            // Insert the certificate to storage.
2123            num_transmissions_in_previous_round += transmissions.len();
2124            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
2125        }
2126
2127        // Sleep for a while to ensure the primary is ready to propose the next round.
2128        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
2129
2130        // Advance to the next round.
2131        assert!(primary.storage.increment_to_next_round(round).is_ok());
2132
2133        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
2134        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);
2135
2136        // Propose the batch.
2137        assert!(primary.propose_batch().await.is_ok());
2138
2139        // Check that the proposal only contains the new transmissions that were not in previous certificates.
2140        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
2141        assert_eq!(proposed_transmissions.len(), 2);
2142        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
2143        assert!(
2144            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
2145        );
2146    }
2147
2148    #[tokio::test]
2149    async fn test_batch_propose_from_peer() {
2150        let mut rng = TestRng::default();
2151        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2152
2153        // Create a valid proposal with an author that isn't the primary.
2154        let round = 1;
2155        let peer_account = &accounts[1];
2156        let peer_ip = peer_account.0;
2157        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2158        let proposal = create_test_proposal(
2159            &peer_account.1,
2160            primary.ledger.current_committee().unwrap(),
2161            round,
2162            Default::default(),
2163            timestamp,
2164            &mut rng,
2165        );
2166
2167        // Make sure the primary is aware of the transmissions in the proposal.
2168        for (transmission_id, transmission) in proposal.transmissions() {
2169            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2170        }
2171
2172        // The author must be known to resolver to pass propose checks.
2173        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2174        // The primary must be considered synced.
2175        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2176
2177        // Try to process the batch proposal from the peer, should succeed.
2178        assert!(
2179            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
2180        );
2181    }
2182
2183    #[tokio::test]
2184    async fn test_batch_propose_from_peer_when_not_synced() {
2185        let mut rng = TestRng::default();
2186        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2187
2188        // Create a valid proposal with an author that isn't the primary.
2189        let round = 1;
2190        let peer_account = &accounts[1];
2191        let peer_ip = peer_account.0;
2192        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2193        let proposal = create_test_proposal(
2194            &peer_account.1,
2195            primary.ledger.current_committee().unwrap(),
2196            round,
2197            Default::default(),
2198            timestamp,
2199            &mut rng,
2200        );
2201
2202        // Make sure the primary is aware of the transmissions in the proposal.
2203        for (transmission_id, transmission) in proposal.transmissions() {
2204            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2205        }
2206
2207        // The author must be known to resolver to pass propose checks.
2208        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2209
2210        // Try to process the batch proposal from the peer, should fail.
2211        assert!(
2212            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2213        );
2214    }
2215
2216    #[tokio::test]
2217    async fn test_batch_propose_from_peer_in_round() {
2218        let round = 2;
2219        let mut rng = TestRng::default();
2220        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2221
2222        // Generate certificates.
2223        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2224
2225        // Create a valid proposal with an author that isn't the primary.
2226        let peer_account = &accounts[1];
2227        let peer_ip = peer_account.0;
2228        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2229        let proposal = create_test_proposal(
2230            &peer_account.1,
2231            primary.ledger.current_committee().unwrap(),
2232            round,
2233            previous_certificates,
2234            timestamp,
2235            &mut rng,
2236        );
2237
2238        // Make sure the primary is aware of the transmissions in the proposal.
2239        for (transmission_id, transmission) in proposal.transmissions() {
2240            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2241        }
2242
2243        // The author must be known to resolver to pass propose checks.
2244        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2245        // The primary must be considered synced.
2246        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2247
2248        // Try to process the batch proposal from the peer, should succeed.
2249        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
2250    }
2251
2252    #[tokio::test]
2253    async fn test_batch_propose_from_peer_wrong_round() {
2254        let mut rng = TestRng::default();
2255        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2256
2257        // Create a valid proposal with an author that isn't the primary.
2258        let round = 1;
2259        let peer_account = &accounts[1];
2260        let peer_ip = peer_account.0;
2261        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2262        let proposal = create_test_proposal(
2263            &peer_account.1,
2264            primary.ledger.current_committee().unwrap(),
2265            round,
2266            Default::default(),
2267            timestamp,
2268            &mut rng,
2269        );
2270
2271        // Make sure the primary is aware of the transmissions in the proposal.
2272        for (transmission_id, transmission) in proposal.transmissions() {
2273            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2274        }
2275
2276        // The author must be known to resolver to pass propose checks.
2277        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2278        // The primary must be considered synced.
2279        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2280
2281        // Try to process the batch proposal from the peer, should error.
2282        assert!(
2283            primary
2284                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2285                    round: round + 1,
2286                    batch_header: Data::Object(proposal.batch_header().clone())
2287                })
2288                .await
2289                .is_err()
2290        );
2291    }
2292
2293    #[tokio::test]
2294    async fn test_batch_propose_from_peer_in_round_wrong_round() {
2295        let round = 4;
2296        let mut rng = TestRng::default();
2297        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2298
2299        // Generate certificates.
2300        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2301
2302        // Create a valid proposal with an author that isn't the primary.
2303        let peer_account = &accounts[1];
2304        let peer_ip = peer_account.0;
2305        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2306        let proposal = create_test_proposal(
2307            &peer_account.1,
2308            primary.ledger.current_committee().unwrap(),
2309            round,
2310            previous_certificates,
2311            timestamp,
2312            &mut rng,
2313        );
2314
2315        // Make sure the primary is aware of the transmissions in the proposal.
2316        for (transmission_id, transmission) in proposal.transmissions() {
2317            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2318        }
2319
2320        // The author must be known to resolver to pass propose checks.
2321        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2322        // The primary must be considered synced.
2323        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2324
2325        // Try to process the batch proposal from the peer, should error.
2326        assert!(
2327            primary
2328                .process_batch_propose_from_peer(peer_ip, BatchPropose {
2329                    round: round + 1,
2330                    batch_header: Data::Object(proposal.batch_header().clone())
2331                })
2332                .await
2333                .is_err()
2334        );
2335    }
2336
2337    #[tokio::test]
2338    async fn test_batch_propose_from_peer_with_invalid_timestamp() {
2339        let round = 2;
2340        let mut rng = TestRng::default();
2341        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2342
2343        // Generate certificates.
2344        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2345
2346        // Create a valid proposal with an author that isn't the primary.
2347        let peer_account = &accounts[1];
2348        let peer_ip = peer_account.0;
2349        let invalid_timestamp = now(); // Use a timestamp that is too early.
2350        let proposal = create_test_proposal(
2351            &peer_account.1,
2352            primary.ledger.current_committee().unwrap(),
2353            round,
2354            previous_certificates,
2355            invalid_timestamp,
2356            &mut rng,
2357        );
2358
2359        // Make sure the primary is aware of the transmissions in the proposal.
2360        for (transmission_id, transmission) in proposal.transmissions() {
2361            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2362        }
2363
2364        // The author must be known to resolver to pass propose checks.
2365        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2366        // The primary must be considered synced.
2367        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2368
2369        // Try to process the batch proposal from the peer, should error.
2370        assert!(
2371            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2372        );
2373    }
2374
2375    #[tokio::test]
2376    async fn test_batch_propose_from_peer_with_past_timestamp() {
2377        let round = 2;
2378        let mut rng = TestRng::default();
2379        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2380
2381        // Generate certificates.
2382        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2383
2384        // Create a valid proposal with an author that isn't the primary.
2385        let peer_account = &accounts[1];
2386        let peer_ip = peer_account.0;
2387        let past_timestamp = now() - 5; // Use a timestamp that is in the past.
2388        let proposal = create_test_proposal(
2389            &peer_account.1,
2390            primary.ledger.current_committee().unwrap(),
2391            round,
2392            previous_certificates,
2393            past_timestamp,
2394            &mut rng,
2395        );
2396
2397        // Make sure the primary is aware of the transmissions in the proposal.
2398        for (transmission_id, transmission) in proposal.transmissions() {
2399            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
2400        }
2401
2402        // The author must be known to resolver to pass propose checks.
2403        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
2404        // The primary must be considered synced.
2405        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;
2406
2407        // Try to process the batch proposal from the peer, should error.
2408        assert!(
2409            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
2410        );
2411    }
2412
2413    #[tokio::test]
2414    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
2415        let round = 3;
2416        let mut rng = TestRng::default();
2417        let (primary, _) = primary_without_handlers(&mut rng).await;
2418
2419        // Check there is no batch currently proposed.
2420        assert!(primary.proposed_batch.read().is_none());
2421
2422        // Generate a solution and a transaction.
2423        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
2424        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2425
2426        // Store it on one of the workers.
2427        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
2428        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2429
2430        // Set the proposal lock to a round ahead of the storage.
2431        let old_proposal_lock_round = *primary.propose_lock.lock().await;
2432        *primary.propose_lock.lock().await = round + 1;
2433
2434        // Propose a batch and enforce that it fails.
2435        assert!(primary.propose_batch().await.is_ok());
2436        assert!(primary.proposed_batch.read().is_none());
2437
2438        // Set the proposal lock back to the old round.
2439        *primary.propose_lock.lock().await = old_proposal_lock_round;
2440
2441        // Try to propose a batch again. This time, it should succeed.
2442        assert!(primary.propose_batch().await.is_ok());
2443        assert!(primary.proposed_batch.read().is_some());
2444    }
2445
2446    #[tokio::test]
2447    async fn test_propose_batch_with_storage_round_behind_proposal() {
2448        let round = 5;
2449        let mut rng = TestRng::default();
2450        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2451
2452        // Generate previous certificates.
2453        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2454
2455        // Create a valid proposal.
2456        let timestamp = now();
2457        let proposal = create_test_proposal(
2458            primary.gateway.account(),
2459            primary.ledger.current_committee().unwrap(),
2460            round + 1,
2461            previous_certificates,
2462            timestamp,
2463            &mut rng,
2464        );
2465
2466        // Store the proposal on the primary.
2467        *primary.proposed_batch.write() = Some(proposal);
2468
2469        // Try to propose a batch will terminate early because the storage is behind the proposal.
2470        assert!(primary.propose_batch().await.is_ok());
2471        assert!(primary.proposed_batch.read().is_some());
2472        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
2473    }
2474
2475    #[tokio::test(flavor = "multi_thread")]
2476    async fn test_batch_signature_from_peer() {
2477        let mut rng = TestRng::default();
2478        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2479        map_account_addresses(&primary, &accounts);
2480
2481        // Create a valid proposal.
2482        let round = 1;
2483        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2484        let proposal = create_test_proposal(
2485            primary.gateway.account(),
2486            primary.ledger.current_committee().unwrap(),
2487            round,
2488            Default::default(),
2489            timestamp,
2490            &mut rng,
2491        );
2492
2493        // Store the proposal on the primary.
2494        *primary.proposed_batch.write() = Some(proposal);
2495
2496        // Each committee member signs the batch.
2497        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2498
2499        // Have the primary process the signatures.
2500        for (socket_addr, signature) in signatures {
2501            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2502        }
2503
2504        // Check the certificate was created and stored by the primary.
2505        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2506        // Check the round was incremented.
2507        assert_eq!(primary.current_round(), round + 1);
2508    }
2509
2510    #[tokio::test(flavor = "multi_thread")]
2511    async fn test_batch_signature_from_peer_in_round() {
2512        let round = 5;
2513        let mut rng = TestRng::default();
2514        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2515        map_account_addresses(&primary, &accounts);
2516
2517        // Generate certificates.
2518        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2519
2520        // Create a valid proposal.
2521        let timestamp = now();
2522        let proposal = create_test_proposal(
2523            primary.gateway.account(),
2524            primary.ledger.current_committee().unwrap(),
2525            round,
2526            previous_certificates,
2527            timestamp,
2528            &mut rng,
2529        );
2530
2531        // Store the proposal on the primary.
2532        *primary.proposed_batch.write() = Some(proposal);
2533
2534        // Each committee member signs the batch.
2535        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2536
2537        // Have the primary process the signatures.
2538        for (socket_addr, signature) in signatures {
2539            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
2540        }
2541
2542        // Check the certificate was created and stored by the primary.
2543        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2544        // Check the round was incremented.
2545        assert_eq!(primary.current_round(), round + 1);
2546    }
2547
2548    #[tokio::test]
2549    async fn test_batch_signature_from_peer_no_quorum() {
2550        let mut rng = TestRng::default();
2551        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2552        map_account_addresses(&primary, &accounts);
2553
2554        // Create a valid proposal.
2555        let round = 1;
2556        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2557        let proposal = create_test_proposal(
2558            primary.gateway.account(),
2559            primary.ledger.current_committee().unwrap(),
2560            round,
2561            Default::default(),
2562            timestamp,
2563            &mut rng,
2564        );
2565
2566        // Store the proposal on the primary.
2567        *primary.proposed_batch.write() = Some(proposal);
2568
2569        // Each committee member signs the batch.
2570        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2571
2572        // Have the primary process only one signature, mimicking a lack of quorum.
2573        let (socket_addr, signature) = signatures.first().unwrap();
2574        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2575
2576        // Check the certificate was not created and stored by the primary.
2577        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2578        // Check the round was incremented.
2579        assert_eq!(primary.current_round(), round);
2580    }
2581
2582    #[tokio::test]
2583    async fn test_batch_signature_from_peer_in_round_no_quorum() {
2584        let round = 7;
2585        let mut rng = TestRng::default();
2586        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2587        map_account_addresses(&primary, &accounts);
2588
2589        // Generate certificates.
2590        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
2591
2592        // Create a valid proposal.
2593        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
2594        let proposal = create_test_proposal(
2595            primary.gateway.account(),
2596            primary.ledger.current_committee().unwrap(),
2597            round,
2598            previous_certificates,
2599            timestamp,
2600            &mut rng,
2601        );
2602
2603        // Store the proposal on the primary.
2604        *primary.proposed_batch.write() = Some(proposal);
2605
2606        // Each committee member signs the batch.
2607        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);
2608
2609        // Have the primary process only one signature, mimicking a lack of quorum.
2610        let (socket_addr, signature) = signatures.first().unwrap();
2611        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();
2612
2613        // Check the certificate was not created and stored by the primary.
2614        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
2615        // Check the round was incremented.
2616        assert_eq!(primary.current_round(), round);
2617    }
2618
2619    #[tokio::test]
2620    async fn test_insert_certificate_with_aborted_transmissions() {
2621        let round = 3;
2622        let prev_round = round - 1;
2623        let mut rng = TestRng::default();
2624        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2625        let peer_account = &accounts[1];
2626        let peer_ip = peer_account.0;
2627
2628        // Fill primary storage.
2629        store_certificate_chain(&primary, &accounts, round, &mut rng);
2630
2631        // Get transmissions from previous certificates.
2632        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2633
2634        // Generate a solution and a transaction.
2635        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2636        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2637
2638        // Store it on one of the workers.
2639        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2640        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2641
2642        // Check that the worker has 2 transmissions.
2643        assert_eq!(primary.workers[0].num_transmissions(), 2);
2644
2645        // Create certificates for the current round.
2646        let account = accounts[0].1.clone();
2647        let (certificate, transmissions) =
2648            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
2649        let certificate_id = certificate.id();
2650
2651        // Randomly abort some of the transmissions.
2652        let mut aborted_transmissions = HashSet::new();
2653        let mut transmissions_without_aborted = HashMap::new();
2654        for (transmission_id, transmission) in transmissions.clone() {
2655            match rng.gen::<bool>() || aborted_transmissions.is_empty() {
2656                true => {
2657                    // Insert the aborted transmission.
2658                    aborted_transmissions.insert(transmission_id);
2659                }
2660                false => {
2661                    // Insert the transmission without the aborted transmission.
2662                    transmissions_without_aborted.insert(transmission_id, transmission);
2663                }
2664            };
2665        }
2666
2667        // Add the non-aborted transmissions to the worker.
2668        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
2669            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2670        }
2671
2672        // Check that inserting the transmission with missing transmissions fails.
2673        assert!(
2674            primary
2675                .storage
2676                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
2677                .is_err()
2678        );
2679        assert!(
2680            primary
2681                .storage
2682                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
2683                .is_err()
2684        );
2685
2686        // Insert the certificate to storage.
2687        primary
2688            .storage
2689            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
2690            .unwrap();
2691
2692        // Ensure the certificate exists in storage.
2693        assert!(primary.storage.contains_certificate(certificate_id));
2694        // Ensure that the aborted transmission IDs exist in storage.
2695        for aborted_transmission_id in aborted_transmissions {
2696            assert!(primary.storage.contains_transmission(aborted_transmission_id));
2697            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
2698        }
2699    }
2700}