Skip to main content

commonware_storage/qmdb/keyless/
compact.rs

1//! A keyless authenticated db that does not retain historical operations after sync.
2//!
3//! Mirrors the API of [`crate::qmdb::keyless::Keyless`] (`new_batch -> merkleize ->
4//! apply_batch -> sync`, pipelined batch chains, `StaleBatch` validation) but is backed by
5//! the peak-only [`crate::merkle::compact`]. Because history is discarded, there are no
6//! `get` / `proof` / `bounds` methods; use the full variant if you need them.
7//!
8//! # Compact serving witness
9//!
10//! On every durable sync, this db persists the encoded last-commit operation together with its
11//! inclusion proof against the current root. Reopen and rewind re-verify that proof; corruption
12//! surfaces as [`Error::DataCorrupted`]. This authenticated witness is what lets compact nodes
13//! serve compact sync without retaining historical operations.
14//!
15//! # Inactivity floor
16//!
17//! Commits still carry an inactivity floor, but only for wire-format compatibility with
18//! [`crate::qmdb::keyless::Keyless`]: the root is computed over the encoded operation
19//! sequence, and that sequence must include the same floor to produce the same root as the
20//! full variant. Here the floor has no effect on pruning or snapshot rebuilding. All
21//! historical in-memory state is discarded on every `sync`.
22
23use super::operation::Operation;
24use crate::{
25    merkle::{batch, compact as compact_merkle, Family, Location, Proof},
26    qmdb::{
27        self,
28        any::value::ValueEncoding,
29        batch_chain::{self, Bounds},
30        compact::{
31            batch as compact_batch,
32            witness::{self, ServeState},
33        },
34        sync::compact as compact_sync,
35        Error,
36    },
37    Context,
38};
39use commonware_codec::{Decode as _, Encode, EncodeShared, Read};
40use commonware_cryptography::{Digest, Hasher};
41use commonware_parallel::Strategy;
42use std::sync::{Arc, Weak};
43
44/// Configuration for a compact keyless authenticated db.
45#[derive(Clone)]
46pub struct Config<C, S: Strategy> {
47    /// Configuration for the backing compact Merkle structure.
48    pub merkle: compact_merkle::Config<S>,
49
50    /// Codec config used to decode the persisted last commit operation on reopen.
51    pub commit_codec_config: C,
52}
53
54/// A keyless authenticated db that does not retain historical operations after sync.
55pub struct Db<F, E, V, H, C, S: Strategy>
56where
57    F: Family,
58    E: Context,
59    V: ValueEncoding,
60    H: Hasher,
61    Operation<F, V>: EncodeShared,
62    Operation<F, V>: Read<Cfg = C>,
63    C: Clone + Send + Sync + 'static,
64{
65    merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
66    last_commit_loc: Location<F>,
67    last_commit_metadata: Option<V::Value>,
68    inactivity_floor_loc: Location<F>,
69    commit_codec_config: C,
70    /// Cache of the last durably persisted compact witness.
71    ///
72    /// This cache is rebuilt from persisted witness bytes on reopen/rewind and refreshed on
73    /// [`Self::sync`]. It intentionally does not track unsynced in-memory mutations, so compact
74    /// serving never advertises state that has not been durably persisted.
75    witness: witness::Cache<F, H::Digest>,
76}
77
/// Outcome of serving compact-sync state: a [`compact_sync::State`] on success, or a
/// [`compact_sync::ServeError`] otherwise.
type CompactStateResult<F, V, D> =
    Result<compact_sync::State<F, Operation<F, V>, D>, compact_sync::ServeError<F, D>>;
80
81/// A speculative batch for a compact keyless db.
82#[allow(clippy::type_complexity)]
83pub struct UnmerkleizedBatch<F, H, V, S: Strategy>
84where
85    F: Family,
86    V: ValueEncoding,
87    H: Hasher,
88    Operation<F, V>: EncodeShared,
89{
90    merkle_batch: compact_merkle::UnmerkleizedBatch<F, H::Digest, S>,
91    appends: Vec<V::Value>,
92    parent: Option<Arc<MerkleizedBatch<F, H::Digest, V, S>>>,
93    base_size: u64,
94    db_size: u64,
95}
96
97/// A speculative batch whose root digest has been computed.
98#[derive(Clone)]
99pub struct MerkleizedBatch<F: Family, D: Digest, V: ValueEncoding, S: Strategy>
100where
101    Operation<F, V>: EncodeShared,
102{
103    pub(super) merkle_batch: Arc<batch::MerkleizedBatch<F, D, S>>,
104    pub(super) root: D,
105    pub(super) commit_metadata: Option<V::Value>,
106    pub(super) parent: Option<Weak<Self>>,
107    pub(super) bounds: batch_chain::Bounds<F>,
108}
109
110impl<F: Family, D: Digest, V: ValueEncoding, S: Strategy> MerkleizedBatch<F, D, V, S>
111where
112    Operation<F, V>: EncodeShared,
113{
114    pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
115        batch_chain::ancestors(self.parent.clone(), |batch| batch.parent.as_ref())
116    }
117
118    /// Return the root digest after this batch is applied.
119    pub const fn root(&self) -> D {
120        self.root
121    }
122
123    /// Return the [`Bounds`] of the batch.
124    pub const fn bounds(&self) -> &Bounds<F> {
125        &self.bounds
126    }
127
128    /// Create a new speculative batch with this one as its parent.
129    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, V, S>
130    where
131        H: Hasher<Digest = D>,
132    {
133        UnmerkleizedBatch {
134            merkle_batch: compact_merkle::UnmerkleizedBatch::wrap(self.merkle_batch.new_batch()),
135            appends: Vec::new(),
136            parent: Some(Arc::clone(self)),
137            base_size: self.bounds.total_size,
138            db_size: self.bounds.db_size,
139        }
140    }
141}
142
143impl<F, H, V, S> UnmerkleizedBatch<F, H, V, S>
144where
145    F: Family,
146    V: ValueEncoding,
147    H: Hasher,
148    S: Strategy,
149    Operation<F, V>: EncodeShared,
150{
151    pub(super) fn new<E, C>(db: &Db<F, E, V, H, C, S>, committed_size: u64) -> Self
152    where
153        E: Context,
154        C: Clone + Send + Sync + 'static,
155        Operation<F, V>: Read<Cfg = C>,
156    {
157        Self {
158            merkle_batch: db.merkle.new_batch(),
159            appends: Vec::new(),
160            parent: None,
161            base_size: committed_size,
162            db_size: committed_size,
163        }
164    }
165
166    pub fn append(mut self, value: V::Value) -> Self {
167        self.appends.push(value);
168        self
169    }
170
171    /// Resolve appends into operations, merkleize, and return an `Arc<MerkleizedBatch>`.
172    ///
173    /// `inactivity_floor` is threaded through the commit operation for wire-format parity with
174    /// [`crate::qmdb::keyless::Keyless`]. It must be >= the database's current floor
175    /// (monotonically non-decreasing) and at most the batch's commit location
176    /// (`total_size - 1`); these bounds are validated, but the floor does not drive any local
177    /// pruning or retention in this variant.
178    pub fn merkleize<E, C>(
179        self,
180        db: &Db<F, E, V, H, C, S>,
181        metadata: Option<V::Value>,
182        inactivity_floor: Location<F>,
183    ) -> Arc<MerkleizedBatch<F, H::Digest, V, S>>
184    where
185        F: Family,
186        E: Context,
187        C: Clone + Send + Sync + 'static,
188        Operation<F, V>: Read<Cfg = C>,
189    {
190        let mut ops: Vec<Operation<F, V>> = Vec::with_capacity(self.appends.len() + 1);
191        for value in self.appends {
192            ops.push(Operation::Append(value));
193        }
194        ops.push(Operation::Commit(metadata.clone(), inactivity_floor));
195
196        let total_size = self.base_size + ops.len() as u64;
197        let merkle = compact_batch::merkleize_ops::<F, E, H, S, _>(
198            &db.merkle,
199            self.merkle_batch,
200            ops.as_slice(),
201        );
202
203        let inactive_peaks = F::inactive_peaks(
204            F::location_to_position(Location::new(total_size)),
205            inactivity_floor,
206        );
207        let hasher = qmdb::hasher::<H>();
208        let root = db
209            .merkle
210            .with_mem(|mem| merkle.root(mem, &hasher, inactive_peaks))
211            .expect("inactive_peaks computed from batch size");
212
213        let ancestors =
214            batch_chain::parent_and_ancestors(self.parent.as_ref(), |parent| parent.ancestors());
215        let ancestors = batch_chain::collect_ancestor_bounds(
216            ancestors,
217            |batch| batch.bounds.inactivity_floor,
218            |batch| batch.bounds.total_size,
219        );
220
221        Arc::new(MerkleizedBatch {
222            merkle_batch: merkle,
223            root,
224            commit_metadata: metadata,
225            parent: self.parent.as_ref().map(Arc::downgrade),
226            bounds: batch_chain::Bounds {
227                base_size: self.base_size,
228                db_size: self.db_size,
229                total_size,
230                ancestors,
231                inactivity_floor,
232            },
233        })
234    }
235}
236
237impl<F, E, V, H, C, S> Db<F, E, V, H, C, S>
238where
239    F: Family,
240    E: Context,
241    V: ValueEncoding,
242    H: Hasher,
243    S: Strategy,
244    Operation<F, V>: EncodeShared,
245    Operation<F, V>: Read<Cfg = C>,
246    C: Clone + Send + Sync + 'static,
247{
248    fn encode_commit_op(metadata: Option<V::Value>, inactivity_floor_loc: Location<F>) -> Vec<u8> {
249        Operation::<F, V>::Commit(metadata, inactivity_floor_loc)
250            .encode()
251            .to_vec()
252    }
253
254    async fn load_active_witness(
255        merkle: &compact_merkle::Merkle<F, E, H::Digest, S>,
256        commit_codec_config: &C,
257    ) -> Result<(ServeState<F, H::Digest>, Operation<F, V>), Error<F>> {
258        witness::load_active_witness::<F, E, H, S, _, Operation<F, V>, _>(
259            merkle,
260            commit_codec_config,
261            Operation::has_floor,
262        )
263        .await
264    }
265
266    /// Build a compact db handle from already-verified compact state.
267    ///
268    /// The caller has reconstructed the compact Merkle in memory and already authenticated the
269    /// supplied witness/root pair. This seeds the in-memory witness cache from that verified witness
270    /// but does not itself persist anything; persistence happens only after the caller finishes the
271    /// root check for the reconstructed db.
272    #[allow(clippy::too_many_arguments)]
273    pub(crate) fn init_from_verified_state(
274        merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
275        commit_codec_config: C,
276        last_commit_metadata: Option<V::Value>,
277        inactivity_floor_loc: Location<F>,
278        root: H::Digest,
279        last_commit_op_bytes: Vec<u8>,
280        last_commit_proof: Proof<F, H::Digest>,
281        pinned_nodes: Vec<H::Digest>,
282    ) -> Result<Self, Error<F>> {
283        let (last_commit_loc, witness) = witness::witness_from_authenticated_state(
284            &merkle,
285            root,
286            inactivity_floor_loc,
287            last_commit_op_bytes,
288            last_commit_proof,
289            pinned_nodes,
290        )?;
291
292        Ok(Self {
293            merkle,
294            last_commit_loc,
295            last_commit_metadata,
296            inactivity_floor_loc,
297            commit_codec_config,
298            witness: witness::Cache::new(witness),
299        })
300    }
301
302    /// Open a compact db from persisted compact state and rebuild its witness cache.
303    ///
304    /// On first open, this bootstraps the initial commit and its witness so every later reopen and
305    /// rewind can assume "the active slot has a complete compact witness".
306    pub(crate) async fn init_from_merkle(
307        mut merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
308        commit_codec_config: C,
309    ) -> Result<Self, Error<F>>
310    where
311        F: Family,
312        Operation<F, V>: Read<Cfg = C>,
313    {
314        // Bootstrap: append an initial Commit(None, 0) on first open. This establishes the
315        // invariant that every merkleized batch ends with a Commit op, so `last_commit_loc =
316        // leaves - 1` is always correct without replaying the log (which we can't, since we
317        // don't retain it).
318        //
319        // We also persist that initial commit's witness immediately so every later reopen or
320        // rewind can uniformly assume "the active slot has a current tip witness".
321        if merkle.leaves() == 0 {
322            witness::bootstrap_initial_commit::<F, E, H, S>(
323                &mut merkle,
324                Operation::<F, V>::Commit(None, Location::new(0))
325                    .encode()
326                    .to_vec(),
327            )
328            .await?;
329        }
330
331        let (witness, last_commit_op) =
332            Self::load_active_witness(&merkle, &commit_codec_config).await?;
333        let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
334            return Err(Error::DataCorrupted("last operation was not a commit"));
335        };
336
337        Self::init_from_verified_state(
338            merkle,
339            commit_codec_config,
340            last_commit_metadata,
341            inactivity_floor_loc,
342            witness.root,
343            witness.last_commit_op_bytes,
344            witness.last_commit_proof,
345            witness.pinned_nodes,
346        )
347    }
348
349    /// Return the root of the db.
350    pub fn root(&self) -> H::Digest
351    where
352        F: Family,
353    {
354        let hasher = qmdb::hasher::<H>();
355        let inactive_peaks = F::inactive_peaks(
356            F::location_to_position(Location::new(*self.last_commit_loc + 1)),
357            self.inactivity_floor_loc,
358        );
359        self.merkle
360            .root(&hasher, inactive_peaks)
361            .expect("compact Merkle root should not fail")
362    }
363
    /// Return a reference to the merkleization strategy.
    ///
    /// The strategy is owned by the backing compact Merkle structure; this simply forwards to it.
    pub const fn strategy(&self) -> &S {
        self.merkle.strategy()
    }
368
    /// Return the location of the last commit.
    ///
    /// This reflects the live in-memory state: it is updated by [`Self::apply_batch`] and
    /// reloaded by [`Self::rewind`].
    pub const fn last_commit_loc(&self) -> Location<F> {
        self.last_commit_loc
    }
373
    /// Return the inactivity floor declared by the last committed batch.
    ///
    /// This reflects the live in-memory state: it is updated by [`Self::apply_batch`] and
    /// reloaded by [`Self::rewind`].
    pub const fn inactivity_floor_loc(&self) -> Location<F> {
        self.inactivity_floor_loc
    }
378
379    /// Return the location of the next operation appended to this db.
380    pub fn size(&self) -> Location<F> {
381        Location::new(*self.last_commit_loc + 1)
382    }
383
384    /// Get the metadata associated with the last commit.
385    pub fn get_metadata(&self) -> Option<V::Value> {
386        self.last_commit_metadata.clone()
387    }
388
389    /// Return the compact-sync target described by the current witness.
390    ///
391    /// This reflects the last state for which both frontier and witness were durably captured,
392    /// which may lag behind live in-memory mutations until [`Self::sync`] is called.
393    pub fn current_target(&self) -> compact_sync::Target<F, H::Digest> {
394        self.witness.with(ServeState::target)
395    }
396
397    /// Return the compact-sync state for `target`, or a stale-target error if the source's
398    /// current witness no longer matches.
399    ///
400    /// The witness lock is held only long enough to verify the requested target and snapshot
401    /// the bytes, proof, and pinned nodes needed for [`compact_sync::State`]. Decoding the
402    /// commit operation runs outside the lock so concurrent readers do not contend on it.
403    pub(crate) fn compact_state(
404        &self,
405        target: compact_sync::Target<F, H::Digest>,
406    ) -> CompactStateResult<F, V, H::Digest>
407    where
408        Operation<F, V>: Read<Cfg = C>,
409    {
410        let (op_bytes, last_commit_proof, pinned_nodes, leaf_count) = self.witness.with(|w| {
411            if target.root != w.root || target.leaf_count != w.leaf_count {
412                return Err(compact_sync::ServeError::StaleTarget {
413                    requested: target.clone(),
414                    current: w.target(),
415                });
416            }
417            Ok((
418                w.last_commit_op_bytes.clone(),
419                w.last_commit_proof.clone(),
420                w.pinned_nodes.clone(),
421                w.leaf_count,
422            ))
423        })?;
424        let op = Operation::<F, V>::decode_cfg(op_bytes.as_ref(), &self.commit_codec_config)
425            .map_err(|_| {
426                compact_sync::ServeError::Database(Error::DataCorrupted("invalid commit operation"))
427            })?;
428        if !matches!(&op, Operation::Commit(_, _)) {
429            return Err(compact_sync::ServeError::Database(Error::DataCorrupted(
430                "last operation was not a commit",
431            )));
432        }
433        Ok(compact_sync::State {
434            leaf_count,
435            pinned_nodes,
436            last_commit_op: op,
437            last_commit_proof,
438        })
439    }
440
441    /// Create a new speculative batch of operations with this database as its parent.
442    pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, V, S> {
443        let committed_size = *self.last_commit_loc + 1;
444        UnmerkleizedBatch::new(self, committed_size)
445    }
446
447    /// Create an owned merkleized batch representing the current committed state.
448    pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, V, S>>
449    where
450        F: Family,
451    {
452        let committed_size = *self.last_commit_loc + 1;
453        Arc::new(MerkleizedBatch {
454            merkle_batch: self.merkle.to_batch(),
455            root: self.root(),
456            commit_metadata: self.last_commit_metadata.clone(),
457            parent: None,
458            bounds: batch_chain::Bounds {
459                base_size: committed_size,
460                db_size: committed_size,
461                total_size: committed_size,
462                ancestors: Vec::new(),
463                inactivity_floor: self.inactivity_floor_loc,
464            },
465        })
466    }
467
468    /// Apply a merkleized batch to the database.
469    ///
470    /// Returns the range of locations written. The state is updated in memory only; call
471    /// [`Self::sync`] or [`Self::commit`] to persist.
472    ///
473    /// # Errors
474    ///
475    /// - [`Error::StaleBatch`] if the batch was created from a stale DB state.
476    /// - [`Error::FloorRegressed`] if any unapplied commit's floor is below the running floor
477    ///   (walking ancestors oldest-first, then the tip).
478    /// - [`Error::FloorBeyondSize`] if any unapplied commit's floor exceeds its own commit
479    ///   location.
480    pub fn apply_batch(
481        &mut self,
482        batch: Arc<MerkleizedBatch<F, H::Digest, V, S>>,
483    ) -> Result<core::ops::Range<Location<F>>, Error<F>> {
484        let db_size = *self.last_commit_loc + 1;
485        batch
486            .bounds
487            .validate_apply_to(db_size, self.inactivity_floor_loc)?;
488
489        let start_loc = self.last_commit_loc + 1;
490        self.merkle.apply_batch(&batch.merkle_batch)?;
491        self.last_commit_loc = Location::new(batch.bounds.total_size - 1);
492        self.last_commit_metadata = batch.commit_metadata.clone();
493        self.inactivity_floor_loc = batch.bounds.inactivity_floor;
494        Ok(start_loc..Location::new(batch.bounds.total_size))
495    }
496
497    /// Durably persist the current db state to disk.
498    ///
499    /// This is the point at which in-memory mutations become servable via compact sync. The compact
500    /// Merkle frontier and last-commit witness are written into the same slot, reusing the cached
501    /// witness when the current state has already been persisted.
502    pub async fn sync(&self) -> Result<(), Error<F>> {
503        witness::persist_witness::<F, E, H, S>(
504            &self.merkle,
505            &self.witness,
506            self.last_commit_loc,
507            self.inactivity_floor_loc,
508            Self::encode_commit_op(self.last_commit_metadata.clone(), self.inactivity_floor_loc),
509        )
510        .await
511    }
512
513    /// Durably persist the current db state to disk (alias for [`Self::sync`]).
514    pub async fn commit(&self) -> Result<(), Error<F>>
515    where
516        F: Family,
517    {
518        self.sync().await
519    }
520
521    /// Restore the state as of the sync before the most recent one.
522    ///
523    /// Discards any uncommitted batches, flips the db back to the previous persisted state,
524    /// and reloads the cached commit metadata and inactivity floor from that slot.
525    ///
526    /// Callers must drop any [`Arc<MerkleizedBatch>`] merkleized against state that this rewind
527    /// discards. [`Self::apply_batch`] validates batches by size only: a discarded-branch batch
528    /// will usually trip the size-mismatch check, but if the db later regrows to the same size
529    /// along an alternate branch, the stale batch becomes admissible again and applying it will
530    /// corrupt the committed root. Batches merkleized against the state this rewind restores to
531    /// (for example, a batch built before an advance that is then discarded by the rewind)
532    /// remain compatible and apply cleanly.
533    ///
534    /// # Errors
535    ///
536    /// Returns [`crate::merkle::Error::RewindBeyondHistory`] (wrapped as [`Error::Merkle`]) if
537    /// no prior state exists — either no sync has occurred yet, or the previous state was
538    /// already consumed by a rewind with no intervening sync.
539    ///
540    /// Any error from this method is fatal for this handle. The Merkle layer may have already
541    /// flipped its generation pointer and rebuilt its in-memory state before a later step (e.g.
542    /// reloading the cached commit metadata or inactivity floor) fails, leaving this `Db`'s
543    /// in-memory fields out of sync with the persisted slot. Callers must drop this handle
544    /// after any `Err` from `rewind` and reopen from storage.
545    pub async fn rewind(&mut self) -> Result<(), Error<F>>
546    where
547        F: Family,
548    {
549        self.merkle.rewind().await?;
550        // Reload the witness from the reverted slot as well, so compact serving stays aligned with
551        // the same frontier/root that `rewind` restored.
552        let (witness, last_commit_op) =
553            Self::load_active_witness(&self.merkle, &self.commit_codec_config).await?;
554        let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
555            return Err(Error::DataCorrupted("last operation was not a commit"));
556        };
557        self.last_commit_metadata = last_commit_metadata;
558        self.inactivity_floor_loc = inactivity_floor_loc;
559        self.last_commit_loc = Location::new(*witness.leaf_count - 1);
560        self.witness.replace(witness);
561        Ok(())
562    }
563
564    /// Destroy all persisted state associated with this database.
565    pub async fn destroy(self) -> Result<(), Error<F>> {
566        self.merkle.destroy().await.map_err(Into::into)
567    }
568
569    pub(crate) async fn persist_cached_witness(&self) -> Result<(), Error<F>> {
570        witness::persist_cached_witness::<F, E, H, S>(&self.merkle, &self.witness).await
571    }
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577    use crate::{
578        merkle::mmr,
579        metadata::{Config as MConfig, Metadata},
580        qmdb::any::value::FixedEncoding,
581    };
582    use commonware_cryptography::Sha256;
583    use commonware_macros::test_traced;
584    use commonware_parallel::Sequential;
585    use commonware_runtime::{deterministic, Runner as _, Supervisor as _};
586    use commonware_utils::sequence::{prefixed_u64::U64 as MetadataKey, U64};
587
    /// Db instantiation used throughout these tests: deterministic runtime context, fixed-size
    /// `U64` values, SHA-256 hashing, unit codec config, and the sequential strategy.
    type TestDb<F> = Db<F, deterministic::Context, FixedEncoding<U64>, Sha256, (), Sequential>;
589
590    async fn open_db<F: Family>(context: deterministic::Context, partition: &str) -> TestDb<F> {
591        let merkle = crate::merkle::compact::Merkle::init(
592            context,
593            crate::merkle::compact::Config {
594                partition: partition.into(),
595                strategy: Sequential,
596            },
597        )
598        .await
599        .unwrap();
600        Db::init_from_merkle(merkle, ()).await.unwrap()
601    }
602
603    async fn tamper_metadata_key(
604        context: deterministic::Context,
605        partition: &str,
606        key: MetadataKey,
607    ) {
608        let mut metadata = open_metadata(context, partition).await;
609        let mut bytes = metadata.get(&key).cloned().expect("metadata entry missing");
610        *bytes.last_mut().expect("metadata entry empty") ^= 0x01;
611        metadata.put_sync(key, bytes).await.unwrap();
612    }
613
614    async fn open_metadata(
615        context: deterministic::Context,
616        partition: &str,
617    ) -> Metadata<deterministic::Context, MetadataKey, Vec<u8>> {
618        Metadata::<_, MetadataKey, Vec<u8>>::init(
619            context.child("meta_write"),
620            MConfig {
621                partition: partition.into(),
622                codec_config: ((0..).into(), ()),
623            },
624        )
625        .await
626        .unwrap()
627    }
628
629    async fn overwrite_metadata_key(
630        context: deterministic::Context,
631        partition: &str,
632        key: MetadataKey,
633        bytes: Vec<u8>,
634    ) {
635        let mut metadata = open_metadata(context, partition).await;
636        metadata.put_sync(key, bytes).await.unwrap();
637    }
638
639    #[test_traced("INFO")]
640    fn test_compact_stale_batch_rejected() {
641        deterministic::Runner::default().start(|context| async move {
642            let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-stale").await;
643            let floor = db.inactivity_floor_loc();
644
645            let batch_a =
646                db.new_batch()
647                    .append(U64::new(1))
648                    .merkleize(&db, Some(U64::new(11)), floor);
649            let batch_b =
650                db.new_batch()
651                    .append(U64::new(2))
652                    .merkleize(&db, Some(U64::new(22)), floor);
653
654            let expected_root = batch_a.root();
655            db.apply_batch(batch_a).unwrap();
656            assert_eq!(db.root(), expected_root);
657            assert!(matches!(
658                db.apply_batch(batch_b),
659                Err(Error::StaleBatch { .. })
660            ));
661
662            db.destroy().await.unwrap();
663        });
664    }
665
666    /// Regression: `to_batch()` must snapshot the live in-memory state, not the durable serve
667    /// cache.
668    #[test_traced("INFO")]
669    fn test_compact_to_batch_reflects_live_state() {
670        deterministic::Runner::default().start(|context| async move {
671            let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-to-batch-live").await;
672            let floor = db.inactivity_floor_loc();
673
674            let pre_apply_root = db.root();
675            let pre_snapshot = db.to_batch();
676            assert_eq!(
677                pre_snapshot.root(),
678                pre_apply_root,
679                "snapshot before any mutation should match the live root"
680            );
681
682            db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
683                &db,
684                Some(U64::new(11)),
685                floor,
686            ))
687            .unwrap();
688
689            // Leave the durable serve cache behind the live Merkle state.
690            let live_root = db.root();
691            assert_ne!(
692                live_root, pre_apply_root,
693                "applying a non-empty batch must change the live root"
694            );
695
696            let snapshot = db.to_batch();
697            assert_eq!(
698                snapshot.root(),
699                live_root,
700                "to_batch().root() must match the live db.root() even before sync/commit"
701            );
702
703            db.destroy().await.unwrap();
704        });
705    }
706
707    #[test_traced("INFO")]
708    fn test_compact_stale_batch_chained() {
709        deterministic::Runner::default().start(|context| async move {
710            let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-chained-stale").await;
711            let floor = db.inactivity_floor_loc();
712
713            let parent =
714                db.new_batch()
715                    .append(U64::new(1))
716                    .merkleize(&db, Some(U64::new(11)), floor);
717            let child_a = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
718                &db,
719                Some(U64::new(22)),
720                floor,
721            );
722            let child_b = parent.new_batch::<Sha256>().append(U64::new(3)).merkleize(
723                &db,
724                Some(U64::new(33)),
725                floor,
726            );
727
728            db.apply_batch(child_a).unwrap();
729            assert!(matches!(
730                db.apply_batch(child_b),
731                Err(Error::StaleBatch { .. })
732            ));
733
734            db.destroy().await.unwrap();
735        });
736    }
737
738    #[test_traced("INFO")]
739    fn test_compact_stale_parent_after_child_applied() {
740        deterministic::Runner::default().start(|context| async move {
741            let mut db =
742                open_db::<mmr::Family>(context.child("db"), "keyless-child-before-parent").await;
743            let floor = db.inactivity_floor_loc();
744
745            let parent =
746                db.new_batch()
747                    .append(U64::new(1))
748                    .merkleize(&db, Some(U64::new(11)), floor);
749            let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
750                &db,
751                Some(U64::new(22)),
752                floor,
753            );
754
755            db.apply_batch(child).unwrap();
756            assert!(matches!(
757                db.apply_batch(parent),
758                Err(Error::StaleBatch { .. })
759            ));
760
761            db.destroy().await.unwrap();
762        });
763    }
764
765    #[test_traced("INFO")]
766    fn test_compact_sequential_commit_parent_then_child() {
767        deterministic::Runner::default().start(|context| async move {
768            let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-parent-child").await;
769            let floor = db.inactivity_floor_loc();
770
771            let parent =
772                db.new_batch()
773                    .append(U64::new(1))
774                    .merkleize(&db, Some(U64::new(11)), floor);
775            let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
776                &db,
777                Some(U64::new(22)),
778                floor,
779            );
780            let expected_root = child.root();
781
782            db.apply_batch(parent).unwrap();
783            db.apply_batch(child).unwrap();
784            db.commit().await.unwrap();
785
786            assert_eq!(db.root(), expected_root);
787
788            db.destroy().await.unwrap();
789        });
790    }
791
792    // A chained batch whose tip floor is below its parent's floor must be rejected:
793    // the parent's Commit participates in the per-commit monotonicity invariant even
794    // before it is applied.
795    #[test_traced("INFO")]
796    fn test_compact_ancestor_floor_regression_rejected() {
797        deterministic::Runner::default().start(|context| async move {
798            let mut db =
799                open_db::<mmr::Family>(context.child("db"), "keyless-ancestor-floor-regressed")
800                    .await;
801
802            // parent: append + commit at loc 2 with floor=2.
803            let parent = db
804                .new_batch()
805                .append(U64::new(1))
806                .merkleize(&db, None, Location::new(2));
807            // child: append + commit at loc 4 with floor=1 (regressed from parent's floor=2).
808            let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
809                &db,
810                None,
811                Location::new(1),
812            );
813
               // Only `child` is applied; `parent` is never applied on its own. The error must
               // carry the child's floor (1) as the new value and the parent's floor (2) as the
               // previous one.
814            assert!(matches!(
815                db.apply_batch(child),
816                Err(Error::FloorRegressed(new, prev))
817                    if new == Location::new(1) && prev == Location::new(2)
818            ));
819
820            db.destroy().await.unwrap();
821        });
822    }
823
824    #[test_traced("INFO")]
       // Commits twice with different metadata and floors, rewinds once, and checks that the root,
       // metadata, and inactivity floor all match the values observed after the first commit.
825    fn test_compact_rewind_restores_commit_metadata_and_floor() {
826        deterministic::Runner::default().start(|context| async move {
827            let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-rewind-meta").await;
828
               // First commit: one value with metadata `meta1` and floor `floor1`; remember the root.
829            let v1 = U64::new(1);
830            let meta1 = U64::new(11);
831            let floor1 = Location::new(0);
832            db.apply_batch(
833                db.new_batch()
834                    .append(v1)
835                    .merkleize(&db, Some(meta1.clone()), floor1),
836            )
837            .unwrap();
838            db.commit().await.unwrap();
839            let root_after_first = db.root();
840
               // Second commit: moves the metadata to `meta2` and the floor to `floor2`.
841            let v2 = U64::new(2);
842            let meta2 = U64::new(22);
843            let floor2 = Location::new(1);
844            db.apply_batch(
845                db.new_batch()
846                    .append(v2)
847                    .merkleize(&db, Some(meta2.clone()), floor2),
848            )
849            .unwrap();
850            db.commit().await.unwrap();
851            assert_eq!(db.get_metadata(), Some(meta2));
852            assert_eq!(db.inactivity_floor_loc(), floor2);
853
               // After the rewind, every observable value must be back at the first commit's state.
854            db.rewind().await.unwrap();
855            assert_eq!(db.root(), root_after_first);
856            assert_eq!(db.get_metadata(), Some(meta1));
857            assert_eq!(db.inactivity_floor_loc(), floor1);
858
859            db.destroy().await.unwrap();
860        });
861    }
862
863    #[test_traced("INFO")]
       // Commits twice, rewinds, lets the handle go out of scope, then reopens the same partition
       // and checks that the reopened db reports the first commit's root, metadata, and floor.
864    fn test_compact_rewind_persists_across_reopen() {
865        deterministic::Runner::default().start(|context| async move {
866            let partition = "keyless-rewind-reopen";
867            let meta1 = U64::new(11);
868            let floor1 = Location::new(0);
869            let meta2 = U64::new(22);
870            let floor2 = Location::new(1);
871
               // Scoped so the first db handle is dropped before the partition is opened again.
872            let root_after_first = {
873                let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
874                db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
875                    &db,
876                    Some(meta1.clone()),
877                    floor1,
878                ))
879                .unwrap();
880                db.commit().await.unwrap();
                   // Root after the first commit; this is the value the block yields.
881                let root = db.root();
882
883                db.apply_batch(db.new_batch().append(U64::new(2)).merkleize(
884                    &db,
885                    Some(meta2),
886                    floor2,
887                ))
888                .unwrap();
889                db.commit().await.unwrap();
890
                   // Rewind is the last operation before the handle goes out of scope.
891                db.rewind().await.unwrap();
892                root
893            };
894
               // Reopen the same partition under a different context child and compare against the
               // first commit's values.
895            let db = open_db::<mmr::Family>(context.child("second"), partition).await;
896            assert_eq!(db.root(), root_after_first);
897            assert_eq!(db.get_metadata(), Some(meta1));
898            assert_eq!(db.inactivity_floor_loc(), floor1);
899
900            db.destroy().await.unwrap();
901        });
902    }
903
904    #[test_traced("INFO")]
       // Commits once, tampers with the persisted entry stored under the last-commit proof key, and
       // checks that rebuilding the db from a freshly initialized Merkle structure fails with
       // `Error::DataCorrupted`.
905    fn test_compact_reopen_rejects_tampered_witness() {
906        deterministic::Runner::default().start(|context| async move {
907            let partition = "keyless-witness-tamper";
908            let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
909            db.apply_batch(db.new_batch().append(U64::new(7)).merkleize(
910                &db,
911                Some(U64::new(11)),
912                Location::new(1),
913            ))
914            .unwrap();
915            db.commit().await.unwrap();
               // Capture `active_slot()` before dropping the db; it selects which proof key is
               // tampered with below.
916            let slot = db.merkle.active_slot();
917            drop(db);
918
               // NOTE(review): `tamper_metadata_key` is a helper defined outside this chunk;
               // presumably it corrupts the value stored under the given key. Confirm against its
               // definition.
919            tamper_metadata_key(
920                context.child("tamper"),
921                partition,
922                crate::qmdb::compact::witness::last_commit_proof_key(slot),
923            )
924            .await;
925
               // Initialize the Merkle structure directly on the same partition and build the db
               // from it, so the `Result` of `init_from_merkle` can be inspected (`open_db` yields
               // a db, not a `Result`).
926            let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
927                crate::merkle::compact::Merkle::init(
928                    context.child("reopen"),
929                    crate::merkle::compact::Config {
930                        partition: partition.into(),
931                        strategy: Sequential,
932                    },
933                )
934                .await
935                .unwrap();
936            let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
               // Any `DataCorrupted` payload is accepted here.
937            assert!(matches!(reopened, Err(Error::DataCorrupted(_))));
938        });
939    }
940
941    #[test_traced("INFO")]
       // Commits once, overwrites the persisted last-commit operation with a `Commit` carrying a
       // floor of 10, and checks that rebuilding the db fails with
       // `Error::DataCorrupted("invalid compact witness")`.
942    fn test_compact_reopen_rejects_commit_floor_beyond_tip() {
943        deterministic::Runner::default().start(|context| async move {
944            let partition = "keyless-invalid-persisted-floor";
945            let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
946            db.apply_batch(db.new_batch().append(U64::new(7)).merkleize(
947                &db,
948                Some(U64::new(11)),
949                Location::new(1),
950            ))
951            .unwrap();
952            db.commit().await.unwrap();
               // Capture `active_slot()` before dropping the db; it selects which operation key is
               // overwritten below.
953            let slot = db.merkle.active_slot();
954            drop(db);
               // NOTE(review): per the test name this floor lies beyond the committed tip; the tip
               // location itself is not asserted in this test.
955            let oversized_floor = Location::new(10);
956
               // Replace the stored last-commit operation with a re-encoded `Commit` that keeps the
               // same metadata (11) but uses the oversized floor. `overwrite_metadata_key` is a
               // helper defined outside this chunk.
957            overwrite_metadata_key(
958                context.child("tamper"),
959                partition,
960                crate::qmdb::compact::witness::last_commit_op_key(slot),
961                Operation::<mmr::Family, FixedEncoding<U64>>::Commit(
962                    Some(U64::new(11)),
963                    oversized_floor,
964                )
965                .encode()
966                .to_vec(),
967            )
968            .await;
969
               // Initialize the Merkle structure directly on the same partition and build the db
               // from it, so the `Result` of `init_from_merkle` can be inspected.
970            let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
971                crate::merkle::compact::Merkle::init(
972                    context.child("reopen"),
973                    crate::merkle::compact::Config {
974                        partition: partition.into(),
975                        strategy: Sequential,
976                    },
977                )
978                .await
979                .unwrap();
980            let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
               // The exact error message is pinned, not just the variant.
981            assert!(matches!(
982                reopened,
983                Err(Error::DataCorrupted("invalid compact witness"))
984            ));
985        });
986    }
987
988    #[test_traced("INFO")]
       // Calls `rewind` on a db that has had no batches applied in this test and expects
       // `Error::Merkle(RewindBeyondHistory)`.
989    fn test_compact_rewind_beyond_history() {
990        deterministic::Runner::default().start(|context| async move {
991            let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-rewind-beyond").await;
               // Rewind immediately after opening; nothing has been applied or committed here.
992            assert!(matches!(
993                db.rewind().await,
994                Err(Error::Merkle(crate::merkle::Error::RewindBeyondHistory))
995            ));
996            db.destroy().await.unwrap();
997        });
998    }
999
1000    #[test_traced("INFO")]
        // Merkleizes a batch, advances the db by another commit, rewinds that commit, and checks
        // that the batch merkleized earlier still applies successfully.
1001    fn test_compact_rewind_preserves_pre_advance_batch() {
1002        deterministic::Runner::default().start(|context| async move {
1003            let mut db =
1004                open_db::<mmr::Family>(context.child("db"), "keyless-rewind-preserves-pre-advance")
1005                    .await;
1006
                // Commit A: the state that `held` is merkleized against below.
1007            db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
1008                &db,
1009                None,
1010                Location::new(0),
1011            ))
1012            .unwrap();
1013            db.commit().await.unwrap();
1014
1015            // Merkleize a batch against the post-commit-A state.
1016            let held = db
1017                .new_batch()
1018                .append(U64::new(2))
1019                .merkleize(&db, None, Location::new(0));
1020
1021            // Advance past that state and commit, then rewind back to it.
1022            db.apply_batch(db.new_batch().append(U64::new(3)).merkleize(
1023                &db,
1024                None,
1025                Location::new(0),
1026            ))
1027            .unwrap();
1028            db.commit().await.unwrap();
1029            db.rewind().await.unwrap();
1030
1031            // The rewind restored the state that `held` was merkleized against, so it still
1032            // matches the Merkle size and applies cleanly.
1033            db.apply_batch(held).unwrap();
1034
1035            db.destroy().await.unwrap();
1036        });
1037    }
1038
1039    #[test_traced("INFO")]
        // Calls `commit` a second time with no batch applied in between and checks that the size
        // and root are unchanged and that `current_target().root` equals `root()`.
1040    fn test_compact_noop_commit_after_commit() {
1041        deterministic::Runner::default().start(|context| async move {
1042            let mut db =
1043                open_db::<mmr::Family>(context.child("db"), "keyless-noop-after-commit").await;
1044
                // One batch with two appends, then commit; the test expects size 4 afterwards.
1045            db.apply_batch(
1046                db.new_batch()
1047                    .append(U64::new(1))
1048                    .append(U64::new(2))
1049                    .merkleize(&db, Some(U64::new(11)), Location::new(0)),
1050            )
1051            .unwrap();
1052            db.commit().await.unwrap();
1053            let root_after_first = db.root();
1054            assert_eq!(db.size(), Location::new(4));
1055
                // Second commit with nothing new applied: size and root must not move.
1056            db.commit().await.unwrap();
1057            assert_eq!(db.size(), Location::new(4));
1058            assert_eq!(db.root(), root_after_first);
1059            assert_eq!(db.current_target().root, db.root());
1060
1061            db.destroy().await.unwrap();
1062        });
1063    }
1064
1065    #[test_traced("INFO")]
        // Commits, lets the handle go out of scope, reopens the same partition, and checks that a
        // `commit` with no batch applied leaves size and root unchanged on the reopened db.
1066    fn test_compact_noop_commit_after_reopen() {
1067        deterministic::Runner::default().start(|context| async move {
1068            let partition = "keyless-noop-after-reopen";
1069
                // Scoped so the first db handle is dropped before the partition is opened again.
1070            let root_before_drop = {
1071                let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
1072                db.apply_batch(
1073                    db.new_batch()
1074                        .append(U64::new(1))
1075                        .append(U64::new(2))
1076                        .merkleize(&db, Some(U64::new(11)), Location::new(0)),
1077                )
1078                .unwrap();
1079                db.commit().await.unwrap();
1080                let root = db.root();
1081                assert_eq!(db.size(), Location::new(4));
1082                root
1083            };
1084
                // The reopened db must report the same root and size as before the drop.
1085            let db = open_db::<mmr::Family>(context.child("second"), partition).await;
1086            assert_eq!(db.root(), root_before_drop);
1087            assert_eq!(db.size(), Location::new(4));
1088
                // Commit with nothing applied since reopen: size and root must not move.
1089            db.commit().await.unwrap();
1090            assert_eq!(db.size(), Location::new(4));
1091            assert_eq!(db.root(), root_before_drop);
1092            assert_eq!(db.current_target().root, db.root());
1093
1094            db.destroy().await.unwrap();
1095        });
1096    }
1097
1098    #[test_traced("INFO")]
        // Commits twice, rewinds once, and checks that a following `commit` with no batch applied
        // keeps the size and root at the first commit's values.
1099    fn test_compact_noop_commit_after_rewind() {
1100        deterministic::Runner::default().start(|context| async move {
1101            let mut db =
1102                open_db::<mmr::Family>(context.child("db"), "keyless-noop-after-rewind").await;
1103
                // First commit: two appends; its root is the reference for the checks below.
1104            db.apply_batch(
1105                db.new_batch()
1106                    .append(U64::new(1))
1107                    .append(U64::new(2))
1108                    .merkleize(&db, Some(U64::new(11)), Location::new(0)),
1109            )
1110            .unwrap();
1111            db.commit().await.unwrap();
1112            let root_after_first = db.root();
1113
                // Second commit, which the rewind below undoes.
1114            db.apply_batch(db.new_batch().append(U64::new(3)).merkleize(
1115                &db,
1116                Some(U64::new(22)),
1117                Location::new(1),
1118            ))
1119            .unwrap();
1120            db.commit().await.unwrap();
1121
                // After the rewind the db must be back at the first commit's size and root.
1122            db.rewind().await.unwrap();
1123            assert_eq!(db.size(), Location::new(4));
1124            assert_eq!(db.root(), root_after_first);
1125
                // Commit with nothing applied since the rewind: size and root must not move.
1126            db.commit().await.unwrap();
1127            assert_eq!(db.size(), Location::new(4));
1128            assert_eq!(db.root(), root_after_first);
1129            assert_eq!(db.current_target().root, db.root());
1130
1131            db.destroy().await.unwrap();
1132        });
1133    }
1134
1135    #[test_traced("INFO")]
        // Merkleizes a batch after two commits, rewinds the second commit, and checks that
        // applying the held batch is rejected with `Error::StaleBatch`.
1136    fn test_compact_rewind_makes_post_advance_batch_stale() {
1137        deterministic::Runner::default().start(|context| async move {
1138            let mut db =
1139                open_db::<mmr::Family>(context.child("db"), "keyless-rewind-makes-stale").await;
1140
                // Commit A.
1141            db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
1142                &db,
1143                None,
1144                Location::new(0),
1145            ))
1146            .unwrap();
1147            db.commit().await.unwrap();
1148
                // Commit B.
1149            db.apply_batch(db.new_batch().append(U64::new(2)).merkleize(
1150                &db,
1151                None,
1152                Location::new(0),
1153            ))
1154            .unwrap();
1155            db.commit().await.unwrap();
1156
1157            // Merkleize a batch against the post-commit-B state, which the rewind will discard.
1158            let held = db
1159                .new_batch()
1160                .append(U64::new(3))
1161                .merkleize(&db, None, Location::new(0));
1162
1163            db.rewind().await.unwrap();
1164
1165            // After rewind, mem.size reflects post-commit-A, but the held batch starts after
1166            // post-commit-B. Apply must be rejected with StaleBatch.
1167            assert!(matches!(
1168                db.apply_batch(held),
1169                Err(Error::StaleBatch { .. })
1170            ));
1171
1172            db.destroy().await.unwrap();
1173        });
1174    }
1175
1176    #[test_traced("INFO")]
        // Clears the cached last-commit operation bytes in the witness state and checks that
        // `compact_state` reports `DataCorrupted("invalid commit operation")` wrapped in
        // `ServeError::Database`.
1177    fn test_witness_state_reports_cached_commit_corruption() {
1178        deterministic::Runner::default().start(|context| async move {
1179            let db = open_db::<mmr::Family>(context.child("db"), "keyless-serve-corruption").await;
                // Take the target before corrupting the cache, so the request itself is one the db
                // produced.
1180            let target = db.current_target();
                // Empty the cached encoded commit operation in place.
1181            db.witness
1182                .mutate(|witness| witness.last_commit_op_bytes.clear());
1183
                // Both the error variant nesting and the exact message are pinned.
1184            assert!(matches!(
1185                db.compact_state(target),
1186                Err(compact_sync::ServeError::Database(Error::DataCorrupted(
1187                    "invalid commit operation"
1188                )))
1189            ));
1190
1191            db.destroy().await.unwrap();
1192        });
1193    }
1194
1195    // A chained batch whose ancestor's floor exceeds that ancestor's own commit location
1196    // must be rejected, identifying the ancestor's bound rather than the tip's.
1197    #[test_traced("INFO")]
1198    fn test_compact_ancestor_floor_beyond_commit_loc_rejected() {
1199        deterministic::Runner::default().start(|context| async move {
1200            let mut db =
1201                open_db::<mmr::Family>(context.child("db"), "keyless-ancestor-floor-beyond").await;
1202
1203            // parent: append + commit at loc 2, floor=3 (one past parent's commit).
1204            let parent = db
1205                .new_batch()
1206                .append(U64::new(1))
1207                .merkleize(&db, None, Location::new(3));
1208            // child: valid on its own (floor=0), but parent's floor is bad.
1209            let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
1210                &db,
1211                None,
1212                Location::new(0),
1213            );
1214
                // Only `child` is applied; `parent` is never applied on its own. The error must
                // carry the parent's floor (3) and the parent's commit location (2), not the
                // child's values.
1215            assert!(matches!(
1216                db.apply_batch(child),
1217                Err(Error::FloorBeyondSize(floor, commit))
1218                    if floor == Location::new(3) && commit == Location::new(2)
1219            ));
1220
1221            db.destroy().await.unwrap();
1222        });
1223    }
1224}