Skip to main content

commonware_storage/qmdb/any/
db.rs

1//! A shared, generic implementation of the _Any_ QMDB.
2//!
3//! The impl blocks in this file define shared functionality across all Any QMDB variants.
4
5use super::operation::{update::Update, Operation};
6use crate::{
7    index::Unordered as UnorderedIndex,
8    journal::{
9        authenticated,
10        contiguous::{Contiguous, Mutable, Reader},
11        Error as JournalError,
12    },
13    merkle::{Family, Location, Proof},
14    qmdb::{
15        bitmap::Shared,
16        build_snapshot_from_log, delete_known_loc,
17        metrics::{KeyReadMetrics, OperationMetrics, StateMetrics},
18        operation::Operation as OperationTrait,
19        update_known_loc, Error,
20    },
21    Context, Persistable,
22};
23use commonware_codec::{Codec, CodecShared};
24use commonware_cryptography::Hasher;
25use commonware_parallel::Strategy;
26use commonware_utils::bitmap;
27use core::num::NonZeroU64;
28use std::{collections::HashMap, sync::Arc};
29
/// Metrics for Any QMDBs.
///
/// Bundles the three metric groups that [`Metrics::new`] constructs from a single context.
pub(crate) struct Metrics<E: Context> {
    /// State gauges.
    pub state: StateMetrics,
    /// Write and durability metrics.
    pub operations: OperationMetrics<E>,
    /// Key read metrics.
    pub reads: KeyReadMetrics<E>,
}
39
40impl<E: Context> Metrics<E> {
41    /// Create and register metrics.
42    pub fn new(context: E) -> Self {
43        let context = Arc::new(context);
44        Self {
45            state: StateMetrics::new(context.as_ref()),
46            operations: OperationMetrics::new(context.clone()),
47            reads: KeyReadMetrics::new(context),
48        }
49    }
50}
51
/// Type alias for the authenticated journal used by [Db].
///
/// All five parameters are forwarded unchanged to [`authenticated::Journal`].
pub(crate) type AuthenticatedLog<F, E, C, H, S> = authenticated::Journal<F, E, C, H, S>;
54
/// Snapshot mutation needed to undo one operation while rewinding.
///
/// One value is recorded per rewound `Update`/`Delete` operation; `Db::rewind` applies them
/// newest-first after the journal has been rewound.
enum SnapshotUndo<F: Family, K> {
    /// Undo an update that overwrote an earlier value: the snapshot entry for `key` is moved
    /// from `old_loc` (the rewound update) back to `new_loc` (the location it replaced).
    Replace {
        key: K,
        old_loc: Location<F>,
        new_loc: Location<F>,
    },
    /// Undo an update that created `key`: the snapshot entry at `old_loc` is deleted.
    Remove {
        key: K,
        old_loc: Location<F>,
    },
    /// Undo a delete: `key` is re-inserted into the snapshot at `new_loc`, the location it
    /// had before the rewound delete.
    Insert {
        key: K,
        new_loc: Location<F>,
    },
}
71
/// An "Any" QMDB implementation generic over ordered/unordered keys and variable/fixed values.
/// Consider using one of the following specialized variants instead, which may be more ergonomic:
/// - [crate::qmdb::any::ordered::fixed::Db]
/// - [crate::qmdb::any::ordered::variable::Db]
/// - [crate::qmdb::any::unordered::fixed::Db]
/// - [crate::qmdb::any::unordered::variable::Db]
///
/// `N` is the bitmap chunk size in bytes; defaults to `BITMAP_CHUNK_BYTES`. `current::Db`
/// overrides `N` to match its grafted-tree configuration.
// NOTE(review): no default for `N` is declared on this struct itself; presumably the default is
// supplied by the variant aliases listed above -- confirm.
///
/// # Type parameters
///
/// - `F`: the Merkle [`Family`] that parameterizes [`Location`]s and proofs.
/// - `E`: the runtime [`Context`] used by the log and the metrics.
/// - `C`: the [`Contiguous`] journal holding the operations.
/// - `I`: the in-memory index mapping keys to [`Location`]s (the snapshot).
/// - `H`: the [`Hasher`] whose digest type is used for the root.
/// - `U`: the update type; carried only as a `PhantomData` marker on the struct.
/// - `S`: the merkleization [`Strategy`].
pub struct Db<
    F: Family,
    E: Context,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    U: Send + Sync,
    const N: usize,
    S: Strategy,
> {
    /// A (pruned) log of all operations in order of their application. The index of each
    /// operation in the log is called its _location_, which is a stable identifier.
    ///
    /// # Invariants
    ///
    /// - The log is never pruned beyond the inactivity floor.
    /// - There is always at least one commit operation in the log.
    pub(crate) log: AuthenticatedLog<F, E, C, H, S>,

    /// Cached operations root for this database.
    pub(crate) root: H::Digest,

    /// A location before which all operations are "inactive" (that is, operations before this point
    /// are over keys that have been updated by some operation at or after this point).
    pub(crate) inactivity_floor_loc: Location<F>,

    /// The location of the last commit operation.
    pub(crate) last_commit_loc: Location<F>,

    /// A snapshot of all currently active operations in the form of a map from each key to the
    /// location in the log containing its most recent update.
    ///
    /// # Invariants
    ///
    /// - Only references `Operation::Update`s.
    pub(crate) snapshot: I,

    /// The number of active keys in the snapshot.
    pub(crate) active_keys: usize,

    /// Activity bitmap over committed operations. Rebuilt from the journal on init; never
    /// persisted. A hint for floor-raise scans; merkleization re-verifies via `is_active_at`.
    /// When wrapped by `current::Db`, this is also the bitmap that `current` reads for grafted-
    /// tree leaves and proofs.
    ///
    /// # Invariants
    ///
    /// - `bitmap.len() == log.size()`.
    /// - `bitmap[i] == 0` implies location `i` is inactive (false negatives are forbidden).
    /// - CommitFloor: only the current `last_commit_loc` carries bit = 1; earlier commits
    ///   are 0.
    pub(crate) bitmap: Arc<Shared<N>>,

    /// Metrics for this database.
    pub(crate) metrics: Metrics<E>,

    /// Marker for the update type parameter.
    pub(crate) _update: core::marker::PhantomData<U>,
}
140
141// Shared read-only functionality.
142impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
143where
144    F: Family,
145    E: Context,
146    U: Update,
147    C: Contiguous<Item = Operation<F, U>>,
148    I: UnorderedIndex<Value = Location<F>>,
149    H: Hasher,
150    S: Strategy,
151    Operation<F, U>: Codec,
152{
    /// Return the inactivity floor location. This is the location before which all operations are
    /// known to be inactive. Operations before this point can be safely pruned.
    ///
    /// Only compiled under `cfg(test)` or with the `test-traits` feature;
    /// [`Self::sync_boundary`] returns the same field unconditionally.
    #[cfg(any(test, feature = "test-traits"))]
    pub(crate) const fn inactivity_floor_loc(&self) -> Location<F> {
        self.inactivity_floor_loc
    }
159
    /// Return the most recent location from which this database can safely be synced, and the
    /// upper bound on [`Self::prune`]'s `loc`. For `any`, this equals the inactivity floor.
    ///
    /// This is a plain field read of `inactivity_floor_loc`; no I/O is performed.
    pub const fn sync_boundary(&self) -> Location<F> {
        self.inactivity_floor_loc
    }
165
    /// Whether the snapshot currently has no active keys.
    ///
    /// Derived from the cached `active_keys` counter rather than by inspecting the snapshot.
    pub const fn is_empty(&self) -> bool {
        self.active_keys == 0
    }
170
171    /// Get the metadata associated with the last commit.
172    pub async fn get_metadata(&self) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
173        match self.log.reader().await.read(*self.last_commit_loc).await? {
174            Operation::CommitFloor(metadata, _) => Ok(metadata),
175            _ => unreachable!("last commit is not a CommitFloor operation"),
176        }
177    }
178
    /// Return the canonical QMDB operations root.
    ///
    /// This returns the cached `root` field; nothing is recomputed here.
    pub const fn root(&self) -> H::Digest {
        self.root
    }
183
184    /// Return the inactive_peaks count for the given leaf count and inactivity floor.
185    pub(crate) fn inactive_peaks(
186        &self,
187        leaves: Location<F>,
188        inactivity_floor: Location<F>,
189    ) -> usize {
190        F::inactive_peaks(F::location_to_position(leaves), inactivity_floor)
191    }
192
    /// Return a reference to the merkleization strategy.
    ///
    /// The strategy is owned by the authenticated log; this simply forwards to it.
    pub const fn strategy(&self) -> &S {
        self.log.strategy()
    }
197
198    /// Get the value of `key` in the db, or None if it has no value.
199    pub async fn get(&self, key: &U::Key) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
200        let _timer = self.metrics.reads.get_timer();
201        self.metrics.reads.get_calls.inc();
202        self.metrics.reads.keys_requested.inc();
203        // Collect to avoid holding a borrow across await points (rust-lang/rust#100013).
204        let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
205        let reader = self.log.reader().await;
206        let mut result = None;
207        for loc in locs {
208            let op = reader.read(*loc).await?;
209            let Operation::Update(data) = op else {
210                panic!("location does not reference update operation. loc={loc}");
211            };
212            if data.key() == key {
213                result = Some(data.value().clone());
214                break;
215            }
216        }
217
218        Ok(result)
219    }
220
221    /// Batch read multiple keys.
222    ///
223    /// Returns results in the same order as the input keys.
224    pub async fn get_many(
225        &self,
226        keys: &[&U::Key],
227    ) -> Result<Vec<Option<U::Value>>, crate::qmdb::Error<F>> {
228        if keys.is_empty() {
229            return Ok(Vec::new());
230        }
231
232        let _timer = self.metrics.reads.get_many_timer();
233        self.metrics.reads.get_many_calls.inc();
234        self.metrics.reads.keys_requested.inc_by(keys.len() as u64);
235
236        // Phase 1: Collect candidate locations from the in-memory index.
237        // Each key may map to multiple locations due to hash collisions.
238        let mut candidates: Vec<(usize, u64)> = Vec::with_capacity(keys.len());
239        let mut results: Vec<Option<U::Value>> = vec![None; keys.len()];
240
241        for (key_idx, key) in keys.iter().enumerate() {
242            for &loc in self.snapshot.get(key) {
243                candidates.push((key_idx, *loc));
244            }
245        }
246
247        if candidates.is_empty() {
248            return Ok(results);
249        }
250
251        // Phase 2: Sort by position for batched journal reads, then deduplicate.
252        candidates.sort_unstable_by_key(|&(_, pos)| pos);
253
254        let mut positions: Vec<u64> = Vec::with_capacity(candidates.len());
255        for &(_, pos) in &candidates {
256            if positions.last() != Some(&pos) {
257                positions.push(pos);
258            }
259        }
260
261        // Phase 3: Batch-read from the journal (one reader acquisition, one I/O batch).
262        let reader = self.log.reader().await;
263        let ops = reader.read_many(&positions).await?;
264
265        // Phase 4: Match operations back to keys via binary search (no HashMap).
266        for &(key_idx, pos) in &candidates {
267            if results[key_idx].is_some() {
268                continue;
269            }
270            let op_idx = positions
271                .binary_search(&pos)
272                .expect("position was deduped from candidates");
273            let Operation::Update(data) = &ops[op_idx] else {
274                panic!("location does not reference update operation. loc={pos}");
275            };
276            if data.key() == keys[key_idx] {
277                results[key_idx] = Some(data.value().clone());
278            }
279        }
280
281        Ok(results)
282    }
283
284    /// Return [start, end) where `start` and `end - 1` are the Locations of the oldest and newest
285    /// retained operations respectively.
286    pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
287        let bounds = self.log.reader().await.bounds();
288        Location::new(bounds.start)..Location::new(bounds.end)
289    }
290
291    /// Update state gauges from the current database state.
292    pub(crate) async fn update_metrics(&self) {
293        let bounds = self.log.reader().await.bounds();
294        self.metrics.state.set(
295            bounds.end,
296            bounds.start,
297            *self.inactivity_floor_loc,
298            *self.last_commit_loc,
299        );
300    }
301
302    /// Return the pinned Merkle nodes for a lower operation boundary of `loc`.
303    pub async fn pinned_nodes_at(
304        &self,
305        loc: Location<F>,
306    ) -> Result<Vec<H::Digest>, crate::qmdb::Error<F>> {
307        self.log
308            .merkle
309            .pinned_nodes_at(loc)
310            .await
311            .map_err(Into::into)
312    }
313}
314
315// Functionality requiring Mutable journal.
316impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
317where
318    F: Family,
319    E: Context,
320    U: Update,
321    C: Mutable<Item = Operation<F, U>>,
322    I: UnorderedIndex<Value = Location<F>>,
323    H: Hasher,
324    S: Strategy,
325    Operation<F, U>: Codec,
326{
    /// Prune the bitmap to `prune_loc`, rounded down to a chunk boundary. Skips the
    /// inactivity-floor check.
    ///
    /// Acquires the shared bitmap's write guard for the duration of the call. The log is not
    /// touched; callers that need the floor check should go through [`Self::prune`].
    pub(crate) fn prune_bitmap(&mut self, prune_loc: Location<F>) {
        self.bitmap.write().prune_to_bit(*prune_loc);
    }
332
333    /// Prune the operations log to `prune_loc`. Does not touch the bitmap.
334    ///
335    /// Journal pruning is section-granular, so the actual pruned boundary may be less than
336    /// the requested `prune_loc`. Returns that actual boundary so callers can keep the bitmap
337    /// aligned with the journal's retained start.
338    ///
339    /// # Errors
340    ///
341    /// - Returns [crate::qmdb::Error::PruneBeyondMinRequired] if `prune_loc` > inactivity floor.
342    /// - Returns [`crate::merkle::Error::LocationOverflow`] if `prune_loc` > [`crate::merkle::Family::MAX_LEAVES`].
343    pub(crate) async fn prune_log(
344        &mut self,
345        prune_loc: Location<F>,
346    ) -> Result<Location<F>, crate::qmdb::Error<F>> {
347        if prune_loc > self.inactivity_floor_loc {
348            return Err(crate::qmdb::Error::PruneBeyondMinRequired(
349                prune_loc,
350                self.inactivity_floor_loc,
351            ));
352        }
353
354        Ok(self.log.prune(prune_loc).await?)
355    }
356
357    /// Prune historical operations prior to `prune_loc`. This does not affect the db's root or
358    /// snapshot.
359    #[tracing::instrument(
360        name = "qmdb::any::Db::prune",
361        level = "info",
362        skip_all,
363        fields(
364            requested_loc = *prune_loc,
365            inactivity_floor = *self.inactivity_floor_loc,
366        ),
367    )]
368    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), crate::qmdb::Error<F>> {
369        let _timer = self.metrics.operations.prune_timer();
370        self.metrics.operations.prune_calls.inc();
371        let actual_pruned = self.prune_log(prune_loc).await?;
372        self.prune_bitmap(actual_pruned);
373        self.update_metrics().await;
374        Ok(())
375    }
376
377    /// Returns a historical proof for `historical_size` operations, anchored at `start_loc`
378    /// and bounded by `max_ops`.
379    ///
380    /// # Contract
381    ///
382    /// `historical_size` must be a commit-boundary size: the operation at `historical_size - 1`
383    /// must itself be a commit op declaring the governing inactivity floor.
384    ///
385    /// # Errors
386    ///
387    /// Returns [`crate::qmdb::Error::HistoricalFloorPruned`] if `historical_size - 1` is retained
388    /// but is not a commit op, either because the caller passed a non-commit-boundary size or
389    /// because pruning removed the commit that would have governed it.
390    #[allow(clippy::type_complexity)]
391    #[tracing::instrument(
392        name = "qmdb::any::Db::historical_proof",
393        level = "info",
394        skip_all,
395        fields(
396            historical_size = *historical_size,
397            start_loc = *start_loc,
398            max_ops = max_ops.get(),
399        ),
400    )]
401    pub async fn historical_proof(
402        &self,
403        historical_size: Location<F>,
404        start_loc: Location<F>,
405        max_ops: NonZeroU64,
406    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
407        if historical_size > self.log.size().await {
408            return Err(crate::qmdb::Error::Merkle(
409                crate::merkle::Error::RangeOutOfBounds(historical_size),
410            ));
411        }
412
413        let inactivity_floor = {
414            let reader = self.log.reader().await;
415            crate::qmdb::find_inactivity_floor_at::<F, _>(&reader, historical_size, |op| {
416                op.has_floor()
417            })
418            .await?
419        };
420        let inactive_peaks = self.inactive_peaks(historical_size, inactivity_floor);
421        self.log
422            .historical_proof(historical_size, start_loc, max_ops, inactive_peaks)
423            .await
424            .map_err(Into::into)
425    }
426
427    pub async fn proof(
428        &self,
429        loc: Location<F>,
430        max_ops: NonZeroU64,
431    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
432        self.historical_proof(self.log.size().await, loc, max_ops)
433            .await
434    }
435
    /// Rewind the database to `size` operations, where `size` is the location of the next append.
    ///
    /// This rewinds both the authenticated log and the in-memory snapshot, then restores metadata
    /// (`last_commit_loc`, `inactivity_floor_loc`, `active_keys`) for the new tip commit. The
    /// activity bitmap, cached root and state gauges are updated as well.
    ///
    /// Rewinding to the current size (`last_commit_loc + 1`) is a no-op that returns `Ok(())`.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `size` is not a valid rewind target: zero, or greater than the current size
    ///   ([`JournalError::InvalidRewind`])
    /// - the target's required logical range is not fully retained (for example, the target
    ///   inactivity floor is pruned) ([`JournalError::ItemPruned`])
    /// - `size - 1` is not a commit operation ([`Error::UnexpectedData`])
    /// - applying the computed delta to `active_keys` would under/overflow
    ///   ([`Error::DataCorrupted`])
    ///
    /// Any error from this method is fatal for this handle. Rewind may mutate journal state before
    /// all in-memory structures are rebuilt. Callers must drop this database handle after any `Err`
    /// from `rewind` and reopen from storage.
    ///
    /// A successful rewind is not restart-stable until a subsequent [`Db::commit`] or
    /// [`Db::sync`].
    ///
    /// # Panics
    ///
    /// Panics if the bitmap has been pruned past `size`.
    #[tracing::instrument(
        name = "qmdb::any::Db::rewind",
        level = "info",
        skip_all,
        fields(
            target_size = *size,
            prev_size = *self.last_commit_loc + 1,
        ),
    )]
    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
        let rewind_size = *size;
        let current_size = *self.last_commit_loc + 1;

        // Already at the requested size: nothing to undo.
        if rewind_size == current_size {
            return Ok(());
        }
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
        }

        // Read everything needed for rewind before mutating storage.
        let (rewind_floor, undos, active_keys_delta) = {
            let reader = self.log.reader().await;
            let bounds = reader.bounds();
            let rewind_last_loc = Location::new(rewind_size - 1);
            // The new tip commit itself must still be retained.
            if rewind_size <= bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            // Only an operation that declares a floor is accepted as the new tip.
            let Some(rewind_floor) = rewind_last_op.has_floor() else {
                return Err(Error::UnexpectedData(rewind_last_loc));
            };
            // The replay below starts at the target floor, so that range must be retained too.
            if *rewind_floor < bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
            }

            let mut undos = Vec::with_capacity((current_size - rewind_size) as usize);
            let mut active_keys_delta = 0isize;
            // For each key seen so far in the replay: `Some(loc)` of its latest update, or
            // `None` if its latest operation was a delete.
            let mut prior_state_by_key: HashMap<U::Key, Option<Location<F>>> = HashMap::new();

            // Reconstruct key state once in a single pass from the rewind floor.
            for loc in *rewind_floor..current_size {
                let op = reader.read(loc).await?;
                let op_loc = Location::new(loc);
                match op {
                    Operation::CommitFloor(_, _) => {}
                    Operation::Update(update) => {
                        let key = update.key().clone();
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();

                        // Only operations in the removed tail generate undo entries; earlier
                        // operations just feed `prior_state_by_key`.
                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                undos.push(SnapshotUndo::Replace {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                    new_loc: previous_loc,
                                });
                            } else {
                                // The removed update created the key, so undoing it drops one
                                // active key.
                                active_keys_delta -= 1;
                                undos.push(SnapshotUndo::Remove {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                });
                            }
                        }

                        prior_state_by_key.insert(key, Some(op_loc));
                    }
                    Operation::Delete(key) => {
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();

                        if loc >= rewind_size {
                            // Undoing a delete of a live key restores one active key. A delete
                            // with no prior live location needs no undo.
                            if let Some(previous_loc) = previous_loc {
                                active_keys_delta += 1;
                                undos.push(SnapshotUndo::Insert {
                                    key: key.clone(),
                                    new_loc: previous_loc,
                                });
                            }
                        }

                        prior_state_by_key.insert(key, None);
                    }
                }
            }

            // Undo operations must run from newest to oldest removed operation.
            undos.reverse();

            (rewind_floor, undos, active_keys_delta)
        };

        // Journal rewind happens before in-memory undo application. If any later step fails, this
        // handle may be internally diverged and must be dropped by the caller. This step is not
        // restart-stable until a later commit/sync boundary.
        self.log.rewind(rewind_size).await?;

        // Drop bitmap bits for ops at or above the rewind target. Restored locs below
        // rewind_size flip back to active in the loop below. `rewind_size >= bitmap.pruned_bits()`
        // is enforced upstream: directly via the `bounds.start` check above, or via
        // `current::Db::rewind`'s explicit `pruned_bits` precondition. The assert below catches
        // regressions.
        {
            let mut bitmap = self.bitmap.write();
            assert!(
                bitmap.pruned_bits() <= rewind_size,
                "bitmap pruned boundary exceeded journal retained start",
            );
            bitmap.truncate(rewind_size);

            for undo in undos {
                match undo {
                    SnapshotUndo::Replace {
                        key,
                        old_loc,
                        new_loc,
                    } => {
                        // A restored location inside the removed tail has no bit after the
                        // truncate above, so only locations below the target are re-activated.
                        if new_loc < rewind_size {
                            bitmap.set_bit(*new_loc, true);
                        }
                        update_known_loc(&mut self.snapshot, &key, old_loc, new_loc);
                    }
                    SnapshotUndo::Remove { key, old_loc } => {
                        delete_known_loc(&mut self.snapshot, &key, old_loc)
                    }
                    SnapshotUndo::Insert { key, new_loc } => {
                        if new_loc < rewind_size {
                            bitmap.set_bit(*new_loc, true);
                        }
                        self.snapshot.insert(&key, new_loc);
                    }
                }
            }

            // The rewound tail's preceding op (validated above) is the new `last_commit_loc`.
            // Set its bit to 1 to match the CommitFloor convention; previous intermediate
            // commits in the truncated range stay at 0 from `truncate`. `rewind_size > 0` is
            // guaranteed by the early-return at the top of this function.
            bitmap.set_bit(rewind_size - 1, true);
        }

        self.active_keys = self
            .active_keys
            .checked_add_signed(active_keys_delta)
            .ok_or(Error::DataCorrupted(
                "active_keys underflow while rewinding",
            ))?;
        self.last_commit_loc = Location::new(rewind_size - 1);
        self.inactivity_floor_loc = rewind_floor;
        // Recompute the cached root for the rewound log and its restored floor.
        self.root = self
            .log
            .root(self.inactive_peaks(Location::new(rewind_size), rewind_floor))?;
        self.update_metrics().await;

        Ok(())
    }
613}
614
615// Functionality requiring Mutable + Persistable journal.
616impl<F, E, U, C, I, H, const N: usize, S> Db<F, E, C, I, H, U, N, S>
617where
618    F: Family,
619    E: Context,
620    U: Update,
621    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
622    I: UnorderedIndex<Value = Location<F>>,
623    H: Hasher,
624    S: Strategy,
625    Operation<F, U>: Codec,
626{
    /// Returns a [Db] initialized from `log`. `shared_bitmap = None` allocates a fresh bitmap;
    /// `Some(b)` adopts a pre-allocated bitmap (used by `current::Db`, which sizes pruned chunks
    /// from grafted metadata).
    ///
    /// The snapshot (`index`) and the activity bitmap are rebuilt by replaying the log from the
    /// inactivity floor, after which the root is computed and the state gauges are published.
    ///
    /// # Errors
    ///
    /// - Returns [`Error::HistoricalFloorPruned`] if the log's retained bounds end at 0.
    /// - Returns [`Error::DataCorrupted`] if, after replay, the bitmap length differs from the
    ///   log size.
    /// - Propagates errors from locating the inactivity floor, replaying the log, and computing
    ///   the root.
    ///
    /// # Panics
    ///
    /// Panics if the last operation is not a commit floor operation. Empty logs are handled
    /// upstream by [`crate::qmdb::any::init_with_bitmap`].
    ///
    /// Also panics if the bitmap's pruned prefix extends past the inactivity floor.
    pub(crate) async fn init_from_log(
        mut index: I,
        log: AuthenticatedLog<F, E, C, H, S>,
        shared_bitmap: Option<Arc<Shared<N>>>,
        metrics: Metrics<E>,
    ) -> Result<Self, crate::qmdb::Error<F>> {
        let (last_commit_loc, inactivity_floor_loc, active_keys, bitmap) = {
            let reader = log.reader().await;
            let bounds = reader.bounds();
            // The last retained operation is taken to be the last commit.
            let last_commit_loc = Location::new(
                bounds
                    .end
                    .checked_sub(1)
                    .ok_or(Error::HistoricalFloorPruned(Location::new(bounds.end)))?,
            );
            let inactivity_floor_loc = crate::qmdb::find_inactivity_floor_at::<F, _>(
                &reader,
                Location::new(bounds.end),
                |op| op.has_floor(),
            )
            .await?;

            // Seed the bitmap so its pruned prefix matches the retained log boundary. Bits in
            // [pruned_bits, bounds.start) correspond to pruned operations and remain 0; replay
            // appends bits from the inactivity floor onward.
            let bitmap = shared_bitmap.unwrap_or_else(|| {
                // Integer division rounds down to a whole number of chunks.
                let pruned_chunks =
                    (bounds.start / bitmap::Prunable::<N>::CHUNK_SIZE_BITS) as usize;
                let bm = bitmap::Prunable::<N>::new_with_pruned_chunks(pruned_chunks)
                    .expect("pruned chunk count fits in u64 bits");
                Arc::new(Shared::new(bm))
            });

            // Extend the bitmap up to the inactivity floor (zero-fill).
            {
                let mut guard = bitmap.write();
                // A caller-supplied bitmap must be pruned to a chunk boundary at or below the
                // inactivity floor; otherwise `extend_to` would silently leave gaps.
                assert!(
                    guard.pruned_bits() <= *inactivity_floor_loc,
                    "shared_bitmap pruned_bits {} exceeds inactivity_floor_loc {}",
                    guard.pruned_bits(),
                    *inactivity_floor_loc,
                );
                guard.extend_to(*inactivity_floor_loc);
            }

            // Replay through `build_snapshot_from_log`. The closure fires synchronously between
            // the helper's awaits, so each invocation does its own brief lock-update-release.
            // Holding the guard across `.await` would not be `Send`-safe.
            let active_keys = {
                let bitmap = &bitmap;
                build_snapshot_from_log(
                    inactivity_floor_loc,
                    &reader,
                    &mut index,
                    |is_active, old_loc| {
                        let mut guard = bitmap.write();
                        // One bit is appended per replayed operation.
                        guard.push(is_active);
                        // When an earlier location is reported alongside, clear its bit.
                        if let Some(loc) = old_loc {
                            guard.set_bit(*loc, false);
                        }
                    },
                )
                .await?
            };

            // CommitFloor convention: only the current `last_commit_loc` carries bit=1; earlier
            // CommitFloors are 0. `build_snapshot_from_log` reports `is_active = (loc ==
            // last_commit_loc)` for each CommitFloor op, so the per-op push above already
            // encodes this.

            (last_commit_loc, inactivity_floor_loc, active_keys, bitmap)
        };

        // The bitmap must have exactly one bit per retained log location.
        if bitmap::Readable::<N>::len(bitmap.as_ref()) != log.size().await {
            return Err(crate::qmdb::Error::DataCorrupted(
                "bitmap length diverged from log size during init",
            ));
        }

        // Compute the root for the current leaf count and inactivity floor.
        let inactive_peaks = F::inactive_peaks(
            F::location_to_position(log.merkle.leaves()),
            inactivity_floor_loc,
        );
        let root = log.root(inactive_peaks)?;

        let db = Self {
            log,
            root,
            inactivity_floor_loc,
            snapshot: index,
            last_commit_loc,
            active_keys,
            bitmap,
            metrics,
            _update: core::marker::PhantomData,
        };
        db.update_metrics().await;
        Ok(db)
    }
737
    /// Sync all database state to disk.
    ///
    /// Records the sync timer and call counter, then delegates to the log's `sync`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the log's `sync`.
    #[tracing::instrument(
        name = "qmdb::any::Db::sync",
        level = "info",
        skip_all,
        fields(
            db_size = *self.last_commit_loc + 1,
            inactivity_floor = *self.inactivity_floor_loc,
            active_keys = self.active_keys as u64,
        ),
    )]
    pub async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
        let _timer = self.metrics.operations.sync_timer();
        self.metrics.operations.sync_calls.inc();
        self.log.sync().await?;
        Ok(())
    }
755
    /// Durably commit the journal state published by prior [`Db::apply_batch`]
    /// calls.
    ///
    /// Records the commit timer and call counter, then delegates to the log's `commit`.
    ///
    /// # Errors
    ///
    /// Propagates any error from the log's `commit`.
    #[tracing::instrument(
        name = "qmdb::any::Db::commit",
        level = "info",
        skip_all,
        fields(
            db_size = *self.last_commit_loc + 1,
            inactivity_floor = *self.inactivity_floor_loc,
            active_keys = self.active_keys as u64,
        ),
    )]
    pub async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
        let _timer = self.metrics.operations.commit_timer();
        self.metrics.operations.commit_calls.inc();
        self.log.commit().await?;
        Ok(())
    }
774
775    /// Destroy the db, removing all data from disk.
776    pub async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
777        self.log.destroy().await.map_err(Into::into)
778    }
779}
780
781impl<F, E, U, C, I, H, const N: usize, S> Persistable for Db<F, E, C, I, H, U, N, S>
782where
783    F: Family,
784    E: Context,
785    U: Update,
786    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
787    I: UnorderedIndex<Value = Location<F>>,
788    H: Hasher,
789    S: Strategy,
790    Operation<F, U>: Codec,
791{
792    type Error = crate::qmdb::Error<F>;
793
794    async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
795        Self::commit(self).await
796    }
797
798    async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
799        Self::sync(self).await
800    }
801
802    async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
803        self.destroy().await
804    }
805}