Skip to main content

commonware_storage/qmdb/current/
db.rs

1//! A shared, generic implementation of the _Current_ QMDB.
2//!
3//! The impl blocks in this file define shared functionality across all Current QMDB variants.
4
5use crate::{
6    index::Unordered as UnorderedIndex,
7    journal::{
8        contiguous::{Contiguous, Mutable, Reader},
9        Error as JournalError,
10    },
11    merkle::{
12        self, batch::MIN_TO_PARALLELIZE, hasher::Standard as StandardHasher, mem::Mem,
13        storage::Storage as MerkleStorage, Location, Position, Readable,
14    },
15    metadata::{Config as MConfig, Metadata},
16    qmdb::{
17        any::{
18            self,
19            operation::{update::Update, Operation},
20        },
21        current::{
22            batch::BitmapBatch,
23            grafting,
24            proof::{OperationProof, RangeProof},
25        },
26        operation::Operation as _,
27        Error,
28    },
29    Context, Persistable,
30};
31use commonware_codec::{Codec, CodecShared, DecodeExt};
32use commonware_cryptography::{Digest, DigestOf, Hasher};
33use commonware_parallel::ThreadPool;
34use commonware_utils::{
35    bitmap::{Prunable as BitMap, Readable as BitmapReadable},
36    sequence::prefixed_u64::U64,
37    sync::AsyncMutex,
38};
39use core::{num::NonZeroU64, ops::Range};
40use futures::future::try_join_all;
41use rayon::prelude::*;
42use std::{collections::BTreeMap, sync::Arc};
43use tracing::{error, warn};
44
/// Prefix used for the metadata key for grafted tree pinned nodes.
///
/// NOTE(review): these prefixes key persisted [Metadata] entries (see `sync_metadata`);
/// changing either value would orphan previously written records — keep them stable.
const NODE_PREFIX: u8 = 0;

/// Prefix used for the metadata key for the number of pruned bitmap chunks.
const PRUNED_CHUNKS_PREFIX: u8 = 1;
50
/// A Current QMDB implementation generic over ordered/unordered keys and variable/fixed values.
///
/// Generic parameters:
/// - `F`: the merkle family providing [Location]/[Position] arithmetic and grafting support.
/// - `E`: the runtime [Context] used for storage access.
/// - `C`: the contiguous journal of operations.
/// - `I`: the index mapping keys to operation [Location]s.
/// - `H`: the hasher used to produce all digests.
/// - `U`: the update (operation payload) type.
/// - `N`: the size in bytes of a single activity-bitmap chunk.
pub struct Db<
    F: merkle::Graftable,
    E: Context,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    U: Send + Sync,
    const N: usize,
> {
    /// An authenticated database that provides the ability to prove whether a key ever had a
    /// specific value.
    pub(super) any: any::db::Db<F, E, C, I, H, U>,

    /// The bitmap over the activity status of each operation. Supports augmenting [Db] proofs in
    /// order to further prove whether a key _currently_ has a specific value.
    ///
    /// Stored as a [`BitmapBatch`] so that `apply_batch` can
    /// push layers in O(batch) instead of deep-cloning.
    pub(super) status: BitmapBatch<N>,

    /// Each leaf corresponds to a complete bitmap chunk at the grafting height.
    /// See the [grafted leaf formula](super) in the module documentation.
    ///
    /// Internal nodes are hashed using their position in the ops tree rather than their
    /// grafted position.
    pub(super) grafted_tree: Mem<F, H::Digest>,

    /// Persists:
    /// - The number of pruned bitmap chunks at key [PRUNED_CHUNKS_PREFIX]
    /// - The grafted tree pinned nodes at key [NODE_PREFIX]
    pub(super) metadata: AsyncMutex<Metadata<E, U64, Vec<u8>>>,

    /// Optional thread pool for parallelizing grafted leaf computation.
    pub(super) thread_pool: Option<ThreadPool>,

    /// The cached canonical root.
    /// See the [Root structure](super) section in the module documentation.
    pub(super) root: DigestOf<H>,
}
91
92// Shared read-only functionality.
93impl<F, E, C, I, H, U, const N: usize> Db<F, E, C, I, H, U, N>
94where
95    F: merkle::Graftable,
96    E: Context,
97    U: Update,
98    C: Contiguous<Item = Operation<F, U>>,
99    I: UnorderedIndex<Value = Location<F>>,
100    H: Hasher,
101    Operation<F, U>: Codec,
102{
103    /// Return the inactivity floor location. This is the location before which all operations are
104    /// known to be inactive. Operations before this point can be safely pruned.
105    pub const fn inactivity_floor_loc(&self) -> Location<F> {
106        self.any.inactivity_floor_loc()
107    }
108
109    /// Whether the snapshot currently has no active keys.
110    pub const fn is_empty(&self) -> bool {
111        self.any.is_empty()
112    }
113
114    /// Get the metadata associated with the last commit.
115    pub async fn get_metadata(&self) -> Result<Option<U::Value>, Error<F>> {
116        self.any.get_metadata().await
117    }
118
119    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
120    /// retained operations respectively.
121    pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
122        self.any.bounds().await
123    }
124
125    /// Return true if the given sequence of `ops` were applied starting at location `start_loc`
126    /// in the log with the provided `root`, having the activity status described by `chunks`.
127    pub fn verify_range_proof(
128        hasher: &mut H,
129        proof: &RangeProof<F, H::Digest>,
130        start_loc: Location<F>,
131        ops: &[Operation<F, U>],
132        chunks: &[[u8; N]],
133        root: &H::Digest,
134    ) -> bool {
135        proof.verify(hasher, start_loc, ops, chunks, root)
136    }
137}
138
// Functionality requiring non-mutable journal.
impl<F, E, U, C, I, H, const N: usize> Db<F, E, C, I, H, U, N>
where
    F: merkle::Graftable,
    E: Context,
    U: Update,
    C: Contiguous<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Returns a virtual [grafting::Storage] over the grafted tree and ops tree. For positions at
    /// or above the grafting height, returns the grafted node. For positions below the grafting
    /// height, the ops tree is used.
    fn grafted_storage(&self) -> impl MerkleStorage<F, Digest = H::Digest> + '_ {
        grafting::Storage::new(
            &self.grafted_tree,
            grafting::height::<N>(),
            &self.any.log.merkle,
        )
    }

    /// Returns the canonical root.
    /// See the [Root structure](super) section in the module documentation.
    pub const fn root(&self) -> H::Digest {
        self.root
    }

    /// Returns the ops tree root.
    ///
    /// This is the root of the raw operations log, without the activity bitmap. It is used as the
    /// sync target because the sync engine verifies batches against the ops root, not the canonical
    /// root.
    ///
    /// See the [Root structure](super) section in the module documentation.
    pub fn ops_root(&self) -> H::Digest {
        self.any.log.root()
    }

    /// Snapshot of the grafted tree for use in batch chains.
    pub(super) fn grafted_snapshot(&self) -> Arc<merkle::batch::MerkleizedBatch<F, H::Digest>> {
        merkle::batch::MerkleizedBatch::from_mem(&self.grafted_tree)
    }

    /// Create a new speculative batch of operations with this database as its parent.
    ///
    /// The batch captures the any-layer batch, a snapshot of the grafted tree, and a clone of
    /// the activity bitmap, so it can be built up without holding this handle.
    pub fn new_batch(&self) -> super::batch::UnmerkleizedBatch<F, H, U, N> {
        super::batch::UnmerkleizedBatch::new(
            self.any.new_batch(),
            self.grafted_snapshot(),
            self.status.clone(),
        )
    }

    /// Returns a proof for the operation at `loc`.
    pub(super) async fn operation_proof(
        &self,
        hasher: &mut H,
        loc: Location<F>,
    ) -> Result<OperationProof<F, H::Digest, N>, Error<F>> {
        let storage = self.grafted_storage();
        // The current ops root is passed through so the proof is bound to this tree state.
        let ops_root = self.any.log.root();
        OperationProof::new(hasher, &self.status, &storage, loc, ops_root).await
    }

    /// Returns a proof that the specified range of operations are part of the database, along with
    /// the operations from the range. A truncated range (from hitting the max) can be detected by
    /// looking at the length of the returned operations vector. Also returns the bitmap chunks
    /// required to verify the proof.
    ///
    /// # Errors
    ///
    /// Returns [Error::OperationPruned] if `start_loc` falls in a pruned bitmap chunk. Returns
    /// [`crate::merkle::Error::LocationOverflow`] if `start_loc` >
    /// [`crate::merkle::Family::MAX_LEAVES`]. Returns [`crate::merkle::Error::RangeOutOfBounds`] if
    /// `start_loc` >= number of leaves in the tree.
    pub async fn range_proof(
        &self,
        hasher: &mut H,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(RangeProof<F, H::Digest>, Vec<Operation<F, U>>, Vec<[u8; N]>), Error<F>> {
        let storage = self.grafted_storage();
        let ops_root = self.any.log.root();
        RangeProof::new_with_ops(
            hasher,
            &self.status,
            &storage,
            &self.any.log,
            start_loc,
            max_ops,
            ops_root,
        )
        .await
    }
}
234
// Functionality requiring mutable journal.
impl<F, E, U, C, I, H, const N: usize> Db<F, E, C, I, H, U, N>
where
    F: merkle::Graftable,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Returns an ops-level historical proof for the specified range.
    ///
    /// Unlike [`range_proof`](Self::range_proof) which returns grafted proofs incorporating the
    /// activity bitmap, this returns standard range proofs suitable for state sync.
    pub async fn ops_historical_proof(
        &self,
        historical_size: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(merkle::Proof<F, H::Digest>, Vec<Operation<F, U>>), Error<F>> {
        self.any
            .historical_proof(historical_size, start_loc, max_ops)
            .await
    }

    /// Return the pinned nodes for a lower operation boundary of `loc`.
    pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
        self.any.pinned_nodes_at(loc).await
    }

    /// Collapse the accumulated bitmap `Layer` chain into a flat `Base`.
    ///
    /// Each [`Db::apply_batch`] pushes a new `Layer` on the bitmap. These layers are cheap
    /// to create but make subsequent reads walk the full chain. Calling `flatten` collapses
    /// the chain into a single `Base`, bounding lookup cost.
    ///
    /// This is called automatically by [`Db::prune`]. Callers that apply many batches without
    /// pruning should call this periodically.
    pub fn flatten(&mut self) {
        self.status.flatten();
    }

    /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or
    /// snapshot.
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor.
    /// - Returns [`crate::merkle::Error::LocationOverflow`] if `prune_loc` > [crate::merkle::Family::MAX_LEAVES].
    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), Error<F>> {
        self.flatten();

        // Prune bitmap chunks below the inactivity floor.
        let BitmapBatch::<N>::Base(base) = &mut self.status else {
            unreachable!("flatten() guarantees Base");
        };
        // Copy-on-write: `make_mut` clones the bitmap only if another handle still shares it.
        Arc::make_mut(base).prune_to_bit(*self.any.inactivity_floor_loc);

        // Prune the grafted tree to match the bitmap's pruned chunks.
        let pruned_chunks = self.status.pruned_chunks() as u64;
        if pruned_chunks > 0 {
            // One grafted leaf per pruned bitmap chunk.
            let prune_loc_grafted = Location::<F>::new(pruned_chunks);
            let bounds_start = self.grafted_tree.bounds().start;
            let grafted_prune_pos =
                Position::try_from(prune_loc_grafted).expect("valid leaf count");
            // Only rebuild if the new boundary advances past the current one.
            if prune_loc_grafted > bounds_start {
                let root = *self.grafted_tree.root();
                let size = self.grafted_tree.size();

                // Nodes that must survive pruning to authenticate the retained suffix.
                let mut pinned = BTreeMap::new();
                for pos in F::nodes_to_pin(prune_loc_grafted) {
                    pinned.insert(
                        pos,
                        self.grafted_tree
                            .get_node(pos)
                            .expect("pinned peak must exist"),
                    );
                }
                let mut retained = Vec::with_capacity((*size - *grafted_prune_pos) as usize);
                for p in *grafted_prune_pos..*size {
                    retained.push(
                        self.grafted_tree
                            .get_node(Position::new(p))
                            .expect("retained node must exist"),
                    );
                }
                self.grafted_tree =
                    Mem::from_pruned_with_retained(root, grafted_prune_pos, pinned, retained);
            }
        }

        // Persist grafted tree pruning state before pruning the ops log. If the subsequent
        // `any.prune` fails, the metadata is ahead of the log, which is safe: on recovery,
        // `build_grafted_tree` will recompute from the (un-pruned) log and the metadata
        // simply records peaks that haven't been pruned yet. The reverse order would be unsafe:
        // a pruned log with stale metadata would lose peak digests permanently.
        self.sync_metadata().await?;

        self.any.prune(prune_loc).await
    }

    /// Rewind the database to `size` operations, where `size` is the location of the next append.
    ///
    /// This rewinds the underlying Any database and rebuilds the Current overlay state (bitmap,
    /// grafted tree, and canonical root) for the rewound size.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `size` is not a valid rewind target
    /// - the target's required logical range is not fully retained (for Current, this includes the
    ///   underlying Any inactivity-floor boundary and bitmap pruning boundary)
    /// - `size - 1` is not a commit operation
    /// - `size` is below the bitmap pruning boundary
    ///
    /// Any error from this method is fatal for this handle. Rewind may mutate state in the
    /// underlying Any database before this Current overlay finishes rebuilding. Callers must drop
    /// this database handle after any `Err` from `rewind` and reopen from storage.
    ///
    /// A successful rewind is not restart-stable until a subsequent [`Db::commit`] or
    /// [`Db::sync`].
    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
        self.flatten();

        let rewind_size = *size;
        let current_size = *self.any.last_commit_loc + 1;
        // Rewinding to the current size is a no-op.
        if rewind_size == current_size {
            return Ok(());
        }
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
        }

        let pruned_chunks = self.status.pruned_chunks();
        let pruned_bits = (pruned_chunks as u64)
            .checked_mul(BitMap::<N>::CHUNK_SIZE_BITS)
            .ok_or_else(|| Error::DataCorrupted("pruned ops leaves overflow"))?;
        // The target must not fall inside the already-pruned bitmap prefix.
        if rewind_size < pruned_bits {
            return Err(Error::Journal(JournalError::ItemPruned(rewind_size - 1)));
        }

        // Ensure the target commit's logical range is fully representable with the current
        // bitmap pruning boundary. Even if the ops log still retains older entries, rewinding
        // to a commit with floor below `pruned_bits` would require bitmap chunks we've already
        // discarded.
        {
            let reader = self.any.log.reader().await;
            let rewind_last_loc = Location::<F>::new(rewind_size - 1);
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            // The operation at `size - 1` must carry an inactivity floor (i.e., be a commit).
            let Some(rewind_floor) = rewind_last_op.has_floor() else {
                return Err(Error::<F>::UnexpectedData(rewind_last_loc));
            };
            if *rewind_floor < pruned_bits {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
            }
        }

        // Extract pinned nodes for the existing pruning boundary from the in-memory grafted tree.
        let pinned_nodes = if pruned_chunks > 0 {
            let grafted_leaves = Location::<F>::new(pruned_chunks as u64);
            let mut pinned_nodes = Vec::new();
            for pos in F::nodes_to_pin(grafted_leaves) {
                let digest = self
                    .grafted_tree
                    .get_node(pos)
                    .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
                pinned_nodes.push(digest);
            }
            pinned_nodes
        } else {
            Vec::new()
        };

        // Rewind underlying ops log + Any state. If a later overlay rebuild step fails, this
        // handle may be internally diverged and must be dropped by the caller.
        let restored_locs = self.any.rewind(size).await?;

        // Patch bitmap: truncate to rewound size, then mark restored locations as active.
        {
            let BitmapBatch::<N>::Base(base) = &mut self.status else {
                unreachable!("flatten() guarantees Base");
            };
            let status: &mut BitMap<N> = Arc::get_mut(base).expect("flatten ensures sole owner");
            status.truncate(rewind_size);
            for loc in &restored_locs {
                status.set_bit(**loc, true);
            }
            // Mark the target commit operation (validated above to carry a floor) as active.
            status.set_bit(rewind_size - 1, true);
        }
        let BitmapBatch::Base(status) = &self.status else {
            unreachable!("flatten() guarantees Base");
        };
        let status = status.as_ref();

        // Rebuild grafted tree and canonical root for the patched bitmap.
        let hasher = StandardHasher::<H>::new();
        let grafted_tree = build_grafted_tree::<F, H, N>(
            &hasher,
            status,
            &pinned_nodes,
            &self.any.log.merkle,
            self.thread_pool.as_ref(),
        )
        .await?;
        let storage =
            grafting::Storage::new(&grafted_tree, grafting::height::<N>(), &self.any.log.merkle);
        let partial_chunk = partial_chunk(status);
        let ops_root = self.any.log.root();
        let root = compute_db_root(&hasher, status, &storage, partial_chunk, &ops_root).await?;

        // Swap in the rebuilt overlay only after every fallible step has succeeded.
        self.grafted_tree = grafted_tree;
        self.root = root;

        Ok(())
    }

    /// Sync the metadata to disk.
    ///
    /// Clears and rewrites all entries (the pruned-chunk count and the grafted tree's pinned
    /// nodes) before syncing, so the persisted state always reflects the current boundary.
    pub(crate) async fn sync_metadata(&self) -> Result<(), Error<F>> {
        let mut metadata = self.metadata.lock().await;
        metadata.clear();

        // Write the number of pruned chunks.
        let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
        metadata.put(
            key,
            (self.status.pruned_chunks() as u64).to_be_bytes().to_vec(),
        );

        // Write the pinned nodes of the grafted tree.
        let pruned_chunks = Location::<F>::new(self.status.pruned_chunks() as u64);
        for (i, grafted_pos) in F::nodes_to_pin(pruned_chunks).enumerate() {
            let digest = self
                .grafted_tree
                .get_node(grafted_pos)
                .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
            // Keys are sequential under NODE_PREFIX, in nodes_to_pin iteration order.
            let key = U64::new(NODE_PREFIX, i as u64);
            metadata.put(key, digest.to_vec());
        }

        metadata.sync().await?;

        Ok(())
    }
}
480
// Functionality requiring mutable + persistable journal.
impl<F, E, U, C, I, H, const N: usize> Db<F, E, C, I, H, U, N>
where
    F: merkle::Graftable,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Durably commit the journal state published by prior [`Db::apply_batch`]
    /// calls.
    ///
    /// Note: unlike [`Db::sync`], this persists only the underlying journal state — the bitmap
    /// pruning metadata is not written here.
    pub async fn commit(&self) -> Result<(), Error<F>> {
        self.any.commit().await
    }

    /// Sync all database state to disk.
    pub async fn sync(&self) -> Result<(), Error<F>> {
        self.any.sync().await?;

        // Write the bitmap pruning boundary to disk so that next startup doesn't have to
        // re-Merkleize the inactive portion up to the inactivity floor.
        self.sync_metadata().await
    }

    /// Destroy the db, removing all data from disk.
    ///
    /// Destroys the metadata first, then the underlying Any database.
    pub async fn destroy(self) -> Result<(), Error<F>> {
        self.metadata.into_inner().destroy().await?;
        self.any.destroy().await
    }
}
513
impl<F, E, U, C, I, H, const N: usize> Db<F, E, C, I, H, U, N>
where
    F: merkle::Graftable,
    E: Context,
    U: Update + 'static,
    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Apply a batch to the database, returning the range of written operations.
    ///
    /// A batch is valid only if every batch applied to the database since this batch's
    /// ancestor chain was created is an ancestor of this batch. Applying a batch from a
    /// different fork returns [`Error::StaleBatch`].
    ///
    /// This publishes the batch to the in-memory Current view and appends it to the journal,
    /// but does not durably persist it. Call [`Db::commit`] or [`Db::sync`] to guarantee
    /// durability.
    pub async fn apply_batch(
        &mut self,
        batch: Arc<super::batch::MerkleizedBatch<F, H::Digest, U, N>>,
    ) -> Result<Range<Location<F>>, Error<F>> {
        // Staleness is checked by self.any.apply_batch() below.
        let db_size = *self.any.last_commit_loc + 1;

        // 1. Apply inner any-layer batch (handles snapshot + journal partial skipping).
        let range = self.any.apply_batch(Arc::clone(&batch.inner)).await?;

        // 2. Apply bitmap overlay. The batch's bitmap is a Layer whose overlay
        //    contains all dirty chunks. Walk the layer chain to collect and apply
        //    all uncommitted ancestor overlays + this batch's overlay.
        {
            let mut overlays = Vec::new();
            let mut current = &batch.bitmap;
            while let super::batch::BitmapBatch::Layer(layer) = current {
                // Overlays whose length does not exceed the pre-batch db size were
                // already applied by an earlier apply_batch — stop walking.
                if layer.overlay.len <= db_size {
                    break;
                }
                overlays.push(Arc::clone(&layer.overlay));
                current = &layer.parent;
            }
            // Apply in chronological order (deepest ancestor first).
            for overlay in overlays.into_iter().rev() {
                self.status.apply_overlay(overlay);
            }
        }

        // 3. Apply grafted tree (merkle layer handles partial ancestor skipping).
        self.grafted_tree.apply_batch(&batch.grafted)?;

        // 4. Canonical root. Cached as computed by the batch; see the [Root structure](super)
        //    section in the module documentation.
        self.root = batch.canonical_root;

        Ok(range)
    }
}
571
572impl<F, E, U, C, I, H, const N: usize> Persistable for Db<F, E, C, I, H, U, N>
573where
574    F: merkle::Graftable,
575    E: Context,
576    U: Update,
577    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
578    I: UnorderedIndex<Value = Location<F>>,
579    H: Hasher,
580    Operation<F, U>: Codec,
581{
582    type Error = Error<F>;
583
584    async fn commit(&self) -> Result<(), Error<F>> {
585        Self::commit(self).await
586    }
587
588    async fn sync(&self) -> Result<(), Error<F>> {
589        Self::sync(self).await
590    }
591
592    async fn destroy(self) -> Result<(), Error<F>> {
593        self.destroy().await
594    }
595}
596
597/// Returns `Some((last_chunk, next_bit))` if the bitmap has an incomplete trailing chunk, or
598/// `None` if all bits fall on complete chunk boundaries.
599pub(super) fn partial_chunk<B: BitmapReadable<N>, const N: usize>(
600    bitmap: &B,
601) -> Option<([u8; N], u64)> {
602    let (last_chunk, next_bit) = bitmap.last_chunk();
603    if next_bit == BitMap::<N>::CHUNK_SIZE_BITS {
604        None
605    } else {
606        Some((last_chunk, next_bit))
607    }
608}
609
610/// Compute the canonical root from the ops root, grafted tree root, and optional partial chunk.
611///
612/// See the [Root structure](super) section in the module documentation.
613pub(super) fn combine_roots<H: Hasher>(
614    hasher: &StandardHasher<H>,
615    ops_root: &H::Digest,
616    grafted_root: &H::Digest,
617    partial: Option<(u64, &H::Digest)>,
618) -> H::Digest {
619    match partial {
620        Some((next_bit, last_chunk_digest)) => {
621            let next_bit = next_bit.to_be_bytes();
622            hasher.hash([
623                ops_root.as_ref(),
624                grafted_root.as_ref(),
625                next_bit.as_slice(),
626                last_chunk_digest.as_ref(),
627            ])
628        }
629        None => hasher.hash([ops_root.as_ref(), grafted_root.as_ref()]),
630    }
631}
632
633/// Compute the canonical root digest of a [Db].
634///
635/// See the [Root structure](super) section in the module documentation.
636pub(super) async fn compute_db_root<
637    F: merkle::Graftable,
638    H: Hasher,
639    B: BitmapReadable<N>,
640    G: Readable<Family = F, Digest = H::Digest, Error = merkle::Error<F>>,
641    S: MerkleStorage<F, Digest = H::Digest>,
642    const N: usize,
643>(
644    hasher: &StandardHasher<H>,
645    status: &B,
646    storage: &grafting::Storage<'_, F, H::Digest, G, S>,
647    partial_chunk: Option<([u8; N], u64)>,
648    ops_root: &H::Digest,
649) -> Result<H::Digest, Error<F>> {
650    let grafted_root = compute_grafted_root(hasher, status, storage).await?;
651    let partial = partial_chunk.map(|(chunk, next_bit)| {
652        let digest = hasher.digest(&chunk);
653        (next_bit, digest)
654    });
655    Ok(combine_roots(
656        hasher,
657        ops_root,
658        &grafted_root,
659        partial.as_ref().map(|(nb, d)| (*nb, d)),
660    ))
661}
662
/// Compute the root of the grafted structure represented by `storage`.
///
/// We use [`grafting::grafted_root`] instead of a standard `hasher.root()` fold to correctly handle
/// grafting over MMB (Merkle Mountain Belt) structures. In an MMB, the trailing operations at the
/// right edge of the structure might not be numerous enough to form a complete subtree at the
/// grafting height. Therefore, a single bitmap chunk may span across multiple smaller ops peaks.
/// `grafting::grafted_root` intercepts the folding process to group these sub-grafting-height
/// peaks, hash them together with their corresponding bitmap chunks, and then complete the final
/// fold. For MMR, this produces the exact same result as `hasher.root()`.
pub(super) async fn compute_grafted_root<
    F: merkle::Graftable,
    H: Hasher,
    B: BitmapReadable<N>,
    G: Readable<Family = F, Digest = H::Digest, Error = merkle::Error<F>>,
    S: MerkleStorage<F, Digest = H::Digest>,
    const N: usize,
>(
    hasher: &StandardHasher<H>,
    status: &B,
    storage: &grafting::Storage<'_, F, H::Digest, G, S>,
) -> Result<H::Digest, Error<F>> {
    let size = storage.size().await;
    let leaves = Location::try_from(size)?;

    // Collect peak digests of the grafted structure.
    let mut peaks: Vec<H::Digest> = Vec::new();
    for (peak_pos, _) in F::peaks(size) {
        let digest = storage
            .get_node(peak_pos)
            .await?
            .ok_or(merkle::Error::<F>::MissingNode(peak_pos))?;
        peaks.push(digest);
    }

    let grafting_height = grafting::height::<N>();
    let complete_chunks = status.complete_chunks() as u64;
    let pruned_chunks = status.pruned_chunks() as u64;

    Ok(grafting::grafted_root(
        hasher,
        leaves,
        &peaks,
        grafting_height,
        // Chunk lookup callback: `Some(chunk)` for complete chunks, `None` for indices at or
        // beyond `complete_chunks` (the trailing incomplete chunk, folded in separately via
        // the partial-chunk path in `compute_db_root`).
        |chunk_idx| {
            if chunk_idx < complete_chunks {
                // Pruned chunks are guaranteed to be all-zero (only chunks with no active
                // operations are prunable), so a synthetic zero chunk produces the correct grafted
                // digest via the zero-chunk identity shortcut.
                if chunk_idx < pruned_chunks {
                    Some([0u8; N])
                } else {
                    Some(status.get_chunk(chunk_idx as usize))
                }
            } else {
                None
            }
        },
    ))
}
722
/// Compute grafted leaf digests for the given bitmap chunks as `(chunk_idx, digest)` pairs.
///
/// For each chunk, reads the covering peak digests from the ops structure via
/// [`Graftable::chunk_peaks`](merkle::Graftable::chunk_peaks), folds them into a single
/// `chunk_ops_digest`, then combines with the bitmap chunk: `hash(chunk || chunk_ops_digest)`. For
/// all-zero chunks the grafted leaf equals the `chunk_ops_digest` directly (zero-chunk identity).
///
/// When a thread pool is provided and there are enough chunks, hashing is parallelized.
pub(super) async fn compute_grafted_leaves<F: merkle::Graftable, H: Hasher, const N: usize>(
    hasher: &StandardHasher<H>,
    ops_tree: &impl MerkleStorage<F, Digest = H::Digest>,
    chunks: impl IntoIterator<Item = (usize, [u8; N])>,
    pool: Option<&ThreadPool>,
) -> Result<Vec<(usize, H::Digest)>, Error<F>> {
    let grafting_height = grafting::height::<N>();
    let ops_size = ops_tree.size().await;

    // For each chunk, read the covering peak digests and fold them into a single
    // chunk_ops_digest. With MMR there is always exactly one peak; with MMB there
    // may be multiple. The fold happens inline to avoid per-chunk Vec allocations.
    let inputs = try_join_all(chunks.into_iter().map(|(chunk_idx, chunk)| async move {
        let mut chunk_ops_digest: Option<H::Digest> = None;
        for (pos, _) in F::chunk_peaks(ops_size, chunk_idx as u64, grafting_height) {
            let digest = ops_tree
                .get_node(pos)
                .await?
                .ok_or(merkle::Error::<F>::MissingGraftedLeaf(pos))?;
            // Fold: first peak is taken as-is, subsequent peaks are hashed onto the accumulator.
            chunk_ops_digest = Some(
                chunk_ops_digest.map_or(digest, |acc| hasher.hash([acc.as_ref(), digest.as_ref()])),
            );
        }
        let chunk_ops_digest =
            chunk_ops_digest.expect("chunk must have at least one covering peak");
        Ok::<_, Error<F>>((chunk_idx, chunk_ops_digest, chunk))
    }))
    .await?;

    // Compute the grafted leaf digest for each chunk. For all-zero chunks, the
    // grafted leaf equals the chunk_ops_digest directly (zero-chunk identity).
    let zero_chunk = [0u8; N];
    let graft =
        |h: &StandardHasher<H>, chunk_idx: usize, chunk_ops_digest: H::Digest, chunk: [u8; N]| {
            if chunk == zero_chunk {
                (chunk_idx, chunk_ops_digest)
            } else {
                (
                    chunk_idx,
                    h.hash([chunk.as_slice(), chunk_ops_digest.as_ref()]),
                )
            }
        };

    // Parallelize only when the batch is large enough to amortize scheduling overhead.
    Ok(match pool.filter(|_| inputs.len() >= MIN_TO_PARALLELIZE) {
        Some(pool) => pool.install(|| {
            inputs
                .into_par_iter()
                .map_init(
                    // Each rayon worker gets its own hasher clone.
                    || hasher.clone(),
                    |h, (chunk_idx, chunk_ops_digest, chunk)| {
                        graft(h, chunk_idx, chunk_ops_digest, chunk)
                    },
                )
                .collect()
        }),
        None => inputs
            .into_iter()
            .map(|(chunk_idx, chunk_ops_digest, chunk)| {
                graft(hasher, chunk_idx, chunk_ops_digest, chunk)
            })
            .collect(),
    })
}
795
796/// Build a grafted [Mem] from scratch using bitmap chunks and the ops tree.
797///
798/// For each non-pruned complete chunk (index in `pruned_chunks..complete_chunks`), reads the
799/// ops tree node at the grafting height to compute the grafted leaf (see the
800/// [grafted leaf formula](super) in the module documentation). The caller must ensure that all
801/// ops tree nodes for chunks >= `bitmap.pruned_chunks()` are still accessible in the ops tree
802/// (i.e., not pruned from the journal).
803pub(super) async fn build_grafted_tree<F: merkle::Graftable, H: Hasher, const N: usize>(
804    hasher: &StandardHasher<H>,
805    bitmap: &BitMap<N>,
806    pinned_nodes: &[H::Digest],
807    ops_tree: &impl MerkleStorage<F, Digest = H::Digest>,
808    pool: Option<&ThreadPool>,
809) -> Result<Mem<F, H::Digest>, Error<F>> {
810    let grafting_height = grafting::height::<N>();
811    let pruned_chunks = bitmap.pruned_chunks();
812    let complete_chunks = bitmap.complete_chunks();
813
814    // Compute grafted leaves for each unpruned complete chunk.
815    let leaves = compute_grafted_leaves::<F, H, N>(
816        hasher,
817        ops_tree,
818        (pruned_chunks..complete_chunks).map(|chunk_idx| (chunk_idx, *bitmap.get_chunk(chunk_idx))),
819        pool,
820    )
821    .await?;
822
823    // Build the base grafted tree: either from pruned components or empty.
824    let grafted_hasher = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
825    let mut grafted_tree = if pruned_chunks > 0 {
826        let grafted_pruning_boundary = Location::<F>::new(pruned_chunks as u64);
827        Mem::from_components(
828            &grafted_hasher,
829            Vec::new(),
830            grafted_pruning_boundary,
831            pinned_nodes.to_vec(),
832        )
833        .map_err(|_| Error::<F>::DataCorrupted("grafted tree rebuild failed"))?
834    } else {
835        Mem::new(&grafted_hasher)
836    };
837
838    // Add each grafted leaf digest.
839    if !leaves.is_empty() {
840        let batch = {
841            let mut batch = grafted_tree.new_batch().with_pool(pool.cloned());
842            for &(_ops_pos, digest) in &leaves {
843                batch = batch.add_leaf_digest(digest);
844            }
845            batch.merkleize(&grafted_tree, &grafted_hasher)
846        };
847        grafted_tree.apply_batch(&batch)?;
848    }
849
850    Ok(grafted_tree)
851}
852
853/// Load the metadata and recover the pruning state persisted by previous runs.
854///
855/// The metadata store holds two kinds of entries (keyed by prefix):
856/// - **Pruned chunks count** ([PRUNED_CHUNKS_PREFIX]): the number of bitmap chunks that have been
857///   pruned. This tells us where the active portion of the bitmap begins.
858/// - **Pinned node digests** ([NODE_PREFIX]): grafted tree digests at peak positions whose
859///   underlying data has been pruned. These are needed to recompute the grafted tree root without
860///   the pruned chunks.
861///
862/// Returns `(metadata_handle, pruned_chunks, pinned_node_digests)`.
863pub(super) async fn init_metadata<F: merkle::Graftable, E: Context, D: Digest>(
864    context: E,
865    partition: &str,
866) -> Result<(Metadata<E, U64, Vec<u8>>, usize, Vec<D>), Error<F>> {
867    let metadata_cfg = MConfig {
868        partition: partition.into(),
869        codec_config: ((0..).into(), ()),
870    };
871    let metadata =
872        Metadata::<_, U64, Vec<u8>>::init(context.with_label("metadata"), metadata_cfg).await?;
873
874    let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
875    let pruned_chunks = match metadata.get(&key) {
876        Some(bytes) => u64::from_be_bytes(bytes.as_slice().try_into().map_err(|_| {
877            error!("pruned chunks value not a valid u64");
878            Error::<F>::DataCorrupted("pruned chunks value not a valid u64")
879        })?),
880        None => {
881            warn!("bitmap metadata does not contain pruned chunks, initializing as empty");
882            0
883        }
884    } as usize;
885
886    // Load pinned nodes if database was pruned. We use nodes_to_pin on the grafted leaf count
887    // to determine how many peaks to read. (Multiplying pruned_chunks by chunk_size is a
888    // left-shift, preserving popcount, so the peak count is the same in grafted or ops space.)
889    let pinned_nodes = if pruned_chunks > 0 {
890        let pruned_loc = Location::<F>::new(pruned_chunks as u64);
891        if !pruned_loc.is_valid() {
892            return Err(Error::DataCorrupted("pruned chunks exceeds MAX_LEAVES"));
893        }
894        let mut pinned = Vec::new();
895        for (index, _pos) in F::nodes_to_pin(pruned_loc).enumerate() {
896            let metadata_key = U64::new(NODE_PREFIX, index as u64);
897            let Some(bytes) = metadata.get(&metadata_key) else {
898                return Err(Error::DataCorrupted(
899                    "missing pinned node in grafted tree metadata",
900                ));
901            };
902            let digest = D::decode(bytes.as_ref())
903                .map_err(|_| Error::<F>::DataCorrupted("invalid pinned node digest"))?;
904            pinned.push(digest);
905        }
906        pinned
907    } else {
908        Vec::new()
909    };
910
911    Ok((metadata, pruned_chunks, pinned_nodes))
912}
913
#[cfg(test)]
mod tests {
    use super::*;
    use commonware_codec::FixedSize;
    use commonware_cryptography::{sha256, Sha256};
    use commonware_utils::bitmap::Prunable as PrunableBitMap;

    const N: usize = sha256::Digest::SIZE;

    #[test]
    fn partial_chunk_single_bit() {
        let mut bitmap = PrunableBitMap::<N>::new();
        bitmap.push(true);
        let (chunk, next_bit) = partial_chunk::<PrunableBitMap<N>, N>(&bitmap)
            .expect("one pushed bit must leave a partial chunk");
        assert_eq!(next_bit, 1);
        assert_eq!(chunk[0], 1); // only bit 0 is set
    }

    #[test]
    fn partial_chunk_aligned() {
        let mut bitmap = PrunableBitMap::<N>::new();
        for _ in 0..PrunableBitMap::<N>::CHUNK_SIZE_BITS {
            bitmap.push(true);
        }
        // An exactly chunk-aligned bitmap has no partial chunk.
        assert!(partial_chunk::<PrunableBitMap<N>, N>(&bitmap).is_none());
    }

    #[test]
    fn partial_chunk_partial() {
        let mut bitmap = PrunableBitMap::<N>::new();
        for _ in 0..(PrunableBitMap::<N>::CHUNK_SIZE_BITS + 5) {
            bitmap.push(true);
        }
        let (_chunk, next_bit) = partial_chunk::<PrunableBitMap<N>, N>(&bitmap)
            .expect("5 bits past the chunk boundary must leave a partial chunk");
        assert_eq!(next_bit, 5);
    }

    #[test]
    fn combine_roots_deterministic() {
        let ops = Sha256::hash(b"ops");
        let grafted = Sha256::hash(b"grafted");
        let first = combine_roots(&StandardHasher::<Sha256>::new(), &ops, &grafted, None);
        let second = combine_roots(&StandardHasher::<Sha256>::new(), &ops, &grafted, None);
        assert_eq!(first, second);
    }

    #[test]
    fn combine_roots_with_partial_differs() {
        let ops = Sha256::hash(b"ops");
        let grafted = Sha256::hash(b"grafted");
        let partial = Sha256::hash(b"partial");

        let base = combine_roots(&StandardHasher::<Sha256>::new(), &ops, &grafted, None);
        let extended = combine_roots(
            &StandardHasher::<Sha256>::new(),
            &ops,
            &grafted,
            Some((5, &partial)),
        );
        assert_ne!(base, extended);
    }

    #[test]
    fn combine_roots_different_ops_root() {
        let grafted = Sha256::hash(b"grafted");

        let left = combine_roots(
            &StandardHasher::<Sha256>::new(),
            &Sha256::hash(b"ops_a"),
            &grafted,
            None,
        );
        let right = combine_roots(
            &StandardHasher::<Sha256>::new(),
            &Sha256::hash(b"ops_b"),
            &grafted,
            None,
        );
        assert_ne!(left, right);
    }
}