// snarkos_node_bft/primary.rs
1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    Gateway,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    PRIMARY_PING_IN_MS,
22    Sync,
23    Transport,
24    WORKER_PING_IN_MS,
25    Worker,
26    events::{BatchPropose, BatchSignature, Event},
27    helpers::{
28        BFTSender,
29        PrimaryReceiver,
30        PrimarySender,
31        Proposal,
32        ProposalCache,
33        SignedProposals,
34        Storage,
35        assign_to_worker,
36        assign_to_workers,
37        fmt_id,
38        init_sync_channels,
39        init_worker_channels,
40        now,
41    },
42    spawn_blocking,
43};
44use snarkos_account::Account;
45use snarkos_node_bft_events::PrimaryPing;
46use snarkos_node_bft_ledger_service::LedgerService;
47use snarkos_node_sync::DUMMY_SELF_IP;
48use snarkvm::{
49    console::{
50        prelude::*,
51        types::{Address, Field},
52    },
53    ledger::{
54        block::Transaction,
55        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
56        puzzle::{Solution, SolutionID},
57    },
58    prelude::committee::Committee,
59};
60
61use colored::Colorize;
62use futures::stream::{FuturesUnordered, StreamExt};
63use indexmap::{IndexMap, IndexSet};
64use parking_lot::{Mutex, RwLock};
65use rayon::prelude::*;
66use std::{
67    collections::{HashMap, HashSet},
68    future::Future,
69    net::SocketAddr,
70    sync::Arc,
71    time::Duration,
72};
73use tokio::{
74    sync::{Mutex as TMutex, OnceCell},
75    task::JoinHandle,
76};
77
/// A helper type for an optional proposed batch, guarded by a read-write lock.
/// `None` indicates the primary is not currently proposing a batch.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
80
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module, used to keep storage in sync with the ledger and peers.
    sync: Sync<N>,
    /// The gateway, which manages connections to other validators.
    gateway: Gateway<N>,
    /// The storage for rounds, certificates, and transmissions.
    storage: Storage<N>,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers; populated in `run` (empty until then).
    workers: Arc<[Worker<N>]>,
    /// The BFT sender; set at most once (in `run`, when a BFT sender is provided).
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals, keyed by author.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned task handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for `propose_batch`; also records the latest round that was
    /// proposed (or the latest certificate round loaded from the proposal cache).
    propose_lock: Arc<TMutex<u64>>,
}
106
impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// This constructs the gateway and sync module, but does NOT spawn the workers
    /// or start any tasks — that happens in [`Self::run`].
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        ledger: Arc<dyn LedgerService<N>>,
        ip: Option<SocketAddr>,
        trusted_validators: &[SocketAddr],
        dev: Option<u16>,
    ) -> Result<Self> {
        // Initialize the gateway.
        let gateway = Gateway::new(account, storage.clone(), ledger.clone(), ip, trusted_validators, dev)?;
        // Initialize the sync module.
        let sync = Sync::new(gateway.clone(), storage.clone(), ledger.clone());

        // Initialize the primary instance.
        Ok(Self {
            sync,
            gateway,
            storage,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            propose_lock: Default::default(),
        })
    }

    /// Load the proposal cache file and update the Primary state with the stored data.
    ///
    /// Restores the proposed batch, signed proposals, and propose lock round, then
    /// replays any pending certificates into storage. Returns an error if the cache
    /// file exists but cannot be read.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(self.gateway.dev()) {
            // If the proposal cache exists, then process the proposal cache.
            true => match ProposalCache::<N>::load(self.gateway.account().address(), self.gateway.dev()) {
                Ok(proposal_cache) => {
                    // Extract the proposal and signed proposals.
                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                        proposal_cache.into();

                    // Write the proposed batch.
                    *self.proposed_batch.write() = proposed_batch;
                    // Write the signed proposals.
                    *self.signed_proposals.write() = signed_proposals;
                    // Write the propose lock.
                    *self.propose_lock.lock().await = latest_certificate_round;

                    // Update the storage with the pending certificates.
                    for certificate in pending_certificates {
                        let batch_id = certificate.batch_id();
                        // We use a dummy IP because the node should not need to request from any peers.
                        // The storage should have stored all the transmissions. If not, we simply
                        // skip the certificate.
                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                        {
                            warn!("Failed to load stored certificate {} from proposal cache - {err}", fmt_id(batch_id));
                        }
                    }
                    Ok(())
                }
                Err(err) => {
                    bail!("Failed to read the signed proposals from the file system - {err}.");
                }
            },
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
    ///
    /// Spawns and runs the workers, initializes and runs the sync module, loads the
    /// proposal cache, starts the gateway, and finally starts the primary handlers.
    /// The ordering of these steps is deliberate — see the inline comments.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map of the worker senders.
        let mut worker_senders = IndexMap::new();
        // Construct a map for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker channels.
            let (tx_worker, rx_worker) = init_worker_channels();
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                Arc::new(self.gateway.clone()),
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
            )?;
            // Run the worker instance.
            worker.run(rx_worker);
            // Add the worker to the list of workers.
            workers.push(worker);
            // Add the worker sender to the map.
            worker_senders.insert(id, tx_worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // First, initialize the sync channels.
        let (sync_sender, sync_receiver) = init_sync_channels();
        // Next, initialize the sync module and sync the storage from ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run(sync_receiver).await?;
        // Next, initialize the gateway.
        self.gateway.run(primary_sender, worker_senders, Some(sync_sender)).await;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the gateway.
    pub const fn gateway(&self) -> &Gateway<N> {
        &self.gateway
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    ///
    /// # Panics
    /// Panics if the number of workers exceeds `u8::MAX`.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
279
280impl<N: Network> Primary<N> {
281    /// Returns the number of unconfirmed transmissions.
282    pub fn num_unconfirmed_transmissions(&self) -> usize {
283        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
284    }
285
286    /// Returns the number of unconfirmed ratifications.
287    pub fn num_unconfirmed_ratifications(&self) -> usize {
288        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
289    }
290
291    /// Returns the number of solutions.
292    pub fn num_unconfirmed_solutions(&self) -> usize {
293        self.workers.iter().map(|worker| worker.num_solutions()).sum()
294    }
295
296    /// Returns the number of unconfirmed transactions.
297    pub fn num_unconfirmed_transactions(&self) -> usize {
298        self.workers.iter().map(|worker| worker.num_transactions()).sum()
299    }
300}
301
302impl<N: Network> Primary<N> {
303    /// Returns the worker transmission IDs.
304    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
305        self.workers.iter().flat_map(|worker| worker.transmission_ids())
306    }
307
308    /// Returns the worker transmissions.
309    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
310        self.workers.iter().flat_map(|worker| worker.transmissions())
311    }
312
313    /// Returns the worker solutions.
314    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
315        self.workers.iter().flat_map(|worker| worker.solutions())
316    }
317
318    /// Returns the worker transactions.
319    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
320        self.workers.iter().flat_map(|worker| worker.transactions())
321    }
322}
323
324impl<N: Network> Primary<N> {
325    /// Clears the worker solutions.
326    pub fn clear_worker_solutions(&self) {
327        self.workers.iter().for_each(Worker::clear_solutions);
328    }
329}
330
331impl<N: Network> Primary<N> {
332    /// Proposes the batch for the current round.
333    ///
334    /// This method performs the following steps:
335    /// 1. Drain the workers.
336    /// 2. Sign the batch.
337    /// 3. Set the batch proposal in the primary.
338    /// 4. Broadcast the batch header to all validators for signing.
    pub async fn propose_batch(&self) -> Result<()> {
        // This function isn't re-entrant.
        let mut lock_guard = self.propose_lock.lock().await;

        // Check if the proposed batch has expired, and clear it if it has expired.
        if let Err(e) = self.check_proposed_batch_for_expiration().await {
            warn!("Failed to check the proposed batch for expiration - {e}");
            return Ok(());
        }

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(());
        }

        // If there is a batch being proposed already,
        // rebroadcast the batch header to the non-signers, and return early.
        if let Some(proposal) = self.proposed_batch.read().as_ref() {
            // Ensure that the storage is caught up to the proposal before proceeding to rebroadcast this.
            if round < proposal.round()
                || proposal
                    .batch_header()
                    .previous_certificate_ids()
                    .iter()
                    .any(|id| !self.storage.contains_certificate(*id))
            {
                warn!(
                    "Cannot propose a batch for round {} - the current storage (round {round}) is not caught up to the proposed batch.",
                    proposal.round(),
                );
                return Ok(());
            }
            // Construct the event.
            // TODO(ljedrz): the BatchHeader should be serialized only once in advance before being sent to non-signers.
            let event = Event::BatchPropose(proposal.batch_header().clone().into());
            // Iterate through the non-signers.
            for address in proposal.nonsigners(&self.ledger.get_committee_lookback_for_round(proposal.round())?) {
                // Resolve the address to the peer IP.
                match self.gateway.resolver().get_peer_ip_for_address(address) {
                    // Resend the batch proposal to the validator for signing.
                    Some(peer_ip) => {
                        let (gateway, event_, round) = (self.gateway.clone(), event.clone(), proposal.round());
                        tokio::spawn(async move {
                            debug!("Resending batch proposal for round {round} to peer '{peer_ip}'");
                            // Resend the batch proposal to the peer.
                            if gateway.send(peer_ip, event_).await.is_none() {
                                warn!("Failed to resend batch proposal for round {round} to peer '{peer_ip}'");
                            }
                        });
                    }
                    None => continue,
                }
            }
            debug!("Proposed batch for round {} is still valid", proposal.round());
            return Ok(());
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.gateway.account().address(), now()) {
            debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed());
            return Ok(());
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.gateway.account().address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (), // continue,
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(()),
                    // An error occurred while attempting to advance the current round.
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(());
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(());
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators = self.gateway.connected_addresses();
            // Append the primary to the set.
            connected_validators.insert(self.gateway.account().address());
            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(());
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(());
        }

        // Determine the required number of transmissions per worker.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Take the transmissions from the workers.
        for worker in self.workers.iter() {
            // Initialize a tracker for included transmissions for the current worker.
            let mut num_transmissions_included_for_worker = 0;
            // Keep draining the worker until the desired number of transmissions is reached or the worker is empty.
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                // Determine the number of remaining transmissions for the worker.
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                // Drain the worker.
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // If the worker is empty, break early.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                // Iterate through the worker transmissions.
                'inner: for (id, transmission) in worker_transmissions {
                    // Check if the ledger already contains the transmission.
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    // Check if the storage already contains the transmission.
                    // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                    // the primary does not propose a batch with no transmissions.
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Check the transmission is still valid.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            // Ensure the checksum matches.
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the solution is still valid.
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            // Ensure the checksum matches.
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the transaction is still valid.
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                                continue 'inner;
                            }
                        }
                        // Note: We explicitly forbid including ratifications,
                        // as the protocol currently does not support ratifications.
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        // All other combinations are clearly invalid.
                        _ => continue 'inner,
                    }
                    // Insert the transmission into the map.
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Mark this round as proposed, so a later call does not re-propose it (see the check above).
        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.gateway.account().private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback, batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Broadcast the batch to all validators for signing.
        self.gateway.broadcast(Event::BatchPropose(batch_header.into()));
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();
        // Set the proposed batch.
        *self.proposed_batch.write() = Some(proposal);
        Ok(())
    }
622
623    /// Processes a batch propose from a peer.
624    ///
625    /// This method performs the following steps:
626    /// 1. Verify the batch.
627    /// 2. Sign the batch.
628    /// 3. Broadcast the signature back to the validator.
629    ///
630    /// If our primary is ahead of the peer, we will not sign the batch.
631    /// If our primary is behind the peer, but within GC range, we will sync up to the peer's round, and then sign the batch.
632    async fn process_batch_propose_from_peer(&self, peer_ip: SocketAddr, batch_propose: BatchPropose<N>) -> Result<()> {
633        let BatchPropose { round: batch_round, batch_header } = batch_propose;
634
635        // Deserialize the batch header.
636        let batch_header = spawn_blocking!(batch_header.deserialize_blocking())?;
637        // Ensure the round matches in the batch header.
638        if batch_round != batch_header.round() {
639            // Proceed to disconnect the validator.
640            self.gateway.disconnect(peer_ip);
641            bail!("Malicious peer - proposed round {batch_round}, but sent batch for round {}", batch_header.round());
642        }
643
644        // Retrieve the batch author.
645        let batch_author = batch_header.author();
646
647        // Ensure the batch proposal is from the validator.
648        match self.gateway.resolver().get_address(peer_ip) {
649            // If the peer is a validator, then ensure the batch proposal is from the validator.
650            Some(address) => {
651                if address != batch_author {
652                    // Proceed to disconnect the validator.
653                    self.gateway.disconnect(peer_ip);
654                    bail!("Malicious peer - proposed batch from a different validator ({batch_author})");
655                }
656            }
657            None => bail!("Batch proposal from a disconnected validator"),
658        }
659        // Ensure the batch author is a current committee member.
660        if !self.gateway.is_authorized_validator_address(batch_author) {
661            // Proceed to disconnect the validator.
662            self.gateway.disconnect(peer_ip);
663            bail!("Malicious peer - proposed batch from a non-committee member ({batch_author})");
664        }
665        // Ensure the batch proposal is not from the current primary.
666        if self.gateway.account().address() == batch_author {
667            bail!("Invalid peer - proposed batch from myself ({batch_author})");
668        }
669
670        // Ensure that the batch proposal's committee ID matches the expected committee ID.
671        let expected_committee_id = self.ledger.get_committee_lookback_for_round(batch_round)?.id();
672        if expected_committee_id != batch_header.committee_id() {
673            // Proceed to disconnect the validator.
674            self.gateway.disconnect(peer_ip);
675            bail!(
676                "Malicious peer - proposed batch has a different committee ID ({expected_committee_id} != {})",
677                batch_header.committee_id()
678            );
679        }
680
681        // Retrieve the cached round and batch ID for this validator.
682        if let Some((signed_round, signed_batch_id, signature)) =
683            self.signed_proposals.read().get(&batch_author).copied()
684        {
685            // If the signed round is ahead of the peer's batch round, then the validator is malicious.
686            if signed_round > batch_header.round() {
687                bail!("Peer ({batch_author}) proposed a batch for a previous round ({})", batch_header.round());
688            }
689
690            // If the round matches and the batch ID differs, then the validator is malicious.
691            if signed_round == batch_header.round() && signed_batch_id != batch_header.batch_id() {
692                bail!("Peer ({batch_author}) proposed another batch for the same round ({signed_round})");
693            }
694            // If the round and batch ID matches, then skip signing the batch a second time.
695            // Instead, rebroadcast the cached signature to the peer.
696            if signed_round == batch_header.round() && signed_batch_id == batch_header.batch_id() {
697                let gateway = self.gateway.clone();
698                tokio::spawn(async move {
699                    debug!("Resending a signature for a batch in round {batch_round} from '{peer_ip}'");
700                    let event = Event::BatchSignature(BatchSignature::new(batch_header.batch_id(), signature));
701                    // Resend the batch signature to the peer.
702                    if gateway.send(peer_ip, event).await.is_none() {
703                        warn!("Failed to resend a signature for a batch in round {batch_round} to '{peer_ip}'");
704                    }
705                });
706                // Return early.
707                return Ok(());
708            }
709        }
710
711        // Ensure that the batch header doesn't already exist in storage.
712        // Note this is already checked in `check_batch_header`, however we can return early here without creating a blocking task.
713        if self.storage.contains_batch(batch_header.batch_id()) {
714            debug!(
715                "Primary is safely skipping a batch proposal from '{peer_ip}' - {}",
716                format!("batch for round {batch_round} already exists in storage").dimmed()
717            );
718            return Ok(());
719        }
720
721        // Compute the previous round.
722        let previous_round = batch_round.saturating_sub(1);
723        // Ensure that the peer did not propose a batch too quickly.
724        if let Err(e) = self.check_proposal_timestamp(previous_round, batch_author, batch_header.timestamp()) {
725            // Proceed to disconnect the validator.
726            self.gateway.disconnect(peer_ip);
727            bail!("Malicious peer - {e} from '{peer_ip}'");
728        }
729
730        // Ensure the batch header does not contain any ratifications.
731        if batch_header.contains(TransmissionID::Ratification) {
732            // Proceed to disconnect the validator.
733            self.gateway.disconnect(peer_ip);
734            bail!("Malicious peer - proposed batch contains an unsupported ratification transmissionID",);
735        }
736
737        // If the peer is ahead, use the batch header to sync up to the peer.
738        let mut missing_transmissions = self.sync_with_batch_header_from_peer::<false>(peer_ip, &batch_header).await?;
739
740        // Check that the transmission ids match and are not fee transactions.
741        if let Err(err) = cfg_iter_mut!(missing_transmissions).try_for_each(|(transmission_id, transmission)| {
742            // If the transmission is not well-formed, then return early.
743            self.ledger.ensure_transmission_is_well_formed(*transmission_id, transmission)
744        }) {
745            debug!("Batch propose from '{peer_ip}' contains an invalid transmission - {err}",);
746            return Ok(());
747        }
748
749        // Ensure the batch is for the current round.
750        // This method must be called after fetching previous certificates (above),
751        // and prior to checking the batch header (below).
752        if let Err(e) = self.ensure_is_signing_round(batch_round) {
753            // If the primary is not signing for the peer's round, then return early.
754            debug!("{e} from '{peer_ip}'");
755            return Ok(());
756        }
757
758        // Ensure the batch header from the peer is valid.
759        let (storage, header) = (self.storage.clone(), batch_header.clone());
760        let missing_transmissions =
761            spawn_blocking!(storage.check_batch_header(&header, missing_transmissions, Default::default()))?;
762        // Inserts the missing transmissions into the workers.
763        self.insert_missing_transmissions_into_workers(peer_ip, missing_transmissions.into_iter())?;
764
765        /* Proceeding to sign the batch. */
766
767        // Retrieve the batch ID.
768        let batch_id = batch_header.batch_id();
769        // Sign the batch ID.
770        let account = self.gateway.account().clone();
771        let signature = spawn_blocking!(account.sign(&[batch_id], &mut rand::thread_rng()))?;
772
773        // Ensure the proposal has not already been signed.
774        //
775        // Note: Due to the need to sync the batch header with the peer, it is possible
776        // for the primary to receive the same 'BatchPropose' event again, whereby only
777        // one instance of this handler should sign the batch. This check guarantees this.
778        match self.signed_proposals.write().0.entry(batch_author) {
779            std::collections::hash_map::Entry::Occupied(mut entry) => {
780                // If the validator has already signed a batch for this round, then return early,
781                // since, if the peer still has not received the signature, they will request it again,
782                // and the logic at the start of this function will resend the (now cached) signature
783                // to the peer if asked to sign this batch proposal again.
784                if entry.get().0 == batch_round {
785                    return Ok(());
786                }
787                // Otherwise, cache the round, batch ID, and signature for this validator.
788                entry.insert((batch_round, batch_id, signature));
789            }
790            // If the validator has not signed a batch before, then continue.
791            std::collections::hash_map::Entry::Vacant(entry) => {
792                // Cache the round, batch ID, and signature for this validator.
793                entry.insert((batch_round, batch_id, signature));
794            }
795        };
796
797        // Broadcast the signature back to the validator.
798        let self_ = self.clone();
799        tokio::spawn(async move {
800            let event = Event::BatchSignature(BatchSignature::new(batch_id, signature));
801            // Send the batch signature to the peer.
802            if self_.gateway.send(peer_ip, event).await.is_some() {
803                debug!("Signed a batch for round {batch_round} from '{peer_ip}'");
804            }
805        });
806        Ok(())
807    }
808
809    /// Processes a batch signature from a peer.
810    ///
811    /// This method performs the following steps:
812    /// 1. Ensure the proposed batch has not expired.
813    /// 2. Verify the signature, ensuring it corresponds to the proposed batch.
814    /// 3. Store the signature.
815    /// 4. Certify the batch if enough signatures have been received.
816    /// 5. Broadcast the batch certificate to all validators.
817    async fn process_batch_signature_from_peer(
818        &self,
819        peer_ip: SocketAddr,
820        batch_signature: BatchSignature<N>,
821    ) -> Result<()> {
822        // Ensure the proposed batch has not expired, and clear the proposed batch if it has expired.
823        self.check_proposed_batch_for_expiration().await?;
824
825        // Retrieve the signature and timestamp.
826        let BatchSignature { batch_id, signature } = batch_signature;
827
828        // Retrieve the signer.
829        let signer = signature.to_address();
830
831        // Ensure the batch signature is signed by the validator.
832        if self.gateway.resolver().get_address(peer_ip).map_or(true, |address| address != signer) {
833            // Proceed to disconnect the validator.
834            self.gateway.disconnect(peer_ip);
835            bail!("Malicious peer - batch signature is from a different validator ({signer})");
836        }
837        // Ensure the batch signature is not from the current primary.
838        if self.gateway.account().address() == signer {
839            bail!("Invalid peer - received a batch signature from myself ({signer})");
840        }
841
842        let self_ = self.clone();
843        let Some(proposal) = spawn_blocking!({
844            // Acquire the write lock.
845            let mut proposed_batch = self_.proposed_batch.write();
846            // Add the signature to the batch, and determine if the batch is ready to be certified.
847            match proposed_batch.as_mut() {
848                Some(proposal) => {
849                    // Ensure the batch ID matches the currently proposed batch ID.
850                    if proposal.batch_id() != batch_id {
851                        match self_.storage.contains_batch(batch_id) {
852                            // If this batch was already certified, return early.
853                            true => {
854                                debug!(
855                                    "Primary is safely skipping a a batch signature from {peer_ip} for round {} - batch is already certified",
856                                    proposal.round()
857                                );
858                                return Ok(None);
859                            }
860                            // If the batch ID is unknown, return an error.
861                            false => bail!(
862                                "Unknown batch ID '{batch_id}', expected '{}' for round {}",
863                                proposal.batch_id(),
864                                proposal.round()
865                            ),
866                        }
867                    }
868                    // Retrieve the committee lookback for the round.
869                    let committee_lookback = self_.ledger.get_committee_lookback_for_round(proposal.round())?;
870                    // Retrieve the address of the validator.
871                    let Some(signer) = self_.gateway.resolver().get_address(peer_ip) else {
872                        bail!("Signature is from a disconnected validator");
873                    };
874                    // Add the signature to the batch.
875                    proposal.add_signature(signer, signature, &committee_lookback)?;
876                    info!("Received a batch signature for round {} from '{peer_ip}'", proposal.round());
877                    // Check if the batch is ready to be certified.
878                    if !proposal.is_quorum_threshold_reached(&committee_lookback) {
879                        // If the batch is not ready to be certified, return early.
880                        return Ok(None);
881                    }
882                }
883                // There is no proposed batch, so return early.
884                None => return Ok(None),
885            };
886            // Retrieve the batch proposal, clearing the proposed batch.
887            match proposed_batch.take() {
888                Some(proposal) => Ok(Some(proposal)),
889                None => Ok(None),
890            }
891        })?
892        else {
893            return Ok(());
894        };
895
896        /* Proceeding to certify the batch. */
897
898        info!("Quorum threshold reached - Preparing to certify our batch for round {}...", proposal.round());
899
900        // Retrieve the committee lookback for the round.
901        let committee_lookback = self.ledger.get_committee_lookback_for_round(proposal.round())?;
902        // Store the certified batch and broadcast it to all validators.
903        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
904        if let Err(e) = self.store_and_broadcast_certificate(&proposal, &committee_lookback).await {
905            // Reinsert the transmissions back into the ready queue for the next proposal.
906            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
907            return Err(e);
908        }
909
910        #[cfg(feature = "metrics")]
911        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
912        Ok(())
913    }
914
915    /// Processes a batch certificate from a peer.
916    ///
917    /// This method performs the following steps:
918    /// 1. Stores the given batch certificate, after ensuring it is valid.
919    /// 2. If there are enough certificates to reach quorum threshold for the current round,
920    ///     then proceed to advance to the next round.
921    async fn process_batch_certificate_from_peer(
922        &self,
923        peer_ip: SocketAddr,
924        certificate: BatchCertificate<N>,
925    ) -> Result<()> {
926        // Ensure the batch certificate is from an authorized validator.
927        if !self.gateway.is_authorized_validator_ip(peer_ip) {
928            // Proceed to disconnect the validator.
929            self.gateway.disconnect(peer_ip);
930            bail!("Malicious peer - Received a batch certificate from an unauthorized validator IP ({peer_ip})");
931        }
932        // Ensure storage does not already contain the certificate.
933        if self.storage.contains_certificate(certificate.id()) {
934            return Ok(());
935        // Otherwise, ensure ephemeral storage contains the certificate.
936        } else if !self.storage.contains_unprocessed_certificate(certificate.id()) {
937            self.storage.insert_unprocessed_certificate(certificate.clone())?;
938        }
939
940        // Retrieve the batch certificate author.
941        let author = certificate.author();
942        // Retrieve the batch certificate round.
943        let certificate_round = certificate.round();
944        // Retrieve the batch certificate committee ID.
945        let committee_id = certificate.committee_id();
946
947        // Ensure the batch certificate is not from the current primary.
948        if self.gateway.account().address() == author {
949            bail!("Received a batch certificate for myself ({author})");
950        }
951
952        // Store the certificate, after ensuring it is valid.
953        self.sync_with_certificate_from_peer::<false>(peer_ip, certificate).await?;
954
955        // If there are enough certificates to reach quorum threshold for the certificate round,
956        // then proceed to advance to the next round.
957
958        // Retrieve the committee lookback.
959        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
960        // Retrieve the certificate authors.
961        let authors = self.storage.get_certificate_authors_for_round(certificate_round);
962        // Check if the certificates have reached the quorum threshold.
963        let is_quorum = committee_lookback.is_quorum_threshold_reached(&authors);
964
965        // Ensure that the batch certificate's committee ID matches the expected committee ID.
966        let expected_committee_id = committee_lookback.id();
967        if expected_committee_id != committee_id {
968            // Proceed to disconnect the validator.
969            self.gateway.disconnect(peer_ip);
970            bail!("Batch certificate has a different committee ID ({expected_committee_id} != {committee_id})");
971        }
972
973        // Determine if we are currently proposing a round that is relevant.
974        // Note: This is important, because while our peers have advanced,
975        // they may not be proposing yet, and thus still able to sign our proposed batch.
976        let should_advance = match &*self.proposed_batch.read() {
977            // We advance if the proposal round is less than the current round that was just certified.
978            Some(proposal) => proposal.round() < certificate_round,
979            // If there's no proposal, we consider advancing.
980            None => true,
981        };
982
983        // Retrieve the current round.
984        let current_round = self.current_round();
985
986        // Determine whether to advance to the next round.
987        if is_quorum && should_advance && certificate_round >= current_round {
988            // If we have reached the quorum threshold and the round should advance, then proceed to the next round.
989            self.try_increment_to_the_next_round(current_round + 1).await?;
990        }
991        Ok(())
992    }
993}
994
995impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// Destructures the `PrimaryReceiver` channels and spawns one long-lived task per concern:
    /// - a primary ping broadcaster and worker ping broadcaster (gateway mode only),
    /// - a handler for incoming primary pings (processing the certificate they carry),
    /// - the periodic batch proposer, and handlers for incoming batch proposals,
    ///   batch signatures, and batch certificates,
    /// - a periodic check that increments to the next round once quorum is reached, and
    /// - handlers that route unconfirmed solutions and transactions to the workers.
    ///
    /// Most message handlers skip their work while the node is syncing (`!self.sync.is_synced()`).
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver {
            mut rx_batch_propose,
            mut rx_batch_signature,
            mut rx_batch_certified,
            mut rx_primary_ping,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the primary ping.
        if self.sync.is_gateway_mode() {
            let self_ = self.clone();
            self.spawn(async move {
                loop {
                    // Sleep briefly.
                    tokio::time::sleep(Duration::from_millis(PRIMARY_PING_IN_MS)).await;

                    // Retrieve the block locators.
                    // Note: a second clone is needed because `spawn_blocking!` moves its capture.
                    let self__ = self_.clone();
                    let block_locators = match spawn_blocking!(self__.sync.get_block_locators()) {
                        Ok(block_locators) => block_locators,
                        Err(e) => {
                            warn!("Failed to retrieve block locators - {e}");
                            continue;
                        }
                    };

                    // Retrieve the latest certificate of the primary.
                    let primary_certificate = {
                        // Retrieve the primary address.
                        let primary_address = self_.gateway.account().address();

                        // Iterate backwards from the latest round to find the primary certificate.
                        let mut certificate = None;
                        let mut current_round = self_.current_round();
                        while certificate.is_none() {
                            // If the current round is 0, then break the while loop.
                            if current_round == 0 {
                                break;
                            }
                            // Retrieve the primary certificates.
                            if let Some(primary_certificate) =
                                self_.storage.get_certificate_for_round_with_author(current_round, primary_address)
                            {
                                certificate = Some(primary_certificate);
                            // If the primary certificate was not found, decrement the round.
                            } else {
                                current_round = current_round.saturating_sub(1);
                            }
                        }

                        // Determine if the primary certificate was found.
                        match certificate {
                            Some(certificate) => certificate,
                            // Skip this iteration of the loop (do not send a primary ping).
                            None => continue,
                        }
                    };

                    // Construct the primary ping.
                    let primary_ping = PrimaryPing::from((<Event<N>>::VERSION, block_locators, primary_certificate));
                    // Broadcast the event.
                    self_.gateway.broadcast(Event::PrimaryPing(primary_ping));
                }
            });
        }

        // Start the primary ping handler.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, primary_certificate)) = rx_primary_ping.recv().await {
                // If the primary is not synced, then do not process the primary ping.
                if !self_.sync.is_synced() {
                    trace!("Skipping a primary ping from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }

                // Spawn a task to process the primary certificate.
                // This keeps deserialization and processing off the receive loop.
                {
                    let self_ = self_.clone();
                    tokio::spawn(async move {
                        // Deserialize the primary certificate in the primary ping.
                        let Ok(primary_certificate) = spawn_blocking!(primary_certificate.deserialize_blocking())
                        else {
                            warn!("Failed to deserialize primary certificate in 'PrimaryPing' from '{peer_ip}'");
                            return;
                        };
                        // Process the primary certificate.
                        if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, primary_certificate).await {
                            warn!("Cannot process a primary certificate in a 'PrimaryPing' from '{peer_ip}' - {e}");
                        }
                    });
                }
            }
        });

        // Start the worker ping(s).
        if self.sync.is_gateway_mode() {
            let self_ = self.clone();
            self.spawn(async move {
                loop {
                    tokio::time::sleep(Duration::from_millis(WORKER_PING_IN_MS)).await;
                    // If the primary is not synced, then do not broadcast the worker ping(s).
                    if !self_.sync.is_synced() {
                        trace!("Skipping worker ping(s) {}", "(node is syncing)".dimmed());
                        continue;
                    }
                    // Broadcast the worker ping(s).
                    for worker in self_.workers.iter() {
                        worker.broadcast_ping();
                    }
                }
            });
        }

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not propose a batch.
                if !self_.sync.is_synced() {
                    debug!("Skipping batch proposal {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                if self_.propose_lock.try_lock().is_err() {
                    trace!("Skipping batch proposal {}", "(node is already proposing)".dimmed());
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Process the proposed batch.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_propose)) = rx_batch_propose.recv().await {
                // If the primary is not synced, then do not sign the batch.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch proposal from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the proposed batch.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Process the batch proposal.
                    if let Err(e) = self_.process_batch_propose_from_peer(peer_ip, batch_propose).await {
                        warn!("Cannot sign a batch from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Process the batch signature.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_signature)) = rx_batch_signature.recv().await {
                // If the primary is not synced, then do not store the signature.
                if !self_.sync.is_synced() {
                    trace!("Skipping a batch signature from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Process the batch signature.
                // Note: Do NOT spawn a task around this function call. Processing signatures from peers
                // is a critical path, and we should only store the minimum required number of signatures.
                // In addition, spawning a task can cause concurrent processing of signatures (even with a lock),
                // which means the RwLock for the proposed batch must become a 'tokio::sync' to be safe.
                if let Err(e) = self_.process_batch_signature_from_peer(peer_ip, batch_signature).await {
                    warn!("Cannot store a signature from '{peer_ip}' - {e}");
                }
            }
        });

        // Process the certified batch.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((peer_ip, batch_certificate)) = rx_batch_certified.recv().await {
                // If the primary is not synced, then do not store the certificate.
                if !self_.sync.is_synced() {
                    trace!("Skipping a certified batch from '{peer_ip}' {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Spawn a task to process the batch certificate.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Deserialize the batch certificate.
                    let Ok(batch_certificate) = spawn_blocking!(batch_certificate.deserialize_blocking()) else {
                        warn!("Failed to deserialize the batch certificate from '{peer_ip}'");
                        return;
                    };
                    // Process the batch certificate.
                    if let Err(e) = self_.process_batch_certificate_from_peer(peer_ip, batch_certificate).await {
                        warn!("Cannot store a certificate from '{peer_ip}' - {e}");
                    }
                });
            }
        });

        // Periodically try to increment to the next round.
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.sync.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let next_round = self_.current_round().saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the next round.
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Process the unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                // The (id, checksum) pair deterministically assigns the solution to a worker.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Process the unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                // The (id, checksum) pair deterministically assigns the transaction to a worker.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });
    }
1296
1297    /// Checks if the proposed batch is expired, and clears the proposed batch if it has expired.
1298    async fn check_proposed_batch_for_expiration(&self) -> Result<()> {
1299        // Check if the proposed batch is timed out or stale.
1300        let is_expired = match self.proposed_batch.read().as_ref() {
1301            Some(proposal) => proposal.round() < self.current_round(),
1302            None => false,
1303        };
1304        // If the batch is expired, clear the proposed batch.
1305        if is_expired {
1306            // Reset the proposed batch.
1307            let proposal = self.proposed_batch.write().take();
1308            if let Some(proposal) = proposal {
1309                debug!("Cleared expired proposal for round {}", proposal.round());
1310                self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
1311            }
1312        }
1313        Ok(())
1314    }
1315
    /// Increments to the next round.
    ///
    /// If `next_round` is within GC range of the current round, storage is first
    /// fast-forwarded to the penultimate round (clearing any in-flight proposal at each
    /// step). The primary then attempts the final advance: if a BFT sender is present,
    /// the BFT decides readiness; otherwise (Narwhal-only mode) storage is advanced
    /// directly. When ready, a batch is proposed for the new round.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, then iterate to the penultimate round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Iterate until the penultimate round is reached.
            while fast_forward_round < next_round.saturating_sub(1) {
                // Update to the next round in storage.
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear the proposed batch, as it now belongs to a stale round.
                *self.proposed_batch.write() = None;
            }
        }

        // Retrieve the current round.
        let current_round = self.current_round();
        // Attempt to advance to the next round.
        if current_round < next_round {
            // If a BFT sender was provided, send the current round to the BFT,
            // and let the BFT decide whether the primary is ready to propose.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            // Otherwise, handle the Narwhal case.
            else {
                // Update to the next round in storage.
                self.storage.increment_to_next_round(current_round)?;
                // Set 'is_ready' to 'true', as there is no BFT to gate readiness.
                true
            };

            // Log whether the next round is ready.
            match is_ready {
                true => debug!("Primary is ready to propose the next round"),
                false => debug!("Primary is not ready to propose the next round"),
            }

            // If the node is ready, propose a batch for the next round.
            if is_ready {
                self.propose_batch().await?;
            }
        }
        Ok(())
    }
1365
1366    /// Ensures the primary is signing for the specified batch round.
1367    /// This method is used to ensure: for a given round, as soon as the primary starts proposing,
1368    /// it will no longer sign for the previous round (as it has enough previous certificates to proceed).
1369    fn ensure_is_signing_round(&self, batch_round: u64) -> Result<()> {
1370        // Retrieve the current round.
1371        let current_round = self.current_round();
1372        // Ensure the batch round is within GC range of the current round.
1373        if current_round + self.storage.max_gc_rounds() <= batch_round {
1374            bail!("Round {batch_round} is too far in the future")
1375        }
1376        // Ensure the batch round is at or one before the current round.
1377        // Intuition: Our primary has moved on to the next round, but has not necessarily started proposing,
1378        // so we can still sign for the previous round. If we have started proposing, the next check will fail.
1379        if current_round > batch_round + 1 {
1380            bail!("Primary is on round {current_round}, and no longer signing for round {batch_round}")
1381        }
1382        // Check if the primary is still signing for the batch round.
1383        if let Some(signing_round) = self.proposed_batch.read().as_ref().map(|proposal| proposal.round()) {
1384            if signing_round > batch_round {
1385                bail!("Our primary at round {signing_round} is no longer signing for round {batch_round}")
1386            }
1387        }
1388        Ok(())
1389    }
1390
    /// Ensure the primary is not creating batch proposals too frequently.
    /// This checks that the certificate timestamp for the previous round is within the expected range.
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Retrieve the previous timestamp to check against.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            // Use the timestamp of the author's certificate for the previous round, if one exists.
            Some(certificate) => certificate.timestamp(),
            None => match self.gateway.account().address() == author {
                // If we are the author, then check against our latest proposed batch timestamp.
                true => *self.latest_proposed_batch_timestamp.read(),
                // If we do not see a previous certificate for the author, then proceed optimistically.
                false => return Ok(()),
            },
        };

        // Determine the elapsed time since the previous timestamp.
        // An earlier `timestamp` than the previous one is rejected outright.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }
1416
    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
    ///
    /// Steps: (1) aggregate the proposal into a certificate, (2) persist the certificate and
    /// its transmissions to storage, (3) forward the certificate to the BFT (if enabled),
    /// (4) broadcast it to all validators, (5) advance to the next round.
    async fn store_and_broadcast_certificate(&self, proposal: &Proposal<N>, committee: &Committee<N>) -> Result<()> {
        // Create the batch certificate and transmissions.
        // `block_in_place` keeps this CPU-bound work from stalling the async reactor.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch on a blocking thread.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue; a BFT failure aborts before broadcasting.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        // Broadcast the certified batch to all validators.
        self.gateway.broadcast(Event::BatchCertified(certificate.clone().into()));
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round(round + 1).await
    }
1445
1446    /// Inserts the missing transmissions from the proposal into the workers.
1447    fn insert_missing_transmissions_into_workers(
1448        &self,
1449        peer_ip: SocketAddr,
1450        transmissions: impl Iterator<Item = (TransmissionID<N>, Transmission<N>)>,
1451    ) -> Result<()> {
1452        // Insert the transmissions into the workers.
1453        assign_to_workers(&self.workers, transmissions, |worker, transmission_id, transmission| {
1454            worker.process_transmission_from_peer(peer_ip, transmission_id, transmission);
1455        })
1456    }
1457
1458    /// Re-inserts the transmissions from the proposal into the workers.
1459    fn reinsert_transmissions_into_workers(
1460        &self,
1461        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1462    ) -> Result<()> {
1463        // Re-insert the transmissions into the workers.
1464        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1465            worker.reinsert(transmission_id, transmission);
1466        })
1467    }
1468
    /// Recursively stores a given batch certificate, after ensuring:
    ///   - Ensure the round matches the committee round.
    ///   - Ensure the address is a member of the committee.
    ///   - Ensure the timestamp is within range.
    ///   - Ensure we have all of the transmissions.
    ///   - Ensure we have all of the previous certificates.
    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
    ///   - Ensure the previous certificates have reached the quorum threshold.
    ///   - Ensure we have not already signed the batch ID.
    ///
    /// Recursion happens via `sync_with_batch_header_from_peer`, which calls back into
    /// this method for each missing previous certificate.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated (at or below the GC round), do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If node is not in sync mode and the node is not synced. Then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer
        // (fetching any missing transmissions and previous certificates).
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored (the sync above may have stored it already).
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate on a blocking thread.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the round and certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Send the certificate to the BFT.
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }
1526
    /// Recursively syncs using the given batch header.
    ///
    /// Fetches (from the peer) any transmissions and previous certificates referenced by
    /// the header that are missing locally, recursively syncing each missing previous
    /// certificate into storage, and returns the fetched transmissions. May also
    /// fast-forward this primary's round if quorum is reached on the batch round or
    /// the peer is far ahead.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated (at or below the GC round), do not store it.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // If node is not in sync mode and the node is not synced. Then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine if quorum threshold is reached on the batch round,
        // using the certificate authors we have seen and the committee lookback.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Check if our primary should move to the next round.
        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        // Check if our primary is far behind the peer.
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        // If our primary is far behind the peer, update our committee to the batch round.
        if is_behind_schedule || is_peer_far_in_future {
            // If the batch round is greater than the current committee round, update the committee.
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Ensure the primary has all of the transmissions.
        let missing_transmissions_handle = self.fetch_missing_transmissions(peer_ip, batch_header);

        // Ensure the primary has all of the previous certificates.
        let missing_previous_certificates_handle = self.fetch_missing_previous_certificates(peer_ip, batch_header);

        // Wait for the missing transmissions and previous certificates to be fetched (concurrently).
        let (missing_transmissions, missing_previous_certificates) = tokio::try_join!(
            missing_transmissions_handle,
            missing_previous_certificates_handle,
        ).map_err(|e| {
            anyhow!("Failed to fetch missing transmissions and previous certificates for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        // Iterate through the missing previous certificates.
        for batch_certificate in missing_previous_certificates {
            // Store the batch certificate (recursively fetching any missing previous certificates).
            self.sync_with_certificate_from_peer::<IS_SYNCING>(peer_ip, batch_certificate).await?;
        }
        Ok(missing_transmissions)
    }
1590
1591    /// Fetches any missing transmissions for the specified batch header.
1592    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1593    async fn fetch_missing_transmissions(
1594        &self,
1595        peer_ip: SocketAddr,
1596        batch_header: &BatchHeader<N>,
1597    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1598        // If the round is <= the GC round, return early.
1599        if batch_header.round() <= self.storage.gc_round() {
1600            return Ok(Default::default());
1601        }
1602
1603        // Ensure this batch ID is new, otherwise return early.
1604        if self.storage.contains_batch(batch_header.batch_id()) {
1605            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1606            return Ok(Default::default());
1607        }
1608
1609        // Retrieve the workers.
1610        let workers = self.workers.clone();
1611
1612        // Initialize a list for the transmissions.
1613        let mut fetch_transmissions = FuturesUnordered::new();
1614
1615        // Retrieve the number of workers.
1616        let num_workers = self.num_workers();
1617        // Iterate through the transmission IDs.
1618        for transmission_id in batch_header.transmission_ids() {
1619            // If the transmission does not exist in storage, proceed to fetch the transmission.
1620            if !self.storage.contains_transmission(*transmission_id) {
1621                // Determine the worker ID.
1622                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1623                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1624                };
1625                // Retrieve the worker.
1626                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1627                // Push the callback onto the list.
1628                fetch_transmissions.push(worker.get_or_fetch_transmission(peer_ip, *transmission_id));
1629            }
1630        }
1631
1632        // Initialize a set for the transmissions.
1633        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1634        // Wait for all of the transmissions to be fetched.
1635        while let Some(result) = fetch_transmissions.next().await {
1636            // Retrieve the transmission.
1637            let (transmission_id, transmission) = result?;
1638            // Insert the transmission into the set.
1639            transmissions.insert(transmission_id, transmission);
1640        }
1641        // Return the transmissions.
1642        Ok(transmissions)
1643    }
1644
1645    /// Fetches any missing previous certificates for the specified batch header from the specified peer.
1646    async fn fetch_missing_previous_certificates(
1647        &self,
1648        peer_ip: SocketAddr,
1649        batch_header: &BatchHeader<N>,
1650    ) -> Result<HashSet<BatchCertificate<N>>> {
1651        // Retrieve the round.
1652        let round = batch_header.round();
1653        // If the previous round is 0, or is <= the GC round, return early.
1654        if round == 1 || round <= self.storage.gc_round() + 1 {
1655            return Ok(Default::default());
1656        }
1657
1658        // Fetch the missing previous certificates.
1659        let missing_previous_certificates =
1660            self.fetch_missing_certificates(peer_ip, round, batch_header.previous_certificate_ids()).await?;
1661        if !missing_previous_certificates.is_empty() {
1662            debug!(
1663                "Fetched {} missing previous certificates for round {round} from '{peer_ip}'",
1664                missing_previous_certificates.len(),
1665            );
1666        }
1667        // Return the missing previous certificates.
1668        Ok(missing_previous_certificates)
1669    }
1670
    /// Fetches any missing certificates for the specified batch header from the specified peer.
    ///
    /// For each certificate ID: IDs already in the ledger or storage are skipped; IDs held
    /// as unprocessed certificates in storage are returned directly; the remainder are
    /// requested from the peer, and all results are returned together.
    async fn fetch_missing_certificates(
        &self,
        peer_ip: SocketAddr,
        round: u64,
        certificate_ids: &IndexSet<Field<N>>,
    ) -> Result<HashSet<BatchCertificate<N>>> {
        // Initialize a list for the in-flight certificate requests.
        let mut fetch_certificates = FuturesUnordered::new();
        // Initialize a set for the missing certificates.
        let mut missing_certificates = HashSet::default();
        // Iterate through the certificate IDs.
        for certificate_id in certificate_ids {
            // Check if the certificate already exists in the ledger.
            if self.ledger.contains_certificate(certificate_id)? {
                continue;
            }
            // Check if the certificate already exists in storage.
            if self.storage.contains_certificate(*certificate_id) {
                continue;
            }
            // If we hold the certificate but have not fully processed it yet, return it directly.
            if let Some(certificate) = self.storage.get_unprocessed_certificate(*certificate_id) {
                missing_certificates.insert(certificate);
            } else {
                // If we do not have the certificate, request it.
                trace!("Primary - Found a new certificate ID for round {round} from '{peer_ip}'");
                // TODO (howardwu): Limit the number of open requests we send to a peer.
                // Send a certificate request to the peer.
                fetch_certificates.push(self.sync.send_certificate_request(peer_ip, *certificate_id));
            }
        }

        // If there are no certificates to fetch, return early with the existing unprocessed certificates.
        match fetch_certificates.is_empty() {
            true => return Ok(missing_certificates),
            false => trace!(
                "Fetching {} missing certificates for round {round} from '{peer_ip}'...",
                fetch_certificates.len(),
            ),
        }

        // Wait for all of the missing certificates to be fetched.
        while let Some(result) = fetch_certificates.next().await {
            // Insert the fetched certificate into the set.
            missing_certificates.insert(result?);
        }
        // Return the missing certificates.
        Ok(missing_certificates)
    }
1721}
1722
impl<N: Network> Primary<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    /// The returned handle is retained so the task can be aborted on shutdown.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary.
    ///
    /// Shutdown order: (1) shut down the workers, (2) abort all spawned tasks,
    /// (3) persist the proposal cache to disk so the in-flight proposal and signed
    /// proposals survive a restart, (4) shut down the gateway.
    pub async fn shut_down(&self) {
        info!("Shutting down the primary...");
        // Shut down the workers.
        self.workers.iter().for_each(|worker| worker.shut_down());
        // Abort the long-running tasks registered via `Self::spawn`.
        self.handles.lock().iter().for_each(|handle| handle.abort());
        // Save the current proposal cache to disk.
        let proposal_cache = {
            // Take the current proposal (if any) out of the primary.
            let proposal = self.proposed_batch.write().take();
            let signed_proposals = self.signed_proposals.read().clone();
            // Use the proposal's round if present; otherwise fall back to the propose lock's round.
            let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
            let pending_certificates = self.storage.get_pending_certificates();
            ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
        };
        if let Err(err) = proposal_cache.store(self.gateway.dev()) {
            error!("Failed to store the current proposal cache: {err}");
        }
        // Close the gateway.
        self.gateway.shut_down().await;
    }
}
1751
1752#[cfg(test)]
1753mod tests {
1754    use super::*;
1755    use snarkos_node_bft_ledger_service::MockLedgerService;
1756    use snarkos_node_bft_storage_service::BFTMemoryService;
1757    use snarkvm::{
1758        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1759        prelude::{Address, Signature},
1760    };
1761
1762    use bytes::Bytes;
1763    use indexmap::IndexSet;
1764    use rand::RngCore;
1765
1766    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1767
1768    // Returns a primary and a list of accounts in the configured committee.
1769    async fn primary_without_handlers(
1770        rng: &mut TestRng,
1771    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1772        // Create a committee containing the primary's account.
1773        let (accounts, committee) = {
1774            const COMMITTEE_SIZE: usize = 4;
1775            let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1776            let mut members = IndexMap::new();
1777
1778            for i in 0..COMMITTEE_SIZE {
1779                let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1780                let account = Account::new(rng).unwrap();
1781                members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1782                accounts.push((socket_addr, account));
1783            }
1784
1785            (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1786        };
1787
1788        let account = accounts.first().unwrap().1.clone();
1789        let ledger = Arc::new(MockLedgerService::new(committee));
1790        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1791
1792        // Initialize the primary.
1793        let mut primary = Primary::new(account, storage, ledger, None, &[], None).unwrap();
1794
1795        // Construct a worker instance.
1796        primary.workers = Arc::from([Worker::new(
1797            0, // id
1798            Arc::new(primary.gateway.clone()),
1799            primary.storage.clone(),
1800            primary.ledger.clone(),
1801            primary.proposed_batch.clone(),
1802        )
1803        .unwrap()]);
1804        for a in accounts.iter() {
1805            primary.gateway.insert_connected_peer(a.0, a.0, a.1.address());
1806        }
1807
1808        (primary, accounts)
1809    }
1810
1811    // Creates a mock solution.
1812    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1813        // Sample a random fake solution ID.
1814        let solution_id = rng.gen::<u64>().into();
1815        // Vary the size of the solutions.
1816        let size = rng.gen_range(1024..10 * 1024);
1817        // Sample random fake solution bytes.
1818        let mut vec = vec![0u8; size];
1819        rng.fill_bytes(&mut vec);
1820        let solution = Data::Buffer(Bytes::from(vec));
1821        // Return the solution ID and solution.
1822        (solution_id, solution)
1823    }
1824
1825    // Creates a mock transaction.
1826    fn sample_unconfirmed_transaction(
1827        rng: &mut TestRng,
1828    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1829        // Sample a random fake transaction ID.
1830        let id = Field::<CurrentNetwork>::rand(rng).into();
1831        // Vary the size of the transactions.
1832        let size = rng.gen_range(1024..10 * 1024);
1833        // Sample random fake transaction bytes.
1834        let mut vec = vec![0u8; size];
1835        rng.fill_bytes(&mut vec);
1836        let transaction = Data::Buffer(Bytes::from(vec));
1837        // Return the ID and transaction.
1838        (id, transaction)
1839    }
1840
1841    // Creates a batch proposal with one solution and one transaction.
1842    fn create_test_proposal(
1843        author: &Account<CurrentNetwork>,
1844        committee: Committee<CurrentNetwork>,
1845        round: u64,
1846        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1847        timestamp: i64,
1848        rng: &mut TestRng,
1849    ) -> Proposal<CurrentNetwork> {
1850        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1851        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1852        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1853        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1854
1855        let solution_transmission_id = (solution_id, solution_checksum).into();
1856        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1857
1858        // Retrieve the private key.
1859        let private_key = author.private_key();
1860        // Prepare the transmission IDs.
1861        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1862        let transmissions = [
1863            (solution_transmission_id, Transmission::Solution(solution)),
1864            (transaction_transmission_id, Transmission::Transaction(transaction)),
1865        ]
1866        .into();
1867        // Sign the batch header.
1868        let batch_header = BatchHeader::new(
1869            private_key,
1870            round,
1871            timestamp,
1872            committee.id(),
1873            transmission_ids,
1874            previous_certificate_ids,
1875            rng,
1876        )
1877        .unwrap();
1878        // Construct the proposal.
1879        Proposal::new(committee, batch_header, transmissions).unwrap()
1880    }
1881
1882    // Creates a signature of the primary's current proposal for each committee member (excluding
1883    // the primary).
1884    fn peer_signatures_for_proposal(
1885        primary: &Primary<CurrentNetwork>,
1886        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1887        rng: &mut TestRng,
1888    ) -> Vec<(SocketAddr, BatchSignature<CurrentNetwork>)> {
1889        // Each committee member signs the batch.
1890        let mut signatures = Vec::with_capacity(accounts.len() - 1);
1891        for (socket_addr, account) in accounts {
1892            if account.address() == primary.gateway.account().address() {
1893                continue;
1894            }
1895            let batch_id = primary.proposed_batch.read().as_ref().unwrap().batch_id();
1896            let signature = account.sign(&[batch_id], rng).unwrap();
1897            signatures.push((*socket_addr, BatchSignature::new(batch_id, signature)));
1898        }
1899
1900        signatures
1901    }
1902
1903    /// Creates a signature of the batch ID for each committee member (excluding the primary).
1904    fn peer_signatures_for_batch(
1905        primary_address: Address<CurrentNetwork>,
1906        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1907        batch_id: Field<CurrentNetwork>,
1908        rng: &mut TestRng,
1909    ) -> IndexSet<Signature<CurrentNetwork>> {
1910        let mut signatures = IndexSet::new();
1911        for (_, account) in accounts {
1912            if account.address() == primary_address {
1913                continue;
1914            }
1915            let signature = account.sign(&[batch_id], rng).unwrap();
1916            signatures.insert(signature);
1917        }
1918        signatures
1919    }
1920
    // Creates a batch certificate authored by `primary_address` for `round`, signed by the
    // other committee members, together with the transmissions it references.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Look up the author's account so its private key can sign the batch header.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        // NOTE(review): the committee ID is random, so it does not match any ledger
        // committee — presumably fine for these storage-level tests; confirm no caller
        // validates it.
        let committee_id = Field::rand(rng);
        // Sample one unconfirmed solution and one unconfirmed transaction as the payload.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        // Prepare the transmission IDs and the ID -> transmission map.
        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Construct and sign the batch header.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Gather the peer signatures and assemble the certificate.
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }
1965
1966    // Create a certificate chain up to round in primary storage.
1967    fn store_certificate_chain(
1968        primary: &Primary<CurrentNetwork>,
1969        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1970        round: u64,
1971        rng: &mut TestRng,
1972    ) -> IndexSet<Field<CurrentNetwork>> {
1973        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1974        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1975        for cur_round in 1..round {
1976            for (_, account) in accounts.iter() {
1977                let (certificate, transmissions) = create_batch_certificate(
1978                    account.address(),
1979                    accounts,
1980                    cur_round,
1981                    previous_certificates.clone(),
1982                    rng,
1983                );
1984                next_certificates.insert(certificate.id());
1985                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1986            }
1987
1988            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1989            previous_certificates = next_certificates;
1990            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1991        }
1992
1993        previous_certificates
1994    }
1995
1996    // Insert the account socket addresses into the resolver so that
1997    // they are recognized as "connected".
1998    fn map_account_addresses(primary: &Primary<CurrentNetwork>, accounts: &[(SocketAddr, Account<CurrentNetwork>)]) {
1999        // First account is primary, which doesn't need to resolve.
2000        for (addr, acct) in accounts.iter().skip(1) {
2001            primary.gateway.resolver().insert_peer(*addr, *addr, acct.address());
2002        }
2003    }
2004
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; with pending transmissions on the worker, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2025
2026    #[tokio::test]
2027    async fn test_propose_batch_with_no_transmissions() {
2028        let mut rng = TestRng::default();
2029        let (primary, _) = primary_without_handlers(&mut rng).await;
2030
2031        // Check there is no batch currently proposed.
2032        assert!(primary.proposed_batch.read().is_none());
2033
2034        // Try to propose a batch with no transmissions.
2035        assert!(primary.propose_batch().await.is_ok());
2036        assert!(primary.proposed_batch.read().is_some());
2037    }
2038
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; with storage advanced to `round`, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2063
    #[tokio::test]
    async fn test_propose_batch_skip_transmissions_from_previous_certificates() {
        let round = 3;
        let prev_round = round - 1;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;

        // Fill primary storage.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Get transmissions from previous certificates.
        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);

        // Track the number of certified transmissions; once storage advances past `round`
        // below, these become the "previous round" transmissions the proposal must skip.
        let mut num_transmissions_in_previous_round = 0;

        // Generate a solution and a transaction.
        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Check that the worker has 2 transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), 2);

        // Create certificates for the current round and add the transmissions to the worker before inserting the certificate to storage.
        for (_, account) in accounts.iter() {
            let (certificate, transmissions) = create_batch_certificate(
                account.address(),
                &accounts,
                round,
                previous_certificate_ids.clone(),
                &mut rng,
            );

            // Add the transmissions to the worker.
            for (transmission_id, transmission) in transmissions.iter() {
                primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
            }

            // Insert the certificate to storage.
            num_transmissions_in_previous_round += transmissions.len();
            primary.storage.insert_certificate(certificate, transmissions, Default::default()).unwrap();
        }

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Advance to the next round.
        assert!(primary.storage.increment_to_next_round(round).is_ok());

        // Check that the worker has `num_transmissions_in_previous_round + 2` transmissions.
        assert_eq!(primary.workers[0].num_transmissions(), num_transmissions_in_previous_round + 2);

        // Propose the batch.
        assert!(primary.propose_batch().await.is_ok());

        // Check that the proposal only contains the new transmissions that were not in previous certificates.
        let proposed_transmissions = primary.proposed_batch.read().as_ref().unwrap().transmissions().clone();
        assert_eq!(proposed_transmissions.len(), 2);
        assert!(proposed_transmissions.contains_key(&TransmissionID::Solution(solution_commitment, solution_checksum)));
        assert!(
            proposed_transmissions.contains_key(&TransmissionID::Transaction(transaction_id, transaction_checksum))
        );
    }
2135
    #[tokio::test]
    async fn test_batch_propose_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        // Offset the timestamp forward so the proposal passes the timestamp checks.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer, should succeed.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_ok()
        );
    }
2170
    #[tokio::test]
    async fn test_batch_propose_from_peer_when_not_synced() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());

        // Note: unlike the happy-path test, `try_block_sync` is deliberately NOT called here,
        // so the primary is not considered synced.

        // Try to process the batch proposal from the peer, should fail.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }
2203
    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        // Offset the timestamp forward so the proposal passes the timestamp checks.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer, should succeed.
        primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.unwrap();
    }
2239
    #[tokio::test]
    async fn test_batch_propose_from_peer_wrong_round() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Create a valid proposal with an author that isn't the primary.
        let round = 1;
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer, should error:
        // the advertised `round` (round + 1) does not match the round inside the batch header.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }
2280
    #[tokio::test]
    async fn test_batch_propose_from_peer_in_round_wrong_round() {
        let round = 4;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer, should error:
        // the advertised `round` (round + 1) does not match the round inside the batch header.
        assert!(
            primary
                .process_batch_propose_from_peer(peer_ip, BatchPropose {
                    round: round + 1,
                    batch_header: Data::Object(proposal.batch_header().clone())
                })
                .await
                .is_err()
        );
    }
2324
    #[tokio::test]
    async fn test_batch_propose_from_peer_with_invalid_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        // Use a timestamp that is too early (other tests offset by MIN_BATCH_DELAY_IN_SECS).
        let invalid_timestamp = now();
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            invalid_timestamp,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer, should error.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }
2362
    #[tokio::test]
    async fn test_batch_propose_from_peer_with_past_timestamp() {
        let round = 2;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal with an author that isn't the primary.
        let peer_account = &accounts[1];
        let peer_ip = peer_account.0;
        let past_timestamp = now() - 5; // Use a timestamp that is in the past.
        let proposal = create_test_proposal(
            &peer_account.1,
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            past_timestamp,
            &mut rng,
        );

        // Make sure the primary is aware of the transmissions in the proposal.
        for (transmission_id, transmission) in proposal.transmissions() {
            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone())
        }

        // The author must be known to resolver to pass propose checks.
        primary.gateway.resolver().insert_peer(peer_ip, peer_ip, peer_account.1.address());
        // The primary must be considered synced.
        primary.sync.block_sync().try_block_sync(&primary.gateway.clone()).await;

        // Try to process the batch proposal from the peer, should error.
        assert!(
            primary.process_batch_propose_from_peer(peer_ip, (*proposal.batch_header()).clone().into()).await.is_err()
        );
    }
2400
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch: the call itself returns Ok, but no batch is proposed
        // because the proposal lock round is ahead of the storage round.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
    }
2433
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Proposing a batch will terminate early because the storage round is behind the
        // existing proposal, leaving the stored proposal untouched.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
2462
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures; all of them together reach quorum.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }
2497
    #[tokio::test(flavor = "multi_thread")]
    async fn test_batch_signature_from_peer_in_round() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now();
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process the signatures; all of them together reach quorum.
        for (socket_addr, signature) in signatures {
            primary.process_batch_signature_from_peer(socket_addr, signature).await.unwrap();
        }

        // Check the certificate was created and stored by the primary.
        assert!(primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was incremented.
        assert_eq!(primary.current_round(), round + 1);
    }
2535
    #[tokio::test]
    async fn test_batch_signature_from_peer_no_quorum() {
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;
        map_account_addresses(&primary, &accounts);

        // Create a valid proposal.
        let round = 1;
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            Default::default(),
            timestamp,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was NOT incremented.
        assert_eq!(primary.current_round(), round);
    }
2569
    #[tokio::test]
    async fn test_batch_signature_from_peer_in_round_no_quorum() {
        let round = 7;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;
        map_account_addresses(&primary, &accounts);

        // Generate certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        let timestamp = now() + MIN_BATCH_DELAY_IN_SECS as i64;
        let proposal = create_test_proposal(
            primary.gateway.account(),
            primary.ledger.current_committee().unwrap(),
            round,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Each committee member signs the batch.
        let signatures = peer_signatures_for_proposal(&primary, &accounts, &mut rng);

        // Have the primary process only one signature, mimicking a lack of quorum.
        let (socket_addr, signature) = signatures.first().unwrap();
        primary.process_batch_signature_from_peer(*socket_addr, *signature).await.unwrap();

        // Check the certificate was not created and stored by the primary.
        assert!(!primary.storage.contains_certificate_in_round_from(round, primary.gateway.account().address()));
        // Check the round was NOT incremented.
        assert_eq!(primary.current_round(), round);
    }
2606
2607    #[tokio::test]
2608    async fn test_insert_certificate_with_aborted_transmissions() {
2609        let round = 3;
2610        let prev_round = round - 1;
2611        let mut rng = TestRng::default();
2612        let (primary, accounts) = primary_without_handlers(&mut rng).await;
2613        let peer_account = &accounts[1];
2614        let peer_ip = peer_account.0;
2615
2616        // Fill primary storage.
2617        store_certificate_chain(&primary, &accounts, round, &mut rng);
2618
2619        // Get transmissions from previous certificates.
2620        let previous_certificate_ids: IndexSet<_> = primary.storage.get_certificate_ids_for_round(prev_round);
2621
2622        // Generate a solution and a transaction.
2623        let (solution_commitment, solution) = sample_unconfirmed_solution(&mut rng);
2624        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
2625
2626        // Store it on one of the workers.
2627        primary.workers[0].process_unconfirmed_solution(solution_commitment, solution).await.unwrap();
2628        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
2629
2630        // Check that the worker has 2 transmissions.
2631        assert_eq!(primary.workers[0].num_transmissions(), 2);
2632
2633        // Create certificates for the current round.
2634        let account = accounts[0].1.clone();
2635        let (certificate, transmissions) =
2636            create_batch_certificate(account.address(), &accounts, round, previous_certificate_ids.clone(), &mut rng);
2637        let certificate_id = certificate.id();
2638
2639        // Randomly abort some of the transmissions.
2640        let mut aborted_transmissions = HashSet::new();
2641        let mut transmissions_without_aborted = HashMap::new();
2642        for (transmission_id, transmission) in transmissions.clone() {
2643            match rng.gen::<bool>() || aborted_transmissions.is_empty() {
2644                true => {
2645                    // Insert the aborted transmission.
2646                    aborted_transmissions.insert(transmission_id);
2647                }
2648                false => {
2649                    // Insert the transmission without the aborted transmission.
2650                    transmissions_without_aborted.insert(transmission_id, transmission);
2651                }
2652            };
2653        }
2654
2655        // Add the non-aborted transmissions to the worker.
2656        for (transmission_id, transmission) in transmissions_without_aborted.iter() {
2657            primary.workers[0].process_transmission_from_peer(peer_ip, *transmission_id, transmission.clone());
2658        }
2659
2660        // Check that inserting the transmission with missing transmissions fails.
2661        assert!(
2662            primary
2663                .storage
2664                .check_certificate(&certificate, transmissions_without_aborted.clone(), Default::default())
2665                .is_err()
2666        );
2667        assert!(
2668            primary
2669                .storage
2670                .insert_certificate(certificate.clone(), transmissions_without_aborted.clone(), Default::default())
2671                .is_err()
2672        );
2673
2674        // Insert the certificate to storage.
2675        primary
2676            .storage
2677            .insert_certificate(certificate, transmissions_without_aborted, aborted_transmissions.clone())
2678            .unwrap();
2679
2680        // Ensure the certificate exists in storage.
2681        assert!(primary.storage.contains_certificate(certificate_id));
2682        // Ensure that the aborted transmission IDs exist in storage.
2683        for aborted_transmission_id in aborted_transmissions {
2684            assert!(primary.storage.contains_transmission(aborted_transmission_id));
2685            assert!(primary.storage.get_transmission(aborted_transmission_id).is_none());
2686        }
2687    }
2688}