Skip to main content

commonware_storage/qmdb/immutable/
compact.rs

1//! An immutable authenticated db that does not retain historical operations after sync.
2//!
3//! Mirrors the API of [`crate::qmdb::immutable::Immutable`] (`new_batch -> merkleize ->
4//! apply_batch -> sync`, pipelined batch chains, `StaleBatch` validation) but is backed by
5//! the peak-only [`crate::merkle::compact`]. Because history is discarded, there are no
6//! `get` / `proof` / `bounds` methods; use the full variant if you need them.
7//!
8//! # Compact serving witness
9//!
10//! On every durable sync, this db persists the encoded last-commit operation together with its
11//! inclusion proof against the current root. Reopen and rewind re-verify that proof; corruption
12//! surfaces as [`Error::DataCorrupted`]. This authenticated witness is what lets compact nodes
13//! serve compact sync without retaining historical operations.
14//!
15//! # Inactivity floor
16//!
17//! Commits still carry an inactivity floor, but only for wire-format compatibility with
18//! [`crate::qmdb::immutable::Immutable`]: the root is computed over the encoded operation
19//! sequence, and that sequence must include the same floor to produce the same root as the
20//! full variant. Here the floor has no effect on pruning or snapshot rebuilding. All
21//! historical in-memory state is discarded on every `sync`.
22
23use super::operation::Operation;
24use crate::{
25    merkle::{batch, compact as compact_merkle, Family, Location, Proof},
26    qmdb::{
27        self,
28        any::value::ValueEncoding,
29        batch_chain::{self, Bounds},
30        compact::{
31            batch as compact_batch,
32            witness::{self, ServeState},
33        },
34        operation::Key,
35        sync::compact as compact_sync,
36        Error,
37    },
38    Context,
39};
40use commonware_codec::{Decode as _, Encode, EncodeShared, Read};
41use commonware_cryptography::{Digest, Hasher};
42use commonware_parallel::Strategy;
43use core::marker::PhantomData;
44use std::{
45    collections::BTreeMap,
46    sync::{Arc, Weak},
47};
48
/// Configuration for a compact immutable authenticated db.
///
/// `C` is the codec configuration type used to decode the persisted last-commit operation and
/// `S` is the [`Strategy`] handed to the backing compact Merkle structure.
#[derive(Clone)]
pub struct Config<C, S: Strategy> {
    /// Configuration for the backing compact Merkle structure.
    pub merkle: compact_merkle::Config<S>,

    /// Codec config used to decode the persisted last commit operation on reopen.
    pub commit_codec_config: C,
}
58
/// An immutable authenticated db that does not retain historical operations after sync.
///
/// The handle tracks only the tip of the operation sequence: the location, metadata, and
/// inactivity floor of the last commit, plus the compact Merkle structure those were hashed into.
pub struct Db<F, E, K, V, H, C, S: Strategy>
where
    F: Family,
    E: Context,
    K: Key,
    V: ValueEncoding,
    H: Hasher,
    Operation<F, K, V>: EncodeShared,
    Operation<F, K, V>: Read<Cfg = C>,
    C: Clone + Send + Sync + 'static,
{
    /// Backing compact Merkle structure.
    merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
    /// Location of the most recently applied commit operation (always `size() - 1`).
    last_commit_loc: Location<F>,
    /// Metadata carried by the most recently applied commit, if any.
    last_commit_metadata: Option<V::Value>,
    /// Inactivity floor declared by the most recently applied commit.
    inactivity_floor_loc: Location<F>,
    /// Codec config used to decode persisted commit operations.
    commit_codec_config: C,
    /// Cache of the last durably persisted compact witness.
    ///
    /// This cache is rebuilt from persisted witness bytes on reopen/rewind and refreshed on
    /// [`Self::sync`]. It intentionally does not track unsynced in-memory mutations, so compact
    /// serving never advertises state that has not been durably persisted.
    witness: witness::Cache<F, H::Digest>,
    /// Ties the key type `K` to the db without storing a key.
    _key: PhantomData<K>,
}
84
/// Result of [`Db::compact_state`]: the compact-sync state for the requested target on success,
/// or the serve error describing why it could not be produced.
type CompactStateResult<F, K, V, D> =
    Result<compact_sync::State<F, Operation<F, K, V>, D>, compact_sync::ServeError<F, D>>;
87
/// A speculative batch for a compact immutable db.
///
/// Collects key/value writes via `set` and is turned into a [`MerkleizedBatch`] by `merkleize`.
#[allow(clippy::type_complexity)]
pub struct UnmerkleizedBatch<F, H, K, V, S: Strategy>
where
    F: Family,
    K: Key,
    V: ValueEncoding,
    H: Hasher,
    Operation<F, K, V>: EncodeShared,
{
    /// Pending Merkle batch the resolved operations are merkleized into.
    merkle_batch: compact_merkle::UnmerkleizedBatch<F, H::Digest, S>,
    /// Writes recorded so far, ordered by key; a later write to a key replaces the earlier one.
    mutations: BTreeMap<K, V::Value>,
    /// Merkleized batch this one builds on, or `None` when built directly on the db.
    parent: Option<Arc<MerkleizedBatch<F, H::Digest, K, V, S>>>,
    /// Number of operations preceding this batch (the parent's total size, or the db's
    /// committed size when there is no parent).
    base_size: u64,
    /// Committed db size the batch chain was started from.
    db_size: u64,
}
104
/// A merkleized batch for a compact immutable db.
///
/// Holds the root the db will have once the batch is applied, together with the bookkeeping
/// that `Db::apply_batch` checks before accepting it.
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy>
where
    Operation<F, K, V>: EncodeShared,
{
    /// Merkleized operations of this batch.
    pub(super) merkle_batch: Arc<batch::MerkleizedBatch<F, D, S>>,
    /// Root digest after this batch is applied.
    pub(super) root: D,
    /// Metadata carried by this batch's commit operation, if any.
    pub(super) commit_metadata: Option<V::Value>,
    /// Weak link to the parent batch, or `None` when built directly on the db.
    pub(super) parent: Option<Weak<Self>>,
    /// Size and floor bookkeeping for this batch and its ancestors.
    pub(super) bounds: batch_chain::Bounds<F>,
    /// Ties the key type `K` to the batch without storing a key.
    pub(super) _key: PhantomData<K>,
}
118
119impl<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy> MerkleizedBatch<F, D, K, V, S>
120where
121    Operation<F, K, V>: EncodeShared,
122{
123    pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
124        batch_chain::ancestors(self.parent.clone(), |batch| batch.parent.as_ref())
125    }
126
127    /// Return the root digest after this batch is applied.
128    pub const fn root(&self) -> D {
129        self.root
130    }
131
132    /// Return the [`Bounds`] of the batch.
133    pub const fn bounds(&self) -> &Bounds<F> {
134        &self.bounds
135    }
136
137    /// Create a new speculative batch with this one as its parent.
138    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, K, V, S>
139    where
140        H: Hasher<Digest = D>,
141    {
142        UnmerkleizedBatch {
143            merkle_batch: compact_merkle::UnmerkleizedBatch::wrap(self.merkle_batch.new_batch()),
144            mutations: BTreeMap::new(),
145            parent: Some(Arc::clone(self)),
146            base_size: self.bounds.total_size,
147            db_size: self.bounds.db_size,
148        }
149    }
150}
151
152impl<F, H, K, V, S> UnmerkleizedBatch<F, H, K, V, S>
153where
154    F: Family,
155    K: Key,
156    V: ValueEncoding,
157    H: Hasher,
158    S: Strategy,
159    Operation<F, K, V>: EncodeShared,
160{
161    pub(super) fn new<E, C>(db: &Db<F, E, K, V, H, C, S>, committed_size: u64) -> Self
162    where
163        E: Context,
164        C: Clone + Send + Sync + 'static,
165        Operation<F, K, V>: Read<Cfg = C>,
166    {
167        Self {
168            merkle_batch: db.merkle.new_batch(),
169            mutations: BTreeMap::new(),
170            parent: None,
171            base_size: committed_size,
172            db_size: committed_size,
173        }
174    }
175
176    pub fn set(mut self, key: K, value: V::Value) -> Self {
177        self.mutations.insert(key, value);
178        self
179    }
180
181    /// Resolve mutations into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
182    ///
183    /// `inactivity_floor` is threaded through the commit operation for wire-format parity with
184    /// [`crate::qmdb::immutable::Immutable`]. It must be >= the database's current floor
185    /// (monotonically non-decreasing) and at most the batch's commit location
186    /// (`total_size - 1`); these bounds are validated, but the floor does not drive any local
187    /// pruning or retention in this variant.
188    pub fn merkleize<E, C>(
189        self,
190        db: &Db<F, E, K, V, H, C, S>,
191        metadata: Option<V::Value>,
192        inactivity_floor: Location<F>,
193    ) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>>
194    where
195        F: Family,
196        E: Context,
197        C: Clone + Send + Sync + 'static,
198        Operation<F, K, V>: Read<Cfg = C>,
199    {
200        let mut ops: Vec<Operation<F, K, V>> = Vec::with_capacity(self.mutations.len() + 1);
201        for (key, value) in self.mutations {
202            ops.push(Operation::Set(key, value));
203        }
204        ops.push(Operation::Commit(metadata.clone(), inactivity_floor));
205
206        let total_size = self.base_size + ops.len() as u64;
207        let merkle = compact_batch::merkleize_ops::<F, E, H, S, _>(
208            &db.merkle,
209            self.merkle_batch,
210            ops.as_slice(),
211        );
212
213        let inactive_peaks = F::inactive_peaks(
214            F::location_to_position(Location::new(total_size)),
215            inactivity_floor,
216        );
217        let hasher = qmdb::hasher::<H>();
218        let root = db
219            .merkle
220            .with_mem(|mem| merkle.root(mem, &hasher, inactive_peaks))
221            .expect("inactive_peaks computed from batch size");
222
223        let ancestors =
224            batch_chain::parent_and_ancestors(self.parent.as_ref(), |parent| parent.ancestors());
225        let ancestors = batch_chain::collect_ancestor_bounds(
226            ancestors,
227            |batch| batch.bounds.inactivity_floor,
228            |batch| batch.bounds.total_size,
229        );
230
231        Arc::new(MerkleizedBatch {
232            merkle_batch: merkle,
233            root,
234            commit_metadata: metadata,
235            parent: self.parent.as_ref().map(Arc::downgrade),
236            bounds: batch_chain::Bounds {
237                base_size: self.base_size,
238                db_size: self.db_size,
239                total_size,
240                ancestors,
241                inactivity_floor,
242            },
243            _key: PhantomData,
244        })
245    }
246}
247
248impl<F, E, K, V, H, C, S: Strategy> Db<F, E, K, V, H, C, S>
249where
250    F: Family,
251    E: Context,
252    K: Key,
253    V: ValueEncoding,
254    H: Hasher,
255    Operation<F, K, V>: EncodeShared,
256    Operation<F, K, V>: Read<Cfg = C>,
257    C: Clone + Send + Sync + 'static,
258{
259    fn encode_commit_op(metadata: Option<V::Value>, inactivity_floor_loc: Location<F>) -> Vec<u8> {
260        Operation::<F, K, V>::Commit(metadata, inactivity_floor_loc)
261            .encode()
262            .to_vec()
263    }
264
265    async fn load_active_witness(
266        merkle: &compact_merkle::Merkle<F, E, H::Digest, S>,
267        commit_codec_config: &C,
268    ) -> Result<(ServeState<F, H::Digest>, Operation<F, K, V>), Error<F>> {
269        witness::load_active_witness::<F, E, H, S, _, Operation<F, K, V>, _>(
270            merkle,
271            commit_codec_config,
272            Operation::has_floor,
273        )
274        .await
275    }
276
277    /// Build a compact db handle from already-verified compact state.
278    ///
279    /// The caller has reconstructed the compact Merkle in memory and already authenticated the
280    /// supplied witness/root pair. This seeds the in-memory witness cache from that verified witness
281    /// but does not itself persist anything; persistence happens only after the caller finishes the
282    /// root check for the reconstructed db.
283    #[allow(clippy::too_many_arguments)]
284    pub(crate) fn init_from_verified_state(
285        merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
286        commit_codec_config: C,
287        last_commit_metadata: Option<V::Value>,
288        inactivity_floor_loc: Location<F>,
289        root: H::Digest,
290        last_commit_op_bytes: Vec<u8>,
291        last_commit_proof: Proof<F, H::Digest>,
292        pinned_nodes: Vec<H::Digest>,
293    ) -> Result<Self, Error<F>> {
294        let (last_commit_loc, witness) = witness::witness_from_authenticated_state(
295            &merkle,
296            root,
297            inactivity_floor_loc,
298            last_commit_op_bytes,
299            last_commit_proof,
300            pinned_nodes,
301        )?;
302
303        Ok(Self {
304            merkle,
305            last_commit_loc,
306            last_commit_metadata,
307            inactivity_floor_loc,
308            commit_codec_config,
309            witness: witness::Cache::new(witness),
310            _key: PhantomData,
311        })
312    }
313
314    /// Open a compact db from persisted compact state and rebuild its witness cache.
315    ///
316    /// On first open, this bootstraps the initial commit and its witness so every later reopen and
317    /// rewind can assume "the active slot has a complete compact witness".
318    pub(crate) async fn init_from_merkle(
319        mut merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
320        commit_codec_config: C,
321    ) -> Result<Self, Error<F>>
322    where
323        F: Family,
324        Operation<F, K, V>: Read<Cfg = C>,
325    {
326        // Bootstrap: append an initial Commit(None, 0) on first open. This establishes the
327        // invariant that every merkleized batch ends with a Commit op, so `last_commit_loc =
328        // leaves - 1` is always correct without replaying the log (which we can't, since we
329        // don't retain it).
330        //
331        // We also persist that initial commit's witness immediately so every later reopen or
332        // rewind can uniformly assume "the active slot has a current tip witness".
333        if merkle.leaves() == 0 {
334            witness::bootstrap_initial_commit::<F, E, H, S>(
335                &mut merkle,
336                Operation::<F, K, V>::Commit(None, Location::new(0))
337                    .encode()
338                    .to_vec(),
339            )
340            .await?;
341        }
342
343        let (witness, last_commit_op) =
344            Self::load_active_witness(&merkle, &commit_codec_config).await?;
345        let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
346            return Err(Error::DataCorrupted("last operation was not a commit"));
347        };
348
349        Self::init_from_verified_state(
350            merkle,
351            commit_codec_config,
352            last_commit_metadata,
353            inactivity_floor_loc,
354            witness.root,
355            witness.last_commit_op_bytes,
356            witness.last_commit_proof,
357            witness.pinned_nodes,
358        )
359    }
360
361    /// Return the root of the db.
362    pub fn root(&self) -> H::Digest
363    where
364        F: Family,
365    {
366        let hasher = qmdb::hasher::<H>();
367        let inactive_peaks = F::inactive_peaks(
368            F::location_to_position(Location::new(*self.last_commit_loc + 1)),
369            self.inactivity_floor_loc,
370        );
371        self.merkle
372            .root(&hasher, inactive_peaks)
373            .expect("compact Merkle root should not fail")
374    }
375
    /// Return a reference to the merkleization strategy.
    ///
    /// Delegates to the backing compact Merkle structure.
    pub const fn strategy(&self) -> &S {
        self.merkle.strategy()
    }
380
    /// Return the location of the last commit.
    ///
    /// This is always one less than [`Self::size`].
    pub const fn last_commit_loc(&self) -> Location<F> {
        self.last_commit_loc
    }
385
    /// Return the inactivity floor declared by the last committed batch.
    ///
    /// The value is replaced by [`Self::apply_batch`] and reloaded by [`Self::rewind`].
    pub const fn inactivity_floor_loc(&self) -> Location<F> {
        self.inactivity_floor_loc
    }
390
391    /// Return the location of the next operation appended to this db.
392    pub fn size(&self) -> Location<F> {
393        Location::new(*self.last_commit_loc + 1)
394    }
395
396    /// Get the metadata associated with the last commit.
397    pub fn get_metadata(&self) -> Option<V::Value> {
398        self.last_commit_metadata.clone()
399    }
400
401    /// Return the compact-sync target described by the current witness.
402    ///
403    /// This reflects the last state for which both frontier and witness were durably captured,
404    /// which may lag behind live in-memory mutations until [`Self::sync`] is called.
405    pub fn current_target(&self) -> compact_sync::Target<F, H::Digest> {
406        self.witness.with(ServeState::target)
407    }
408
409    /// Return the compact-sync state for `target`, or a stale-target error if the source's
410    /// current witness no longer matches.
411    ///
412    /// The witness lock is held only long enough to verify the requested target and snapshot
413    /// the bytes, proof, and pinned nodes needed for [`compact_sync::State`]. Decoding the
414    /// commit operation runs outside the lock so concurrent readers do not contend on it.
415    pub(crate) fn compact_state(
416        &self,
417        target: compact_sync::Target<F, H::Digest>,
418    ) -> CompactStateResult<F, K, V, H::Digest>
419    where
420        Operation<F, K, V>: Read<Cfg = C>,
421    {
422        let (op_bytes, last_commit_proof, pinned_nodes, leaf_count) = self.witness.with(|w| {
423            if target.root != w.root || target.leaf_count != w.leaf_count {
424                return Err(compact_sync::ServeError::StaleTarget {
425                    requested: target.clone(),
426                    current: w.target(),
427                });
428            }
429            Ok((
430                w.last_commit_op_bytes.clone(),
431                w.last_commit_proof.clone(),
432                w.pinned_nodes.clone(),
433                w.leaf_count,
434            ))
435        })?;
436        let op = Operation::<F, K, V>::decode_cfg(op_bytes.as_ref(), &self.commit_codec_config)
437            .map_err(|_| {
438                compact_sync::ServeError::Database(Error::DataCorrupted("invalid commit operation"))
439            })?;
440        if !matches!(&op, Operation::Commit(_, _)) {
441            return Err(compact_sync::ServeError::Database(Error::DataCorrupted(
442                "last operation was not a commit",
443            )));
444        }
445        Ok(compact_sync::State {
446            leaf_count,
447            pinned_nodes,
448            last_commit_op: op,
449            last_commit_proof,
450        })
451    }
452
453    /// Create a new speculative batch of operations with this database as its parent.
454    pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, K, V, S> {
455        let committed_size = *self.last_commit_loc + 1;
456        UnmerkleizedBatch::new(self, committed_size)
457    }
458
459    /// Create an owned merkleized batch representing the current committed state.
460    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>>
461    where
462        F: Family,
463    {
464        let committed_size = *self.last_commit_loc + 1;
465        Arc::new(MerkleizedBatch {
466            merkle_batch: self.merkle.to_batch(),
467            root: self.root(),
468            commit_metadata: self.last_commit_metadata.clone(),
469            parent: None,
470            bounds: batch_chain::Bounds {
471                base_size: committed_size,
472                db_size: committed_size,
473                total_size: committed_size,
474                ancestors: Vec::new(),
475                inactivity_floor: self.inactivity_floor_loc,
476            },
477            _key: PhantomData,
478        })
479    }
480
481    /// Apply a merkleized batch to the database.
482    ///
483    /// Returns the range of locations written. The state is updated in memory only; call
484    /// [`Self::sync`] or [`Self::commit`] to persist.
485    ///
486    /// # Errors
487    ///
488    /// - [`Error::StaleBatch`] if the batch was created from a stale DB state.
489    /// - [`Error::FloorRegressed`] if any unapplied commit's floor is below the running floor
490    ///   (walking ancestors oldest-first, then the tip).
491    /// - [`Error::FloorBeyondSize`] if any unapplied commit's floor exceeds its own commit
492    ///   location.
493    pub fn apply_batch(
494        &mut self,
495        batch: Arc<MerkleizedBatch<F, H::Digest, K, V, S>>,
496    ) -> Result<core::ops::Range<Location<F>>, Error<F>> {
497        let db_size = *self.last_commit_loc + 1;
498        batch
499            .bounds
500            .validate_apply_to(db_size, self.inactivity_floor_loc)?;
501
502        let start_loc = self.last_commit_loc + 1;
503        self.merkle.apply_batch(&batch.merkle_batch)?;
504        self.last_commit_loc = Location::new(batch.bounds.total_size - 1);
505        self.last_commit_metadata = batch.commit_metadata.clone();
506        self.inactivity_floor_loc = batch.bounds.inactivity_floor;
507        Ok(start_loc..Location::new(batch.bounds.total_size))
508    }
509
510    /// Durably persist the current db state to disk.
511    ///
512    /// This is the point at which in-memory mutations become servable via compact sync. The compact
513    /// Merkle frontier and last-commit witness are written into the same slot, reusing the cached
514    /// witness when the current state has already been persisted.
515    pub async fn sync(&self) -> Result<(), Error<F>> {
516        witness::persist_witness::<F, E, H, S>(
517            &self.merkle,
518            &self.witness,
519            self.last_commit_loc,
520            self.inactivity_floor_loc,
521            Self::encode_commit_op(self.last_commit_metadata.clone(), self.inactivity_floor_loc),
522        )
523        .await
524    }
525
526    /// Durably persist the current db state to disk (alias for [`Self::sync`]).
527    pub async fn commit(&self) -> Result<(), Error<F>>
528    where
529        F: Family,
530    {
531        self.sync().await
532    }
533
534    /// Restore the state as of the sync before the most recent one.
535    ///
536    /// Discards any uncommitted batches, flips the db back to the previous persisted state,
537    /// and reloads the cached commit metadata and inactivity floor from that slot.
538    ///
539    /// Callers must drop any [`Arc<MerkleizedBatch>`] merkleized against state that this rewind
540    /// discards. [`Self::apply_batch`] validates batches by size only: a discarded-branch batch
541    /// will usually trip the size-mismatch check, but if the db later regrows to the same size
542    /// along an alternate branch, the stale batch becomes admissible again and applying it will
543    /// corrupt the committed root. Batches merkleized against the state this rewind restores to
544    /// (for example, a batch built before an advance that is then discarded by the rewind)
545    /// remain compatible and apply cleanly.
546    ///
547    /// # Errors
548    ///
549    /// Returns [`crate::merkle::Error::RewindBeyondHistory`] (wrapped as [`Error::Merkle`]) if
550    /// no prior state exists — either no sync has occurred yet, or the previous state was
551    /// already consumed by a rewind with no intervening sync.
552    ///
553    /// Any error from this method is fatal for this handle. The Merkle layer may have already
554    /// flipped its generation pointer and rebuilt its in-memory state before a later step (e.g.
555    /// reloading the cached commit metadata or inactivity floor) fails, leaving this `Db`'s
556    /// in-memory fields out of sync with the persisted slot. Callers must drop this handle
557    /// after any `Err` from `rewind` and reopen from storage.
558    pub async fn rewind(&mut self) -> Result<(), Error<F>>
559    where
560        F: Family,
561    {
562        self.merkle.rewind().await?;
563        // Reload the witness from the reverted slot as well, so compact serving stays aligned with
564        // the same frontier/root that `rewind` restored.
565        let (witness, last_commit_op) =
566            Self::load_active_witness(&self.merkle, &self.commit_codec_config).await?;
567        let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
568            return Err(Error::DataCorrupted("last operation was not a commit"));
569        };
570        self.last_commit_metadata = last_commit_metadata;
571        self.inactivity_floor_loc = inactivity_floor_loc;
572        self.last_commit_loc = Location::new(*witness.leaf_count - 1);
573        self.witness.replace(witness);
574        Ok(())
575    }
576
577    /// Destroy all persisted state associated with this database.
578    pub async fn destroy(self) -> Result<(), Error<F>> {
579        self.merkle.destroy().await.map_err(Into::into)
580    }
581
    /// Persist the currently cached witness by delegating to
    /// [`witness::persist_cached_witness`].
    ///
    /// NOTE(review): presumably the persistence step callers run after
    /// [`Self::init_from_verified_state`] once their root check passes — confirm against the
    /// compact-sync caller.
    pub(crate) async fn persist_cached_witness(&self) -> Result<(), Error<F>> {
        witness::persist_cached_witness::<F, E, H, S>(&self.merkle, &self.witness).await
    }
585}
586
587#[cfg(test)]
588mod tests {
589    use super::*;
590    use crate::{
591        merkle::mmr,
592        metadata::{Config as MConfig, Metadata},
593        qmdb::any::value::FixedEncoding,
594    };
595    use commonware_cryptography::{sha256::Digest, Sha256};
596    use commonware_macros::test_traced;
597    use commonware_parallel::Sequential;
598    use commonware_runtime::{deterministic, Runner as _, Supervisor as _};
599    use commonware_utils::sequence::prefixed_u64::U64 as MetadataKey;
600
    /// Compact db used by the tests: SHA-256 digests as both keys and fixed-size values, the
    /// deterministic runtime context, a unit codec config, and the sequential strategy.
    type TestDb<F> =
        Db<F, deterministic::Context, Digest, FixedEncoding<Digest>, Sha256, (), Sequential>;
603
604    async fn open_db<F: Family>(context: deterministic::Context, partition: &str) -> TestDb<F> {
605        let merkle = crate::merkle::compact::Merkle::init(
606            context,
607            crate::merkle::compact::Config {
608                partition: partition.into(),
609                strategy: Sequential,
610            },
611        )
612        .await
613        .unwrap();
614        Db::init_from_merkle(merkle, ()).await.unwrap()
615    }
616
617    async fn tamper_metadata_key(
618        context: deterministic::Context,
619        partition: &str,
620        key: MetadataKey,
621    ) {
622        let mut metadata = open_metadata(context, partition).await;
623        let mut bytes = metadata.get(&key).cloned().expect("metadata entry missing");
624        *bytes.last_mut().expect("metadata entry empty") ^= 0x01;
625        metadata.put_sync(key, bytes).await.unwrap();
626    }
627
628    async fn open_metadata(
629        context: deterministic::Context,
630        partition: &str,
631    ) -> Metadata<deterministic::Context, MetadataKey, Vec<u8>> {
632        Metadata::<_, MetadataKey, Vec<u8>>::init(
633            context.child("meta_write"),
634            MConfig {
635                partition: partition.into(),
636                codec_config: ((0..).into(), ()),
637            },
638        )
639        .await
640        .unwrap()
641    }
642
643    async fn overwrite_metadata_key(
644        context: deterministic::Context,
645        partition: &str,
646        key: MetadataKey,
647        bytes: Vec<u8>,
648    ) {
649        let mut metadata = open_metadata(context, partition).await;
650        metadata.put_sync(key, bytes).await.unwrap();
651    }
652
653    #[test_traced("INFO")]
654    fn test_compact_stale_batch_rejected() {
655        deterministic::Runner::default().start(|context| async move {
656            let mut db = open_db::<mmr::Family>(context.child("db"), "immutable-stale").await;
657
658            let key1 = Sha256::hash(&[1]);
659            let key2 = Sha256::hash(&[2]);
660            let value1 = Sha256::fill(10u8);
661            let value2 = Sha256::fill(20u8);
662
663            let batch_a = db
664                .new_batch()
665                .set(key1, value1)
666                .merkleize(&db, None, Location::new(0));
667            let batch_b = db
668                .new_batch()
669                .set(key2, value2)
670                .merkleize(&db, None, Location::new(0));
671
672            let expected_root = batch_a.root();
673            db.apply_batch(batch_a).unwrap();
674            assert_eq!(db.root(), expected_root);
675            assert!(matches!(
676                db.apply_batch(batch_b),
677                Err(Error::StaleBatch { .. })
678            ));
679
680            db.destroy().await.unwrap();
681        });
682    }
683
684    /// Regression: `to_batch()` must reflect the live in-memory state, not the lagging durable
685    /// serve-state cache. Compact dbs intentionally keep the serve-state cache behind unsynced
686    /// mutations, so a snapshot built without `sync()` / `commit()` between
687    /// `apply_batch()` and `to_batch()` previously bound its cached root to the stale serve
688    /// state.
689    #[test_traced("INFO")]
690    fn test_compact_to_batch_reflects_live_state() {
691        deterministic::Runner::default().start(|context| async move {
692            let mut db =
693                open_db::<mmr::Family>(context.child("db"), "immutable-to-batch-live").await;
694
695            let pre_apply_root = db.root();
696            let pre_snapshot = db.to_batch();
697            assert_eq!(
698                pre_snapshot.root(),
699                pre_apply_root,
700                "snapshot before any mutation should match the live root"
701            );
702
703            let key = Sha256::hash(&[1]);
704            let value = Sha256::fill(10u8);
705            db.apply_batch(
706                db.new_batch()
707                    .set(key, value)
708                    .merkleize(&db, None, Location::new(0)),
709            )
710            .unwrap();
711
712            // Deliberately skip `sync()` / `commit()` so the durable serve-state cache lags the
713            // live merkle state.
714            let live_root = db.root();
715            assert_ne!(
716                live_root, pre_apply_root,
717                "applying a non-empty batch must change the live root"
718            );
719
720            let snapshot = db.to_batch();
721            assert_eq!(
722                snapshot.root(),
723                live_root,
724                "to_batch().root() must match the live db.root() even before sync/commit"
725            );
726
727            db.destroy().await.unwrap();
728        });
729    }
730
731    #[test_traced("INFO")]
732    fn test_compact_stale_batch_chained() {
733        deterministic::Runner::default().start(|context| async move {
734            let mut db =
735                open_db::<mmr::Family>(context.child("db"), "immutable-chained-stale").await;
736
737            let parent = db
738                .new_batch()
739                .set(Sha256::hash(&[1]), Sha256::fill(1u8))
740                .merkleize(&db, None, Location::new(0));
741            let child_a = parent
742                .new_batch::<Sha256>()
743                .set(Sha256::hash(&[2]), Sha256::fill(2u8))
744                .merkleize(&db, None, Location::new(0));
745            let child_b = parent
746                .new_batch::<Sha256>()
747                .set(Sha256::hash(&[3]), Sha256::fill(3u8))
748                .merkleize(&db, None, Location::new(0));
749
750            db.apply_batch(child_a).unwrap();
751            assert!(matches!(
752                db.apply_batch(child_b),
753                Err(Error::StaleBatch { .. })
754            ));
755
756            db.destroy().await.unwrap();
757        });
758    }
759
760    #[test_traced("INFO")]
761    fn test_compact_stale_parent_after_child_applied() {
762        deterministic::Runner::default().start(|context| async move {
763            let mut db =
764                open_db::<mmr::Family>(context.child("db"), "immutable-child-before-parent").await;
765
766            let parent = db
767                .new_batch()
768                .set(Sha256::hash(&[1]), Sha256::fill(1u8))
769                .merkleize(&db, None, Location::new(0));
770            let child = parent
771                .new_batch::<Sha256>()
772                .set(Sha256::hash(&[2]), Sha256::fill(2u8))
773                .merkleize(&db, None, Location::new(0));
774
775            db.apply_batch(child).unwrap();
776            assert!(matches!(
777                db.apply_batch(parent),
778                Err(Error::StaleBatch { .. })
779            ));
780
781            db.destroy().await.unwrap();
782        });
783    }
784
785    #[test_traced("INFO")]
786    fn test_compact_sequential_commit_parent_then_child() {
787        deterministic::Runner::default().start(|context| async move {
788            let mut db =
789                open_db::<mmr::Family>(context.child("db"), "immutable-parent-child").await;
790
791            let parent = db
792                .new_batch()
793                .set(Sha256::hash(&[1]), Sha256::fill(1u8))
794                .merkleize(&db, None, Location::new(0));
795            let child = parent
796                .new_batch::<Sha256>()
797                .set(Sha256::hash(&[2]), Sha256::fill(2u8))
798                .merkleize(&db, None, Location::new(0));
799            let expected_root = child.root();
800
801            db.apply_batch(parent).unwrap();
802            db.apply_batch(child).unwrap();
803            db.commit().await.unwrap();
804
805            assert_eq!(db.root(), expected_root);
806
807            db.destroy().await.unwrap();
808        });
809    }
810
811    #[test_traced("INFO")]
812    fn test_compact_floor_regressed() {
813        deterministic::Runner::default().start(|context| async move {
814            let mut db =
815                open_db::<mmr::Family>(context.child("db"), "immutable-floor-regressed").await;
816
817            let advance_floor = db.new_batch().set(Sha256::hash(&[1]), Sha256::fill(1u8));
818            let advance_floor = advance_floor.merkleize(&db, None, Location::new(1));
819            db.apply_batch(advance_floor).unwrap();
820
821            let regressed = db
822                .new_batch()
823                .set(Sha256::hash(&[2]), Sha256::fill(2u8))
824                .merkleize(&db, None, Location::new(0));
825
826            assert!(matches!(
827                db.apply_batch(regressed),
828                Err(Error::FloorRegressed(new, current))
829                    if new == Location::new(0) && current == Location::new(1)
830            ));
831
832            db.destroy().await.unwrap();
833        });
834    }
835
836    #[test_traced("INFO")]
837    fn test_compact_rejects_regressed_ancestor_floor() {
838        deterministic::Runner::default().start(|context| async move {
839            let mut db =
840                open_db::<mmr::Family>(context.child("db"), "immutable-regressed-ancestor-floor")
841                    .await;
842
843            let parent = db
844                .new_batch()
845                .set(Sha256::hash(&[1]), Sha256::fill(1u8))
846                .merkleize(&db, None, Location::new(1));
847            let child = parent
848                .new_batch::<Sha256>()
849                .set(Sha256::hash(&[2]), Sha256::fill(2u8))
850                .merkleize(&db, None, Location::new(0));
851
852            assert!(matches!(
853                db.apply_batch(child),
854                Err(Error::FloorRegressed(new, prev))
855                    if new == Location::new(0) && prev == Location::new(1)
856            ));
857
858            db.destroy().await.unwrap();
859        });
860    }
861
862    #[test_traced("INFO")]
863    fn test_compact_rewind_restores_commit_metadata_and_floor() {
864        deterministic::Runner::default().start(|context| async move {
865            let mut db = open_db::<mmr::Family>(context.child("db"), "immutable-rewind-meta").await;
866
867            let k1 = Sha256::hash(&[1]);
868            let v1 = Sha256::fill(11u8);
869            let meta1 = Sha256::fill(0xaa);
870            let floor1 = Location::new(0);
871            db.apply_batch(
872                db.new_batch()
873                    .set(k1, v1)
874                    .merkleize(&db, Some(meta1), floor1),
875            )
876            .unwrap();
877            db.commit().await.unwrap();
878            let root_after_first = db.root();
879
880            let k2 = Sha256::hash(&[2]);
881            let v2 = Sha256::fill(22u8);
882            let meta2 = Sha256::fill(0xbb);
883            // Advance the floor to the commit of the first batch (loc 1).
884            let floor2 = Location::new(1);
885            db.apply_batch(
886                db.new_batch()
887                    .set(k2, v2)
888                    .merkleize(&db, Some(meta2), floor2),
889            )
890            .unwrap();
891            db.commit().await.unwrap();
892            assert_eq!(db.get_metadata(), Some(meta2));
893            assert_eq!(db.inactivity_floor_loc(), floor2);
894
895            db.rewind().await.unwrap();
896            assert_eq!(db.root(), root_after_first);
897            assert_eq!(db.get_metadata(), Some(meta1));
898            assert_eq!(db.inactivity_floor_loc(), floor1);
899
900            db.destroy().await.unwrap();
901        });
902    }
903
904    #[test_traced("INFO")]
905    fn test_compact_rewind_persists_across_reopen() {
906        deterministic::Runner::default().start(|context| async move {
907            let partition = "immutable-rewind-reopen";
908            let meta1 = Sha256::fill(0xaa);
909            let floor1 = Location::new(0);
910            let meta2 = Sha256::fill(0xbb);
911            let floor2 = Location::new(1);
912
913            let root_after_first = {
914                let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
915                db.apply_batch(
916                    db.new_batch()
917                        .set(Sha256::hash(&[1]), Sha256::fill(11u8))
918                        .merkleize(&db, Some(meta1), floor1),
919                )
920                .unwrap();
921                db.commit().await.unwrap();
922                let root = db.root();
923
924                db.apply_batch(
925                    db.new_batch()
926                        .set(Sha256::hash(&[2]), Sha256::fill(22u8))
927                        .merkleize(&db, Some(meta2), floor2),
928                )
929                .unwrap();
930                db.commit().await.unwrap();
931
932                db.rewind().await.unwrap();
933                root
934            };
935
936            let db = open_db::<mmr::Family>(context.child("second"), partition).await;
937            assert_eq!(db.root(), root_after_first);
938            assert_eq!(db.get_metadata(), Some(meta1));
939            assert_eq!(db.inactivity_floor_loc(), floor1);
940
941            db.destroy().await.unwrap();
942        });
943    }
944
945    #[test_traced("INFO")]
946    fn test_compact_reopen_rejects_tampered_witness() {
947        deterministic::Runner::default().start(|context| async move {
948            let partition = "immutable-witness-tamper";
949            let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
950            db.apply_batch(
951                db.new_batch()
952                    .set(Sha256::hash(&[7]), Sha256::fill(7u8))
953                    .merkleize(&db, Some(Sha256::fill(0xaa)), Location::new(1)),
954            )
955            .unwrap();
956            db.commit().await.unwrap();
957            let slot = db.merkle.active_slot();
958            drop(db);
959
960            tamper_metadata_key(
961                context.child("tamper"),
962                partition,
963                crate::qmdb::compact::witness::last_commit_proof_key(slot),
964            )
965            .await;
966
967            let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
968                crate::merkle::compact::Merkle::init(
969                    context.child("reopen"),
970                    crate::merkle::compact::Config {
971                        partition: partition.into(),
972                        strategy: Sequential,
973                    },
974                )
975                .await
976                .unwrap();
977            let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
978            assert!(matches!(reopened, Err(Error::DataCorrupted(_))));
979        });
980    }
981
982    #[test_traced("INFO")]
983    fn test_compact_reopen_rejects_commit_floor_beyond_tip() {
984        deterministic::Runner::default().start(|context| async move {
985            let partition = "immutable-invalid-persisted-floor";
986            let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
987            db.apply_batch(
988                db.new_batch()
989                    .set(Sha256::hash(&[7]), Sha256::fill(7u8))
990                    .merkleize(&db, Some(Sha256::fill(0xaa)), Location::new(1)),
991            )
992            .unwrap();
993            db.commit().await.unwrap();
994            let slot = db.merkle.active_slot();
995            drop(db);
996            let oversized_floor = Location::new(10);
997
998            overwrite_metadata_key(
999                context.child("tamper"),
1000                partition,
1001                crate::qmdb::compact::witness::last_commit_op_key(slot),
1002                Operation::<mmr::Family, Digest, FixedEncoding<Digest>>::Commit(
1003                    Some(Sha256::fill(0xaa)),
1004                    oversized_floor,
1005                )
1006                .encode()
1007                .to_vec(),
1008            )
1009            .await;
1010
1011            let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
1012                crate::merkle::compact::Merkle::init(
1013                    context.child("reopen"),
1014                    crate::merkle::compact::Config {
1015                        partition: partition.into(),
1016                        strategy: Sequential,
1017                    },
1018                )
1019                .await
1020                .unwrap();
1021            let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
1022            assert!(matches!(
1023                reopened,
1024                Err(Error::DataCorrupted("invalid compact witness"))
1025            ));
1026        });
1027    }
1028
1029    #[test_traced("INFO")]
1030    fn test_compact_rewind_beyond_history() {
1031        deterministic::Runner::default().start(|context| async move {
1032            let mut db =
1033                open_db::<mmr::Family>(context.child("db"), "immutable-rewind-beyond").await;
1034            // Bootstrap sync flipped the pointer from the default slot 0 to slot 1; slot 0 is
1035            // still empty, so there is no prior state to rewind to.
1036            assert!(matches!(
1037                db.rewind().await,
1038                Err(Error::Merkle(crate::merkle::Error::RewindBeyondHistory))
1039            ));
1040            db.destroy().await.unwrap();
1041        });
1042    }
1043
1044    #[test_traced("INFO")]
1045    fn test_compact_rewind_preserves_pre_advance_batch() {
1046        deterministic::Runner::default().start(|context| async move {
1047            let mut db = open_db::<mmr::Family>(
1048                context.child("db"),
1049                "immutable-rewind-preserves-pre-advance",
1050            )
1051            .await;
1052
1053            db.apply_batch(
1054                db.new_batch()
1055                    .set(Sha256::hash(&[1]), Sha256::fill(1u8))
1056                    .merkleize(&db, None, Location::new(0)),
1057            )
1058            .unwrap();
1059            db.commit().await.unwrap();
1060
1061            // Merkleize a batch against the post-commit-A state.
1062            let held = db
1063                .new_batch()
1064                .set(Sha256::hash(&[2]), Sha256::fill(2u8))
1065                .merkleize(&db, None, Location::new(0));
1066
1067            // Advance past that state and commit, then rewind back to it.
1068            db.apply_batch(
1069                db.new_batch()
1070                    .set(Sha256::hash(&[3]), Sha256::fill(3u8))
1071                    .merkleize(&db, None, Location::new(0)),
1072            )
1073            .unwrap();
1074            db.commit().await.unwrap();
1075            db.rewind().await.unwrap();
1076
1077            // The rewind restored the state that `held` was merkleized against, so it still
1078            // matches the Merkle size and applies cleanly.
1079            db.apply_batch(held).unwrap();
1080
1081            db.destroy().await.unwrap();
1082        });
1083    }
1084
1085    #[test_traced("INFO")]
1086    fn test_compact_noop_commit_after_commit() {
1087        deterministic::Runner::default().start(|context| async move {
1088            let mut db =
1089                open_db::<mmr::Family>(context.child("db"), "immutable-noop-after-commit").await;
1090
1091            let k1 = Sha256::hash(&[1]);
1092            let v1 = Sha256::fill(11u8);
1093            let k2 = Sha256::hash(&[2]);
1094            let v2 = Sha256::fill(22u8);
1095            db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1096                &db,
1097                Some(Sha256::fill(0xaa)),
1098                Location::new(0),
1099            ))
1100            .unwrap();
1101            db.commit().await.unwrap();
1102            let root_after_first = db.root();
1103            let size_after_first = db.size();
1104
1105            db.commit().await.unwrap();
1106            assert_eq!(db.size(), size_after_first);
1107            assert_eq!(db.root(), root_after_first);
1108            assert_eq!(db.current_target().root, db.root());
1109
1110            db.destroy().await.unwrap();
1111        });
1112    }
1113
1114    #[test_traced("INFO")]
1115    fn test_compact_noop_commit_after_reopen() {
1116        deterministic::Runner::default().start(|context| async move {
1117            let partition = "immutable-noop-after-reopen";
1118
1119            let (root_before_drop, size_before_drop) = {
1120                let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
1121                let k1 = Sha256::hash(&[1]);
1122                let v1 = Sha256::fill(11u8);
1123                let k2 = Sha256::hash(&[2]);
1124                let v2 = Sha256::fill(22u8);
1125                db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1126                    &db,
1127                    Some(Sha256::fill(0xaa)),
1128                    Location::new(0),
1129                ))
1130                .unwrap();
1131                db.commit().await.unwrap();
1132                (db.root(), db.size())
1133            };
1134
1135            let db = open_db::<mmr::Family>(context.child("second"), partition).await;
1136            assert_eq!(db.root(), root_before_drop);
1137            assert_eq!(db.size(), size_before_drop);
1138
1139            db.commit().await.unwrap();
1140            assert_eq!(db.size(), size_before_drop);
1141            assert_eq!(db.root(), root_before_drop);
1142            assert_eq!(db.current_target().root, db.root());
1143
1144            db.destroy().await.unwrap();
1145        });
1146    }
1147
1148    #[test_traced("INFO")]
1149    fn test_compact_noop_commit_after_rewind() {
1150        deterministic::Runner::default().start(|context| async move {
1151            let mut db =
1152                open_db::<mmr::Family>(context.child("db"), "immutable-noop-after-rewind").await;
1153
1154            let k1 = Sha256::hash(&[1]);
1155            let v1 = Sha256::fill(11u8);
1156            let k2 = Sha256::hash(&[2]);
1157            let v2 = Sha256::fill(22u8);
1158            db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1159                &db,
1160                Some(Sha256::fill(0xaa)),
1161                Location::new(0),
1162            ))
1163            .unwrap();
1164            db.commit().await.unwrap();
1165            let root_after_first = db.root();
1166            let size_after_first = db.size();
1167
1168            let k3 = Sha256::hash(&[3]);
1169            let v3 = Sha256::fill(33u8);
1170            db.apply_batch(db.new_batch().set(k3, v3).merkleize(
1171                &db,
1172                Some(Sha256::fill(0xbb)),
1173                Location::new(1),
1174            ))
1175            .unwrap();
1176            db.commit().await.unwrap();
1177
1178            db.rewind().await.unwrap();
1179            assert_eq!(db.size(), size_after_first);
1180            assert_eq!(db.root(), root_after_first);
1181
1182            db.commit().await.unwrap();
1183            assert_eq!(db.size(), size_after_first);
1184            assert_eq!(db.root(), root_after_first);
1185            assert_eq!(db.current_target().root, db.root());
1186
1187            db.destroy().await.unwrap();
1188        });
1189    }
1190
1191    #[test_traced("INFO")]
1192    fn test_compact_rewind_makes_post_advance_batch_stale() {
1193        deterministic::Runner::default().start(|context| async move {
1194            let mut db =
1195                open_db::<mmr::Family>(context.child("db"), "immutable-rewind-makes-stale").await;
1196
1197            db.apply_batch(
1198                db.new_batch()
1199                    .set(Sha256::hash(&[1]), Sha256::fill(1u8))
1200                    .merkleize(&db, None, Location::new(0)),
1201            )
1202            .unwrap();
1203            db.commit().await.unwrap();
1204
1205            db.apply_batch(
1206                db.new_batch()
1207                    .set(Sha256::hash(&[2]), Sha256::fill(2u8))
1208                    .merkleize(&db, None, Location::new(0)),
1209            )
1210            .unwrap();
1211            db.commit().await.unwrap();
1212
1213            // Merkleize a batch against the post-commit-B state, which the rewind will discard.
1214            let held = db
1215                .new_batch()
1216                .set(Sha256::hash(&[3]), Sha256::fill(3u8))
1217                .merkleize(&db, None, Location::new(0));
1218
1219            db.rewind().await.unwrap();
1220
1221            // After rewind, mem.size reflects post-commit-A, but the held batch starts after
1222            // post-commit-B. Apply must be rejected with StaleBatch.
1223            assert!(matches!(
1224                db.apply_batch(held),
1225                Err(Error::StaleBatch { .. })
1226            ));
1227
1228            db.destroy().await.unwrap();
1229        });
1230    }
1231
1232    #[test_traced("INFO")]
1233    fn test_witness_state_reports_cached_commit_corruption() {
1234        deterministic::Runner::default().start(|context| async move {
1235            let db =
1236                open_db::<mmr::Family>(context.child("db"), "immutable-serve-corruption").await;
1237            let target = db.current_target();
1238            db.witness
1239                .mutate(|witness| witness.last_commit_op_bytes.clear());
1240
1241            assert!(matches!(
1242                db.compact_state(target),
1243                Err(compact_sync::ServeError::Database(Error::DataCorrupted(
1244                    "invalid commit operation"
1245                )))
1246            ));
1247
1248            db.destroy().await.unwrap();
1249        });
1250    }
1251
1252    #[test_traced("INFO")]
1253    fn test_compact_floor_beyond_size() {
1254        deterministic::Runner::default().start(|context| async move {
1255            let mut db =
1256                open_db::<mmr::Family>(context.child("db"), "immutable-floor-beyond").await;
1257
1258            let batch = db.new_batch().merkleize(&db, None, Location::new(2));
1259
1260            assert!(matches!(
1261                db.apply_batch(batch),
1262                Err(Error::FloorBeyondSize(floor, tip))
1263                    if floor == Location::new(2) && tip == Location::new(1)
1264            ));
1265
1266            db.destroy().await.unwrap();
1267        });
1268    }
1269}