1use super::operation::Operation;
24use crate::{
25 merkle::{batch, compact as compact_merkle, Family, Location, Proof},
26 qmdb::{
27 self,
28 any::value::ValueEncoding,
29 batch_chain::{self, Bounds},
30 compact::{
31 batch as compact_batch,
32 witness::{self, ServeState},
33 },
34 operation::Key,
35 sync::compact as compact_sync,
36 Error,
37 },
38 Context,
39};
40use commonware_codec::{Decode as _, Encode, EncodeShared, Read};
41use commonware_cryptography::{Digest, Hasher};
42use commonware_parallel::Strategy;
43use core::marker::PhantomData;
44use std::{
45 collections::BTreeMap,
46 sync::{Arc, Weak},
47};
48
49#[derive(Clone)]
51pub struct Config<C, S: Strategy> {
52 pub merkle: compact_merkle::Config<S>,
54
55 pub commit_codec_config: C,
57}
58
59pub struct Db<F, E, K, V, H, C, S: Strategy>
61where
62 F: Family,
63 E: Context,
64 K: Key,
65 V: ValueEncoding,
66 H: Hasher,
67 Operation<F, K, V>: EncodeShared,
68 Operation<F, K, V>: Read<Cfg = C>,
69 C: Clone + Send + Sync + 'static,
70{
71 merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
72 last_commit_loc: Location<F>,
73 last_commit_metadata: Option<V::Value>,
74 inactivity_floor_loc: Location<F>,
75 commit_codec_config: C,
76 witness: witness::Cache<F, H::Digest>,
82 _key: PhantomData<K>,
83}
84
85type CompactStateResult<F, K, V, D> =
86 Result<compact_sync::State<F, Operation<F, K, V>, D>, compact_sync::ServeError<F, D>>;
87
88#[allow(clippy::type_complexity)]
90pub struct UnmerkleizedBatch<F, H, K, V, S: Strategy>
91where
92 F: Family,
93 K: Key,
94 V: ValueEncoding,
95 H: Hasher,
96 Operation<F, K, V>: EncodeShared,
97{
98 merkle_batch: compact_merkle::UnmerkleizedBatch<F, H::Digest, S>,
99 mutations: BTreeMap<K, V::Value>,
100 parent: Option<Arc<MerkleizedBatch<F, H::Digest, K, V, S>>>,
101 base_size: u64,
102 db_size: u64,
103}
104
105#[derive(Clone)]
107pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy>
108where
109 Operation<F, K, V>: EncodeShared,
110{
111 pub(super) merkle_batch: Arc<batch::MerkleizedBatch<F, D, S>>,
112 pub(super) root: D,
113 pub(super) commit_metadata: Option<V::Value>,
114 pub(super) parent: Option<Weak<Self>>,
115 pub(super) bounds: batch_chain::Bounds<F>,
116 pub(super) _key: PhantomData<K>,
117}
118
119impl<F: Family, D: Digest, K: Key, V: ValueEncoding, S: Strategy> MerkleizedBatch<F, D, K, V, S>
120where
121 Operation<F, K, V>: EncodeShared,
122{
123 pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
124 batch_chain::ancestors(self.parent.clone(), |batch| batch.parent.as_ref())
125 }
126
127 pub const fn root(&self) -> D {
129 self.root
130 }
131
132 pub const fn bounds(&self) -> &Bounds<F> {
134 &self.bounds
135 }
136
137 pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, K, V, S>
139 where
140 H: Hasher<Digest = D>,
141 {
142 UnmerkleizedBatch {
143 merkle_batch: compact_merkle::UnmerkleizedBatch::wrap(self.merkle_batch.new_batch()),
144 mutations: BTreeMap::new(),
145 parent: Some(Arc::clone(self)),
146 base_size: self.bounds.total_size,
147 db_size: self.bounds.db_size,
148 }
149 }
150}
151
152impl<F, H, K, V, S> UnmerkleizedBatch<F, H, K, V, S>
153where
154 F: Family,
155 K: Key,
156 V: ValueEncoding,
157 H: Hasher,
158 S: Strategy,
159 Operation<F, K, V>: EncodeShared,
160{
161 pub(super) fn new<E, C>(db: &Db<F, E, K, V, H, C, S>, committed_size: u64) -> Self
162 where
163 E: Context,
164 C: Clone + Send + Sync + 'static,
165 Operation<F, K, V>: Read<Cfg = C>,
166 {
167 Self {
168 merkle_batch: db.merkle.new_batch(),
169 mutations: BTreeMap::new(),
170 parent: None,
171 base_size: committed_size,
172 db_size: committed_size,
173 }
174 }
175
176 pub fn set(mut self, key: K, value: V::Value) -> Self {
177 self.mutations.insert(key, value);
178 self
179 }
180
181 pub fn merkleize<E, C>(
189 self,
190 db: &Db<F, E, K, V, H, C, S>,
191 metadata: Option<V::Value>,
192 inactivity_floor: Location<F>,
193 ) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>>
194 where
195 F: Family,
196 E: Context,
197 C: Clone + Send + Sync + 'static,
198 Operation<F, K, V>: Read<Cfg = C>,
199 {
200 let mut ops: Vec<Operation<F, K, V>> = Vec::with_capacity(self.mutations.len() + 1);
201 for (key, value) in self.mutations {
202 ops.push(Operation::Set(key, value));
203 }
204 ops.push(Operation::Commit(metadata.clone(), inactivity_floor));
205
206 let total_size = self.base_size + ops.len() as u64;
207 let merkle = compact_batch::merkleize_ops::<F, E, H, S, _>(
208 &db.merkle,
209 self.merkle_batch,
210 ops.as_slice(),
211 );
212
213 let inactive_peaks = F::inactive_peaks(
214 F::location_to_position(Location::new(total_size)),
215 inactivity_floor,
216 );
217 let hasher = qmdb::hasher::<H>();
218 let root = db
219 .merkle
220 .with_mem(|mem| merkle.root(mem, &hasher, inactive_peaks))
221 .expect("inactive_peaks computed from batch size");
222
223 let ancestors =
224 batch_chain::parent_and_ancestors(self.parent.as_ref(), |parent| parent.ancestors());
225 let ancestors = batch_chain::collect_ancestor_bounds(
226 ancestors,
227 |batch| batch.bounds.inactivity_floor,
228 |batch| batch.bounds.total_size,
229 );
230
231 Arc::new(MerkleizedBatch {
232 merkle_batch: merkle,
233 root,
234 commit_metadata: metadata,
235 parent: self.parent.as_ref().map(Arc::downgrade),
236 bounds: batch_chain::Bounds {
237 base_size: self.base_size,
238 db_size: self.db_size,
239 total_size,
240 ancestors,
241 inactivity_floor,
242 },
243 _key: PhantomData,
244 })
245 }
246}
247
248impl<F, E, K, V, H, C, S: Strategy> Db<F, E, K, V, H, C, S>
249where
250 F: Family,
251 E: Context,
252 K: Key,
253 V: ValueEncoding,
254 H: Hasher,
255 Operation<F, K, V>: EncodeShared,
256 Operation<F, K, V>: Read<Cfg = C>,
257 C: Clone + Send + Sync + 'static,
258{
259 fn encode_commit_op(metadata: Option<V::Value>, inactivity_floor_loc: Location<F>) -> Vec<u8> {
260 Operation::<F, K, V>::Commit(metadata, inactivity_floor_loc)
261 .encode()
262 .to_vec()
263 }
264
265 async fn load_active_witness(
266 merkle: &compact_merkle::Merkle<F, E, H::Digest, S>,
267 commit_codec_config: &C,
268 ) -> Result<(ServeState<F, H::Digest>, Operation<F, K, V>), Error<F>> {
269 witness::load_active_witness::<F, E, H, S, _, Operation<F, K, V>, _>(
270 merkle,
271 commit_codec_config,
272 Operation::has_floor,
273 )
274 .await
275 }
276
277 #[allow(clippy::too_many_arguments)]
284 pub(crate) fn init_from_verified_state(
285 merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
286 commit_codec_config: C,
287 last_commit_metadata: Option<V::Value>,
288 inactivity_floor_loc: Location<F>,
289 root: H::Digest,
290 last_commit_op_bytes: Vec<u8>,
291 last_commit_proof: Proof<F, H::Digest>,
292 pinned_nodes: Vec<H::Digest>,
293 ) -> Result<Self, Error<F>> {
294 let (last_commit_loc, witness) = witness::witness_from_authenticated_state(
295 &merkle,
296 root,
297 inactivity_floor_loc,
298 last_commit_op_bytes,
299 last_commit_proof,
300 pinned_nodes,
301 )?;
302
303 Ok(Self {
304 merkle,
305 last_commit_loc,
306 last_commit_metadata,
307 inactivity_floor_loc,
308 commit_codec_config,
309 witness: witness::Cache::new(witness),
310 _key: PhantomData,
311 })
312 }
313
314 pub(crate) async fn init_from_merkle(
319 mut merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
320 commit_codec_config: C,
321 ) -> Result<Self, Error<F>>
322 where
323 F: Family,
324 Operation<F, K, V>: Read<Cfg = C>,
325 {
326 if merkle.leaves() == 0 {
334 witness::bootstrap_initial_commit::<F, E, H, S>(
335 &mut merkle,
336 Operation::<F, K, V>::Commit(None, Location::new(0))
337 .encode()
338 .to_vec(),
339 )
340 .await?;
341 }
342
343 let (witness, last_commit_op) =
344 Self::load_active_witness(&merkle, &commit_codec_config).await?;
345 let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
346 return Err(Error::DataCorrupted("last operation was not a commit"));
347 };
348
349 Self::init_from_verified_state(
350 merkle,
351 commit_codec_config,
352 last_commit_metadata,
353 inactivity_floor_loc,
354 witness.root,
355 witness.last_commit_op_bytes,
356 witness.last_commit_proof,
357 witness.pinned_nodes,
358 )
359 }
360
361 pub fn root(&self) -> H::Digest
363 where
364 F: Family,
365 {
366 let hasher = qmdb::hasher::<H>();
367 let inactive_peaks = F::inactive_peaks(
368 F::location_to_position(Location::new(*self.last_commit_loc + 1)),
369 self.inactivity_floor_loc,
370 );
371 self.merkle
372 .root(&hasher, inactive_peaks)
373 .expect("compact Merkle root should not fail")
374 }
375
376 pub const fn strategy(&self) -> &S {
378 self.merkle.strategy()
379 }
380
381 pub const fn last_commit_loc(&self) -> Location<F> {
383 self.last_commit_loc
384 }
385
386 pub const fn inactivity_floor_loc(&self) -> Location<F> {
388 self.inactivity_floor_loc
389 }
390
391 pub fn size(&self) -> Location<F> {
393 Location::new(*self.last_commit_loc + 1)
394 }
395
396 pub fn get_metadata(&self) -> Option<V::Value> {
398 self.last_commit_metadata.clone()
399 }
400
401 pub fn current_target(&self) -> compact_sync::Target<F, H::Digest> {
406 self.witness.with(ServeState::target)
407 }
408
409 pub(crate) fn compact_state(
416 &self,
417 target: compact_sync::Target<F, H::Digest>,
418 ) -> CompactStateResult<F, K, V, H::Digest>
419 where
420 Operation<F, K, V>: Read<Cfg = C>,
421 {
422 let (op_bytes, last_commit_proof, pinned_nodes, leaf_count) = self.witness.with(|w| {
423 if target.root != w.root || target.leaf_count != w.leaf_count {
424 return Err(compact_sync::ServeError::StaleTarget {
425 requested: target.clone(),
426 current: w.target(),
427 });
428 }
429 Ok((
430 w.last_commit_op_bytes.clone(),
431 w.last_commit_proof.clone(),
432 w.pinned_nodes.clone(),
433 w.leaf_count,
434 ))
435 })?;
436 let op = Operation::<F, K, V>::decode_cfg(op_bytes.as_ref(), &self.commit_codec_config)
437 .map_err(|_| {
438 compact_sync::ServeError::Database(Error::DataCorrupted("invalid commit operation"))
439 })?;
440 if !matches!(&op, Operation::Commit(_, _)) {
441 return Err(compact_sync::ServeError::Database(Error::DataCorrupted(
442 "last operation was not a commit",
443 )));
444 }
445 Ok(compact_sync::State {
446 leaf_count,
447 pinned_nodes,
448 last_commit_op: op,
449 last_commit_proof,
450 })
451 }
452
453 pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, K, V, S> {
455 let committed_size = *self.last_commit_loc + 1;
456 UnmerkleizedBatch::new(self, committed_size)
457 }
458
459 pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, K, V, S>>
461 where
462 F: Family,
463 {
464 let committed_size = *self.last_commit_loc + 1;
465 Arc::new(MerkleizedBatch {
466 merkle_batch: self.merkle.to_batch(),
467 root: self.root(),
468 commit_metadata: self.last_commit_metadata.clone(),
469 parent: None,
470 bounds: batch_chain::Bounds {
471 base_size: committed_size,
472 db_size: committed_size,
473 total_size: committed_size,
474 ancestors: Vec::new(),
475 inactivity_floor: self.inactivity_floor_loc,
476 },
477 _key: PhantomData,
478 })
479 }
480
481 pub fn apply_batch(
494 &mut self,
495 batch: Arc<MerkleizedBatch<F, H::Digest, K, V, S>>,
496 ) -> Result<core::ops::Range<Location<F>>, Error<F>> {
497 let db_size = *self.last_commit_loc + 1;
498 batch
499 .bounds
500 .validate_apply_to(db_size, self.inactivity_floor_loc)?;
501
502 let start_loc = self.last_commit_loc + 1;
503 self.merkle.apply_batch(&batch.merkle_batch)?;
504 self.last_commit_loc = Location::new(batch.bounds.total_size - 1);
505 self.last_commit_metadata = batch.commit_metadata.clone();
506 self.inactivity_floor_loc = batch.bounds.inactivity_floor;
507 Ok(start_loc..Location::new(batch.bounds.total_size))
508 }
509
510 pub async fn sync(&self) -> Result<(), Error<F>> {
516 witness::persist_witness::<F, E, H, S>(
517 &self.merkle,
518 &self.witness,
519 self.last_commit_loc,
520 self.inactivity_floor_loc,
521 Self::encode_commit_op(self.last_commit_metadata.clone(), self.inactivity_floor_loc),
522 )
523 .await
524 }
525
526 pub async fn commit(&self) -> Result<(), Error<F>>
528 where
529 F: Family,
530 {
531 self.sync().await
532 }
533
534 pub async fn rewind(&mut self) -> Result<(), Error<F>>
559 where
560 F: Family,
561 {
562 self.merkle.rewind().await?;
563 let (witness, last_commit_op) =
566 Self::load_active_witness(&self.merkle, &self.commit_codec_config).await?;
567 let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
568 return Err(Error::DataCorrupted("last operation was not a commit"));
569 };
570 self.last_commit_metadata = last_commit_metadata;
571 self.inactivity_floor_loc = inactivity_floor_loc;
572 self.last_commit_loc = Location::new(*witness.leaf_count - 1);
573 self.witness.replace(witness);
574 Ok(())
575 }
576
577 pub async fn destroy(self) -> Result<(), Error<F>> {
579 self.merkle.destroy().await.map_err(Into::into)
580 }
581
582 pub(crate) async fn persist_cached_witness(&self) -> Result<(), Error<F>> {
583 witness::persist_cached_witness::<F, E, H, S>(&self.merkle, &self.witness).await
584 }
585}
586
587#[cfg(test)]
588mod tests {
589 use super::*;
590 use crate::{
591 merkle::mmr,
592 metadata::{Config as MConfig, Metadata},
593 qmdb::any::value::FixedEncoding,
594 };
595 use commonware_cryptography::{sha256::Digest, Sha256};
596 use commonware_macros::test_traced;
597 use commonware_parallel::Sequential;
598 use commonware_runtime::{deterministic, Runner as _, Supervisor as _};
599 use commonware_utils::sequence::prefixed_u64::U64 as MetadataKey;
600
601 type TestDb<F> =
602 Db<F, deterministic::Context, Digest, FixedEncoding<Digest>, Sha256, (), Sequential>;
603
604 async fn open_db<F: Family>(context: deterministic::Context, partition: &str) -> TestDb<F> {
605 let merkle = crate::merkle::compact::Merkle::init(
606 context,
607 crate::merkle::compact::Config {
608 partition: partition.into(),
609 strategy: Sequential,
610 },
611 )
612 .await
613 .unwrap();
614 Db::init_from_merkle(merkle, ()).await.unwrap()
615 }
616
617 async fn tamper_metadata_key(
618 context: deterministic::Context,
619 partition: &str,
620 key: MetadataKey,
621 ) {
622 let mut metadata = open_metadata(context, partition).await;
623 let mut bytes = metadata.get(&key).cloned().expect("metadata entry missing");
624 *bytes.last_mut().expect("metadata entry empty") ^= 0x01;
625 metadata.put_sync(key, bytes).await.unwrap();
626 }
627
628 async fn open_metadata(
629 context: deterministic::Context,
630 partition: &str,
631 ) -> Metadata<deterministic::Context, MetadataKey, Vec<u8>> {
632 Metadata::<_, MetadataKey, Vec<u8>>::init(
633 context.child("meta_write"),
634 MConfig {
635 partition: partition.into(),
636 codec_config: ((0..).into(), ()),
637 },
638 )
639 .await
640 .unwrap()
641 }
642
643 async fn overwrite_metadata_key(
644 context: deterministic::Context,
645 partition: &str,
646 key: MetadataKey,
647 bytes: Vec<u8>,
648 ) {
649 let mut metadata = open_metadata(context, partition).await;
650 metadata.put_sync(key, bytes).await.unwrap();
651 }
652
653 #[test_traced("INFO")]
654 fn test_compact_stale_batch_rejected() {
655 deterministic::Runner::default().start(|context| async move {
656 let mut db = open_db::<mmr::Family>(context.child("db"), "immutable-stale").await;
657
658 let key1 = Sha256::hash(&[1]);
659 let key2 = Sha256::hash(&[2]);
660 let value1 = Sha256::fill(10u8);
661 let value2 = Sha256::fill(20u8);
662
663 let batch_a = db
664 .new_batch()
665 .set(key1, value1)
666 .merkleize(&db, None, Location::new(0));
667 let batch_b = db
668 .new_batch()
669 .set(key2, value2)
670 .merkleize(&db, None, Location::new(0));
671
672 let expected_root = batch_a.root();
673 db.apply_batch(batch_a).unwrap();
674 assert_eq!(db.root(), expected_root);
675 assert!(matches!(
676 db.apply_batch(batch_b),
677 Err(Error::StaleBatch { .. })
678 ));
679
680 db.destroy().await.unwrap();
681 });
682 }
683
684 #[test_traced("INFO")]
690 fn test_compact_to_batch_reflects_live_state() {
691 deterministic::Runner::default().start(|context| async move {
692 let mut db =
693 open_db::<mmr::Family>(context.child("db"), "immutable-to-batch-live").await;
694
695 let pre_apply_root = db.root();
696 let pre_snapshot = db.to_batch();
697 assert_eq!(
698 pre_snapshot.root(),
699 pre_apply_root,
700 "snapshot before any mutation should match the live root"
701 );
702
703 let key = Sha256::hash(&[1]);
704 let value = Sha256::fill(10u8);
705 db.apply_batch(
706 db.new_batch()
707 .set(key, value)
708 .merkleize(&db, None, Location::new(0)),
709 )
710 .unwrap();
711
712 let live_root = db.root();
715 assert_ne!(
716 live_root, pre_apply_root,
717 "applying a non-empty batch must change the live root"
718 );
719
720 let snapshot = db.to_batch();
721 assert_eq!(
722 snapshot.root(),
723 live_root,
724 "to_batch().root() must match the live db.root() even before sync/commit"
725 );
726
727 db.destroy().await.unwrap();
728 });
729 }
730
731 #[test_traced("INFO")]
732 fn test_compact_stale_batch_chained() {
733 deterministic::Runner::default().start(|context| async move {
734 let mut db =
735 open_db::<mmr::Family>(context.child("db"), "immutable-chained-stale").await;
736
737 let parent = db
738 .new_batch()
739 .set(Sha256::hash(&[1]), Sha256::fill(1u8))
740 .merkleize(&db, None, Location::new(0));
741 let child_a = parent
742 .new_batch::<Sha256>()
743 .set(Sha256::hash(&[2]), Sha256::fill(2u8))
744 .merkleize(&db, None, Location::new(0));
745 let child_b = parent
746 .new_batch::<Sha256>()
747 .set(Sha256::hash(&[3]), Sha256::fill(3u8))
748 .merkleize(&db, None, Location::new(0));
749
750 db.apply_batch(child_a).unwrap();
751 assert!(matches!(
752 db.apply_batch(child_b),
753 Err(Error::StaleBatch { .. })
754 ));
755
756 db.destroy().await.unwrap();
757 });
758 }
759
760 #[test_traced("INFO")]
761 fn test_compact_stale_parent_after_child_applied() {
762 deterministic::Runner::default().start(|context| async move {
763 let mut db =
764 open_db::<mmr::Family>(context.child("db"), "immutable-child-before-parent").await;
765
766 let parent = db
767 .new_batch()
768 .set(Sha256::hash(&[1]), Sha256::fill(1u8))
769 .merkleize(&db, None, Location::new(0));
770 let child = parent
771 .new_batch::<Sha256>()
772 .set(Sha256::hash(&[2]), Sha256::fill(2u8))
773 .merkleize(&db, None, Location::new(0));
774
775 db.apply_batch(child).unwrap();
776 assert!(matches!(
777 db.apply_batch(parent),
778 Err(Error::StaleBatch { .. })
779 ));
780
781 db.destroy().await.unwrap();
782 });
783 }
784
785 #[test_traced("INFO")]
786 fn test_compact_sequential_commit_parent_then_child() {
787 deterministic::Runner::default().start(|context| async move {
788 let mut db =
789 open_db::<mmr::Family>(context.child("db"), "immutable-parent-child").await;
790
791 let parent = db
792 .new_batch()
793 .set(Sha256::hash(&[1]), Sha256::fill(1u8))
794 .merkleize(&db, None, Location::new(0));
795 let child = parent
796 .new_batch::<Sha256>()
797 .set(Sha256::hash(&[2]), Sha256::fill(2u8))
798 .merkleize(&db, None, Location::new(0));
799 let expected_root = child.root();
800
801 db.apply_batch(parent).unwrap();
802 db.apply_batch(child).unwrap();
803 db.commit().await.unwrap();
804
805 assert_eq!(db.root(), expected_root);
806
807 db.destroy().await.unwrap();
808 });
809 }
810
811 #[test_traced("INFO")]
812 fn test_compact_floor_regressed() {
813 deterministic::Runner::default().start(|context| async move {
814 let mut db =
815 open_db::<mmr::Family>(context.child("db"), "immutable-floor-regressed").await;
816
817 let advance_floor = db.new_batch().set(Sha256::hash(&[1]), Sha256::fill(1u8));
818 let advance_floor = advance_floor.merkleize(&db, None, Location::new(1));
819 db.apply_batch(advance_floor).unwrap();
820
821 let regressed = db
822 .new_batch()
823 .set(Sha256::hash(&[2]), Sha256::fill(2u8))
824 .merkleize(&db, None, Location::new(0));
825
826 assert!(matches!(
827 db.apply_batch(regressed),
828 Err(Error::FloorRegressed(new, current))
829 if new == Location::new(0) && current == Location::new(1)
830 ));
831
832 db.destroy().await.unwrap();
833 });
834 }
835
836 #[test_traced("INFO")]
837 fn test_compact_rejects_regressed_ancestor_floor() {
838 deterministic::Runner::default().start(|context| async move {
839 let mut db =
840 open_db::<mmr::Family>(context.child("db"), "immutable-regressed-ancestor-floor")
841 .await;
842
843 let parent = db
844 .new_batch()
845 .set(Sha256::hash(&[1]), Sha256::fill(1u8))
846 .merkleize(&db, None, Location::new(1));
847 let child = parent
848 .new_batch::<Sha256>()
849 .set(Sha256::hash(&[2]), Sha256::fill(2u8))
850 .merkleize(&db, None, Location::new(0));
851
852 assert!(matches!(
853 db.apply_batch(child),
854 Err(Error::FloorRegressed(new, prev))
855 if new == Location::new(0) && prev == Location::new(1)
856 ));
857
858 db.destroy().await.unwrap();
859 });
860 }
861
862 #[test_traced("INFO")]
863 fn test_compact_rewind_restores_commit_metadata_and_floor() {
864 deterministic::Runner::default().start(|context| async move {
865 let mut db = open_db::<mmr::Family>(context.child("db"), "immutable-rewind-meta").await;
866
867 let k1 = Sha256::hash(&[1]);
868 let v1 = Sha256::fill(11u8);
869 let meta1 = Sha256::fill(0xaa);
870 let floor1 = Location::new(0);
871 db.apply_batch(
872 db.new_batch()
873 .set(k1, v1)
874 .merkleize(&db, Some(meta1), floor1),
875 )
876 .unwrap();
877 db.commit().await.unwrap();
878 let root_after_first = db.root();
879
880 let k2 = Sha256::hash(&[2]);
881 let v2 = Sha256::fill(22u8);
882 let meta2 = Sha256::fill(0xbb);
883 let floor2 = Location::new(1);
885 db.apply_batch(
886 db.new_batch()
887 .set(k2, v2)
888 .merkleize(&db, Some(meta2), floor2),
889 )
890 .unwrap();
891 db.commit().await.unwrap();
892 assert_eq!(db.get_metadata(), Some(meta2));
893 assert_eq!(db.inactivity_floor_loc(), floor2);
894
895 db.rewind().await.unwrap();
896 assert_eq!(db.root(), root_after_first);
897 assert_eq!(db.get_metadata(), Some(meta1));
898 assert_eq!(db.inactivity_floor_loc(), floor1);
899
900 db.destroy().await.unwrap();
901 });
902 }
903
904 #[test_traced("INFO")]
905 fn test_compact_rewind_persists_across_reopen() {
906 deterministic::Runner::default().start(|context| async move {
907 let partition = "immutable-rewind-reopen";
908 let meta1 = Sha256::fill(0xaa);
909 let floor1 = Location::new(0);
910 let meta2 = Sha256::fill(0xbb);
911 let floor2 = Location::new(1);
912
913 let root_after_first = {
914 let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
915 db.apply_batch(
916 db.new_batch()
917 .set(Sha256::hash(&[1]), Sha256::fill(11u8))
918 .merkleize(&db, Some(meta1), floor1),
919 )
920 .unwrap();
921 db.commit().await.unwrap();
922 let root = db.root();
923
924 db.apply_batch(
925 db.new_batch()
926 .set(Sha256::hash(&[2]), Sha256::fill(22u8))
927 .merkleize(&db, Some(meta2), floor2),
928 )
929 .unwrap();
930 db.commit().await.unwrap();
931
932 db.rewind().await.unwrap();
933 root
934 };
935
936 let db = open_db::<mmr::Family>(context.child("second"), partition).await;
937 assert_eq!(db.root(), root_after_first);
938 assert_eq!(db.get_metadata(), Some(meta1));
939 assert_eq!(db.inactivity_floor_loc(), floor1);
940
941 db.destroy().await.unwrap();
942 });
943 }
944
945 #[test_traced("INFO")]
946 fn test_compact_reopen_rejects_tampered_witness() {
947 deterministic::Runner::default().start(|context| async move {
948 let partition = "immutable-witness-tamper";
949 let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
950 db.apply_batch(
951 db.new_batch()
952 .set(Sha256::hash(&[7]), Sha256::fill(7u8))
953 .merkleize(&db, Some(Sha256::fill(0xaa)), Location::new(1)),
954 )
955 .unwrap();
956 db.commit().await.unwrap();
957 let slot = db.merkle.active_slot();
958 drop(db);
959
960 tamper_metadata_key(
961 context.child("tamper"),
962 partition,
963 crate::qmdb::compact::witness::last_commit_proof_key(slot),
964 )
965 .await;
966
967 let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
968 crate::merkle::compact::Merkle::init(
969 context.child("reopen"),
970 crate::merkle::compact::Config {
971 partition: partition.into(),
972 strategy: Sequential,
973 },
974 )
975 .await
976 .unwrap();
977 let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
978 assert!(matches!(reopened, Err(Error::DataCorrupted(_))));
979 });
980 }
981
982 #[test_traced("INFO")]
983 fn test_compact_reopen_rejects_commit_floor_beyond_tip() {
984 deterministic::Runner::default().start(|context| async move {
985 let partition = "immutable-invalid-persisted-floor";
986 let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
987 db.apply_batch(
988 db.new_batch()
989 .set(Sha256::hash(&[7]), Sha256::fill(7u8))
990 .merkleize(&db, Some(Sha256::fill(0xaa)), Location::new(1)),
991 )
992 .unwrap();
993 db.commit().await.unwrap();
994 let slot = db.merkle.active_slot();
995 drop(db);
996 let oversized_floor = Location::new(10);
997
998 overwrite_metadata_key(
999 context.child("tamper"),
1000 partition,
1001 crate::qmdb::compact::witness::last_commit_op_key(slot),
1002 Operation::<mmr::Family, Digest, FixedEncoding<Digest>>::Commit(
1003 Some(Sha256::fill(0xaa)),
1004 oversized_floor,
1005 )
1006 .encode()
1007 .to_vec(),
1008 )
1009 .await;
1010
1011 let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
1012 crate::merkle::compact::Merkle::init(
1013 context.child("reopen"),
1014 crate::merkle::compact::Config {
1015 partition: partition.into(),
1016 strategy: Sequential,
1017 },
1018 )
1019 .await
1020 .unwrap();
1021 let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
1022 assert!(matches!(
1023 reopened,
1024 Err(Error::DataCorrupted("invalid compact witness"))
1025 ));
1026 });
1027 }
1028
1029 #[test_traced("INFO")]
1030 fn test_compact_rewind_beyond_history() {
1031 deterministic::Runner::default().start(|context| async move {
1032 let mut db =
1033 open_db::<mmr::Family>(context.child("db"), "immutable-rewind-beyond").await;
1034 assert!(matches!(
1037 db.rewind().await,
1038 Err(Error::Merkle(crate::merkle::Error::RewindBeyondHistory))
1039 ));
1040 db.destroy().await.unwrap();
1041 });
1042 }
1043
1044 #[test_traced("INFO")]
1045 fn test_compact_rewind_preserves_pre_advance_batch() {
1046 deterministic::Runner::default().start(|context| async move {
1047 let mut db = open_db::<mmr::Family>(
1048 context.child("db"),
1049 "immutable-rewind-preserves-pre-advance",
1050 )
1051 .await;
1052
1053 db.apply_batch(
1054 db.new_batch()
1055 .set(Sha256::hash(&[1]), Sha256::fill(1u8))
1056 .merkleize(&db, None, Location::new(0)),
1057 )
1058 .unwrap();
1059 db.commit().await.unwrap();
1060
1061 let held = db
1063 .new_batch()
1064 .set(Sha256::hash(&[2]), Sha256::fill(2u8))
1065 .merkleize(&db, None, Location::new(0));
1066
1067 db.apply_batch(
1069 db.new_batch()
1070 .set(Sha256::hash(&[3]), Sha256::fill(3u8))
1071 .merkleize(&db, None, Location::new(0)),
1072 )
1073 .unwrap();
1074 db.commit().await.unwrap();
1075 db.rewind().await.unwrap();
1076
1077 db.apply_batch(held).unwrap();
1080
1081 db.destroy().await.unwrap();
1082 });
1083 }
1084
1085 #[test_traced("INFO")]
1086 fn test_compact_noop_commit_after_commit() {
1087 deterministic::Runner::default().start(|context| async move {
1088 let mut db =
1089 open_db::<mmr::Family>(context.child("db"), "immutable-noop-after-commit").await;
1090
1091 let k1 = Sha256::hash(&[1]);
1092 let v1 = Sha256::fill(11u8);
1093 let k2 = Sha256::hash(&[2]);
1094 let v2 = Sha256::fill(22u8);
1095 db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1096 &db,
1097 Some(Sha256::fill(0xaa)),
1098 Location::new(0),
1099 ))
1100 .unwrap();
1101 db.commit().await.unwrap();
1102 let root_after_first = db.root();
1103 let size_after_first = db.size();
1104
1105 db.commit().await.unwrap();
1106 assert_eq!(db.size(), size_after_first);
1107 assert_eq!(db.root(), root_after_first);
1108 assert_eq!(db.current_target().root, db.root());
1109
1110 db.destroy().await.unwrap();
1111 });
1112 }
1113
1114 #[test_traced("INFO")]
1115 fn test_compact_noop_commit_after_reopen() {
1116 deterministic::Runner::default().start(|context| async move {
1117 let partition = "immutable-noop-after-reopen";
1118
1119 let (root_before_drop, size_before_drop) = {
1120 let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
1121 let k1 = Sha256::hash(&[1]);
1122 let v1 = Sha256::fill(11u8);
1123 let k2 = Sha256::hash(&[2]);
1124 let v2 = Sha256::fill(22u8);
1125 db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1126 &db,
1127 Some(Sha256::fill(0xaa)),
1128 Location::new(0),
1129 ))
1130 .unwrap();
1131 db.commit().await.unwrap();
1132 (db.root(), db.size())
1133 };
1134
1135 let db = open_db::<mmr::Family>(context.child("second"), partition).await;
1136 assert_eq!(db.root(), root_before_drop);
1137 assert_eq!(db.size(), size_before_drop);
1138
1139 db.commit().await.unwrap();
1140 assert_eq!(db.size(), size_before_drop);
1141 assert_eq!(db.root(), root_before_drop);
1142 assert_eq!(db.current_target().root, db.root());
1143
1144 db.destroy().await.unwrap();
1145 });
1146 }
1147
1148 #[test_traced("INFO")]
1149 fn test_compact_noop_commit_after_rewind() {
1150 deterministic::Runner::default().start(|context| async move {
1151 let mut db =
1152 open_db::<mmr::Family>(context.child("db"), "immutable-noop-after-rewind").await;
1153
1154 let k1 = Sha256::hash(&[1]);
1155 let v1 = Sha256::fill(11u8);
1156 let k2 = Sha256::hash(&[2]);
1157 let v2 = Sha256::fill(22u8);
1158 db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(
1159 &db,
1160 Some(Sha256::fill(0xaa)),
1161 Location::new(0),
1162 ))
1163 .unwrap();
1164 db.commit().await.unwrap();
1165 let root_after_first = db.root();
1166 let size_after_first = db.size();
1167
1168 let k3 = Sha256::hash(&[3]);
1169 let v3 = Sha256::fill(33u8);
1170 db.apply_batch(db.new_batch().set(k3, v3).merkleize(
1171 &db,
1172 Some(Sha256::fill(0xbb)),
1173 Location::new(1),
1174 ))
1175 .unwrap();
1176 db.commit().await.unwrap();
1177
1178 db.rewind().await.unwrap();
1179 assert_eq!(db.size(), size_after_first);
1180 assert_eq!(db.root(), root_after_first);
1181
1182 db.commit().await.unwrap();
1183 assert_eq!(db.size(), size_after_first);
1184 assert_eq!(db.root(), root_after_first);
1185 assert_eq!(db.current_target().root, db.root());
1186
1187 db.destroy().await.unwrap();
1188 });
1189 }
1190
1191 #[test_traced("INFO")]
1192 fn test_compact_rewind_makes_post_advance_batch_stale() {
1193 deterministic::Runner::default().start(|context| async move {
1194 let mut db =
1195 open_db::<mmr::Family>(context.child("db"), "immutable-rewind-makes-stale").await;
1196
1197 db.apply_batch(
1198 db.new_batch()
1199 .set(Sha256::hash(&[1]), Sha256::fill(1u8))
1200 .merkleize(&db, None, Location::new(0)),
1201 )
1202 .unwrap();
1203 db.commit().await.unwrap();
1204
1205 db.apply_batch(
1206 db.new_batch()
1207 .set(Sha256::hash(&[2]), Sha256::fill(2u8))
1208 .merkleize(&db, None, Location::new(0)),
1209 )
1210 .unwrap();
1211 db.commit().await.unwrap();
1212
1213 let held = db
1215 .new_batch()
1216 .set(Sha256::hash(&[3]), Sha256::fill(3u8))
1217 .merkleize(&db, None, Location::new(0));
1218
1219 db.rewind().await.unwrap();
1220
1221 assert!(matches!(
1224 db.apply_batch(held),
1225 Err(Error::StaleBatch { .. })
1226 ));
1227
1228 db.destroy().await.unwrap();
1229 });
1230 }
1231
1232 #[test_traced("INFO")]
1233 fn test_witness_state_reports_cached_commit_corruption() {
1234 deterministic::Runner::default().start(|context| async move {
1235 let db =
1236 open_db::<mmr::Family>(context.child("db"), "immutable-serve-corruption").await;
1237 let target = db.current_target();
1238 db.witness
1239 .mutate(|witness| witness.last_commit_op_bytes.clear());
1240
1241 assert!(matches!(
1242 db.compact_state(target),
1243 Err(compact_sync::ServeError::Database(Error::DataCorrupted(
1244 "invalid commit operation"
1245 )))
1246 ));
1247
1248 db.destroy().await.unwrap();
1249 });
1250 }
1251
1252 #[test_traced("INFO")]
1253 fn test_compact_floor_beyond_size() {
1254 deterministic::Runner::default().start(|context| async move {
1255 let mut db =
1256 open_db::<mmr::Family>(context.child("db"), "immutable-floor-beyond").await;
1257
1258 let batch = db.new_batch().merkleize(&db, None, Location::new(2));
1259
1260 assert!(matches!(
1261 db.apply_batch(batch),
1262 Err(Error::FloorBeyondSize(floor, tip))
1263 if floor == Location::new(2) && tip == Location::new(1)
1264 ));
1265
1266 db.destroy().await.unwrap();
1267 });
1268 }
1269}