Skip to main content

commonware_storage/qmdb/current/
db.rs

1//! A shared, generic implementation of the _Current_ QMDB.
2//!
3//! The impl blocks in this file define shared functionality across all Current QMDB variants.
4
5use crate::{
6    index::Unordered as UnorderedIndex,
7    journal::{
8        contiguous::{Contiguous, Mutable, Reader},
9        Error as JournalError,
10    },
11    merkle::{
12        self,
13        hasher::{Hasher as MerkleHasher, Standard as StandardHasher},
14        mem::Mem,
15        storage::Storage as MerkleStorage,
16        Graftable, Location, Position,
17    },
18    metadata::{Config as MConfig, Metadata},
19    qmdb::{
20        self,
21        any::{
22            self,
23            operation::{update::Update, Operation},
24        },
25        current::{
26            batch::BitmapBatch,
27            grafting,
28            proof::{OperationProof, OpsRootWitness, RangeProof, RangeProofSpec},
29        },
30        operation::Operation as _,
31        Error,
32    },
33    Context, Persistable,
34};
35use commonware_codec::{Codec, CodecShared, DecodeExt};
36use commonware_cryptography::{Digest, DigestOf, Hasher};
37use commonware_parallel::Strategy;
38use commonware_runtime::telemetry::metrics::{
39    histogram::{duration_histogram, ScopedTimer, Timed},
40    Counter, Gauge, GaugeExt as _, MetricsExt as _,
41};
42use commonware_utils::{
43    bitmap::{self, Readable as _},
44    sequence::prefixed_u64::U64,
45    sync::AsyncMutex,
46};
47use core::{num::NonZeroU64, ops::Range};
48use futures::future::try_join_all;
49use std::{collections::BTreeMap, sync::Arc};
50use tracing::{error, warn};
51
/// Prefix used for the metadata key for grafted tree pinned nodes.
///
/// The full key pairs this prefix with the node's index in pin order.
const NODE_PREFIX: u8 = 0;

/// Prefix used for the metadata key for the number of pruned bitmap chunks.
///
/// The full key pairs this prefix with index `0`; the stored value is the chunk count encoded
/// as big-endian `u64` bytes.
const PRUNED_CHUNKS_PREFIX: u8 = 1;
57
/// Metrics for the Current layer.
///
/// Holds two state gauges (pruned chunk count and sync boundary) plus, for each of the
/// apply-batch, sync, and prune operations, a call counter and a duration timer.
pub(crate) struct Metrics<E: Context> {
    /// Clock used for duration timers.
    clock: Arc<E>,
    /// Pruned bitmap chunks.
    pruned_chunks: Gauge,
    /// Most recent safe sync/prune boundary location.
    sync_boundary: Gauge,
    /// Current-layer apply-batch calls.
    pub apply_batch_calls: Counter,
    /// Duration of Current-layer apply-batch calls.
    apply_batch_duration: Timed,
    /// Current-layer sync calls.
    pub sync_calls: Counter,
    /// Duration of Current-layer sync calls.
    sync_duration: Timed,
    /// Current-layer prune calls.
    pub prune_calls: Counter,
    /// Duration of Current-layer prune calls.
    prune_duration: Timed,
}
79
80impl<E: Context> Metrics<E> {
81    /// Create and register metrics.
82    pub fn new(context: E) -> Self {
83        let pruned_chunks = context.gauge("pruned_chunks", "Number of pruned bitmap chunks");
84        let sync_boundary =
85            context.gauge("sync_boundary", "Most recent safe sync boundary location");
86        let apply_batch_calls = context.counter("apply_batch_calls", "Number of apply-batch calls");
87        let apply_batch_duration = duration_histogram(
88            &context,
89            "apply_batch_duration",
90            "Duration of apply-batch calls",
91        );
92        let sync_calls = context.counter("sync_calls", "Number of sync calls");
93        let sync_duration = duration_histogram(&context, "sync_duration", "Duration of sync calls");
94        let prune_calls = context.counter("prune_calls", "Number of prune calls");
95        let prune_duration =
96            duration_histogram(&context, "prune_duration", "Duration of prune calls");
97        let clock = Arc::new(context);
98
99        Self {
100            clock,
101            pruned_chunks,
102            sync_boundary,
103            apply_batch_calls,
104            apply_batch_duration: Timed::new(apply_batch_duration),
105            sync_calls,
106            sync_duration: Timed::new(sync_duration),
107            prune_calls,
108            prune_duration: Timed::new(prune_duration),
109        }
110    }
111
112    pub fn apply_batch_timer(&self) -> ScopedTimer<E> {
113        self.apply_batch_duration.scoped(&self.clock)
114    }
115
116    pub fn sync_timer(&self) -> ScopedTimer<E> {
117        self.sync_duration.scoped(&self.clock)
118    }
119
120    pub fn prune_timer(&self) -> ScopedTimer<E> {
121        self.prune_duration.scoped(&self.clock)
122    }
123
124    /// Update Current-specific state gauges.
125    pub fn update(&self, pruned_chunks: u64, sync_boundary: u64) {
126        let _ = self.pruned_chunks.try_set(pruned_chunks);
127        let _ = self.sync_boundary.try_set(sync_boundary);
128    }
129}
130
/// A Current QMDB implementation generic over ordered/unordered keys and variable/fixed values.
///
/// # Type parameters
///
/// - `F`: the Merkle family; must support grafting ([`merkle::Graftable`]).
/// - `E`: the runtime [`Context`], also used as the clock for metrics timers.
/// - `C`: the contiguous journal holding the operations log.
/// - `I`: the index, whose values are [`Location`]s in the log.
/// - `H`: the hasher; its digest type is used for the grafted tree and the cached root.
/// - `U`: the update type carried by logged operations.
/// - `N`: the size in bytes of one bitmap chunk (chunks are `[u8; N]`).
/// - `S`: the [`Strategy`] used to parallelize batch work.
pub struct Db<
    F: merkle::Graftable,
    E: Context,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    U: Send + Sync,
    const N: usize,
    S: Strategy,
> {
    /// An authenticated database that provides the ability to prove whether a key ever had a
    /// specific value. Owns the activity-status bitmap (`any.bitmap`) that this layer reads to
    /// install grafted-tree updates and serve proofs.
    pub(super) any: any::db::Db<F, E, C, I, H, U, N, S>,

    /// Each leaf corresponds to a complete bitmap chunk at the grafting height.
    /// See the [grafted leaf formula](super) in the module documentation.
    ///
    /// Internal nodes are hashed using their position in the ops tree rather than their
    /// grafted position.
    pub(super) grafted_tree: Mem<F, H::Digest>,

    /// Persists:
    /// - The number of pruned bitmap chunks at key [PRUNED_CHUNKS_PREFIX]
    /// - The grafted tree pinned nodes at key [NODE_PREFIX]
    ///
    /// Wrapped in an async mutex so it can be rewritten through a shared reference.
    pub(super) metadata: AsyncMutex<Metadata<E, U64, Vec<u8>>>,

    /// Strategy used to parallelize batch operations across the ops tree, the grafted tree,
    /// and grafted leaf computation.
    pub(super) strategy: S,

    /// The cached canonical root.
    /// See the [Root structure](super) section in the module documentation.
    pub(super) root: DigestOf<H>,

    /// Metrics for the Current layer.
    pub(super) metrics: Metrics<E>,
}
170
// Shared read-only functionality.
//
// Every method here either delegates to the underlying Any database or to the proof type; none
// of them touch the grafted tree or the metadata store.
impl<F, E, C, I, H, U, const N: usize, S> Db<F, E, C, I, H, U, N, S>
where
    F: merkle::Graftable,
    E: Context,
    U: Update,
    C: Contiguous<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    S: Strategy,
    Operation<F, U>: Codec,
{
    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive.
    ///
    /// Only compiled for tests or when the `test-traits` feature is enabled.
    #[cfg(any(test, feature = "test-traits"))]
    pub(crate) const fn inactivity_floor_loc(&self) -> Location<F> {
        self.any.inactivity_floor_loc()
    }

    /// Whether the snapshot currently has no active keys.
    pub const fn is_empty(&self) -> bool {
        self.any.is_empty()
    }

    /// Get the metadata associated with the last commit.
    ///
    /// Returns `Ok(None)` when the underlying Any database reports no metadata.
    pub async fn get_metadata(&self) -> Result<Option<U::Value>, Error<F>> {
        self.any.get_metadata().await
    }

    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
    /// retained operations respectively.
    pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
        self.any.bounds().await
    }

    /// Return true if the given sequence of `ops` were applied starting at location `start_loc`
    /// in the log with the provided `root`, having the activity status described by `chunks`.
    ///
    /// This is an associated function (it takes no `self`): verification is delegated entirely
    /// to `proof.verify`, so no database handle is needed.
    pub fn verify_range_proof(
        hasher: &StandardHasher<H>,
        proof: &RangeProof<F, H::Digest>,
        start_loc: Location<F>,
        ops: &[Operation<F, U>],
        chunks: &[[u8; N]],
        root: &H::Digest,
    ) -> bool {
        proof.verify(hasher, start_loc, ops, chunks, root)
    }
}
219
220// Functionality requiring non-mutable journal.
221impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
222where
223    F: merkle::Graftable,
224    E: Context,
225    U: Update,
226    C: Contiguous<Item = Operation<F, U>>,
227    I: UnorderedIndex<Value = Location<F>>,
228    H: Hasher,
229    S: Strategy,
230    Operation<F, U>: Codec,
231{
    /// Returns a virtual [grafting::Storage] over the grafted tree and ops tree. For positions at
    /// or above the grafting height, returns the grafted node. For positions below the grafting
    /// height, the ops tree is used.
    ///
    /// The view is assembled on each call from references to the in-memory grafted tree and the
    /// ops log's merkle structure, so the returned storage borrows `self`.
    fn grafted_storage(&self) -> impl MerkleStorage<F, Digest = H::Digest> + '_ {
        grafting::Storage::new(
            &self.grafted_tree,
            grafting::height::<N>(),
            &self.any.log.merkle,
            qmdb::hasher::<H>(),
        )
    }
243
    /// Returns the canonical root.
    /// See the [Root structure](super) section in the module documentation.
    ///
    /// This returns the cached `root` field; nothing is recomputed here.
    pub const fn root(&self) -> H::Digest {
        self.root
    }
249
    /// Return a reference to the merkleization strategy.
    ///
    /// This is the strategy instance stored on the database.
    pub const fn strategy(&self) -> &S {
        &self.strategy
    }
254
    /// Returns the ops tree root.
    ///
    /// This is the root of the raw operations log, without the activity bitmap. It is used as the
    /// sync target because the sync engine verifies batches against the ops root, not the canonical
    /// root.
    ///
    /// External consumers that receive a trusted canonical `current` root should use
    /// [`Self::ops_root_witness`] to authenticate this ops root against it.
    ///
    /// See the [Root structure](super) section in the module documentation.
    ///
    /// Delegates to the underlying Any database's `root`.
    pub const fn ops_root(&self) -> H::Digest {
        self.any.root()
    }
268
269    /// Returns a witness that this database's canonical root commits to its ops root.
270    ///
271    /// This can be used to authenticate an ops root against a trusted canonical `current` root.
272    pub async fn ops_root_witness(
273        &self,
274        hasher: &StandardHasher<H>,
275    ) -> Result<OpsRootWitness<F, H::Digest>, Error<F>> {
276        let storage = self.grafted_storage();
277        let ops_size = storage.size().await;
278        let ops_leaves = Location::<F>::try_from(ops_size)?;
279        let grafted_root = compute_grafted_root::<F, H, _, _, N>(
280            hasher,
281            self.any.bitmap.as_ref(),
282            &storage,
283            ops_leaves,
284            self.any.inactivity_floor_loc,
285        )
286        .await?;
287        let partial_chunk = partial_chunk::<_, N>(self.any.bitmap.as_ref())
288            .map(|(chunk, next_bit)| (next_bit, hasher.digest(&chunk)));
289        let pending_chunk_digest: F::PendingChunk<H::Digest> = pending_chunk::<F, _, N>(
290            self.any.bitmap.as_ref(),
291            ops_leaves,
292            grafting::height::<N>(),
293        )?
294        .map(|chunk| hasher.digest(&chunk))
295        .try_into()
296        .expect("pending_chunk must be consistent with family");
297        Ok(OpsRootWitness {
298            grafted_root,
299            pending_chunk_digest,
300            partial_chunk,
301        })
302    }
303
    /// Snapshot of the grafted tree for use in batch chains.
    ///
    /// Built from the in-memory grafted tree together with a clone of this database's strategy.
    pub(super) fn grafted_snapshot(&self) -> Arc<merkle::batch::MerkleizedBatch<F, H::Digest, S>> {
        merkle::batch::MerkleizedBatch::from_mem_with_strategy(
            &self.grafted_tree,
            self.strategy.clone(),
        )
    }
311
    /// Create a new speculative batch of operations with this database as its parent.
    ///
    /// The batch is seeded with a new batch from the underlying Any database, a snapshot of the
    /// grafted tree, and a shared handle (an `Arc` clone) to the committed bitmap as its base.
    pub fn new_batch(&self) -> super::batch::UnmerkleizedBatch<F, H, U, N, S> {
        super::batch::UnmerkleizedBatch::new(
            self.any.new_batch(),
            self.grafted_snapshot(),
            BitmapBatch::Base(Arc::clone(&self.any.bitmap)),
        )
    }
320
321    /// Returns a proof for the operation at `loc`.
322    pub(super) async fn operation_proof(
323        &self,
324        hasher: &StandardHasher<H>,
325        loc: Location<F>,
326    ) -> Result<OperationProof<F, H::Digest, N>, Error<F>> {
327        let storage = self.grafted_storage();
328        let ops_root = self.any.root();
329        OperationProof::new(
330            hasher,
331            self.any.bitmap.as_ref(),
332            &storage,
333            self.any.inactivity_floor_loc,
334            loc,
335            ops_root,
336        )
337        .await
338    }
339
340    /// Returns a proof that the specified range of operations are part of the database, along with
341    /// the operations from the range. A truncated range (from hitting the max) can be detected by
342    /// looking at the length of the returned operations vector. Also returns the bitmap chunks
343    /// required to verify the proof.
344    ///
345    /// # Errors
346    ///
347    /// Returns [Error::OperationPruned] if `start_loc` falls in a pruned bitmap chunk. Returns
348    /// [`crate::merkle::Error::LocationOverflow`] if `start_loc` >
349    /// [`crate::merkle::Family::MAX_LEAVES`]. Returns [`crate::merkle::Error::RangeOutOfBounds`] if
350    /// `start_loc` >= number of leaves in the tree.
351    pub async fn range_proof(
352        &self,
353        hasher: &StandardHasher<H>,
354        start_loc: Location<F>,
355        max_ops: NonZeroU64,
356    ) -> Result<(RangeProof<F, H::Digest>, Vec<Operation<F, U>>, Vec<[u8; N]>), Error<F>> {
357        let storage = self.grafted_storage();
358        let ops_root = self.any.root();
359        RangeProof::new_with_ops(
360            hasher,
361            self.any.bitmap.as_ref(),
362            &storage,
363            &self.any.log,
364            RangeProofSpec {
365                start_loc,
366                max_ops,
367                inactivity_floor: self.any.inactivity_floor_loc,
368                ops_root,
369            },
370        )
371        .await
372    }
373}
374
375// Functionality requiring mutable journal.
376impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
377where
378    F: merkle::Graftable,
379    E: Context,
380    U: Update,
381    C: Mutable<Item = Operation<F, U>>,
382    I: UnorderedIndex<Value = Location<F>>,
383    H: Hasher,
384    S: Strategy,
385    Operation<F, U>: Codec,
386{
    /// Returns an ops-level historical proof for the specified range.
    ///
    /// Unlike [`range_proof`](Self::range_proof) which returns grafted proofs incorporating the
    /// activity bitmap, this returns ops-tree Merkle proofs suitable for state sync. Direct
    /// verifiers should use [`crate::qmdb::hasher`].
    ///
    /// All three arguments are forwarded unchanged to the underlying Any database's
    /// `historical_proof`, and its result (proof plus operations) is returned as-is.
    pub async fn ops_historical_proof(
        &self,
        historical_size: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(merkle::Proof<F, H::Digest>, Vec<Operation<F, U>>), Error<F>> {
        self.any
            .historical_proof(historical_size, start_loc, max_ops)
            .await
    }
402
    /// Return the pinned nodes for a lower operation boundary of `loc`.
    ///
    /// Delegates to the underlying Any database.
    pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
        self.any.pinned_nodes_at(loc).await
    }
407
408    /// Returns the most recent location from which this database can safely be synced, and the
409    /// upper bound on [`Self::prune`]'s `prune_loc`.
410    ///
411    /// Callers constructing a sync [`Target`](crate::qmdb::sync::Target) may use this value, or
412    /// any earlier retained location, as `range.start`. Values *above* this boundary are unsafe:
413    /// the receiver's grafted-pin derivation requires absorption-settled state for every fully
414    /// pruned chunk, which this value guarantees.
415    ///
416    /// # Computation
417    ///
418    /// Starts from the inactivity floor (the most chunks we could possibly prune) and walks
419    /// backward until two conditions hold for the youngest chunk that would be pruned:
420    ///
421    /// 1. **Settled**: the chunk's ops subtree root at height `gh` has been born in the ops
422    ///    tree (its `peak_birth_size <= ops_leaves`).
423    ///
424    /// 2. **Absorbed**: the chunk-pair parent at height `gh+1` has been born. This guarantees
425    ///    that the ops tree has no individual height-`gh` peaks for pruned chunks, so
426    ///    `compute_grafted_root` never queries a discarded grafted leaf.
427    ///
428    /// Because older chunk-pairs have strictly earlier birth times, checking only the youngest
429    /// pair is sufficient: if the youngest pair's parent is born, all older pairs' parents are
430    /// too. In the worst case the loop decrements twice (once past the unsettled chunk, once
431    /// to land on the older pair boundary).
432    ///
433    /// For families without delayed merges (e.g. MMR), `peak_birth_size` at height `gh` equals
434    /// the chunk's last leaf, so condition (1) always holds and the function returns the
435    /// inactivity floor rounded down to the nearest chunk boundary.
436    pub fn sync_boundary(&self) -> Location<F> {
437        sync_boundary::<F, N>(
438            *self.any.inactivity_floor_loc / bitmap::Prunable::<N>::CHUNK_SIZE_BITS,
439            *self.any.last_commit_loc + 1,
440        )
441    }
442
443    /// Update Current-specific state gauges.
444    pub(super) fn update_metrics(&self) {
445        self.metrics.update(
446            self.any.bitmap.pruned_chunks() as u64,
447            *self.sync_boundary(),
448        );
449    }
450
    /// Returns the minimum rewind target that keeps delayed-merge grafting queries valid
    /// for the current bitmap pruning boundary.
    ///
    /// This is the same absorption threshold used by [`Self::sync_boundary`]: the
    /// `peak_birth_size` of the youngest pruned chunk-pair's height-(gh+1) parent.
    /// Rewinding below this size would put the ops tree in a state where the parent has not
    /// been born, re-exposing individual height-`gh` ops peaks for pruned chunks whose
    /// grafted leaves are no longer available.
    ///
    /// Returns `None` for families without delayed merges.
    ///
    /// The threshold is evaluated for the bitmap's current pruned-chunk count.
    fn delayed_merge_rewind_floor(&self) -> Option<u64> {
        pair_absorption_threshold::<F, N>(self.any.bitmap.pruned_chunks() as u64)
    }
464
465    /// Prune the grafted tree to match the committed bitmap's pruned chunks.
466    fn prune_grafted_tree_to_bitmap(&mut self) -> Result<(), Error<F>> {
467        let pruned_chunks = self.any.bitmap.pruned_chunks() as u64;
468        if pruned_chunks == 0 {
469            return Ok(());
470        }
471
472        let prune_loc = Location::<F>::new(pruned_chunks);
473        if prune_loc <= self.grafted_tree.bounds().start {
474            return Ok(());
475        }
476
477        let prune_pos = Position::try_from(prune_loc)
478            .map_err(|_| Error::<F>::DataCorrupted("prune location overflow"))?;
479        let size = self.grafted_tree.size();
480
481        let mut pinned = BTreeMap::new();
482        for pos in F::nodes_to_pin(prune_loc) {
483            let digest = self
484                .grafted_tree
485                .get_node(pos)
486                .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
487            pinned.insert(pos, digest);
488        }
489
490        let mut retained = Vec::with_capacity((*size - *prune_pos) as usize);
491        for p in *prune_pos..*size {
492            let digest = self
493                .grafted_tree
494                .get_node(Position::new(p))
495                .ok_or(Error::<F>::DataCorrupted("missing retained grafted node"))?;
496            retained.push(digest);
497        }
498
499        self.grafted_tree = Mem::from_pruned_with_retained(prune_pos, pinned, retained);
500        Ok(())
501    }
502
    /// Prunes historical operations prior to `prune_loc`. This does not affect the db's root or
    /// snapshot.
    ///
    /// `prune_loc` must be at most [`Self::sync_boundary`]: the ops log's lower bound must not
    /// advance past the point where the grafting overlay has been pruned. The bitmap and grafted
    /// tree advance to the sync boundary regardless of `prune_loc`.
    ///
    /// The prune-call counter is incremented for every invocation, including ones rejected
    /// with [Error::PruneBeyondMinRequired].
    ///
    /// # Errors
    ///
    /// - Returns [Error::PruneBeyondMinRequired] if `prune_loc` > [`Self::sync_boundary`].
    /// - Returns [`crate::merkle::Error::LocationOverflow`] if `prune_loc` >
    ///   [crate::merkle::Family::MAX_LEAVES].
    /// - Returns [Error::DataCorrupted] if internal grafted-tree state is inconsistent (a pinned
    ///   or retained node is missing, or the prune location overflows a [Position]).
    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), Error<F>> {
        // The scoped timer is held for the rest of the function, covering every exit path.
        let _timer = self.metrics.prune_timer();
        self.metrics.prune_calls.inc();
        // Validate before mutating anything: a rejected request leaves all state untouched.
        let sync_boundary = self.sync_boundary();
        if prune_loc > sync_boundary {
            return Err(Error::PruneBeyondMinRequired(prune_loc, sync_boundary));
        }

        // Prune the bitmap to the sync boundary (most aggressive safe location).
        self.any.prune_bitmap(sync_boundary);
        // Bring the in-memory grafted tree in line with the bitmap's new pruning boundary.
        self.prune_grafted_tree_to_bitmap()?;

        // Persist grafted tree pruning state before pruning the ops log. If the subsequent
        // `any.prune_log` fails, the metadata is ahead of the log, which is safe: on recovery,
        // `build_grafted_tree` will recompute from the (un-pruned) log and the metadata
        // simply records peaks that haven't been pruned yet. The reverse order would be unsafe:
        // a pruned log with stale metadata would lose peak digests permanently.
        self.sync_metadata().await?;

        // Only the ops log honors the caller's `prune_loc`; it may lag the overlay's boundary.
        self.any.prune_log(prune_loc).await?;
        // Refresh the Any layer's metrics first, then this layer's gauges.
        self.any.update_metrics().await;
        self.update_metrics();
        Ok(())
    }
541
    /// Rewind the database to `size` operations, where `size` is the location of the next append.
    ///
    /// This rewinds the underlying Any database and rebuilds the Current overlay state (bitmap,
    /// grafted tree, and canonical root) for the rewound size.
    ///
    /// Rewinding to the current size is a no-op that returns `Ok(())` without any validation.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `size` is not a valid rewind target
    /// - the target's required logical range is not fully retained (for Current, this includes the
    ///   underlying Any inactivity-floor boundary and bitmap pruning boundary)
    /// - `size - 1` is not a commit operation
    /// - `size` is below the bitmap pruning boundary
    ///
    /// Any error from this method is fatal for this handle. Rewind may mutate state in the
    /// underlying Any database before this Current overlay finishes rebuilding. Callers must drop
    /// this database handle after any `Err` from `rewind` and reopen from storage.
    ///
    /// A successful rewind is not restart-stable until a subsequent [`Db::commit`] or
    /// [`Db::sync`].
    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
        let rewind_size = *size;
        // The current size is one past the last commit's location.
        let current_size = *self.any.last_commit_loc + 1;
        // No-op short-circuit. Avoids the post-rewind grafted-tree rebuild and the validation
        // and journal-read overhead below. Validation runs after this on the non-no-op path.
        if rewind_size == current_size {
            return Ok(());
        }
        // Reject zero / out-of-range up front: lines below compute `rewind_size - 1`, which
        // underflows when `rewind_size == 0`. `any::Db::rewind` would catch these, but it isn't
        // called until after those subtractions.
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
        }

        // Number of ops-tree leaves covered by already-pruned bitmap chunks.
        let pruned_chunks = self.any.bitmap.pruned_chunks();
        let pruned_bits = (pruned_chunks as u64)
            .checked_mul(bitmap::Prunable::<N>::CHUNK_SIZE_BITS)
            .ok_or_else(|| Error::DataCorrupted("pruned ops leaves overflow"))?;
        // The target must not fall inside the pruned bitmap prefix.
        if rewind_size < pruned_bits {
            return Err(Error::Journal(JournalError::ItemPruned(rewind_size - 1)));
        }
        // For delayed-merge families the target must also stay at or above the absorption
        // threshold for the pruned chunks (see `delayed_merge_rewind_floor`).
        if let Some(rewind_floor) = self.delayed_merge_rewind_floor() {
            if rewind_size < rewind_floor {
                return Err(Error::Journal(JournalError::ItemPruned(rewind_size - 1)));
            }
        }

        // Ensure the target commit's logical range is fully representable with the current
        // bitmap pruning boundary. Even if the ops log still retains older entries, rewinding
        // to a commit with floor below `pruned_bits` would require bitmap chunks we've already
        // discarded.
        {
            let reader = self.any.log.reader().await;
            let rewind_last_loc = Location::<F>::new(rewind_size - 1);
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            // An operation that carries no floor is rejected as unexpected data at this location.
            let Some(rewind_floor) = rewind_last_op.has_floor() else {
                return Err(Error::<F>::UnexpectedData(rewind_last_loc));
            };
            if *rewind_floor < pruned_bits {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
            }
        }

        // Extract pinned nodes for the existing pruning boundary from the in-memory grafted tree.
        // This happens before `any.rewind` so the digests come from the pre-rewind tree.
        let pinned_nodes = if pruned_chunks > 0 {
            let grafted_leaves = Location::<F>::new(pruned_chunks as u64);
            let mut pinned_nodes = Vec::new();
            for pos in F::nodes_to_pin(grafted_leaves) {
                let digest = self
                    .grafted_tree
                    .get_node(pos)
                    .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
                pinned_nodes.push(digest);
            }
            pinned_nodes
        } else {
            Vec::new()
        };

        // `any.rewind` rewinds the log and patches the shared bitmap (truncate + restore active
        // bits + set the rewound tail's CommitFloor). Live pre-rewind batches must be dropped by
        // the caller; reads through them now return inconsistent data.
        self.any.rewind(size).await?;

        // Rebuild the grafted tree from the rewound bitmap and ops tree, seeded with the pinned
        // nodes captured above.
        let hasher = qmdb::hasher::<H>();
        let ops_size = self.any.log.merkle.size();
        let ops_leaves = Location::<F>::try_from(ops_size)?;
        let grafted_tree = build_grafted_tree::<F, H, S, N>(
            &hasher,
            self.any.bitmap.as_ref(),
            &pinned_nodes,
            &self.any.log.merkle,
            ops_leaves,
            &self.strategy,
        )
        .await?;
        // Recompute the canonical root over a storage view backed by the freshly built tree.
        let storage = grafting::Storage::new(
            &grafted_tree,
            grafting::height::<N>(),
            &self.any.log.merkle,
            hasher.clone(),
        );
        let partial_chunk = partial_chunk(self.any.bitmap.as_ref());
        let ops_root = self.any.root();
        let root = compute_db_root(
            &hasher,
            self.any.bitmap.as_ref(),
            &storage,
            ops_leaves,
            partial_chunk,
            self.any.inactivity_floor_loc,
            &ops_root,
        )
        .await?;

        // Install the rebuilt overlay only after every fallible step has succeeded.
        self.grafted_tree = grafted_tree;
        self.root = root;
        self.update_metrics();

        Ok(())
    }
664
665    /// Sync the metadata to disk.
666    pub(crate) async fn sync_metadata(&self) -> Result<(), Error<F>> {
667        let mut metadata = self.metadata.lock().await;
668        metadata.clear();
669
670        // Snapshot the pruning boundary under the read lock; the guard drops before any await.
671        let pruned_chunks_u64 = self.any.bitmap.pruned_chunks() as u64;
672
673        // Write the number of pruned chunks.
674        let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
675        metadata.put(key, pruned_chunks_u64.to_be_bytes().to_vec());
676
677        // Write the pinned nodes of the grafted tree.
678        let pruned_chunks = Location::<F>::new(pruned_chunks_u64);
679        for (i, grafted_pos) in F::nodes_to_pin(pruned_chunks).enumerate() {
680            let digest = self
681                .grafted_tree
682                .get_node(grafted_pos)
683                .ok_or(Error::<F>::DataCorrupted("missing grafted pinned node"))?;
684            let key = U64::new(NODE_PREFIX, i as u64);
685            metadata.put(key, digest.to_vec());
686        }
687
688        metadata.sync().await?;
689
690        Ok(())
691    }
692}
693
694/// Compute the safe sync boundary from a pruning boundary and the current ops-tree size.
695///
696/// Shared by the live DB and speculative batch wrappers so they both derive the same range start.
697pub(crate) fn sync_boundary<F: Graftable, const N: usize>(
698    mut pruned_chunks: u64,
699    ops_leaves: u64,
700) -> Location<F> {
701    let chunk_bits = bitmap::Prunable::<N>::CHUNK_SIZE_BITS;
702    let grafting_height = grafting::height::<N>();
703
704    while pruned_chunks > 0 {
705        let required_ops = pair_absorption_threshold::<F, N>(pruned_chunks).unwrap_or_else(|| {
706            let youngest_start = (pruned_chunks - 1) * chunk_bits;
707            let pos = F::subtree_root_position(Location::<F>::new(youngest_start), grafting_height);
708            F::peak_birth_size(pos, grafting_height)
709        });
710
711        if ops_leaves >= required_ops {
712            break;
713        }
714        pruned_chunks -= 1;
715    }
716
717    Location::new(pruned_chunks * chunk_bits)
718}
719
720/// For the youngest of `pruned_chunks` chunks, return the `peak_birth_size` of its
721/// chunk-pair parent at height `gh+1`. Returns `None` for families without delayed merges
722/// (where `peak_birth_size` at height `gh` equals the chunk boundary).
723fn pair_absorption_threshold<F: Graftable, const N: usize>(pruned_chunks: u64) -> Option<u64> {
724    if pruned_chunks == 0 {
725        return None;
726    }
727
728    let grafting_height = grafting::height::<N>();
729    let youngest = pruned_chunks - 1;
730    let youngest_start = youngest << grafting_height;
731    let youngest_end = (youngest + 1) << grafting_height;
732    let youngest_pos =
733        F::subtree_root_position(Location::<F>::new(youngest_start), grafting_height);
734
735    if F::peak_birth_size(youngest_pos, grafting_height) <= youngest_end {
736        return None;
737    }
738
739    let pair_chunk = youngest & !1;
740    let pair_start = pair_chunk << grafting_height;
741    let pair_pos = F::subtree_root_position(Location::<F>::new(pair_start), grafting_height + 1);
742    Some(F::peak_birth_size(pair_pos, grafting_height + 1))
743}
744
// Functionality requiring mutable + persistable journal.
impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
where
    F: merkle::Graftable,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    S: Strategy,
    Operation<F, U>: Codec,
{
    /// Durably commit the journal state published by prior [`Db::apply_batch`]
    /// calls.
    ///
    /// Delegates to the underlying Any database; the Current-layer metadata is not written
    /// here (see [`Db::sync`]).
    pub async fn commit(&self) -> Result<(), Error<F>> {
        self.any.commit().await
    }

    /// Sync all database state to disk.
    ///
    /// Syncs the underlying Any database first, then the Current-layer metadata, and finally
    /// refreshes the Current-layer gauges. The call is counted and timed via the sync metrics.
    pub async fn sync(&self) -> Result<(), Error<F>> {
        // The scoped timer is held for the rest of the function, covering every exit path.
        let _timer = self.metrics.sync_timer();
        self.metrics.sync_calls.inc();
        self.any.sync().await?;

        // Write the bitmap pruning boundary to disk so that next startup doesn't have to
        // re-Merkleize the inactive portion up to the inactivity floor.
        self.sync_metadata().await?;
        self.update_metrics();
        Ok(())
    }

    /// Destroy the db, removing all data from disk.
    ///
    /// Consumes the database. The metadata store is destroyed first, then the underlying Any
    /// database; if destroying the metadata fails, the Any database is not destroyed.
    pub async fn destroy(self) -> Result<(), Error<F>> {
        self.metadata.into_inner().destroy().await?;
        self.any.destroy().await
    }
}
782
783impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
784where
785    F: merkle::Graftable,
786    E: Context,
787    U: Update + 'static,
788    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
789    I: UnorderedIndex<Value = Location<F>>,
790    H: Hasher,
791    S: Strategy,
792    Operation<F, U>: Codec,
793{
794    /// Apply a batch to the database, returning the range of written operations.
795    ///
796    /// A batch is valid only if every batch applied to the database since this batch's
797    /// ancestor chain was created is an ancestor of this batch. Applying a batch from a
798    /// different fork returns [`Error::StaleBatch`].
799    ///
800    /// This publishes the batch to the in-memory Current view and appends it to the journal,
801    /// but does not durably persist it. Call [`Db::commit`] or [`Db::sync`] to guarantee
802    /// durability.
803    pub async fn apply_batch(
804        &mut self,
805        batch: Arc<super::batch::MerkleizedBatch<F, H::Digest, U, N, S>>,
806    ) -> Result<Range<Location<F>>, Error<F>> {
807        let _timer = self.metrics.apply_batch_timer();
808        self.metrics.apply_batch_calls.inc();
809        let range = self.any.apply_batch(Arc::clone(&batch.inner)).await?;
810        self.grafted_tree.apply_batch(&batch.grafted)?;
811        self.root = batch.canonical_root;
812        self.update_metrics();
813        Ok(range)
814    }
815}
816
817impl<F, E, U, C, I, H, const N: usize, S> Persistable for Db<F, E, C, I, H, U, N, S>
818where
819    F: merkle::Graftable,
820    E: Context,
821    U: Update,
822    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
823    I: UnorderedIndex<Value = Location<F>>,
824    H: Hasher,
825    S: Strategy,
826    Operation<F, U>: Codec,
827{
828    type Error = Error<F>;
829
830    async fn commit(&self) -> Result<(), Error<F>> {
831        Self::commit(self).await
832    }
833
834    async fn sync(&self) -> Result<(), Error<F>> {
835        Self::sync(self).await
836    }
837
838    async fn destroy(self) -> Result<(), Error<F>> {
839        self.destroy().await
840    }
841}
842
843/// Returns `Some((last_chunk, next_bit))` if the bitmap has an incomplete trailing chunk, or
844/// `None` if all bits fall on complete chunk boundaries.
845pub(super) fn partial_chunk<B: bitmap::Readable<N>, const N: usize>(
846    bitmap: &B,
847) -> Option<([u8; N], u64)> {
848    let (last_chunk, next_bit) = bitmap.last_chunk();
849    if next_bit == bitmap::Prunable::<N>::CHUNK_SIZE_BITS {
850        None
851    } else {
852        Some((last_chunk, next_bit))
853    }
854}
855
856/// Return complete and graftable chunk counts, enforcing the pending and pruning invariants.
857///
858/// Returns [`Error::DataCorrupted`] if `bitmap` and `ops_leaves` imply more than one
859/// pending chunk, or if pruning has advanced past the graftable chunk boundary.
860fn graftable_chunk_window<F: merkle::Graftable, B: bitmap::Readable<N>, const N: usize>(
861    bitmap: &B,
862    ops_leaves: Location<F>,
863    grafting_height: u32,
864) -> Result<(u64, u64), Error<F>> {
865    let complete = bitmap.complete_chunks() as u64;
866    let graftable = grafting::graftable_chunks::<F>(*ops_leaves, grafting_height).min(complete);
867    let pending = complete - graftable;
868    if pending > 1 {
869        return Err(Error::DataCorrupted("multiple pending bitmap chunks"));
870    }
871
872    let pruned = bitmap.pruned_chunks() as u64;
873    if pruned > graftable {
874        return Err(Error::DataCorrupted(
875            "pruned chunks exceed graftable chunks",
876        ));
877    }
878
879    Ok((complete, graftable))
880}
881
882/// Returns the bytes of the "pending" chunk if the bitmap currently has one, else `None`.
883///
884/// A chunk is pending when its bits are fully written to the bitmap but its h=G ancestor
885/// has not yet been born in the ops tree. At most one chunk is ever in this state (the most
886/// recently completed one); see [`super::grafting::graftable_chunks`] for the structural
887/// argument.
888///
889/// The caller must pass a consistent snapshot of `ops_leaves` (the ops tree's leaf count)
890/// and the bitmap state. Both inputs are used to derive `graftable_chunks`; deriving them from
891/// independent snapshots can violate the pending-window or pruning invariants.
892///
893/// Returns [`Error::DataCorrupted`] when those invariants are violated.
894pub(super) fn pending_chunk<F: merkle::Graftable, B: bitmap::Readable<N>, const N: usize>(
895    bitmap: &B,
896    ops_leaves: Location<F>,
897    grafting_height: u32,
898) -> Result<Option<[u8; N]>, Error<F>> {
899    let (complete, graftable) =
900        graftable_chunk_window::<F, B, N>(bitmap, ops_leaves, grafting_height)?;
901    if complete - graftable != 1 {
902        return Ok(None);
903    }
904    Ok(Some(bitmap.get_chunk(graftable as usize)))
905}
906
907/// Compute the canonical root from the ops root, grafted tree root, and optional pending /
908/// partial chunk digests.
909///
910/// See [Canonical root structure](super::proof#canonical-root-structure) for the full layout.
911/// The pending and partial inputs are independent: either, both, or neither may be set, and
912/// pending precedes partial in hash order when both are present.
913///
914/// # Collision resistance
915///
916/// `pending` contributes `D` bytes when present; `partial` contributes `D + 8` bytes (`D` =
917/// digest size). Different fixed lengths, so the two cannot produce the same input bytes,
918/// even when their digests are identical. Collisions reduce to H.
919pub(super) fn combine_roots<H: Hasher>(
920    hasher: &StandardHasher<H>,
921    ops_root: &H::Digest,
922    grafted_root: &H::Digest,
923    pending: Option<&H::Digest>,
924    partial: Option<(u64, &H::Digest)>,
925) -> H::Digest {
926    match (pending, partial) {
927        (None, None) => hasher.hash([ops_root.as_ref(), grafted_root.as_ref()]),
928        (Some(pe), None) => hasher.hash([ops_root.as_ref(), grafted_root.as_ref(), pe.as_ref()]),
929        (None, Some((nb, p))) => {
930            let nb_bytes = nb.to_be_bytes();
931            hasher.hash([
932                ops_root.as_ref(),
933                grafted_root.as_ref(),
934                nb_bytes.as_slice(),
935                p.as_ref(),
936            ])
937        }
938        (Some(pe), Some((nb, p))) => {
939            let nb_bytes = nb.to_be_bytes();
940            hasher.hash([
941                ops_root.as_ref(),
942                grafted_root.as_ref(),
943                pe.as_ref(),
944                nb_bytes.as_slice(),
945                p.as_ref(),
946            ])
947        }
948    }
949}
950
951/// Compute the canonical root digest of a [Db].
952///
953/// See the [Root structure](super) section in the module documentation.
954///
955/// `ops_leaves` must be a single consistent snapshot of the ops tree's leaf count, taken
956/// in the same logical instant as the bitmap state passed via `status`. Both the pending
957/// chunk derivation and `compute_grafted_root` use this value to compute `graftable_chunks`;
958/// deriving them from independent snapshots risks the inconsistent state where a chunk is
959/// counted in one path but not the other.
960#[allow(clippy::too_many_arguments)]
961pub(super) async fn compute_db_root<
962    F: merkle::Graftable,
963    H: Hasher,
964    B: bitmap::Readable<N>,
965    S: MerkleStorage<F, Digest = H::Digest>,
966    const N: usize,
967>(
968    hasher: &StandardHasher<H>,
969    status: &B,
970    storage: &S,
971    ops_leaves: Location<F>,
972    partial_chunk: Option<([u8; N], u64)>,
973    inactivity_floor: Location<F>,
974    ops_root: &H::Digest,
975) -> Result<H::Digest, Error<F>> {
976    let grafted_root =
977        compute_grafted_root(hasher, status, storage, ops_leaves, inactivity_floor).await?;
978    let pending = pending_chunk::<F, B, N>(status, ops_leaves, grafting::height::<N>())?
979        .map(|chunk| hasher.digest(&chunk));
980    let partial = partial_chunk.map(|(chunk, next_bit)| {
981        let digest = hasher.digest(&chunk);
982        (next_bit, digest)
983    });
984    Ok(combine_roots(
985        hasher,
986        ops_root,
987        &grafted_root,
988        pending.as_ref(),
989        partial.as_ref().map(|(nb, d)| (*nb, d)),
990    ))
991}
992
993/// Compute the root of the grafted structure represented by `storage`.
994///
995/// Only **graftable** chunks (those whose h=G ancestor has been born in the ops tree) are
996/// committed by the grafted tree. The most recently completed but ungraftable chunk, if
997/// any, is hashed into the canonical root directly by [`combine_roots`] as the pending
998/// chunk, not by this function.
999///
1000/// `ops_leaves` must come from the same single snapshot as `status` to preserve the
1001/// `pruned_chunks <= graftable_chunks <= complete_chunks` invariant.
1002pub(super) async fn compute_grafted_root<
1003    F: merkle::Graftable,
1004    H: Hasher,
1005    B: bitmap::Readable<N>,
1006    S: MerkleStorage<F, Digest = H::Digest>,
1007    const N: usize,
1008>(
1009    hasher: &StandardHasher<H>,
1010    status: &B,
1011    storage: &S,
1012    ops_leaves: Location<F>,
1013    inactivity_floor: Location<F>,
1014) -> Result<H::Digest, Error<F>> {
1015    let size = storage.size().await;
1016    let leaves = Location::try_from(size)?;
1017
1018    // Collect peak digests of the grafted structure.
1019    let mut peaks: Vec<H::Digest> = Vec::new();
1020    for (peak_pos, _) in F::peaks(size) {
1021        let digest = storage
1022            .get_node(peak_pos)
1023            .await?
1024            .ok_or_else(|| merkle::Error::<F>::MissingNode(peak_pos))?;
1025        peaks.push(digest);
1026    }
1027
1028    // Validate bitmap invariants (pending <= 1, pruned <= graftable).
1029    let grafting_height = grafting::height::<N>();
1030    let (_complete_chunks, _graftable_chunks) =
1031        graftable_chunk_window::<F, B, N>(status, ops_leaves, grafting_height)?;
1032
1033    let inactive_peaks =
1034        grafting::chunk_aligned_inactive_peaks::<F>(leaves, inactivity_floor, grafting_height)?;
1035
1036    // Every peak the storage layer surfaces is either a grafted-tree node (graftable chunks already
1037    // incorporate `hash(chunk || h_G_node)`), an ops node above G (hashed normally), or an ops node
1038    // below G (raw, because its chunk is pending and its digest is hashed directly into the
1039    // canonical root rather than through the tree). Bagging is a straight fold; no per-chunk
1040    // transformation is needed.
1041    Ok(hasher.root(leaves, inactive_peaks, peaks.iter())?)
1042}
1043
1044/// Compute grafted leaf digests for the given bitmap chunks as `(chunk_idx, digest)` pairs.
1045///
1046/// Callers must pass only **graftable** chunks (those whose h=G ancestor has already been born in
1047/// the ops tree). Each graftable chunk has exactly one covering ops node at height G, looked up via
1048/// [`merkle::Graftable::subtree_root_position`]. The grafted leaf digest is `hash(chunk ||
1049/// ops_h_G_node)`; for all-zero chunks the grafted leaf equals the ops digest directly (zero-chunk
1050/// identity).
1051///
1052/// The provided strategy determines if or how to parallelize merkleization.
1053pub(super) async fn compute_grafted_leaves<
1054    F: merkle::Graftable,
1055    H: Hasher,
1056    S: Strategy,
1057    const N: usize,
1058>(
1059    hasher: &StandardHasher<H>,
1060    ops_tree: &impl MerkleStorage<F, Digest = H::Digest>,
1061    chunks: impl IntoIterator<Item = (usize, [u8; N])>,
1062    strategy: &S,
1063) -> Result<Vec<(usize, H::Digest)>, Error<F>> {
1064    let grafting_height = grafting::height::<N>();
1065
1066    // Each graftable chunk has a single h=G ancestor at the deterministic
1067    // `subtree_root_position(chunk_idx << G, G)`. Look it up directly.
1068    let inputs = try_join_all(chunks.into_iter().map(|(chunk_idx, chunk)| async move {
1069        let leaf_start = Location::<F>::new((chunk_idx as u64) << grafting_height);
1070        let pos = F::subtree_root_position(leaf_start, grafting_height);
1071        let chunk_ops_digest = ops_tree
1072            .get_node(pos)
1073            .await?
1074            .ok_or(merkle::Error::<F>::MissingGraftedLeaf(pos))?;
1075        Ok::<_, Error<F>>((chunk_idx, chunk_ops_digest, chunk))
1076    }))
1077    .await?;
1078
1079    // Compute the grafted leaf digest for each chunk. For all-zero chunks, the
1080    // grafted leaf equals the chunk_ops_digest directly (zero-chunk identity).
1081    let zero_chunk = [0u8; N];
1082    Ok(strategy.map_init_collect_vec(
1083        inputs,
1084        || hasher.clone(),
1085        |h, (chunk_idx, chunk_ops_digest, chunk)| {
1086            if chunk == zero_chunk {
1087                (chunk_idx, chunk_ops_digest)
1088            } else {
1089                (
1090                    chunk_idx,
1091                    h.hash([chunk.as_slice(), chunk_ops_digest.as_ref()]),
1092                )
1093            }
1094        },
1095    ))
1096}
1097
1098/// Build a grafted [Mem] from scratch using bitmap chunks and the ops tree.
1099///
1100/// For each non-pruned **graftable** chunk (index in `pruned_chunks..graftable_chunks`), reads the
1101/// ops tree node at the grafting height to compute the grafted leaf (see the
1102/// [grafted leaf formula](super) in the module documentation).
1103///
1104/// The most recently completed chunk may not yet be graftable (its h=G ancestor not yet born);
1105/// that chunk is **excluded** from the grafted tree and its digest is hashed directly into
1106/// the canonical root as the pending chunk. The caller must ensure that all ops tree nodes
1107/// for chunks `>= pruned_chunks` are still accessible in the ops tree (i.e., not pruned from
1108/// the journal).
1109///
1110/// `ops_leaves` must be a single consistent snapshot of `ops_tree.size()` taken in the same
1111/// instant as the bitmap state.
1112pub(super) async fn build_grafted_tree<
1113    F: merkle::Graftable,
1114    H: Hasher,
1115    S: Strategy,
1116    const N: usize,
1117>(
1118    hasher: &StandardHasher<H>,
1119    bitmap: &impl bitmap::Readable<N>,
1120    pinned_nodes: &[H::Digest],
1121    ops_tree: &impl MerkleStorage<F, Digest = H::Digest>,
1122    ops_leaves: Location<F>,
1123    strategy: &S,
1124) -> Result<Mem<F, H::Digest>, Error<F>> {
1125    let grafting_height = grafting::height::<N>();
1126    let pruned_chunks = bitmap.pruned_chunks();
1127    let complete_chunks = bitmap.complete_chunks();
1128    let graftable_chunks = grafting::graftable_chunks::<F>(*ops_leaves, grafting_height)
1129        .min(complete_chunks as u64) as usize;
1130    assert!(
1131        pruned_chunks <= graftable_chunks && graftable_chunks <= complete_chunks,
1132        "invariant violated: pruned={pruned_chunks} graftable={graftable_chunks} complete={complete_chunks}"
1133    );
1134
1135    // Compute grafted leaves for each unpruned graftable chunk. The pending chunk (if any)
1136    // sits at index `graftable_chunks` and is excluded; its digest is hashed directly into
1137    // the canonical root.
1138    let leaves = compute_grafted_leaves::<F, H, S, N>(
1139        hasher,
1140        ops_tree,
1141        (pruned_chunks..graftable_chunks).map(|chunk_idx| (chunk_idx, bitmap.get_chunk(chunk_idx))),
1142        strategy,
1143    )
1144    .await?;
1145
1146    // Build the base grafted tree: either from pruned components or empty.
1147    let grafted_hasher = grafting::GraftedHasher::<F, _>::new(hasher.clone(), grafting_height);
1148    let mut grafted_tree = if pruned_chunks > 0 {
1149        let grafted_pruning_boundary = Location::<F>::new(pruned_chunks as u64);
1150        Mem::from_components(Vec::new(), grafted_pruning_boundary, pinned_nodes.to_vec())
1151            .map_err(|_| Error::<F>::DataCorrupted("grafted tree rebuild failed"))?
1152    } else {
1153        Mem::new()
1154    };
1155
1156    // Add each grafted leaf digest.
1157    if !leaves.is_empty() {
1158        let batch = {
1159            let mut batch = grafted_tree.new_batch_with_strategy(strategy.clone());
1160            for &(_ops_pos, digest) in &leaves {
1161                batch = batch.add_leaf_digest(digest);
1162            }
1163            batch.merkleize(&grafted_tree, &grafted_hasher)
1164        };
1165        grafted_tree.apply_batch(&batch)?;
1166    }
1167
1168    Ok(grafted_tree)
1169}
1170
1171/// Load the metadata and recover the pruning state persisted by previous runs.
1172///
1173/// The metadata store holds two kinds of entries (keyed by prefix):
1174/// - **Pruned chunks count** ([PRUNED_CHUNKS_PREFIX]): the number of bitmap chunks that have been
1175///   pruned. This tells us where the active portion of the bitmap begins.
1176/// - **Pinned node digests** ([NODE_PREFIX]): grafted tree digests at peak positions whose
1177///   underlying data has been pruned. These are needed to recompute the grafted tree root without
1178///   the pruned chunks.
1179///
1180/// Returns `(metadata_handle, pruned_chunks, pinned_node_digests)`.
1181pub(super) async fn init_metadata<F: merkle::Graftable, E: Context, D: Digest>(
1182    context: E,
1183    partition: &str,
1184) -> Result<(Metadata<E, U64, Vec<u8>>, usize, Vec<D>), Error<F>> {
1185    let metadata_cfg = MConfig {
1186        partition: partition.into(),
1187        codec_config: ((0..).into(), ()),
1188    };
1189    let metadata =
1190        Metadata::<_, U64, Vec<u8>>::init(context.child("metadata"), metadata_cfg).await?;
1191
1192    let key = U64::new(PRUNED_CHUNKS_PREFIX, 0);
1193    let pruned_chunks = match metadata.get(&key) {
1194        Some(bytes) => u64::from_be_bytes(bytes.as_slice().try_into().map_err(|_| {
1195            error!("pruned chunks value not a valid u64");
1196            Error::<F>::DataCorrupted("pruned chunks value not a valid u64")
1197        })?),
1198        None => {
1199            warn!("bitmap metadata does not contain pruned chunks, initializing as empty");
1200            0
1201        }
1202    } as usize;
1203
1204    // Load pinned nodes if database was pruned. We use nodes_to_pin on the grafted leaf count
1205    // to determine how many peaks to read. (Multiplying pruned_chunks by chunk_size is a
1206    // left-shift, preserving popcount, so the peak count is the same in grafted or ops space.)
1207    let pinned_nodes = if pruned_chunks > 0 {
1208        let pruned_loc = Location::<F>::new(pruned_chunks as u64);
1209        if !pruned_loc.is_valid() {
1210            return Err(Error::DataCorrupted("pruned chunks exceeds MAX_LEAVES"));
1211        }
1212        let mut pinned = Vec::new();
1213        for (index, _pos) in F::nodes_to_pin(pruned_loc).enumerate() {
1214            let metadata_key = U64::new(NODE_PREFIX, index as u64);
1215            let Some(bytes) = metadata.get(&metadata_key) else {
1216                return Err(Error::DataCorrupted(
1217                    "missing pinned node in grafted tree metadata",
1218                ));
1219            };
1220            let digest = D::decode(bytes.as_ref())
1221                .map_err(|_| Error::<F>::DataCorrupted("invalid pinned node digest"))?;
1222            pinned.push(digest);
1223        }
1224        pinned
1225    } else {
1226        Vec::new()
1227    };
1228
1229    Ok((metadata, pruned_chunks, pinned_nodes))
1230}
1231
1232#[cfg(test)]
1233mod tests {
1234    use super::*;
1235    use crate::{
1236        merkle::{mmb, mmr, Bagging::ForwardFold},
1237        qmdb::{
1238            any::traits::{DbAny, UnmerkleizedBatch as _},
1239            current::{tests::fixed_config, unordered::fixed},
1240        },
1241        translator::OneCap,
1242    };
1243    use commonware_codec::FixedSize;
1244    use commonware_cryptography::{sha256, Sha256};
1245    use commonware_macros::test_traced;
1246    use commonware_runtime::{deterministic, Runner as _, Supervisor as _};
1247    use commonware_utils::bitmap::Prunable as PrunableBitMap;
1248
    /// Bitmap chunk size in bytes used by the bitmap tests in this module; equal to the
    /// SHA-256 digest size.
    const N: usize = sha256::Digest::SIZE;
1250
1251    #[test]
1252    fn partial_chunk_single_bit() {
1253        let mut bm = PrunableBitMap::<N>::new();
1254        bm.push(true);
1255        let result = partial_chunk::<PrunableBitMap<N>, N>(&bm);
1256        assert!(result.is_some());
1257        let (chunk, next_bit) = result.unwrap();
1258        assert_eq!(next_bit, 1);
1259        assert_eq!(chunk[0], 1); // bit 0 set
1260    }
1261
1262    #[test]
1263    fn partial_chunk_aligned() {
1264        let mut bm = PrunableBitMap::<N>::new();
1265        for _ in 0..PrunableBitMap::<N>::CHUNK_SIZE_BITS {
1266            bm.push(true);
1267        }
1268        let result = partial_chunk::<PrunableBitMap<N>, N>(&bm);
1269        assert!(result.is_none());
1270    }
1271
1272    #[test]
1273    fn partial_chunk_partial() {
1274        let mut bm = PrunableBitMap::<N>::new();
1275        for _ in 0..(PrunableBitMap::<N>::CHUNK_SIZE_BITS + 5) {
1276            bm.push(true);
1277        }
1278        let result = partial_chunk::<PrunableBitMap<N>, N>(&bm);
1279        assert!(result.is_some());
1280        let (_chunk, next_bit) = result.unwrap();
1281        assert_eq!(next_bit, 5);
1282    }
1283
1284    #[test]
1285    fn combine_roots_deterministic() {
1286        let hasher = StandardHasher::<Sha256>::new(ForwardFold);
1287        let ops = Sha256::hash(b"ops");
1288        let grafted = Sha256::hash(b"grafted");
1289        let r1 = combine_roots(&hasher, &ops, &grafted, None, None);
1290        let r2 = combine_roots(&hasher, &ops, &grafted, None, None);
1291        assert_eq!(r1, r2);
1292    }
1293
1294    #[test]
1295    fn combine_roots_with_partial_differs() {
1296        let hasher = StandardHasher::<Sha256>::new(ForwardFold);
1297        let ops = Sha256::hash(b"ops");
1298        let grafted = Sha256::hash(b"grafted");
1299        let partial_digest = Sha256::hash(b"partial");
1300
1301        let without = combine_roots(&hasher, &ops, &grafted, None, None);
1302        let with = combine_roots(&hasher, &ops, &grafted, None, Some((5, &partial_digest)));
1303        assert_ne!(without, with);
1304    }
1305
1306    #[test]
1307    fn combine_roots_with_pending_differs() {
1308        let hasher = StandardHasher::<Sha256>::new(ForwardFold);
1309        let ops = Sha256::hash(b"ops");
1310        let grafted = Sha256::hash(b"grafted");
1311        let pending_digest = Sha256::hash(b"pending");
1312
1313        let without = combine_roots(&hasher, &ops, &grafted, None, None);
1314        let with = combine_roots(&hasher, &ops, &grafted, Some(&pending_digest), None);
1315        assert_ne!(without, with);
1316    }
1317
1318    #[test]
1319    fn combine_roots_pending_and_partial_independent() {
1320        let hasher = StandardHasher::<Sha256>::new(ForwardFold);
1321        let ops = Sha256::hash(b"ops");
1322        let grafted = Sha256::hash(b"grafted");
1323        let pending_digest = Sha256::hash(b"pending");
1324        let partial_digest = Sha256::hash(b"partial");
1325
1326        let only_pending = combine_roots(&hasher, &ops, &grafted, Some(&pending_digest), None);
1327        let only_partial = combine_roots(&hasher, &ops, &grafted, None, Some((5, &partial_digest)));
1328        let both = combine_roots(
1329            &hasher,
1330            &ops,
1331            &grafted,
1332            Some(&pending_digest),
1333            Some((5, &partial_digest)),
1334        );
1335        assert_ne!(only_pending, only_partial);
1336        assert_ne!(only_pending, both);
1337        assert_ne!(only_partial, both);
1338    }
1339
1340    #[test]
1341    fn combine_roots_different_ops_root() {
1342        let hasher = StandardHasher::<Sha256>::new(ForwardFold);
1343        let ops_a = Sha256::hash(b"ops_a");
1344        let ops_b = Sha256::hash(b"ops_b");
1345        let grafted = Sha256::hash(b"grafted");
1346
1347        let r1 = combine_roots(&hasher, &ops_a, &grafted, None, None);
1348        let r2 = combine_roots(&hasher, &ops_b, &grafted, None, None);
1349        assert_ne!(r1, r2);
1350    }
1351
1352    /// Pin the canonical-root format down to the byte. A change to `combine_roots`'s hash
1353    /// pre-image (e.g., reordering, dropping a length tag, swapping pending/partial order)
1354    /// would silently break wire compatibility; this test catches that.
1355    #[test]
1356    fn combine_roots_format_golden() {
1357        let hasher = StandardHasher::<Sha256>::new(ForwardFold);
1358        let ops = Sha256::hash(b"ops");
1359        let grafted = Sha256::hash(b"grafted");
1360        let pending = Sha256::hash(b"pending");
1361        let partial = Sha256::hash(b"partial");
1362        let next_bit: u64 = 0x1122_3344_5566_7788;
1363
1364        // Neither pending nor partial.
1365        assert_eq!(
1366            combine_roots(&hasher, &ops, &grafted, None, None),
1367            hasher.hash([ops.as_ref(), grafted.as_ref()])
1368        );
1369
1370        // Pending only.
1371        assert_eq!(
1372            combine_roots(&hasher, &ops, &grafted, Some(&pending), None),
1373            hasher.hash([ops.as_ref(), grafted.as_ref(), pending.as_ref()])
1374        );
1375
1376        // Partial only.
1377        assert_eq!(
1378            combine_roots(&hasher, &ops, &grafted, None, Some((next_bit, &partial))),
1379            hasher.hash([
1380                ops.as_ref(),
1381                grafted.as_ref(),
1382                next_bit.to_be_bytes().as_slice(),
1383                partial.as_ref(),
1384            ])
1385        );
1386
1387        // Both: pending precedes partial.
1388        assert_eq!(
1389            combine_roots(
1390                &hasher,
1391                &ops,
1392                &grafted,
1393                Some(&pending),
1394                Some((next_bit, &partial))
1395            ),
1396            hasher.hash([
1397                ops.as_ref(),
1398                grafted.as_ref(),
1399                pending.as_ref(),
1400                next_bit.to_be_bytes().as_slice(),
1401                partial.as_ref(),
1402            ])
1403        );
1404    }
1405
    /// Current db from `current::unordered::fixed`, instantiated over the MMR family on the
    /// deterministic runtime, with SHA-256 digests as keys and values.
    ///
    /// NOTE(review): the positional arguments appear to be family, runtime context, key, value,
    /// hasher, translator, bitmap chunk size in bytes (32), and parallelism strategy; confirm
    /// against the `fixed::Db` definition.
    type MmrDb = fixed::Db<
        mmr::Family,
        deterministic::Context,
        sha256::Digest,
        sha256::Digest,
        Sha256,
        OneCap,
        32,
        commonware_parallel::Sequential,
    >;
    /// Same instantiation as `MmrDb`, but over the MMB family.
    type MmbDb = fixed::Db<
        mmb::Family,
        deterministic::Context,
        sha256::Digest,
        sha256::Digest,
        Sha256,
        OneCap,
        32,
        commonware_parallel::Sequential,
    >;
1426
1427    async fn populate_fixed_db<F, DB>(db: &mut DB, start: u64, count: u64)
1428    where
1429        F: merkle::Graftable,
1430        DB: DbAny<F, Key = sha256::Digest, Value = sha256::Digest>,
1431    {
1432        let mut batch = db.new_batch();
1433        for idx in start..start + count {
1434            let key = Sha256::hash(&idx.to_be_bytes());
1435            let value = Sha256::hash(&(idx + count).to_be_bytes());
1436            batch = batch.write(key, Some(value));
1437        }
1438        let merkleized = batch.merkleize(db, None).await.unwrap();
1439        db.apply_batch(merkleized).await.unwrap();
1440        db.commit().await.unwrap();
1441    }
1442
1443    #[test_traced]
1444    fn test_ops_root_witness_verifies_without_partial_chunk() {
1445        let executor = deterministic::Runner::default();
1446        executor.start(|ctx| async move {
1447            let mut db = MmrDb::init(
1448                ctx.child("storage"),
1449                fixed_config::<OneCap>("ops-root-witness-full", &ctx),
1450            )
1451            .await
1452            .unwrap();
1453            let mut next_idx = 0;
1454            populate_fixed_db::<mmr::Family, _>(&mut db, next_idx, 256).await;
1455            next_idx += 256;
1456            while partial_chunk::<_, 32>(db.any.bitmap.as_ref()).is_some() {
1457                populate_fixed_db::<mmr::Family, _>(&mut db, next_idx, 1).await;
1458                next_idx += 1;
1459            }
1460
1461            let hasher = qmdb::hasher::<Sha256>();
1462            let witness = db.ops_root_witness(&hasher).await.unwrap();
1463            let ops_root = db.ops_root();
1464            let canonical_root = db.root();
1465
1466            assert!(witness.partial_chunk.is_none());
1467            assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1468
1469            let wrong_ops_root = Sha256::hash(b"wrong ops root");
1470            assert!(!witness.verify(&hasher, &wrong_ops_root, &canonical_root));
1471
1472            let wrong_canonical_root = Sha256::hash(b"wrong canonical root");
1473            assert!(!witness.verify(&hasher, &ops_root, &wrong_canonical_root));
1474
1475            let mut tampered = witness;
1476            tampered.grafted_root = Sha256::hash(b"wrong grafted root");
1477            assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1478        });
1479    }
1480
1481    #[test_traced]
1482    fn test_ops_root_witness_verifies_with_partial_chunk() {
1483        let executor = deterministic::Runner::default();
1484        executor.start(|ctx| async move {
1485            let mut db = MmbDb::init(
1486                ctx.child("storage"),
1487                fixed_config::<OneCap>("ops-root-witness-partial", &ctx),
1488            )
1489            .await
1490            .unwrap();
1491            populate_fixed_db::<mmb::Family, _>(&mut db, 0, 260).await;
1492
1493            let hasher = qmdb::hasher::<Sha256>();
1494            let witness = db.ops_root_witness(&hasher).await.unwrap();
1495            let ops_root = db.ops_root();
1496            let canonical_root = db.root();
1497
1498            assert!(witness.partial_chunk.is_some());
1499            assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1500
1501            let wrong_ops_root = Sha256::hash(b"wrong ops root");
1502            assert!(!witness.verify(&hasher, &wrong_ops_root, &canonical_root));
1503
1504            let wrong_canonical_root = Sha256::hash(b"wrong canonical root");
1505            assert!(!witness.verify(&hasher, &ops_root, &wrong_canonical_root));
1506
1507            let mut tampered = witness.clone();
1508            tampered.grafted_root = Sha256::hash(b"wrong grafted root");
1509            assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1510
1511            let mut tampered = witness.clone();
1512            tampered.partial_chunk.as_mut().unwrap().0 += 1;
1513            assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1514
1515            let mut tampered = witness;
1516            tampered.partial_chunk.as_mut().unwrap().1 = Sha256::hash(b"wrong partial chunk");
1517            assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1518        });
1519    }
1520
1521    #[test_traced]
1522    fn test_ops_root_witness_verifies_with_pruned_db() {
1523        let executor = deterministic::Runner::default();
1524        executor.start(|ctx| async move {
1525            let mut db = MmrDb::init(
1526                ctx.child("storage"),
1527                fixed_config::<OneCap>("ops-root-witness-pruned", &ctx),
1528            )
1529            .await
1530            .unwrap();
1531
1532            // Churn the same keys repeatedly to drive the inactivity floor past chunk boundaries.
1533            for _ in 0..5 {
1534                populate_fixed_db::<mmr::Family, _>(&mut db, 0, 512).await;
1535            }
1536            db.prune(db.sync_boundary()).await.unwrap();
1537            assert!(
1538                db.any.bitmap.pruned_chunks() > 0,
1539                "test requires at least one pruned chunk to exercise the zero-chunk path"
1540            );
1541
1542            let hasher = qmdb::hasher::<Sha256>();
1543            let witness = db.ops_root_witness(&hasher).await.unwrap();
1544            let ops_root = db.ops_root();
1545            let canonical_root = db.root();
1546
1547            assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1548
1549            let wrong_canonical_root = Sha256::hash(b"wrong canonical root");
1550            assert!(!witness.verify(&hasher, &ops_root, &wrong_canonical_root));
1551
1552            let mut tampered = witness;
1553            tampered.grafted_root = Sha256::hash(b"wrong grafted root");
1554            assert!(!tampered.verify(&hasher, &ops_root, &canonical_root));
1555        });
1556    }
1557
1558    #[test_traced]
1559    fn test_ops_root_witness_verifies_on_fresh_db() {
1560        let executor = deterministic::Runner::default();
1561        executor.start(|ctx| async move {
1562            let db = MmrDb::init(
1563                ctx.child("storage"),
1564                fixed_config::<OneCap>("ops-root-witness-fresh", &ctx),
1565            )
1566            .await
1567            .unwrap();
1568
1569            let hasher = qmdb::hasher::<Sha256>();
1570            let witness = db.ops_root_witness(&hasher).await.unwrap();
1571            let ops_root = db.ops_root();
1572            let canonical_root = db.root();
1573
1574            assert!(witness.verify(&hasher, &ops_root, &canonical_root));
1575        });
1576    }
1577}