// commonware_storage/qmdb/current/db.rs
//! A shared, generic implementation of the _Current_ QMDB.
//!
//! The impl blocks in this file define shared functionality across all Current QMDB variants.
use crate::{
    index::Unordered as UnorderedIndex,
    journal::{
        contiguous::{Contiguous, Mutable},
        Error as JournalError,
    },
    metadata::{Config as MConfig, Metadata},
    mmr::{
        self,
        hasher::Hasher as _,
        iterator::{nodes_to_pin, PeakIterator},
        mem::MIN_TO_PARALLELIZE,
        storage::Storage as _,
        Location, Position, Proof, StandardHasher,
    },
    qmdb::{
        any::{
            self,
            operation::{update::Update, Operation},
            ValueEncoding,
        },
        current::{
            grafting,
            proof::{OperationProof, RangeProof},
        },
        operation::Key,
        store::{LogStore, MerkleizedStore, PrunableStore},
        Error,
    },
    Persistable,
};
use commonware_codec::{Codec, CodecShared, DecodeExt};
use commonware_cryptography::{Digest, DigestOf, Hasher};
use commonware_parallel::ThreadPool;
use commonware_runtime::{Clock, Metrics, Storage};
use commonware_utils::{bitmap::Prunable as BitMap, sequence::prefixed_u64::U64, sync::AsyncMutex};
use core::{num::NonZeroU64, ops::Range};
use futures::future::try_join_all;
use rayon::prelude::*;
use tracing::{error, warn};

/// Prefix used for the metadata key for grafted MMR pinned nodes.
///
/// Keys are `U64::new(NODE_PREFIX, i)` where `i` is the peak index (see `sync_metadata`).
const NODE_PREFIX: u8 = 0;

/// Prefix used for the metadata key for the number of pruned bitmap chunks.
///
/// A single entry is stored at `U64::new(PRUNED_CHUNKS_PREFIX, 0)` (see `sync_metadata`).
const PRUNED_CHUNKS_PREFIX: u8 = 1;

/// A Current QMDB implementation generic over ordered/unordered keys and variable/fixed values.
///
/// `N` is the size in bytes of one activity-bitmap chunk (each chunk is an `[u8; N]`).
pub struct Db<
    E: Storage + Clock + Metrics,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    U: Send + Sync,
    const N: usize,
> {
    /// An authenticated database that provides the ability to prove whether a key ever had a
    /// specific value.
    pub(super) any: any::db::Db<E, C, I, H, U>,

    /// The bitmap over the activity status of each operation. Supports augmenting [Db] proofs in
    /// order to further prove whether a key _currently_ has a specific value.
    pub(super) status: BitMap<N>,

    /// Each leaf corresponds to a complete bitmap chunk at the grafting height.
    /// See the [grafted leaf formula](super) in the module documentation.
    ///
    /// Internal nodes are hashed using their position in the ops MMR rather than their
    /// grafted position.
    pub(super) grafted_mmr: mmr::mem::Mmr<H::Digest>,

    /// Persists:
    /// - The number of pruned bitmap chunks at key [PRUNED_CHUNKS_PREFIX]
    /// - The grafted MMR pinned nodes at key [NODE_PREFIX]
    pub(super) metadata: AsyncMutex<Metadata<E, U64, Vec<u8>>>,

    /// Optional thread pool for parallelizing grafted leaf computation.
    pub(super) thread_pool: Option<ThreadPool>,

    /// The cached canonical root, refreshed by `apply_batch`.
    /// See the [Root structure](super) section in the module documentation.
    pub(super) root: DigestOf<H>,
}

89// Shared read-only functionality.
90impl<E, K, V, C, I, H, U, const N: usize> Db<E, C, I, H, U, N>
91where
92    E: Storage + Clock + Metrics,
93    K: Key,
94    V: ValueEncoding,
95    U: Update<K, V>,
96    C: Contiguous<Item = Operation<K, V, U>>,
97    I: UnorderedIndex<Value = Location>,
98    H: Hasher,
99    Operation<K, V, U>: Codec,
100{
101    /// Return the inactivity floor location. This is the location before which all operations are
102    /// known to be inactive. Operations before this point can be safely pruned.
103    pub const fn inactivity_floor_loc(&self) -> Location {
104        self.any.inactivity_floor_loc()
105    }
106
107    /// Whether the snapshot currently has no active keys.
108    pub const fn is_empty(&self) -> bool {
109        self.any.is_empty()
110    }
111
112    /// Get the metadata associated with the last commit.
113    pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error> {
114        self.any.get_metadata().await
115    }
116
117    /// Return true if the given sequence of `ops` were applied starting at location `start_loc`
118    /// in the log with the provided `root`, having the activity status described by `chunks`.
119    pub fn verify_range_proof(
120        hasher: &mut H,
121        proof: &RangeProof<H::Digest>,
122        start_loc: Location,
123        ops: &[Operation<K, V, U>],
124        chunks: &[[u8; N]],
125        root: &H::Digest,
126    ) -> bool {
127        proof.verify(hasher, start_loc, ops, chunks, root)
128    }
129}
130
// Functionality requiring non-mutable journal.
impl<E, K, V, U, C, I, H, const N: usize> Db<E, C, I, H, U, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V>,
    C: Contiguous<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
{
    /// Returns a virtual [grafting::Storage] over the grafted MMR and ops MMR.
    /// For positions at or above the grafting height, returns grafted MMR node.
    /// For positions below the grafting height, the ops MMR is used.
    fn grafted_storage(&self) -> impl mmr::storage::Storage<H::Digest> + '_ {
        grafting::Storage::new(
            &self.grafted_mmr,
            grafting::height::<N>(),
            &self.any.log.mmr,
        )
    }

    /// Returns the canonical root.
    /// See the [Root structure](super) section in the module documentation.
    ///
    /// This is a cached value (updated by `apply_batch`); no hashing occurs here.
    pub const fn root(&self) -> H::Digest {
        self.root
    }

    /// Returns the ops MMR root.
    ///
    /// This is the root of the raw operations log, without the activity bitmap.
    /// It is used as the sync target because the sync engine verifies batches
    /// against the ops MMR, not the canonical root.
    ///
    /// See the [Root structure](super) section in the module documentation.
    pub fn ops_root(&self) -> H::Digest {
        self.any.log.root()
    }

    /// Create a new speculative batch of operations with this database as its parent.
    ///
    /// The returned batch borrows this db (its grafted MMR and status bitmap), so the db
    /// cannot be mutated until the batch is dropped or turned into a changeset.
    #[allow(clippy::type_complexity)]
    pub fn new_batch(
        &self,
    ) -> super::batch::UnmerkleizedBatch<
        '_,
        E,
        K,
        V,
        C,
        I,
        H,
        U,
        mmr::journaled::Mmr<E, H::Digest>,
        mmr::mem::Mmr<H::Digest>,
        BitMap<N>,
        N,
    > {
        super::batch::UnmerkleizedBatch::new(
            self.any.new_batch(),
            self,
            Vec::new(),
            Vec::new(),
            &self.grafted_mmr,
            &self.status,
        )
    }

    /// Returns a proof for the operation at `loc`.
    pub(super) async fn operation_proof(
        &self,
        hasher: &mut H,
        loc: Location,
    ) -> Result<OperationProof<H::Digest, N>, Error> {
        // Proofs are generated against the grafted view (bitmap + ops) but anchored to the
        // current ops root.
        let storage = self.grafted_storage();
        let ops_root = self.any.log.root();
        OperationProof::new(hasher, &self.status, &storage, loc, ops_root).await
    }

    /// Returns a proof that the specified range of operations are part of the database, along with
    /// the operations from the range. A truncated range (from hitting the max) can be detected by
    /// looking at the length of the returned operations vector. Also returns the bitmap chunks
    /// required to verify the proof.
    ///
    /// # Errors
    ///
    /// Returns [Error::OperationPruned] if `start_loc` falls in a pruned bitmap chunk.
    /// Returns [mmr::Error::LocationOverflow] if `start_loc` > [mmr::MAX_LOCATION].
    /// Returns [mmr::Error::RangeOutOfBounds] if `start_loc` >= number of leaves in the MMR.
    pub async fn range_proof(
        &self,
        hasher: &mut H,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(RangeProof<H::Digest>, Vec<Operation<K, V, U>>, Vec<[u8; N]>), Error> {
        let storage = self.grafted_storage();
        let ops_root = self.any.log.root();
        RangeProof::new_with_ops(
            hasher,
            &self.status,
            &storage,
            &self.any.log,
            start_loc,
            max_ops,
            ops_root,
        )
        .await
    }
}

// Functionality requiring mutable journal.
impl<E, K, V, U, C, I, H, const N: usize> Db<E, C, I, H, U, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V>,
    C: Mutable<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
{
    /// Returns an ops-level historical proof for the specified range.
    ///
    /// Unlike [`range_proof`](Self::range_proof) which returns grafted proofs
    /// incorporating the activity bitmap, this returns standard MMR proofs
    /// suitable for state sync.
    pub async fn ops_historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(mmr::Proof<H::Digest>, Vec<Operation<K, V, U>>), Error> {
        self.any
            .historical_proof(historical_size, start_loc, max_ops)
            .await
    }

    /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or
    /// snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor.
    /// - Returns [mmr::Error::LocationOverflow] if `prune_loc` > [mmr::MAX_LOCATION].
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        // Persist grafted MMR pruning state before pruning the ops log. If the subsequent
        // `any.prune` fails, the metadata is ahead of the log, which is safe: on recovery,
        // `build_grafted_mmr` will recompute from the (un-pruned) log and the metadata
        // simply records peaks that haven't been pruned yet. The reverse order would be unsafe:
        // a pruned log with stale metadata would lose peak digests permanently.
        self.sync_metadata().await?;

        self.any.prune(prune_loc).await
    }

    /// Sync the metadata to disk.
    ///
    /// Rewrites both metadata entries from scratch: the pruned-chunk count and the pinned
    /// grafted-MMR peak digests covering the pruned prefix of the bitmap.
    pub(crate) async fn sync_metadata(&self) -> Result<(), Error> {
        let mut metadata = self.metadata.lock().await;
        // Drop any stale entries (e.g. peak keys from a previous, smaller pruning boundary).
        metadata.clear();

        // Write the number of pruned chunks.
        let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
        metadata.put(
            key,
            (self.status.pruned_chunks() as u64).to_be_bytes().to_vec(),
        );

        // Write the grafted MMR pinned nodes. These are the ops-space peaks covering the
        // pruned portion of the bitmap.
        let pruned_ops = (self.status.pruned_chunks() as u64)
            .checked_mul(BitMap::<N>::CHUNK_SIZE_BITS)
            .ok_or_else(|| Error::DataCorrupted("pruned ops leaves overflow"))?;
        let ops_mmr_size = Position::try_from(Location::new(pruned_ops))?;
        let grafting_height = grafting::height::<N>();
        for (i, (ops_pos, _)) in PeakIterator::new(ops_mmr_size).enumerate() {
            // Peaks are enumerated in ops space; translate to the grafted position to read
            // the digest out of the grafted MMR.
            let grafted_pos = grafting::ops_to_grafted_pos(ops_pos, grafting_height);
            let digest = self
                .grafted_mmr
                .get_node(grafted_pos)
                .ok_or(mmr::Error::MissingNode(ops_pos))?;
            let key = U64::new(NODE_PREFIX, i as u64);
            metadata.put(key, digest.to_vec());
        }

        metadata.sync().await.map_err(mmr::Error::MetadataError)?;

        Ok(())
    }
}

// Functionality requiring mutable + persistable journal.
impl<E, K, V, U, C, I, H, const N: usize> Db<E, C, I, H, U, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V>,
    C: Mutable<Item = Operation<K, V, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
{
    /// Sync all database state to disk.
    ///
    /// Syncs the inner any db first, then the bitmap/grafted-MMR metadata.
    pub async fn sync(&self) -> Result<(), Error> {
        self.any.sync().await?;

        // Write the bitmap pruning boundary to disk so that next startup doesn't have to
        // re-Merkleize the inactive portion up to the inactivity floor.
        self.sync_metadata().await
    }

    /// Destroy the db, removing all data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        // Clean up bitmap metadata partition.
        self.metadata.into_inner().destroy().await?;

        // Clean up Any components (MMR and log).
        self.any.destroy().await
    }
}

impl<E, K, V, U, C, I, H, const N: usize> Db<E, C, I, H, U, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V> + 'static,
    C: Mutable<Item = Operation<K, V, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
{
    /// Apply a changeset to the database.
    ///
    /// A changeset is only valid if the database has not been modified since the
    /// batch that produced it was created. Multiple batches can be forked from the
    /// same parent for speculative execution, but only one may be applied. Applying
    /// a stale changeset returns [`Error::StaleChangeset`].
    ///
    /// Returns the range of locations written.
    ///
    /// NOTE: the step ordering below is load-bearing — in particular, bitmap pushes
    /// must precede clears (see step 2).
    pub async fn apply_batch(
        &mut self,
        batch: super::batch::Changeset<K, H::Digest, Operation<K, V, U>, N>,
    ) -> Result<Range<Location>, Error> {
        // 1. Apply inner any batch (writes ops, updates snapshot).
        let range = self.any.apply_batch(batch.inner).await?;

        // 2. Push new bits FIRST. Must happen before clears because for chained
        //    batches, some clears target locations within the push range
        //    (ancestor-segment superseded ops that were pushed as active by an
        //    ancestor and then superseded by a descendant).
        for &bit in &batch.bitmap_pushes {
            self.status.push(bit);
        }

        // 3. Clear superseded locations: previous commit inactivation, diff
        //    base_old_locs, and ancestor-segment superseded locations (chaining).
        for loc in &batch.bitmap_clears {
            self.status.set_bit(**loc, false);
        }

        // 4. Apply precomputed grafted MMR changeset from merkleize().
        self.grafted_mmr.apply(batch.grafted_changeset)?;

        // 5. Prune bitmap chunks fully below the inactivity floor.
        self.status.prune_to_bit(*self.any.inactivity_floor_loc);

        // 6. Prune the grafted MMR to match. The guard avoids calling prune with a
        //    location at or behind the grafted MMR's existing pruning boundary.
        let pruned_chunks = self.status.pruned_chunks() as u64;
        if pruned_chunks > 0 {
            let prune_loc = Location::new(pruned_chunks);
            if prune_loc > self.grafted_mmr.bounds().start {
                self.grafted_mmr.prune(prune_loc)?;
            }
        }

        // 7. Use precomputed canonical root from merkleize().
        self.root = batch.canonical_root;

        Ok(range)
    }
}

impl<E, K, V, U, C, I, H, const N: usize> Persistable for Db<E, C, I, H, U, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V>,
    C: Mutable<Item = Operation<K, V, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
{
    type Error = Error;

    async fn commit(&self) -> Result<(), Error> {
        // No-op, DB already in recoverable state.
        Ok(())
    }

    async fn sync(&self) -> Result<(), Error> {
        // Delegate to the inherent `sync` (any db + bitmap metadata).
        Self::sync(self).await
    }

    async fn destroy(self) -> Result<(), Error> {
        // Delegate to the inherent `destroy` (metadata partition + any components).
        self.destroy().await
    }
}

// TODO(https://github.com/commonwarexyz/monorepo/issues/2560): This is broken -- it's computing
// proofs only over the any db mmr not the grafted mmr, so they won't validate against the grafted
// root.
impl<E, K, V, U, C, I, H, const N: usize> MerkleizedStore for Db<E, C, I, H, U, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V>,
    C: Mutable<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
{
    type Digest = H::Digest;
    type Operation = Operation<K, V, U>;

    fn root(&self) -> H::Digest {
        // Returns the cached canonical root (which the proofs below do NOT validate
        // against — see the TODO above).
        self.root()
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.any
            .historical_proof(historical_size, start_loc, max_ops)
            .await
    }
}

475impl<E, K, V, U, C, I, H, const N: usize> LogStore for Db<E, C, I, H, U, N>
476where
477    E: Storage + Clock + Metrics,
478    K: Key,
479    V: ValueEncoding,
480    U: Update<K, V>,
481    C: Contiguous<Item = Operation<K, V, U>>,
482    I: UnorderedIndex<Value = Location>,
483    H: Hasher,
484    Operation<K, V, U>: Codec,
485{
486    type Value = V::Value;
487
488    async fn bounds(&self) -> std::ops::Range<Location> {
489        self.any.bounds().await
490    }
491
492    async fn get_metadata(&self) -> Result<Option<V::Value>, Error> {
493        self.get_metadata().await
494    }
495}
496
impl<E, K, V, U, C, I, H, const N: usize> PrunableStore for Db<E, C, I, H, U, N>
where
    E: Storage + Clock + Metrics,
    K: Key,
    V: ValueEncoding,
    U: Update<K, V>,
    C: Mutable<Item = Operation<K, V, U>>,
    I: UnorderedIndex<Value = Location>,
    H: Hasher,
    Operation<K, V, U>: Codec,
{
    /// Prune historical operations prior to `prune_loc` (delegates to the inherent `prune`,
    /// which persists metadata before pruning the log).
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }

    /// The location before which all operations are known inactive.
    async fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }
}

517/// Returns `Some((last_chunk, next_bit))` if the bitmap has an incomplete trailing chunk, or
518/// `None` if all bits fall on complete chunk boundaries.
519pub(super) fn partial_chunk<B: super::batch::BitmapRead<N>, const N: usize>(
520    bitmap: &B,
521) -> Option<([u8; N], u64)> {
522    let (last_chunk, next_bit) = bitmap.last_chunk();
523    if next_bit == BitMap::<N>::CHUNK_SIZE_BITS {
524        None
525    } else {
526        Some((last_chunk, next_bit))
527    }
528}
529
530/// Compute the canonical root from the ops root, grafted MMR root, and optional partial chunk.
531///
532/// See the [Root structure](super) section in the module documentation.
533pub(super) fn combine_roots<H: Hasher>(
534    hasher: &mut StandardHasher<H>,
535    ops_root: &H::Digest,
536    grafted_mmr_root: &H::Digest,
537    partial: Option<(u64, &H::Digest)>,
538) -> H::Digest {
539    hasher.inner().update(ops_root);
540    hasher.inner().update(grafted_mmr_root);
541    if let Some((next_bit, last_chunk_digest)) = partial {
542        hasher.inner().update(&next_bit.to_be_bytes());
543        hasher.inner().update(last_chunk_digest);
544    }
545    hasher.inner().finalize()
546}
547
548/// Compute the canonical root digest of a [Db].
549///
550/// See the [Root structure](super) section in the module documentation.
551pub(super) async fn compute_db_root<
552    H: Hasher,
553    G: mmr::read::Readable<H::Digest>,
554    S: mmr::storage::Storage<H::Digest>,
555    const N: usize,
556>(
557    hasher: &mut StandardHasher<H>,
558    storage: &grafting::Storage<'_, H::Digest, G, S>,
559    partial_chunk: Option<([u8; N], u64)>,
560    ops_root: &H::Digest,
561) -> Result<H::Digest, Error> {
562    let grafted_mmr_root = compute_grafted_mmr_root(hasher, storage).await?;
563    let partial = partial_chunk.map(|(chunk, next_bit)| {
564        let digest = hasher.digest(&chunk);
565        (next_bit, digest)
566    });
567    Ok(combine_roots(
568        hasher,
569        ops_root,
570        &grafted_mmr_root,
571        partial.as_ref().map(|(nb, d)| (*nb, d)),
572    ))
573}
574
/// Compute the root of the grafted MMR.
///
/// `storage` is the grafted storage over the grafted MMR and the ops MMR.
pub(super) async fn compute_grafted_mmr_root<
    H: Hasher,
    G: mmr::read::Readable<H::Digest>,
    S: mmr::storage::Storage<H::Digest>,
>(
    hasher: &mut StandardHasher<H>,
    storage: &grafting::Storage<'_, H::Digest, G, S>,
) -> Result<H::Digest, Error> {
    let size = storage.size().await;
    // Leaf count is derived from the MMR size; fails if size is not a valid MMR size.
    let leaves = Location::try_from(size).map_err(mmr::Error::from)?;

    // Collect peak digests from the grafted storage, which transparently dispatches
    // to the grafted MMR or the ops MMR based on height.
    let mut peaks = Vec::new();
    for (peak_pos, _) in PeakIterator::new(size) {
        let digest = storage
            .get_node(peak_pos)
            .await?
            .ok_or(mmr::Error::MissingNode(peak_pos))?;
        peaks.push(digest);
    }

    // The root commits to both the leaf count and the bagged peaks.
    Ok(hasher.root(leaves, peaks.iter()))
}

/// Compute grafted leaf digests for the given bitmap chunks as `(ops_pos, digest)` pairs.
///
/// Each grafted leaf is `hash(chunk || ops_subtree_root)`, except for all-zero chunks where
/// the grafted leaf equals the ops subtree root directly (zero-chunk identity).
///
/// When a thread pool is provided and there are enough chunks, hashing is parallelized.
pub(super) async fn compute_grafted_leaves<H: Hasher, const N: usize>(
    hasher: &mut StandardHasher<H>,
    ops_mmr: &impl mmr::storage::Storage<H::Digest>,
    chunks: impl IntoIterator<Item = (usize, [u8; N])>,
    pool: Option<&ThreadPool>,
) -> Result<Vec<(Position, H::Digest)>, Error> {
    let grafting_height = grafting::height::<N>();

    // (ops_pos, ops_digest, chunk) for each chunk, where ops_pos is the position of the ops MMR
    // node on which to graft the chunk, and ops_digest is the digest of that node.
    // Node reads are issued concurrently via try_join_all.
    let inputs = try_join_all(chunks.into_iter().map(|(chunk_idx, chunk)| {
        let ops_pos = grafting::chunk_idx_to_ops_pos(chunk_idx as u64, grafting_height);
        async move {
            let ops_digest = ops_mmr
                .get_node(ops_pos)
                .await?
                .ok_or(mmr::Error::MissingGraftedLeaf(ops_pos))?;
            Ok::<_, Error>((ops_pos, ops_digest, chunk))
        }
    }))
    .await?;

    // Compute grafted leaf for each chunk. The parallel path is only taken when the
    // input count meets MIN_TO_PARALLELIZE, since scheduling overhead would otherwise
    // dominate.
    let zero_chunk = [0u8; N];
    Ok(match pool.filter(|_| inputs.len() >= MIN_TO_PARALLELIZE) {
        Some(pool) => pool.install(|| {
            inputs
                .into_par_iter()
                .map_init(
                    // Each rayon worker gets its own forked hasher.
                    || hasher.fork(),
                    |h, (ops_pos, ops_digest, chunk)| {
                        if chunk == zero_chunk {
                            // Zero-chunk identity: leaf is the ops subtree root itself.
                            (ops_pos, ops_digest)
                        } else {
                            h.inner().update(&chunk);
                            h.inner().update(&ops_digest);
                            (ops_pos, h.inner().finalize())
                        }
                    },
                )
                .collect()
        }),
        None => inputs
            .into_iter()
            .map(|(ops_pos, ops_digest, chunk)| {
                if chunk == zero_chunk {
                    // Zero-chunk identity: leaf is the ops subtree root itself.
                    (ops_pos, ops_digest)
                } else {
                    hasher.inner().update(&chunk);
                    hasher.inner().update(&ops_digest);
                    (ops_pos, hasher.inner().finalize())
                }
            })
            .collect(),
    })
}

/// Build a grafted [mmr::mem::Mmr] from scratch using bitmap chunks and the ops MMR.
///
/// For each non-pruned complete chunk (index in `pruned_chunks..complete_chunks`), reads the
/// ops MMR node at the grafting height to compute the grafted leaf (see the
/// [grafted leaf formula](super) in the module documentation). The caller must ensure that all
/// ops MMR nodes for chunks >= `bitmap.pruned_chunks()` are still accessible in the ops MMR
/// (i.e., not pruned from the journal).
pub(super) async fn build_grafted_mmr<H: Hasher, const N: usize>(
    hasher: &mut StandardHasher<H>,
    bitmap: &BitMap<N>,
    pinned_nodes: &[H::Digest],
    ops_mmr: &impl mmr::storage::Storage<H::Digest>,
    pool: Option<&ThreadPool>,
) -> Result<mmr::mem::Mmr<H::Digest>, Error> {
    let grafting_height = grafting::height::<N>();
    let pruned_chunks = bitmap.pruned_chunks();
    let complete_chunks = bitmap.complete_chunks();

    // Compute grafted leaves for each unpruned complete chunk. Any incomplete trailing
    // chunk is excluded (it is committed via the partial-chunk path in the canonical root).
    let leaves = compute_grafted_leaves::<H, N>(
        hasher,
        ops_mmr,
        (pruned_chunks..complete_chunks).map(|chunk_idx| (chunk_idx, *bitmap.get_chunk(chunk_idx))),
        pool,
    )
    .await?;

    // Build a base Mmr: either from pruned components or empty. When chunks have been
    // pruned, the pinned node digests stand in for the pruned subtrees.
    let mut grafted_hasher = grafting::GraftedHasher::new(hasher.fork(), grafting_height);
    let mut grafted_mmr = if pruned_chunks > 0 {
        let grafted_pruned_to = Location::new(pruned_chunks as u64);
        mmr::mem::Mmr::from_components(
            &mut grafted_hasher,
            Vec::new(),
            grafted_pruned_to,
            pinned_nodes.to_vec(),
        )?
    } else {
        mmr::mem::Mmr::new(&mut grafted_hasher)
    };

    // Add each grafted leaf digest via a single batched merkleize/apply.
    if !leaves.is_empty() {
        let changeset = {
            let mut batch = grafted_mmr.new_batch().with_pool(pool.cloned());
            for &(_ops_pos, digest) in &leaves {
                batch.add_leaf_digest(digest);
            }
            batch.merkleize(&mut grafted_hasher).finalize()
        };
        grafted_mmr.apply(changeset)?;
    }

    Ok(grafted_mmr)
}

/// Load the metadata and recover the pruning state persisted by previous runs.
///
/// The metadata store holds two kinds of entries (keyed by prefix):
/// - **Pruned chunks count** ([PRUNED_CHUNKS_PREFIX]): the number of bitmap chunks that have been
///   pruned. This tells us where the active portion of the bitmap begins.
/// - **Pinned node digests** ([NODE_PREFIX]): grafted MMR digests at peak positions whose
///   underlying data has been pruned. These are needed to recompute the grafted MMR root without
///   the pruned chunks.
///
/// Returns `(metadata_handle, pruned_chunks, pinned_node_digests)`.
pub(super) async fn init_metadata<E: Storage + Clock + Metrics, D: Digest>(
    context: E,
    partition: &str,
) -> Result<(Metadata<E, U64, Vec<u8>>, usize, Vec<D>), Error> {
    let metadata_cfg = MConfig {
        partition: partition.into(),
        codec_config: ((0..).into(), ()),
    };
    let metadata =
        Metadata::<_, U64, Vec<u8>>::init(context.with_label("metadata"), metadata_cfg).await?;

    // A missing pruned-chunks entry is treated as "never pruned" (fresh db), not an error.
    let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
    let pruned_chunks = match metadata.get(&key) {
        Some(bytes) => u64::from_be_bytes(bytes.as_slice().try_into().map_err(|_| {
            error!("pruned chunks value not a valid u64");
            Error::DataCorrupted("pruned chunks value not a valid u64")
        })?),
        None => {
            warn!("bitmap metadata does not contain pruned chunks, initializing as empty");
            0
        }
    } as usize;

    // Load pinned nodes if database was pruned. We use nodes_to_pin on the grafted leaf count
    // to determine how many peaks to read. (Multiplying pruned_chunks by chunk_size is a
    // left-shift, preserving popcount, so the peak count is the same in grafted or ops space.)
    let pinned_nodes = if pruned_chunks > 0 {
        let mmr_size = Position::try_from(Location::new(pruned_chunks as u64))?;
        let mut pinned = Vec::new();
        for (index, pos) in nodes_to_pin(mmr_size).enumerate() {
            let metadata_key = U64::new(NODE_PREFIX, index as u64);
            let Some(bytes) = metadata.get(&metadata_key) else {
                return Err(mmr::Error::MissingNode(pos).into());
            };
            // NOTE(review): a decode failure is surfaced as MissingNode, conflating
            // corruption with absence — consider a dedicated corruption error.
            let digest = D::decode(bytes.as_ref()).map_err(|_| mmr::Error::MissingNode(pos))?;
            pinned.push(digest);
        }
        pinned
    } else {
        Vec::new()
    };

    Ok((metadata, pruned_chunks, pinned_nodes))
}

#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::FixedSize;
    use commonware_cryptography::{sha256, Sha256};
    use commonware_utils::bitmap::Prunable as PrunableBitMap;

    // Chunk size matches a Sha256 digest so chunks align with digest boundaries.
    const N: usize = sha256::Digest::SIZE;

    // A single pushed bit yields a partial chunk with next_bit == 1.
    #[test]
    fn partial_chunk_single_bit() {
        let mut bm = PrunableBitMap::<N>::new();
        bm.push(true);
        let result = partial_chunk::<PrunableBitMap<N>, N>(&bm);
        assert!(result.is_some());
        let (chunk, next_bit) = result.unwrap();
        assert_eq!(next_bit, 1);
        assert_eq!(chunk[0], 1); // bit 0 set
    }

    // Exactly one full chunk of bits yields no partial chunk.
    #[test]
    fn partial_chunk_aligned() {
        let mut bm = PrunableBitMap::<N>::new();
        for _ in 0..PrunableBitMap::<N>::CHUNK_SIZE_BITS {
            bm.push(true);
        }
        let result = partial_chunk::<PrunableBitMap<N>, N>(&bm);
        assert!(result.is_none());
    }

    // A full chunk plus 5 bits yields a partial chunk with next_bit == 5.
    #[test]
    fn partial_chunk_partial() {
        let mut bm = PrunableBitMap::<N>::new();
        for _ in 0..(PrunableBitMap::<N>::CHUNK_SIZE_BITS + 5) {
            bm.push(true);
        }
        let result = partial_chunk::<PrunableBitMap<N>, N>(&bm);
        assert!(result.is_some());
        let (_chunk, next_bit) = result.unwrap();
        assert_eq!(next_bit, 5);
    }

    // Same inputs through independent hashers must produce the same root.
    #[test]
    fn combine_roots_deterministic() {
        let mut h1 = StandardHasher::<Sha256>::new();
        let mut h2 = StandardHasher::<Sha256>::new();
        let ops = Sha256::hash(b"ops");
        let grafted = Sha256::hash(b"grafted");
        let r1 = combine_roots(&mut h1, &ops, &grafted, None);
        let r2 = combine_roots(&mut h2, &ops, &grafted, None);
        assert_eq!(r1, r2);
    }

    // Presence of a partial chunk must change the root.
    #[test]
    fn combine_roots_with_partial_differs() {
        let mut h1 = StandardHasher::<Sha256>::new();
        let mut h2 = StandardHasher::<Sha256>::new();
        let ops = Sha256::hash(b"ops");
        let grafted = Sha256::hash(b"grafted");
        let partial_digest = Sha256::hash(b"partial");

        let without = combine_roots(&mut h1, &ops, &grafted, None);
        let with = combine_roots(&mut h2, &ops, &grafted, Some((5, &partial_digest)));
        assert_ne!(without, with);
    }

    // Changing the ops root must change the combined root.
    #[test]
    fn combine_roots_different_ops_root() {
        let mut h1 = StandardHasher::<Sha256>::new();
        let mut h2 = StandardHasher::<Sha256>::new();
        let ops_a = Sha256::hash(b"ops_a");
        let ops_b = Sha256::hash(b"ops_b");
        let grafted = Sha256::hash(b"grafted");

        let r1 = combine_roots(&mut h1, &ops_a, &grafted, None);
        let r2 = combine_roots(&mut h2, &ops_b, &grafted, None);
        assert_ne!(r1, r2);
    }
}
855}