amareleo_node_bft/
primary.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    DEVELOPMENT_MODE_RNG_SEED,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    Sync,
22    Worker,
23    helpers::{
24        BFTSender,
25        PrimaryReceiver,
26        PrimarySender,
27        Proposal,
28        ProposalCache,
29        SignedProposals,
30        Storage,
31        assign_to_worker,
32        assign_to_workers,
33        fmt_id,
34        now,
35    },
36    spawn_blocking,
37};
38use amareleo_chain_account::Account;
39use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_sync::DUMMY_SELF_IP;
42use snarkvm::{
43    console::{prelude::*, types::Address},
44    ledger::{
45        block::Transaction,
46        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
47        puzzle::{Solution, SolutionID},
48    },
49    prelude::{Signature, committee::Committee},
50};
51
52use aleo_std::StorageMode;
53use colored::Colorize;
54use futures::stream::{FuturesUnordered, StreamExt};
55use indexmap::IndexMap;
56#[cfg(feature = "locktick")]
57use locktick::{
58    parking_lot::{Mutex, RwLock},
59    tokio::Mutex as TMutex,
60};
61#[cfg(not(feature = "locktick"))]
62use parking_lot::{Mutex, RwLock};
63use rand::SeedableRng;
64use rand_chacha::ChaChaRng;
65use snarkvm::console::account::PrivateKey;
66
67use std::{
68    collections::{HashMap, HashSet},
69    future::Future,
70    net::SocketAddr,
71    sync::{
72        Arc,
73        atomic::{AtomicBool, Ordering},
74    },
75    time::Duration,
76};
77#[cfg(not(feature = "locktick"))]
78use tokio::sync::Mutex as TMutex;
79use tokio::{sync::OnceCell, task::JoinHandle};
80use tracing::subscriber::DefaultGuard;
81
/// A helper type for an optional proposed batch.
///
/// Wrapped in a `RwLock` so readers and the proposer can access it concurrently;
/// `None` means no batch is currently proposed.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
84
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module, used to sync the storage with the ledger.
    sync: Sync<N>,
    /// The node account, used to sign batch headers and proposals.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// Whether to preserve the chain state on shutdown.
    keep_state: bool,
    /// Set once the primary is initialized and state must be saved on shutdown.
    save_pending: Arc<AtomicBool>,
    /// The storage mode (also used to locate the on-disk proposal cache).
    storage_mode: StorageMode,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers (populated in `run`; empty until then).
    workers: Arc<[Worker<N>]>,
    /// The BFT sender (set at most once, in `run`).
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// Optional tracing handle.
    tracing: Option<TracingHandler>,
    /// The lock for `propose_batch`; the guarded value tracks the latest proposed round.
    propose_lock: Arc<TMutex<u64>>,
}
118
impl<N: Network> TracingHandlerGuard for Primary<N> {
    /// Returns the tracing guard, if a tracing handler is configured.
    fn get_tracing_guard(&self) -> Option<DefaultGuard> {
        self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
    }
}
125
126impl<N: Network> Primary<N> {
127    /// The maximum number of unconfirmed transmissions to send to the primary.
128    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
129
130    /// Initializes a new primary instance.
131    pub fn new(
132        account: Account<N>,
133        storage: Storage<N>,
134        keep_state: bool,
135        storage_mode: StorageMode,
136        ledger: Arc<dyn LedgerService<N>>,
137        tracing: Option<TracingHandler>,
138    ) -> Result<Self> {
139        // Initialize the sync module.
140        let sync = Sync::new(storage.clone(), ledger.clone());
141
142        // Initialize the primary instance.
143        Ok(Self {
144            sync,
145            account,
146            storage,
147            keep_state,
148            save_pending: Arc::new(AtomicBool::new(false)),
149            storage_mode,
150            ledger,
151            workers: Arc::from(vec![]),
152            bft_sender: Default::default(),
153            proposed_batch: Default::default(),
154            latest_proposed_batch_timestamp: Default::default(),
155            signed_proposals: Default::default(),
156            handles: Default::default(),
157            tracing,
158            propose_lock: Default::default(),
159        })
160    }
161
162    /// Load the proposal cache file and update the Primary state with the stored data.
163    async fn load_proposal_cache(&self) -> Result<()> {
164        // Fetch the signed proposals from the file system if it exists.
165        match ProposalCache::<N>::exists(&self.storage_mode) {
166            // If the proposal cache exists, then process the proposal cache.
167            true => match ProposalCache::<N>::load(self.account.address(), &self.storage_mode, self) {
168                Ok(proposal_cache) => {
169                    // Extract the proposal and signed proposals.
170                    let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
171                        proposal_cache.into();
172
173                    // Write the proposed batch.
174                    *self.proposed_batch.write() = proposed_batch;
175                    // Write the signed proposals.
176                    *self.signed_proposals.write() = signed_proposals;
177                    // Writ the propose lock.
178                    *self.propose_lock.lock().await = latest_certificate_round;
179
180                    // Update the storage with the pending certificates.
181                    for certificate in pending_certificates {
182                        let batch_id = certificate.batch_id();
183                        // We use a dummy IP because the node should not need to request from any peers.
184                        // The storage should have stored all the transmissions. If not, we simply
185                        // skip the certificate.
186                        if let Err(err) = self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
187                        {
188                            guard_warn!(
189                                self,
190                                "Failed to load stored certificate {} from proposal cache - {err}",
191                                fmt_id(batch_id)
192                            );
193                        }
194                    }
195                    Ok(())
196                }
197                Err(err) => {
198                    bail!("Failed to read the signed proposals from the file system - {err}.");
199                }
200            },
201            // If the proposal cache does not exist, then return early.
202            false => Ok(()),
203        }
204    }
205
206    /// Run the primary instance.
207    pub async fn run(
208        &mut self,
209        bft_sender: Option<BFTSender<N>>,
210        _primary_sender: PrimarySender<N>,
211        primary_receiver: PrimaryReceiver<N>,
212    ) -> Result<()> {
213        guard_info!(self, "Starting the primary instance of the memory pool...");
214
215        // Set the BFT sender.
216        if let Some(bft_sender) = &bft_sender {
217            // Set the BFT sender in the primary.
218            if self.bft_sender.set(bft_sender.clone()).is_err() {
219                guard_error!(self, "Unexpected: BFT sender already set");
220                bail!("Unexpected: BFT sender already set");
221            }
222        }
223
224        // Construct a map for the workers.
225        let mut workers = Vec::new();
226        // Initialize the workers.
227        for id in 0..MAX_WORKERS {
228            // Construct the worker instance.
229            let worker = Worker::new(
230                id,
231                self.storage.clone(),
232                self.ledger.clone(),
233                self.proposed_batch.clone(),
234                self.tracing.clone(),
235            )?;
236
237            // Add the worker to the list of workers.
238            workers.push(worker);
239        }
240        // Set the workers.
241        self.workers = Arc::from(workers);
242
243        // Next, initialize the sync module and sync the storage from ledger.
244        self.sync.initialize(bft_sender).await?;
245        // Next, load and process the proposal cache before running the sync module.
246        self.load_proposal_cache().await?;
247        // Next, run the sync module.
248        self.sync.run().await?;
249
250        // Lastly, start the primary handlers.
251        // Note: This ensures the primary does not start communicating before syncing is complete.
252        self.start_handlers(primary_receiver);
253
254        // Once everything is initialized. Enable saving of state on shutdown.
255        if self.keep_state {
256            self.save_pending.store(true, Ordering::Relaxed);
257        }
258
259        Ok(())
260    }
261
262    /// Returns the current round.
263    pub fn current_round(&self) -> u64 {
264        self.storage.current_round()
265    }
266
267    /// Returns `true` if the primary is synced.
268    pub fn is_synced(&self) -> bool {
269        self.sync.is_synced()
270    }
271
272    /// Returns the account of the node.
273    pub const fn account(&self) -> &Account<N> {
274        &self.account
275    }
276
277    /// Returns the storage.
278    pub const fn storage(&self) -> &Storage<N> {
279        &self.storage
280    }
281
282    /// Returns the ledger.
283    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
284        &self.ledger
285    }
286
287    /// Returns the number of workers.
288    pub fn num_workers(&self) -> u8 {
289        u8::try_from(self.workers.len()).expect("Too many workers")
290    }
291}
292
293impl<N: Network> Primary<N> {
294    /// Returns the number of unconfirmed transmissions.
295    pub fn num_unconfirmed_transmissions(&self) -> usize {
296        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
297    }
298
299    /// Returns the number of unconfirmed ratifications.
300    pub fn num_unconfirmed_ratifications(&self) -> usize {
301        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
302    }
303
304    /// Returns the number of solutions.
305    pub fn num_unconfirmed_solutions(&self) -> usize {
306        self.workers.iter().map(|worker| worker.num_solutions()).sum()
307    }
308
309    /// Returns the number of unconfirmed transactions.
310    pub fn num_unconfirmed_transactions(&self) -> usize {
311        self.workers.iter().map(|worker| worker.num_transactions()).sum()
312    }
313}
314
315impl<N: Network> Primary<N> {
316    /// Returns the worker transmission IDs.
317    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
318        self.workers.iter().flat_map(|worker| worker.transmission_ids())
319    }
320
321    /// Returns the worker transmissions.
322    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
323        self.workers.iter().flat_map(|worker| worker.transmissions())
324    }
325
326    /// Returns the worker solutions.
327    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
328        self.workers.iter().flat_map(|worker| worker.solutions())
329    }
330
331    /// Returns the worker transactions.
332    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
333        self.workers.iter().flat_map(|worker| worker.transactions())
334    }
335}
336
337impl<N: Network> Primary<N> {
338    /// Clears the worker solutions.
339    pub fn clear_worker_solutions(&self) {
340        self.workers.iter().for_each(Worker::clear_solutions);
341    }
342}
343
344impl<N: Network> Primary<N> {
345    pub async fn propose_batch(&self, complete: bool) -> Result<()> {
346        let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
347        let mut all_acc: Vec<Account<N>> = Vec::new();
348        for _ in 0u64..4u64 {
349            let private_key = PrivateKey::<N>::new(&mut rng)?;
350            let acc = Account::<N>::try_from(private_key)?;
351            all_acc.push(acc);
352        }
353
354        // Submit proposal for validator with id 0
355        let other_acc: Vec<&Account<N>> = all_acc.iter().skip(1).collect();
356        let round = self.propose_batch_lite(&other_acc, complete).await?;
357        if !complete || round == 0u64 {
358            return Ok(());
359        }
360
361        // Submit empty proposals for other validators
362        for vid in 1u64..4u64 {
363            let primary_acc = &all_acc[vid as usize];
364            let other_acc: Vec<&Account<N>> =
365                all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();
366
367            self.fake_proposal(vid, primary_acc, &other_acc, round).await?;
368        }
369        Ok(())
370    }
371
    /// Proposes a batch for the current round on behalf of this primary.
    ///
    /// Returns the proposed round on success, or `Ok(0)` when the proposal is
    /// safely skipped (round already proposed or certified, quorum not reached,
    /// proposing too quickly, or the storage round lags the cached proposal round).
    /// When `complete` is true, the proposal is also certified via
    /// `complete_batch_lite` using the `other_acc` validator accounts.
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>], complete: bool) -> Result<u64> {
        // This function isn't re-entrant.
        let mut lock_guard = self.propose_lock.lock().await;

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            guard_warn!(
                self,
                "Cannot propose a batch for round {round} - the latest proposal cache round is {}",
                *lock_guard
            );
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} - {}",
                format!("{e}").dimmed()
            );
            return Ok(0u64);
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (), // continue,
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(0u64),
                    // An error occurred while attempting to advance the current round.
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            guard_debug!(
                self,
                "Primary is safely skipping {}",
                format!("(round {round} was already certified)").dimmed()
            );
            return Ok(0u64);
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            guard_warn!(self, "Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();

            // Append the primary to the set.
            connected_validators.insert(self.account.address());

            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                guard_debug!(
                    self,
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                guard_trace!(self, "Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Track the total execution costs of the batch proposal as it is being constructed.
        let mut proposal_cost = 0u64;
        // Note: worker draining and transaction inclusion needs to be thought
        // through carefully when there is more than one worker. The fairness
        // provided by one worker (FIFO) is no longer guaranteed with multiple workers.
        debug_assert_eq!(MAX_WORKERS, 1);

        // Drain each worker's ready queue (FIFO), validating each transmission before
        // including it in the proposal.
        'outer: for worker in self.workers.iter() {
            let mut num_worker_transmissions = 0usize;

            while let Some((id, transmission)) = worker.remove_front() {
                // Check the selected transmissions are below the batch limit.
                if transmissions.len() >= BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH {
                    break 'outer;
                }

                // Check the max transmissions per worker is not exceeded.
                if num_worker_transmissions >= Worker::<N>::MAX_TRANSMISSIONS_PER_WORKER {
                    continue 'outer;
                }

                // Check if the ledger already contains the transmission.
                // Note: a lookup error is treated as "already in ledger", so questionable
                // transmissions are dropped rather than proposed.
                if self.ledger.contains_transmission(&id).unwrap_or(true) {
                    guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                    continue;
                }

                // Check if the storage already contains the transmission.
                // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                // the primary does not propose a batch with no transmissions.
                if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                    guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                    continue;
                }

                // Check the transmission is still valid.
                match (id, transmission.clone()) {
                    (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                        // Ensure the checksum matches. If not, skip the solution.
                        if !matches!(solution.to_checksum::<N>(), Ok(solution_checksum) if solution_checksum == checksum)
                        {
                            guard_trace!(
                                self,
                                "Proposing - Skipping solution '{}' - Checksum mismatch",
                                fmt_id(solution_id)
                            );
                            continue;
                        }
                        // Check if the solution is still valid.
                        if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                            guard_trace!(self, "Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                            continue;
                        }
                    }
                    (TransmissionID::Transaction(transaction_id, checksum), Transmission::Transaction(transaction)) => {
                        // Ensure the checksum matches. If not, skip the transaction.
                        if !matches!(transaction.to_checksum::<N>(), Ok(transaction_checksum) if transaction_checksum == checksum )
                        {
                            guard_trace!(
                                self,
                                "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                fmt_id(transaction_id)
                            );
                            continue;
                        }

                        // Deserialize the transaction. If the transaction exceeds the maximum size, then return an error.
                        let transaction = spawn_blocking!({
                            match transaction {
                                Data::Object(transaction) => Ok(transaction),
                                Data::Buffer(bytes) => {
                                    Ok(Transaction::<N>::read_le(&mut bytes.take(N::MAX_TRANSACTION_SIZE as u64))?)
                                }
                            }
                        })?;

                        // Check if the transaction is still valid.
                        // TODO: check if clone is cheap, otherwise fix.
                        if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction.clone()).await {
                            guard_trace!(self, "Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                            continue;
                        }

                        // Compute the transaction spent cost (in microcredits).
                        // Note: We purposefully discard this transaction if we are unable to compute the spent cost.
                        let Ok(cost) = self.ledger.transaction_spent_cost_in_microcredits(transaction_id, transaction)
                        else {
                            guard_debug!(
                                self,
                                "Proposing - Skipping and discarding transaction '{}' - Unable to compute transaction spent cost",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Compute the next proposal cost.
                        // Note: We purposefully discard this transaction if the proposal cost overflows.
                        let Some(next_proposal_cost) = proposal_cost.checked_add(cost) else {
                            guard_debug!(
                                self,
                                "Proposing - Skipping and discarding transaction '{}' - Proposal cost overflowed",
                                fmt_id(transaction_id)
                            );
                            continue;
                        };

                        // Check if the next proposal cost exceeds the batch proposal spend limit.
                        if next_proposal_cost > BatchHeader::<N>::BATCH_SPEND_LIMIT {
                            guard_trace!(
                                self,
                                "Proposing - Skipping transaction '{}' - Batch spend limit surpassed ({next_proposal_cost} > {})",
                                fmt_id(transaction_id),
                                BatchHeader::<N>::BATCH_SPEND_LIMIT
                            );

                            // Reinsert the transmission into the worker.
                            worker.insert_front(id, transmission);
                            break 'outer;
                        }

                        // Update the proposal cost.
                        proposal_cost = next_proposal_cost;
                    }

                    // Note: We explicitly forbid including ratifications,
                    // as the protocol currently does not support ratifications.
                    (TransmissionID::Ratification, Transmission::Ratification) => continue,
                    // All other combinations are clearly invalid.
                    _ => continue,
                }

                // If the transmission is valid, insert it into the proposal's transmission list.
                transmissions.insert(id, transmission);
                num_worker_transmissions = num_worker_transmissions.saturating_add(1);
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record this round as the latest proposed round (checked on re-entry above).
        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        guard_info!(self, "Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.account.private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                guard_error!(self, "Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        // // Set the proposed batch.
        // *self.proposed_batch.write() = Some(proposal);

        // Processing proposal
        if complete {
            self.complete_batch_lite(other_acc, round, batch_header, proposal, committee_lookback).await?;
        }

        Ok(round)
    }
680
681    pub async fn complete_batch_lite(
682        &self,
683        other_acc: &[&Account<N>],
684        round: u64,
685        batch_header: BatchHeader<N>,
686        mut proposal: Proposal<N>,
687        committee_lookback: Committee<N>,
688    ) -> Result<()> {
689        guard_info!(self, "Quorum threshold reached - Preparing to certify our batch for round {round}...");
690
691        // Retrieve the batch ID.
692        let batch_id = batch_header.batch_id();
693
694        // Forge signatures of other validators.
695        for acc in other_acc.iter() {
696            // Sign the batch ID.
697            let signer_acc = (*acc).clone();
698            let signer = signer_acc.address();
699            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;
700
701            // Add the signature to the batch.
702            proposal.add_signature(signer, signature, &committee_lookback)?;
703        }
704
705        // Store the certified batch and broadcast it to all validators.
706        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
707        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
708            // Reinsert the transmissions back into the ready queue for the next proposal.
709            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
710            return Err(e);
711        }
712
713        #[cfg(feature = "metrics")]
714        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
715        Ok(())
716    }
717
    /// Forges a full proposal-and-certification cycle on behalf of validator `vid`
    /// (whose account is `primary_acc`) for the given `round`, using an empty batch.
    ///
    /// Steps: build an empty batch header signed by `primary_acc`; collect signatures
    /// from every account in `other_acc` (this node's own account must be among them,
    /// or this function bails); store the resulting certificate; cache our own
    /// signature in `signed_proposals`; and, when a BFT sender is attached, forward
    /// the certificate to the BFT.
    pub async fn fake_proposal(
        &self,
        vid: u64,
        primary_acc: &Account<N>,
        other_acc: &[&Account<N>],
        round: u64,
    ) -> Result<()> {
        // The fake batch carries no transmissions.
        let transmissions: IndexMap<_, _> = Default::default();
        let transmission_ids = transmissions.keys().copied().collect();

        let private_key = *primary_acc.private_key();
        let current_timestamp = now();

        // Retrieve the committee lookback for the round; its ID is committed into the header.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        let committee_id = committee_lookback.id();

        // Link this batch to every certificate from the previous round.
        let previous_round = round.saturating_sub(1);
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();

        // Construct the batch header (CPU-bound, so on a blocking thread) and wrap it in a proposal.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })?;

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();
        // Our own signature, captured while iterating `other_acc` below.
        let mut our_sign: Option<Signature<N>> = None;

        // Forge signatures of other validators.
        for acc in other_acc.iter() {
            // Sign the batch ID.
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            // Remember our own signature so it can be cached in `signed_proposals` below.
            if signer == self.account.address() {
                our_sign = Some(signature);
            }

            // Add the signature to the batch.
            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Ensure our own signature was collected above (`other_acc` must contain
        // this node's account — the "validator 0" account); bail otherwise.
        let our_sign = match our_sign {
            Some(sign) => sign,
            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
        };

        // Create the batch certificate and transmissions.
        let (certificate, transmissions) =
            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;

        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        guard_info!(self, "Stored a batch certificate for validator/round {vid}/{round}");

        // Cache (round, batch ID, our signature) for this validator in `signed_proposals`.
        match self.signed_proposals.write().0.entry(primary_acc.address()) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If a signature is already cached for this round, there is nothing
                // left to do — return early without touching the BFT.
                if entry.get().0 == round {
                    return Ok(());
                }
                // Otherwise, overwrite the cache with this round's entry.
                entry.insert((round, batch_id, our_sign));
                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
            }
            // If the validator has not signed a batch before, insert a fresh entry.
            std::collections::hash_map::Entry::Vacant(entry) => {
                // Cache the round, batch ID, and signature for this validator.
                entry.insert((round, batch_id, our_sign));
                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
            }
        };

        if let Some(bft_sender) = self.bft_sender.get() {
            // Send the certificate to the BFT.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
                return Err(e);
            };
        }

        Ok(())
    }
821}
822
impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// Spawns four long-running tasks (tracked in `self.handles`):
    /// 1. the batch proposer, which periodically attempts to propose a batch;
    /// 2. the round incrementer, which advances the round once quorum is observed;
    /// 3. the unconfirmed-solution processor;
    /// 4. the unconfirmed-transaction processor.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        let PrimaryReceiver { mut rx_unconfirmed_solution, mut rx_unconfirmed_transaction } = primary_receiver;

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.is_synced() {
                    guard_debug!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is syncing)".dimmed()
                    );
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                if self_.propose_lock.try_lock().is_err() {
                    guard_trace!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs be proposed at a time.
                if let Err(e) = self_.propose_batch(true).await {
                    guard_warn!(self_, "Cannot propose a batch - {e}");
                }
            }
        });

        // Periodically try to increment to the next round.
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.is_synced() {
                    guard_trace!(self_, "Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let next_round = self_.current_round().saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the next round.
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        guard_warn!(self_, "Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    guard_debug!(self_, "Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        guard_warn!(self_, "Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Process the unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };

                // Retrieve the worker.
                let worker = &self_.workers[worker_id as usize];
                // Process the unconfirmed solution.
                let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                // Send the result to the callback.
                callback.send(result).ok();
            }
        });

        // Process the unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                guard_trace!(self_, "Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };

                // Retrieve the worker.
                let worker = &self_.workers[worker_id as usize];
                // Process the unconfirmed transaction.
                let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                // Send the result to the callback.
                callback.send(result).ok();
            }
        });
    }

    /// Increments to the next round.
    ///
    /// When `next_round` is within GC range, storage is first fast-forwarded one round
    /// at a time up to the penultimate round (clearing any proposed batch along the way).
    /// The BFT (when attached) is then consulted on whether the primary may advance;
    /// if it is ready, a batch is proposed for the new round.
    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, then iterate to the penultimate round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Iterate until the penultimate round is reached.
            while fast_forward_round < next_round.saturating_sub(1) {
                // Update to the next round in storage.
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear the proposed batch.
                *self.proposed_batch.write() = None;
            }
        }

        // Retrieve the current round.
        let current_round = self.current_round();
        // Attempt to advance to the next round.
        if current_round < next_round {
            // If a BFT sender was provided, send the current round to the BFT.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            // Otherwise, handle the Narwhal case.
            else {
                // Update to the next round in storage.
                self.storage.increment_to_next_round(current_round)?;
                // Set 'is_ready' to 'true'.
                true
            };

            // Log whether the next round is ready.
            match is_ready {
                true => guard_debug!(self, "Primary is ready to propose the next round"),
                false => guard_debug!(self, "Primary is not ready to propose the next round"),
            }

            // If the node is ready, propose a batch for the next round.
            if is_ready {
                self.propose_batch(true).await?;
            }
        }
        Ok(())
    }

    /// Increments to the next round.
    ///
    /// NOTE(review): identical to [`Self::try_increment_to_the_next_round`] except that it
    /// does NOT propose a batch after advancing (the call is deliberately commented out
    /// below); in the lite flow the caller drives batch proposals. Consider factoring the
    /// shared logic into a single helper with a `propose: bool` flag.
    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
        // If the next round is within GC range, then iterate to the penultimate round.
        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
            let mut fast_forward_round = self.current_round();
            // Iterate until the penultimate round is reached.
            while fast_forward_round < next_round.saturating_sub(1) {
                // Update to the next round in storage.
                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
                // Clear the proposed batch.
                *self.proposed_batch.write() = None;
            }
        }

        // Retrieve the current round.
        let current_round = self.current_round();
        // Attempt to advance to the next round.
        if current_round < next_round {
            // If a BFT sender was provided, send the current round to the BFT.
            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(current_round).await {
                    Ok(is_ready) => is_ready,
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            // Otherwise, handle the Narwhal case.
            else {
                // Update to the next round in storage.
                self.storage.increment_to_next_round(current_round)?;
                // Set 'is_ready' to 'true'.
                true
            };

            // Log whether the next round is ready.
            match is_ready {
                true => guard_debug!(self, "Primary is ready to propose the next round"),
                false => guard_debug!(self, "Primary is not ready to propose the next round"),
            }

            // Unlike `try_increment_to_the_next_round`, do NOT propose a batch here.
            // // If the node is ready, propose a batch for the next round.
            // if is_ready {
            //     self.propose_batch(true).await?;
            // }
        }
        Ok(())
    }

    /// Ensure the primary is not creating batch proposals too frequently.
    /// This checks that the certificate timestamp for the previous round is within the expected range.
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Retrieve the previous timestamp to check against.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            // Use the previous certificate's timestamp when one exists for this author.
            Some(certificate) => certificate.timestamp(),
            // Otherwise fall back to the timestamp of our latest proposed batch.
            None => *self.latest_proposed_batch_timestamp.read(),
        };

        // Determine the elapsed time since the previous timestamp.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }

    /// Creates the batch certificate from the given proposal, stores it, submits it to
    /// the BFT (when attached), and then increments to the next round.
    ///
    /// NOTE(review): despite the `broadcast` in the name, no network broadcast occurs in
    /// this body — the lite primary only stores the certificate and updates the local BFT DAG.
    async fn store_and_broadcast_certificate_lite(
        &self,
        proposal: &Proposal<N>,
        committee: &Committee<N>,
    ) -> Result<()> {
        // Create the batch certificate and transmissions.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        guard_debug!(self, "Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                guard_warn!(self, "Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        guard_info!(self, "\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round_lite(round + 1).await
    }

    /// Re-inserts the transmissions from the proposal into the workers,
    /// assigning each transmission back to its designated worker.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Re-insert the transmissions into the workers.
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }

    /// Recursively stores a given batch certificate, after ensuring:
    ///   - Ensure the round matches the committee round.
    ///   - Ensure the address is a member of the committee.
    ///   - Ensure the timestamp is within range.
    ///   - Ensure we have all of the transmissions.
    ///   - Ensure we have all of the previous certificates.
    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
    ///   - Ensure the previous certificates have reached the quorum threshold.
    ///   - Ensure we have not already signed the batch ID.
    ///
    /// The `IS_SYNCING` const parameter relaxes the "node must be synced" check
    /// for certificates processed as part of block synchronization.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If node is not in sync mode and the node is not synced. Then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored.
        // (Re-checked because the recursive header sync above may have stored it already.)
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            guard_debug!(self, "Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the round and certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Send the certificate to the BFT.
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }

    /// Recursively syncs using the given batch header.
    ///
    /// Validates the header round against the GC round, advances our own round if the
    /// peer is ahead with quorum, and returns the transmissions from the header that
    /// were missing from local storage (fetched via the workers).
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // If node is not in sync mode and the node is not synced. Then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine if quorum threshold is reached on the batch round.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Check if our primary should move to the next round.
        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        // Check if our primary is far behind the peer.
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        // If our primary is far behind the peer, update our committee to the batch round.
        if is_behind_schedule || is_peer_far_in_future {
            // If the batch round is greater than the current committee round, update the committee.
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Ensure the primary has all of the transmissions.
        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        Ok(missing_transmissions)
    }

    /// Fetches any missing transmissions for the specified batch header.
    /// If a transmission does not exist in storage, it is fetched via the worker
    /// it is assigned to (`Worker::get_or_fetch_transmission`).
    async fn fetch_missing_transmissions(
        &self,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // If the round is <= the GC round, return early.
        if batch_header.round() <= self.storage.gc_round() {
            return Ok(Default::default());
        }

        // Ensure this batch ID is new, otherwise return early.
        if self.storage.contains_batch(batch_header.batch_id()) {
            guard_trace!(self, "Batch for round {} from peer has already been processed", batch_header.round());
            return Ok(Default::default());
        }

        // Retrieve the workers.
        let workers = self.workers.clone();

        // Initialize a list for the transmissions.
        let mut fetch_transmissions = FuturesUnordered::new();

        // Retrieve the number of workers.
        let num_workers = self.num_workers();
        // Iterate through the transmission IDs.
        for transmission_id in batch_header.transmission_ids() {
            // If the transmission does not exist in storage, proceed to fetch the transmission.
            if !self.storage.contains_transmission(*transmission_id) {
                // Determine the worker ID.
                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
                };
                // Retrieve the worker.
                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
                // Push the callback onto the list.
                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
            }
        }

        // Initialize a set for the transmissions.
        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
        // Wait for all of the transmissions to be fetched.
        while let Some(result) = fetch_transmissions.next().await {
            // Retrieve the transmission.
            let (transmission_id, transmission) = result?;
            // Insert the transmission into the set.
            transmissions.insert(transmission_id, transmission);
        }
        // Return the transmissions.
        Ok(transmissions)
    }
}
1277
1278impl<N: Network> Primary<N> {
1279    /// Spawns a task with the given future; it should only be used for long-running tasks.
1280    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1281        self.handles.lock().push(tokio::spawn(future));
1282    }
1283
1284    /// Shuts down the primary.
1285    pub async fn shut_down(&self) {
1286        guard_info!(self, "Shutting down the primary...");
1287        {
1288            // Abort all primary tasks and clear the handles.
1289            let mut handles = self.handles.lock();
1290            handles.iter().for_each(|handle| handle.abort());
1291            handles.clear();
1292        }
1293
1294        // Save the current proposal cache to disk,
1295        // and clear save_pending ensuring this
1296        // operation is only ran once.
1297        let save_pending = self.save_pending.swap(false, Ordering::AcqRel);
1298        if save_pending {
1299            let proposal_cache = {
1300                let proposal = self.proposed_batch.write().take();
1301                let signed_proposals = self.signed_proposals.read().clone();
1302                let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1303                let pending_certificates = self.storage.get_pending_certificates();
1304                ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1305            };
1306
1307            if let Err(err) = proposal_cache.store(&self.storage_mode, self) {
1308                guard_error!(self, "Failed to store the current proposal cache: {err}");
1309            }
1310        }
1311    }
1312}
1313
1314#[cfg(test)]
1315mod tests {
1316    use super::*;
1317    use amareleo_node_bft_ledger_service::MockLedgerService;
1318    use amareleo_node_bft_storage_service::BFTMemoryService;
1319    use snarkvm::{
1320        console::types::Field,
1321        ledger::{
1322            committee::{Committee, MIN_VALIDATOR_STAKE},
1323            ledger_test_helpers::sample_execution_transaction_with_fee,
1324        },
1325        prelude::{Address, Signature},
1326    };
1327
1328    use bytes::Bytes;
1329    use indexmap::IndexSet;
1330    use rand::RngCore;
1331
1332    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1333
1334    fn sample_committee(rng: &mut TestRng) -> (Vec<(SocketAddr, Account<CurrentNetwork>)>, Committee<CurrentNetwork>) {
1335        // Create a committee containing the primary's account.
1336        const COMMITTEE_SIZE: usize = 4;
1337        let mut rng_validator = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
1338        let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1339        let mut members = IndexMap::new();
1340
1341        for i in 0..COMMITTEE_SIZE {
1342            let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1343            let account = Account::new(&mut rng_validator).unwrap();
1344
1345            members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1346            accounts.push((socket_addr, account));
1347        }
1348
1349        (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1350    }
1351
    // Constructs a primary backed by a mock ledger at the given `height`,
    // using the account at `accounts[account_index]`, and wires in a single
    // worker (id 0). Note: only the primary is returned; `accounts` is
    // borrowed, not returned.
    fn primary_with_committee(
        account_index: usize,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        committee: Committee<CurrentNetwork>,
        height: u32,
    ) -> Primary<CurrentNetwork> {
        // Mock ledger pinned at `height`; the `10` passed to `Storage::new`
        // is presumably the max-GC-rounds bound — TODO confirm.
        let ledger = Arc::new(MockLedgerService::new_at_height(committee, height));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);

        // Initialize the primary.
        let account = accounts[account_index].1.clone();
        let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger, None).unwrap();

        // Construct a worker instance sharing the primary's storage, ledger,
        // and proposed-batch handle.
        primary.workers = Arc::from([Worker::new(
            0, // id
            primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
            None,
        )
        .unwrap()]);

        primary
    }
1378
1379    fn primary_without_handlers(
1380        rng: &mut TestRng,
1381    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1382        let (accounts, committee) = sample_committee(rng);
1383        let primary = primary_with_committee(
1384            0, // index of primary's account
1385            &accounts,
1386            committee,
1387            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V1).unwrap(),
1388        );
1389
1390        (primary, accounts)
1391    }
1392
1393    // Creates a mock solution.
1394    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1395        // Sample a random fake solution ID.
1396        let solution_id = rng.gen::<u64>().into();
1397        // Vary the size of the solutions.
1398        let size = rng.gen_range(1024..10 * 1024);
1399        // Sample random fake solution bytes.
1400        let mut vec = vec![0u8; size];
1401        rng.fill_bytes(&mut vec);
1402        let solution = Data::Buffer(Bytes::from(vec));
1403        // Return the solution ID and solution.
1404        (solution_id, solution)
1405    }
1406
1407    // Samples a test transaction.
1408    fn sample_unconfirmed_transaction(
1409        rng: &mut TestRng,
1410    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1411        let transaction = sample_execution_transaction_with_fee(false, rng);
1412        let id = transaction.id();
1413
1414        (id, Data::Object(transaction))
1415    }
1416
    // Creates a batch proposal containing one solution and `num_transactions`
    // transactions, authored and signed by `author`.
    //
    // The transmissions are fabricated (random bytes / sampled executions);
    // the resulting proposal is structurally consistent with the given
    // `committee`, `round`, `previous_certificate_ids`, and `timestamp`.
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        num_transactions: u64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        let mut transmission_ids = IndexSet::new();
        let mut transmissions = IndexMap::new();

        // Prepare the solution and insert into the sets.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let solution_transmission_id = (solution_id, solution_checksum).into();
        transmission_ids.insert(solution_transmission_id);
        transmissions.insert(solution_transmission_id, Transmission::Solution(solution));

        // Prepare the transactions and insert into the sets.
        for _ in 0..num_transactions {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
            let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
            let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
            transmission_ids.insert(transaction_transmission_id);
            transmissions.insert(transaction_transmission_id, Transmission::Transaction(transaction));
        }

        // Retrieve the private key.
        let private_key = author.private_key();
        // Sign the batch header over the transmission IDs and previous certificate IDs.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Construct the proposal.
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }
1462
1463    /// Creates a signature of the batch ID for each committee member (excluding the primary).
1464    fn peer_signatures_for_batch(
1465        primary_address: Address<CurrentNetwork>,
1466        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1467        batch_id: Field<CurrentNetwork>,
1468        rng: &mut TestRng,
1469    ) -> IndexSet<Signature<CurrentNetwork>> {
1470        let mut signatures = IndexSet::new();
1471        for (_, account) in accounts {
1472            if account.address() == primary_address {
1473                continue;
1474            }
1475            let signature = account.sign(&[batch_id], rng).unwrap();
1476            signatures.insert(signature);
1477        }
1478        signatures
1479    }
1480
    // Creates a batch certificate authored by `primary_address` for `round`,
    // containing one fake solution and one sampled transaction, and signed by
    // every other committee account.
    //
    // NOTE(review): the committee ID is random (`Field::rand`), so the
    // certificate is only suitable for storage-level tests that do not
    // validate it against a real committee.
    fn create_batch_certificate(
        primary_address: Address<CurrentNetwork>,
        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        rng: &mut TestRng,
    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
        let timestamp = now();

        // Locate the authoring account; panics if `primary_address` is not in `accounts`.
        let author =
            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
        let private_key = author.private_key();

        let committee_id = Field::rand(rng);
        // One solution and one transaction per certificate.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();

        // Sign the header with the author's key, then gather the peers' signatures.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
        (certificate, transmissions)
    }
1525
1526    // Create a certificate chain up to round in primary storage.
1527    fn store_certificate_chain(
1528        primary: &Primary<CurrentNetwork>,
1529        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1530        round: u64,
1531        rng: &mut TestRng,
1532    ) -> IndexSet<Field<CurrentNetwork>> {
1533        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1534        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1535        for cur_round in 1..round {
1536            for (_, account) in accounts.iter() {
1537                let (certificate, transmissions) = create_batch_certificate(
1538                    account.address(),
1539                    accounts,
1540                    cur_round,
1541                    previous_certificates.clone(),
1542                    rng,
1543                );
1544                next_certificates.insert(certificate.id());
1545                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1546            }
1547
1548            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1549            previous_certificates = next_certificates;
1550            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1551        }
1552
1553        previous_certificates
1554    }
1555
1556    #[tokio::test]
1557    async fn test_propose_batch() {
1558        let mut rng = TestRng::default();
1559        let (primary, _) = primary_without_handlers(&mut rng);
1560
1561        // Check there is no batch currently proposed.
1562        assert!(primary.proposed_batch.read().is_none());
1563
1564        // Generate a solution and a transaction.
1565        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1566        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1567
1568        // Store it on one of the workers.
1569        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1570        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1571
1572        // Try to propose a batch again. This time, it should succeed.
1573        assert!(primary.propose_batch(false).await.is_ok());
1574
1575        // AlexZ: proposed_batch is no longer written to when processing new batches...
1576        // assert!(primary.proposed_batch.read().is_some());
1577    }
1578
    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Proposing with empty workers should still return Ok — having no
        // transmissions is not an error.
        assert!(primary.propose_batch(false).await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // assert!(primary.proposed_batch.read().is_some());
    }
1593
1594    #[tokio::test]
1595    async fn test_propose_batch_in_round() {
1596        let round = 3;
1597        let mut rng = TestRng::default();
1598        let (primary, accounts) = primary_without_handlers(&mut rng);
1599
1600        // Fill primary storage.
1601        store_certificate_chain(&primary, &accounts, round, &mut rng);
1602
1603        // Sleep for a while to ensure the primary is ready to propose the next round.
1604        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
1605
1606        // Generate a solution and a transaction.
1607        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1608        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1609
1610        // Store it on one of the workers.
1611        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1612        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1613
1614        // Propose a batch again. This time, it should succeed.
1615        assert!(primary.propose_batch(false).await.is_ok());
1616
1617        // AlexZ: proposed_batch is no longer written to when processing new batches...
1618        // assert!(primary.proposed_batch.read().is_some());
1619    }
1620
    #[tokio::test]
    async fn test_propose_batch_over_spend_limit() {
        let mut rng = TestRng::default();

        // Create a primary to test spend limit backwards compatibility with V4.
        let (accounts, committee) = sample_committee(&mut rng);
        let primary = primary_with_committee(
            0,
            &accounts,
            committee.clone(),
            CurrentNetwork::CONSENSUS_HEIGHT(ConsensusVersion::V4).unwrap(),
        );

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());
        // Check the workers are empty.
        primary.workers.iter().for_each(|worker| assert!(worker.transmissions().is_empty()));

        // Queue a solution plus five transactions on worker 0.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();

        for _i in 0..5 {
            let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
            // Store it on one of the workers.
            primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
        }

        // Proposing should succeed, but the spend limit caps how many
        // transactions make it into the proposal.
        assert!(primary.propose_batch(false).await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // Expect 2/5 transactions to be included in the proposal in addition to the solution.
        // assert_eq!(primary.proposed_batch.read().as_ref().unwrap().transmissions().len(), 3);

        // The remaining 3 transactions must still be queued in the workers.
        assert_eq!(primary.workers.iter().map(|worker| worker.transmissions().len()).sum::<usize>(), 3);
    }
1659
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng);

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch: the call returns Ok, but no proposal is produced
        // because the propose lock is ahead of the storage round.
        assert!(primary.propose_batch(false).await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch(false).await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // assert!(primary.proposed_batch.read().is_some());
    }
1694
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng);

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal one round ahead of the storage round.
        let timestamp = now();
        let proposal = create_test_proposal(
            &primary.account,
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            1,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Proposing terminates early (still Ok) because the storage is behind
        // the pending proposal; the existing proposal must remain in place.
        assert!(primary.propose_batch(false).await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
1724}