Skip to main content

snarkos_node_bft/helpers/
storage.rs

1// Copyright (c) 2019-2025 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::helpers::{check_timestamp_for_liveness, fmt_id};
17use snarkos_node_bft_ledger_service::LedgerService;
18use snarkos_node_bft_storage_service::StorageService;
19use snarkvm::{
20    ledger::{
21        block::{Block, Transaction},
22        narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID},
23    },
24    prelude::{Address, Field, Network, Result, anyhow, bail, ensure},
25    utilities::{cfg_into_iter, cfg_iter, cfg_sorted_by, flatten_error},
26};
27
28use anyhow::Context;
29use indexmap::{IndexMap, IndexSet, map::Entry};
30#[cfg(feature = "locktick")]
31use locktick::parking_lot::RwLock;
32use lru::LruCache;
33#[cfg(not(feature = "locktick"))]
34use parking_lot::RwLock;
35#[cfg(not(feature = "serial"))]
36use rayon::prelude::*;
37use std::{
38    collections::{HashMap, HashSet},
39    num::NonZeroUsize,
40    sync::{
41        Arc,
42        atomic::{AtomicU32, AtomicU64, Ordering},
43    },
44};
45
46#[derive(Clone, Debug)]
47pub struct Storage<N: Network>(Arc<StorageInner<N>>);
48
49impl<N: Network> std::ops::Deref for Storage<N> {
50    type Target = Arc<StorageInner<N>>;
51
52    fn deref(&self) -> &Self::Target {
53        &self.0
54    }
55}
56
57/// The storage for the memory pool.
58///
59/// The storage is used to store the following:
60/// - `current_height` tracker.
61/// - `current_round` tracker.
62/// - `round` to `(certificate ID, batch ID, author)` entries.
63/// - `certificate ID` to `certificate` entries.
64/// - `batch ID` to `round` entries.
65/// - `transmission ID` to `(transmission, certificate IDs)` entries.
66///
67/// The chain of events is as follows:
68/// 1. A `transmission` is received.
69/// 2. After a `batch` is ready to be stored:
70///   - The `certificate` is inserted, triggering updates to the
71///     `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
72///   - The missing `transmissions` from storage are inserted into the `transmissions` map.
73///   - The certificate ID is inserted into the `transmissions` map.
74/// 3. After a `round` reaches quorum threshold:
75///  - The next round is inserted into the `current_round`.
76#[derive(Debug)]
77pub struct StorageInner<N: Network> {
78    /// The ledger service.
79    ledger: Arc<dyn LedgerService<N>>,
80    /* Once per block */
81    /// The current height.
82    current_height: AtomicU32,
83    /* Once per round */
84    /// The current round.
85    ///
86    /// Invariant: current_round > 0.
87    /// This is established in [`Storage::new`], which sets it to at least 1 via [`Storage::update_current_round`].
88    /// The only callers of [`Storage::update_current_round`] are
89    /// [`Storage::increment_to_next_round`] and [`Storage::sync_round_with_block`],
90    /// both of which set it to at least 1.
91    current_round: AtomicU64,
92    /// The `round` for which garbage collection has occurred **up to** (inclusive).
93    gc_round: AtomicU64,
94    /// The maximum number of rounds to keep in storage.
95    max_gc_rounds: u64,
96    /* Once per batch */
97    /// The map of `round` to a list of `(certificate ID, author)` entries.
98    rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Address<N>)>>>,
99    /// A cache of `certificate ID` to unprocessed `certificate`.
100    unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>,
101    /// The map of `certificate ID` to `certificate`.
102    certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>,
103    /// The map of `certificate ID` to `round`.
104    batch_ids: RwLock<IndexMap<Field<N>, u64>>,
105    /// The map of `transmission ID` to `(transmission, certificate IDs)` entries.
106    transmissions: Arc<dyn StorageService<N>>,
107}
108
109impl<N: Network> Storage<N> {
110    /// Initializes a new instance of storage.
111    pub fn new(
112        ledger: Arc<dyn LedgerService<N>>,
113        transmissions: Arc<dyn StorageService<N>>,
114        max_gc_rounds: u64,
115    ) -> Self {
116        // Retrieve the latest committee bonded in the ledger
117        // (genesis committee if the ledger contains only the genesis block).
118        let committee = ledger.current_committee().expect("Ledger is missing a committee.");
119        // Retrieve the round at which that committee was created, or 1 if it is the genesis committee.
120        let current_round = committee.starting_round().max(1);
121        // Set the unprocessed certificates cache size.
122        let unprocessed_cache_size = NonZeroUsize::new((N::LATEST_MAX_CERTIFICATES().unwrap() * 2) as usize).unwrap();
123
124        // Create the storage.
125        let storage = Self(Arc::new(StorageInner {
126            ledger,
127            current_height: Default::default(),
128            current_round: AtomicU64::new(current_round),
129            gc_round: Default::default(),
130            max_gc_rounds,
131            rounds: Default::default(),
132            unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)),
133            certificates: Default::default(),
134            batch_ids: Default::default(),
135            transmissions,
136        }));
137        // Perform GC on the current round.
138        // Since there are no certificates yet, this only sets `gc_round`.
139        storage.garbage_collect_certificates(current_round);
140        // Return the storage.
141        storage
142    }
143}
144
145impl<N: Network> Storage<N> {
146    /// Returns the current height.
147    pub fn current_height(&self) -> u32 {
148        // Get the current height.
149        self.current_height.load(Ordering::SeqCst)
150    }
151}
152
153impl<N: Network> Storage<N> {
154    /// Returns the current round.
155    pub fn current_round(&self) -> u64 {
156        // Get the current round.
157        self.current_round.load(Ordering::SeqCst)
158    }
159
160    /// Returns the `round` that garbage collection has occurred **up to** (inclusive).
161    pub fn gc_round(&self) -> u64 {
162        // Get the GC round.
163        self.gc_round.load(Ordering::SeqCst)
164    }
165
166    /// Returns the maximum number of rounds to keep in storage.
167    pub fn max_gc_rounds(&self) -> u64 {
168        self.max_gc_rounds
169    }
170
171    /// Increments storage to the next round, updating the current round.
172    /// Note: This method is only called once per round, upon certification of the primary's batch.
173    pub fn increment_to_next_round(&self, current_round: u64) -> Result<u64> {
174        // Determine the next round.
175        let next_round = current_round + 1;
176
177        // Check if the next round is less than the current round in storage.
178        {
179            // Retrieve the storage round.
180            let storage_round = self.current_round();
181            // If the next round is less than the current round in storage, return early with the storage round.
182            if next_round < storage_round {
183                return Ok(storage_round);
184            }
185
186            trace!("Incrementing storage from round {storage_round} to {next_round}");
187        }
188
189        // Retrieve the current committee.
190        let current_committee = self.ledger.current_committee()?;
191        // Retrieve the current committee's starting round.
192        let starting_round = current_committee.starting_round();
193        // If the primary is behind the current committee's starting round, sync with the latest block.
194        if next_round < starting_round {
195            // Retrieve the latest block round.
196            let latest_block_round = self.ledger.latest_round();
197            // Log the round sync.
198            info!(
199                "Syncing primary round ({next_round}) with the current committee's starting round ({starting_round}). Syncing with the latest block round {latest_block_round}..."
200            );
201            // Sync the round with the latest block.
202            self.sync_round_with_block(latest_block_round);
203            // Return the latest block round.
204            return Ok(latest_block_round);
205        }
206
207        // Update the storage to the next round.
208        self.update_current_round(next_round);
209
210        #[cfg(feature = "metrics")]
211        metrics::gauge(metrics::bft::LAST_STORED_ROUND, next_round as f64);
212
213        // Retrieve the storage round.
214        let storage_round = self.current_round();
215        // Retrieve the GC round.
216        let gc_round = self.gc_round();
217        // Ensure the next round matches in storage.
218        ensure!(next_round == storage_round, "The next round {next_round} does not match in storage ({storage_round})");
219        // Ensure the next round is greater than or equal to the GC round.
220        ensure!(next_round >= gc_round, "The next round {next_round} is behind the GC round {gc_round}");
221
222        // Log the updated round.
223        info!("Starting round {next_round}...");
224        Ok(next_round)
225    }
226
227    /// Updates the storage to the next round.
228    fn update_current_round(&self, next_round: u64) {
229        // Update the current round.
230        self.current_round.store(next_round, Ordering::SeqCst);
231    }
232
233    /// Update the storage by performing garbage collection based on the next round.
234    pub(crate) fn garbage_collect_certificates(&self, next_round: u64) {
235        // Fetch the current GC round.
236        let current_gc_round = self.gc_round();
237        // Compute the next GC round.
238        let next_gc_round = next_round.saturating_sub(self.max_gc_rounds);
239        // Check if storage needs to be garbage collected.
240        if next_gc_round > current_gc_round {
241            // Remove the GC round(s) from storage.
242            for gc_round in current_gc_round..=next_gc_round {
243                // Iterate over the certificates for the GC round.
244                for id in self.get_certificate_ids_for_round(gc_round).into_iter() {
245                    trace!(
246                        "Garbage collecting certificate {id} at round {gc_round} (cut-off is round {next_gc_round})"
247                    );
248                    self.remove_certificate(id);
249                }
250            }
251            // Update the GC round.
252            self.gc_round.store(next_gc_round, Ordering::SeqCst);
253        }
254    }
255}
256
257impl<N: Network> Storage<N> {
258    /// Returns `true` if the storage contains the specified `round`.
259    pub fn contains_certificates_for_round(&self, round: u64) -> bool {
260        // Check if the round exists in storage.
261        self.rounds.read().contains_key(&round)
262    }
263
264    /// Returns `true` if the storage contains the specified `certificate ID`.
265    pub fn contains_certificate(&self, certificate_id: Field<N>) -> bool {
266        // Check if the certificate ID exists in storage.
267        self.certificates.read().contains_key(&certificate_id)
268    }
269
270    /// Returns `true` if the storage contains a certificate from the specified `author` in the given `round`.
271    pub fn contains_certificate_in_round_from(&self, round: u64, author: Address<N>) -> bool {
272        self.rounds.read().get(&round).is_some_and(|set| set.iter().any(|(_, a)| a == &author))
273    }
274
275    /// Returns `true` if the storage contains the specified `certificate ID`.
276    pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool {
277        // Check if the certificate ID exists in storage.
278        self.unprocessed_certificates.read().contains(&certificate_id)
279    }
280
281    /// Returns `true` if the storage contains the specified `batch ID`.
282    pub fn contains_batch(&self, batch_id: Field<N>) -> bool {
283        // Check if the batch ID exists in storage.
284        self.batch_ids.read().contains_key(&batch_id)
285    }
286
287    /// Returns `true` if the storage contains the specified `transmission ID`.
288    pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
289        self.transmissions.contains_transmission(transmission_id.into())
290    }
291
292    /// Returns the transmission for the given `transmission ID`.
293    /// If the transmission ID does not exist in storage, `None` is returned.
294    pub fn get_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
295        self.transmissions.get_transmission(transmission_id.into())
296    }
297
298    /// Returns the round for the given `certificate ID`.
299    /// If the certificate ID does not exist in storage, `None` is returned.
300    pub fn get_round_for_certificate(&self, certificate_id: Field<N>) -> Option<u64> {
301        // Get the round.
302        self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
303    }
304
305    /// Returns the round for the given `batch ID`.
306    /// If the batch ID does not exist in storage, `None` is returned.
307    pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> {
308        // Get the round.
309        self.batch_ids.read().get(&batch_id).copied()
310    }
311
312    /// Returns the certificate round for the given `certificate ID`.
313    /// If the certificate ID does not exist in storage, `None` is returned.
314    pub fn get_certificate_round(&self, certificate_id: Field<N>) -> Option<u64> {
315        // Get the batch certificate and return the round.
316        self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
317    }
318
319    /// Returns the certificate for the given `certificate ID`.
320    /// If the certificate ID does not exist in storage, `None` is returned.
321    pub fn get_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
322        // Get the batch certificate.
323        self.certificates.read().get(&certificate_id).cloned()
324    }
325
326    /// Returns the unprocessed certificate for the given `certificate ID`.
327    /// If the certificate ID does not exist in storage, `None` is returned.
328    pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
329        // Get the unprocessed certificate.
330        self.unprocessed_certificates.read().peek(&certificate_id).cloned()
331    }
332
333    /// Returns the certificate for the given `round` and `author`.
334    /// If the round does not exist in storage, `None` is returned.
335    /// If the author for the round does not exist in storage, `None` is returned.
336    pub fn get_certificate_for_round_with_author(&self, round: u64, author: Address<N>) -> Option<BatchCertificate<N>> {
337        // Retrieve the certificates.
338        if let Some(entries) = self.rounds.read().get(&round) {
339            let certificates = self.certificates.read();
340            entries.iter().find_map(
341                |(certificate_id, a)| if a == &author { certificates.get(certificate_id).cloned() } else { None },
342            )
343        } else {
344            Default::default()
345        }
346    }
347
348    /// Returns the certificates for the given `round`.
349    /// If the round does not exist in storage, an empty set is returned.
350    pub fn get_certificates_for_round(&self, round: u64) -> IndexSet<BatchCertificate<N>> {
351        // The genesis round does not have batch certificates.
352        if round == 0 {
353            return Default::default();
354        }
355        // Retrieve the certificates.
356        if let Some(entries) = self.rounds.read().get(&round) {
357            let certificates = self.certificates.read();
358            entries.iter().flat_map(|(certificate_id, _)| certificates.get(certificate_id).cloned()).collect()
359        } else {
360            Default::default()
361        }
362    }
363
364    /// Returns the certificate IDs for the given `round`.
365    /// If the round does not exist in storage, an empty set is returned.
366    pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> {
367        // The genesis round does not have batch certificates.
368        if round == 0 {
369            return Default::default();
370        }
371        // Retrieve the certificates.
372        if let Some(entries) = self.rounds.read().get(&round) {
373            entries.iter().map(|(certificate_id, _)| *certificate_id).collect()
374        } else {
375            Default::default()
376        }
377    }
378
379    /// Returns the certificate authors for the given `round`.
380    /// If the round does not exist in storage, an empty set is returned.
381    pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> {
382        // The genesis round does not have batch certificates.
383        if round == 0 {
384            return Default::default();
385        }
386        // Retrieve the certificates.
387        if let Some(entries) = self.rounds.read().get(&round) {
388            entries.iter().map(|(_, author)| *author).collect()
389        } else {
390            Default::default()
391        }
392    }
393
394    /// Returns the certificates that have not yet been included in the ledger.
395    /// Note that the order of this set is by round and then insertion.
396    pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
397        // Obtain the read locks.
398        let rounds = self.rounds.read();
399        let certificates = self.certificates.read();
400
401        // Iterate over the rounds.
402        cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b))
403            .flat_map(|(_, certificates_for_round)| {
404                // Iterate over the certificates for the round.
405                cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _)| {
406                    // Skip the certificate if it already exists in the ledger.
407                    if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) {
408                        None
409                    } else {
410                        // Add the certificate to the pending certificates.
411                        certificates.get(&certificate_id).cloned()
412                    }
413                })
414            })
415            .collect()
416    }
417
418    /// Checks the given `batch_header` for validity, returning the missing transmissions from storage.
419    ///
420    /// # Arguments
421    /// - `batch_header`: The batch header to check.
422    /// - `transmissions`: All transmissions referenced by the certificate.
423    /// - `aborted_transmissions`: The set of aborted transmissions in this certificate.
424    ///
425    /// # Invariants
426    /// This method ensures the following invariants:
427    /// - The batch ID does not already exist in storage.
428    /// - The author is a member of the committee for the batch round.
429    /// - The timestamp is within the allowed time range.
430    /// - None of the transmissions are from any past rounds (up to GC).
431    /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
432    /// - All previous certificates declared in the certificate exist in storage (up to GC).
433    /// - All previous certificates are for the previous round (i.e. round - 1).
434    /// - All previous certificates contain a unique author.
435    /// - The previous certificates reached the quorum threshold (N - f).
436    ///
437    /// # Returns
438    /// - `Ok(Some(txns))` for a valid new batch, where `txns` is the set of missing transactions in the batch
439    ///   that need to be fetched from peers.
440    /// - `Ok(None)` if the batch already exists in storage
441    pub fn check_batch_header(
442        &self,
443        batch_header: &BatchHeader<N>,
444        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
445        aborted_transmissions: HashSet<TransmissionID<N>>,
446    ) -> Result<Option<HashMap<TransmissionID<N>, Transmission<N>>>> {
447        // Retrieve the round.
448        let round = batch_header.round();
449        // Retrieve the GC round.
450        let gc_round = self.gc_round();
451        // Construct a GC log message.
452        let gc_log = format!("(gc = {gc_round})");
453
454        // Ensure the batch ID does not already exist in storage.
455        if self.contains_batch(batch_header.batch_id()) {
456            debug!("Batch for round {round} already exists in storage {gc_log}");
457            return Ok(None);
458        }
459
460        // Retrieve the committee lookback for the batch round.
461        let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
462            bail!("Storage failed to retrieve the committee lookback for round {round} {gc_log}")
463        };
464        // Ensure the author is in the committee.
465        if !committee_lookback.is_committee_member(batch_header.author()) {
466            bail!("Author {} is not in the committee for round {round} {gc_log}", batch_header.author())
467        }
468
469        // Check the timestamp for liveness.
470        check_timestamp_for_liveness(batch_header.timestamp())?;
471
472        // Retrieve the missing transmissions in storage from the given transmissions.
473        let missing_transmissions = self
474            .transmissions
475            .find_missing_transmissions(batch_header, transmissions, aborted_transmissions)
476            .map_err(|e| anyhow!("{e} for round {round} {gc_log}"))?;
477
478        // Compute the previous round.
479        let previous_round = round.saturating_sub(1);
480        // Check if the previous round is within range of the GC round.
481        if previous_round > gc_round {
482            // Retrieve the committee lookback for the previous round.
483            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
484                bail!("Missing committee for the previous round {previous_round} in storage {gc_log}")
485            };
486            // Ensure the previous round certificates exists in storage.
487            if !self.contains_certificates_for_round(previous_round) {
488                bail!("Missing certificates for the previous round {previous_round} in storage {gc_log}")
489            }
490            // Ensure the number of previous certificate IDs is at or below the number of committee members.
491            if batch_header.previous_certificate_ids().len() > previous_committee_lookback.num_members() {
492                bail!("Too many previous certificates for round {round} {gc_log}")
493            }
494            // Initialize a set of the previous authors.
495            let mut previous_authors = HashSet::with_capacity(batch_header.previous_certificate_ids().len());
496            // Ensure storage contains all declared previous certificates (up to GC).
497            for previous_certificate_id in batch_header.previous_certificate_ids() {
498                // Retrieve the previous certificate.
499                let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else {
500                    bail!(
501                        "Missing previous certificate '{}' for certificate in round {round} {gc_log}",
502                        fmt_id(previous_certificate_id)
503                    )
504                };
505                // Ensure the previous certificate is for the previous round.
506                if previous_certificate.round() != previous_round {
507                    bail!("Round {round} certificate contains a round {previous_round} certificate {gc_log}")
508                }
509                // Ensure the previous author is new.
510                if previous_authors.contains(&previous_certificate.author()) {
511                    bail!("Round {round} certificate contains a duplicate author {gc_log}")
512                }
513                // Insert the author of the previous certificate.
514                previous_authors.insert(previous_certificate.author());
515            }
516            // Ensure the previous certificates have reached the quorum threshold.
517            if !previous_committee_lookback.is_quorum_threshold_reached(&previous_authors) {
518                bail!("Previous certificates for a batch in round {round} did not reach quorum threshold {gc_log}")
519            }
520        }
521
522        Ok(Some(missing_transmissions))
523    }
524
525    /// Check the validity of a certificate coming from another validator.
526    ///
527    /// It suffices to check that the signers (author and endorsers) are members of the applicable committee
528    /// and that they form a quorum in the committee.
529    /// Under the fundamental fault tolerance assumption of at most `f` (stake of) faulty validators,
530    /// the quorum check on signers guarantees that at least one correct validator
531    /// has ensured the validity of the proposal contained in the certificate,
532    /// either by construction (by the author) or by checking (by an endorser):
533    /// given `N > 0` total stake, and `f` the largest integer `< N/3` (where `/` is exact rational division),
534    /// we have `N >= 3f + 1`, which implies `N - f >= 2f + 1`, which is always `> f`;
535    /// `N - f` is the quorum stake.
536    pub fn check_incoming_certificate(&self, certificate: &BatchCertificate<N>) -> Result<()> {
537        // Retrieve the certificate author and round.
538        let certificate_author = certificate.author();
539        let certificate_round = certificate.round();
540
541        // Retrieve the committee lookback.
542        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
543
544        // Ensure that the signers of the certificate reach the quorum threshold.
545        // Note that certificate.signatures() only returns the endorsing signatures, not the author's signature.
546        let mut signers: HashSet<Address<N>> =
547            certificate.signatures().map(|signature| signature.to_address()).collect();
548        signers.insert(certificate_author);
549        ensure!(
550            committee_lookback.is_quorum_threshold_reached(&signers),
551            "Certificate '{}' for round {certificate_round} does not meet quorum requirements",
552            certificate.id()
553        );
554
555        // Ensure that the signers of the certificate are in the committee.
556        cfg_iter!(&signers).try_for_each(|signer| {
557            ensure!(
558                committee_lookback.is_committee_member(*signer),
559                "Signer '{signer}' of certificate '{}' for round {certificate_round} is not in the committee",
560                certificate.id()
561            );
562            Ok(())
563        })?;
564
565        Ok(())
566    }
567
568    /// Checks the given `certificate` for validity, returning the missing transmissions from storage.
569    ///
570    /// # Arguments
571    /// - `certificate`: The certificate to check.
572    /// - `transmissions`: The transmissions contained in the certificate.
573    /// - `aborted_transmissions`: The aborted transmission contained in the certificate.
574    ///
575    /// # Invariants
576    /// This method ensures the following invariants:
577    /// - The certificate ID does not already exist in storage.
578    /// - The batch ID does not already exist in storage.
579    /// - The author is a member of the committee for the batch round.
580    /// - The author has not already created a certificate for the batch round.
581    /// - The timestamp is within the allowed time range.
582    /// - None of the transmissions are from any past rounds (up to GC).
583    /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
584    /// - All previous certificates declared in the certificate exist in storage (up to GC).
585    /// - All previous certificates are for the previous round (i.e. round - 1).
586    /// - The previous certificates reached the quorum threshold (N - f).
587    /// - The timestamps from the signers are all within the allowed time range.
588    /// - The signers have reached the quorum threshold (N - f).
589    pub fn check_certificate(
590        &self,
591        certificate: &BatchCertificate<N>,
592        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
593        aborted_transmissions: HashSet<TransmissionID<N>>,
594    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
595        // Retrieve the round.
596        let round = certificate.round();
597        // Retrieve the GC round.
598        let gc_round = self.gc_round();
599        // Construct a GC log message.
600        let gc_log = format!("(gc = {gc_round})");
601
602        // Ensure the certificate ID does not already exist in storage.
603        if self.contains_certificate(certificate.id()) {
604            bail!("Certificate for round {round} already exists in storage {gc_log}")
605        }
606
607        // Ensure the storage does not already contain a certificate for this author in this round.
608        if self.contains_certificate_in_round_from(round, certificate.author()) {
609            bail!("Certificate with this author for round {round} already exists in storage {gc_log}")
610        }
611
612        // Ensure the batch header is well-formed.
613        let Some(missing_transmissions) =
614            self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)?
615        else {
616            bail!("Certificate for round {round} already exists in storage {gc_log}")
617        };
618
619        // Check the timestamp for liveness.
620        check_timestamp_for_liveness(certificate.timestamp())?;
621
622        // Retrieve the committee lookback for the batch round.
623        let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
624            bail!("Storage failed to retrieve the committee for round {round} {gc_log}")
625        };
626
627        // Initialize a set of the signers.
628        let mut signers = HashSet::with_capacity(certificate.signatures().len() + 1);
629        // Append the batch author.
630        signers.insert(certificate.author());
631
632        // Iterate over the signatures.
633        for signature in certificate.signatures() {
634            // Retrieve the signer.
635            let signer = signature.to_address();
636            // Ensure the signer is in the committee.
637            if !committee_lookback.is_committee_member(signer) {
638                bail!("Signer {signer} is not in the committee for round {round} {gc_log}")
639            }
640            // Append the signer.
641            signers.insert(signer);
642        }
643
644        // Ensure the signatures have reached the quorum threshold.
645        if !committee_lookback.is_quorum_threshold_reached(&signers) {
646            bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}")
647        }
648
649        Ok(missing_transmissions)
650    }
651
652    /// Inserts the given `certificate` into storage.
653    ///
654    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
655    ///
656    /// # Arguments
657    /// - `certificate`: The certificate to insert.
658    /// - `transmissions`: The transmissions contained in the certificate, or the subset of the transmissions that in the certificate that do not yet exist in storage.
659    /// - `aborted_transmissions`: The aborted transmission contained in the certificate.
660    ///
661    /// # Invariants
662    /// This method ensures the following invariants:
663    /// - The certificate ID does not already exist in storage.
664    /// - The batch ID does not already exist in storage.
665    /// - All transmissions declared in the certificate are provided or exist in storage (up to GC).
666    /// - All previous certificates declared in the certificate exist in storage (up to GC).
667    /// - All previous certificates are for the previous round (i.e. round - 1).
668    /// - The previous certificates reached the quorum threshold (N - f).
669    pub fn insert_certificate(
670        &self,
671        certificate: BatchCertificate<N>,
672        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
673        aborted_transmissions: HashSet<TransmissionID<N>>,
674    ) -> Result<()> {
675        // Ensure the certificate round is above the GC round.
676        ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
677        // Ensure the certificate and its transmissions are valid.
678        let missing_transmissions =
679            self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?;
680        // Insert the certificate into storage.
681        self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
682        Ok(())
683    }
684
685    /// Inserts the given `certificate` into storage.
686    ///
687    /// This method assumes **all missing** transmissions are provided in the `missing_transmissions` map.
688    ///
689    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
690    fn insert_certificate_atomic(
691        &self,
692        certificate: BatchCertificate<N>,
693        aborted_transmission_ids: HashSet<TransmissionID<N>>,
694        missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
695    ) {
696        // Retrieve the round.
697        let round = certificate.round();
698        // Retrieve the certificate ID.
699        let certificate_id = certificate.id();
700        // Retrieve the author of the batch.
701        let author = certificate.author();
702
703        // Insert the round to certificate ID entry.
704        self.rounds.write().entry(round).or_default().insert((certificate_id, author));
705        // Obtain the certificate's transmission ids.
706        let transmission_ids = certificate.transmission_ids().clone();
707        // Insert the certificate.
708        self.certificates.write().insert(certificate_id, certificate);
709        // Remove the unprocessed certificate.
710        self.unprocessed_certificates.write().pop(&certificate_id);
711        // Insert the batch ID.
712        self.batch_ids.write().insert(certificate_id, round);
713        // Insert the certificate ID for each of the transmissions into storage.
714        self.transmissions.insert_transmissions(
715            certificate_id,
716            transmission_ids,
717            aborted_transmission_ids,
718            missing_transmissions,
719        );
720    }
721
722    /// Inserts the given unprocessed `certificate` into storage.
723    ///
724    /// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`.
725    pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
726        // Ensure the certificate round is above the GC round.
727        ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
728        // Insert the certificate.
729        self.unprocessed_certificates.write().put(certificate.id(), certificate);
730
731        Ok(())
732    }
733
734    /// Removes the given `certificate ID` from storage. This method is used to garbage collect individual certificates once blocks are committed.
735    ///
736    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
737    ///
738    /// If the certificate was successfully removed, `true` is returned.
739    /// If the certificate did not exist in storage, `false` is returned.
740    fn remove_certificate(&self, certificate_id: Field<N>) -> bool {
741        // Retrieve the certificate.
742        let Some(certificate) = self.get_certificate(certificate_id) else {
743            warn!("Certificate {certificate_id} does not exist in storage");
744            return false;
745        };
746        // Retrieve the round.
747        let round = certificate.round();
748        // Compute the author of the batch.
749        let author = certificate.author();
750
751        // TODO (howardwu): We may want to use `shift_remove` below, in order to align compatibility
752        //  with tests written to for `remove_certificate`. However, this will come with performance hits.
753        //  It will be better to write tests that compare the union of the sets.
754
755        // Update the round.
756        match self.rounds.write().entry(round) {
757            Entry::Occupied(mut entry) => {
758                // Remove the round to certificate ID entry.
759                entry.get_mut().swap_remove(&(certificate_id, author));
760                // If the round is empty, remove it.
761                if entry.get().is_empty() {
762                    entry.swap_remove();
763                }
764            }
765            Entry::Vacant(_) => {}
766        }
767        // Remove the certificate.
768        self.certificates.write().swap_remove(&certificate_id);
769        // Remove the unprocessed certificate.
770        self.unprocessed_certificates.write().pop(&certificate_id);
771        // Remove the batch ID.
772        self.batch_ids.write().swap_remove(&certificate_id);
773        // Remove the transmission entries in the certificate from storage.
774        self.transmissions.remove_transmissions(&certificate_id, certificate.transmission_ids());
775        // Return successfully.
776        true
777    }
778}
779
780impl<N: Network> Storage<N> {
781    /// Syncs the current height with the block.
782    pub(crate) fn sync_height_with_block(&self, next_height: u32) {
783        // If the block height is greater than the current height in storage, sync the height.
784        if next_height > self.current_height() {
785            // Update the current height in storage.
786            self.current_height.store(next_height, Ordering::SeqCst);
787        }
788    }
789
790    /// Syncs the current round with the block.
791    pub(crate) fn sync_round_with_block(&self, next_round: u64) {
792        // Ensure we sync to at least round 1.
793        let next_round = next_round.max(1);
794        // If the round in the block is greater than the current round in storage, sync the round.
795        if next_round > self.current_round() {
796            // Update the current round in storage.
797            self.update_current_round(next_round);
798            // Log the updated round.
799            info!("Synced to round {next_round}...");
800        } else {
801            trace!(
802                "Skipping sync to round {next_round} as it is less than the current round ({})",
803                self.current_round()
804            );
805        }
806    }
807
808    /// Syncs the batch certificate with the block.
809    pub(crate) fn sync_certificate_with_block(
810        &self,
811        block: &Block<N>,
812        certificate: BatchCertificate<N>,
813        unconfirmed_transactions: &HashMap<N::TransactionID, Transaction<N>>,
814    ) {
815        // Skip if the certificate round is below the GC round.
816        let gc_round = self.gc_round();
817        if certificate.round() <= gc_round {
818            trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round());
819            return;
820        }
821
822        // If the certificate ID already exists in storage, skip it.
823        if self.contains_certificate(certificate.id()) {
824            trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round());
825            return;
826        }
827        // Retrieve the transmissions for the certificate.
828        let mut missing_transmissions = HashMap::new();
829
830        // Retrieve the aborted transmissions for the certificate.
831        let mut aborted_transmissions = HashSet::new();
832
833        // Track the block's aborted solutions and transactions.
834        let aborted_solutions: IndexSet<_> = block.aborted_solution_ids().iter().collect();
835        let aborted_transactions: IndexSet<_> = block.aborted_transaction_ids().iter().collect();
836
837        // Iterate over the transmission IDs.
838        for transmission_id in certificate.transmission_ids() {
839            // If the transmission ID already exists in the map, skip it.
840            if missing_transmissions.contains_key(transmission_id) {
841                continue;
842            }
843            // If the transmission ID exists in storage, skip it.
844            if self.contains_transmission(*transmission_id) {
845                continue;
846            }
847            // Retrieve the transmission.
848            match transmission_id {
849                TransmissionID::Ratification => (),
850                TransmissionID::Solution(solution_id, _) => {
851                    // Retrieve the solution.
852                    match block.get_solution(solution_id) {
853                        // Insert the solution.
854                        Some(solution) => missing_transmissions.insert(*transmission_id, (*solution).into()),
855                        // Otherwise, try to load the solution from the ledger.
856                        None => match self.ledger.get_solution(solution_id) {
857                            // Insert the solution.
858                            Ok(solution) => missing_transmissions.insert(*transmission_id, solution.into()),
859                            // Check if the solution is in the aborted solutions.
860                            Err(_) => {
861                                // Insert the aborted solution if it exists in the block or ledger.
862                                match aborted_solutions.contains(solution_id)
863                                    || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
864                                {
865                                    true => {
866                                        aborted_transmissions.insert(*transmission_id);
867                                    }
868                                    false => error!("Missing solution {solution_id} in block {}", block.height()),
869                                }
870                                continue;
871                            }
872                        },
873                    };
874                }
875                TransmissionID::Transaction(transaction_id, _) => {
876                    // Retrieve the transaction.
877                    match unconfirmed_transactions.get(transaction_id) {
878                        // Insert the transaction.
879                        Some(transaction) => missing_transmissions.insert(*transmission_id, transaction.clone().into()),
880                        // Otherwise, try to load the unconfirmed transaction from the ledger.
881                        None => match self.ledger.get_unconfirmed_transaction(*transaction_id) {
882                            // Insert the transaction.
883                            Ok(transaction) => missing_transmissions.insert(*transmission_id, transaction.into()),
884                            // Check if the transaction is in the aborted transactions.
885                            Err(_) => {
886                                // Insert the aborted transaction if it exists in the block or ledger.
887                                match aborted_transactions.contains(transaction_id)
888                                    || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
889                                {
890                                    true => {
891                                        aborted_transmissions.insert(*transmission_id);
892                                    }
893                                    false => warn!("Missing transaction {transaction_id} in block {}", block.height()),
894                                }
895                                continue;
896                            }
897                        },
898                    };
899                }
900            }
901        }
902        // Insert the batch certificate into storage.
903        let certificate_id = fmt_id(certificate.id());
904        debug!(
905            "Syncing certificate '{certificate_id}' for round {} with {} transmissions",
906            certificate.round(),
907            certificate.transmission_ids().len()
908        );
909
910        if let Err(error) = self
911            .insert_certificate(certificate, missing_transmissions, aborted_transmissions)
912            .with_context(|| format!("Failed to insert certificate '{certificate_id}' from block {}", block.height()))
913        {
914            error!("{}", &flatten_error(&error));
915        }
916    }
917}
918
919#[cfg(test)]
920impl<N: Network> Storage<N> {
921    /// Returns the ledger service.
922    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
923        &self.ledger
924    }
925
926    /// Returns an iterator over the `(round, (certificate ID, batch ID, author))` entries.
927    pub fn rounds_iter(&self) -> impl Iterator<Item = (u64, IndexSet<(Field<N>, Address<N>)>)> {
928        self.rounds.read().clone().into_iter()
929    }
930
931    /// Returns an iterator over the `(certificate ID, certificate)` entries.
932    pub fn certificates_iter(&self) -> impl Iterator<Item = (Field<N>, BatchCertificate<N>)> {
933        self.certificates.read().clone().into_iter()
934    }
935
936    /// Returns an iterator over the `(batch ID, round)` entries.
937    pub fn batch_ids_iter(&self) -> impl Iterator<Item = (Field<N>, u64)> {
938        self.batch_ids.read().clone().into_iter()
939    }
940
941    /// Returns an iterator over the `(transmission ID, (transmission, certificate IDs))` entries.
942    pub fn transmissions_iter(
943        &self,
944    ) -> impl Iterator<Item = (TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>))> {
945        self.transmissions.as_hashmap().into_iter()
946    }
947
948    /// Inserts the given `certificate` into storage.
949    ///
950    /// Note: Do NOT use this in production. This is for **testing only**.
951    #[cfg(test)]
952    #[doc(hidden)]
953    pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
954        // Retrieve the round.
955        let round = certificate.round();
956        // Retrieve the certificate ID.
957        let certificate_id = certificate.id();
958        // Retrieve the author of the batch.
959        let author = certificate.author();
960
961        // Insert the round to certificate ID entry.
962        self.rounds.write().entry(round).or_default().insert((certificate_id, author));
963        // Obtain the certificate's transmission ids.
964        let transmission_ids = certificate.transmission_ids().clone();
965        // Insert the certificate.
966        self.certificates.write().insert(certificate_id, certificate);
967        // Insert the batch ID.
968        self.batch_ids.write().insert(certificate_id, round);
969
970        // Construct the dummy missing transmissions (for testing purposes).
971        let missing_transmissions = transmission_ids
972            .iter()
973            .map(|id| (*id, Transmission::Transaction(snarkvm::ledger::narwhal::Data::Buffer(bytes::Bytes::new()))))
974            .collect::<HashMap<_, _>>();
975        // Insert the certificate ID for each of the transmissions into storage.
976        self.transmissions.insert_transmissions(
977            certificate_id,
978            transmission_ids,
979            Default::default(),
980            missing_transmissions,
981        );
982    }
983}
984
985#[cfg(test)]
986pub(crate) mod tests {
987    use super::*;
988    use snarkos_node_bft_ledger_service::MockLedgerService;
989    use snarkos_node_bft_storage_service::BFTMemoryService;
990    use snarkvm::{
991        ledger::narwhal::{Data, batch_certificate::test_helpers::sample_batch_certificate_for_round_with_committee},
992        prelude::{Rng, TestRng},
993    };
994
995    use ::bytes::Bytes;
996    use indexmap::indexset;
997
998    type CurrentNetwork = snarkvm::prelude::MainnetV0;
999
1000    /// Asserts that the storage matches the expected layout.
1001    pub fn assert_storage<N: Network>(
1002        storage: &Storage<N>,
1003        rounds: &[(u64, IndexSet<(Field<N>, Address<N>)>)],
1004        certificates: &[(Field<N>, BatchCertificate<N>)],
1005        batch_ids: &[(Field<N>, u64)],
1006        transmissions: &HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
1007    ) {
1008        // Ensure the rounds are well-formed.
1009        assert_eq!(storage.rounds_iter().collect::<Vec<_>>(), *rounds);
1010        // Ensure the certificates are well-formed.
1011        assert_eq!(storage.certificates_iter().collect::<Vec<_>>(), *certificates);
1012        // Ensure the batch IDs are well-formed.
1013        assert_eq!(storage.batch_ids_iter().collect::<Vec<_>>(), *batch_ids);
1014        // Ensure the transmissions are well-formed.
1015        assert_eq!(storage.transmissions_iter().collect::<HashMap<_, _>>(), *transmissions);
1016    }
1017
1018    /// Samples a random transmission.
1019    fn sample_transmission(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
1020        // Sample random fake solution bytes.
1021        let s = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
1022        // Sample random fake transaction bytes.
1023        let t = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..2048).map(|_| rng.r#gen::<u8>()).collect::<Vec<_>>()));
1024        // Sample a random transmission.
1025        match rng.r#gen::<bool>() {
1026            true => Transmission::Solution(s(rng)),
1027            false => Transmission::Transaction(t(rng)),
1028        }
1029    }
1030
1031    /// Samples the random transmissions, returning the missing transmissions and the transmissions.
1032    pub(crate) fn sample_transmissions(
1033        certificate: &BatchCertificate<CurrentNetwork>,
1034        rng: &mut TestRng,
1035    ) -> (
1036        HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>,
1037        HashMap<TransmissionID<CurrentNetwork>, (Transmission<CurrentNetwork>, IndexSet<Field<CurrentNetwork>>)>,
1038    ) {
1039        // Retrieve the certificate ID.
1040        let certificate_id = certificate.id();
1041
1042        let mut missing_transmissions = HashMap::new();
1043        let mut transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1044        for transmission_id in certificate.transmission_ids() {
1045            // Initialize the transmission.
1046            let transmission = sample_transmission(rng);
1047            // Update the missing transmissions.
1048            missing_transmissions.insert(*transmission_id, transmission.clone());
1049            // Update the transmissions map.
1050            transmissions
1051                .entry(*transmission_id)
1052                .or_insert((transmission, Default::default()))
1053                .1
1054                .insert(certificate_id);
1055        }
1056        (missing_transmissions, transmissions)
1057    }
1058
1059    // TODO (howardwu): Testing with 'max_gc_rounds' set to '0' should ensure everything is cleared after insertion.
1060
1061    #[test]
1062    fn test_certificate_insert_remove() {
1063        let rng = &mut TestRng::default();
1064
1065        // Sample a committee.
1066        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1067        // Initialize the ledger.
1068        let ledger = Arc::new(MockLedgerService::new(committee));
1069        // Initialize the storage.
1070        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1071
1072        // Ensure the storage is empty.
1073        assert_storage(&storage, &[], &[], &[], &Default::default());
1074
1075        // Create a new certificate.
1076        let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1077        // Retrieve the certificate ID.
1078        let certificate_id = certificate.id();
1079        // Retrieve the round.
1080        let round = certificate.round();
1081        // Retrieve the author of the batch.
1082        let author = certificate.author();
1083
1084        // Construct the sample 'transmissions'.
1085        let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1086
1087        // Insert the certificate.
1088        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions);
1089        // Ensure the certificate exists in storage.
1090        assert!(storage.contains_certificate(certificate_id));
1091        // Ensure the certificate is stored in the correct round.
1092        assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
1093        // Ensure the certificate is stored for the correct round and author.
1094        assert_eq!(storage.get_certificate_for_round_with_author(round, author), Some(certificate.clone()));
1095
1096        // Check that the underlying storage representation is correct.
1097        {
1098            // Construct the expected layout for 'rounds'.
1099            let rounds = [(round, indexset! { (certificate_id, author) })];
1100            // Construct the expected layout for 'certificates'.
1101            let certificates = [(certificate_id, certificate.clone())];
1102            // Construct the expected layout for 'batch_ids'.
1103            let batch_ids = [(certificate_id, round)];
1104            // Assert the storage is well-formed.
1105            assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1106        }
1107
1108        // Retrieve the certificate.
1109        let candidate_certificate = storage.get_certificate(certificate_id).unwrap();
1110        // Ensure the retrieved certificate is the same as the inserted certificate.
1111        assert_eq!(certificate, candidate_certificate);
1112
1113        // Remove the certificate.
1114        assert!(storage.remove_certificate(certificate_id));
1115        // Ensure the certificate does not exist in storage.
1116        assert!(!storage.contains_certificate(certificate_id));
1117        // Ensure the certificate is no longer stored in the round.
1118        assert!(storage.get_certificates_for_round(round).is_empty());
1119        // Ensure the certificate is no longer stored for the round and author.
1120        assert_eq!(storage.get_certificate_for_round_with_author(round, author), None);
1121        // Ensure the storage is empty.
1122        assert_storage(&storage, &[], &[], &[], &Default::default());
1123    }
1124
1125    #[test]
1126    fn test_certificate_duplicate() {
1127        let rng = &mut TestRng::default();
1128
1129        // Sample a committee.
1130        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1131        // Initialize the ledger.
1132        let ledger = Arc::new(MockLedgerService::new(committee));
1133        // Initialize the storage.
1134        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1135
1136        // Ensure the storage is empty.
1137        assert_storage(&storage, &[], &[], &[], &Default::default());
1138
1139        // Create a new certificate.
1140        let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1141        // Retrieve the certificate ID.
1142        let certificate_id = certificate.id();
1143        // Retrieve the round.
1144        let round = certificate.round();
1145        // Retrieve the author of the batch.
1146        let author = certificate.author();
1147
1148        // Construct the expected layout for 'rounds'.
1149        let rounds = [(round, indexset! { (certificate_id, author) })];
1150        // Construct the expected layout for 'certificates'.
1151        let certificates = [(certificate_id, certificate.clone())];
1152        // Construct the expected layout for 'batch_ids'.
1153        let batch_ids = [(certificate_id, round)];
1154        // Construct the sample 'transmissions'.
1155        let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1156
1157        // Insert the certificate.
1158        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1159        // Ensure the certificate exists in storage.
1160        assert!(storage.contains_certificate(certificate_id));
1161        // Check that the underlying storage representation is correct.
1162        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1163
1164        // Insert the certificate again - without any missing transmissions.
1165        storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1166        // Ensure the certificate exists in storage.
1167        assert!(storage.contains_certificate(certificate_id));
1168        // Check that the underlying storage representation remains unchanged.
1169        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1170
1171        // Insert the certificate again - with all of the original missing transmissions.
1172        storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1173        // Ensure the certificate exists in storage.
1174        assert!(storage.contains_certificate(certificate_id));
1175        // Check that the underlying storage representation remains unchanged.
1176        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1177    }
1178
1179    /// Test that `check_incoming_certificate` does not reject a valid cert.
1180    #[test]
1181    fn test_valid_incoming_certificate() {
1182        let rng = &mut TestRng::default();
1183
1184        // Sample a committee.
1185        let (committee, private_keys) =
1186            snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 5, rng);
1187        // Initialize the ledger.
1188        let ledger = Arc::new(MockLedgerService::new(committee));
1189        // Initialize the storage.
1190        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1191
1192        // Go through many rounds of valid certificates and ensure they're accepted.
1193        let mut previous_certs = IndexSet::default();
1194
1195        for round in 1..=100 {
1196            let mut new_certs = IndexSet::default();
1197
1198            // Generate one cert per validator
1199            for private_key in private_keys.iter() {
1200                let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1201
1202                let certificate = sample_batch_certificate_for_round_with_committee(
1203                    round,
1204                    previous_certs.clone(),
1205                    private_key,
1206                    &other_keys,
1207                    rng,
1208                );
1209                storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1210                new_certs.insert(certificate.id());
1211
1212                // Construct the sample 'transmissions'.
1213                let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1214                storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1215            }
1216
1217            previous_certs = new_certs;
1218        }
1219    }
1220
1221    /// Make sure that we reject all certificates without sufficient signatures early.
1222    #[test]
1223    fn test_invalid_incoming_certificate_missing_signature() {
1224        let rng = &mut TestRng::default();
1225
1226        // Sample a committee.
1227        let (committee, private_keys) =
1228            snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1229        // Initialize the ledger.
1230        let ledger = Arc::new(MockLedgerService::new(committee));
1231        // Initialize the storage.
1232        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1233
1234        // Go through many rounds of valid certificates and ensure they're accepted.
1235        let mut previous_certs = IndexSet::default();
1236
1237        for round in 1..=5 {
1238            let mut new_certs = IndexSet::default();
1239
1240            // Generate one cert per validator
1241            for private_key in private_keys.iter() {
1242                if round < 5 {
1243                    let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1244
1245                    let certificate = sample_batch_certificate_for_round_with_committee(
1246                        round,
1247                        previous_certs.clone(),
1248                        private_key,
1249                        &other_keys,
1250                        rng,
1251                    );
1252                    storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1253                    new_certs.insert(certificate.id());
1254
1255                    // Construct the sample 'transmissions'.
1256                    let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1257                    storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1258                } else {
1259                    // Pick a few signers, but not enough to form a quorum.
1260                    let other_keys: Vec<_> = private_keys[0..=3].iter().cloned().filter(|k| k != private_key).collect();
1261
1262                    let certificate = sample_batch_certificate_for_round_with_committee(
1263                        round,
1264                        previous_certs.clone(),
1265                        private_key,
1266                        &other_keys,
1267                        rng,
1268                    );
1269                    assert!(storage.check_incoming_certificate(&certificate).is_err());
1270                }
1271            }
1272
1273            previous_certs = new_certs;
1274        }
1275    }
1276
/// Checks that `insert_certificate` refuses certificates that reference fewer
/// previous-round certificates than required.
///
/// Rounds 1 through 5 receive one certificate per sampled key, and each of them must be accepted.
/// The round-6 certificates only reference the round-5 certificate IDs that remain after skipping
/// the first six, and every one of them must be rejected.
#[test]
fn test_invalid_certificate_insufficient_previous_certs() {
    let rng = &mut TestRng::default();

    // Sample the committee and its keys, then wire up a mock ledger and the storage under test.
    let (committee, private_keys) =
        snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // The certificate IDs that the certificates of the next round will reference.
    let mut previous_certs = IndexSet::default();

    for round in 1..=6 {
        // The IDs of the certificates accepted in this round.
        let mut accepted_ids = IndexSet::default();

        // Each key authors exactly one certificate per round.
        for author_key in &private_keys {
            // All remaining keys co-sign the author's batch.
            let cosigner_keys: Vec<_> = private_keys.iter().filter(|key| *key != author_key).cloned().collect();

            let certificate = sample_batch_certificate_for_round_with_committee(
                round,
                previous_certs.clone(),
                author_key,
                &cosigner_keys,
                rng,
            );

            // Sample the transmissions for the certificate and drop the per-transmission extra data.
            let (_missing_transmissions, sampled) = sample_transmissions(&certificate, rng);
            let transmissions = sampled.into_iter().map(|(id, (transmission, _))| (id, transmission)).collect();

            match round {
                // Rounds up to and including 5 reference the full previous round and must be accepted.
                0..=5 => {
                    accepted_ids.insert(certificate.id());
                    storage
                        .insert_certificate(certificate, transmissions, Default::default())
                        .expect("Valid certificate rejected");
                }
                // Round 6 references the truncated edge set and must be rejected.
                _ => {
                    assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
                }
            }
        }

        // Before round 5 the full set is carried over; from round 5 on, the first six IDs are
        // dropped so that the following round references too few certificates.
        previous_certs = if round < 5 { accepted_ids } else { accepted_ids.into_iter().skip(6).collect() };
    }
}
1330
/// Checks that `insert_certificate` refuses certificates whose round number does not advance.
///
/// Rounds 1 through 5 receive one certificate per sampled key, and each of them must be accepted.
/// In the sixth iteration the certificates are sampled for round 5 again, and every one of them
/// must be rejected.
#[test]
fn test_invalid_certificate_wrong_round_number() {
    let rng = &mut TestRng::default();

    // Sample the committee and its keys, then wire up a mock ledger and the storage under test.
    let (committee, private_keys) =
        snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
    let ledger = Arc::new(MockLedgerService::new(committee));
    let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);

    // The certificate IDs that the certificates of the next iteration will reference.
    let mut previous_certs = IndexSet::default();

    for round in 1..=6 {
        // The IDs of the certificates accepted in this iteration.
        let mut accepted_ids = IndexSet::default();

        // The round stamped on the certificates is capped at 5, so the sixth iteration repeats round 5.
        let cert_round = round.min(5);

        // Each key authors exactly one certificate per iteration.
        for author_key in &private_keys {
            // All remaining keys co-sign the author's batch.
            let cosigner_keys: Vec<_> = private_keys.iter().filter(|key| *key != author_key).cloned().collect();

            let certificate = sample_batch_certificate_for_round_with_committee(
                cert_round,
                previous_certs.clone(),
                author_key,
                &cosigner_keys,
                rng,
            );

            // Sample the transmissions for the certificate and drop the per-transmission extra data.
            let (_missing_transmissions, sampled) = sample_transmissions(&certificate, rng);
            let transmissions = sampled.into_iter().map(|(id, (transmission, _))| (id, transmission)).collect();

            match round {
                // The first five iterations advance the round and must be accepted.
                0..=5 => {
                    accepted_ids.insert(certificate.id());
                    storage
                        .insert_certificate(certificate, transmissions, Default::default())
                        .expect("Valid certificate rejected");
                }
                // The sixth iteration re-uses round 5 and must be rejected.
                _ => {
                    assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
                }
            }
        }

        // Before round 5 the full set is carried over; from round 5 on, the first six IDs are dropped.
        // NOTE(review): the rejected certificates therefore also carry a truncated edge set, so the
        // rejection may not be caused by the round number alone - confirm this is intended.
        previous_certs = if round < 5 { accepted_ids } else { accepted_ids.into_iter().skip(6).collect() };
    }
}
1385}
1386
1387#[cfg(test)]
1388pub mod prop_tests {
1389    use super::*;
1390    use crate::helpers::{now, storage::tests::assert_storage};
1391    use snarkos_node_bft_ledger_service::MockLedgerService;
1392    use snarkos_node_bft_storage_service::BFTMemoryService;
1393    use snarkvm::{
1394        ledger::{
1395            committee::prop_tests::{CommitteeContext, ValidatorSet},
1396            narwhal::{BatchHeader, Data},
1397            puzzle::SolutionID,
1398        },
1399        prelude::{Signature, Uniform},
1400    };
1401
1402    use ::bytes::Bytes;
1403    use indexmap::indexset;
1404    use proptest::{
1405        collection,
1406        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any},
1407        prop_oneof,
1408        sample::{Selector, size_range},
1409        test_runner::TestRng,
1410    };
1411    use rand::{CryptoRng, Error, Rng, RngCore};
1412    use std::fmt::Debug;
1413    use test_strategy::proptest;
1414
1415    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1416
1417    impl Arbitrary for Storage<CurrentNetwork> {
1418        type Parameters = CommitteeContext;
1419        type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;
1420
1421        fn arbitrary() -> Self::Strategy {
1422            (any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1423                .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1424                    let ledger = Arc::new(MockLedgerService::new(committee));
1425                    Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1426                })
1427                .boxed()
1428        }
1429
1430        fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
1431            (Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1432                .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1433                    let ledger = Arc::new(MockLedgerService::new(committee));
1434                    Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds)
1435                })
1436                .boxed()
1437        }
1438    }
1439
1440    // The `proptest::TestRng` doesn't implement `rand_core::CryptoRng` trait which is required in snarkVM, so we use a wrapper
1441    #[derive(Debug)]
1442    pub struct CryptoTestRng(TestRng);
1443
1444    impl Arbitrary for CryptoTestRng {
1445        type Parameters = ();
1446        type Strategy = BoxedStrategy<CryptoTestRng>;
1447
1448        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1449            Just(0).prop_perturb(|_, rng| CryptoTestRng(rng)).boxed()
1450        }
1451    }
1452    impl RngCore for CryptoTestRng {
1453        fn next_u32(&mut self) -> u32 {
1454            self.0.next_u32()
1455        }
1456
1457        fn next_u64(&mut self) -> u64 {
1458            self.0.next_u64()
1459        }
1460
1461        fn fill_bytes(&mut self, dest: &mut [u8]) {
1462            self.0.fill_bytes(dest);
1463        }
1464
1465        fn try_fill_bytes(&mut self, dest: &mut [u8]) -> std::result::Result<(), Error> {
1466            self.0.try_fill_bytes(dest)
1467        }
1468    }
1469
1470    impl CryptoRng for CryptoTestRng {}
1471
1472    #[derive(Debug, Clone)]
1473    pub struct AnyTransmission(pub Transmission<CurrentNetwork>);
1474
1475    impl Arbitrary for AnyTransmission {
1476        type Parameters = ();
1477        type Strategy = BoxedStrategy<AnyTransmission>;
1478
1479        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1480            any_transmission().prop_map(AnyTransmission).boxed()
1481        }
1482    }
1483
1484    #[derive(Debug, Clone)]
1485    pub struct AnyTransmissionID(pub TransmissionID<CurrentNetwork>);
1486
1487    impl Arbitrary for AnyTransmissionID {
1488        type Parameters = ();
1489        type Strategy = BoxedStrategy<AnyTransmissionID>;
1490
1491        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1492            any_transmission_id().prop_map(AnyTransmissionID).boxed()
1493        }
1494    }
1495
1496    fn any_transmission() -> BoxedStrategy<Transmission<CurrentNetwork>> {
1497        prop_oneof![
1498            (collection::vec(any::<u8>(), 512..=512))
1499                .prop_map(|bytes| Transmission::Solution(Data::Buffer(Bytes::from(bytes)))),
1500            (collection::vec(any::<u8>(), 2048..=2048))
1501                .prop_map(|bytes| Transmission::Transaction(Data::Buffer(Bytes::from(bytes)))),
1502        ]
1503        .boxed()
1504    }
1505
1506    pub fn any_solution_id() -> BoxedStrategy<SolutionID<CurrentNetwork>> {
1507        Just(0).prop_perturb(|_, rng| CryptoTestRng(rng).r#gen::<u64>().into()).boxed()
1508    }
1509
1510    pub fn any_transaction_id() -> BoxedStrategy<<CurrentNetwork as Network>::TransactionID> {
1511        Just(0)
1512            .prop_perturb(|_, rng| {
1513                <CurrentNetwork as Network>::TransactionID::from(Field::rand(&mut CryptoTestRng(rng)))
1514            })
1515            .boxed()
1516    }
1517
1518    pub fn any_transmission_id() -> BoxedStrategy<TransmissionID<CurrentNetwork>> {
1519        prop_oneof![
1520            any_transaction_id().prop_perturb(|id, mut rng| TransmissionID::Transaction(
1521                id,
1522                rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1523            )),
1524            any_solution_id().prop_perturb(|id, mut rng| TransmissionID::Solution(
1525                id,
1526                rng.r#gen::<<CurrentNetwork as Network>::TransmissionChecksum>()
1527            )),
1528        ]
1529        .boxed()
1530    }
1531
1532    pub fn sign_batch_header<R: Rng + CryptoRng>(
1533        validator_set: &ValidatorSet,
1534        batch_header: &BatchHeader<CurrentNetwork>,
1535        rng: &mut R,
1536    ) -> IndexSet<Signature<CurrentNetwork>> {
1537        let mut signatures = IndexSet::with_capacity(validator_set.0.len());
1538        for validator in validator_set.0.iter() {
1539            let private_key = validator.private_key;
1540            signatures.insert(private_key.sign(&[batch_header.batch_id()], rng).unwrap());
1541        }
1542        signatures
1543    }
1544
1545    #[proptest]
1546    fn test_certificate_duplicate(
1547        context: CommitteeContext,
1548        #[any(size_range(1..16).lift())] transmissions: Vec<(AnyTransmissionID, AnyTransmission)>,
1549        mut rng: CryptoTestRng,
1550        selector: Selector,
1551    ) {
1552        let CommitteeContext(committee, ValidatorSet(validators)) = context;
1553        let committee_id = committee.id();
1554
1555        // Initialize the storage.
1556        let ledger = Arc::new(MockLedgerService::new(committee));
1557        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1);
1558
1559        // Ensure the storage is empty.
1560        assert_storage(&storage, &[], &[], &[], &Default::default());
1561
1562        // Create a new certificate.
1563        let signer = selector.select(&validators);
1564
1565        let mut transmission_map = IndexMap::new();
1566
1567        for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter() {
1568            transmission_map.insert(*id, t.clone());
1569        }
1570
1571        let batch_header = BatchHeader::new(
1572            &signer.private_key,
1573            0,
1574            now(),
1575            committee_id,
1576            transmission_map.keys().cloned().collect(),
1577            Default::default(),
1578            &mut rng,
1579        )
1580        .unwrap();
1581
1582        // Remove the author from the validator set passed to create the batch
1583        // certificate, the author should not sign their own batch.
1584        let mut validators = validators.clone();
1585        validators.remove(signer);
1586
1587        let certificate = BatchCertificate::from(
1588            batch_header.clone(),
1589            sign_batch_header(&ValidatorSet(validators), &batch_header, &mut rng),
1590        )
1591        .unwrap();
1592
1593        // Retrieve the certificate ID.
1594        let certificate_id = certificate.id();
1595        let mut internal_transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1596        for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter().cloned() {
1597            internal_transmissions.entry(id).or_insert((t, Default::default())).1.insert(certificate_id);
1598        }
1599
1600        // Retrieve the round.
1601        let round = certificate.round();
1602        // Retrieve the author of the batch.
1603        let author = certificate.author();
1604
1605        // Construct the expected layout for 'rounds'.
1606        let rounds = [(round, indexset! { (certificate_id, author) })];
1607        // Construct the expected layout for 'certificates'.
1608        let certificates = [(certificate_id, certificate.clone())];
1609        // Construct the expected layout for 'batch_ids'.
1610        let batch_ids = [(certificate_id, round)];
1611
1612        // Insert the certificate.
1613        let missing_transmissions: HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>> =
1614            transmission_map.into_iter().collect();
1615        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1616        // Ensure the certificate exists in storage.
1617        assert!(storage.contains_certificate(certificate_id));
1618        // Check that the underlying storage representation is correct.
1619        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1620
1621        // Insert the certificate again - without any missing transmissions.
1622        storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1623        // Ensure the certificate exists in storage.
1624        assert!(storage.contains_certificate(certificate_id));
1625        // Check that the underlying storage representation remains unchanged.
1626        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1627
1628        // Insert the certificate again - with all of the original missing transmissions.
1629        storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1630        // Ensure the certificate exists in storage.
1631        assert!(storage.contains_certificate(certificate_id));
1632        // Check that the underlying storage representation remains unchanged.
1633        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1634    }
1635}