amareleo_node_bft/
primary.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    DEVELOPMENT_MODE_RNG_SEED,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    Sync,
22    Worker,
23    helpers::{
24        BFTSender,
25        PrimaryReceiver,
26        PrimarySender,
27        Proposal,
28        ProposalCache,
29        SignedProposals,
30        Storage,
31        assign_to_worker,
32        assign_to_workers,
33        fmt_id,
34        now,
35    },
36    spawn_blocking,
37};
38use amareleo_chain_account::Account;
39use amareleo_chain_tracing::{TracingHandler, TracingHandlerGuard};
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_sync::DUMMY_SELF_IP;
42use snarkvm::{
43    console::{prelude::*, types::Address},
44    ledger::{
45        block::Transaction,
46        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
47        puzzle::{Solution, SolutionID},
48    },
49    prelude::{Signature, committee::Committee},
50};
51
52use aleo_std::StorageMode;
53use colored::Colorize;
54use futures::stream::{FuturesUnordered, StreamExt};
55use indexmap::IndexMap;
56use parking_lot::{Mutex, RwLock};
57
58use rand::SeedableRng;
59use rand_chacha::ChaChaRng;
60use snarkvm::console::account::PrivateKey;
61
62use std::{
63    collections::{HashMap, HashSet},
64    future::Future,
65    net::SocketAddr,
66    sync::{
67        Arc,
68        atomic::{AtomicBool, Ordering},
69    },
70    time::Duration,
71};
72use tokio::{
73    sync::{Mutex as TMutex, OnceCell},
74    task::JoinHandle,
75};
76use tracing::subscriber::DefaultGuard;
77
/// A helper type for an optional proposed batch.
/// `None` while the primary has no batch proposal in flight.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
80
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The node account, used to sign batch headers and batch IDs.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// Whether to preserve the chain state on shutdown.
    keep_state: bool,
    /// Set once the primary is initialized and requires saving state on shutdown.
    save_pending: Arc<AtomicBool>,
    /// The storage mode.
    storage_mode: StorageMode,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The tracing handle.
    tracing: Option<TracingHandler>,
    /// The lock for `propose_batch`; also tracks the latest proposal round.
    propose_lock: Arc<TMutex<u64>>,
}
114
impl<N: Network> TracingHandlerGuard for Primary<N> {
    /// Returns the tracing guard, if a tracing handler is configured.
    fn get_tracing_guard(&self) -> Option<DefaultGuard> {
        self.tracing.as_ref().and_then(|trace_handle| trace_handle.get_tracing_guard())
    }
}
121
122impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    /// Twice the per-batch maximum, i.e. a backlog of one extra full batch.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
125
126    /// Initializes a new primary instance.
127    pub fn new(
128        account: Account<N>,
129        storage: Storage<N>,
130        keep_state: bool,
131        storage_mode: StorageMode,
132        ledger: Arc<dyn LedgerService<N>>,
133        tracing: Option<TracingHandler>,
134    ) -> Result<Self> {
135        // Initialize the sync module.
136        let sync = Sync::new(storage.clone(), ledger.clone());
137
138        // Initialize the primary instance.
139        Ok(Self {
140            sync,
141            account,
142            storage,
143            keep_state,
144            save_pending: Arc::new(AtomicBool::new(false)),
145            storage_mode,
146            ledger,
147            workers: Arc::from(vec![]),
148            bft_sender: Default::default(),
149            proposed_batch: Default::default(),
150            latest_proposed_batch_timestamp: Default::default(),
151            signed_proposals: Default::default(),
152            handles: Default::default(),
153            tracing,
154            propose_lock: Default::default(),
155        })
156    }
157
    /// Load the proposal cache file and update the Primary state with the stored data.
    ///
    /// Restores the proposed batch, the signed proposals, and the propose-lock
    /// round, then replays any pending certificates into storage. A missing
    /// cache file is not an error; the primary simply starts fresh.
    async fn load_proposal_cache(&self) -> Result<()> {
        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            // If the proposal cache exists, then process the proposal cache.
            true => {
                match ProposalCache::<N>::load(self.account.address(), &self.storage_mode, self) {
                    Ok(proposal_cache) => {
                        // Extract the proposal and signed proposals.
                        let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                            proposal_cache.into();

                        // Write the proposed batch.
                        *self.proposed_batch.write() = proposed_batch;
                        // Write the signed proposals.
                        *self.signed_proposals.write() = signed_proposals;
                        // Write the propose lock.
                        *self.propose_lock.lock().await = latest_certificate_round;

                        // Update the storage with the pending certificates.
                        for certificate in pending_certificates {
                            let batch_id = certificate.batch_id();
                            // We use a dummy IP because the node should not need to request from any peers.
                            // The storage should have stored all the transmissions. If not, we simply
                            // skip the certificate.
                            if let Err(err) =
                                self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                            {
                                guard_warn!(
                                    self,
                                    "Failed to load stored certificate {} from proposal cache - {err}",
                                    fmt_id(batch_id)
                                );
                            }
                        }
                        Ok(())
                    }
                    Err(err) => {
                        bail!("Failed to read the signed proposals from the file system - {err}.");
                    }
                }
            }
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }
204
    /// Run the primary instance.
    ///
    /// Startup order matters here: the workers are created first, then the sync
    /// module is initialized and the proposal cache replayed, and only after the
    /// sync module is running are the primary handlers started. Saving of state
    /// on shutdown is enabled last, once initialization can no longer fail.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        _primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        guard_info!(self, "Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            if self.bft_sender.set(bft_sender.clone()).is_err() {
                guard_error!(self, "Unexpected: BFT sender already set");
                bail!("Unexpected: BFT sender already set");
            }
        }

        // Construct a map for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
                self.tracing.clone(),
            )?;

            // Add the worker to the list of workers.
            workers.push(worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // Next, initialize the sync module and sync the storage from ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run().await?;

        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        // Once everything is initialized, enable saving of state on shutdown.
        if self.keep_state {
            self.save_pending.store(true, Ordering::Relaxed);
        }

        Ok(())
    }
260
    /// Returns the current round, as tracked by storage.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the account of the node.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }
285
286    /// Returns the number of workers.
287    pub fn num_workers(&self) -> u8 {
288        u8::try_from(self.workers.len()).expect("Too many workers")
289    }
290}
291
292impl<N: Network> Primary<N> {
293    /// Returns the number of unconfirmed transmissions.
294    pub fn num_unconfirmed_transmissions(&self) -> usize {
295        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
296    }
297
298    /// Returns the number of unconfirmed ratifications.
299    pub fn num_unconfirmed_ratifications(&self) -> usize {
300        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
301    }
302
303    /// Returns the number of solutions.
304    pub fn num_unconfirmed_solutions(&self) -> usize {
305        self.workers.iter().map(|worker| worker.num_solutions()).sum()
306    }
307
308    /// Returns the number of unconfirmed transactions.
309    pub fn num_unconfirmed_transactions(&self) -> usize {
310        self.workers.iter().map(|worker| worker.num_transactions()).sum()
311    }
312}
313
314impl<N: Network> Primary<N> {
315    /// Returns the worker transmission IDs.
316    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
317        self.workers.iter().flat_map(|worker| worker.transmission_ids())
318    }
319
320    /// Returns the worker transmissions.
321    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
322        self.workers.iter().flat_map(|worker| worker.transmissions())
323    }
324
325    /// Returns the worker solutions.
326    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
327        self.workers.iter().flat_map(|worker| worker.solutions())
328    }
329
330    /// Returns the worker transactions.
331    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
332        self.workers.iter().flat_map(|worker| worker.transactions())
333    }
334}
335
336impl<N: Network> Primary<N> {
337    /// Clears the worker solutions.
338    pub fn clear_worker_solutions(&self) {
339        self.workers.iter().for_each(Worker::clear_solutions);
340    }
341}
342
343impl<N: Network> Primary<N> {
344    pub async fn propose_batch(&self) -> Result<()> {
345        let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
346        let mut all_acc: Vec<Account<N>> = Vec::new();
347        for _ in 0u64..4u64 {
348            let private_key = PrivateKey::<N>::new(&mut rng)?;
349            let acc = Account::<N>::try_from(private_key)?;
350            all_acc.push(acc);
351        }
352
353        // Submit proposal for validator with id 0
354        let other_acc: Vec<&Account<N>> = all_acc.iter().skip(1).collect();
355        let round = self.propose_batch_lite(&other_acc).await?;
356        if round == 0u64 {
357            return Ok(());
358        }
359
360        // Submit empty proposals for other validators
361        for vid in 1u64..4u64 {
362            let primary_acc = &all_acc[vid as usize];
363            let other_acc: Vec<&Account<N>> =
364                all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();
365
366            self.fake_proposal(vid, primary_acc, &other_acc, round).await?;
367        }
368        Ok(())
369    }
370
    /// Proposes, self-certifies, and stores a batch for the current round.
    ///
    /// Unlike a networked primary, this development-mode variant forges the
    /// signatures of the `other_acc` validators locally instead of gossiping the
    /// proposal, then immediately stores the resulting certificate.
    ///
    /// Returns the proposed round on success, or `Ok(0)` when the proposal is
    /// safely skipped (round already proposed or certified, quorum not
    /// reachable, or the proposal timer has not elapsed). Round 0 is never a
    /// valid proposal round, so it doubles as the "skipped" sentinel.
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>]) -> Result<u64> {
        // This function isn't re-entrant.
        let mut lock_guard = self.propose_lock.lock().await;

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            guard_warn!(
                self,
                "Cannot propose a batch for round {round} - the latest proposal cache round is {}",
                *lock_guard
            );
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} - {}",
                format!("{e}").dimmed()
            );
            return Ok(0u64);
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (), // continue,
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(0u64),
                    // An error occurred while attempting to advance the current round.
                    Err(e) => {
                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            guard_debug!(
                self,
                "Primary is safely skipping {}",
                format!("(round {round} was already certified)").dimmed()
            );
            return Ok(0u64);
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            guard_warn!(self, "Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();

            // Append the primary to the set.
            connected_validators.insert(self.account.address());

            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                guard_debug!(
                    self,
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                guard_trace!(self, "Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            guard_debug!(
                self,
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Determine the required number of transmissions per worker.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Take the transmissions from the workers.
        for worker in self.workers.iter() {
            // Initialize a tracker for included transmissions for the current worker.
            let mut num_transmissions_included_for_worker = 0;
            // Keep draining the worker until the desired number of transmissions is reached or the worker is empty.
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                // Determine the number of remaining transmissions for the worker.
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                // Drain the worker.
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // If the worker is empty, break early.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                // Iterate through the worker transmissions.
                'inner: for (id, transmission) in worker_transmissions {
                    // Check if the ledger already contains the transmission.
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    // Check if the storage already contains the transmission.
                    // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                    // the primary does not propose a batch with no transmissions.
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        guard_trace!(self, "Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Check the transmission is still valid.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            // Ensure the checksum matches.
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    guard_trace!(
                                        self,
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the solution is still valid.
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                guard_trace!(self, "Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            // Ensure the checksum matches.
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    guard_trace!(
                                        self,
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the transaction is still valid.
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                guard_trace!(
                                    self,
                                    "Proposing - Skipping transaction '{}' - {e}",
                                    fmt_id(transaction_id)
                                );
                                continue 'inner;
                            }
                        }
                        // Note: We explicitly forbid including ratifications,
                        // as the protocol currently does not support ratifications.
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        // All other combinations are clearly invalid.
                        _ => continue 'inner,
                    }
                    // Insert the transmission into the map.
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record this round as the latest proposed round (checked against above on re-entry).
        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        guard_info!(self, "Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.account.private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                guard_error!(self, "Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        // Note: the proposal is certified immediately below rather than cached,
        // so `proposed_batch` is deliberately left untouched here.
        // // Set the proposed batch.
        // *self.proposed_batch.write() = Some(proposal);

        //===============================================================================
        // Processing proposal

        guard_info!(self, "Quorum threshold reached - Preparing to certify our batch for round {round}...");

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();

        // Forge signatures of other validators.
        for acc in other_acc.iter() {
            // Sign the batch ID.
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            // Add the signature to the batch.
            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Store the certified batch and broadcast it to all validators.
        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
            // Reinsert the transmissions back into the ready queue for the next proposal.
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(round)
    }
657
658    pub async fn fake_proposal(
659        &self,
660        vid: u64,
661        primary_acc: &Account<N>,
662        other_acc: &[&Account<N>],
663        round: u64,
664    ) -> Result<()> {
665        let transmissions: IndexMap<_, _> = Default::default();
666        let transmission_ids = transmissions.keys().copied().collect();
667
668        let private_key = *primary_acc.private_key();
669        let current_timestamp = now();
670
671        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
672        let committee_id = committee_lookback.id();
673
674        let previous_round = round.saturating_sub(1);
675        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
676        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
677
678        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
679            &private_key,
680            round,
681            current_timestamp,
682            committee_id,
683            transmission_ids,
684            previous_certificate_ids,
685            &mut rand::thread_rng()
686        ))
687        .and_then(|batch_header| {
688            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
689                .map(|proposal| (batch_header, proposal))
690        })?;
691
692        // Retrieve the batch ID.
693        let batch_id = batch_header.batch_id();
694        let mut our_sign: Option<Signature<N>> = None;
695
696        // Forge signatures of other validators.
697        for acc in other_acc.iter() {
698            // Sign the batch ID.
699            let signer_acc = (*acc).clone();
700            let signer = signer_acc.address();
701            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;
702
703            if signer == self.account.address() {
704                our_sign = Some(signature);
705            }
706
707            // Add the signature to the batch.
708            proposal.add_signature(signer, signature, &committee_lookback)?;
709        }
710
711        // Ensure our signature was not inserted (validator 0 signature)
712        let our_sign = match our_sign {
713            Some(sign) => sign,
714            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
715        };
716
717        // Create the batch certificate and transmissions.
718        let (certificate, transmissions) =
719            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;
720
721        // Convert the transmissions into a HashMap.
722        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
723        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
724
725        // Store the certified batch.
726        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
727        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
728        guard_info!(self, "Stored a batch certificate for validator/round {vid}/{round}");
729
730        match self.signed_proposals.write().0.entry(primary_acc.address()) {
731            std::collections::hash_map::Entry::Occupied(mut entry) => {
732                // If the validator has already signed a batch for this round, then return early,
733                // since, if the peer still has not received the signature, they will request it again,
734                // and the logic at the start of this function will resend the (now cached) signature
735                // to the peer if asked to sign this batch proposal again.
736                if entry.get().0 == round {
737                    return Ok(());
738                }
739                // Otherwise, cache the round, batch ID, and signature for this validator.
740                entry.insert((round, batch_id, our_sign));
741                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
742            }
743            // If the validator has not signed a batch before, then continue.
744            std::collections::hash_map::Entry::Vacant(entry) => {
745                // Cache the round, batch ID, and signature for this validator.
746                entry.insert((round, batch_id, our_sign));
747                guard_info!(self, "Inserted signature to signed_proposals {vid}/{round}");
748            }
749        };
750
751        if let Some(bft_sender) = self.bft_sender.get() {
752            // Send the certificate to the BFT.
753            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
754                guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
755                return Err(e);
756            };
757        }
758
759        Ok(())
760    }
761}
762
763impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// Spawns four long-running tasks:
    ///   1. the batch proposer loop,
    ///   2. the periodic round-increment loop,
    ///   3. the unconfirmed-solution processor,
    ///   4. the unconfirmed-transaction processor.
    ///
    /// All tasks are registered via `self.spawn`, so they are aborted on shutdown.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Take ownership of the receiver channels.
        let PrimaryReceiver { mut rx_unconfirmed_solution, mut rx_unconfirmed_transaction } = primary_receiver;

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.is_synced() {
                    guard_debug!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is syncing)".dimmed()
                    );
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                // Note: the guard returned by `try_lock` is dropped immediately, so this only
                // detects a proposal that is in flight at this instant — it does not hold the lock
                // across the `propose_batch` call below.
                if self_.propose_lock.try_lock().is_err() {
                    guard_trace!(
                        self_,
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    guard_warn!(self_, "Cannot propose a batch - {e}");
                }
            }
        });

        // Periodically try to increment to the next round.
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.is_synced() {
                    guard_trace!(self_, "Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let next_round = self_.current_round().saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the next round.
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        guard_warn!(self_, "Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    guard_debug!(self_, "Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        guard_warn!(self_, "Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Process the unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };

                // Retrieve the worker.
                let worker = &self_.workers[worker_id as usize];
                // Process the unconfirmed solution.
                let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                // Send the result to the callback.
                // Note: a dropped callback receiver is not an error here, hence `.ok()`.
                callback.send(result).ok();
            }
        });

        // Process the unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                guard_trace!(self_, "Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    guard_error!(self_, "Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    guard_error!(self_, "Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };

                // Retrieve the worker.
                let worker = &self_.workers[worker_id as usize];
                // Process the unconfirmed transaction.
                let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                // Send the result to the callback.
                // Note: a dropped callback receiver is not an error here, hence `.ok()`.
                callback.send(result).ok();
            }
        });
    }
891
892    /// Increments to the next round.
893    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
894        // If the next round is within GC range, then iterate to the penultimate round.
895        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
896            let mut fast_forward_round = self.current_round();
897            // Iterate until the penultimate round is reached.
898            while fast_forward_round < next_round.saturating_sub(1) {
899                // Update to the next round in storage.
900                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
901                // Clear the proposed batch.
902                *self.proposed_batch.write() = None;
903            }
904        }
905
906        // Retrieve the current round.
907        let current_round = self.current_round();
908        // Attempt to advance to the next round.
909        if current_round < next_round {
910            // If a BFT sender was provided, send the current round to the BFT.
911            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
912                match bft_sender.send_primary_round_to_bft(current_round).await {
913                    Ok(is_ready) => is_ready,
914                    Err(e) => {
915                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
916                        return Err(e);
917                    }
918                }
919            }
920            // Otherwise, handle the Narwhal case.
921            else {
922                // Update to the next round in storage.
923                self.storage.increment_to_next_round(current_round)?;
924                // Set 'is_ready' to 'true'.
925                true
926            };
927
928            // Log whether the next round is ready.
929            match is_ready {
930                true => guard_debug!(self, "Primary is ready to propose the next round"),
931                false => guard_debug!(self, "Primary is not ready to propose the next round"),
932            }
933
934            // If the node is ready, propose a batch for the next round.
935            if is_ready {
936                self.propose_batch().await?;
937            }
938        }
939        Ok(())
940    }
941
942    /// Increments to the next round.
943    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
944        // If the next round is within GC range, then iterate to the penultimate round.
945        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
946            let mut fast_forward_round = self.current_round();
947            // Iterate until the penultimate round is reached.
948            while fast_forward_round < next_round.saturating_sub(1) {
949                // Update to the next round in storage.
950                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
951                // Clear the proposed batch.
952                *self.proposed_batch.write() = None;
953            }
954        }
955
956        // Retrieve the current round.
957        let current_round = self.current_round();
958        // Attempt to advance to the next round.
959        if current_round < next_round {
960            // If a BFT sender was provided, send the current round to the BFT.
961            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
962                match bft_sender.send_primary_round_to_bft(current_round).await {
963                    Ok(is_ready) => is_ready,
964                    Err(e) => {
965                        guard_warn!(self, "Failed to update the BFT to the next round - {e}");
966                        return Err(e);
967                    }
968                }
969            }
970            // Otherwise, handle the Narwhal case.
971            else {
972                // Update to the next round in storage.
973                self.storage.increment_to_next_round(current_round)?;
974                // Set 'is_ready' to 'true'.
975                true
976            };
977
978            // Log whether the next round is ready.
979            match is_ready {
980                true => guard_debug!(self, "Primary is ready to propose the next round"),
981                false => guard_debug!(self, "Primary is not ready to propose the next round"),
982            }
983
984            // // If the node is ready, propose a batch for the next round.
985            // if is_ready {
986            //     self.propose_batch().await?;
987            // }
988        }
989        Ok(())
990    }
991
992    /// Ensure the primary is not creating batch proposals too frequently.
993    /// This checks that the certificate timestamp for the previous round is within the expected range.
994    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
995        // Retrieve the timestamp of the previous timestamp to check against.
996        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
997            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
998            Some(certificate) => certificate.timestamp(),
999            None => *self.latest_proposed_batch_timestamp.read(),
1000        };
1001
1002        // Determine the elapsed time since the previous timestamp.
1003        let elapsed = timestamp
1004            .checked_sub(previous_timestamp)
1005            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
1006        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
1007        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
1008            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
1009            false => Ok(()),
1010        }
1011    }
1012
    /// Certifies the given proposal, stores the resulting batch certificate, and forwards it to the BFT.
    ///
    /// NOTE(review): despite the name pattern, this "lite" variant does not broadcast the
    /// certificate to other validators, and it returns `Ok(())` rather than the certificate.
    /// On success, it advances the primary to the next round via
    /// `try_increment_to_the_next_round_lite` (which does not propose a new batch).
    async fn store_and_broadcast_certificate_lite(
        &self,
        proposal: &Proposal<N>,
        committee: &Committee<N>,
    ) -> Result<()> {
        // Create the batch certificate and transmissions.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        guard_debug!(self, "Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                guard_warn!(self, "Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        guard_info!(self, "\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round.
        self.try_increment_to_the_next_round_lite(round + 1).await
    }
1043
    /// Re-inserts the given transmissions into the workers.
    ///
    /// Each transmission is routed back to the worker responsible for its transmission ID.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Re-insert the transmissions into the workers.
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }
1055
1056    /// Recursively stores a given batch certificate, after ensuring:
1057    ///   - Ensure the round matches the committee round.
1058    ///   - Ensure the address is a member of the committee.
1059    ///   - Ensure the timestamp is within range.
1060    ///   - Ensure we have all of the transmissions.
1061    ///   - Ensure we have all of the previous certificates.
1062    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1063    ///   - Ensure the previous certificates have reached the quorum threshold.
1064    ///   - Ensure we have not already signed the batch ID.
1065    #[async_recursion::async_recursion]
1066    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1067        &self,
1068        peer_ip: SocketAddr,
1069        certificate: BatchCertificate<N>,
1070    ) -> Result<()> {
1071        // Retrieve the batch header.
1072        let batch_header = certificate.batch_header();
1073        // Retrieve the batch round.
1074        let batch_round = batch_header.round();
1075
1076        // If the certificate round is outdated, do not store it.
1077        if batch_round <= self.storage.gc_round() {
1078            return Ok(());
1079        }
1080        // If the certificate already exists in storage, return early.
1081        if self.storage.contains_certificate(certificate.id()) {
1082            return Ok(());
1083        }
1084
1085        // If node is not in sync mode and the node is not synced. Then return an error.
1086        if !IS_SYNCING && !self.is_synced() {
1087            bail!(
1088                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1089                fmt_id(certificate.id())
1090            );
1091        }
1092
1093        // If the peer is ahead, use the batch header to sync up to the peer.
1094        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1095
1096        // Check if the certificate needs to be stored.
1097        if !self.storage.contains_certificate(certificate.id()) {
1098            // Store the batch certificate.
1099            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1100            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1101            guard_debug!(self, "Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1102            // If a BFT sender was provided, send the round and certificate to the BFT.
1103            if let Some(bft_sender) = self.bft_sender.get() {
1104                // Send the certificate to the BFT.
1105                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1106                    guard_warn!(self, "Failed to update the BFT DAG from sync: {e}");
1107                    return Err(e);
1108                };
1109            }
1110        }
1111        Ok(())
1112    }
1113
1114    /// Recursively syncs using the given batch header.
1115    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
1116        &self,
1117        peer_ip: SocketAddr,
1118        batch_header: &BatchHeader<N>,
1119    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1120        // Retrieve the batch round.
1121        let batch_round = batch_header.round();
1122
1123        // If the certificate round is outdated, do not store it.
1124        if batch_round <= self.storage.gc_round() {
1125            bail!("Round {batch_round} is too far in the past")
1126        }
1127
1128        // If node is not in sync mode and the node is not synced. Then return an error.
1129        if !IS_SYNCING && !self.is_synced() {
1130            bail!(
1131                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1132                fmt_id(batch_header.batch_id())
1133            );
1134        }
1135
1136        // Determine if quorum threshold is reached on the batch round.
1137        let is_quorum_threshold_reached = {
1138            let authors = self.storage.get_certificate_authors_for_round(batch_round);
1139            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
1140            committee_lookback.is_quorum_threshold_reached(&authors)
1141        };
1142
1143        // Check if our primary should move to the next round.
1144        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
1145        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
1146        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
1147        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
1148        // Check if our primary is far behind the peer.
1149        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
1150        // If our primary is far behind the peer, update our committee to the batch round.
1151        if is_behind_schedule || is_peer_far_in_future {
1152            // If the batch round is greater than the current committee round, update the committee.
1153            self.try_increment_to_the_next_round(batch_round).await?;
1154        }
1155
1156        // Ensure the primary has all of the transmissions.
1157        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
1158            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
1159        })?;
1160
1161        Ok(missing_transmissions)
1162    }
1163
1164    /// Fetches any missing transmissions for the specified batch header.
1165    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1166    async fn fetch_missing_transmissions(
1167        &self,
1168        batch_header: &BatchHeader<N>,
1169    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1170        // If the round is <= the GC round, return early.
1171        if batch_header.round() <= self.storage.gc_round() {
1172            return Ok(Default::default());
1173        }
1174
1175        // Ensure this batch ID is new, otherwise return early.
1176        if self.storage.contains_batch(batch_header.batch_id()) {
1177            guard_trace!(self, "Batch for round {} from peer has already been processed", batch_header.round());
1178            return Ok(Default::default());
1179        }
1180
1181        // Retrieve the workers.
1182        let workers = self.workers.clone();
1183
1184        // Initialize a list for the transmissions.
1185        let mut fetch_transmissions = FuturesUnordered::new();
1186
1187        // Retrieve the number of workers.
1188        let num_workers = self.num_workers();
1189        // Iterate through the transmission IDs.
1190        for transmission_id in batch_header.transmission_ids() {
1191            // If the transmission does not exist in storage, proceed to fetch the transmission.
1192            if !self.storage.contains_transmission(*transmission_id) {
1193                // Determine the worker ID.
1194                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1195                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1196                };
1197                // Retrieve the worker.
1198                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1199                // Push the callback onto the list.
1200                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
1201            }
1202        }
1203
1204        // Initialize a set for the transmissions.
1205        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1206        // Wait for all of the transmissions to be fetched.
1207        while let Some(result) = fetch_transmissions.next().await {
1208            // Retrieve the transmission.
1209            let (transmission_id, transmission) = result?;
1210            // Insert the transmission into the set.
1211            transmissions.insert(transmission_id, transmission);
1212        }
1213        // Return the transmissions.
1214        Ok(transmissions)
1215    }
1216}
1217
1218impl<N: Network> Primary<N> {
1219    /// Spawns a task with the given future; it should only be used for long-running tasks.
1220    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1221        self.handles.lock().push(tokio::spawn(future));
1222    }
1223
1224    /// Shuts down the primary.
1225    pub async fn shut_down(&self) {
1226        guard_info!(self, "Shutting down the primary...");
1227        {
1228            // Abort all primary tasks and clear the handles.
1229            let mut handles = self.handles.lock();
1230            handles.iter().for_each(|handle| handle.abort());
1231            handles.clear();
1232        }
1233
1234        // Save the current proposal cache to disk,
1235        // and clear save_pending ensuring this
1236        // operation is only ran once.
1237        let save_pending = self.save_pending.swap(false, Ordering::AcqRel);
1238        if save_pending {
1239            let proposal_cache = {
1240                let proposal = self.proposed_batch.write().take();
1241                let signed_proposals = self.signed_proposals.read().clone();
1242                let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1243                let pending_certificates = self.storage.get_pending_certificates();
1244                ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1245            };
1246
1247            if let Err(err) = proposal_cache.store(&self.storage_mode, self) {
1248                guard_error!(self, "Failed to store the current proposal cache: {err}");
1249            }
1250        }
1251    }
1252}
1253
1254#[cfg(test)]
1255mod tests {
1256    use super::*;
1257    use amareleo_node_bft_ledger_service::MockLedgerService;
1258    use amareleo_node_bft_storage_service::BFTMemoryService;
1259    use snarkvm::{
1260        console::types::Field,
1261        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1262        prelude::{Address, Signature},
1263    };
1264
1265    use bytes::Bytes;
1266    use indexmap::IndexSet;
1267    use rand::RngCore;
1268
1269    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1270
1271    // Returns a primary and a list of accounts in the configured committee.
1272    async fn primary_without_handlers(
1273        rng: &mut TestRng,
1274    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1275        // Create a committee containing the primary's account.
1276        let (accounts, committee) = {
1277            const COMMITTEE_SIZE: usize = 4;
1278            let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1279            let mut members = IndexMap::new();
1280
1281            for i in 0..COMMITTEE_SIZE {
1282                let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1283                let account = Account::new(rng).unwrap();
1284                members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1285                accounts.push((socket_addr, account));
1286            }
1287
1288            (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1289        };
1290
1291        let account = accounts.first().unwrap().1.clone();
1292        let ledger = Arc::new(MockLedgerService::new(committee));
1293        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);
1294
1295        // Initialize the primary.
1296        let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger, None).unwrap();
1297
1298        // Construct a worker instance.
1299        primary.workers = Arc::from([Worker::new(
1300            0, // id
1301            primary.storage.clone(),
1302            primary.ledger.clone(),
1303            primary.proposed_batch.clone(),
1304            None,
1305        )
1306        .unwrap()]);
1307
1308        (primary, accounts)
1309    }
1310
1311    // Creates a mock solution.
1312    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1313        // Sample a random fake solution ID.
1314        let solution_id = rng.gen::<u64>().into();
1315        // Vary the size of the solutions.
1316        let size = rng.gen_range(1024..10 * 1024);
1317        // Sample random fake solution bytes.
1318        let mut vec = vec![0u8; size];
1319        rng.fill_bytes(&mut vec);
1320        let solution = Data::Buffer(Bytes::from(vec));
1321        // Return the solution ID and solution.
1322        (solution_id, solution)
1323    }
1324
1325    // Creates a mock transaction.
1326    fn sample_unconfirmed_transaction(
1327        rng: &mut TestRng,
1328    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1329        // Sample a random fake transaction ID.
1330        let id = Field::<CurrentNetwork>::rand(rng).into();
1331        // Vary the size of the transactions.
1332        let size = rng.gen_range(1024..10 * 1024);
1333        // Sample random fake transaction bytes.
1334        let mut vec = vec![0u8; size];
1335        rng.fill_bytes(&mut vec);
1336        let transaction = Data::Buffer(Bytes::from(vec));
1337        // Return the ID and transaction.
1338        (id, transaction)
1339    }
1340
1341    // Creates a batch proposal with one solution and one transaction.
1342    fn create_test_proposal(
1343        author: &Account<CurrentNetwork>,
1344        committee: Committee<CurrentNetwork>,
1345        round: u64,
1346        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1347        timestamp: i64,
1348        rng: &mut TestRng,
1349    ) -> Proposal<CurrentNetwork> {
1350        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1351        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1352        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1353        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1354
1355        let solution_transmission_id = (solution_id, solution_checksum).into();
1356        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1357
1358        // Retrieve the private key.
1359        let private_key = author.private_key();
1360        // Prepare the transmission IDs.
1361        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1362        let transmissions = [
1363            (solution_transmission_id, Transmission::Solution(solution)),
1364            (transaction_transmission_id, Transmission::Transaction(transaction)),
1365        ]
1366        .into();
1367        // Sign the batch header.
1368        let batch_header = BatchHeader::new(
1369            private_key,
1370            round,
1371            timestamp,
1372            committee.id(),
1373            transmission_ids,
1374            previous_certificate_ids,
1375            rng,
1376        )
1377        .unwrap();
1378        // Construct the proposal.
1379        Proposal::new(committee, batch_header, transmissions).unwrap()
1380    }
1381
1382    /// Creates a signature of the batch ID for each committee member (excluding the primary).
1383    fn peer_signatures_for_batch(
1384        primary_address: Address<CurrentNetwork>,
1385        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1386        batch_id: Field<CurrentNetwork>,
1387        rng: &mut TestRng,
1388    ) -> IndexSet<Signature<CurrentNetwork>> {
1389        let mut signatures = IndexSet::new();
1390        for (_, account) in accounts {
1391            if account.address() == primary_address {
1392                continue;
1393            }
1394            let signature = account.sign(&[batch_id], rng).unwrap();
1395            signatures.insert(signature);
1396        }
1397        signatures
1398    }
1399
1400    // Creates a batch certificate.
1401    fn create_batch_certificate(
1402        primary_address: Address<CurrentNetwork>,
1403        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1404        round: u64,
1405        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1406        rng: &mut TestRng,
1407    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
1408        let timestamp = now();
1409
1410        let author =
1411            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
1412        let private_key = author.private_key();
1413
1414        let committee_id = Field::rand(rng);
1415        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1416        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1417        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1418        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1419
1420        let solution_transmission_id = (solution_id, solution_checksum).into();
1421        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1422
1423        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1424        let transmissions = [
1425            (solution_transmission_id, Transmission::Solution(solution)),
1426            (transaction_transmission_id, Transmission::Transaction(transaction)),
1427        ]
1428        .into();
1429
1430        let batch_header = BatchHeader::new(
1431            private_key,
1432            round,
1433            timestamp,
1434            committee_id,
1435            transmission_ids,
1436            previous_certificate_ids,
1437            rng,
1438        )
1439        .unwrap();
1440        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
1441        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
1442        (certificate, transmissions)
1443    }
1444
1445    // Create a certificate chain up to round in primary storage.
1446    fn store_certificate_chain(
1447        primary: &Primary<CurrentNetwork>,
1448        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1449        round: u64,
1450        rng: &mut TestRng,
1451    ) -> IndexSet<Field<CurrentNetwork>> {
1452        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1453        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1454        for cur_round in 1..round {
1455            for (_, account) in accounts.iter() {
1456                let (certificate, transmissions) = create_batch_certificate(
1457                    account.address(),
1458                    accounts,
1459                    cur_round,
1460                    previous_certificates.clone(),
1461                    rng,
1462                );
1463                next_certificates.insert(certificate.id());
1464                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1465            }
1466
1467            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1468            previous_certificates = next_certificates;
1469            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1470        }
1471
1472        previous_certificates
1473    }
1474
    #[tokio::test]
    async fn test_propose_batch() {
        let mut rng = TestRng::default();
        // Set up a primary with no handlers running.
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; with transmissions available, the call should succeed.
        assert!(primary.propose_batch().await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // assert!(primary.proposed_batch.read().is_some());
    }
1497
    #[tokio::test]
    async fn test_propose_batch_with_no_transmissions() {
        let mut rng = TestRng::default();
        // Set up a primary with no handlers running.
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Try to propose a batch with no transmissions; the call itself should still succeed.
        assert!(primary.propose_batch().await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // assert!(primary.proposed_batch.read().is_some());
    }
1512
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Fill primary storage with a certificate chain up to `round`.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch; with storage advanced and transmissions available, it should succeed.
        assert!(primary.propose_batch().await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // assert!(primary.proposed_batch.read().is_some());
    }
1539
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, _) = primary_without_handlers(&mut rng).await;

        // Check there is no batch currently proposed.
        assert!(primary.proposed_batch.read().is_none());

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Set the proposal lock to a round ahead of the storage.
        let old_proposal_lock_round = *primary.propose_lock.lock().await;
        *primary.propose_lock.lock().await = round + 1;

        // Propose a batch: the call returns Ok, but no batch is produced because
        // the proposal lock is ahead of the storage round.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_none());

        // Set the proposal lock back to the old round.
        *primary.propose_lock.lock().await = old_proposal_lock_round;

        // Try to propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // assert!(primary.proposed_batch.read().is_some());
    }
1574
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Generate previous certificates up to `round`.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal one round ahead of the storage round.
        let timestamp = now();
        let proposal = create_test_proposal(
            &primary.account,
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Proposing a batch terminates early because the storage round is behind the
        // proposal; the existing (ahead-of-storage) proposal is left in place.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
1603}