amareleo_node_bft/
primary.rs

1// Copyright 2024 Aleo Network Foundation
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::{
17    DEVELOPMENT_MODE_RNG_SEED,
18    MAX_BATCH_DELAY_IN_MS,
19    MAX_WORKERS,
20    MIN_BATCH_DELAY_IN_SECS,
21    Sync,
22    Worker,
23    helpers::{
24        BFTSender,
25        PrimaryReceiver,
26        PrimarySender,
27        Proposal,
28        ProposalCache,
29        SignedProposals,
30        Storage,
31        assign_to_worker,
32        assign_to_workers,
33        fmt_id,
34        now,
35    },
36    spawn_blocking,
37};
38use amareleo_chain_account::Account;
39use amareleo_chain_tracing::TracingHandler;
40use amareleo_node_bft_ledger_service::LedgerService;
41use amareleo_node_sync::DUMMY_SELF_IP;
42use snarkvm::{
43    console::{prelude::*, types::Address},
44    ledger::{
45        block::Transaction,
46        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
47        puzzle::{Solution, SolutionID},
48    },
49    prelude::{Signature, committee::Committee},
50};
51
52use aleo_std::StorageMode;
53use colored::Colorize;
54use futures::stream::{FuturesUnordered, StreamExt};
55use indexmap::IndexMap;
56use parking_lot::{Mutex, RwLock};
57
58use rand::SeedableRng;
59use rand_chacha::ChaChaRng;
60use snarkvm::console::account::PrivateKey;
61
62use std::{
63    collections::{HashMap, HashSet},
64    future::Future,
65    net::SocketAddr,
66    sync::Arc,
67    time::Duration,
68};
69use tokio::{
70    sync::{Mutex as TMutex, OnceCell},
71    task::JoinHandle,
72};
73use tracing::subscriber::DefaultGuard;
74
/// A helper type for an optional proposed batch, guarded by a read-write lock.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
77
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module.
    sync: Sync<N>,
    /// The account (key pair) of this node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// Whether to preserve the chain state on shutdown.
    keep_state: bool,
    /// The storage mode.
    storage_mode: StorageMode,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers (empty until `run` constructs them).
    workers: Arc<[Worker<N>]>,
    /// The BFT sender, set at most once in `run`.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The tracing handle, if tracing is enabled.
    tracing: Option<TracingHandler>,
    /// The lock for propose_batch; its guarded value is the latest proposal round.
    propose_lock: Arc<TMutex<u64>>,
}
109
impl<N: Network> Primary<N> {
    /// The maximum number of unconfirmed transmissions to send to the primary.
    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;

    /// Initializes a new primary instance.
    ///
    /// Note: the workers are left empty here; they are constructed in `run`.
    pub fn new(
        account: Account<N>,
        storage: Storage<N>,
        keep_state: bool,
        storage_mode: StorageMode,
        ledger: Arc<dyn LedgerService<N>>,
        tracing: Option<TracingHandler>,
    ) -> Result<Self> {
        // Initialize the sync module.
        let sync = Sync::new(storage.clone(), ledger.clone());

        // Initialize the primary instance.
        Ok(Self {
            sync,
            account,
            storage,
            keep_state,
            storage_mode,
            ledger,
            workers: Arc::from(vec![]),
            bft_sender: Default::default(),
            proposed_batch: Default::default(),
            latest_proposed_batch_timestamp: Default::default(),
            signed_proposals: Default::default(),
            handles: Default::default(),
            tracing,
            propose_lock: Default::default(),
        })
    }

    /// Returns the tracing guard, subscribing the current thread to the
    /// tracing handler (if one is configured).
    pub fn get_tracing_guard(&self) -> Option<DefaultGuard> {
        self.tracing.clone().map(|trace_handle| trace_handle.subscribe_thread())
    }

    /// Load the proposal cache file and update the Primary state with the stored data.
    async fn load_proposal_cache(&self) -> Result<()> {
        let _guard = self.get_tracing_guard();

        // Fetch the signed proposals from the file system if it exists.
        match ProposalCache::<N>::exists(&self.storage_mode) {
            // If the proposal cache exists, then process the proposal cache.
            true => {
                match ProposalCache::<N>::load(self.account.address(), &self.storage_mode, self.tracing.clone()) {
                    Ok(proposal_cache) => {
                        // Extract the proposal and signed proposals.
                        let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
                            proposal_cache.into();

                        // Write the proposed batch.
                        *self.proposed_batch.write() = proposed_batch;
                        // Write the signed proposals.
                        *self.signed_proposals.write() = signed_proposals;
                        // Write the propose lock (the latest certificate round).
                        *self.propose_lock.lock().await = latest_certificate_round;

                        // Update the storage with the pending certificates.
                        for certificate in pending_certificates {
                            let batch_id = certificate.batch_id();
                            // We use a dummy IP because the node should not need to request from any peers.
                            // The storage should have stored all the transmissions. If not, we simply
                            // skip the certificate.
                            if let Err(err) =
                                self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
                            {
                                warn!(
                                    "Failed to load stored certificate {} from proposal cache - {err}",
                                    fmt_id(batch_id)
                                );
                            }
                        }
                        Ok(())
                    }
                    Err(err) => {
                        bail!("Failed to read the signed proposals from the file system - {err}.");
                    }
                }
            }
            // If the proposal cache does not exist, then return early.
            false => Ok(()),
        }
    }

    /// Run the primary instance.
    ///
    /// Wires the BFT sender (at most once), constructs the workers, initializes
    /// and runs the sync module, then starts the primary handlers.
    pub async fn run(
        &mut self,
        bft_sender: Option<BFTSender<N>>,
        _primary_sender: PrimarySender<N>,
        primary_receiver: PrimaryReceiver<N>,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();
        info!("Starting the primary instance of the memory pool...");

        // Set the BFT sender.
        if let Some(bft_sender) = &bft_sender {
            // Set the BFT sender in the primary.
            // Note: this panics if `run` is invoked twice with a BFT sender.
            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
        }

        // Construct a map for the workers.
        let mut workers = Vec::new();
        // Initialize the workers.
        for id in 0..MAX_WORKERS {
            // Construct the worker instance.
            let worker = Worker::new(
                id,
                self.storage.clone(),
                self.ledger.clone(),
                self.proposed_batch.clone(),
                self.tracing.clone(),
            )?;

            // Add the worker to the list of workers.
            workers.push(worker);
        }
        // Set the workers.
        self.workers = Arc::from(workers);

        // Next, initialize the sync module and sync the storage from ledger.
        self.sync.initialize(bft_sender).await?;
        // Next, load and process the proposal cache before running the sync module.
        self.load_proposal_cache().await?;
        // Next, run the sync module.
        self.sync.run().await?;
        // Lastly, start the primary handlers.
        // Note: This ensures the primary does not start communicating before syncing is complete.
        self.start_handlers(primary_receiver);

        Ok(())
    }

    /// Returns the current round.
    pub fn current_round(&self) -> u64 {
        self.storage.current_round()
    }

    /// Returns `true` if the primary is synced.
    pub fn is_synced(&self) -> bool {
        self.sync.is_synced()
    }

    /// Returns the account of the node.
    pub const fn account(&self) -> &Account<N> {
        &self.account
    }

    /// Returns the storage.
    pub const fn storage(&self) -> &Storage<N> {
        &self.storage
    }

    /// Returns the ledger.
    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
        &self.ledger
    }

    /// Returns the number of workers.
    pub fn num_workers(&self) -> u8 {
        u8::try_from(self.workers.len()).expect("Too many workers")
    }

    /// Returns the workers.
    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
        &self.workers
    }

    /// Returns the batch proposal of our primary, if one currently exists.
    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
        &self.proposed_batch
    }
}
286
287impl<N: Network> Primary<N> {
288    /// Returns the number of unconfirmed transmissions.
289    pub fn num_unconfirmed_transmissions(&self) -> usize {
290        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
291    }
292
293    /// Returns the number of unconfirmed ratifications.
294    pub fn num_unconfirmed_ratifications(&self) -> usize {
295        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
296    }
297
298    /// Returns the number of solutions.
299    pub fn num_unconfirmed_solutions(&self) -> usize {
300        self.workers.iter().map(|worker| worker.num_solutions()).sum()
301    }
302
303    /// Returns the number of unconfirmed transactions.
304    pub fn num_unconfirmed_transactions(&self) -> usize {
305        self.workers.iter().map(|worker| worker.num_transactions()).sum()
306    }
307}
308
309impl<N: Network> Primary<N> {
310    /// Returns the worker transmission IDs.
311    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
312        self.workers.iter().flat_map(|worker| worker.transmission_ids())
313    }
314
315    /// Returns the worker transmissions.
316    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
317        self.workers.iter().flat_map(|worker| worker.transmissions())
318    }
319
320    /// Returns the worker solutions.
321    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
322        self.workers.iter().flat_map(|worker| worker.solutions())
323    }
324
325    /// Returns the worker transactions.
326    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
327        self.workers.iter().flat_map(|worker| worker.transactions())
328    }
329}
330
331impl<N: Network> Primary<N> {
332    /// Clears the worker solutions.
333    pub fn clear_worker_solutions(&self) {
334        self.workers.iter().for_each(Worker::clear_solutions);
335    }
336}
337
impl<N: Network> Primary<N> {
    /// Proposes a batch for the current round on behalf of validator 0, then submits
    /// empty ("fake") proposals for the remaining simulated validators.
    ///
    /// The validator accounts are derived deterministically from
    /// `DEVELOPMENT_MODE_RNG_SEED`, so every run reproduces the same set of accounts.
    pub async fn propose_batch(&self) -> Result<()> {
        // Derive the validator accounts deterministically from the development-mode seed.
        let mut rng = ChaChaRng::seed_from_u64(DEVELOPMENT_MODE_RNG_SEED);
        let mut all_acc: Vec<Account<N>> = Vec::new();

        for _ in 0u64..4u64 {
            let private_key = PrivateKey::<N>::new(&mut rng)?;
            let acc = Account::<N>::try_from(private_key).expect("Failed to initialize account with private key");
            all_acc.push(acc);
        }

        // Submit proposal for validator with id 0.
        let primary_addr = all_acc[0].address();
        let other_acc: Vec<&Account<N>> = all_acc.iter().filter(|acc| acc.address() != primary_addr).collect();

        // A returned round of 0 signals that the proposal was safely skipped.
        let round = self.propose_batch_lite(&other_acc).await?;
        if round == 0u64 {
            return Ok(());
        }

        // Submit empty proposals for the other validators.
        for vid in 1..all_acc.len() {
            let primary_acc = &all_acc[vid];
            let other_acc: Vec<&Account<N>> =
                all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();

            self.fake_proposal(vid.try_into().unwrap(), primary_acc, &other_acc, round).await?;
        }
        Ok(())
    }

    /// Proposes a batch for the current round using this node's account, forging the
    /// signatures of the given `other_acc` validators so the batch can be certified and
    /// stored immediately (no network round-trip).
    ///
    /// Returns the proposed round on success, or `0` when the proposal was safely skipped
    /// (round already proposed/certified, quorum not reached, proposal timestamp too
    /// recent, or the BFT is not ready).
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>]) -> Result<u64> {
        // This function isn't re-entrant.
        let mut lock_guard = self.propose_lock.lock().await;
        let _guard = self.get_tracing_guard();

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            debug!("Primary is safely skipping a batch proposal for round {round} - {}", format!("{e}").dimmed());
            return Ok(0u64);
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (), // continue,
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(0u64),
                    // An error occurred while attempting to advance the current round.
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(0u64);
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses.
            // Here the "connected" validators are the simulated accounts passed in by the caller.
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();

            // Append the primary to the set.
            connected_validators.insert(self.account.address());

            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal for round {round} {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal for round {round} {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Determine the required number of transmissions per worker.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Take the transmissions from the workers.
        for worker in self.workers.iter() {
            // Initialize a tracker for included transmissions for the current worker.
            let mut num_transmissions_included_for_worker = 0;
            // Keep draining the worker until the desired number of transmissions is reached or the worker is empty.
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                // Determine the number of remaining transmissions for the worker.
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                // Drain the worker.
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // If the worker is empty, break early.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                // Iterate through the worker transmissions.
                'inner: for (id, transmission) in worker_transmissions {
                    // Check if the ledger already contains the transmission.
                    // Note: on a lookup error we conservatively treat the transmission as present and skip it.
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    // Check if the storage already contains the transmission.
                    // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                    // the primary does not propose a batch with no transmissions.
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Check the transmission is still valid.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            // Ensure the checksum matches.
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the solution is still valid.
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            // Ensure the checksum matches.
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the transaction is still valid.
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                                continue 'inner;
                            }
                        }
                        // Note: We explicitly forbid including ratifications,
                        // as the protocol currently does not support ratifications.
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        // All other combinations are clearly invalid.
                        _ => continue 'inner,
                    }
                    // Insert the transmission into the map.
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record this round as the latest proposed round before signing.
        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.account.private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        // Note: unlike the standard flow, the proposal is NOT cached in `proposed_batch`;
        // it is certified immediately below instead of being broadcast for signatures.
        // // Set the proposed batch.
        // *self.proposed_batch.write() = Some(proposal);

        //===============================================================================
        // Processing proposal

        info!("Quorum threshold reached - Preparing to certify our batch for round {round}...");

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();

        // Forge signatures of other validators.
        for acc in other_acc.iter() {
            // Sign the batch ID.
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            // Add the signature to the batch.
            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Store the certified batch and broadcast it to all validators.
        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
            // Reinsert the transmissions back into the ready queue for the next proposal.
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(round)
    }

    /// Builds and certifies an EMPTY batch on behalf of the simulated validator `vid`
    /// (`primary_acc`), forging the signatures of the `other_acc` validators.
    ///
    /// The resulting certificate is inserted into storage, this node's own signature is
    /// cached in `signed_proposals`, and the certificate is forwarded to the BFT (if set).
    pub async fn fake_proposal(
        &self,
        vid: u64,
        primary_acc: &Account<N>,
        other_acc: &[&Account<N>],
        round: u64,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();
        // The fake proposal carries no transmissions.
        let transmissions: IndexMap<_, _> = Default::default();
        let transmission_ids = transmissions.keys().copied().collect();

        let private_key = *primary_acc.private_key();
        let current_timestamp = now();

        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        let committee_id = committee_lookback.id();

        let previous_round = round.saturating_sub(1);
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();

        // Sign the batch header (as `primary_acc`) and construct the proposal.
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })?;

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();
        let mut our_sign: Option<Signature<N>> = None;

        // Forge signatures of other validators.
        for acc in other_acc.iter() {
            // Sign the batch ID.
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            // Keep a copy of this node's own signature; it is cached in `signed_proposals` below.
            if signer == self.account.address() {
                our_sign = Some(signature);
            }

            // Add the signature to the batch.
            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Ensure this node's (validator 0) signature was produced among the forged signatures.
        let our_sign = match our_sign {
            Some(sign) => sign,
            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
        };

        // Create the batch certificate and transmissions.
        let (certificate, transmissions) =
            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;

        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        info!("Stored a batch certificate for validator/round {vid}/{round}");

        // Cache this node's signature for the fake proposer's address.
        match self.signed_proposals.write().0.entry(primary_acc.address()) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If the validator has already signed a batch for this round, then return early,
                // since, if the peer still has not received the signature, they will request it again,
                // and the logic at the start of this function will resend the (now cached) signature
                // to the peer if asked to sign this batch proposal again.
                if entry.get().0 == round {
                    return Ok(());
                }
                // Otherwise, cache the round, batch ID, and signature for this validator.
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
            // If the validator has not signed a batch before, then continue.
            std::collections::hash_map::Entry::Vacant(entry) => {
                // Cache the round, batch ID, and signature for this validator.
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
        };

        if let Some(bft_sender) = self.bft_sender.get() {
            // Send the certificate to the BFT.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                warn!("Failed to update the BFT DAG from sync: {e}");
                return Err(e);
            };
        }

        Ok(())
    }
}
742
743impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// Spawns four long-running tasks:
    /// 1. The batch proposer, which periodically attempts to propose a batch.
    /// 2. The round incrementer, which advances the round once a quorum of certificates exists.
    /// 3. The unconfirmed-solution processor, which routes solutions to workers.
    /// 4. The unconfirmed-transaction processor, which routes transactions to workers.
    ///
    /// Note: The network-facing receivers (`rx_batch_propose`, `rx_batch_signature`,
    /// `rx_batch_certified`, `rx_primary_ping`) are deliberately discarded here —
    /// this node variant does not service them.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Destructure the receiver, dropping the channels this variant does not use.
        let PrimaryReceiver {
            rx_batch_propose: _,
            rx_batch_signature: _,
            rx_batch_certified: _,
            rx_primary_ping: _,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the batch proposer.
        let self_ = self.clone();
        // Hold the tracing guard for the lifetime of the spawned task.
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                let current_round = self_.current_round();
                // If the primary is not synced, then do not propose a batch.
                if !self_.is_synced() {
                    debug!("Skipping batch proposal for round {current_round} {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                if self_.propose_lock.try_lock().is_err() {
                    trace!(
                        "Skipping batch proposal for round {current_round} {}",
                        "(node is already proposing)".dimmed()
                    );
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Periodically try to increment to the next round.
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let next_round = self_.current_round().saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the next round.
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Process the unconfirmed solutions.
        let self_ = self.clone();
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                // Spawn per-solution so a slow worker does not block the receive loop.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Process the unconfirmed transactions.
        let self_ = self.clone();
        let guard = self_.get_tracing_guard();
        self.spawn(async move {
            let _guard = guard;
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                // Spawn per-transaction so a slow worker does not block the receive loop.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });
    }
885
886    /// Increments to the next round.
887    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
888        let _guard = self.get_tracing_guard();
889
890        // If the next round is within GC range, then iterate to the penultimate round.
891        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
892            let mut fast_forward_round = self.current_round();
893            // Iterate until the penultimate round is reached.
894            while fast_forward_round < next_round.saturating_sub(1) {
895                // Update to the next round in storage.
896                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
897                // Clear the proposed batch.
898                *self.proposed_batch.write() = None;
899            }
900        }
901
902        // Retrieve the current round.
903        let current_round = self.current_round();
904        // Attempt to advance to the next round.
905        if current_round < next_round {
906            // If a BFT sender was provided, send the current round to the BFT.
907            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
908                match bft_sender.send_primary_round_to_bft(current_round).await {
909                    Ok(is_ready) => is_ready,
910                    Err(e) => {
911                        warn!("Failed to update the BFT to the next round - {e}");
912                        return Err(e);
913                    }
914                }
915            }
916            // Otherwise, handle the Narwhal case.
917            else {
918                // Update to the next round in storage.
919                self.storage.increment_to_next_round(current_round)?;
920                // Set 'is_ready' to 'true'.
921                true
922            };
923
924            // Log whether the next round is ready.
925            match is_ready {
926                true => debug!("Primary is ready to propose the next round"),
927                false => debug!("Primary is not ready to propose the next round"),
928            }
929
930            // If the node is ready, propose a batch for the next round.
931            if is_ready {
932                self.propose_batch().await?;
933            }
934        }
935        Ok(())
936    }
937
938    /// Increments to the next round.
939    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
940        let _guard = self.get_tracing_guard();
941
942        // If the next round is within GC range, then iterate to the penultimate round.
943        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
944            let mut fast_forward_round = self.current_round();
945            // Iterate until the penultimate round is reached.
946            while fast_forward_round < next_round.saturating_sub(1) {
947                // Update to the next round in storage.
948                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
949                // Clear the proposed batch.
950                *self.proposed_batch.write() = None;
951            }
952        }
953
954        // Retrieve the current round.
955        let current_round = self.current_round();
956        // Attempt to advance to the next round.
957        if current_round < next_round {
958            // If a BFT sender was provided, send the current round to the BFT.
959            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
960                match bft_sender.send_primary_round_to_bft(current_round).await {
961                    Ok(is_ready) => is_ready,
962                    Err(e) => {
963                        warn!("Failed to update the BFT to the next round - {e}");
964                        return Err(e);
965                    }
966                }
967            }
968            // Otherwise, handle the Narwhal case.
969            else {
970                // Update to the next round in storage.
971                self.storage.increment_to_next_round(current_round)?;
972                // Set 'is_ready' to 'true'.
973                true
974            };
975
976            // Log whether the next round is ready.
977            match is_ready {
978                true => debug!("Primary is ready to propose the next round"),
979                false => debug!("Primary is not ready to propose the next round"),
980            }
981
982            // // If the node is ready, propose a batch for the next round.
983            // if is_ready {
984            //     self.propose_batch().await?;
985            // }
986        }
987        Ok(())
988    }
989
    /// Ensure the primary is not creating batch proposals too frequently.
    /// This checks that the certificate timestamp for the previous round is within the expected range.
    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
        // Retrieve the previous timestamp to check against: the author's certificate for the
        // previous round, or (if absent) our latest proposed batch timestamp.
        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
            Some(certificate) => certificate.timestamp(),
            None => *self.latest_proposed_batch_timestamp.read(),
        };

        // Determine the elapsed time since the previous timestamp.
        // Note: `checked_sub` only fails on i64 overflow; a timestamp that merely precedes
        // the previous one yields a negative `elapsed`, which is rejected by the threshold
        // check below.
        let elapsed = timestamp
            .checked_sub(previous_timestamp)
            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_SECS` seconds ago.
        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
            false => Ok(()),
        }
    }
1010
1011    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1012    async fn store_and_broadcast_certificate_lite(
1013        &self,
1014        proposal: &Proposal<N>,
1015        committee: &Committee<N>,
1016    ) -> Result<()> {
1017        let _guard = self.get_tracing_guard();
1018
1019        // Create the batch certificate and transmissions.
1020        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
1021        // Convert the transmissions into a HashMap.
1022        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
1023        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
1024        // Store the certified batch.
1025        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1026        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
1027        debug!("Stored a batch certificate for round {}", certificate.round());
1028        // If a BFT sender was provided, send the certificate to the BFT.
1029        if let Some(bft_sender) = self.bft_sender.get() {
1030            // Await the callback to continue.
1031            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
1032                warn!("Failed to update the BFT DAG from primary - {e}");
1033                return Err(e);
1034            };
1035        }
1036        // Log the certified batch.
1037        let num_transmissions = certificate.transmission_ids().len();
1038        let round = certificate.round();
1039        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
1040        // Increment to the next round.
1041        self.try_increment_to_the_next_round_lite(round + 1).await
1042    }
1043
    /// Re-inserts the transmissions from the proposal into the workers,
    /// assigning each transmission back to its responsible worker.
    fn reinsert_transmissions_into_workers(
        &self,
        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
    ) -> Result<()> {
        // Re-insert the transmissions into the workers.
        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
            worker.reinsert(transmission_id, transmission);
        })
    }
1055
    /// Recursively stores a given batch certificate, after ensuring:
    ///   - Ensure the round matches the committee round.
    ///   - Ensure the address is a member of the committee.
    ///   - Ensure the timestamp is within range.
    ///   - Ensure we have all of the transmissions.
    ///   - Ensure we have all of the previous certificates.
    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
    ///   - Ensure the previous certificates have reached the quorum threshold.
    ///   - Ensure we have not already signed the batch ID.
    ///
    /// Note: Most of these checks are delegated to `sync_with_batch_header_from_peer`,
    /// which may recurse back into this function — hence `async_recursion`.
    #[async_recursion::async_recursion]
    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        certificate: BatchCertificate<N>,
    ) -> Result<()> {
        let _guard = self.get_tracing_guard();
        // Retrieve the batch header.
        let batch_header = certificate.batch_header();
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            return Ok(());
        }
        // If the certificate already exists in storage, return early.
        if self.storage.contains_certificate(certificate.id()) {
            return Ok(());
        }

        // If node is not in sync mode and the node is not synced. Then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(certificate.id())
            );
        }

        // If the peer is ahead, use the batch header to sync up to the peer.
        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;

        // Check if the certificate needs to be stored.
        // Note: The sync above may have already stored it, so re-check before inserting.
        if !self.storage.contains_certificate(certificate.id()) {
            // Store the batch certificate.
            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
            // If a BFT sender was provided, send the round and certificate to the BFT.
            if let Some(bft_sender) = self.bft_sender.get() {
                // Send the certificate to the BFT.
                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                    warn!("Failed to update the BFT DAG from sync: {e}");
                    return Err(e);
                };
            }
        }
        Ok(())
    }
1114
    /// Recursively syncs using the given batch header.
    ///
    /// Validates the batch round against the GC round and the node's sync state, advances
    /// our own round if the peer is ahead (see the race-condition note inline), and fetches
    /// any transmissions referenced by the header that are missing from storage.
    ///
    /// Returns the fetched missing transmissions, keyed by transmission ID.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the certificate round is outdated, do not store it.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // If node is not in sync mode and the node is not synced. Then return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine if quorum threshold is reached on the batch round.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Check if our primary should move to the next round.
        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        // Check if our primary is far behind the peer.
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        // If our primary is far behind the peer, update our committee to the batch round.
        if is_behind_schedule || is_peer_far_in_future {
            // If the batch round is greater than the current committee round, update the committee.
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Ensure the primary has all of the transmissions.
        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        Ok(missing_transmissions)
    }
1164
1165    /// Fetches any missing transmissions for the specified batch header.
1166    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1167    async fn fetch_missing_transmissions(
1168        &self,
1169        batch_header: &BatchHeader<N>,
1170    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1171        // If the round is <= the GC round, return early.
1172        if batch_header.round() <= self.storage.gc_round() {
1173            return Ok(Default::default());
1174        }
1175
1176        let _guard = self.get_tracing_guard();
1177        // Ensure this batch ID is new, otherwise return early.
1178        if self.storage.contains_batch(batch_header.batch_id()) {
1179            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1180            return Ok(Default::default());
1181        }
1182
1183        // Retrieve the workers.
1184        let workers = self.workers.clone();
1185
1186        // Initialize a list for the transmissions.
1187        let mut fetch_transmissions = FuturesUnordered::new();
1188
1189        // Retrieve the number of workers.
1190        let num_workers = self.num_workers();
1191        // Iterate through the transmission IDs.
1192        for transmission_id in batch_header.transmission_ids() {
1193            // If the transmission does not exist in storage, proceed to fetch the transmission.
1194            if !self.storage.contains_transmission(*transmission_id) {
1195                // Determine the worker ID.
1196                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1197                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1198                };
1199                // Retrieve the worker.
1200                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1201                // Push the callback onto the list.
1202                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
1203            }
1204        }
1205
1206        // Initialize a set for the transmissions.
1207        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1208        // Wait for all of the transmissions to be fetched.
1209        while let Some(result) = fetch_transmissions.next().await {
1210            // Retrieve the transmission.
1211            let (transmission_id, transmission) = result?;
1212            // Insert the transmission into the set.
1213            transmissions.insert(transmission_id, transmission);
1214        }
1215        // Return the transmissions.
1216        Ok(transmissions)
1217    }
1218}
1219
impl<N: Network> Primary<N> {
    /// Spawns a task with the given future; it should only be used for long-running tasks.
    ///
    /// The join handle is retained so that `shut_down` can abort the task later.
    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
        self.handles.lock().push(tokio::spawn(future));
    }

    /// Shuts down the primary.
    ///
    /// Aborts all spawned tasks, and — when `keep_state` is enabled — persists the current
    /// proposal cache (proposed batch, signed proposals, latest round, pending certificates)
    /// to disk. A failed store is logged but not propagated.
    pub async fn shut_down(&self) {
        let _guard = self.get_tracing_guard();
        info!("Shutting down the primary...");
        // Abort the tasks.
        self.handles.lock().iter().for_each(|handle| handle.abort());

        // Save the current proposal cache to disk.
        if self.keep_state {
            let proposal_cache = {
                // Take the proposed batch (leaving `None` behind) so the cache owns it.
                let proposal = self.proposed_batch.write().take();
                let signed_proposals = self.signed_proposals.read().clone();
                // Fall back to the propose lock's round when there is no in-flight proposal.
                let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
                let pending_certificates = self.storage.get_pending_certificates();
                ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
            };

            if let Err(err) = proposal_cache.store(&self.storage_mode, self.tracing.clone()) {
                error!("Failed to store the current proposal cache: {err}");
            }
        }
    }
}
1249
1250#[cfg(test)]
1251mod tests {
1252    use super::*;
1253    use amareleo_node_bft_ledger_service::MockLedgerService;
1254    use amareleo_node_bft_storage_service::BFTMemoryService;
1255    use snarkvm::{
1256        console::types::Field,
1257        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1258        prelude::{Address, Signature},
1259    };
1260
1261    use bytes::Bytes;
1262    use indexmap::IndexSet;
1263    use rand::RngCore;
1264
1265    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1266
    // Returns a primary and a list of accounts in the configured committee.
    //
    // Builds a 4-member committee (each with the minimum validator stake and a random
    // commission), backs it with a mock ledger and in-memory BFT storage, and wires a
    // single worker into the primary. The primary uses the FIRST account in the list.
    async fn primary_without_handlers(
        rng: &mut TestRng,
    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
        // Create a committee containing the primary's account.
        let (accounts, committee) = {
            const COMMITTEE_SIZE: usize = 4;
            let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
            let mut members = IndexMap::new();

            // Each member gets a distinct localhost port starting at 5000.
            for i in 0..COMMITTEE_SIZE {
                let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
                let account = Account::new(rng).unwrap();
                members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
                accounts.push((socket_addr, account));
            }

            (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
        };

        // The primary runs as the first committee member.
        let account = accounts.first().unwrap().1.clone();
        let ledger = Arc::new(MockLedgerService::new(committee));
        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10, None);

        // Initialize the primary.
        let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger, None).unwrap();

        // Construct a worker instance.
        primary.workers = Arc::from([Worker::new(
            0, // id
            primary.storage.clone(),
            primary.ledger.clone(),
            primary.proposed_batch.clone(),
            None,
        )
        .unwrap()]);

        (primary, accounts)
    }
1306
1307    // Creates a mock solution.
1308    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1309        // Sample a random fake solution ID.
1310        let solution_id = rng.gen::<u64>().into();
1311        // Vary the size of the solutions.
1312        let size = rng.gen_range(1024..10 * 1024);
1313        // Sample random fake solution bytes.
1314        let mut vec = vec![0u8; size];
1315        rng.fill_bytes(&mut vec);
1316        let solution = Data::Buffer(Bytes::from(vec));
1317        // Return the solution ID and solution.
1318        (solution_id, solution)
1319    }
1320
1321    // Creates a mock transaction.
1322    fn sample_unconfirmed_transaction(
1323        rng: &mut TestRng,
1324    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1325        // Sample a random fake transaction ID.
1326        let id = Field::<CurrentNetwork>::rand(rng).into();
1327        // Vary the size of the transactions.
1328        let size = rng.gen_range(1024..10 * 1024);
1329        // Sample random fake transaction bytes.
1330        let mut vec = vec![0u8; size];
1331        rng.fill_bytes(&mut vec);
1332        let transaction = Data::Buffer(Bytes::from(vec));
1333        // Return the ID and transaction.
1334        (id, transaction)
1335    }
1336
    // Creates a batch proposal with one solution and one transaction.
    //
    // The solution and transaction are random mock payloads; the batch header is signed
    // with the author's private key for the given round/timestamp, and the proposal is
    // constructed over the provided committee and previous certificate IDs.
    fn create_test_proposal(
        author: &Account<CurrentNetwork>,
        committee: Committee<CurrentNetwork>,
        round: u64,
        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
        timestamp: i64,
        rng: &mut TestRng,
    ) -> Proposal<CurrentNetwork> {
        // Sample one mock solution and one mock transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();

        // Transmission IDs pair each payload's ID with its checksum.
        let solution_transmission_id = (solution_id, solution_checksum).into();
        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();

        // Retrieve the private key.
        let private_key = author.private_key();
        // Prepare the transmission IDs.
        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
        let transmissions = [
            (solution_transmission_id, Transmission::Solution(solution)),
            (transaction_transmission_id, Transmission::Transaction(transaction)),
        ]
        .into();
        // Sign the batch header.
        let batch_header = BatchHeader::new(
            private_key,
            round,
            timestamp,
            committee.id(),
            transmission_ids,
            previous_certificate_ids,
            rng,
        )
        .unwrap();
        // Construct the proposal.
        Proposal::new(committee, batch_header, transmissions).unwrap()
    }
1377
1378    /// Creates a signature of the batch ID for each committee member (excluding the primary).
1379    fn peer_signatures_for_batch(
1380        primary_address: Address<CurrentNetwork>,
1381        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1382        batch_id: Field<CurrentNetwork>,
1383        rng: &mut TestRng,
1384    ) -> IndexSet<Signature<CurrentNetwork>> {
1385        let mut signatures = IndexSet::new();
1386        for (_, account) in accounts {
1387            if account.address() == primary_address {
1388                continue;
1389            }
1390            let signature = account.sign(&[batch_id], rng).unwrap();
1391            signatures.insert(signature);
1392        }
1393        signatures
1394    }
1395
1396    // Creates a batch certificate.
1397    fn create_batch_certificate(
1398        primary_address: Address<CurrentNetwork>,
1399        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1400        round: u64,
1401        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1402        rng: &mut TestRng,
1403    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
1404        let timestamp = now();
1405
1406        let author =
1407            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
1408        let private_key = author.private_key();
1409
1410        let committee_id = Field::rand(rng);
1411        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1412        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1413        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1414        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1415
1416        let solution_transmission_id = (solution_id, solution_checksum).into();
1417        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1418
1419        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1420        let transmissions = [
1421            (solution_transmission_id, Transmission::Solution(solution)),
1422            (transaction_transmission_id, Transmission::Transaction(transaction)),
1423        ]
1424        .into();
1425
1426        let batch_header = BatchHeader::new(
1427            private_key,
1428            round,
1429            timestamp,
1430            committee_id,
1431            transmission_ids,
1432            previous_certificate_ids,
1433            rng,
1434        )
1435        .unwrap();
1436        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
1437        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
1438        (certificate, transmissions)
1439    }
1440
1441    // Create a certificate chain up to round in primary storage.
1442    fn store_certificate_chain(
1443        primary: &Primary<CurrentNetwork>,
1444        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1445        round: u64,
1446        rng: &mut TestRng,
1447    ) -> IndexSet<Field<CurrentNetwork>> {
1448        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1449        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1450        for cur_round in 1..round {
1451            for (_, account) in accounts.iter() {
1452                let (certificate, transmissions) = create_batch_certificate(
1453                    account.address(),
1454                    accounts,
1455                    cur_round,
1456                    previous_certificates.clone(),
1457                    rng,
1458                );
1459                next_certificates.insert(certificate.id());
1460                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1461            }
1462
1463            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1464            previous_certificates = next_certificates;
1465            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1466        }
1467
1468        previous_certificates
1469    }
1470
1471    #[tokio::test]
1472    async fn test_propose_batch() {
1473        let mut rng = TestRng::default();
1474        let (primary, _) = primary_without_handlers(&mut rng).await;
1475
1476        // Check there is no batch currently proposed.
1477        assert!(primary.proposed_batch.read().is_none());
1478
1479        // Generate a solution and a transaction.
1480        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1481        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1482
1483        // Store it on one of the workers.
1484        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1485        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1486
1487        // Try to propose a batch again. This time, it should succeed.
1488        assert!(primary.propose_batch().await.is_ok());
1489
1490        // AlexZ: proposed_batch is no longer written to when processing new batches...
1491        // assert!(primary.proposed_batch.read().is_some());
1492    }
1493
1494    #[tokio::test]
1495    async fn test_propose_batch_with_no_transmissions() {
1496        let mut rng = TestRng::default();
1497        let (primary, _) = primary_without_handlers(&mut rng).await;
1498
1499        // Check there is no batch currently proposed.
1500        assert!(primary.proposed_batch.read().is_none());
1501
1502        // Try to propose a batch with no transmissions.
1503        assert!(primary.propose_batch().await.is_ok());
1504
1505        // AlexZ: proposed_batch is no longer written to when processing new batches...
1506        // assert!(primary.proposed_batch.read().is_some());
1507    }
1508
1509    #[tokio::test]
1510    async fn test_propose_batch_in_round() {
1511        let round = 3;
1512        let mut rng = TestRng::default();
1513        let (primary, accounts) = primary_without_handlers(&mut rng).await;
1514
1515        // Fill primary storage.
1516        store_certificate_chain(&primary, &accounts, round, &mut rng);
1517
1518        // Sleep for a while to ensure the primary is ready to propose the next round.
1519        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;
1520
1521        // Generate a solution and a transaction.
1522        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1523        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1524
1525        // Store it on one of the workers.
1526        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1527        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1528
1529        // Propose a batch again. This time, it should succeed.
1530        assert!(primary.propose_batch().await.is_ok());
1531
1532        // AlexZ: proposed_batch is no longer written to when processing new batches...
1533        // assert!(primary.proposed_batch.read().is_some());
1534    }
1535
1536    #[tokio::test]
1537    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
1538        let round = 3;
1539        let mut rng = TestRng::default();
1540        let (primary, _) = primary_without_handlers(&mut rng).await;
1541
1542        // Check there is no batch currently proposed.
1543        assert!(primary.proposed_batch.read().is_none());
1544
1545        // Generate a solution and a transaction.
1546        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1547        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1548
1549        // Store it on one of the workers.
1550        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1551        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1552
1553        // Set the proposal lock to a round ahead of the storage.
1554        let old_proposal_lock_round = *primary.propose_lock.lock().await;
1555        *primary.propose_lock.lock().await = round + 1;
1556
1557        // Propose a batch and enforce that it fails.
1558        assert!(primary.propose_batch().await.is_ok());
1559        assert!(primary.proposed_batch.read().is_none());
1560
1561        // Set the proposal lock back to the old round.
1562        *primary.propose_lock.lock().await = old_proposal_lock_round;
1563
1564        // Try to propose a batch again. This time, it should succeed.
1565        assert!(primary.propose_batch().await.is_ok());
1566
1567        // AlexZ: proposed_batch is no longer written to when processing new batches...
1568        // assert!(primary.proposed_batch.read().is_some());
1569    }
1570
1571    #[tokio::test]
1572    async fn test_propose_batch_with_storage_round_behind_proposal() {
1573        let round = 5;
1574        let mut rng = TestRng::default();
1575        let (primary, accounts) = primary_without_handlers(&mut rng).await;
1576
1577        // Generate previous certificates.
1578        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);
1579
1580        // Create a valid proposal.
1581        let timestamp = now();
1582        let proposal = create_test_proposal(
1583            &primary.account,
1584            primary.ledger.current_committee().unwrap(),
1585            round + 1,
1586            previous_certificates,
1587            timestamp,
1588            &mut rng,
1589        );
1590
1591        // Store the proposal on the primary.
1592        *primary.proposed_batch.write() = Some(proposal);
1593
1594        // Try to propose a batch will terminate early because the storage is behind the proposal.
1595        assert!(primary.propose_batch().await.is_ok());
1596        assert!(primary.proposed_batch.read().is_some());
1597        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
1598    }
1599}