Skip to main content

snarkos_node_bft/helpers/
storage.rs

1// Copyright (c) 2019-2026 Provable Inc.
2// This file is part of the snarkOS library.
3
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at:
7
8// http://www.apache.org/licenses/LICENSE-2.0
9
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16use crate::helpers::{check_timestamp_for_liveness, fmt_id};
17use snarkos_node_bft_ledger_service::LedgerService;
18use snarkos_node_bft_storage_service::StorageService;
19use snarkvm::{
20    ledger::{
21        block::{Block, Transaction},
22        narwhal::{BatchCertificate, BatchHeader, Transmission, TransmissionID},
23    },
24    prelude::{Address, Field, Network, Result, anyhow, bail, ensure},
25    utilities::{cfg_into_iter, cfg_iter, cfg_sorted_by},
26};
27
28use anyhow::Context;
29use indexmap::{IndexMap, IndexSet, map::Entry};
30#[cfg(feature = "locktick")]
31use locktick::parking_lot::RwLock;
32use lru::LruCache;
33#[cfg(not(feature = "locktick"))]
34use parking_lot::RwLock;
35#[cfg(not(feature = "serial"))]
36use rayon::prelude::*;
37use std::{
38    collections::{HashMap, HashSet},
39    num::NonZeroUsize,
40    sync::{
41        Arc,
42        atomic::{AtomicU32, AtomicU64, Ordering},
43    },
44};
45
46#[derive(Clone, Debug)]
47pub struct Storage<N: Network>(Arc<StorageInner<N>>);
48
49impl<N: Network> std::ops::Deref for Storage<N> {
50    type Target = Arc<StorageInner<N>>;
51
52    fn deref(&self) -> &Self::Target {
53        &self.0
54    }
55}
56
57/// The storage for the memory pool.
58///
59/// The storage is used to store the following:
60/// - `current_height` tracker.
61/// - `current_round` tracker.
62/// - `round` to `(certificate ID, batch ID, author)` entries.
63/// - `certificate ID` to `certificate` entries.
64/// - `batch ID` to `round` entries.
65/// - `transmission ID` to `(transmission, certificate IDs)` entries.
66///
67/// The chain of events is as follows:
68/// 1. A `transmission` is received.
69/// 2. After a `batch` is ready to be stored:
70///   - The `certificate` is inserted, triggering updates to the
71///     `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
72///   - The missing `transmissions` from storage are inserted into the `transmissions` map.
73///   - The certificate ID is inserted into the `transmissions` map.
74/// 3. After a `round` reaches quorum threshold:
75///  - The next round is inserted into the `current_round`.
76#[derive(Debug)]
77pub struct StorageInner<N: Network> {
78    /// The ledger service.
79    ledger: Arc<dyn LedgerService<N>>,
80    /* Once per block */
81    /// The current height.
82    current_height: AtomicU32,
83    /* Once per round */
84    /// The current round.
85    ///
86    /// Invariant: current_round > 0.
87    /// This is established in [`Storage::new`], which sets it to at least 1 via [`Storage::update_current_round`].
88    /// The only callers of [`Storage::update_current_round`] are
89    /// [`Storage::increment_to_next_round`] and [`Storage::sync_round_with_block`],
90    /// both of which set it to at least 1.
91    current_round: AtomicU64,
92    /// The `round` for which garbage collection has occurred **up to** (inclusive).
93    gc_round: AtomicU64,
94    /// The maximum number of rounds to keep in storage.
95    max_gc_rounds: u64,
96    /* Once per batch */
97    /// The map of `round` to a list of `(certificate ID, author)` entries.
98    rounds: RwLock<IndexMap<u64, IndexSet<(Field<N>, Address<N>)>>>,
99    /// A cache of `certificate ID` to unprocessed `certificate`.
100    unprocessed_certificates: RwLock<LruCache<Field<N>, BatchCertificate<N>>>,
101    /// The map of `certificate ID` to `certificate`.
102    certificates: RwLock<IndexMap<Field<N>, BatchCertificate<N>>>,
103    /// The map of `certificate ID` to `round`.
104    batch_ids: RwLock<IndexMap<Field<N>, u64>>,
105    /// The map of `transmission ID` to `(transmission, certificate IDs)` entries.
106    transmissions: Arc<dyn StorageService<N>>,
107}
108
109impl<N: Network> Storage<N> {
110    /// Initializes a new instance of storage.
111    pub fn new(
112        ledger: Arc<dyn LedgerService<N>>,
113        transmissions: Arc<dyn StorageService<N>>,
114        max_gc_rounds: u64,
115    ) -> Result<Self> {
116        // Retrieve the latest committee bonded in the ledger
117        // (genesis committee if the ledger contains only the genesis block).
118        let committee = ledger.current_committee().expect("Ledger is missing a committee.");
119        // Retrieve the round at which that committee was created, or 1 if it is the genesis committee.
120        let current_round = committee.starting_round().max(1);
121        // Set the unprocessed certificates cache size.
122        let unprocessed_cache_size = NonZeroUsize::new((N::LATEST_MAX_CERTIFICATES() * 2) as usize).unwrap();
123
124        // Create the storage.
125        let storage = Self(Arc::new(StorageInner {
126            ledger,
127            current_height: Default::default(),
128            current_round: AtomicU64::new(current_round),
129            gc_round: Default::default(),
130            max_gc_rounds,
131            rounds: Default::default(),
132            unprocessed_certificates: RwLock::new(LruCache::new(unprocessed_cache_size)),
133            certificates: Default::default(),
134            batch_ids: Default::default(),
135            transmissions,
136        }));
137
138        // Perform GC on the current round.
139        // Since there are no certificates yet, this only sets `gc_round`.
140        storage.garbage_collect_certificates(current_round)?;
141
142        // Return the storage.
143        Ok(storage)
144    }
145}
146
147impl<N: Network> Storage<N> {
148    /// Returns the current height.
149    pub fn current_height(&self) -> u32 {
150        // Get the current height.
151        self.current_height.load(Ordering::SeqCst)
152    }
153}
154
155impl<N: Network> Storage<N> {
156    /// Returns the current round.
157    pub fn current_round(&self) -> u64 {
158        // Get the current round.
159        self.current_round.load(Ordering::SeqCst)
160    }
161
162    /// Returns the `round` that garbage collection has occurred **up to** (inclusive).
163    ///
164    /// # Invariants
165    /// The value returned is greater or equal to return values of prior calls to this method.
166    pub fn gc_round(&self) -> u64 {
167        // Get the GC round.
168        self.gc_round.load(Ordering::SeqCst)
169    }
170
171    /// Returns the maximum number of rounds to keep in storage.
172    pub fn max_gc_rounds(&self) -> u64 {
173        self.max_gc_rounds
174    }
175
176    /// Increments storage to the next round, updating the current round.
177    /// Note: This method is only called once per round, upon certification of the primary's batch.
178    pub fn increment_to_next_round(&self, current_round: u64) -> Result<u64> {
179        // Determine the next round.
180        let next_round = current_round + 1;
181
182        // Check if the next round is less than the current round in storage.
183        {
184            // Retrieve the storage round.
185            let storage_round = self.current_round();
186            // If the next round is less than the current round in storage, return early with the storage round.
187            if next_round < storage_round {
188                return Ok(storage_round);
189            }
190
191            trace!("Incrementing storage from round {storage_round} to {next_round}");
192        }
193
194        // Retrieve the current committee.
195        let current_committee = self.ledger.current_committee()?;
196        // Retrieve the current committee's starting round.
197        let starting_round = current_committee.starting_round();
198        // If the primary is behind the current committee's starting round, sync with the latest block.
199        if next_round < starting_round {
200            // Retrieve the latest block round.
201            let latest_block_round = self.ledger.latest_round();
202            // Log the round sync.
203            info!(
204                "Syncing primary round ({next_round}) with the current committee's starting round ({starting_round}). Syncing with the latest block round {latest_block_round}..."
205            );
206            // Sync the round with the latest block.
207            self.sync_round_with_block(latest_block_round);
208            // Return the latest block round.
209            return Ok(latest_block_round);
210        }
211
212        // Update the storage to the next round.
213        self.update_current_round(next_round);
214
215        #[cfg(feature = "metrics")]
216        metrics::gauge(metrics::bft::LAST_STORED_ROUND, next_round as f64);
217
218        // Retrieve the storage round.
219        let storage_round = self.current_round();
220        // Retrieve the GC round.
221        let gc_round = self.gc_round();
222        // Ensure the next round matches in storage.
223        ensure!(next_round == storage_round, "The next round {next_round} does not match in storage ({storage_round})");
224        // Ensure the next round is greater than or equal to the GC round.
225        ensure!(next_round >= gc_round, "The next round {next_round} is behind the GC round {gc_round}");
226
227        // Log the updated round.
228        info!("Starting round {next_round}...");
229        Ok(next_round)
230    }
231
232    /// Updates the storage to the next round.
233    fn update_current_round(&self, next_round: u64) {
234        // Update the current round.
235        self.current_round.store(next_round, Ordering::SeqCst);
236    }
237
238    /// Update the storage by performing garbage collection based on the next round.
239    pub(crate) fn garbage_collect_certificates(&self, next_round: u64) -> Result<()> {
240        // Fetch the current GC round.
241        let current_gc_round = self.gc_round();
242        // Compute the next GC round.
243        let next_gc_round = next_round.saturating_sub(self.max_gc_rounds);
244        // Check if storage needs to be garbage collected.
245        if next_gc_round > current_gc_round {
246            if self
247                .gc_round
248                .compare_exchange(current_gc_round, next_gc_round, Ordering::SeqCst, Ordering::SeqCst)
249                .is_err()
250            {
251                bail!("Concurrent updates to GC round detected.");
252            }
253
254            // Remove the GC round(s) from storage.
255            for gc_round in current_gc_round..=next_gc_round {
256                // Iterate over the certificates for the GC round.
257                for id in self.get_certificate_ids_for_round(gc_round).into_iter() {
258                    trace!(
259                        "Garbage collecting certificate {id} at round {gc_round} (cut-off is round {next_gc_round})"
260                    );
261                    self.remove_certificate(id);
262                }
263            }
264            // Update the GC round.
265            self.gc_round.store(next_gc_round, Ordering::SeqCst);
266        } else if next_gc_round < current_gc_round {
267            bail!("Attempted to decrease GC round from {current_gc_round} to {next_gc_round}");
268        }
269
270        Ok(())
271    }
272}
273
274impl<N: Network> Storage<N> {
275    /// Returns `true` if the storage contains the specified `round`.
276    pub fn contains_certificates_for_round(&self, round: u64) -> bool {
277        // Check if the round exists in storage.
278        self.rounds.read().contains_key(&round)
279    }
280
281    /// Returns `true` if the storage contains the specified `certificate ID`.
282    pub fn contains_certificate(&self, certificate_id: Field<N>) -> bool {
283        // Check if the certificate ID exists in storage.
284        self.certificates.read().contains_key(&certificate_id)
285    }
286
287    /// Returns `true` if the storage contains a certificate from the specified `author` in the given `round`.
288    pub fn contains_certificate_in_round_from(&self, round: u64, author: Address<N>) -> bool {
289        self.rounds.read().get(&round).is_some_and(|set| set.iter().any(|(_, a)| a == &author))
290    }
291
292    /// Returns `true` if the storage contains the specified `certificate ID`.
293    pub fn contains_unprocessed_certificate(&self, certificate_id: Field<N>) -> bool {
294        // Check if the certificate ID exists in storage.
295        self.unprocessed_certificates.read().contains(&certificate_id)
296    }
297
298    /// Returns `true` if the storage contains the specified `batch ID`.
299    pub fn contains_batch(&self, batch_id: Field<N>) -> bool {
300        // Check if the batch ID exists in storage.
301        self.batch_ids.read().contains_key(&batch_id)
302    }
303
304    /// Returns `true` if the storage contains the specified transmission, or it was recorded as aborted.
305    pub fn contains_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> bool {
306        self.transmissions.contains_transmission(transmission_id.into())
307    }
308
309    /// Returns the transmission for the given `transmission ID`.
310    /// If the transmission ID does not exist in storage or was aborted, `None` is returned.
311    pub fn get_transmission(&self, transmission_id: impl Into<TransmissionID<N>>) -> Option<Transmission<N>> {
312        self.transmissions.get_transmission(transmission_id.into())
313    }
314
315    /// Returns the round for the given `certificate ID`.
316    /// If the certificate ID does not exist in storage, `None` is returned.
317    pub fn get_round_for_certificate(&self, certificate_id: Field<N>) -> Option<u64> {
318        // Get the round.
319        self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
320    }
321
322    /// Returns the round for the given `batch ID`.
323    /// If the batch ID does not exist in storage, `None` is returned.
324    pub fn get_round_for_batch(&self, batch_id: Field<N>) -> Option<u64> {
325        // Get the round.
326        self.batch_ids.read().get(&batch_id).copied()
327    }
328
329    /// Returns the certificate round for the given `certificate ID`.
330    /// If the certificate ID does not exist in storage, `None` is returned.
331    pub fn get_certificate_round(&self, certificate_id: Field<N>) -> Option<u64> {
332        // Get the batch certificate and return the round.
333        self.certificates.read().get(&certificate_id).map(|certificate| certificate.round())
334    }
335
336    /// Returns the certificate for the given `certificate ID`.
337    /// If the certificate ID does not exist in storage, `None` is returned.
338    pub fn get_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
339        // Get the batch certificate.
340        self.certificates.read().get(&certificate_id).cloned()
341    }
342
343    /// Returns the unprocessed certificate for the given `certificate ID`.
344    /// If the certificate ID does not exist in storage, `None` is returned.
345    pub fn get_unprocessed_certificate(&self, certificate_id: Field<N>) -> Option<BatchCertificate<N>> {
346        // Get the unprocessed certificate.
347        self.unprocessed_certificates.read().peek(&certificate_id).cloned()
348    }
349
350    /// Returns the certificate for the given `round` and `author`.
351    /// If the round does not exist in storage, `None` is returned.
352    /// If the author for the round does not exist in storage, `None` is returned.
353    pub fn get_certificate_for_round_with_author(&self, round: u64, author: Address<N>) -> Option<BatchCertificate<N>> {
354        // Retrieve the certificates.
355        if let Some(entries) = self.rounds.read().get(&round) {
356            let certificates = self.certificates.read();
357            entries.iter().find_map(
358                |(certificate_id, a)| if a == &author { certificates.get(certificate_id).cloned() } else { None },
359            )
360        } else {
361            Default::default()
362        }
363    }
364
365    /// Returns the certificates for the given `round`.
366    /// If the round does not exist in storage, an empty set is returned.
367    pub fn get_certificates_for_round(&self, round: u64) -> IndexSet<BatchCertificate<N>> {
368        // The genesis round does not have batch certificates.
369        if round == 0 {
370            return Default::default();
371        }
372        // Retrieve the certificates.
373        if let Some(entries) = self.rounds.read().get(&round) {
374            let certificates = self.certificates.read();
375            entries.iter().flat_map(|(certificate_id, _)| certificates.get(certificate_id).cloned()).collect()
376        } else {
377            Default::default()
378        }
379    }
380
381    /// Returns the certificate IDs for the given `round`.
382    /// If the round does not exist in storage, an empty set is returned.
383    pub fn get_certificate_ids_for_round(&self, round: u64) -> IndexSet<Field<N>> {
384        // The genesis round does not have batch certificates.
385        if round == 0 {
386            return Default::default();
387        }
388        // Retrieve the certificates.
389        if let Some(entries) = self.rounds.read().get(&round) {
390            entries.iter().map(|(certificate_id, _)| *certificate_id).collect()
391        } else {
392            Default::default()
393        }
394    }
395
396    /// Returns the certificate authors for the given `round`.
397    /// If the round does not exist in storage, an empty set is returned.
398    pub fn get_certificate_authors_for_round(&self, round: u64) -> HashSet<Address<N>> {
399        // The genesis round does not have batch certificates.
400        if round == 0 {
401            return Default::default();
402        }
403        // Retrieve the certificates.
404        if let Some(entries) = self.rounds.read().get(&round) {
405            entries.iter().map(|(_, author)| *author).collect()
406        } else {
407            Default::default()
408        }
409    }
410
411    /// Returns the certificates that have not yet been included in the ledger.
412    /// Note that the order of this set is by round and then insertion.
413    pub(crate) fn get_pending_certificates(&self) -> IndexSet<BatchCertificate<N>> {
414        // Obtain the read locks.
415        let rounds = self.rounds.read();
416        let certificates = self.certificates.read();
417
418        // Iterate over the rounds.
419        cfg_sorted_by!(rounds.clone(), |a, _, b, _| a.cmp(b))
420            .flat_map(|(_, certificates_for_round)| {
421                // Iterate over the certificates for the round.
422                cfg_into_iter!(certificates_for_round).filter_map(|(certificate_id, _)| {
423                    // Skip the certificate if it already exists in the ledger.
424                    if self.ledger.contains_certificate(&certificate_id).unwrap_or(false) {
425                        None
426                    } else {
427                        // Add the certificate to the pending certificates.
428                        certificates.get(&certificate_id).cloned()
429                    }
430                })
431            })
432            .collect()
433    }
434
435    /// Checks the given `batch_header` for validity, returning the missing transmissions from storage.
436    ///
437    /// # Arguments
438    /// - `batch_header`: The batch header to check.
439    /// - `transmissions`: All transmissions referenced by the certificate.
440    /// - `aborted_transmissions`: The set of aborted transmissions in this certificate.
441    ///
442    /// # Invariants
443    /// This method ensures the following invariants:
444    /// - The batch ID does not already exist in storage.
445    /// - The author is a member of the committee for the batch round.
446    /// - The timestamp is within the allowed time range.
447    /// - None of the transmissions are from any past rounds (up to GC).
448    /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
449    /// - All previous certificates declared in the certificate exist in storage (up to GC).
450    /// - All previous certificates are for the previous round (i.e. round - 1).
451    /// - All previous certificates contain a unique author.
452    /// - The previous certificates reached the quorum threshold (N - f).
453    ///
454    /// # Returns
455    /// - `Ok(Some(txns))` for a valid new batch, where `txns` is the set of missing transactions in the batch
456    ///   that need to be fetched from peers.
457    /// - `Ok(None)` if the batch already exists in storage
458    pub fn check_batch_header(
459        &self,
460        batch_header: &BatchHeader<N>,
461        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
462        aborted_transmissions: HashSet<TransmissionID<N>>,
463    ) -> Result<Option<HashMap<TransmissionID<N>, Transmission<N>>>> {
464        // Retrieve the round.
465        let round = batch_header.round();
466        // Retrieve the GC round.
467        let gc_round = self.gc_round();
468        // Construct a GC log message.
469        let gc_log = format!("(gc = {gc_round})");
470
471        // Ensure the batch ID does not already exist in storage.
472        if self.contains_batch(batch_header.batch_id()) {
473            debug!("Batch for round {round} already exists in storage {gc_log}");
474            return Ok(None);
475        }
476
477        // Retrieve the committee lookback for the batch round.
478        let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
479            bail!("Storage failed to retrieve the committee lookback for round {round} {gc_log}")
480        };
481        // Ensure the author is in the committee.
482        if !committee_lookback.is_committee_member(batch_header.author()) {
483            bail!("Author {} is not in the committee for round {round} {gc_log}", batch_header.author())
484        }
485
486        // Check the timestamp for liveness.
487        check_timestamp_for_liveness(batch_header.timestamp())?;
488
489        // Determine which transmissions in this batch are missing from storage.
490        let missing_transmissions = self
491            .transmissions
492            .find_missing_transmissions(batch_header, transmissions, aborted_transmissions)
493            .map_err(|e| anyhow!("{e} for round {round} {gc_log}"))?;
494
495        // Compute the previous round.
496        let previous_round = round.saturating_sub(1);
497        // Check if the previous round is within range of the GC round.
498        if previous_round > gc_round {
499            // Retrieve the committee lookback for the previous round.
500            let Ok(previous_committee_lookback) = self.ledger.get_committee_lookback_for_round(previous_round) else {
501                bail!("Missing committee for the previous round {previous_round} in storage {gc_log}")
502            };
503            // Ensure the previous round certificates exists in storage.
504            if !self.contains_certificates_for_round(previous_round) {
505                bail!("Missing certificates for the previous round {previous_round} in storage {gc_log}")
506            }
507            // Ensure the number of previous certificate IDs is at or below the number of committee members.
508            if batch_header.previous_certificate_ids().len() > previous_committee_lookback.num_members() {
509                bail!("Too many previous certificates for round {round} {gc_log}")
510            }
511            // Initialize a set of the previous authors.
512            let mut previous_authors = HashSet::with_capacity(batch_header.previous_certificate_ids().len());
513            // Ensure storage contains all declared previous certificates (up to GC).
514            for previous_certificate_id in batch_header.previous_certificate_ids() {
515                // Retrieve the previous certificate.
516                let Some(previous_certificate) = self.get_certificate(*previous_certificate_id) else {
517                    bail!(
518                        "Missing previous certificate '{}' for certificate in round {round} {gc_log}",
519                        fmt_id(previous_certificate_id)
520                    )
521                };
522                // Ensure the previous certificate is for the previous round.
523                if previous_certificate.round() != previous_round {
524                    bail!("Round {round} certificate contains a round {previous_round} certificate {gc_log}")
525                }
526                // Ensure the previous author is new.
527                if previous_authors.contains(&previous_certificate.author()) {
528                    bail!("Round {round} certificate contains a duplicate author {gc_log}")
529                }
530                // Insert the author of the previous certificate.
531                previous_authors.insert(previous_certificate.author());
532            }
533            // Ensure the previous certificates have reached the quorum threshold.
534            if !previous_committee_lookback.is_quorum_threshold_reached(&previous_authors) {
535                bail!("Previous certificates for a batch in round {round} did not reach quorum threshold {gc_log}")
536            }
537        }
538
539        Ok(Some(missing_transmissions))
540    }
541
542    /// Check the validity of a certificate coming from another validator.
543    ///
544    /// It suffices to check that the signers (author and endorsers) are members of the applicable committee
545    /// and that they form a quorum in the committee.
546    /// Under the fundamental fault tolerance assumption of at most `f` (stake of) faulty validators,
547    /// the quorum check on signers guarantees that at least one correct validator
548    /// has ensured the validity of the proposal contained in the certificate,
549    /// either by construction (by the author) or by checking (by an endorser):
550    /// given `N > 0` total stake, and `f` the largest integer `< N/3` (where `/` is exact rational division),
551    /// we have `N >= 3f + 1`, which implies `N - f >= 2f + 1`, which is always `> f`;
552    /// `N - f` is the quorum stake.
553    pub fn check_incoming_certificate(&self, certificate: &BatchCertificate<N>) -> Result<()> {
554        // Retrieve the certificate author and round.
555        let certificate_author = certificate.author();
556        let certificate_round = certificate.round();
557
558        // Retrieve the committee lookback.
559        let committee_lookback = self.ledger.get_committee_lookback_for_round(certificate_round)?;
560
561        // Ensure that the signers of the certificate reach the quorum threshold.
562        // Note that certificate.signatures() only returns the endorsing signatures, not the author's signature.
563        let mut signers: HashSet<Address<N>> =
564            certificate.signatures().map(|signature| signature.to_address()).collect();
565        signers.insert(certificate_author);
566        ensure!(
567            committee_lookback.is_quorum_threshold_reached(&signers),
568            "Certificate '{}' for round {certificate_round} does not meet quorum requirements",
569            certificate.id()
570        );
571
572        // Ensure that the signers of the certificate are in the committee.
573        cfg_iter!(&signers).try_for_each(|signer| {
574            ensure!(
575                committee_lookback.is_committee_member(*signer),
576                "Signer '{signer}' of certificate '{}' for round {certificate_round} is not in the committee",
577                certificate.id()
578            );
579            Ok(())
580        })?;
581
582        Ok(())
583    }
584
585    /// Checks the given `certificate` for validity, returning the missing transmissions from storage.
586    ///
587    /// # Arguments
588    /// - `certificate`: The certificate to check.
589    /// - `transmissions`: The transmissions contained in the certificate.
590    /// - `aborted_transmissions`: The aborted transmission contained in the certificate.
591    ///
592    /// # Invariants
593    /// This method ensures the following invariants:
594    /// - The certificate ID does not already exist in storage.
595    /// - The batch ID does not already exist in storage.
596    /// - The author is a member of the committee for the batch round.
597    /// - The author has not already created a certificate for the batch round.
598    /// - The timestamp is within the allowed time range.
599    /// - None of the transmissions are from any past rounds (up to GC).
600    /// - All transmissions declared in the batch header are provided or exist in storage (up to GC).
601    /// - All previous certificates declared in the certificate exist in storage (up to GC).
602    /// - All previous certificates are for the previous round (i.e. round - 1).
603    /// - The previous certificates reached the quorum threshold (N - f).
604    /// - The timestamps from the signers are all within the allowed time range.
605    /// - The signers have reached the quorum threshold (N - f).
606    pub fn check_certificate(
607        &self,
608        certificate: &BatchCertificate<N>,
609        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
610        aborted_transmissions: HashSet<TransmissionID<N>>,
611    ) -> Result<HashMap<TransmissionID<N>, Transmission<N>>> {
612        // Retrieve the round.
613        let round = certificate.round();
614        // Retrieve the GC round.
615        let gc_round = self.gc_round();
616        // Construct a GC log message.
617        let gc_log = format!("(gc = {gc_round})");
618
619        // Ensure the certificate ID does not already exist in storage.
620        if self.contains_certificate(certificate.id()) {
621            bail!("Certificate for round {round} already exists in storage {gc_log}")
622        }
623
624        // Ensure the storage does not already contain a certificate for this author in this round.
625        if self.contains_certificate_in_round_from(round, certificate.author()) {
626            bail!("Certificate with this author for round {round} already exists in storage {gc_log}")
627        }
628
629        // Ensure the batch header is well-formed.
630        let Some(missing_transmissions) =
631            self.check_batch_header(certificate.batch_header(), transmissions, aborted_transmissions)?
632        else {
633            bail!("Certificate for round {round} already exists in storage {gc_log}")
634        };
635
636        // Check the timestamp for liveness.
637        check_timestamp_for_liveness(certificate.timestamp())?;
638
639        // Retrieve the committee lookback for the batch round.
640        let Ok(committee_lookback) = self.ledger.get_committee_lookback_for_round(round) else {
641            bail!("Storage failed to retrieve the committee for round {round} {gc_log}")
642        };
643
644        // Initialize a set of the signers.
645        let mut signers = HashSet::with_capacity(certificate.signatures().len() + 1);
646        // Append the batch author.
647        signers.insert(certificate.author());
648
649        // Iterate over the signatures.
650        for signature in certificate.signatures() {
651            // Retrieve the signer.
652            let signer = signature.to_address();
653            // Ensure the signer is in the committee.
654            if !committee_lookback.is_committee_member(signer) {
655                bail!("Signer {signer} is not in the committee for round {round} {gc_log}")
656            }
657            // Append the signer.
658            signers.insert(signer);
659        }
660
661        // Ensure the signatures have reached the quorum threshold.
662        if !committee_lookback.is_quorum_threshold_reached(&signers) {
663            bail!("Signatures for a batch in round {round} did not reach quorum threshold {gc_log}")
664        }
665
666        Ok(missing_transmissions)
667    }
668
669    /// Inserts the given `certificate` into storage.
670    ///
671    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
672    ///
673    /// # Arguments
674    /// - `certificate`: The certificate to insert.
675    /// - `transmissions`: The transmissions contained in the certificate, or the subset of the transmissions that in the certificate that do not yet exist in storage.
676    /// - `aborted_transmissions`: The aborted transmission contained in the certificate.
677    ///
678    /// # Invariants
679    /// This method ensures the following invariants:
680    /// - The certificate ID does not already exist in storage.
681    /// - The batch ID does not already exist in storage.
682    /// - All transmissions declared in the certificate are provided or exist in storage (up to GC).
683    /// - All previous certificates declared in the certificate exist in storage (up to GC).
684    /// - All previous certificates are for the previous round (i.e. round - 1).
685    /// - The previous certificates reached the quorum threshold (N - f).
686    pub fn insert_certificate(
687        &self,
688        certificate: BatchCertificate<N>,
689        transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
690        aborted_transmissions: HashSet<TransmissionID<N>>,
691    ) -> Result<()> {
692        // Ensure the certificate round is above the GC round.
693        ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
694        // Ensure the certificate and its transmissions are valid.
695        let missing_transmissions =
696            self.check_certificate(&certificate, transmissions, aborted_transmissions.clone())?;
697        // Insert the certificate into storage.
698        self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
699        Ok(())
700    }
701
702    /// Inserts the given `certificate` into storage.
703    ///
704    /// This method assumes **all missing** transmissions are provided in the `missing_transmissions` map.
705    ///
706    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
707    fn insert_certificate_atomic(
708        &self,
709        certificate: BatchCertificate<N>,
710        aborted_transmission_ids: HashSet<TransmissionID<N>>,
711        missing_transmissions: HashMap<TransmissionID<N>, Transmission<N>>,
712    ) {
713        // Retrieve the round.
714        let round = certificate.round();
715        // Retrieve the certificate ID.
716        let certificate_id = certificate.id();
717        // Retrieve the author of the batch.
718        let author = certificate.author();
719
720        // Insert the round to certificate ID entry.
721        self.rounds.write().entry(round).or_default().insert((certificate_id, author));
722        // Obtain the certificate's transmission ids.
723        let transmission_ids = certificate.transmission_ids().clone();
724        // Insert the certificate.
725        self.certificates.write().insert(certificate_id, certificate);
726        // Remove the unprocessed certificate.
727        self.unprocessed_certificates.write().pop(&certificate_id);
728        // Insert the batch ID.
729        self.batch_ids.write().insert(certificate_id, round);
730        // Insert the certificate ID for each of the transmissions into storage.
731        self.transmissions.insert_transmissions(
732            certificate_id,
733            transmission_ids,
734            aborted_transmission_ids,
735            missing_transmissions,
736        );
737    }
738
739    /// Inserts the given unprocessed `certificate` into storage.
740    ///
741    /// This is a temporary storage, which is cleared again when calling `insert_certificate_atomic`.
742    pub fn insert_unprocessed_certificate(&self, certificate: BatchCertificate<N>) -> Result<()> {
743        // Ensure the certificate round is above the GC round.
744        ensure!(certificate.round() > self.gc_round(), "Certificate round is at or below the GC round");
745        // Insert the certificate.
746        self.unprocessed_certificates.write().put(certificate.id(), certificate);
747
748        Ok(())
749    }
750
751    /// Removes the given `certificate ID` from storage. This method is used to garbage collect individual certificates once blocks are committed.
752    ///
753    /// This method triggers updates to the `rounds`, `certificates`, `batch_ids`, and `transmissions` maps.
754    ///
755    /// If the certificate was successfully removed, `true` is returned.
756    /// If the certificate did not exist in storage, `false` is returned.
757    fn remove_certificate(&self, certificate_id: Field<N>) -> bool {
758        // Retrieve the certificate.
759        let Some(certificate) = self.get_certificate(certificate_id) else {
760            warn!("Certificate {certificate_id} does not exist in storage");
761            return false;
762        };
763        // Retrieve the round.
764        let round = certificate.round();
765        // Compute the author of the batch.
766        let author = certificate.author();
767
768        // TODO (howardwu): We may want to use `shift_remove` below, in order to align compatibility
769        //  with tests written to for `remove_certificate`. However, this will come with performance hits.
770        //  It will be better to write tests that compare the union of the sets.
771
772        // Update the round.
773        match self.rounds.write().entry(round) {
774            Entry::Occupied(mut entry) => {
775                // Remove the round to certificate ID entry.
776                entry.get_mut().swap_remove(&(certificate_id, author));
777                // If the round is empty, remove it.
778                if entry.get().is_empty() {
779                    entry.swap_remove();
780                }
781            }
782            Entry::Vacant(_) => {}
783        }
784        // Remove the certificate.
785        self.certificates.write().swap_remove(&certificate_id);
786        // Remove the unprocessed certificate.
787        self.unprocessed_certificates.write().pop(&certificate_id);
788        // Remove the batch ID.
789        self.batch_ids.write().swap_remove(&certificate_id);
790        // Remove the transmission entries in the certificate from storage.
791        self.transmissions.remove_transmissions(&certificate_id, certificate.transmission_ids());
792        // Return successfully.
793        true
794    }
795}
796
797impl<N: Network> Storage<N> {
798    /// Syncs the current height with the block.
799    pub(crate) fn sync_height_with_block(&self, next_height: u32) {
800        // If the block height is greater than the current height in storage, sync the height.
801        if next_height > self.current_height() {
802            // Update the current height in storage.
803            self.current_height.store(next_height, Ordering::SeqCst);
804        }
805    }
806
807    /// Syncs the current round with the block.
808    pub(crate) fn sync_round_with_block(&self, next_round: u64) {
809        // Ensure we sync to at least round 1.
810        let next_round = next_round.max(1);
811        // If the round in the block is greater than the current round in storage, sync the round.
812        if next_round > self.current_round() {
813            // Update the current round in storage.
814            self.update_current_round(next_round);
815            // Log the updated round.
816            info!("Synced to round {next_round}...");
817        } else {
818            trace!(
819                "Skipping sync to round {next_round} as it is less than or equal to the current round ({})",
820                self.current_round()
821            );
822        }
823    }
824
825    /// Syncs the batch certificate with the block.
826    pub(crate) fn sync_certificate_with_block(
827        &self,
828        block: &Block<N>,
829        certificate: BatchCertificate<N>,
830        unconfirmed_transactions: &HashMap<N::TransactionID, Transaction<N>>,
831        trusted_ledger_certificate: bool,
832    ) -> Result<()> {
833        // Skip if the certificate round is below the GC round.
834        let gc_round = self.gc_round();
835        if certificate.round() <= gc_round {
836            trace!("Got certificate for round {} below GC round ({gc_round}). Will not store it.", certificate.round());
837            return Ok(());
838        }
839
840        // If the certificate ID already exists in storage, skip it.
841        if self.contains_certificate(certificate.id()) {
842            trace!("Got certificate {} for round {} more than once.", certificate.id(), certificate.round());
843            return Ok(());
844        }
845        // Retrieve the transmissions for the certificate.
846        let mut missing_transmissions = HashMap::new();
847
848        // Retrieve the aborted transmissions for the certificate.
849        let mut aborted_transmissions = HashSet::new();
850
851        // Track the block's aborted solutions and transactions.
852        let aborted_solutions: IndexSet<_> = block.aborted_solution_ids().iter().collect();
853        let aborted_transactions: IndexSet<_> = block.aborted_transaction_ids().iter().collect();
854
855        // Iterate over the transmission IDs.
856        for transmission_id in certificate.transmission_ids() {
857            // If the transmission ID already exists in the map, skip it.
858            if missing_transmissions.contains_key(transmission_id) {
859                continue;
860            }
861            // If the transmission ID exists in storage, skip it.
862            if self.contains_transmission(*transmission_id) {
863                continue;
864            }
865            // Retrieve the transmission.
866            match transmission_id {
867                TransmissionID::Ratification => (),
868                TransmissionID::Solution(solution_id, _) => {
869                    // The solution may exist either in the block itself, or stored in the ledger.
870                    // Note that the ledger does *not* store aborted transmissions, so we need to check if the solution
871                    // was aborted before querying the ledger.
872                    //
873                    // Aborted transmissions only appear in the aborted set of the first block that contains them,
874                    // for subsequent blocks, we can check that `contains_transmission` is true to determine if the transmission was aborted.
875                    if let Some(solution) = block.get_solution(solution_id) {
876                        missing_transmissions.insert(*transmission_id, (*solution).into());
877                    } else if let Ok(Some(solution)) = self.ledger.get_solution(solution_id) {
878                        missing_transmissions.insert(*transmission_id, solution.into());
879                    } else if aborted_solutions.contains(solution_id)
880                        || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
881                    {
882                        aborted_transmissions.insert(*transmission_id);
883                    } else {
884                        bail!("Missing solution {solution_id} for block {}", block.height());
885                    }
886                }
887                TransmissionID::Transaction(transaction_id, _) => {
888                    // The transaction may exists either in the block itself, or stored in the ledger.
889                    // Note that the ledger does *not* store aborted transmissions, so we need to check if the transaction
890                    // was aborted before querying the ledger.
891                    //
892                    // Aborted solutions only appear in the aborted set of the first block that contains them,
893                    // for subsequent blocks, we can check that `contains_transmission` is true to determine if the transaction was aborted.
894                    if let Some(transaction) = unconfirmed_transactions.get(transaction_id) {
895                        missing_transmissions.insert(*transmission_id, transaction.clone().into());
896                    } else if let Ok(Some(transaction)) = self.ledger.get_unconfirmed_transaction(*transaction_id) {
897                        missing_transmissions.insert(*transmission_id, transaction.into());
898                    } else if aborted_transactions.contains(transaction_id)
899                        || self.ledger.contains_transmission(transmission_id).unwrap_or(false)
900                    {
901                        aborted_transmissions.insert(*transmission_id);
902                    } else {
903                        bail!("Missing transaction {transaction_id} for block {}", block.height());
904                    }
905                }
906            }
907        }
908        // Insert the batch certificate into storage.
909        let certificate_id = fmt_id(certificate.id());
910        debug!(
911            "Syncing certificate '{certificate_id}' for round {} with {} transmissions",
912            certificate.round(),
913            certificate.transmission_ids().len()
914        );
915
916        if trusted_ledger_certificate {
917            self.insert_certificate_atomic(certificate, aborted_transmissions, missing_transmissions);
918            Ok(())
919        } else {
920            self.insert_certificate(certificate, missing_transmissions, aborted_transmissions).with_context(|| {
921                format!("Failed to insert certificate '{certificate_id}' from block {}", block.height())
922            })
923        }
924    }
925}
926
927#[cfg(test)]
928impl<N: Network> Storage<N> {
929    /// Returns the ledger service.
930    pub fn ledger(&self) -> &Arc<dyn LedgerService<N>> {
931        &self.ledger
932    }
933
934    /// Returns an iterator over the `(round, (certificate ID, batch ID, author))` entries.
935    pub fn rounds_iter(&self) -> impl Iterator<Item = (u64, IndexSet<(Field<N>, Address<N>)>)> {
936        self.rounds.read().clone().into_iter()
937    }
938
939    /// Returns an iterator over the `(certificate ID, certificate)` entries.
940    pub fn certificates_iter(&self) -> impl Iterator<Item = (Field<N>, BatchCertificate<N>)> {
941        self.certificates.read().clone().into_iter()
942    }
943
944    /// Returns an iterator over the `(batch ID, round)` entries.
945    pub fn batch_ids_iter(&self) -> impl Iterator<Item = (Field<N>, u64)> {
946        self.batch_ids.read().clone().into_iter()
947    }
948
949    /// Returns an iterator over the `(transmission ID, (transmission, certificate IDs))` entries.
950    pub fn transmissions_iter(
951        &self,
952    ) -> impl Iterator<Item = (TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>))> {
953        self.transmissions.as_hashmap().into_iter()
954    }
955
956    /// Inserts the given `certificate` into storage.
957    ///
958    /// Note: Do NOT use this in production. This is for **testing only**.
959    #[cfg(test)]
960    #[doc(hidden)]
961    pub(crate) fn testing_only_insert_certificate_testing_only(&self, certificate: BatchCertificate<N>) {
962        // Retrieve the round.
963        let round = certificate.round();
964        // Retrieve the certificate ID.
965        let certificate_id = certificate.id();
966        // Retrieve the author of the batch.
967        let author = certificate.author();
968
969        // Insert the round to certificate ID entry.
970        self.rounds.write().entry(round).or_default().insert((certificate_id, author));
971        // Obtain the certificate's transmission ids.
972        let transmission_ids = certificate.transmission_ids().clone();
973        // Insert the certificate.
974        self.certificates.write().insert(certificate_id, certificate);
975        // Insert the batch ID.
976        self.batch_ids.write().insert(certificate_id, round);
977
978        // Construct the dummy missing transmissions (for testing purposes).
979        let missing_transmissions = transmission_ids
980            .iter()
981            .map(|id| (*id, Transmission::Transaction(snarkvm::ledger::narwhal::Data::Buffer(bytes::Bytes::new()))))
982            .collect::<HashMap<_, _>>();
983        // Insert the certificate ID for each of the transmissions into storage.
984        self.transmissions.insert_transmissions(
985            certificate_id,
986            transmission_ids,
987            Default::default(),
988            missing_transmissions,
989        );
990    }
991}
992
993#[cfg(test)]
994pub(crate) mod tests {
995    use super::*;
996    use snarkos_node_bft_ledger_service::MockLedgerService;
997    use snarkos_node_bft_storage_service::BFTMemoryService;
998    use snarkvm::{
999        ledger::narwhal::{Data, batch_certificate::test_helpers::sample_batch_certificate_for_round_with_committee},
1000        prelude::{Rng, TestRng},
1001    };
1002
1003    use ::bytes::Bytes;
1004    use indexmap::indexset;
1005
1006    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1007
1008    /// Asserts that the storage matches the expected layout.
1009    pub fn assert_storage<N: Network>(
1010        storage: &Storage<N>,
1011        rounds: &[(u64, IndexSet<(Field<N>, Address<N>)>)],
1012        certificates: &[(Field<N>, BatchCertificate<N>)],
1013        batch_ids: &[(Field<N>, u64)],
1014        transmissions: &HashMap<TransmissionID<N>, (Transmission<N>, IndexSet<Field<N>>)>,
1015    ) {
1016        // Ensure the rounds are well-formed.
1017        assert_eq!(storage.rounds_iter().collect::<Vec<_>>(), *rounds);
1018        // Ensure the certificates are well-formed.
1019        assert_eq!(storage.certificates_iter().collect::<Vec<_>>(), *certificates);
1020        // Ensure the batch IDs are well-formed.
1021        assert_eq!(storage.batch_ids_iter().collect::<Vec<_>>(), *batch_ids);
1022        // Ensure the transmissions are well-formed.
1023        assert_eq!(storage.transmissions_iter().collect::<HashMap<_, _>>(), *transmissions);
1024    }
1025
1026    /// Samples a random transmission.
1027    fn sample_transmission(rng: &mut TestRng) -> Transmission<CurrentNetwork> {
1028        // Sample random fake solution bytes.
1029        let s = |rng: &mut TestRng| Data::Buffer(Bytes::from((0..512).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
1030        // Sample random fake transaction bytes.
1031        let t =
1032            |rng: &mut TestRng| Data::Buffer(Bytes::from((0..2048).map(|_| rng.random::<u8>()).collect::<Vec<_>>()));
1033        // Sample a random transmission.
1034        match rng.random::<bool>() {
1035            true => Transmission::Solution(s(rng)),
1036            false => Transmission::Transaction(t(rng)),
1037        }
1038    }
1039
1040    /// Samples the random transmissions, returning the missing transmissions and the transmissions.
1041    pub(crate) fn sample_transmissions(
1042        certificate: &BatchCertificate<CurrentNetwork>,
1043        rng: &mut TestRng,
1044    ) -> (
1045        HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>>,
1046        HashMap<TransmissionID<CurrentNetwork>, (Transmission<CurrentNetwork>, IndexSet<Field<CurrentNetwork>>)>,
1047    ) {
1048        // Retrieve the certificate ID.
1049        let certificate_id = certificate.id();
1050
1051        let mut missing_transmissions = HashMap::new();
1052        let mut transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1053        for transmission_id in certificate.transmission_ids() {
1054            // Initialize the transmission.
1055            let transmission = sample_transmission(rng);
1056            // Update the missing transmissions.
1057            missing_transmissions.insert(*transmission_id, transmission.clone());
1058            // Update the transmissions map.
1059            transmissions
1060                .entry(*transmission_id)
1061                .or_insert((transmission, Default::default()))
1062                .1
1063                .insert(certificate_id);
1064        }
1065        (missing_transmissions, transmissions)
1066    }
1067
1068    // TODO (howardwu): Testing with 'max_gc_rounds' set to '0' should ensure everything is cleared after insertion.
1069
1070    #[test]
1071    fn test_certificate_insert_remove() {
1072        let rng = &mut TestRng::default();
1073
1074        // Sample a committee.
1075        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1076        // Initialize the ledger.
1077        let ledger = Arc::new(MockLedgerService::new(committee));
1078        // Initialize the storage.
1079        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1080
1081        // Ensure the storage is empty.
1082        assert_storage(&storage, &[], &[], &[], &Default::default());
1083
1084        // Create a new certificate.
1085        let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1086        // Retrieve the certificate ID.
1087        let certificate_id = certificate.id();
1088        // Retrieve the round.
1089        let round = certificate.round();
1090        // Retrieve the author of the batch.
1091        let author = certificate.author();
1092
1093        // Construct the sample 'transmissions'.
1094        let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1095
1096        // Insert the certificate.
1097        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions);
1098        // Ensure the certificate exists in storage.
1099        assert!(storage.contains_certificate(certificate_id));
1100        // Ensure the certificate is stored in the correct round.
1101        assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
1102        // Ensure the certificate is stored for the correct round and author.
1103        assert_eq!(storage.get_certificate_for_round_with_author(round, author), Some(certificate.clone()));
1104
1105        // Check that the underlying storage representation is correct.
1106        {
1107            // Construct the expected layout for 'rounds'.
1108            let rounds = [(round, indexset! { (certificate_id, author) })];
1109            // Construct the expected layout for 'certificates'.
1110            let certificates = [(certificate_id, certificate.clone())];
1111            // Construct the expected layout for 'batch_ids'.
1112            let batch_ids = [(certificate_id, round)];
1113            // Assert the storage is well-formed.
1114            assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1115        }
1116
1117        // Retrieve the certificate.
1118        let candidate_certificate = storage.get_certificate(certificate_id).unwrap();
1119        // Ensure the retrieved certificate is the same as the inserted certificate.
1120        assert_eq!(certificate, candidate_certificate);
1121
1122        // Remove the certificate.
1123        assert!(storage.remove_certificate(certificate_id));
1124        // Ensure the certificate does not exist in storage.
1125        assert!(!storage.contains_certificate(certificate_id));
1126        // Ensure the certificate is no longer stored in the round.
1127        assert!(storage.get_certificates_for_round(round).is_empty());
1128        // Ensure the certificate is no longer stored for the round and author.
1129        assert_eq!(storage.get_certificate_for_round_with_author(round, author), None);
1130        // Ensure the storage is empty.
1131        assert_storage(&storage, &[], &[], &[], &Default::default());
1132    }
1133
1134    #[test]
1135    fn test_certificate_duplicate() {
1136        let rng = &mut TestRng::default();
1137
1138        // Sample a committee.
1139        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1140        // Initialize the ledger.
1141        let ledger = Arc::new(MockLedgerService::new(committee));
1142        // Initialize the storage.
1143        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1144
1145        // Ensure the storage is empty.
1146        assert_storage(&storage, &[], &[], &[], &Default::default());
1147
1148        // Create a new certificate.
1149        let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1150        // Retrieve the certificate ID.
1151        let certificate_id = certificate.id();
1152        // Retrieve the round.
1153        let round = certificate.round();
1154        // Retrieve the author of the batch.
1155        let author = certificate.author();
1156
1157        // Construct the expected layout for 'rounds'.
1158        let rounds = [(round, indexset! { (certificate_id, author) })];
1159        // Construct the expected layout for 'certificates'.
1160        let certificates = [(certificate_id, certificate.clone())];
1161        // Construct the expected layout for 'batch_ids'.
1162        let batch_ids = [(certificate_id, round)];
1163        // Construct the sample 'transmissions'.
1164        let (missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1165
1166        // Insert the certificate.
1167        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1168        // Ensure the certificate exists in storage.
1169        assert!(storage.contains_certificate(certificate_id));
1170        // Check that the underlying storage representation is correct.
1171        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1172
1173        // Insert the certificate again - without any missing transmissions.
1174        storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1175        // Ensure the certificate exists in storage.
1176        assert!(storage.contains_certificate(certificate_id));
1177        // Check that the underlying storage representation remains unchanged.
1178        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1179
1180        // Insert the certificate again - with all of the original missing transmissions.
1181        storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1182        // Ensure the certificate exists in storage.
1183        assert!(storage.contains_certificate(certificate_id));
1184        // Check that the underlying storage representation remains unchanged.
1185        assert_storage(&storage, &rounds, &certificates, &batch_ids, &transmissions);
1186    }
1187
1188    /// Verify that when inserting a certificate with a mix of provided transmissions and aborted
1189    /// transmission IDs, storage correctly records both: contains_transmission is true for all
1190    /// (including aborted), and aborted IDs are stored so sync can resolve certificate references.
1191    #[test]
1192    fn test_certificate_insert_with_aborted_transmissions() {
1193        use std::collections::HashSet;
1194
1195        let rng = &mut TestRng::default();
1196
1197        let committee = snarkvm::ledger::committee::test_helpers::sample_committee(rng);
1198        let ledger = Arc::new(MockLedgerService::new(committee));
1199        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1200
1201        let certificate = snarkvm::ledger::narwhal::batch_certificate::test_helpers::sample_batch_certificate(rng);
1202        let certificate_id = certificate.id();
1203        let round = certificate.round();
1204        let transmission_ids: Vec<_> = certificate.transmission_ids().iter().copied().collect();
1205
1206        if transmission_ids.len() < 2 {
1207            // Certificate has 0 or 1 transmission; just verify insert without aborted works.
1208            let (missing_transmissions, _) = sample_transmissions(&certificate, rng);
1209            storage.insert_certificate_atomic(certificate.clone(), HashSet::new(), missing_transmissions);
1210            for id in certificate.transmission_ids() {
1211                assert!(storage.contains_transmission(*id));
1212            }
1213            return;
1214        }
1215
1216        let (all_missing, _) = sample_transmissions(&certificate, rng);
1217        let aborted_id = transmission_ids[0];
1218        let aborted_transmission_ids: HashSet<_> = [aborted_id].into_iter().collect();
1219        let mut missing_transmissions = all_missing;
1220        missing_transmissions.remove(&aborted_id);
1221
1222        storage.insert_certificate_atomic(certificate.clone(), aborted_transmission_ids, missing_transmissions);
1223
1224        assert!(storage.contains_certificate(certificate_id));
1225        assert_eq!(storage.get_certificates_for_round(round), indexset! { certificate.clone() });
1226
1227        // Every transmission ID in the certificate (including aborted) should be resolvable.
1228        for id in certificate.transmission_ids() {
1229            assert!(
1230                storage.contains_transmission(*id),
1231                "contains_transmission should be true for all transmission IDs including aborted {id:?}"
1232            );
1233        }
1234
1235        // Aborted transmission has no content in storage; others do.
1236        assert!(
1237            storage.get_transmission(aborted_id).is_none(),
1238            "Aborted transmission should not have content in storage"
1239        );
1240        for id in transmission_ids.iter().skip(1) {
1241            assert!(
1242                storage.get_transmission(*id).is_some(),
1243                "Non-aborted transmission {id:?} should have content in storage"
1244            );
1245        }
1246    }
1247
1248    /// Test that `check_incoming_certificate` does not reject a valid cert.
1249    #[test]
1250    fn test_valid_incoming_certificate() {
1251        let rng = &mut TestRng::default();
1252
1253        // Sample a committee.
1254        let (committee, private_keys) =
1255            snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 5, rng);
1256        // Initialize the ledger.
1257        let ledger = Arc::new(MockLedgerService::new(committee));
1258        // Initialize the storage.
1259        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1260
1261        // Go through many rounds of valid certificates and ensure they're accepted.
1262        let mut previous_certs = IndexSet::default();
1263
1264        for round in 1..=100 {
1265            let mut new_certs = IndexSet::default();
1266
1267            // Generate one cert per validator
1268            for private_key in private_keys.iter() {
1269                let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1270
1271                let certificate = sample_batch_certificate_for_round_with_committee(
1272                    round,
1273                    previous_certs.clone(),
1274                    private_key,
1275                    &other_keys,
1276                    rng,
1277                );
1278                storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1279                new_certs.insert(certificate.id());
1280
1281                // Construct the sample 'transmissions'.
1282                let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1283                storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1284            }
1285
1286            previous_certs = new_certs;
1287        }
1288    }
1289
1290    /// Make sure that we reject all certificates without sufficient signatures early.
1291    #[test]
1292    fn test_invalid_incoming_certificate_missing_signature() {
1293        let rng = &mut TestRng::default();
1294
1295        // Sample a committee.
1296        let (committee, private_keys) =
1297            snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1298        // Initialize the ledger.
1299        let ledger = Arc::new(MockLedgerService::new(committee));
1300        // Initialize the storage.
1301        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1302
1303        // Go through many rounds of valid certificates and ensure they're accepted.
1304        let mut previous_certs = IndexSet::default();
1305
1306        for round in 1..=5 {
1307            let mut new_certs = IndexSet::default();
1308
1309            // Generate one cert per validator
1310            for private_key in private_keys.iter() {
1311                if round < 5 {
1312                    let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1313
1314                    let certificate = sample_batch_certificate_for_round_with_committee(
1315                        round,
1316                        previous_certs.clone(),
1317                        private_key,
1318                        &other_keys,
1319                        rng,
1320                    );
1321                    storage.check_incoming_certificate(&certificate).expect("Valid certificate rejected");
1322                    new_certs.insert(certificate.id());
1323
1324                    // Construct the sample 'transmissions'.
1325                    let (missing_transmissions, _transmissions) = sample_transmissions(&certificate, rng);
1326                    storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1327                } else {
1328                    // Pick a few signers, but not enough to form a quorum.
1329                    let other_keys: Vec<_> = private_keys[0..=3].iter().cloned().filter(|k| k != private_key).collect();
1330
1331                    let certificate = sample_batch_certificate_for_round_with_committee(
1332                        round,
1333                        previous_certs.clone(),
1334                        private_key,
1335                        &other_keys,
1336                        rng,
1337                    );
1338                    assert!(storage.check_incoming_certificate(&certificate).is_err());
1339                }
1340            }
1341
1342            previous_certs = new_certs;
1343        }
1344    }
1345
1346    /// Verify that `insert_certificate` rejects certs with less edges than required.
1347    #[test]
1348    fn test_invalid_certificate_insufficient_previous_certs() {
1349        let rng = &mut TestRng::default();
1350
1351        // Sample a committee.
1352        let (committee, private_keys) =
1353            snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1354        // Initialize the ledger.
1355        let ledger = Arc::new(MockLedgerService::new(committee));
1356        // Initialize the storage.
1357        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1358
1359        // Go through many rounds of valid certificates and ensure they're accepted.
1360        let mut previous_certs = IndexSet::default();
1361
1362        for round in 1..=6 {
1363            let mut new_certs = IndexSet::default();
1364
1365            // Generate one cert per validator
1366            for private_key in private_keys.iter() {
1367                let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1368
1369                let certificate = sample_batch_certificate_for_round_with_committee(
1370                    round,
1371                    previous_certs.clone(),
1372                    private_key,
1373                    &other_keys,
1374                    rng,
1375                );
1376
1377                // Construct the sample 'transmissions'.
1378                let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1379                let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();
1380
1381                if round <= 5 {
1382                    new_certs.insert(certificate.id());
1383                    storage
1384                        .insert_certificate(certificate, transmissions, Default::default())
1385                        .expect("Valid certificate rejected");
1386                } else {
1387                    assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
1388                }
1389            }
1390
1391            if round < 5 {
1392                previous_certs = new_certs;
1393            } else {
1394                // Remove more than half of the previous certs.
1395                previous_certs = new_certs.into_iter().skip(6).collect();
1396            }
1397        }
1398    }
1399
1400    /// Verify that `insert_certificate` rejects certs that do not increment the round number.
1401    #[test]
1402    fn test_invalid_certificate_wrong_round_number() {
1403        let rng = &mut TestRng::default();
1404
1405        // Sample a committee.
1406        let (committee, private_keys) =
1407            snarkvm::ledger::committee::test_helpers::sample_committee_and_keys_for_round(0, 10, rng);
1408        // Initialize the ledger.
1409        let ledger = Arc::new(MockLedgerService::new(committee));
1410        // Initialize the storage.
1411        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1412
1413        // Go through many rounds of valid certificates and ensure they're accepted.
1414        let mut previous_certs = IndexSet::default();
1415
1416        for round in 1..=6 {
1417            let mut new_certs = IndexSet::default();
1418
1419            // Generate one cert per validator
1420            for private_key in private_keys.iter() {
1421                let cert_round = round.min(5); // In the sixth round, do not increment
1422                let other_keys: Vec<_> = private_keys.iter().cloned().filter(|k| k != private_key).collect();
1423
1424                let certificate = sample_batch_certificate_for_round_with_committee(
1425                    cert_round,
1426                    previous_certs.clone(),
1427                    private_key,
1428                    &other_keys,
1429                    rng,
1430                );
1431
1432                // Construct the sample 'transmissions'.
1433                let (_missing_transmissions, transmissions) = sample_transmissions(&certificate, rng);
1434                let transmissions = transmissions.into_iter().map(|(k, (t, _))| (k, t)).collect();
1435
1436                if round <= 5 {
1437                    new_certs.insert(certificate.id());
1438                    storage
1439                        .insert_certificate(certificate, transmissions, Default::default())
1440                        .expect("Valid certificate rejected");
1441                } else {
1442                    assert!(storage.insert_certificate(certificate, transmissions, Default::default()).is_err());
1443                }
1444            }
1445
1446            if round < 5 {
1447                previous_certs = new_certs;
1448            } else {
1449                // Remove more than half of the previous certs.
1450                previous_certs = new_certs.into_iter().skip(6).collect();
1451            }
1452        }
1453    }
1454}
1455
1456#[cfg(test)]
1457pub mod prop_tests {
1458    use super::*;
1459    use crate::helpers::{now, storage::tests::assert_storage};
1460    use snarkos_node_bft_events::committee_prop_tests::{CommitteeContext, ValidatorSet};
1461    use snarkos_node_bft_ledger_service::MockLedgerService;
1462    use snarkos_node_bft_storage_service::BFTMemoryService;
1463    use snarkvm::{
1464        ledger::{
1465            narwhal::{BatchHeader, Data},
1466            puzzle::SolutionID,
1467        },
1468        prelude::{Signature, Uniform},
1469    };
1470
1471    use ::bytes::Bytes;
1472    use indexmap::indexset;
1473    use proptest::{
1474        collection,
1475        prelude::{Arbitrary, BoxedStrategy, Just, Strategy, any},
1476        prop_oneof,
1477        sample::{Selector, size_range},
1478    };
1479    use rand::{CryptoRng, SeedableRng, TryCryptoRng, TryRng};
1480    use rand_chacha::ChaChaRng;
1481    use std::fmt::Debug;
1482    use test_strategy::proptest;
1483
1484    type CurrentNetwork = snarkvm::prelude::MainnetV0;
1485
1486    impl Arbitrary for Storage<CurrentNetwork> {
1487        type Parameters = CommitteeContext;
1488        type Strategy = BoxedStrategy<Storage<CurrentNetwork>>;
1489
1490        fn arbitrary() -> Self::Strategy {
1491            (any::<CommitteeContext>(), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1492                .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1493                    let ledger = Arc::new(MockLedgerService::new(committee));
1494                    Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds).unwrap()
1495                })
1496                .boxed()
1497        }
1498
1499        fn arbitrary_with(context: Self::Parameters) -> Self::Strategy {
1500            (Just(context), 0..BatchHeader::<CurrentNetwork>::MAX_GC_ROUNDS as u64)
1501                .prop_map(|(CommitteeContext(committee, _), gc_rounds)| {
1502                    let ledger = Arc::new(MockLedgerService::new(committee));
1503                    Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), gc_rounds).unwrap()
1504                })
1505                .boxed()
1506        }
1507    }
1508
1509    // The `proptest::TestRng` doesn't implement `rand_core::CryptoRng` trait which is required in snarkVM, so we use a wrapper
1510    // We wrap a `ChaChaRng` (rand 0.10 compatible) seeded from the proptest RNG.
1511    #[derive(Debug)]
1512    pub struct CryptoTestRng(ChaChaRng);
1513
1514    impl Arbitrary for CryptoTestRng {
1515        type Parameters = ();
1516        type Strategy = BoxedStrategy<CryptoTestRng>;
1517
1518        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1519            use proptest::prelude::RngCore as ProptestRngCore;
1520            Just(0).prop_perturb(|_, mut rng| CryptoTestRng(ChaChaRng::seed_from_u64(rng.next_u64()))).boxed()
1521        }
1522    }
1523
1524    impl TryRng for CryptoTestRng {
1525        type Error = core::convert::Infallible;
1526
1527        fn try_next_u32(&mut self) -> Result<u32, Self::Error> {
1528            TryRng::try_next_u32(&mut self.0)
1529        }
1530
1531        fn try_next_u64(&mut self) -> Result<u64, Self::Error> {
1532            TryRng::try_next_u64(&mut self.0)
1533        }
1534
1535        fn try_fill_bytes(&mut self, dest: &mut [u8]) -> Result<(), Self::Error> {
1536            TryRng::try_fill_bytes(&mut self.0, dest)
1537        }
1538    }
1539
1540    impl TryCryptoRng for CryptoTestRng {}
1541
1542    #[derive(Debug, Clone)]
1543    pub struct AnyTransmission(pub Transmission<CurrentNetwork>);
1544
1545    impl Arbitrary for AnyTransmission {
1546        type Parameters = ();
1547        type Strategy = BoxedStrategy<AnyTransmission>;
1548
1549        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1550            any_transmission().prop_map(AnyTransmission).boxed()
1551        }
1552    }
1553
1554    #[derive(Debug, Clone)]
1555    pub struct AnyTransmissionID(pub TransmissionID<CurrentNetwork>);
1556
1557    impl Arbitrary for AnyTransmissionID {
1558        type Parameters = ();
1559        type Strategy = BoxedStrategy<AnyTransmissionID>;
1560
1561        fn arbitrary_with(_: Self::Parameters) -> Self::Strategy {
1562            any_transmission_id().prop_map(AnyTransmissionID).boxed()
1563        }
1564    }
1565
1566    fn any_transmission() -> BoxedStrategy<Transmission<CurrentNetwork>> {
1567        prop_oneof![
1568            (collection::vec(any::<u8>(), 512..=512))
1569                .prop_map(|bytes| Transmission::Solution(Data::Buffer(Bytes::from(bytes)))),
1570            (collection::vec(any::<u8>(), 2048..=2048))
1571                .prop_map(|bytes| Transmission::Transaction(Data::Buffer(Bytes::from(bytes)))),
1572        ]
1573        .boxed()
1574    }
1575
1576    pub fn any_solution_id() -> BoxedStrategy<SolutionID<CurrentNetwork>> {
1577        any::<u64>().prop_map(|x| x.into()).boxed()
1578    }
1579
1580    pub fn any_transaction_id() -> BoxedStrategy<<CurrentNetwork as Network>::TransactionID> {
1581        any::<u64>()
1582            .prop_map(|seed| {
1583                let rng = &mut ChaChaRng::seed_from_u64(seed);
1584                <CurrentNetwork as Network>::TransactionID::from(Field::rand(rng))
1585            })
1586            .boxed()
1587    }
1588
1589    pub fn any_transmission_id() -> BoxedStrategy<TransmissionID<CurrentNetwork>> {
1590        prop_oneof![
1591            (any_transaction_id(), any::<<CurrentNetwork as Network>::TransmissionChecksum>())
1592                .prop_map(|(id, cs)| TransmissionID::Transaction(id, cs)),
1593            (any_solution_id(), any::<<CurrentNetwork as Network>::TransmissionChecksum>())
1594                .prop_map(|(id, cs)| TransmissionID::Solution(id, cs)),
1595        ]
1596        .boxed()
1597    }
1598
1599    pub fn sign_batch_header<R: CryptoRng>(
1600        validator_set: &ValidatorSet,
1601        batch_header: &BatchHeader<CurrentNetwork>,
1602        rng: &mut R,
1603    ) -> IndexSet<Signature<CurrentNetwork>> {
1604        let mut signatures = IndexSet::with_capacity(validator_set.0.len());
1605        for validator in validator_set.0.iter() {
1606            let private_key = validator.private_key;
1607            signatures.insert(private_key.sign(&[batch_header.batch_id()], rng).unwrap());
1608        }
1609        signatures
1610    }
1611
1612    #[proptest]
1613    fn test_certificate_duplicate(
1614        context: CommitteeContext,
1615        #[any(size_range(1..16).lift())] transmissions: Vec<(AnyTransmissionID, AnyTransmission)>,
1616        mut rng: CryptoTestRng,
1617        selector: Selector,
1618    ) {
1619        let CommitteeContext(committee, ValidatorSet(validators)) = context;
1620        let committee_id = committee.id();
1621
1622        // Initialize the storage.
1623        let ledger = Arc::new(MockLedgerService::new(committee));
1624        let storage = Storage::<CurrentNetwork>::new(ledger, Arc::new(BFTMemoryService::new()), 1).unwrap();
1625
1626        // Ensure the storage is empty.
1627        assert_storage(&storage, &[], &[], &[], &Default::default());
1628
1629        // Create a new certificate.
1630        let signer = selector.select(&validators);
1631
1632        let mut transmission_map = IndexMap::new();
1633
1634        for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter() {
1635            transmission_map.insert(*id, t.clone());
1636        }
1637
1638        let batch_header = BatchHeader::new(
1639            &signer.private_key,
1640            0,
1641            now(),
1642            committee_id,
1643            transmission_map.keys().cloned().collect(),
1644            Default::default(),
1645            &mut rng,
1646        )
1647        .unwrap();
1648
1649        // Remove the author from the validator set passed to create the batch
1650        // certificate, the author should not sign their own batch.
1651        let mut validators = validators.clone();
1652        validators.remove(signer);
1653
1654        let certificate = BatchCertificate::from(
1655            batch_header.clone(),
1656            sign_batch_header(&ValidatorSet(validators), &batch_header, &mut rng),
1657        )
1658        .unwrap();
1659
1660        // Retrieve the certificate ID.
1661        let certificate_id = certificate.id();
1662        let mut internal_transmissions = HashMap::<_, (_, IndexSet<Field<CurrentNetwork>>)>::new();
1663        for (AnyTransmissionID(id), AnyTransmission(t)) in transmissions.iter().cloned() {
1664            internal_transmissions.entry(id).or_insert((t, Default::default())).1.insert(certificate_id);
1665        }
1666
1667        // Retrieve the round.
1668        let round = certificate.round();
1669        // Retrieve the author of the batch.
1670        let author = certificate.author();
1671
1672        // Construct the expected layout for 'rounds'.
1673        let rounds = [(round, indexset! { (certificate_id, author) })];
1674        // Construct the expected layout for 'certificates'.
1675        let certificates = [(certificate_id, certificate.clone())];
1676        // Construct the expected layout for 'batch_ids'.
1677        let batch_ids = [(certificate_id, round)];
1678
1679        // Insert the certificate.
1680        let missing_transmissions: HashMap<TransmissionID<CurrentNetwork>, Transmission<CurrentNetwork>> =
1681            transmission_map.into_iter().collect();
1682        storage.insert_certificate_atomic(certificate.clone(), Default::default(), missing_transmissions.clone());
1683        // Ensure the certificate exists in storage.
1684        assert!(storage.contains_certificate(certificate_id));
1685        // Check that the underlying storage representation is correct.
1686        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1687
1688        // Insert the certificate again - without any missing transmissions.
1689        storage.insert_certificate_atomic(certificate.clone(), Default::default(), Default::default());
1690        // Ensure the certificate exists in storage.
1691        assert!(storage.contains_certificate(certificate_id));
1692        // Check that the underlying storage representation remains unchanged.
1693        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1694
1695        // Insert the certificate again - with all of the original missing transmissions.
1696        storage.insert_certificate_atomic(certificate, Default::default(), missing_transmissions);
1697        // Ensure the certificate exists in storage.
1698        assert!(storage.contains_certificate(certificate_id));
1699        // Check that the underlying storage representation remains unchanged.
1700        assert_storage(&storage, &rounds, &certificates, &batch_ids, &internal_transmissions);
1701    }
1702}