snarkos_node_bft/helpers/
storage.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::helpers::{check_timestamp_for_liveness, fmt_id};
17use snarkos_node_bft_ledger_service::LedgerService;
18use snarkos_node_bft_storage_service::StorageService;
19use snarkvm::{
20    ledger::{
21        block::{Block, Transaction},
22        narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID},
23    },
24    prelude::{Address, Field, Network, Result, anyhow, bail, ensure},
25    utilities::{cfg_into_iter, cfg_iter, cfg_sorted_by},
26};
27
28use indexmap::{IndexMap, IndexSet, map::Entry};
29#[cfg(feature = "locktick")]
30use locktick::parking_lot::RwLock;
31use lru::LruCache;
32#[cfg(not(feature = "locktick"))]
33use parking_lot::RwLock;
34#[cfg(not(feature = "serial"))]
35use rayon::prelude::*;
36use std::{
37    collections::{HashMap, HashSet},
38    num::NonZeroUsize,
39    sync::{
40        Arc,
41        atomic::{AtomicU32, AtomicU64, Ordering},
42    },
43};
44
/// A cloneable handle to the storage.
///
/// The state lives in a [`StorageInner`] behind an `Arc`, so every clone of this
/// handle refers to the same underlying storage.
#[derive(Clone, Debug)]
pub struct Storage<N: Network>(Arc<StorageInner<N>>);
47
48impl<N: Network> std::ops::Deref for Storage<N> {
49    type Target = Arc<StorageInner<N>>;
50
51    fn deref(&self) -> &Self::Target {
52        &self.0
53    }
54}
55
/// The storage for the memory pool.
///
/// The storage is used to store the following:
/// - `current_height` tracker.
/// - `current_round` tracker.
/// - `round` to `(certificate ID, author)` entries.
/// - `certificate ID` to `certificate` entries.
/// - `batch ID` to `round` entries.
/// - `transmission ID` to `(transmission, certificate IDs)` entries.
///
/// The chain of events is as follows:
/// 1. A `transmission` is received.
/// 2. After a `batch` is ready to be stored:
///    - The `certificate` is inserted, triggering updates to the
///      `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
///    - The missing `transmissions` from storage are inserted into the `transmissions` map.
///    - The certificate ID is inserted into the `transmissions` map.
/// 3. After a `round` reaches quorum threshold:
///    - The next round is stored in `current_round`.
#[derive(Debug)]
pub struct StorageInner<N: Network> {
    /// The ledger service.
    ledger: Arc<dyn LedgerService<N>>,
    /* Once per block */
    /// The current height.
    current_height: AtomicU32,
    /* Once per round */
    /// The current round.
    ///
    /// Invariant: current_round > 0.
    /// This is established in [`Storage::new`], which sets it to at least 1 via [`Storage::update_current_round`].
    /// The only callers of [`Storage::update_current_round`] are
    /// [`Storage::increment_to_next_round`] and [`Storage::sync_round_with_block`],
    /// both of which set it to at least 1.
    current_round: AtomicU64,
    /// The `round` for which garbage collection has occurred **up to** (inclusive).
    gc_round: AtomicU64,
    /// The maximum number of rounds to keep in storage.
    max_gc_rounds: u64,
    /* Once per batch */
    /// The map of `round` to a list of `(certificate ID, author)` entries.
    rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Address<N>)>>>,
    /// A cache of `certificate ID` to unprocessed `certificate`.
    unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>,
    /// The map of `certificate ID` to `certificate`.
    certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>,
    /// The map of `certificate ID` to `round`.
    ///
    /// Entries are inserted under the certificate ID, while `contains_batch` and
    /// `get_round_for_batch` look them up by batch ID.
    /// NOTE(review): presumably a certificate's ID equals its batch ID - confirm.
    batch_ids: RwLock<IndexMap<Field<N>, u64>>,
    /// The map of `transmission ID` to `(transmission, certificate IDs)` entries.
    transmissions: Arc<dyn StorageService<N>>,
}
107
108impl<N: Network> Storage<N> {
109    /// Initializes a new instance of storage.
110    pub fn new(
111        ledger: Arc<dyn LedgerService<N>>,
112        transmissions: Arc<dyn StorageService<N>>,
113        max_gc_rounds: u64,
114    ) -> Self {
115        // Retrieve the latest committee bonded in the ledger
116        // (genesis committee if the ledger contains only the genesis block).
117        let committee = ledger.current_committee().expect("Ledger is missing a committee.");
118        // Retrieve the round at which that committee was created, or 1 if it is the genesis committee.
119        let current_round = committee.starting_round().max(1);
120        // Set the unprocessed certificates cache size.
121        let unprocessed_cache_size = NonZeroUsize::new((N::LATEST_MAX_CERTIFICATES().unwrap() * 2) as usize).unwrap();
122
123        // Create the storage.
124        let storage = Self(Arc::new(StorageInner {
125            ledger,
126            current_height: Default::default(),
127            current_round: Default::default(),
128            gc_round: Default::default(),
129            max_gc_rounds,
130            rounds: Default::default(),
131            unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)),
132            certificates: Default::default(),
133            batch_ids: Default::default(),
134            transmissions,
135        }));
136        // Update the storage to the current round.
137        storage.update_current_round(current_round);
138        // Perform GC on the current round.
139        // Since there are no certificates yet, this only sets `gc_round`.
140        storage.garbage_collect_certificates(current_round);
141        // Return the storage.
142        storage
143    }
144}
145
146impl<N: Network> Storage<N> {
147    /// Returns the current height.
148    pub fn current_height(&self) -> u32 {
149        // Get the current height.
150        self.current_height.load(Ordering::SeqCst)
151    }
152}
153
154impl<N: Network> Storage<N> {
155    /// Returns the current round.
156    pub fn current_round(&self) -> u64 {
157        // Get the current round.
158        self.current_round.load(Ordering::SeqCst)
159    }
160
161    /// Returns the `round` that garbage collection has occurred **up to** (inclusive).
162    pub fn gc_round(&self) -> u64 {
163        // Get the GC round.
164        self.gc_round.load(Ordering::SeqCst)
165    }
166
167    /// Returns the maximum number of rounds to keep in storage.
168    pub fn max_gc_rounds(&self) -> u64 {
169        self.max_gc_rounds
170    }
171
172    /// Increments storage to the next round, updating the current round.
173    /// Note: This method is only called once per round, upon certification of the primary's batch.
174    pub fn increment_to_next_round(&self, current_round: u64) -> Result<u64> {
175        // Determine the next round.
176        let next_round = current_round + 1;
177
178        // Check if the next round is less than the current round in storage.
179        {
180            // Retrieve the storage round.
181            let storage_round = self.current_round();
182            // If the next round is less than the current round in storage, return early with the storage round.
183            if next_round < storage_round {
184                return Ok(storage_round);
185            }
186        }
187
188        // Retrieve the current committee.
189        let current_committee = self.ledger.current_committee()?;
190        // Retrieve the current committee's starting round.
191        let starting_round = current_committee.starting_round();
192        // If the primary is behind the current committee's starting round, sync with the latest block.
193        if next_round < starting_round {
194            // Retrieve the latest block round.
195            let latest_block_round = self.ledger.latest_round();
196            // Log the round sync.
197            info!(
198                "Syncing primary round ({next_round}) with the current committee's starting round ({starting_round}). Syncing with the latest block round {latest_block_round}..."
199            );
200            // Sync the round with the latest block.
201            self.sync_round_with_block(latest_block_round);
202            // Return the latest block round.
203            return Ok(latest_block_round);
204        }
205
206        // Update the storage to the next round.
207        self.update_current_round(next_round);
208
209        #[cfg(feature = "metrics")]
210        metrics::gauge(metrics::bft::LAST_STORED_ROUND, next_round as f64);
211
212        // Retrieve the storage round.
213        let storage_round = self.current_round();
214        // Retrieve the GC round.
215        let gc_round = self.gc_round();
216        // Ensure the next round matches in storage.
217        ensure!(next_round == storage_round, "The next round {next_round} does not match in storage ({storage_round})");
218        // Ensure the next round is greater than or equal to the GC round.
219        ensure!(next_round >= gc_round, "The next round {next_round} is behind the GC round {gc_round}");
220
221        // Log the updated round.
222        info!("Starting round {next_round}...");
223        Ok(next_round)
224    }
225
226    /// Updates the storage to the next round.
227    fn update_current_round(&self, next_round: u64) {
228        // Update the current round.
229        self.current_round.store(next_round, Ordering::SeqCst);
230    }
231
232    /// Update the storage by performing garbage collection based on the next round.
233    pub(crate) fn garbage_collect_certificates(&self, next_round: u64) {
234        // Fetch the current GC round.
235        let current_gc_round = self.gc_round();
236        // Compute the next GC round.
237        let next_gc_round = next_round.saturating_sub(self.max_gc_rounds);
238        // Check if storage needs to be garbage collected.
239        if next_gc_round > current_gc_round {
240            // Remove the GC round(s) from storage.
241            for gc_round in current_gc_round..=next_gc_round {
242                // Iterate over the certificates for the GC round.
243                for id in self.get_certificate_ids_for_round(gc_round).into_iter() {
244                    // Remove the certificate from storage.
245                    self.remove_certificate(id);
246                }
247            }
248            // Update the GC round.
249            self.gc_round.store(next_gc_round, Ordering::SeqCst);
250        }
251    }
252}
253
254impl<N: Network> Storage<N> {
255    /// Returns `true` if the storage contains the specified `round`.
256    pub fn contains_certificates_for_round(&self, round: u64) -> bool {
257        // Check if the round exists in storage.
258        self.rounds.read().contains_key(&round)
259    }
260
261    /// Returns `true` if the storage contains the specified `certificate ID`.
262    pub fn contains_certificate(&self, certificate_id: Field<N>) -> bool {
263        // Check if the certificate ID exists in storage.
264        self.certificates.read().contains_key(&certificate_id)
265    }
266
267    /// Returns `true` if the storage contains a certificate from the specified `author` in the given `round`.
268    pub fn contains_certificate_in_round_from(&self, round: u64, author: Address<N>) -> bool {
269        self.rounds.read().get(&round).is_some_and(|set| set.iter().any(|(_, a)| a == &author))
270    }
271
272    /// Returns `true` if the storage contains the specified `certificate ID`.
273    pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool {
274        // Check if the certificate ID exists in storage.
275        self.unprocessed_certificates.read().contains(&certificate_id)
276    }
277
278    /// Returns `true` if the storage contains the specified `batch ID`.
279    pub fn contains_batch(&self, batch_id: Field<N>) -> bool {
280        // Check if the batch ID exists in storage.
281        self.batch_ids.read().contains_key(&batch_id)
282    }
283
284    /// Returns `true` if the storage contains the specified `transmission ID`.
285    pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
286        self.transmissions.contains_transmission(transmission_id.into())
287    }
288
289    /// Returns the transmission for the given `transmission ID`.
290    /// If the transmission ID does not exist in storage, `None` is returned.
291    pub fn get_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
292        self.transmissions.get_transmission(transmission_id.into())
293    }
294
295    /// Returns the round for the given `certificate ID`.
296    /// If the certificate ID does not exist in storage, `None` is returned.
297    pub fn get_round_for_certificate(&self, certificate_id: Field<N>) -> Option<u64> {
298        // Get the round.
299        self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
300    }
301
302    /// Returns the round for the given `batch ID`.
303    /// If the batch ID does not exist in storage, `None` is returned.
304    pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> {
305        // Get the round.
306        self.batch_ids.read().get(&batch_id).copied()
307    }
308
309    /// Returns the certificate round for the given `certificate ID`.
310    /// If the certificate ID does not exist in storage, `None` is returned.
311    pub fn get_certificate_round(&self, certificate_id: Field<N>) -> Option<u64> {
312        // Get the batch certificate and return the round.
313        self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
314    }
315
316    /// Returns the certificate for the given `certificate ID`.
317    /// If the certificate ID does not exist in storage, `None` is returned.
318    pub fn get_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
319        // Get the batch certificate.
320        self.certificates.read().get(&certificate_id).cloned()
321    }
322
323    /// Returns the unprocessed certificate for the given `certificate ID`.
324    /// If the certificate ID does not exist in storage, `None` is returned.
325    pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
326        // Get the unprocessed certificate.
327        self.unprocessed_certificates.read().peek(&certificate_id).cloned()
328    }
329
330    /// Returns the certificate for the given `round` and `author`.
331    /// If the round does not exist in storage, `None` is returned.
332    /// If the author for the round does not exist in storage, `None` is returned.
333    pub fn get_certificate_for_round_with_author(&self, round: u64, author: Address<N>) -> Option<BatchCertificate<N>> {
334        // Retrieve the certificates.
335        if let Some(entries) = self.rounds.read().get(&round) {
336            let certificates = self.certificates.read();
337            entries.iter().find_map(
338                |(certificate_id, a)| if a == &author { certificates.get(certificate_id).cloned() } else { None },
339            )
340        } else {
341            Default::default()
342        }
343    }
344
345    /// Returns the certificates for the given `round`.
346    /// If the round does not exist in storage, an empty set is returned.
347    pub fn get_certificates_for_round(&self, round: u64) -> IndexSet<BatchCertificate<N>> {
348        // The genesis round does not have batch certificates.
349        if round == 0 {
350            return Default::default();
351        }
352        // Retrieve the certificates.
353        if let Some(entries) = self.rounds.read().get(&round) {
354            let certificates = self.certificates.read();
355            entries.iter().flat_map(|(certificate_id, _)| certificates.get(certificate_id).cloned()).collect()
356        } else {
357            Default::default()
358        }
359    }
360
361    /// Returns the certificate IDs for the given `round`.
362    /// If the round does not exist in storage, an empty set is returned.
363    pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> {
364        // The genesis round does not have batch certificates.
365        if round == 0 {
366            return Default::default();
367        }
368        // Retrieve the certificates.
369        if let Some(entries) = self.rounds.read().get(&round) {
370            entries.iter().map(|(certificate_id, _)| *certificate_id).collect()
371        } else {
372            Default::default()
373        }
374    }
375
376    /// Returns the certificate authors for the given `round`.
377    /// If the round does not exist in storage, an empty set is returned.
378    pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> {
379        // The genesis round does not have batch certificates.
380        if round == 0 {
381            return Default::default();
382        }
383        // Retrieve the certificates.
384        if let Some(entries) = self.rounds.read().get(&round) {
385            entries.iter().map(|(_, author)| *author).collect()
386        } else {
387            Default::default()
388        }
389    }
390
391    /// Returns the certificates that have not yet been included in the ledger.
392    /// Note that the order of this set is by round and then insertion.
393    pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
394        // Obtain the read locks.
395        let rounds = self.rounds.read();
396        let certificates = self.certificates.read();
397
398        // Iterate over the rounds.
399        cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b))
400            .flat_map(|(_, certificates_for_round)| {
401                // Iterate over the certificates for the round.
402                cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _)| {
403                    // Skip the certificate if it already exists in the ledger.
404                    if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) {
405                        None
406                    } else {
407                        // Add the certificate to the pending certificates.
408                        certificates.get(&certificate_id).cloned()
409                    }
410                })
411            })
412            .collect()
413    }
414
415    /// Checks the given `batch_header` for validity, returning the missing transmissions from storage.
416    ///
417    /// This method ensures the following invariants:
418    /// - The batch ID does not already exist in storage.
419    /// - The author is a member of the committee for the batch round.
420    /// - The timestamp is within the allowed time range.
421    /// - None of the transmissions are from any past rounds (up to GC).
422    /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
423    /// - All previous certificates declared in the certificate exist in storage (up to GC).
424    /// - All previous certificates are for the previous round (i.e. round - 1).
425    /// - All previous certificates contain a unique author.
426    /// - The previous certificates reached the quorum threshold (N - f).
427    pub fn check_batch_header(
428        &self,
429        batch_header: &BatchHeader<N>,
430        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
431        aborted_transmissions: HashSet<TransmissionID<N>>,
432    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
433        // Retrieve the round.
434        let round = batch_header.round();
435        // Retrieve the GC round.
436        let gc_round = self.gc_round();
437        // Construct a GC log message.
438        let gc_log = format!("(gc = {gc_round})");
439
440        // Ensure the batch ID does not already exist in storage.
441        if self.contains_batch(batch_header.batch_id()) {
442            bail!("Batch for round {round} already exists in storage {gc_log}")
443        }
444
445        // Retrieve the committee lookback for the batch round.
446        let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
447            bail!("Storage failed to retrieve the committee lookback for round {round} {gc_log}")
448        };
449        // Ensure the author is in the committee.
450        if !committee_lookback.is_committee_member(batch_header.author()) {
451            bail!("Author {} is not in the committee for round {round} {gc_log}", batch_header.author())
452        }
453
454        // Check the timestamp for liveness.
455        check_timestamp_for_liveness(batch_header.timestamp())?;
456
457        // Retrieve the missing transmissions in storage from the given transmissions.
458        let missing_transmissions = self
459            .transmissions
460            .find_missing_transmissions(batch_header, transmissions, aborted_transmissions)
461            .map_err(|e| anyhow!("{e} for round {round} {gc_log}"))?;
462
463        // Compute the previous round.
464        let previous_round = round.saturating_sub(1);
465        // Check if the previous round is within range of the GC round.
466        if previous_round > gc_round {
467            // Retrieve the committee lookback for the previous round.
468            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
469                bail!("Missing committee for the previous round {previous_round} in storage {gc_log}")
470            };
471            // Ensure the previous round certificates exists in storage.
472            if !self.contains_certificates_for_round(previous_round) {
473                bail!("Missing certificates for the previous round {previous_round} in storage {gc_log}")
474            }
475            // Ensure the number of previous certificate IDs is at or below the number of committee members.
476            if batch_header.previous_certificate_ids().len() > previous_committee_lookback.num_members() {
477                bail!("Too many previous certificates for round {round} {gc_log}")
478            }
479            // Initialize a set of the previous authors.
480            let mut previous_authors = HashSet::with_capacity(batch_header.previous_certificate_ids().len());
481            // Ensure storage contains all declared previous certificates (up to GC).
482            for previous_certificate_id in batch_header.previous_certificate_ids() {
483                // Retrieve the previous certificate.
484                let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else {
485                    bail!(
486                        "Missing previous certificate '{}' for certificate in round {round} {gc_log}",
487                        fmt_id(previous_certificate_id)
488                    )
489                };
490                // Ensure the previous certificate is for the previous round.
491                if previous_certificate.round() != previous_round {
492                    bail!("Round {round} certificate contains a round {previous_round} certificate {gc_log}")
493                }
494                // Ensure the previous author is new.
495                if previous_authors.contains(&previous_certificate.author()) {
496                    bail!("Round {round} certificate contains a duplicate author {gc_log}")
497                }
498                // Insert the author of the previous certificate.
499                previous_authors.insert(previous_certificate.author());
500            }
501            // Ensure the previous certificates have reached the quorum threshold.
502            if !previous_committee_lookback.is_quorum_threshold_reached(&previous_authors) {
503                bail!("Previous certificates for a batch in round {round} did not reach quorum threshold {gc_log}")
504            }
505        }
506        Ok(missing_transmissions)
507    }
508
509    /// Check the validity of a certificate coming from another validator.
510    ///
511    /// It suffices to check that the signers (author and endorsers) are members of the applicable committee
512    /// and that they form a quorum in the committee.
513    /// Under the fundamental fault tolerance assumption of at most `f` (stake of) faulty validators,
514    /// the quorum check on signers guarantees that at least one correct validator
515    /// has ensured the validity of the proposal contained in the certificate,
516    /// either by construction (by the author) or by checking (by an endorser):
517    /// given `N > 0` total stake, and `f` the largest integer `< N/3` (where `/` is exact rational division),
518    /// we have `N >= 3f + 1`, which implies `N - f >= 2f + 1`, which is always `> f`;
519    /// `N - f` is the quorum stake.
520    pub fn check_incoming_certificate(&self, certificate: &BatchCertificate<N>) -> Result<()> {
521        // Retrieve the certificate author and round.
522        let certificate_author = certificate.author();
523        let certificate_round = certificate.round();
524
525        // Retrieve the committee lookback.
526        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
527
528        // Ensure that the signers of the certificate reach the quorum threshold.
529        // Note that certificate.signatures() only returns the endorsing signatures, not the author's signature.
530        let mut signers: HashSet<Address<N>> =
531            certificate.signatures().map(|signature| signature.to_address()).collect();
532        signers.insert(certificate_author);
533        ensure!(
534            committee_lookback.is_quorum_threshold_reached(&signers),
535            "Certificate '{}' for round {certificate_round} does not meet quorum requirements",
536            certificate.id()
537        );
538
539        // Ensure that the signers of the certificate are in the committee.
540        cfg_iter!(&signers).try_for_each(|signer| {
541            ensure!(
542                committee_lookback.is_committee_member(*signer),
543                "Signer '{signer}' of certificate '{}' for round {certificate_round} is not in the committee",
544                certificate.id()
545            );
546            Ok(())
547        })?;
548
549        Ok(())
550    }
551
552    /// Checks the given `certificate` for validity, returning the missing transmissions from storage.
553    ///
554    /// This method ensures the following invariants:
555    /// - The certificate ID does not already exist in storage.
556    /// - The batch ID does not already exist in storage.
557    /// - The author is a member of the committee for the batch round.
558    /// - The author has not already created a certificate for the batch round.
559    /// - The timestamp is within the allowed time range.
560    /// - None of the transmissions are from any past rounds (up to GC).
561    /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
562    /// - All previous certificates declared in the certificate exist in storage (up to GC).
563    /// - All previous certificates are for the previous round (i.e. round - 1).
564    /// - The previous certificates reached the quorum threshold (N - f).
565    /// - The timestamps from the signers are all within the allowed time range.
566    /// - The signers have reached the quorum threshold (N - f).
567    pub fn check_certificate(
568        &self,
569        certificate: &BatchCertificate<N>,
570        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
571        aborted_transmissions: HashSet<TransmissionID<N>>,
572    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
573        // Retrieve the round.
574        let round = certificate.round();
575        // Retrieve the GC round.
576        let gc_round = self.gc_round();
577        // Construct a GC log message.
578        let gc_log = format!("(gc = {gc_round})");
579
580        // Ensure the certificate ID does not already exist in storage.
581        if self.contains_certificate(certificate.id()) {
582            bail!("Certificate for round {round} already exists in storage {gc_log}")
583        }
584
585        // Ensure the storage does not already contain a certificate for this author in this round.
586        if self.contains_certificate_in_round_from(round, certificate.author()) {
587            bail!("Certificate with this author for round {round} already exists in storage {gc_log}")
588        }
589
590        // Ensure the batch header is well-formed.
591        let missing_transmissions =
592            self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)?;
593
594        // Check the timestamp for liveness.
595        check_timestamp_for_liveness(certificate.timestamp())?;
596
597        // Retrieve the committee lookback for the batch round.
598        let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
599            bail!("Storage failed to retrieve the committee for round {round} {gc_log}")
600        };
601
602        // Initialize a set of the signers.
603        let mut signers = HashSet::with_capacity(certificate.signatures().len() + 1);
604        // Append the batch author.
605        signers.insert(certificate.author());
606
607        // Iterate over the signatures.
608        for signature in certificate.signatures() {
609            // Retrieve the signer.
610            let signer = signature.to_address();
611            // Ensure the signer is in the committee.
612            if !committee_lookback.is_committee_member(signer) {
613                bail!("Signer {signer} is not in the committee for round {round} {gc_log}")
614            }
615            // Append the signer.
616            signers.insert(signer);
617        }
618
619        // Ensure the signatures have reached the quorum threshold.
620        if !committee_lookback.is_quorum_threshold_reached(&signers) {
621            bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}")
622        }
623        Ok(missing_transmissions)
624    }
625
626    /// Inserts the given `certificate` into storage.
627    ///
628    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
629    ///
630    /// This method ensures the following invariants:
631    /// - The certificate ID does not already exist in storage.
632    /// - The batch ID does not already exist in storage.
633    /// - All transmissions declared in the certificate are provided or exist in storage (up to GC).
634    /// - All previous certificates declared in the certificate exist in storage (up to GC).
635    /// - All previous certificates are for the previous round (i.e. round - 1).
636    /// - The previous certificates reached the quorum threshold (N - f).
637    pub fn insert_certificate(
638        &self,
639        certificate: BatchCertificate<N>,
640        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
641        aborted_transmissions: HashSet<TransmissionID<N>>,
642    ) -> Result<()> {
643        // Ensure the certificate round is above the GC round.
644        ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
645        // Ensure the certificate and its transmissions are valid.
646        let missing_transmissions =
647            self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?;
648        // Insert the certificate into storage.
649        self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
650        Ok(())
651    }
652
653    /// Inserts the given `certificate` into storage.
654    ///
655    /// This method assumes **all missing** transmissions are provided in the `missing_transmissions` map.
656    ///
657    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
658    fn insert_certificate_atomic(
659        &self,
660        certificate: BatchCertificate<N>,
661        aborted_transmission_ids: HashSet<TransmissionID<N>>,
662        missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
663    ) {
664        // Retrieve the round.
665        let round = certificate.round();
666        // Retrieve the certificate ID.
667        let certificate_id = certificate.id();
668        // Retrieve the author of the batch.
669        let author = certificate.author();
670
671        // Insert the round to certificate ID entry.
672        self.rounds.write().entry(round).or_default().insert((certificate_id, author));
673        // Obtain the certificate's transmission ids.
674        let transmission_ids = certificate.transmission_ids().clone();
675        // Insert the certificate.
676        self.certificates.write().insert(certificate_id, certificate);
677        // Remove the unprocessed certificate.
678        self.unprocessed_certificates.write().pop(&certificate_id);
679        // Insert the batch ID.
680        self.batch_ids.write().insert(certificate_id, round);
681        // Insert the certificate ID for each of the transmissions into storage.
682        self.transmissions.insert_transmissions(
683            certificate_id,
684            transmission_ids,
685            aborted_transmission_ids,
686            missing_transmissions,
687        );
688    }
689
690    /// Inserts the given unprocessed `certificate` into storage.
691    ///
692    /// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`.
693    pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
694        // Ensure the certificate round is above the GC round.
695        ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
696        // Insert the certificate.
697        self.unprocessed_certificates.write().put(certificate.id(), certificate);
698
699        Ok(())
700    }
701
702    /// Removes the given `certificate ID` from storage.
703    ///
704    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
705    ///
706    /// If the certificate was successfully removed, `true` is returned.
707    /// If the certificate did not exist in storage, `false` is returned.
708    fn remove_certificate(&self, certificate_id: Field<N>) -> bool {
709        // Retrieve the certificate.
710        let Some(certificate) = self.get_certificate(certificate_id) else {
711            warn!("Certificate {certificate_id} does not exist in storage");
712            return false;
713        };
714        // Retrieve the round.
715        let round = certificate.round();
716        // Compute the author of the batch.
717        let author = certificate.author();
718
719        // TODO (howardwu): We may want to use `shift_remove` below, in order to align compatibility
720        //  with tests written to for `remove_certificate`. However, this will come with performance hits.
721        //  It will be better to write tests that compare the union of the sets.
722
723        // Update the round.
724        match self.rounds.write().entry(round) {
725            Entry::Occupied(mut entry) => {
726                // Remove the round to certificate ID entry.
727                entry.get_mut().swap_remove(&(certificate_id, author));
728                // If the round is empty, remove it.
729                if entry.get().is_empty() {
730                    entry.swap_remove();
731                }
732            }
733            Entry::Vacant(_) => {}
734        }
735        // Remove the certificate.
736        self.certificates.write().swap_remove(&certificate_id);
737        // Remove the unprocessed certificate.
738        self.unprocessed_certificates.write().pop(&certificate_id);
739        // Remove the batch ID.
740        self.batch_ids.write().swap_remove(&certificate_id);
741        // Remove the transmission entries in the certificate from storage.
742        self.transmissions.remove_transmissions(&certificate_id, certificate.transmission_ids());
743        // Return successfully.
744        true
745    }
746}
747
748impl<N: Network> Storage<N> {
749    /// Syncs the current height with the block.
750    pub(crate) fn sync_height_with_block(&self, next_height: u32) {
751        // If the block height is greater than the current height in storage, sync the height.
752        if next_height > self.current_height() {
753            // Update the current height in storage.
754            self.current_height.store(next_height, Ordering::SeqCst);
755        }
756    }
757
758    /// Syncs the current round with the block.
759    pub(crate) fn sync_round_with_block(&self, next_round: u64) {
760        // Retrieve the current round in the block.
761        let next_round = next_round.max(1);
762        // If the round in the block is greater than the current round in storage, sync the round.
763        if next_round > self.current_round() {
764            // Update the current round in storage.
765            self.update_current_round(next_round);
766            // Log the updated round.
767            info!("Synced to round {next_round}...");
768        }
769    }
770
771    /// Syncs the batch certificate with the block.
772    pub(crate) fn sync_certificate_with_block(
773        &self,
774        block: &Block<N>,
775        certificate: BatchCertificate<N>,
776        unconfirmed_transactions: &HashMap<N::TransactionID, Transaction<N>>,
777    ) {
778        // Skip if the certificate round is below the GC round.
779        let gc_round = self.gc_round();
780        if certificate.round() <= gc_round {
781            trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round());
782            return;
783        }
784
785        // If the certificate ID already exists in storage, skip it.
786        if self.contains_certificate(certificate.id()) {
787            trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round());
788            return;
789        }
790        // Retrieve the transmissions for the certificate.
791        let mut missing_transmissions = HashMap::new();
792
793        // Retrieve the aborted transmissions for the certificate.
794        let mut aborted_transmissions = HashSet::new();
795
796        // Track the block's aborted solutions and transactions.
797        let aborted_solutions: IndexSet<_> = block.aborted_solution_ids().iter().collect();
798        let aborted_transactions: IndexSet<_> = block.aborted_transaction_ids().iter().collect();
799
800        // Iterate over the transmission IDs.
801        for transmission_id in certificate.transmission_ids() {
802            // If the transmission ID already exists in the map, skip it.
803            if missing_transmissions.contains_key(transmission_id) {
804                continue;
805            }
806            // If the transmission ID exists in storage, skip it.
807            if self.contains_transmission(*transmission_id) {
808                continue;
809            }
810            // Retrieve the transmission.
811            match transmission_id {
812                TransmissionID::Ratification => (),
813                TransmissionID::Solution(solution_id, _) => {
814                    // Retrieve the solution.
815                    match block.get_solution(solution_id) {
816                        // Insert the solution.
817                        Some(solution) => missing_transmissions.insert(*transmission_id, (*solution).into()),
818                        // Otherwise, try to load the solution from the ledger.
819                        None => match self.ledger.get_solution(solution_id) {
820                            // Insert the solution.
821                            Ok(solution) => missing_transmissions.insert(*transmission_id, solution.into()),
822                            // Check if the solution is in the aborted solutions.
823                            Err(_) => {
824                                // Insert the aborted solution if it exists in the block or ledger.
825                                match aborted_solutions.contains(solution_id)
826                                    || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
827                                {
828                                    true => {
829                                        aborted_transmissions.insert(*transmission_id);
830                                    }
831                                    false => error!("Missing solution {solution_id} in block {}", block.height()),
832                                }
833                                continue;
834                            }
835                        },
836                    };
837                }
838                TransmissionID::Transaction(transaction_id, _) => {
839                    // Retrieve the transaction.
840                    match unconfirmed_transactions.get(transaction_id) {
841                        // Insert the transaction.
842                        Some(transaction) => missing_transmissions.insert(*transmission_id, transaction.clone().into()),
843                        // Otherwise, try to load the unconfirmed transaction from the ledger.
844                        None => match self.ledger.get_unconfirmed_transaction(*transaction_id) {
845                            // Insert the transaction.
846                            Ok(transaction) => missing_transmissions.insert(*transmission_id, transaction.into()),
847                            // Check if the transaction is in the aborted transactions.
848                            Err(_) => {
849                                // Insert the aborted transaction if it exists in the block or ledger.
850                                match aborted_transactions.contains(transaction_id)
851                                    || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
852                                {
853                                    true => {
854                                        aborted_transmissions.insert(*transmission_id);
855                                    }
856                                    false => warn!("Missing transaction {transaction_id} in block {}", block.height()),
857                                }
858                                continue;
859                            }
860                        },
861                    };
862                }
863            }
864        }
865        // Insert the batch certificate into storage.
866        let certificate_id = fmt_id(certificate.id());
867        debug!(
868            "Syncing certificate '{certificate_id}' for round {} with {} transmissions",
869            certificate.round(),
870            certificate.transmission_ids().len()
871        );
872        if let Err(error) = self.insert_certificate(certificate, missing_transmissions, aborted_transmissions) {
873            error!("Failed to insert certificate '{certificate_id}' from block {} - {error}", block.height());
874        }
875    }
876}
877
878#[cfg(test)]
879impl<N: Network> Storage<N> {
880    /// Returns the ledger service.
881    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
882        &self.ledger
883    }
884
885    /// Returns an iterator over the `(round, (certificate ID, batch ID, author))` entries.
886    pub fn rounds_iter(&self) -> impl Iterator<Item = (u64, IndexSet<(Field<N>, Address<N>)>)> {
887        self.rounds.read().clone().into_iter()
888    }
889
890    /// Returns an iterator over the `(certificate ID, certificate)` entries.
891    pub fn certificates_iter(&self) -> impl Iterator<Item = (Field<N>, BatchCertificate<N>)> {
892        self.certificates.read().clone().into_iter()
893    }
894
895    /// Returns an iterator over the `(batch ID, round)` entries.
896    pub fn batch_ids_iter(&self) -> impl Iterator<Item = (Field<N>, u64)> {
897        self.batch_ids.read().clone().into_iter()
898    }
899
900    /// Returns an iterator over the `(transmission ID, (transmission, certificate IDs))` entries.
901    pub fn transmissions_iter(
902        &self,
903    ) -> impl Iterator<Item = (TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>))> {
904        self.transmissions.as_hashmap().into_iter()
905    }
906
/// Inserts the given `certificate` into storage without any validation, pairing each of its
/// transmission IDs with a dummy (empty-buffer) transaction.
///
/// Note: Do NOT use this in production. This is for **testing only**.
#[cfg(test)]
#[doc(hidden)]
pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
    // Copy out everything that is needed from the certificate before it is moved into the map.
    let (round, certificate_id, author) = (certificate.round(), certificate.id(), certificate.author());
    let transmission_ids = certificate.transmission_ids().clone();

    // Index the `(certificate ID, author)` pair under its round.
    self.rounds.write().entry(round).or_default().insert((certificate_id, author));
    // Store the certificate itself.
    self.certificates.write().insert(certificate_id, certificate);
    // Record the round for the batch ID.
    self.batch_ids.write().insert(certificate_id, round);

    // Pair every transmission ID with a dummy transmission (for testing purposes).
    let missing_transmissions: HashMap<_, _> = transmission_ids
        .iter()
        .copied()
        .map(|id| (id, Transmission::Transaction(snarkvm::ledger::narwhal::Data::Buffer(bytes::Bytes::new()))))
        .collect();
    // Register the certificate ID with each of its transmissions; none are marked as aborted.
    self.transmissions.insert_transmissions(
        certificate_id,
        transmission_ids,
        Default::default(),
        missing_transmissions,
    );
}
942}
943
944#[cfg(test)]
945pub(crate) mod tests {
946    use super::*;
947    use snarkos_node_bft_ledger_service::MockLedgerService;
948    use snarkos_node_bft_storage_service::BFTMemoryService;
949    use snarkvm::{
950        ledger::narwhal::{Data, batch_certificate::test_helpers::sample_batch_certificate_for_round_with_committee},
951        prelude::{Rng, TestRng},
952    };
953
954    use ::bytes::Bytes;
955    use indexmap::indexset;
956
957    type CurrentNetwork = snarkvm::prelude::MainnetV0;
958
959    /// Asserts that the storage matches the expected layout.
960    pub fn assert_storage<N: Network>(
961        storage: &Storage<N>,
962        rounds: &[(u64, IndexSet<(Field<N>, Address<N>)>)],
963        certificates: &[(Field<N>, BatchCertificate<N>)],
964        batch_ids: &[(Field<N>, u64)],
965        transmissions: &HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
966    ) {
967        // Ensure the rounds are well-formed.
968        assert_eq!(storage.rounds_iter().collect::<Vec<_>>(), *rounds);
969        // Ensure the certificates are well-formed.
970        assert_eq!(storage.certificates_iter().collect::<Vec<_>>(), *certificates);
971        // Ensure the batch IDs are well-formed.
972        assert_eq!(storage.batch_ids_iter().collect::<Vec<_>>(), *batch_ids);
973        // Ensure the transmissions are well-formed.
974        assert_eq!(storage.transmissions_iter().collect::<HashMap<_, _>>(), *transmissions);
975    }
976
977    /// Samples a random transmission.
978    fn sample_transmission(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
979        // Sample random fake solution bytes.
980        let s = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
981        // Sample random fake transaction bytes.
982        let t = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..2048).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
983        // Sample a random transmission.
984        match rng.r#gen::<bool>() {
985            true => Transmission::Solution(s(rng)),
986            false => Transmission::Transaction(t(rng)),
987        }
988    }
989
990    /// Samples the random transmissions, returning the missing transmissions and the transmissions.
991    pub(crate) fn sample_transmissions(
992        certificate: &BatchCertificate<CurrentNetwork>,
993        rng: &mut TestRng,
994    ) -> (
995        HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>,
996        HashMap<TransmissionID<CurrentNetwork>, (Transmission<CurrentNetwork>, IndexSet<Field<CurrentNetwork>>)>,
997    ) {
998        // Retrieve the certificate ID.
999        let certificate_id = certificate.id();
1000
1001        let mut missing_transmissions = HashMap::new();
1002        let mut transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1003        for transmission_id in certificate.transmission_ids() {
1004            // Initialize the transmission.
1005            let transmission = sample_transmission(rng);
1006            // Update the missing transmissions.
1007            missing_transmissions.insert(*transmission_id, transmission.clone());
1008            // Update the transmissions map.
1009            transmissions
1010                .entry(*transmission_id)
1011                .or_insert((transmission, Default::default()))
1012                .1
1013                .insert(certificate_id);
1014        }
1015        (missing_transmissions, transmissions)
1016    }
1017
1018    // TODO (howardwu): Testing with 'max_gc_rounds' set to '0' should ensure everything is cleared after insertion.
1019
#[test]
fn test_certificate_insert_remove() {
    let rng = &mut TestRng::default();

    // Set up a storage on top of a mock ledger with a sampled committee.
    let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // The storage starts out empty.
    assert_storage(&storage, &[], &[], &[], &Default::default());

    // Sample a certificate, and note its round, ID, and author.
    let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
    let (round, certificate_id, author) = (certificate.round(), certificate.id(), certificate.author());

    // Sample the transmissions for the certificate.
    let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);

    // Insert the certificate, and look it up by ID, by round, and by round and author.
    storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions);
    assert!(storage.contains_certificate(certificate_id));
    assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
    assert_eq!(storage.get_certificate_for_round_with_author(round, author), Some(certificate.clone()));

    // The underlying maps hold exactly this one certificate.
    assert_storage(
        &storage,
        &[(round, indexset! { (certificate_id, author) })],
        &[(certificate_id, certificate.clone())],
        &[(certificate_id, round)],
        &transmissions,
    );

    // The retrieved certificate is the one that was inserted.
    assert_eq!(storage.get_certificate(certificate_id), Some(certificate));

    // Remove the certificate; every lookup must now come back empty.
    assert!(storage.remove_certificate(certificate_id));
    assert!(!storage.contains_certificate(certificate_id));
    assert!(storage.get_certificates_for_round(round).is_empty());
    assert_eq!(storage.get_certificate_for_round_with_author(round, author), None);

    // The storage is empty again.
    assert_storage(&storage, &[], &[], &[], &Default::default());
}
1083
#[test]
fn test_certificate_duplicate() {
    let rng = &mut TestRng::default();

    // Set up a storage on top of a mock ledger with a sampled committee.
    let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // The storage starts out empty.
    assert_storage(&storage, &[], &[], &[], &Default::default());

    // Sample a certificate, and note its round, ID, and author.
    let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
    let (round, certificate_id, author) = (certificate.round(), certificate.id(), certificate.author());

    // The layout that storage must have after the first insertion, and after every repeat.
    let rounds = [(round, indexset! { (certificate_id, author) })];
    let certificates = [(certificate_id, certificate.clone())];
    let batch_ids = [(certificate_id, round)];
    let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);

    // Insert the certificate three times: first with its missing transmissions, then without any
    // missing transmissions, and finally with all of the original missing transmissions again.
    for missing in [missing_transmissions.clone(), Default::default(), missing_transmissions] {
        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing);
        // The certificate exists in storage.
        assert!(storage.contains_certificate(certificate_id));
        // The underlying storage representation is the expected one, unchanged by repeats.
        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
    }
}
1137
/// Checks that `check_incoming_certificate` accepts valid certificates across many rounds.
#[test]
fn test_valid_incoming_certificate() {
    let rng = &mut TestRng::default();

    // Set up a storage on top of a mock ledger with a sampled committee of 5 validators.
    let (committee, private_keys) =
        snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 5, rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // The certificate IDs of the previous round, which the next round's certificates reference.
    let mut previous_certs = IndexSet::default();

    for round in 1..=100 {
        let mut new_certs = IndexSet::default();

        // Every validator authors one certificate, signed by all of the other validators.
        for private_key in private_keys.iter() {
            let other_keys: Vec<_> = private_keys.iter().filter(|k| *k != private_key).cloned().collect();
            let certificate = sample_batch_certificate_for_round_with_committee(
                round,
                previous_certs.clone(),
                private_key,
                &other_keys,
                rng,
            );

            // The valid certificate must pass the check.
            storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
            new_certs.insert(certificate.id());

            // Store the certificate, so that the next round can build on it.
            let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
            storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
        }

        previous_certs = new_certs;
    }
}
1179
/// Checks that certificates without sufficient signatures are rejected by `check_incoming_certificate`.
#[test]
fn test_invalid_incoming_certificate_missing_signature() {
    let rng = &mut TestRng::default();

    // Set up a storage on top of a mock ledger with a sampled committee of 10 validators.
    let (committee, private_keys) =
        snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // The certificate IDs of the previous round, which the next round's certificates reference.
    let mut previous_certs = IndexSet::default();

    for round in 1..=5 {
        // Rounds 1 to 4 are signed by every validator; round 5 is signed by too few of them.
        let fully_signed = round < 5;
        let mut new_certs = IndexSet::default();

        // Every validator authors one certificate.
        for private_key in private_keys.iter() {
            // In the last round, pick a few signers, but not enough to form a quorum.
            let eligible_signers = if fully_signed { &private_keys[..] } else { &private_keys[0..=3] };
            let other_keys: Vec<_> = eligible_signers.iter().filter(|k| *k != private_key).cloned().collect();

            let certificate = sample_batch_certificate_for_round_with_committee(
                round,
                previous_certs.clone(),
                private_key,
                &other_keys,
                rng,
            );

            if fully_signed {
                // The valid certificate must pass the check.
                storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
                new_certs.insert(certificate.id());

                // Store the certificate, so that the next round can build on it.
                let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
                storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
            } else {
                // The under-signed certificate must be rejected.
                assert!(storage.check_incoming_certificate(&certificate).is_err());
            }
        }

        previous_certs = new_certs;
    }
}
1235
/// Checks that `insert_certificate` rejects certificates with fewer edges than required.
#[test]
fn test_invalid_certificate_insufficient_previous_certs() {
    let rng = &mut TestRng::default();

    // Set up a storage on top of a mock ledger with a sampled committee of 10 validators.
    let (committee, private_keys) =
        snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // The certificate IDs of the previous round, which the next round's certificates reference.
    let mut previous_certs = IndexSet::default();

    for round in 1..=6 {
        let mut new_certs = IndexSet::default();

        // Every validator authors one certificate, signed by all of the other validators.
        for private_key in private_keys.iter() {
            let other_keys: Vec<_> = private_keys.iter().filter(|k| *k != private_key).cloned().collect();
            let certificate = sample_batch_certificate_for_round_with_committee(
                round,
                previous_certs.clone(),
                private_key,
                &other_keys,
                rng,
            );
            let certificate_id = certificate.id();

            // Sample the transmissions, keeping only the `(ID, transmission)` pairs.
            let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
            let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();

            let outcome = storage.insert_certificate(certificate, transmissions, Default::default());
            if round <= 5 {
                // Rounds 1 to 5 reference a full set of previous certificates, and must be accepted.
                new_certs.insert(certificate_id);
                outcome.expect("Valid certificate rejected");
            } else {
                // Round 6 references too few previous certificates, and must be rejected.
                assert!(outcome.is_err());
            }
        }

        // From round 5 onwards, drop more than half of the certificates before handing them on.
        previous_certs = if round < 5 { new_certs } else { new_certs.into_iter().skip(6).collect() };
    }
}
1289
1290    /// Verify that `insert_certificate` rejects certs that do not increment the round number.
1291    #[test]
1292    fn test_invalid_certificate_wrong_round_number() {
1293        let rng = &mut TestRng::default();
1294
1295        // Sample a committee.
1296        let (committee, private_keys) =
1297            snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1298        // Initialize the ledger.
1299        let ledger = Arc::new(MockLedgerService::new(committee));
1300        // Initialize the storage.
1301        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1302
1303        // Go through many rounds of valid certificates and ensure they're accepted.
1304        let mut previous_certs = IndexSet::default();
1305
1306        for round in 1..=6 {
1307            let mut new_certs = IndexSet::default();
1308
1309            // Generate one cert per validator
1310            for private_key in private_keys.iter() {
1311                let cert_round = round.min(5); // In the sixth round, do not increment
1312                let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1313
1314                let certificate = sample_batch_certificate_for_round_with_committee(
1315                    cert_round,
1316                    previous_certs.clone(),
1317                    private_key,
1318                    &other_keys,
1319                    rng,
1320                );
1321
1322                // Construct the sample 'transmissions'.
1323                let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1324                let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();
1325
1326                if round <= 5 {
1327                    new_certs.insert(certificate.id());
1328                    storage
1329                        .insert_certificate(certificate, transmissions, Default::default())
1330                        .expect("Valid certificate rejected");
1331                } else {
1332                    assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
1333                }
1334            }
1335
1336            if round < 5 {
1337                previous_certs = new_certs;
1338            } else {
1339                // Remove more than half of the previous certs.
1340                previous_certs = new_certs.into_iter().skip(6).collect();
1341            }
1342        }
1343    }
1344}
1345
1346#[cfg(test)]
1347pub mod prop_tests {
1348    use super::*;
1349    use crate::helpers::{now, storage::tests::assert_storage};
1350    use snarkos_node_bft_ledger_service::MockLedgerService;
1351    use snarkos_node_bft_storage_service::BFTMemoryService;
1352    use snarkvm::{
1353        ledger::{
1354            committee::prop_tests::{CommitteeContext, ValidatorSet},
1355            narwhal::{BatchHeader, Data},
1356            puzzle::SolutionID,
1357        },
1358        prelude::{Signature, Uniform},
1359    };
1360
1361    use ::bytes::Bytes;
1362    use indexmap::indexset;
1363    use proptest::{
1364        collection,
1365        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any},
1366        prop_oneof,
1367        sample::{Selector, size_range},
1368        test_runner::TestRng,
1369    };
1370    use rand::{CryptoRng, Error, Rng, RngCore};
1371    use std::fmt::Debug;
1372    use test_strategy::proptest;
1373
    /// The concrete network that the strategies and tests in this module are instantiated with.
    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1375
1376    impl Arbitrary for Storage<CurrentNetwork> {
1377        type Parameters = CommitteeContext;
1378        type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;
1379
1380        fn arbitrary() -> Self::Strategy {
1381            (any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1382                .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1383                    let ledger = Arc::new(MockLedgerService::new(committee));
1384                    Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1385                })
1386                .boxed()
1387        }
1388
1389        fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
1390            (Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1391                .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1392                    let ledger = Arc::new(MockLedgerService::new(committee));
1393                    Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1394                })
1395                .boxed()
1396        }
1397    }
1398
1399    // The `proptest::TestRng` doesn't implement `rand_core::CryptoRng` trait which is required in snarkVM, so we use a wrapper
1400    #[derive(Debug)]
1401    pub struct CryptoTestRng(TestRng);
1402
1403    impl Arbitrary for CryptoTestRng {
1404        type Parameters = ();
1405        type Strategy = BoxedStrategy<CryptoTestRng>;
1406
1407        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1408            Just(0).prop_perturb(|_, rng| CryptoTestRng(rng)).boxed()
1409        }
1410    }
1411    impl RngCore for CryptoTestRng {
1412        fn next_u32(&mut self) -> u32 {
1413            self.0.next_u32()
1414        }
1415
1416        fn next_u64(&mut self) -> u64 {
1417            self.0.next_u64()
1418        }
1419
1420        fn fill_bytes(&mut self, dest: &mut [u8]) {
1421            self.0.fill_bytes(dest);
1422        }
1423
1424        fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> {
1425            self.0.try_fill_bytes(dest)
1426        }
1427    }
1428
1429    impl CryptoRng for CryptoTestRng {}
1430
1431    #[derive(Debug, Clone)]
1432    pub struct AnyTransmission(pub Transmission<CurrentNetwork>);
1433
1434    impl Arbitrary for AnyTransmission {
1435        type Parameters = ();
1436        type Strategy = BoxedStrategy<AnyTransmission>;
1437
1438        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1439            any_transmission().prop_map(AnyTransmission).boxed()
1440        }
1441    }
1442
1443    #[derive(Debug, Clone)]
1444    pub struct AnyTransmissionID(pub TransmissionID<CurrentNetwork>);
1445
1446    impl Arbitrary for AnyTransmissionID {
1447        type Parameters = ();
1448        type Strategy = BoxedStrategy<AnyTransmissionID>;
1449
1450        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1451            any_transmission_id().prop_map(AnyTransmissionID).boxed()
1452        }
1453    }
1454
1455    fn any_transmission() -> BoxedStrategy<Transmission<CurrentNetwork>> {
1456        prop_oneof![
1457            (collection::vec(any::<u8>(), 512..=512))
1458                .prop_map(|bytes| Transmission::Solution(Data::Buffer(Bytes::from(bytes)))),
1459            (collection::vec(any::<u8>(), 2048..=2048))
1460                .prop_map(|bytes| Transmission::Transaction(Data::Buffer(Bytes::from(bytes)))),
1461        ]
1462        .boxed()
1463    }
1464
1465    pub fn any_solution_id() -> BoxedStrategy<SolutionID<CurrentNetwork>> {
1466        Just(0).prop_perturb(|_, rng| CryptoTestRng(rng).r#gen::<u64>().into()).boxed()
1467    }
1468
1469    pub fn any_transaction_id() -> BoxedStrategy<<CurrentNetwork as Network>::TransactionID> {
1470        Just(0)
1471            .prop_perturb(|_, rng| {
1472                <CurrentNetwork as Network>::TransactionID::from(Field::rand(&mut CryptoTestRng(rng)))
1473            })
1474            .boxed()
1475    }
1476
1477    pub fn any_transmission_id() -> BoxedStrategy<TransmissionID<CurrentNetwork>> {
1478        prop_oneof![
1479            any_transaction_id().prop_perturb(|id, mut rng| TransmissionID::Transaction(
1480                id,
1481                rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1482            )),
1483            any_solution_id().prop_perturb(|id, mut rng| TransmissionID::Solution(
1484                id,
1485                rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1486            )),
1487        ]
1488        .boxed()
1489    }
1490
1491    pub fn sign_batch_header<R: Rng + CryptoRng>(
1492        validator_set: &ValidatorSet,
1493        batch_header: &BatchHeader<CurrentNetwork>,
1494        rng: &mut R,
1495    ) -> IndexSet<Signature<CurrentNetwork>> {
1496        let mut signatures = IndexSet::with_capacity(validator_set.0.len());
1497        for validator in validator_set.0.iter() {
1498            let private_key = validator.private_key;
1499            signatures.insert(private_key.sign(&[batch_header.batch_id()], rng).unwrap());
1500        }
1501        signatures
1502    }
1503
1504    #[proptest]
1505    fn test_certificate_duplicate(
1506        context: CommitteeContext,
1507        #[any(size_range(1..16).lift())] transmissions: Vec<(AnyTransmissionID, AnyTransmission)>,
1508        mut rng: CryptoTestRng,
1509        selector: Selector,
1510    ) {
1511        let CommitteeContext(committee, ValidatorSet(validators)) = context;
1512        let committee_id = committee.id();
1513
1514        // Initialize the storage.
1515        let ledger = Arc::new(MockLedgerService::new(committee));
1516        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1517
1518        // Ensure the storage is empty.
1519        assert_storage(&storage, &[], &[], &[], &Default::default());
1520
1521        // Create a new certificate.
1522        let signer = selector.select(&validators);
1523
1524        let mut transmission_map = IndexMap::new();
1525
1526        for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter() {
1527            transmission_map.insert(*id, t.clone());
1528        }
1529
1530        let batch_header = BatchHeader::new(
1531            &signer.private_key,
1532            0,
1533            now(),
1534            committee_id,
1535            transmission_map.keys().cloned().collect(),
1536            Default::default(),
1537            &mut rng,
1538        )
1539        .unwrap();
1540
1541        // Remove the author from the validator set passed to create the batch
1542        // certificate, the author should not sign their own batch.
1543        let mut validators = validators.clone();
1544        validators.remove(signer);
1545
1546        let certificate = BatchCertificate::from(
1547            batch_header.clone(),
1548            sign_batch_header(&ValidatorSet(validators), &batch_header, &mut rng),
1549        )
1550        .unwrap();
1551
1552        // Retrieve the certificate ID.
1553        let certificate_id = certificate.id();
1554        let mut internal_transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1555        for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter().cloned() {
1556            internal_transmissions.entry(id).or_insert((t, Default::default())).1.insert(certificate_id);
1557        }
1558
1559        // Retrieve the round.
1560        let round = certificate.round();
1561        // Retrieve the author of the batch.
1562        let author = certificate.author();
1563
1564        // Construct the expected layout for 'rounds'.
1565        let rounds = [(round, indexset! { (certificate_id, author) })];
1566        // Construct the expected layout for 'certificates'.
1567        let certificates = [(certificate_id, certificate.clone())];
1568        // Construct the expected layout for 'batch_ids'.
1569        let batch_ids = [(certificate_id, round)];
1570
1571        // Insert the certificate.
1572        let missing_transmissions: HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>> =
1573            transmission_map.into_iter().collect();
1574        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1575        // Ensure the certificate exists in storage.
1576        assert!(storage.contains_certificate(certificate_id));
1577        // Check that the underlying storage representation is correct.
1578        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1579
1580        // Insert the certificate again - without any missing transmissions.
1581        storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1582        // Ensure the certificate exists in storage.
1583        assert!(storage.contains_certificate(certificate_id));
1584        // Check that the underlying storage representation remains unchanged.
1585        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1586
1587        // Insert the certificate again - with all of the original missing transmissions.
1588        storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1589        // Ensure the certificate exists in storage.
1590        assert!(storage.contains_certificate(certificate_id));
1591        // Check that the underlying storage representation remains unchanged.
1592        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1593    }
1594}