amareleo_node_bft/
primary.rs

// Copyright 2024 Aleo Network Foundation
// This file is part of the snarkOS library.

// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at:

// http://www.apache.org/licenses/LICENSE-2.0

// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
16use crate::{
17    MAX_BATCH_DELAY_IN_MS,
18    MAX_WORKERS,
19    MIN_BATCH_DELAY_IN_SECS,
20    Sync,
21    Worker,
22    helpers::{
23        BFTSender,
24        PrimaryReceiver,
25        PrimarySender,
26        Proposal,
27        ProposalCache,
28        SignedProposals,
29        Storage,
30        assign_to_worker,
31        assign_to_workers,
32        fmt_id,
33        now,
34    },
35    spawn_blocking,
36};
37use amareleo_chain_account::Account;
38use amareleo_node_bft_ledger_service::LedgerService;
39use amareleo_node_sync::DUMMY_SELF_IP;
40use snarkvm::{
41    console::{prelude::*, types::Address},
42    ledger::{
43        block::Transaction,
44        narwhal::{BatchCertificate, BatchHeader, Data, Transmission, TransmissionID},
45        puzzle::{Solution, SolutionID},
46    },
47    prelude::{Signature, committee::Committee},
48};
49
50use aleo_std::StorageMode;
51use colored::Colorize;
52use futures::stream::{FuturesUnordered, StreamExt};
53use indexmap::IndexMap;
54use parking_lot::{Mutex, RwLock};
55
56use rand::SeedableRng;
57use rand_chacha::ChaChaRng;
58use snarkvm::console::account::PrivateKey;
59
60use std::{
61    collections::{HashMap, HashSet},
62    future::Future,
63    net::SocketAddr,
64    sync::Arc,
65    time::Duration,
66};
67use tokio::{
68    sync::{Mutex as TMutex, OnceCell},
69    task::JoinHandle,
70};
71
/// A helper type for an optional proposed batch.
/// Holds `None` while no batch is currently proposed by this primary.
pub type ProposedBatch<N> = RwLock<Option<Proposal<N>>>;
74
#[derive(Clone)]
pub struct Primary<N: Network> {
    /// The sync module, used to sync storage with the ledger.
    sync: Sync<N>,
    /// The account (key pair) of this node.
    account: Account<N>,
    /// The storage.
    storage: Storage<N>,
    /// Whether to preserve the chain state on shutdown.
    keep_state: bool,
    /// The storage mode.
    storage_mode: StorageMode,
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /// The workers. Empty until `run` is invoked, which installs `MAX_WORKERS` workers.
    workers: Arc<[Worker<N>]>,
    /// The BFT sender. Set at most once, in `run`, when a BFT sender is provided.
    bft_sender: Arc<OnceCell<BFTSender<N>>>,
    /// The batch proposal, if the primary is currently proposing a batch.
    proposed_batch: Arc<ProposedBatch<N>>,
    /// The timestamp of the most recent proposed batch.
    latest_proposed_batch_timestamp: Arc<RwLock<i64>>,
    /// The recently-signed batch proposals, keyed by validator address.
    signed_proposals: Arc<RwLock<SignedProposals<N>>>,
    /// The spawned task handles.
    handles: Arc<Mutex<Vec<JoinHandle<()>>>>,
    /// The lock for `propose_batch`; also stores the latest proposed round.
    propose_lock: Arc<TMutex<u64>>,
}
104
105impl<N: Network> Primary<N> {
106    /// The maximum number of unconfirmed transmissions to send to the primary.
107    pub const MAX_TRANSMISSIONS_TOLERANCE: usize = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH * 2;
108
109    /// Initializes a new primary instance.
110    pub fn new(
111        account: Account<N>,
112        storage: Storage<N>,
113        keep_state: bool,
114        storage_mode: StorageMode,
115        ledger: Arc<dyn LedgerService<N>>,
116    ) -> Result<Self> {
117        // Initialize the sync module.
118        let sync = Sync::new(storage.clone(), ledger.clone());
119
120        // Initialize the primary instance.
121        Ok(Self {
122            sync,
123            account,
124            storage,
125            keep_state,
126            storage_mode,
127            ledger,
128            workers: Arc::from(vec![]),
129            bft_sender: Default::default(),
130            proposed_batch: Default::default(),
131            latest_proposed_batch_timestamp: Default::default(),
132            signed_proposals: Default::default(),
133            handles: Default::default(),
134            propose_lock: Default::default(),
135        })
136    }
137
138    /// Load the proposal cache file and update the Primary state with the stored data.
139    async fn load_proposal_cache(&self) -> Result<()> {
140        // Fetch the signed proposals from the file system if it exists.
141        match ProposalCache::<N>::exists(self.keep_state, &self.storage_mode) {
142            // If the proposal cache exists, then process the proposal cache.
143            true => {
144                match ProposalCache::<N>::load(self.account.address(), self.keep_state, &self.storage_mode) {
145                    Ok(proposal_cache) => {
146                        // Extract the proposal and signed proposals.
147                        let (latest_certificate_round, proposed_batch, signed_proposals, pending_certificates) =
148                            proposal_cache.into();
149
150                        // Write the proposed batch.
151                        *self.proposed_batch.write() = proposed_batch;
152                        // Write the signed proposals.
153                        *self.signed_proposals.write() = signed_proposals;
154                        // Writ the propose lock.
155                        *self.propose_lock.lock().await = latest_certificate_round;
156
157                        // Update the storage with the pending certificates.
158                        for certificate in pending_certificates {
159                            let batch_id = certificate.batch_id();
160                            // We use a dummy IP because the node should not need to request from any peers.
161                            // The storage should have stored all the transmissions. If not, we simply
162                            // skip the certificate.
163                            if let Err(err) =
164                                self.sync_with_certificate_from_peer::<true>(DUMMY_SELF_IP, certificate).await
165                            {
166                                warn!(
167                                    "Failed to load stored certificate {} from proposal cache - {err}",
168                                    fmt_id(batch_id)
169                                );
170                            }
171                        }
172                        Ok(())
173                    }
174                    Err(err) => {
175                        bail!("Failed to read the signed proposals from the file system - {err}.");
176                    }
177                }
178            }
179            // If the proposal cache does not exist, then return early.
180            false => Ok(()),
181        }
182    }
183
184    /// Run the primary instance.
185    pub async fn run(
186        &mut self,
187        bft_sender: Option<BFTSender<N>>,
188        _primary_sender: PrimarySender<N>,
189        primary_receiver: PrimaryReceiver<N>,
190    ) -> Result<()> {
191        info!("Starting the primary instance of the memory pool...");
192
193        // Set the BFT sender.
194        if let Some(bft_sender) = &bft_sender {
195            // Set the BFT sender in the primary.
196            self.bft_sender.set(bft_sender.clone()).expect("BFT sender already set");
197        }
198
199        // Construct a map for the workers.
200        let mut workers = Vec::new();
201        // Initialize the workers.
202        for id in 0..MAX_WORKERS {
203            // Construct the worker instance.
204            let worker = Worker::new(id, self.storage.clone(), self.ledger.clone(), self.proposed_batch.clone())?;
205
206            // Add the worker to the list of workers.
207            workers.push(worker);
208        }
209        // Set the workers.
210        self.workers = Arc::from(workers);
211
212        // Next, initialize the sync module and sync the storage from ledger.
213        self.sync.initialize(bft_sender).await?;
214        // Next, load and process the proposal cache before running the sync module.
215        self.load_proposal_cache().await?;
216        // Next, run the sync module.
217        self.sync.run().await?;
218        // Lastly, start the primary handlers.
219        // Note: This ensures the primary does not start communicating before syncing is complete.
220        self.start_handlers(primary_receiver);
221
222        Ok(())
223    }
224
225    /// Returns the current round.
226    pub fn current_round(&self) -> u64 {
227        self.storage.current_round()
228    }
229
230    /// Returns `true` if the primary is synced.
231    pub fn is_synced(&self) -> bool {
232        self.sync.is_synced()
233    }
234
235    /// Returns the account of the node.
236    pub const fn account(&self) -> &Account<N> {
237        &self.account
238    }
239
240    /// Returns the storage.
241    pub const fn storage(&self) -> &Storage<N> {
242        &self.storage
243    }
244
245    /// Returns the ledger.
246    pub const fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
247        &self.ledger
248    }
249
250    /// Returns the number of workers.
251    pub fn num_workers(&self) -> u8 {
252        u8::try_from(self.workers.len()).expect("Too many workers")
253    }
254
255    /// Returns the workers.
256    pub const fn workers(&self) -> &Arc<[Worker<N>]> {
257        &self.workers
258    }
259
260    /// Returns the batch proposal of our primary, if one currently exists.
261    pub fn proposed_batch(&self) -> &Arc<ProposedBatch<N>> {
262        &self.proposed_batch
263    }
264}
265
266impl<N: Network> Primary<N> {
267    /// Returns the number of unconfirmed transmissions.
268    pub fn num_unconfirmed_transmissions(&self) -> usize {
269        self.workers.iter().map(|worker| worker.num_transmissions()).sum()
270    }
271
272    /// Returns the number of unconfirmed ratifications.
273    pub fn num_unconfirmed_ratifications(&self) -> usize {
274        self.workers.iter().map(|worker| worker.num_ratifications()).sum()
275    }
276
277    /// Returns the number of solutions.
278    pub fn num_unconfirmed_solutions(&self) -> usize {
279        self.workers.iter().map(|worker| worker.num_solutions()).sum()
280    }
281
282    /// Returns the number of unconfirmed transactions.
283    pub fn num_unconfirmed_transactions(&self) -> usize {
284        self.workers.iter().map(|worker| worker.num_transactions()).sum()
285    }
286}
287
288impl<N: Network> Primary<N> {
289    /// Returns the worker transmission IDs.
290    pub fn worker_transmission_ids(&self) -> impl '_ + Iterator<Item = TransmissionID<N>> {
291        self.workers.iter().flat_map(|worker| worker.transmission_ids())
292    }
293
294    /// Returns the worker transmissions.
295    pub fn worker_transmissions(&self) -> impl '_ + Iterator<Item = (TransmissionID<N>, Transmission<N>)> {
296        self.workers.iter().flat_map(|worker| worker.transmissions())
297    }
298
299    /// Returns the worker solutions.
300    pub fn worker_solutions(&self) -> impl '_ + Iterator<Item = (SolutionID<N>, Data<Solution<N>>)> {
301        self.workers.iter().flat_map(|worker| worker.solutions())
302    }
303
304    /// Returns the worker transactions.
305    pub fn worker_transactions(&self) -> impl '_ + Iterator<Item = (N::TransactionID, Data<Transaction<N>>)> {
306        self.workers.iter().flat_map(|worker| worker.transactions())
307    }
308}
309
310impl<N: Network> Primary<N> {
311    /// Clears the worker solutions.
312    pub fn clear_worker_solutions(&self) {
313        self.workers.iter().for_each(Worker::clear_solutions);
314    }
315}
316
317impl<N: Network> Primary<N> {
318    pub async fn propose_batch(&self) -> Result<()> {
319        let mut rng = ChaChaRng::seed_from_u64(1234567890u64);
320        let mut all_acc: Vec<Account<N>> = Vec::new();
321
322        for _ in 0u64..4u64 {
323            let private_key = PrivateKey::<N>::new(&mut rng)?;
324            let acc = Account::<N>::try_from(private_key).expect("Failed to initialize account with private key");
325            all_acc.push(acc);
326        }
327
328        // Submit proposal for validator with id 0
329        let primary_addr = all_acc[0].address();
330        let other_acc: Vec<&Account<N>> = all_acc.iter().filter(|acc| acc.address() != primary_addr).collect();
331
332        let round = self.propose_batch_lite(&other_acc).await?;
333        if round == 0u64 {
334            return Ok(());
335        }
336
337        // Submit empty proposals for other validators
338        for vid in 1..all_acc.len() {
339            let primary_acc = &all_acc[vid];
340            let other_acc: Vec<&Account<N>> =
341                all_acc.iter().filter(|acc| acc.address() != primary_acc.address()).collect();
342
343            self.fake_proposal(vid.try_into().unwrap(), primary_acc, &other_acc, round).await?;
344        }
345        Ok(())
346    }
347
    /// Proposes a batch for the current round with this node's account as author, forging the
    /// co-signatures of the `other_acc` validators, then stores and broadcasts the resulting
    /// certificate. Returns the proposed round on success, or `Ok(0)` when the proposal was
    /// safely skipped (round 0 is never proposable, so 0 serves as a "skipped" sentinel).
    pub async fn propose_batch_lite(&self, other_acc: &[&Account<N>]) -> Result<u64> {
        // This function isn't re-entrant. The lock guard also carries the latest proposed round.
        let mut lock_guard = self.propose_lock.lock().await;

        // Retrieve the current round.
        let round = self.current_round();
        // Compute the previous round.
        let previous_round = round.saturating_sub(1);

        // If the current round is 0, return early.
        ensure!(round > 0, "Round 0 cannot have transaction batches");

        // If the current storage round is below the latest proposal round, then return early.
        if round < *lock_guard {
            warn!("Cannot propose a batch for round {round} - the latest proposal cache round is {}", *lock_guard);
            return Ok(0u64);
        }

        #[cfg(feature = "metrics")]
        metrics::gauge(metrics::bft::PROPOSAL_ROUND, round as f64);

        // Ensure that the primary does not create a new proposal too quickly.
        if let Err(e) = self.check_proposal_timestamp(previous_round, self.account.address(), now()) {
            debug!("Primary is safely skipping a batch proposal - {}", format!("{e}").dimmed());
            return Ok(0u64);
        }

        // Ensure the primary has not proposed a batch for this round before.
        if self.storage.contains_certificate_in_round_from(round, self.account.address()) {
            // If a BFT sender was provided, attempt to advance the current round.
            if let Some(bft_sender) = self.bft_sender.get() {
                match bft_sender.send_primary_round_to_bft(self.current_round()).await {
                    // 'is_ready' is true if the primary is ready to propose a batch for the next round.
                    Ok(true) => (), // continue,
                    // 'is_ready' is false if the primary is not ready to propose a batch for the next round.
                    Ok(false) => return Ok(0u64),
                    // An error occurred while attempting to advance the current round.
                    Err(e) => {
                        warn!("Failed to update the BFT to the next round - {e}");
                        return Err(e);
                    }
                }
            }
            debug!("Primary is safely skipping {}", format!("(round {round} was already certified)").dimmed());
            return Ok(0u64);
        }

        // Determine if the current round has been proposed.
        // Note: Do NOT make this judgment in advance before rebroadcast and round update. Rebroadcasting is
        // good for network reliability and should not be prevented for the already existing proposed_batch.
        // If a certificate already exists for the current round, an attempt should be made to advance the
        // round as early as possible.
        if round == *lock_guard {
            warn!("Primary is safely skipping a batch proposal - round {round} already proposed");
            return Ok(0u64);
        }

        // Retrieve the committee to check against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        // Check if the primary is connected to enough validators to reach quorum threshold.
        {
            // Retrieve the connected validator addresses (here: the forged co-signer accounts).
            let mut connected_validators: HashSet<Address<N>> = other_acc.iter().map(|acc| acc.address()).collect();

            // Append the primary to the set.
            connected_validators.insert(self.account.address());

            // If quorum threshold is not reached, return early.
            if !committee_lookback.is_quorum_threshold_reached(&connected_validators) {
                debug!(
                    "Primary is safely skipping a batch proposal {}",
                    "(please connect to more validators)".dimmed()
                );
                trace!("Primary is connected to {} validators", connected_validators.len() - 1);
                return Ok(0u64);
            }
        }

        // Retrieve the previous certificates.
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);

        // Check if the batch is ready to be proposed.
        // Note: The primary starts at round 1, and round 0 contains no certificates, by definition.
        let mut is_ready = previous_round == 0;
        // If the previous round is not 0, check if the previous certificates have reached the quorum threshold.
        if previous_round > 0 {
            // Retrieve the committee lookback for the round.
            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
                bail!("Cannot propose a batch for round {round}: the committee lookback is not known yet")
            };
            // Construct a set over the authors.
            let authors = previous_certificates.iter().map(BatchCertificate::author).collect();
            // Check if the previous certificates have reached the quorum threshold.
            if previous_committee_lookback.is_quorum_threshold_reached(&authors) {
                is_ready = true;
            }
        }
        // If the batch is not ready to be proposed, return early.
        if !is_ready {
            debug!(
                "Primary is safely skipping a batch proposal {}",
                format!("(previous round {previous_round} has not reached quorum)").dimmed()
            );
            return Ok(0u64);
        }

        // Determine the required number of transmissions per worker.
        let num_transmissions_per_worker = BatchHeader::<N>::MAX_TRANSMISSIONS_PER_BATCH / self.num_workers() as usize;
        // Initialize the map of transmissions.
        let mut transmissions: IndexMap<_, _> = Default::default();
        // Take the transmissions from the workers.
        for worker in self.workers.iter() {
            // Initialize a tracker for included transmissions for the current worker.
            let mut num_transmissions_included_for_worker = 0;
            // Keep draining the worker until the desired number of transmissions is reached or the worker is empty.
            'outer: while num_transmissions_included_for_worker < num_transmissions_per_worker {
                // Determine the number of remaining transmissions for the worker.
                let num_remaining_transmissions =
                    num_transmissions_per_worker.saturating_sub(num_transmissions_included_for_worker);
                // Drain the worker.
                let mut worker_transmissions = worker.drain(num_remaining_transmissions).peekable();
                // If the worker is empty, break early.
                if worker_transmissions.peek().is_none() {
                    break 'outer;
                }
                // Iterate through the worker transmissions.
                'inner: for (id, transmission) in worker_transmissions {
                    // Check if the ledger already contains the transmission.
                    // Note: on a ledger read error, `unwrap_or(true)` conservatively skips the transmission.
                    if self.ledger.contains_transmission(&id).unwrap_or(true) {
                        trace!("Proposing - Skipping transmission '{}' - Already in ledger", fmt_id(id));
                        continue 'inner;
                    }
                    // Check if the storage already contains the transmission.
                    // Note: We do not skip if this is the first transmission in the proposal, to ensure that
                    // the primary does not propose a batch with no transmissions.
                    if !transmissions.is_empty() && self.storage.contains_transmission(id) {
                        trace!("Proposing - Skipping transmission '{}' - Already in storage", fmt_id(id));
                        continue 'inner;
                    }
                    // Check the transmission is still valid.
                    match (id, transmission.clone()) {
                        (TransmissionID::Solution(solution_id, checksum), Transmission::Solution(solution)) => {
                            // Ensure the checksum matches.
                            match solution.to_checksum::<N>() {
                                Ok(solution_checksum) if solution_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping solution '{}' - Checksum mismatch",
                                        fmt_id(solution_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the solution is still valid.
                            if let Err(e) = self.ledger.check_solution_basic(solution_id, solution).await {
                                trace!("Proposing - Skipping solution '{}' - {e}", fmt_id(solution_id));
                                continue 'inner;
                            }
                        }
                        (
                            TransmissionID::Transaction(transaction_id, checksum),
                            Transmission::Transaction(transaction),
                        ) => {
                            // Ensure the checksum matches.
                            match transaction.to_checksum::<N>() {
                                Ok(transaction_checksum) if transaction_checksum == checksum => (),
                                _ => {
                                    trace!(
                                        "Proposing - Skipping transaction '{}' - Checksum mismatch",
                                        fmt_id(transaction_id)
                                    );
                                    continue 'inner;
                                }
                            }
                            // Check if the transaction is still valid.
                            if let Err(e) = self.ledger.check_transaction_basic(transaction_id, transaction).await {
                                trace!("Proposing - Skipping transaction '{}' - {e}", fmt_id(transaction_id));
                                continue 'inner;
                            }
                        }
                        // Note: We explicitly forbid including ratifications,
                        // as the protocol currently does not support ratifications.
                        (TransmissionID::Ratification, Transmission::Ratification) => continue,
                        // All other combinations are clearly invalid.
                        _ => continue 'inner,
                    }
                    // Insert the transmission into the map.
                    transmissions.insert(id, transmission);
                    num_transmissions_included_for_worker += 1;
                }
            }
        }

        // Determine the current timestamp.
        let current_timestamp = now();

        // Record this round as proposed before signing, so re-entry for the same round is skipped above.
        *lock_guard = round;

        /* Proceeding to sign & propose the batch. */
        info!("Proposing a batch with {} transmissions for round {round}...", transmissions.len());

        // Retrieve the private key.
        let private_key = *self.account.private_key();
        // Retrieve the committee ID.
        let committee_id = committee_lookback.id();
        // Prepare the transmission IDs.
        let transmission_ids = transmissions.keys().copied().collect();
        // Prepare the previous batch certificate IDs.
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();
        // Sign the batch header and construct the proposal (off the async runtime).
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })
        .inspect_err(|_| {
            // On error, reinsert the transmissions and then propagate the error.
            if let Err(e) = self.reinsert_transmissions_into_workers(transmissions) {
                error!("Failed to reinsert transmissions: {e:?}");
            }
        })?;
        // Set the timestamp of the latest proposed batch.
        *self.latest_proposed_batch_timestamp.write() = proposal.timestamp();

        // NOTE(review): unlike the upstream flow, the proposal is NOT stored in `proposed_batch`
        // here — presumably because it is certified immediately below; confirm this is intended.
        // // Set the proposed batch.
        // *self.proposed_batch.write() = Some(proposal);

        //===============================================================================
        // Processing proposal

        info!("Quorum threshold reached - Preparing to certify our batch for round {round}...");

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();

        // Forge signatures of other validators.
        for acc in other_acc.iter() {
            // Sign the batch ID.
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            // Add the signature to the batch.
            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Store the certified batch and broadcast it to all validators.
        // If there was an error storing the certificate, reinsert the transmissions back into the ready queue.
        if let Err(e) = self.store_and_broadcast_certificate_lite(&proposal, &committee_lookback).await {
            // Reinsert the transmissions back into the ready queue for the next proposal.
            self.reinsert_transmissions_into_workers(proposal.into_transmissions())?;
            return Err(e);
        }

        #[cfg(feature = "metrics")]
        metrics::increment_gauge(metrics::bft::CERTIFIED_BATCHES, 1.0);
        Ok(round)
    }
614
    /// Fabricates an empty, fully-signed batch certificate authored by `primary_acc` (validator
    /// `vid`) at `round`, stores it, caches this node's own signature in `signed_proposals`, and
    /// forwards the certificate to the BFT if a BFT sender is present.
    ///
    /// This node's account is expected to be among `other_acc`, since its signature must be
    /// forged like the others and then recorded; otherwise this function bails.
    pub async fn fake_proposal(
        &self,
        vid: u64,
        primary_acc: &Account<N>,
        other_acc: &[&Account<N>],
        round: u64,
    ) -> Result<()> {
        // A fake proposal carries no transmissions.
        let transmissions: IndexMap<_, _> = Default::default();
        let transmission_ids = transmissions.keys().copied().collect();

        let private_key = *primary_acc.private_key();
        let current_timestamp = now();

        // Resolve the committee (lookback) this batch is certified against.
        let committee_lookback = self.ledger.get_committee_lookback_for_round(round)?;
        let committee_id = committee_lookback.id();

        // Link the batch to all certificates of the previous round.
        let previous_round = round.saturating_sub(1);
        let previous_certificates = self.storage.get_certificates_for_round(previous_round);
        let previous_certificate_ids = previous_certificates.into_iter().map(|c| c.id()).collect();

        // Sign the batch header and construct the proposal (off the async runtime).
        let (batch_header, mut proposal) = spawn_blocking!(BatchHeader::new(
            &private_key,
            round,
            current_timestamp,
            committee_id,
            transmission_ids,
            previous_certificate_ids,
            &mut rand::thread_rng()
        ))
        .and_then(|batch_header| {
            Proposal::new(committee_lookback.clone(), batch_header.clone(), transmissions.clone())
                .map(|proposal| (batch_header, proposal))
        })?;

        // Retrieve the batch ID.
        let batch_id = batch_header.batch_id();
        let mut our_sign: Option<Signature<N>> = None;

        // Forge signatures of other validators.
        for acc in other_acc.iter() {
            // Sign the batch ID.
            let signer_acc = (*acc).clone();
            let signer = signer_acc.address();
            let signature = spawn_blocking!(signer_acc.sign(&[batch_id], &mut rand::thread_rng()))?;

            // Remember this node's own signature for the signed-proposals cache below.
            if signer == self.account.address() {
                our_sign = Some(signature);
            }

            // Add the signature to the batch.
            proposal.add_signature(signer, signature, &committee_lookback)?;
        }

        // Ensure our own (validator 0) signature WAS produced while forging; bail otherwise.
        let our_sign = match our_sign {
            Some(sign) => sign,
            None => bail!("Fake Proposal generation failed. Validator 0 signature missing."),
        };

        // Create the batch certificate and transmissions.
        let (certificate, transmissions) =
            tokio::task::block_in_place(|| proposal.to_certificate(&committee_lookback))?;

        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();

        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        info!("Stored a batch certificate for validator/round {vid}/{round}");

        // Cache our signature for the fake author's proposal, mirroring what would happen
        // after signing a real peer's batch proposal.
        match self.signed_proposals.write().0.entry(primary_acc.address()) {
            std::collections::hash_map::Entry::Occupied(mut entry) => {
                // If the validator has already signed a batch for this round, then return early,
                // since, if the peer still has not received the signature, they will request it again,
                // and the logic at the start of this function will resend the (now cached) signature
                // to the peer if asked to sign this batch proposal again.
                if entry.get().0 == round {
                    return Ok(());
                }
                // Otherwise, cache the round, batch ID, and signature for this validator.
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
            // If the validator has not signed a batch before, then continue.
            std::collections::hash_map::Entry::Vacant(entry) => {
                // Cache the round, batch ID, and signature for this validator.
                entry.insert((round, batch_id, our_sign));
                info!("Inserted signature to signed_proposals {vid}/{round}");
            }
        };

        if let Some(bft_sender) = self.bft_sender.get() {
            // Send the certificate to the BFT.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
                warn!("Failed to update the BFT DAG from sync: {e}");
                return Err(e);
            };
        }

        Ok(())
    }
718}
719
720impl<N: Network> Primary<N> {
    /// Starts the primary handlers.
    ///
    /// Spawns four long-running background tasks:
    /// 1. the batch proposer, which periodically attempts to propose a batch,
    /// 2. the round incrementer, which advances the round once quorum is reached,
    /// 3. the unconfirmed-solution processor,
    /// 4. the unconfirmed-transaction processor.
    fn start_handlers(&self, primary_receiver: PrimaryReceiver<N>) {
        // Destructure the receiver. The `rx_batch_*` and `rx_primary_ping` channels
        // are intentionally discarded (bound to `_`): this primary does not process them.
        let PrimaryReceiver {
            rx_batch_propose: _,
            rx_batch_signature: _,
            rx_batch_certified: _,
            rx_primary_ping: _,
            mut rx_unconfirmed_solution,
            mut rx_unconfirmed_transaction,
        } = primary_receiver;

        // Start the batch proposer.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly, but longer than if there were no batch.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not propose a batch.
                if !self_.is_synced() {
                    debug!("Skipping batch proposal {}", "(node is syncing)".dimmed());
                    continue;
                }
                // A best-effort attempt to skip the scheduled batch proposal if
                // round progression already triggered one.
                // Note: the guard returned by `try_lock` is dropped immediately, so this is
                // only a racy hint, not mutual exclusion with `propose_batch` itself.
                if self_.propose_lock.try_lock().is_err() {
                    trace!("Skipping batch proposal {}", "(node is already proposing)".dimmed());
                    continue;
                };
                // If there is no proposed batch, attempt to propose a batch.
                // Note: Do NOT spawn a task around this function call. Proposing a batch is a critical path,
                // and only one batch needs be proposed at a time.
                if let Err(e) = self_.propose_batch().await {
                    warn!("Cannot propose a batch - {e}");
                }
            }
        });

        // Periodically try to increment to the next round.
        // Note: This is necessary to ensure that the primary is not stuck on a previous round
        // despite having received enough certificates to advance to the next round.
        let self_ = self.clone();
        self.spawn(async move {
            loop {
                // Sleep briefly.
                tokio::time::sleep(Duration::from_millis(MAX_BATCH_DELAY_IN_MS)).await;
                // If the primary is not synced, then do not increment to the next round.
                if !self_.is_synced() {
                    trace!("Skipping round increment {}", "(node is syncing)".dimmed());
                    continue;
                }
                // Attempt to increment to the next round.
                let next_round = self_.current_round().saturating_add(1);
                // Determine if the quorum threshold is reached for the current round.
                let is_quorum_threshold_reached = {
                    // Retrieve the certificate authors for the next round.
                    let authors = self_.storage.get_certificate_authors_for_round(next_round);
                    // If there are no certificates, then skip this check.
                    if authors.is_empty() {
                        continue;
                    }
                    let Ok(committee_lookback) = self_.ledger.get_committee_lookback_for_round(next_round) else {
                        warn!("Failed to retrieve the committee lookback for round {next_round}");
                        continue;
                    };
                    committee_lookback.is_quorum_threshold_reached(&authors)
                };
                // Attempt to increment to the next round if the quorum threshold is reached.
                if is_quorum_threshold_reached {
                    debug!("Quorum threshold reached for round {}", next_round);
                    if let Err(e) = self_.try_increment_to_the_next_round(next_round).await {
                        warn!("Failed to increment to the next round - {e}");
                    }
                }
            }
        });

        // Process the unconfirmed solutions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((solution_id, solution, callback)) = rx_unconfirmed_solution.recv().await {
                // Compute the checksum for the solution.
                let Ok(checksum) = solution.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed solution");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker((solution_id, checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed solution");
                    continue;
                };
                // Hand the solution off to its worker on a separate task, so slow
                // processing does not block this receive loop.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed solution.
                    let result = worker.process_unconfirmed_solution(solution_id, solution).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });

        // Process the unconfirmed transactions.
        let self_ = self.clone();
        self.spawn(async move {
            while let Some((transaction_id, transaction, callback)) = rx_unconfirmed_transaction.recv().await {
                trace!("Primary - Received an unconfirmed transaction '{}'", fmt_id(transaction_id));
                // Compute the checksum for the transaction.
                let Ok(checksum) = transaction.to_checksum::<N>() else {
                    error!("Failed to compute the checksum for the unconfirmed transaction");
                    continue;
                };
                // Compute the worker ID.
                let Ok(worker_id) = assign_to_worker::<N>((&transaction_id, &checksum), self_.num_workers()) else {
                    error!("Unable to determine the worker ID for the unconfirmed transaction");
                    continue;
                };
                // Hand the transaction off to its worker on a separate task, so slow
                // processing does not block this receive loop.
                let self_ = self_.clone();
                tokio::spawn(async move {
                    // Retrieve the worker.
                    let worker = &self_.workers[worker_id as usize];
                    // Process the unconfirmed transaction.
                    let result = worker.process_unconfirmed_transaction(transaction_id, transaction).await;
                    // Send the result to the callback.
                    callback.send(result).ok();
                });
            }
        });
    }
850
851    /// Increments to the next round.
852    async fn try_increment_to_the_next_round(&self, next_round: u64) -> Result<()> {
853        // If the next round is within GC range, then iterate to the penultimate round.
854        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
855            let mut fast_forward_round = self.current_round();
856            // Iterate until the penultimate round is reached.
857            while fast_forward_round < next_round.saturating_sub(1) {
858                // Update to the next round in storage.
859                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
860                // Clear the proposed batch.
861                *self.proposed_batch.write() = None;
862            }
863        }
864
865        // Retrieve the current round.
866        let current_round = self.current_round();
867        // Attempt to advance to the next round.
868        if current_round < next_round {
869            // If a BFT sender was provided, send the current round to the BFT.
870            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
871                match bft_sender.send_primary_round_to_bft(current_round).await {
872                    Ok(is_ready) => is_ready,
873                    Err(e) => {
874                        warn!("Failed to update the BFT to the next round - {e}");
875                        return Err(e);
876                    }
877                }
878            }
879            // Otherwise, handle the Narwhal case.
880            else {
881                // Update to the next round in storage.
882                self.storage.increment_to_next_round(current_round)?;
883                // Set 'is_ready' to 'true'.
884                true
885            };
886
887            // Log whether the next round is ready.
888            match is_ready {
889                true => debug!("Primary is ready to propose the next round"),
890                false => debug!("Primary is not ready to propose the next round"),
891            }
892
893            // If the node is ready, propose a batch for the next round.
894            if is_ready {
895                self.propose_batch().await?;
896            }
897        }
898        Ok(())
899    }
900
901    /// Increments to the next round.
902    async fn try_increment_to_the_next_round_lite(&self, next_round: u64) -> Result<()> {
903        // If the next round is within GC range, then iterate to the penultimate round.
904        if self.current_round() + self.storage.max_gc_rounds() >= next_round {
905            let mut fast_forward_round = self.current_round();
906            // Iterate until the penultimate round is reached.
907            while fast_forward_round < next_round.saturating_sub(1) {
908                // Update to the next round in storage.
909                fast_forward_round = self.storage.increment_to_next_round(fast_forward_round)?;
910                // Clear the proposed batch.
911                *self.proposed_batch.write() = None;
912            }
913        }
914
915        // Retrieve the current round.
916        let current_round = self.current_round();
917        // Attempt to advance to the next round.
918        if current_round < next_round {
919            // If a BFT sender was provided, send the current round to the BFT.
920            let is_ready = if let Some(bft_sender) = self.bft_sender.get() {
921                match bft_sender.send_primary_round_to_bft(current_round).await {
922                    Ok(is_ready) => is_ready,
923                    Err(e) => {
924                        warn!("Failed to update the BFT to the next round - {e}");
925                        return Err(e);
926                    }
927                }
928            }
929            // Otherwise, handle the Narwhal case.
930            else {
931                // Update to the next round in storage.
932                self.storage.increment_to_next_round(current_round)?;
933                // Set 'is_ready' to 'true'.
934                true
935            };
936
937            // Log whether the next round is ready.
938            match is_ready {
939                true => debug!("Primary is ready to propose the next round"),
940                false => debug!("Primary is not ready to propose the next round"),
941            }
942
943            // // If the node is ready, propose a batch for the next round.
944            // if is_ready {
945            //     self.propose_batch().await?;
946            // }
947        }
948        Ok(())
949    }
950
951    /// Ensure the primary is not creating batch proposals too frequently.
952    /// This checks that the certificate timestamp for the previous round is within the expected range.
953    fn check_proposal_timestamp(&self, previous_round: u64, author: Address<N>, timestamp: i64) -> Result<()> {
954        // Retrieve the timestamp of the previous timestamp to check against.
955        let previous_timestamp = match self.storage.get_certificate_for_round_with_author(previous_round, author) {
956            // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
957            Some(certificate) => certificate.timestamp(),
958            None => *self.latest_proposed_batch_timestamp.read(),
959        };
960
961        // Determine the elapsed time since the previous timestamp.
962        let elapsed = timestamp
963            .checked_sub(previous_timestamp)
964            .ok_or_else(|| anyhow!("Timestamp cannot be before the previous certificate at round {previous_round}"))?;
965        // Ensure that the previous certificate was created at least `MIN_BATCH_DELAY_IN_MS` seconds ago.
966        match elapsed < MIN_BATCH_DELAY_IN_SECS as i64 {
967            true => bail!("Timestamp is too soon after the previous certificate at round {previous_round}"),
968            false => Ok(()),
969        }
970    }
971
    /// Stores the certified batch, forwards it to the BFT (if configured), and
    /// increments to the next round.
    ///
    /// Note: despite the `broadcast` in its name, this "lite" variant does not
    /// gossip the certificate to peers, and it returns `()` rather than the
    /// certificate; it only stores the batch locally and notifies the BFT sender.
    async fn store_and_broadcast_certificate_lite(
        &self,
        proposal: &Proposal<N>,
        committee: &Committee<N>,
    ) -> Result<()> {
        // Create the batch certificate and transmissions.
        // `block_in_place` keeps this blocking work from stalling the async executor.
        let (certificate, transmissions) = tokio::task::block_in_place(|| proposal.to_certificate(committee))?;
        // Convert the transmissions into a HashMap.
        // Note: Do not change the `Proposal` to use a HashMap. The ordering there is necessary for safety.
        let transmissions = transmissions.into_iter().collect::<HashMap<_, _>>();
        // Store the certified batch.
        let (storage, certificate_) = (self.storage.clone(), certificate.clone());
        spawn_blocking!(storage.insert_certificate(certificate_, transmissions, Default::default()))?;
        debug!("Stored a batch certificate for round {}", certificate.round());
        // If a BFT sender was provided, send the certificate to the BFT.
        if let Some(bft_sender) = self.bft_sender.get() {
            // Await the callback to continue.
            if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate.clone()).await {
                warn!("Failed to update the BFT DAG from primary - {e}");
                return Err(e);
            };
        }
        // Log the certified batch.
        let num_transmissions = certificate.transmission_ids().len();
        let round = certificate.round();
        info!("\n\nOur batch with {num_transmissions} transmissions for round {round} was certified!\n");
        // Increment to the next round (without proposing a batch).
        self.try_increment_to_the_next_round_lite(round + 1).await
    }
1002
1003    /// Stores the certified batch and broadcasts it to all validators, returning the certificate.
1004    /// Re-inserts the transmissions from the proposal into the workers.
1005    fn reinsert_transmissions_into_workers(
1006        &self,
1007        transmissions: IndexMap<TransmissionID<N>, Transmission<N>>,
1008    ) -> Result<()> {
1009        // Re-insert the transmissions into the workers.
1010        assign_to_workers(&self.workers, transmissions.into_iter(), |worker, transmission_id, transmission| {
1011            worker.reinsert(transmission_id, transmission);
1012        })
1013    }
1014
1015    /// Recursively stores a given batch certificate, after ensuring:
1016    ///   - Ensure the round matches the committee round.
1017    ///   - Ensure the address is a member of the committee.
1018    ///   - Ensure the timestamp is within range.
1019    ///   - Ensure we have all of the transmissions.
1020    ///   - Ensure we have all of the previous certificates.
1021    ///   - Ensure the previous certificates are for the previous round (i.e. round - 1).
1022    ///   - Ensure the previous certificates have reached the quorum threshold.
1023    ///   - Ensure we have not already signed the batch ID.
1024    #[async_recursion::async_recursion]
1025    async fn sync_with_certificate_from_peer<const IS_SYNCING: bool>(
1026        &self,
1027        peer_ip: SocketAddr,
1028        certificate: BatchCertificate<N>,
1029    ) -> Result<()> {
1030        // Retrieve the batch header.
1031        let batch_header = certificate.batch_header();
1032        // Retrieve the batch round.
1033        let batch_round = batch_header.round();
1034
1035        // If the certificate round is outdated, do not store it.
1036        if batch_round <= self.storage.gc_round() {
1037            return Ok(());
1038        }
1039        // If the certificate already exists in storage, return early.
1040        if self.storage.contains_certificate(certificate.id()) {
1041            return Ok(());
1042        }
1043
1044        // If node is not in sync mode and the node is not synced. Then return an error.
1045        if !IS_SYNCING && !self.is_synced() {
1046            bail!(
1047                "Failed to process certificate `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
1048                fmt_id(certificate.id())
1049            );
1050        }
1051
1052        // If the peer is ahead, use the batch header to sync up to the peer.
1053        let missing_transmissions = self.sync_with_batch_header_from_peer::<IS_SYNCING>(peer_ip, batch_header).await?;
1054
1055        // Check if the certificate needs to be stored.
1056        if !self.storage.contains_certificate(certificate.id()) {
1057            // Store the batch certificate.
1058            let (storage, certificate_) = (self.storage.clone(), certificate.clone());
1059            spawn_blocking!(storage.insert_certificate(certificate_, missing_transmissions, Default::default()))?;
1060            debug!("Stored a batch certificate for round {batch_round} from '{peer_ip}'");
1061            // If a BFT sender was provided, send the round and certificate to the BFT.
1062            if let Some(bft_sender) = self.bft_sender.get() {
1063                // Send the certificate to the BFT.
1064                if let Err(e) = bft_sender.send_primary_certificate_to_bft(certificate).await {
1065                    warn!("Failed to update the BFT DAG from sync: {e}");
1066                    return Err(e);
1067                };
1068            }
1069        }
1070        Ok(())
1071    }
1072
    /// Recursively syncs using the given batch header.
    ///
    /// Advances our round when the peer's batch round shows we are behind, then
    /// fetches and returns any transmissions referenced by the header that are
    /// missing from our storage.
    async fn sync_with_batch_header_from_peer<const IS_SYNCING: bool>(
        &self,
        peer_ip: SocketAddr,
        batch_header: &BatchHeader<N>,
    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
        // Retrieve the batch round.
        let batch_round = batch_header.round();

        // If the batch round is at or below the GC round, it cannot be processed.
        if batch_round <= self.storage.gc_round() {
            bail!("Round {batch_round} is too far in the past")
        }

        // If the node is not in sync mode and is not yet synced, return an error.
        if !IS_SYNCING && !self.is_synced() {
            bail!(
                "Failed to process batch header `{}` at round {batch_round} from '{peer_ip}' (node is syncing)",
                fmt_id(batch_header.batch_id())
            );
        }

        // Determine if quorum threshold is reached on the batch round.
        let is_quorum_threshold_reached = {
            let authors = self.storage.get_certificate_authors_for_round(batch_round);
            let committee_lookback = self.ledger.get_committee_lookback_for_round(batch_round)?;
            committee_lookback.is_quorum_threshold_reached(&authors)
        };

        // Check if our primary should move to the next round.
        // Note: Checking that quorum threshold is reached is important for mitigating a race condition,
        // whereby Narwhal requires N-f, however the BFT only requires f+1. Without this check, the primary
        // will advance to the next round assuming f+1, not N-f, which can lead to a network stall.
        let is_behind_schedule = is_quorum_threshold_reached && batch_round > self.current_round();
        // Check if our primary is far behind the peer (beyond the GC range).
        let is_peer_far_in_future = batch_round > self.current_round() + self.storage.max_gc_rounds();
        // If our primary is behind schedule or far behind the peer, advance to the batch round.
        if is_behind_schedule || is_peer_far_in_future {
            // If the batch round is greater than the current committee round, update the committee.
            self.try_increment_to_the_next_round(batch_round).await?;
        }

        // Ensure the primary has all of the transmissions, fetching any that are missing.
        let missing_transmissions = self.fetch_missing_transmissions(batch_header).await.map_err(|e| {
            anyhow!("Failed to fetch missing transmissions for round {batch_round} from '{peer_ip}' - {e}")
        })?;

        Ok(missing_transmissions)
    }
1122
1123    /// Fetches any missing transmissions for the specified batch header.
1124    /// If a transmission does not exist, it will be fetched from the specified peer IP.
1125    async fn fetch_missing_transmissions(
1126        &self,
1127        batch_header: &BatchHeader<N>,
1128    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
1129        // If the round is <= the GC round, return early.
1130        if batch_header.round() <= self.storage.gc_round() {
1131            return Ok(Default::default());
1132        }
1133
1134        // Ensure this batch ID is new, otherwise return early.
1135        if self.storage.contains_batch(batch_header.batch_id()) {
1136            trace!("Batch for round {} from peer has already been processed", batch_header.round());
1137            return Ok(Default::default());
1138        }
1139
1140        // Retrieve the workers.
1141        let workers = self.workers.clone();
1142
1143        // Initialize a list for the transmissions.
1144        let mut fetch_transmissions = FuturesUnordered::new();
1145
1146        // Retrieve the number of workers.
1147        let num_workers = self.num_workers();
1148        // Iterate through the transmission IDs.
1149        for transmission_id in batch_header.transmission_ids() {
1150            // If the transmission does not exist in storage, proceed to fetch the transmission.
1151            if !self.storage.contains_transmission(*transmission_id) {
1152                // Determine the worker ID.
1153                let Ok(worker_id) = assign_to_worker(*transmission_id, num_workers) else {
1154                    bail!("Unable to assign transmission ID '{transmission_id}' to a worker")
1155                };
1156                // Retrieve the worker.
1157                let Some(worker) = workers.get(worker_id as usize) else { bail!("Unable to find worker {worker_id}") };
1158                // Push the callback onto the list.
1159                fetch_transmissions.push(worker.get_or_fetch_transmission(*transmission_id));
1160            }
1161        }
1162
1163        // Initialize a set for the transmissions.
1164        let mut transmissions = HashMap::with_capacity(fetch_transmissions.len());
1165        // Wait for all of the transmissions to be fetched.
1166        while let Some(result) = fetch_transmissions.next().await {
1167            // Retrieve the transmission.
1168            let (transmission_id, transmission) = result?;
1169            // Insert the transmission into the set.
1170            transmissions.insert(transmission_id, transmission);
1171        }
1172        // Return the transmissions.
1173        Ok(transmissions)
1174    }
1175}
1176
1177impl<N: Network> Primary<N> {
1178    /// Spawns a task with the given future; it should only be used for long-running tasks.
1179    fn spawn<T: Future<Output = ()> + Send + 'static>(&self, future: T) {
1180        self.handles.lock().push(tokio::spawn(future));
1181    }
1182
1183    /// Shuts down the primary.
1184    pub async fn shut_down(&self) {
1185        info!("Shutting down the primary...");
1186        // Abort the tasks.
1187        self.handles.lock().iter().for_each(|handle| handle.abort());
1188
1189        // Save the current proposal cache to disk.
1190        if self.keep_state {
1191            let proposal_cache = {
1192                let proposal = self.proposed_batch.write().take();
1193                let signed_proposals = self.signed_proposals.read().clone();
1194                let latest_round = proposal.as_ref().map(Proposal::round).unwrap_or(*self.propose_lock.lock().await);
1195                let pending_certificates = self.storage.get_pending_certificates();
1196                ProposalCache::new(latest_round, proposal, signed_proposals, pending_certificates)
1197            };
1198
1199            if let Err(err) = proposal_cache.store(self.keep_state, &self.storage_mode) {
1200                error!("Failed to store the current proposal cache: {err}");
1201            }
1202        }
1203    }
1204}
1205
1206#[cfg(test)]
1207mod tests {
1208    use super::*;
1209    use amareleo_node_bft_ledger_service::MockLedgerService;
1210    use amareleo_node_bft_storage_service::BFTMemoryService;
1211    use snarkvm::{
1212        console::types::Field,
1213        ledger::committee::{Committee, MIN_VALIDATOR_STAKE},
1214        prelude::{Address, Signature},
1215    };
1216
1217    use bytes::Bytes;
1218    use indexmap::IndexSet;
1219    use rand::RngCore;
1220
1221    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1222
1223    // Returns a primary and a list of accounts in the configured committee.
1224    async fn primary_without_handlers(
1225        rng: &mut TestRng,
1226    ) -> (Primary<CurrentNetwork>, Vec<(SocketAddr, Account<CurrentNetwork>)>) {
1227        // Create a committee containing the primary's account.
1228        let (accounts, committee) = {
1229            const COMMITTEE_SIZE: usize = 4;
1230            let mut accounts = Vec::with_capacity(COMMITTEE_SIZE);
1231            let mut members = IndexMap::new();
1232
1233            for i in 0..COMMITTEE_SIZE {
1234                let socket_addr = format!("127.0.0.1:{}", 5000 + i).parse().unwrap();
1235                let account = Account::new(rng).unwrap();
1236                members.insert(account.address(), (MIN_VALIDATOR_STAKE, true, rng.gen_range(0..100)));
1237                accounts.push((socket_addr, account));
1238            }
1239
1240            (accounts, Committee::<CurrentNetwork>::new(1, members).unwrap())
1241        };
1242
1243        let account = accounts.first().unwrap().1.clone();
1244        let ledger = Arc::new(MockLedgerService::new(committee));
1245        let storage = Storage::new(ledger.clone(), Arc::new(BFTMemoryService::new()), 10);
1246
1247        // Initialize the primary.
1248        let mut primary = Primary::new(account, storage, false, StorageMode::Development(0), ledger).unwrap();
1249
1250        // Construct a worker instance.
1251        primary.workers = Arc::from([Worker::new(
1252            0, // id
1253            primary.storage.clone(),
1254            primary.ledger.clone(),
1255            primary.proposed_batch.clone(),
1256        )
1257        .unwrap()]);
1258
1259        (primary, accounts)
1260    }
1261
1262    // Creates a mock solution.
1263    fn sample_unconfirmed_solution(rng: &mut TestRng) -> (SolutionID<CurrentNetwork>, Data<Solution<CurrentNetwork>>) {
1264        // Sample a random fake solution ID.
1265        let solution_id = rng.gen::<u64>().into();
1266        // Vary the size of the solutions.
1267        let size = rng.gen_range(1024..10 * 1024);
1268        // Sample random fake solution bytes.
1269        let mut vec = vec![0u8; size];
1270        rng.fill_bytes(&mut vec);
1271        let solution = Data::Buffer(Bytes::from(vec));
1272        // Return the solution ID and solution.
1273        (solution_id, solution)
1274    }
1275
1276    // Creates a mock transaction.
1277    fn sample_unconfirmed_transaction(
1278        rng: &mut TestRng,
1279    ) -> (<CurrentNetwork as Network>::TransactionID, Data<Transaction<CurrentNetwork>>) {
1280        // Sample a random fake transaction ID.
1281        let id = Field::<CurrentNetwork>::rand(rng).into();
1282        // Vary the size of the transactions.
1283        let size = rng.gen_range(1024..10 * 1024);
1284        // Sample random fake transaction bytes.
1285        let mut vec = vec![0u8; size];
1286        rng.fill_bytes(&mut vec);
1287        let transaction = Data::Buffer(Bytes::from(vec));
1288        // Return the ID and transaction.
1289        (id, transaction)
1290    }
1291
1292    // Creates a batch proposal with one solution and one transaction.
1293    fn create_test_proposal(
1294        author: &Account<CurrentNetwork>,
1295        committee: Committee<CurrentNetwork>,
1296        round: u64,
1297        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1298        timestamp: i64,
1299        rng: &mut TestRng,
1300    ) -> Proposal<CurrentNetwork> {
1301        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1302        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1303        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1304        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1305
1306        let solution_transmission_id = (solution_id, solution_checksum).into();
1307        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1308
1309        // Retrieve the private key.
1310        let private_key = author.private_key();
1311        // Prepare the transmission IDs.
1312        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1313        let transmissions = [
1314            (solution_transmission_id, Transmission::Solution(solution)),
1315            (transaction_transmission_id, Transmission::Transaction(transaction)),
1316        ]
1317        .into();
1318        // Sign the batch header.
1319        let batch_header = BatchHeader::new(
1320            private_key,
1321            round,
1322            timestamp,
1323            committee.id(),
1324            transmission_ids,
1325            previous_certificate_ids,
1326            rng,
1327        )
1328        .unwrap();
1329        // Construct the proposal.
1330        Proposal::new(committee, batch_header, transmissions).unwrap()
1331    }
1332
1333    /// Creates a signature of the batch ID for each committee member (excluding the primary).
1334    fn peer_signatures_for_batch(
1335        primary_address: Address<CurrentNetwork>,
1336        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1337        batch_id: Field<CurrentNetwork>,
1338        rng: &mut TestRng,
1339    ) -> IndexSet<Signature<CurrentNetwork>> {
1340        let mut signatures = IndexSet::new();
1341        for (_, account) in accounts {
1342            if account.address() == primary_address {
1343                continue;
1344            }
1345            let signature = account.sign(&[batch_id], rng).unwrap();
1346            signatures.insert(signature);
1347        }
1348        signatures
1349    }
1350
1351    // Creates a batch certificate.
1352    fn create_batch_certificate(
1353        primary_address: Address<CurrentNetwork>,
1354        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1355        round: u64,
1356        previous_certificate_ids: IndexSet<Field<CurrentNetwork>>,
1357        rng: &mut TestRng,
1358    ) -> (BatchCertificate<CurrentNetwork>, HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>) {
1359        let timestamp = now();
1360
1361        let author =
1362            accounts.iter().find(|&(_, acct)| acct.address() == primary_address).map(|(_, acct)| acct.clone()).unwrap();
1363        let private_key = author.private_key();
1364
1365        let committee_id = Field::rand(rng);
1366        let (solution_id, solution) = sample_unconfirmed_solution(rng);
1367        let (transaction_id, transaction) = sample_unconfirmed_transaction(rng);
1368        let solution_checksum = solution.to_checksum::<CurrentNetwork>().unwrap();
1369        let transaction_checksum = transaction.to_checksum::<CurrentNetwork>().unwrap();
1370
1371        let solution_transmission_id = (solution_id, solution_checksum).into();
1372        let transaction_transmission_id = (&transaction_id, &transaction_checksum).into();
1373
1374        let transmission_ids = [solution_transmission_id, transaction_transmission_id].into();
1375        let transmissions = [
1376            (solution_transmission_id, Transmission::Solution(solution)),
1377            (transaction_transmission_id, Transmission::Transaction(transaction)),
1378        ]
1379        .into();
1380
1381        let batch_header = BatchHeader::new(
1382            private_key,
1383            round,
1384            timestamp,
1385            committee_id,
1386            transmission_ids,
1387            previous_certificate_ids,
1388            rng,
1389        )
1390        .unwrap();
1391        let signatures = peer_signatures_for_batch(primary_address, accounts, batch_header.batch_id(), rng);
1392        let certificate = BatchCertificate::<CurrentNetwork>::from(batch_header, signatures).unwrap();
1393        (certificate, transmissions)
1394    }
1395
1396    // Create a certificate chain up to round in primary storage.
1397    fn store_certificate_chain(
1398        primary: &Primary<CurrentNetwork>,
1399        accounts: &[(SocketAddr, Account<CurrentNetwork>)],
1400        round: u64,
1401        rng: &mut TestRng,
1402    ) -> IndexSet<Field<CurrentNetwork>> {
1403        let mut previous_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1404        let mut next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1405        for cur_round in 1..round {
1406            for (_, account) in accounts.iter() {
1407                let (certificate, transmissions) = create_batch_certificate(
1408                    account.address(),
1409                    accounts,
1410                    cur_round,
1411                    previous_certificates.clone(),
1412                    rng,
1413                );
1414                next_certificates.insert(certificate.id());
1415                assert!(primary.storage.insert_certificate(certificate, transmissions, Default::default()).is_ok());
1416            }
1417
1418            assert!(primary.storage.increment_to_next_round(cur_round).is_ok());
1419            previous_certificates = next_certificates;
1420            next_certificates = IndexSet::<Field<CurrentNetwork>>::new();
1421        }
1422
1423        previous_certificates
1424    }
1425
1426    #[tokio::test]
1427    async fn test_propose_batch() {
1428        let mut rng = TestRng::default();
1429        let (primary, _) = primary_without_handlers(&mut rng).await;
1430
1431        // Check there is no batch currently proposed.
1432        assert!(primary.proposed_batch.read().is_none());
1433
1434        // Generate a solution and a transaction.
1435        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1436        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1437
1438        // Store it on one of the workers.
1439        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1440        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1441
1442        // Try to propose a batch again. This time, it should succeed.
1443        assert!(primary.propose_batch().await.is_ok());
1444
1445        // AlexZ: proposed_batch is no longer written to when processing new batches...
1446        // assert!(primary.proposed_batch.read().is_some());
1447    }
1448
1449    #[tokio::test]
1450    async fn test_propose_batch_with_no_transmissions() {
1451        let mut rng = TestRng::default();
1452        let (primary, _) = primary_without_handlers(&mut rng).await;
1453
1454        // Check there is no batch currently proposed.
1455        assert!(primary.proposed_batch.read().is_none());
1456
1457        // Try to propose a batch with no transmissions.
1458        assert!(primary.propose_batch().await.is_ok());
1459
1460        // AlexZ: proposed_batch is no longer written to when processing new batches...
1461        // assert!(primary.proposed_batch.read().is_some());
1462    }
1463
    #[tokio::test]
    async fn test_propose_batch_in_round() {
        // Target round for the pre-built certificate chain.
        let round = 3;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Fill primary storage.
        // This advances storage through rounds 1..round with a full certificate chain.
        store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Sleep for a while to ensure the primary is ready to propose the next round.
        tokio::time::sleep(Duration::from_secs(MIN_BATCH_DELAY_IN_SECS)).await;

        // Generate a solution and a transaction.
        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);

        // Store it on one of the workers.
        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();

        // Propose a batch again. This time, it should succeed.
        assert!(primary.propose_batch().await.is_ok());

        // AlexZ: proposed_batch is no longer written to when processing new batches...
        // assert!(primary.proposed_batch.read().is_some());
    }
1490
1491    #[tokio::test]
1492    async fn test_propose_batch_with_storage_round_behind_proposal_lock() {
1493        let round = 3;
1494        let mut rng = TestRng::default();
1495        let (primary, _) = primary_without_handlers(&mut rng).await;
1496
1497        // Check there is no batch currently proposed.
1498        assert!(primary.proposed_batch.read().is_none());
1499
1500        // Generate a solution and a transaction.
1501        let (solution_id, solution) = sample_unconfirmed_solution(&mut rng);
1502        let (transaction_id, transaction) = sample_unconfirmed_transaction(&mut rng);
1503
1504        // Store it on one of the workers.
1505        primary.workers[0].process_unconfirmed_solution(solution_id, solution).await.unwrap();
1506        primary.workers[0].process_unconfirmed_transaction(transaction_id, transaction).await.unwrap();
1507
1508        // Set the proposal lock to a round ahead of the storage.
1509        let old_proposal_lock_round = *primary.propose_lock.lock().await;
1510        *primary.propose_lock.lock().await = round + 1;
1511
1512        // Propose a batch and enforce that it fails.
1513        assert!(primary.propose_batch().await.is_ok());
1514        assert!(primary.proposed_batch.read().is_none());
1515
1516        // Set the proposal lock back to the old round.
1517        *primary.propose_lock.lock().await = old_proposal_lock_round;
1518
1519        // Try to propose a batch again. This time, it should succeed.
1520        assert!(primary.propose_batch().await.is_ok());
1521
1522        // AlexZ: proposed_batch is no longer written to when processing new batches...
1523        // assert!(primary.proposed_batch.read().is_some());
1524    }
1525
    #[tokio::test]
    async fn test_propose_batch_with_storage_round_behind_proposal() {
        // Storage is advanced to this round; the planted proposal sits one round ahead.
        let round = 5;
        let mut rng = TestRng::default();
        let (primary, accounts) = primary_without_handlers(&mut rng).await;

        // Generate previous certificates.
        let previous_certificates = store_certificate_chain(&primary, &accounts, round, &mut rng);

        // Create a valid proposal.
        // The proposal targets `round + 1`, so it is ahead of the storage round.
        let timestamp = now();
        let proposal = create_test_proposal(
            &primary.account,
            primary.ledger.current_committee().unwrap(),
            round + 1,
            previous_certificates,
            timestamp,
            &mut rng,
        );

        // Store the proposal on the primary.
        *primary.proposed_batch.write() = Some(proposal);

        // Try to propose a batch will terminate early because the storage is behind the proposal.
        // The call returns Ok, and the planted (ahead-of-storage) proposal remains untouched.
        assert!(primary.propose_batch().await.is_ok());
        assert!(primary.proposed_batch.read().is_some());
        assert!(primary.proposed_batch.read().as_ref().unwrap().round() > primary.current_round());
    }
1554}