1use super::operation::Operation;
24use crate::{
25 merkle::{batch, compact as compact_merkle, Family, Location, Proof},
26 qmdb::{
27 self,
28 any::value::ValueEncoding,
29 batch_chain::{self, Bounds},
30 compact::{
31 batch as compact_batch,
32 witness::{self, ServeState},
33 },
34 sync::compact as compact_sync,
35 Error,
36 },
37 Context,
38};
39use commonware_codec::{Decode as _, Encode, EncodeShared, Read};
40use commonware_cryptography::{Digest, Hasher};
41use commonware_parallel::Strategy;
42use std::sync::{Arc, Weak};
43
44#[derive(Clone)]
46pub struct Config<C, S: Strategy> {
47 pub merkle: compact_merkle::Config<S>,
49
50 pub commit_codec_config: C,
52}
53
54pub struct Db<F, E, V, H, C, S: Strategy>
56where
57 F: Family,
58 E: Context,
59 V: ValueEncoding,
60 H: Hasher,
61 Operation<F, V>: EncodeShared,
62 Operation<F, V>: Read<Cfg = C>,
63 C: Clone + Send + Sync + 'static,
64{
65 merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
66 last_commit_loc: Location<F>,
67 last_commit_metadata: Option<V::Value>,
68 inactivity_floor_loc: Location<F>,
69 commit_codec_config: C,
70 witness: witness::Cache<F, H::Digest>,
76}
77
78type CompactStateResult<F, V, D> =
79 Result<compact_sync::State<F, Operation<F, V>, D>, compact_sync::ServeError<F, D>>;
80
81#[allow(clippy::type_complexity)]
83pub struct UnmerkleizedBatch<F, H, V, S: Strategy>
84where
85 F: Family,
86 V: ValueEncoding,
87 H: Hasher,
88 Operation<F, V>: EncodeShared,
89{
90 merkle_batch: compact_merkle::UnmerkleizedBatch<F, H::Digest, S>,
91 appends: Vec<V::Value>,
92 parent: Option<Arc<MerkleizedBatch<F, H::Digest, V, S>>>,
93 base_size: u64,
94 db_size: u64,
95}
96
97#[derive(Clone)]
99pub struct MerkleizedBatch<F: Family, D: Digest, V: ValueEncoding, S: Strategy>
100where
101 Operation<F, V>: EncodeShared,
102{
103 pub(super) merkle_batch: Arc<batch::MerkleizedBatch<F, D, S>>,
104 pub(super) root: D,
105 pub(super) commit_metadata: Option<V::Value>,
106 pub(super) parent: Option<Weak<Self>>,
107 pub(super) bounds: batch_chain::Bounds<F>,
108}
109
110impl<F: Family, D: Digest, V: ValueEncoding, S: Strategy> MerkleizedBatch<F, D, V, S>
111where
112 Operation<F, V>: EncodeShared,
113{
114 pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
115 batch_chain::ancestors(self.parent.clone(), |batch| batch.parent.as_ref())
116 }
117
118 pub const fn root(&self) -> D {
120 self.root
121 }
122
123 pub const fn bounds(&self) -> &Bounds<F> {
125 &self.bounds
126 }
127
128 pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, V, S>
130 where
131 H: Hasher<Digest = D>,
132 {
133 UnmerkleizedBatch {
134 merkle_batch: compact_merkle::UnmerkleizedBatch::wrap(self.merkle_batch.new_batch()),
135 appends: Vec::new(),
136 parent: Some(Arc::clone(self)),
137 base_size: self.bounds.total_size,
138 db_size: self.bounds.db_size,
139 }
140 }
141}
142
143impl<F, H, V, S> UnmerkleizedBatch<F, H, V, S>
144where
145 F: Family,
146 V: ValueEncoding,
147 H: Hasher,
148 S: Strategy,
149 Operation<F, V>: EncodeShared,
150{
151 pub(super) fn new<E, C>(db: &Db<F, E, V, H, C, S>, committed_size: u64) -> Self
152 where
153 E: Context,
154 C: Clone + Send + Sync + 'static,
155 Operation<F, V>: Read<Cfg = C>,
156 {
157 Self {
158 merkle_batch: db.merkle.new_batch(),
159 appends: Vec::new(),
160 parent: None,
161 base_size: committed_size,
162 db_size: committed_size,
163 }
164 }
165
166 pub fn append(mut self, value: V::Value) -> Self {
167 self.appends.push(value);
168 self
169 }
170
171 pub fn merkleize<E, C>(
179 self,
180 db: &Db<F, E, V, H, C, S>,
181 metadata: Option<V::Value>,
182 inactivity_floor: Location<F>,
183 ) -> Arc<MerkleizedBatch<F, H::Digest, V, S>>
184 where
185 F: Family,
186 E: Context,
187 C: Clone + Send + Sync + 'static,
188 Operation<F, V>: Read<Cfg = C>,
189 {
190 let mut ops: Vec<Operation<F, V>> = Vec::with_capacity(self.appends.len() + 1);
191 for value in self.appends {
192 ops.push(Operation::Append(value));
193 }
194 ops.push(Operation::Commit(metadata.clone(), inactivity_floor));
195
196 let total_size = self.base_size + ops.len() as u64;
197 let merkle = compact_batch::merkleize_ops::<F, E, H, S, _>(
198 &db.merkle,
199 self.merkle_batch,
200 ops.as_slice(),
201 );
202
203 let inactive_peaks = F::inactive_peaks(
204 F::location_to_position(Location::new(total_size)),
205 inactivity_floor,
206 );
207 let hasher = qmdb::hasher::<H>();
208 let root = db
209 .merkle
210 .with_mem(|mem| merkle.root(mem, &hasher, inactive_peaks))
211 .expect("inactive_peaks computed from batch size");
212
213 let ancestors =
214 batch_chain::parent_and_ancestors(self.parent.as_ref(), |parent| parent.ancestors());
215 let ancestors = batch_chain::collect_ancestor_bounds(
216 ancestors,
217 |batch| batch.bounds.inactivity_floor,
218 |batch| batch.bounds.total_size,
219 );
220
221 Arc::new(MerkleizedBatch {
222 merkle_batch: merkle,
223 root,
224 commit_metadata: metadata,
225 parent: self.parent.as_ref().map(Arc::downgrade),
226 bounds: batch_chain::Bounds {
227 base_size: self.base_size,
228 db_size: self.db_size,
229 total_size,
230 ancestors,
231 inactivity_floor,
232 },
233 })
234 }
235}
236
237impl<F, E, V, H, C, S> Db<F, E, V, H, C, S>
238where
239 F: Family,
240 E: Context,
241 V: ValueEncoding,
242 H: Hasher,
243 S: Strategy,
244 Operation<F, V>: EncodeShared,
245 Operation<F, V>: Read<Cfg = C>,
246 C: Clone + Send + Sync + 'static,
247{
248 fn encode_commit_op(metadata: Option<V::Value>, inactivity_floor_loc: Location<F>) -> Vec<u8> {
249 Operation::<F, V>::Commit(metadata, inactivity_floor_loc)
250 .encode()
251 .to_vec()
252 }
253
254 async fn load_active_witness(
255 merkle: &compact_merkle::Merkle<F, E, H::Digest, S>,
256 commit_codec_config: &C,
257 ) -> Result<(ServeState<F, H::Digest>, Operation<F, V>), Error<F>> {
258 witness::load_active_witness::<F, E, H, S, _, Operation<F, V>, _>(
259 merkle,
260 commit_codec_config,
261 Operation::has_floor,
262 )
263 .await
264 }
265
266 #[allow(clippy::too_many_arguments)]
273 pub(crate) fn init_from_verified_state(
274 merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
275 commit_codec_config: C,
276 last_commit_metadata: Option<V::Value>,
277 inactivity_floor_loc: Location<F>,
278 root: H::Digest,
279 last_commit_op_bytes: Vec<u8>,
280 last_commit_proof: Proof<F, H::Digest>,
281 pinned_nodes: Vec<H::Digest>,
282 ) -> Result<Self, Error<F>> {
283 let (last_commit_loc, witness) = witness::witness_from_authenticated_state(
284 &merkle,
285 root,
286 inactivity_floor_loc,
287 last_commit_op_bytes,
288 last_commit_proof,
289 pinned_nodes,
290 )?;
291
292 Ok(Self {
293 merkle,
294 last_commit_loc,
295 last_commit_metadata,
296 inactivity_floor_loc,
297 commit_codec_config,
298 witness: witness::Cache::new(witness),
299 })
300 }
301
302 pub(crate) async fn init_from_merkle(
307 mut merkle: compact_merkle::Merkle<F, E, H::Digest, S>,
308 commit_codec_config: C,
309 ) -> Result<Self, Error<F>>
310 where
311 F: Family,
312 Operation<F, V>: Read<Cfg = C>,
313 {
314 if merkle.leaves() == 0 {
322 witness::bootstrap_initial_commit::<F, E, H, S>(
323 &mut merkle,
324 Operation::<F, V>::Commit(None, Location::new(0))
325 .encode()
326 .to_vec(),
327 )
328 .await?;
329 }
330
331 let (witness, last_commit_op) =
332 Self::load_active_witness(&merkle, &commit_codec_config).await?;
333 let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
334 return Err(Error::DataCorrupted("last operation was not a commit"));
335 };
336
337 Self::init_from_verified_state(
338 merkle,
339 commit_codec_config,
340 last_commit_metadata,
341 inactivity_floor_loc,
342 witness.root,
343 witness.last_commit_op_bytes,
344 witness.last_commit_proof,
345 witness.pinned_nodes,
346 )
347 }
348
349 pub fn root(&self) -> H::Digest
351 where
352 F: Family,
353 {
354 let hasher = qmdb::hasher::<H>();
355 let inactive_peaks = F::inactive_peaks(
356 F::location_to_position(Location::new(*self.last_commit_loc + 1)),
357 self.inactivity_floor_loc,
358 );
359 self.merkle
360 .root(&hasher, inactive_peaks)
361 .expect("compact Merkle root should not fail")
362 }
363
364 pub const fn strategy(&self) -> &S {
366 self.merkle.strategy()
367 }
368
369 pub const fn last_commit_loc(&self) -> Location<F> {
371 self.last_commit_loc
372 }
373
374 pub const fn inactivity_floor_loc(&self) -> Location<F> {
376 self.inactivity_floor_loc
377 }
378
379 pub fn size(&self) -> Location<F> {
381 Location::new(*self.last_commit_loc + 1)
382 }
383
384 pub fn get_metadata(&self) -> Option<V::Value> {
386 self.last_commit_metadata.clone()
387 }
388
389 pub fn current_target(&self) -> compact_sync::Target<F, H::Digest> {
394 self.witness.with(ServeState::target)
395 }
396
397 pub(crate) fn compact_state(
404 &self,
405 target: compact_sync::Target<F, H::Digest>,
406 ) -> CompactStateResult<F, V, H::Digest>
407 where
408 Operation<F, V>: Read<Cfg = C>,
409 {
410 let (op_bytes, last_commit_proof, pinned_nodes, leaf_count) = self.witness.with(|w| {
411 if target.root != w.root || target.leaf_count != w.leaf_count {
412 return Err(compact_sync::ServeError::StaleTarget {
413 requested: target.clone(),
414 current: w.target(),
415 });
416 }
417 Ok((
418 w.last_commit_op_bytes.clone(),
419 w.last_commit_proof.clone(),
420 w.pinned_nodes.clone(),
421 w.leaf_count,
422 ))
423 })?;
424 let op = Operation::<F, V>::decode_cfg(op_bytes.as_ref(), &self.commit_codec_config)
425 .map_err(|_| {
426 compact_sync::ServeError::Database(Error::DataCorrupted("invalid commit operation"))
427 })?;
428 if !matches!(&op, Operation::Commit(_, _)) {
429 return Err(compact_sync::ServeError::Database(Error::DataCorrupted(
430 "last operation was not a commit",
431 )));
432 }
433 Ok(compact_sync::State {
434 leaf_count,
435 pinned_nodes,
436 last_commit_op: op,
437 last_commit_proof,
438 })
439 }
440
441 pub fn new_batch(&self) -> UnmerkleizedBatch<F, H, V, S> {
443 let committed_size = *self.last_commit_loc + 1;
444 UnmerkleizedBatch::new(self, committed_size)
445 }
446
447 pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, V, S>>
449 where
450 F: Family,
451 {
452 let committed_size = *self.last_commit_loc + 1;
453 Arc::new(MerkleizedBatch {
454 merkle_batch: self.merkle.to_batch(),
455 root: self.root(),
456 commit_metadata: self.last_commit_metadata.clone(),
457 parent: None,
458 bounds: batch_chain::Bounds {
459 base_size: committed_size,
460 db_size: committed_size,
461 total_size: committed_size,
462 ancestors: Vec::new(),
463 inactivity_floor: self.inactivity_floor_loc,
464 },
465 })
466 }
467
468 pub fn apply_batch(
481 &mut self,
482 batch: Arc<MerkleizedBatch<F, H::Digest, V, S>>,
483 ) -> Result<core::ops::Range<Location<F>>, Error<F>> {
484 let db_size = *self.last_commit_loc + 1;
485 batch
486 .bounds
487 .validate_apply_to(db_size, self.inactivity_floor_loc)?;
488
489 let start_loc = self.last_commit_loc + 1;
490 self.merkle.apply_batch(&batch.merkle_batch)?;
491 self.last_commit_loc = Location::new(batch.bounds.total_size - 1);
492 self.last_commit_metadata = batch.commit_metadata.clone();
493 self.inactivity_floor_loc = batch.bounds.inactivity_floor;
494 Ok(start_loc..Location::new(batch.bounds.total_size))
495 }
496
497 pub async fn sync(&self) -> Result<(), Error<F>> {
503 witness::persist_witness::<F, E, H, S>(
504 &self.merkle,
505 &self.witness,
506 self.last_commit_loc,
507 self.inactivity_floor_loc,
508 Self::encode_commit_op(self.last_commit_metadata.clone(), self.inactivity_floor_loc),
509 )
510 .await
511 }
512
513 pub async fn commit(&self) -> Result<(), Error<F>>
515 where
516 F: Family,
517 {
518 self.sync().await
519 }
520
521 pub async fn rewind(&mut self) -> Result<(), Error<F>>
546 where
547 F: Family,
548 {
549 self.merkle.rewind().await?;
550 let (witness, last_commit_op) =
553 Self::load_active_witness(&self.merkle, &self.commit_codec_config).await?;
554 let Operation::Commit(last_commit_metadata, inactivity_floor_loc) = last_commit_op else {
555 return Err(Error::DataCorrupted("last operation was not a commit"));
556 };
557 self.last_commit_metadata = last_commit_metadata;
558 self.inactivity_floor_loc = inactivity_floor_loc;
559 self.last_commit_loc = Location::new(*witness.leaf_count - 1);
560 self.witness.replace(witness);
561 Ok(())
562 }
563
564 pub async fn destroy(self) -> Result<(), Error<F>> {
566 self.merkle.destroy().await.map_err(Into::into)
567 }
568
569 pub(crate) async fn persist_cached_witness(&self) -> Result<(), Error<F>> {
570 witness::persist_cached_witness::<F, E, H, S>(&self.merkle, &self.witness).await
571 }
572}
573
574#[cfg(test)]
575mod tests {
576 use super::*;
577 use crate::{
578 merkle::mmr,
579 metadata::{Config as MConfig, Metadata},
580 qmdb::any::value::FixedEncoding,
581 };
582 use commonware_cryptography::Sha256;
583 use commonware_macros::test_traced;
584 use commonware_parallel::Sequential;
585 use commonware_runtime::{deterministic, Runner as _, Supervisor as _};
586 use commonware_utils::sequence::{prefixed_u64::U64 as MetadataKey, U64};
587
588 type TestDb<F> = Db<F, deterministic::Context, FixedEncoding<U64>, Sha256, (), Sequential>;
589
590 async fn open_db<F: Family>(context: deterministic::Context, partition: &str) -> TestDb<F> {
591 let merkle = crate::merkle::compact::Merkle::init(
592 context,
593 crate::merkle::compact::Config {
594 partition: partition.into(),
595 strategy: Sequential,
596 },
597 )
598 .await
599 .unwrap();
600 Db::init_from_merkle(merkle, ()).await.unwrap()
601 }
602
603 async fn tamper_metadata_key(
604 context: deterministic::Context,
605 partition: &str,
606 key: MetadataKey,
607 ) {
608 let mut metadata = open_metadata(context, partition).await;
609 let mut bytes = metadata.get(&key).cloned().expect("metadata entry missing");
610 *bytes.last_mut().expect("metadata entry empty") ^= 0x01;
611 metadata.put_sync(key, bytes).await.unwrap();
612 }
613
614 async fn open_metadata(
615 context: deterministic::Context,
616 partition: &str,
617 ) -> Metadata<deterministic::Context, MetadataKey, Vec<u8>> {
618 Metadata::<_, MetadataKey, Vec<u8>>::init(
619 context.child("meta_write"),
620 MConfig {
621 partition: partition.into(),
622 codec_config: ((0..).into(), ()),
623 },
624 )
625 .await
626 .unwrap()
627 }
628
629 async fn overwrite_metadata_key(
630 context: deterministic::Context,
631 partition: &str,
632 key: MetadataKey,
633 bytes: Vec<u8>,
634 ) {
635 let mut metadata = open_metadata(context, partition).await;
636 metadata.put_sync(key, bytes).await.unwrap();
637 }
638
639 #[test_traced("INFO")]
640 fn test_compact_stale_batch_rejected() {
641 deterministic::Runner::default().start(|context| async move {
642 let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-stale").await;
643 let floor = db.inactivity_floor_loc();
644
645 let batch_a =
646 db.new_batch()
647 .append(U64::new(1))
648 .merkleize(&db, Some(U64::new(11)), floor);
649 let batch_b =
650 db.new_batch()
651 .append(U64::new(2))
652 .merkleize(&db, Some(U64::new(22)), floor);
653
654 let expected_root = batch_a.root();
655 db.apply_batch(batch_a).unwrap();
656 assert_eq!(db.root(), expected_root);
657 assert!(matches!(
658 db.apply_batch(batch_b),
659 Err(Error::StaleBatch { .. })
660 ));
661
662 db.destroy().await.unwrap();
663 });
664 }
665
666 #[test_traced("INFO")]
669 fn test_compact_to_batch_reflects_live_state() {
670 deterministic::Runner::default().start(|context| async move {
671 let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-to-batch-live").await;
672 let floor = db.inactivity_floor_loc();
673
674 let pre_apply_root = db.root();
675 let pre_snapshot = db.to_batch();
676 assert_eq!(
677 pre_snapshot.root(),
678 pre_apply_root,
679 "snapshot before any mutation should match the live root"
680 );
681
682 db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
683 &db,
684 Some(U64::new(11)),
685 floor,
686 ))
687 .unwrap();
688
689 let live_root = db.root();
691 assert_ne!(
692 live_root, pre_apply_root,
693 "applying a non-empty batch must change the live root"
694 );
695
696 let snapshot = db.to_batch();
697 assert_eq!(
698 snapshot.root(),
699 live_root,
700 "to_batch().root() must match the live db.root() even before sync/commit"
701 );
702
703 db.destroy().await.unwrap();
704 });
705 }
706
707 #[test_traced("INFO")]
708 fn test_compact_stale_batch_chained() {
709 deterministic::Runner::default().start(|context| async move {
710 let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-chained-stale").await;
711 let floor = db.inactivity_floor_loc();
712
713 let parent =
714 db.new_batch()
715 .append(U64::new(1))
716 .merkleize(&db, Some(U64::new(11)), floor);
717 let child_a = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
718 &db,
719 Some(U64::new(22)),
720 floor,
721 );
722 let child_b = parent.new_batch::<Sha256>().append(U64::new(3)).merkleize(
723 &db,
724 Some(U64::new(33)),
725 floor,
726 );
727
728 db.apply_batch(child_a).unwrap();
729 assert!(matches!(
730 db.apply_batch(child_b),
731 Err(Error::StaleBatch { .. })
732 ));
733
734 db.destroy().await.unwrap();
735 });
736 }
737
738 #[test_traced("INFO")]
739 fn test_compact_stale_parent_after_child_applied() {
740 deterministic::Runner::default().start(|context| async move {
741 let mut db =
742 open_db::<mmr::Family>(context.child("db"), "keyless-child-before-parent").await;
743 let floor = db.inactivity_floor_loc();
744
745 let parent =
746 db.new_batch()
747 .append(U64::new(1))
748 .merkleize(&db, Some(U64::new(11)), floor);
749 let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
750 &db,
751 Some(U64::new(22)),
752 floor,
753 );
754
755 db.apply_batch(child).unwrap();
756 assert!(matches!(
757 db.apply_batch(parent),
758 Err(Error::StaleBatch { .. })
759 ));
760
761 db.destroy().await.unwrap();
762 });
763 }
764
765 #[test_traced("INFO")]
766 fn test_compact_sequential_commit_parent_then_child() {
767 deterministic::Runner::default().start(|context| async move {
768 let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-parent-child").await;
769 let floor = db.inactivity_floor_loc();
770
771 let parent =
772 db.new_batch()
773 .append(U64::new(1))
774 .merkleize(&db, Some(U64::new(11)), floor);
775 let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
776 &db,
777 Some(U64::new(22)),
778 floor,
779 );
780 let expected_root = child.root();
781
782 db.apply_batch(parent).unwrap();
783 db.apply_batch(child).unwrap();
784 db.commit().await.unwrap();
785
786 assert_eq!(db.root(), expected_root);
787
788 db.destroy().await.unwrap();
789 });
790 }
791
792 #[test_traced("INFO")]
796 fn test_compact_ancestor_floor_regression_rejected() {
797 deterministic::Runner::default().start(|context| async move {
798 let mut db =
799 open_db::<mmr::Family>(context.child("db"), "keyless-ancestor-floor-regressed")
800 .await;
801
802 let parent = db
804 .new_batch()
805 .append(U64::new(1))
806 .merkleize(&db, None, Location::new(2));
807 let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
809 &db,
810 None,
811 Location::new(1),
812 );
813
814 assert!(matches!(
815 db.apply_batch(child),
816 Err(Error::FloorRegressed(new, prev))
817 if new == Location::new(1) && prev == Location::new(2)
818 ));
819
820 db.destroy().await.unwrap();
821 });
822 }
823
824 #[test_traced("INFO")]
825 fn test_compact_rewind_restores_commit_metadata_and_floor() {
826 deterministic::Runner::default().start(|context| async move {
827 let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-rewind-meta").await;
828
829 let v1 = U64::new(1);
830 let meta1 = U64::new(11);
831 let floor1 = Location::new(0);
832 db.apply_batch(
833 db.new_batch()
834 .append(v1)
835 .merkleize(&db, Some(meta1.clone()), floor1),
836 )
837 .unwrap();
838 db.commit().await.unwrap();
839 let root_after_first = db.root();
840
841 let v2 = U64::new(2);
842 let meta2 = U64::new(22);
843 let floor2 = Location::new(1);
844 db.apply_batch(
845 db.new_batch()
846 .append(v2)
847 .merkleize(&db, Some(meta2.clone()), floor2),
848 )
849 .unwrap();
850 db.commit().await.unwrap();
851 assert_eq!(db.get_metadata(), Some(meta2));
852 assert_eq!(db.inactivity_floor_loc(), floor2);
853
854 db.rewind().await.unwrap();
855 assert_eq!(db.root(), root_after_first);
856 assert_eq!(db.get_metadata(), Some(meta1));
857 assert_eq!(db.inactivity_floor_loc(), floor1);
858
859 db.destroy().await.unwrap();
860 });
861 }
862
863 #[test_traced("INFO")]
864 fn test_compact_rewind_persists_across_reopen() {
865 deterministic::Runner::default().start(|context| async move {
866 let partition = "keyless-rewind-reopen";
867 let meta1 = U64::new(11);
868 let floor1 = Location::new(0);
869 let meta2 = U64::new(22);
870 let floor2 = Location::new(1);
871
872 let root_after_first = {
873 let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
874 db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
875 &db,
876 Some(meta1.clone()),
877 floor1,
878 ))
879 .unwrap();
880 db.commit().await.unwrap();
881 let root = db.root();
882
883 db.apply_batch(db.new_batch().append(U64::new(2)).merkleize(
884 &db,
885 Some(meta2),
886 floor2,
887 ))
888 .unwrap();
889 db.commit().await.unwrap();
890
891 db.rewind().await.unwrap();
892 root
893 };
894
895 let db = open_db::<mmr::Family>(context.child("second"), partition).await;
896 assert_eq!(db.root(), root_after_first);
897 assert_eq!(db.get_metadata(), Some(meta1));
898 assert_eq!(db.inactivity_floor_loc(), floor1);
899
900 db.destroy().await.unwrap();
901 });
902 }
903
904 #[test_traced("INFO")]
905 fn test_compact_reopen_rejects_tampered_witness() {
906 deterministic::Runner::default().start(|context| async move {
907 let partition = "keyless-witness-tamper";
908 let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
909 db.apply_batch(db.new_batch().append(U64::new(7)).merkleize(
910 &db,
911 Some(U64::new(11)),
912 Location::new(1),
913 ))
914 .unwrap();
915 db.commit().await.unwrap();
916 let slot = db.merkle.active_slot();
917 drop(db);
918
919 tamper_metadata_key(
920 context.child("tamper"),
921 partition,
922 crate::qmdb::compact::witness::last_commit_proof_key(slot),
923 )
924 .await;
925
926 let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
927 crate::merkle::compact::Merkle::init(
928 context.child("reopen"),
929 crate::merkle::compact::Config {
930 partition: partition.into(),
931 strategy: Sequential,
932 },
933 )
934 .await
935 .unwrap();
936 let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
937 assert!(matches!(reopened, Err(Error::DataCorrupted(_))));
938 });
939 }
940
941 #[test_traced("INFO")]
942 fn test_compact_reopen_rejects_commit_floor_beyond_tip() {
943 deterministic::Runner::default().start(|context| async move {
944 let partition = "keyless-invalid-persisted-floor";
945 let mut db = open_db::<mmr::Family>(context.child("db"), partition).await;
946 db.apply_batch(db.new_batch().append(U64::new(7)).merkleize(
947 &db,
948 Some(U64::new(11)),
949 Location::new(1),
950 ))
951 .unwrap();
952 db.commit().await.unwrap();
953 let slot = db.merkle.active_slot();
954 drop(db);
955 let oversized_floor = Location::new(10);
956
957 overwrite_metadata_key(
958 context.child("tamper"),
959 partition,
960 crate::qmdb::compact::witness::last_commit_op_key(slot),
961 Operation::<mmr::Family, FixedEncoding<U64>>::Commit(
962 Some(U64::new(11)),
963 oversized_floor,
964 )
965 .encode()
966 .to_vec(),
967 )
968 .await;
969
970 let merkle: crate::merkle::compact::Merkle<mmr::Family, _, _, Sequential> =
971 crate::merkle::compact::Merkle::init(
972 context.child("reopen"),
973 crate::merkle::compact::Config {
974 partition: partition.into(),
975 strategy: Sequential,
976 },
977 )
978 .await
979 .unwrap();
980 let reopened = TestDb::<mmr::Family>::init_from_merkle(merkle, ()).await;
981 assert!(matches!(
982 reopened,
983 Err(Error::DataCorrupted("invalid compact witness"))
984 ));
985 });
986 }
987
988 #[test_traced("INFO")]
989 fn test_compact_rewind_beyond_history() {
990 deterministic::Runner::default().start(|context| async move {
991 let mut db = open_db::<mmr::Family>(context.child("db"), "keyless-rewind-beyond").await;
992 assert!(matches!(
993 db.rewind().await,
994 Err(Error::Merkle(crate::merkle::Error::RewindBeyondHistory))
995 ));
996 db.destroy().await.unwrap();
997 });
998 }
999
1000 #[test_traced("INFO")]
1001 fn test_compact_rewind_preserves_pre_advance_batch() {
1002 deterministic::Runner::default().start(|context| async move {
1003 let mut db =
1004 open_db::<mmr::Family>(context.child("db"), "keyless-rewind-preserves-pre-advance")
1005 .await;
1006
1007 db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
1008 &db,
1009 None,
1010 Location::new(0),
1011 ))
1012 .unwrap();
1013 db.commit().await.unwrap();
1014
1015 let held = db
1017 .new_batch()
1018 .append(U64::new(2))
1019 .merkleize(&db, None, Location::new(0));
1020
1021 db.apply_batch(db.new_batch().append(U64::new(3)).merkleize(
1023 &db,
1024 None,
1025 Location::new(0),
1026 ))
1027 .unwrap();
1028 db.commit().await.unwrap();
1029 db.rewind().await.unwrap();
1030
1031 db.apply_batch(held).unwrap();
1034
1035 db.destroy().await.unwrap();
1036 });
1037 }
1038
1039 #[test_traced("INFO")]
1040 fn test_compact_noop_commit_after_commit() {
1041 deterministic::Runner::default().start(|context| async move {
1042 let mut db =
1043 open_db::<mmr::Family>(context.child("db"), "keyless-noop-after-commit").await;
1044
1045 db.apply_batch(
1046 db.new_batch()
1047 .append(U64::new(1))
1048 .append(U64::new(2))
1049 .merkleize(&db, Some(U64::new(11)), Location::new(0)),
1050 )
1051 .unwrap();
1052 db.commit().await.unwrap();
1053 let root_after_first = db.root();
1054 assert_eq!(db.size(), Location::new(4));
1055
1056 db.commit().await.unwrap();
1057 assert_eq!(db.size(), Location::new(4));
1058 assert_eq!(db.root(), root_after_first);
1059 assert_eq!(db.current_target().root, db.root());
1060
1061 db.destroy().await.unwrap();
1062 });
1063 }
1064
1065 #[test_traced("INFO")]
1066 fn test_compact_noop_commit_after_reopen() {
1067 deterministic::Runner::default().start(|context| async move {
1068 let partition = "keyless-noop-after-reopen";
1069
1070 let root_before_drop = {
1071 let mut db = open_db::<mmr::Family>(context.child("first"), partition).await;
1072 db.apply_batch(
1073 db.new_batch()
1074 .append(U64::new(1))
1075 .append(U64::new(2))
1076 .merkleize(&db, Some(U64::new(11)), Location::new(0)),
1077 )
1078 .unwrap();
1079 db.commit().await.unwrap();
1080 let root = db.root();
1081 assert_eq!(db.size(), Location::new(4));
1082 root
1083 };
1084
1085 let db = open_db::<mmr::Family>(context.child("second"), partition).await;
1086 assert_eq!(db.root(), root_before_drop);
1087 assert_eq!(db.size(), Location::new(4));
1088
1089 db.commit().await.unwrap();
1090 assert_eq!(db.size(), Location::new(4));
1091 assert_eq!(db.root(), root_before_drop);
1092 assert_eq!(db.current_target().root, db.root());
1093
1094 db.destroy().await.unwrap();
1095 });
1096 }
1097
1098 #[test_traced("INFO")]
1099 fn test_compact_noop_commit_after_rewind() {
1100 deterministic::Runner::default().start(|context| async move {
1101 let mut db =
1102 open_db::<mmr::Family>(context.child("db"), "keyless-noop-after-rewind").await;
1103
1104 db.apply_batch(
1105 db.new_batch()
1106 .append(U64::new(1))
1107 .append(U64::new(2))
1108 .merkleize(&db, Some(U64::new(11)), Location::new(0)),
1109 )
1110 .unwrap();
1111 db.commit().await.unwrap();
1112 let root_after_first = db.root();
1113
1114 db.apply_batch(db.new_batch().append(U64::new(3)).merkleize(
1115 &db,
1116 Some(U64::new(22)),
1117 Location::new(1),
1118 ))
1119 .unwrap();
1120 db.commit().await.unwrap();
1121
1122 db.rewind().await.unwrap();
1123 assert_eq!(db.size(), Location::new(4));
1124 assert_eq!(db.root(), root_after_first);
1125
1126 db.commit().await.unwrap();
1127 assert_eq!(db.size(), Location::new(4));
1128 assert_eq!(db.root(), root_after_first);
1129 assert_eq!(db.current_target().root, db.root());
1130
1131 db.destroy().await.unwrap();
1132 });
1133 }
1134
1135 #[test_traced("INFO")]
1136 fn test_compact_rewind_makes_post_advance_batch_stale() {
1137 deterministic::Runner::default().start(|context| async move {
1138 let mut db =
1139 open_db::<mmr::Family>(context.child("db"), "keyless-rewind-makes-stale").await;
1140
1141 db.apply_batch(db.new_batch().append(U64::new(1)).merkleize(
1142 &db,
1143 None,
1144 Location::new(0),
1145 ))
1146 .unwrap();
1147 db.commit().await.unwrap();
1148
1149 db.apply_batch(db.new_batch().append(U64::new(2)).merkleize(
1150 &db,
1151 None,
1152 Location::new(0),
1153 ))
1154 .unwrap();
1155 db.commit().await.unwrap();
1156
1157 let held = db
1159 .new_batch()
1160 .append(U64::new(3))
1161 .merkleize(&db, None, Location::new(0));
1162
1163 db.rewind().await.unwrap();
1164
1165 assert!(matches!(
1168 db.apply_batch(held),
1169 Err(Error::StaleBatch { .. })
1170 ));
1171
1172 db.destroy().await.unwrap();
1173 });
1174 }
1175
1176 #[test_traced("INFO")]
1177 fn test_witness_state_reports_cached_commit_corruption() {
1178 deterministic::Runner::default().start(|context| async move {
1179 let db = open_db::<mmr::Family>(context.child("db"), "keyless-serve-corruption").await;
1180 let target = db.current_target();
1181 db.witness
1182 .mutate(|witness| witness.last_commit_op_bytes.clear());
1183
1184 assert!(matches!(
1185 db.compact_state(target),
1186 Err(compact_sync::ServeError::Database(Error::DataCorrupted(
1187 "invalid commit operation"
1188 )))
1189 ));
1190
1191 db.destroy().await.unwrap();
1192 });
1193 }
1194
1195 #[test_traced("INFO")]
1198 fn test_compact_ancestor_floor_beyond_commit_loc_rejected() {
1199 deterministic::Runner::default().start(|context| async move {
1200 let mut db =
1201 open_db::<mmr::Family>(context.child("db"), "keyless-ancestor-floor-beyond").await;
1202
1203 let parent = db
1205 .new_batch()
1206 .append(U64::new(1))
1207 .merkleize(&db, None, Location::new(3));
1208 let child = parent.new_batch::<Sha256>().append(U64::new(2)).merkleize(
1210 &db,
1211 None,
1212 Location::new(0),
1213 );
1214
1215 assert!(matches!(
1216 db.apply_batch(child),
1217 Err(Error::FloorBeyondSize(floor, commit))
1218 if floor == Location::new(3) && commit == Location::new(2)
1219 ));
1220
1221 db.destroy().await.unwrap();
1222 });
1223 }
1224}