1use crate::{
15 journal::{
16 authenticated,
17 contiguous::{
18 variable::{Config as JournalConfig, Journal as ContiguousJournal},
19 Contiguous, Reader,
20 },
21 },
22 mmr::{
23 journaled::{Config as MmrConfig, Mmr},
24 Location, Proof,
25 },
26 qmdb::{
27 any::VariableValue,
28 operation::Committable,
29 store::{LogStore, MerkleizedStore},
30 Error,
31 },
32};
33use commonware_cryptography::Hasher;
34use commonware_parallel::ThreadPool;
35use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
36use std::num::{NonZeroU64, NonZeroUsize};
37use tracing::{debug, warn};
38
39pub mod batch;
40mod operation;
41pub use operation::Operation;
42
/// Configuration for initializing a [`Keyless`] store.
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition backing the MMR journal.
    pub mmr_journal_partition: String,

    /// Number of MMR items stored per blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Write buffer size for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition backing the MMR metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition backing the operation log.
    pub log_partition: String,

    /// Write buffer size for the operation log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level for log entries (`None` disables compression).
    pub log_compression: Option<u8>,

    /// Codec configuration used to (de)serialize log values.
    pub log_codec_config: C,

    /// Number of log items stored per section.
    pub log_items_per_section: NonZeroU64,

    /// Optional thread pool for parallelizing MMR work.
    pub thread_pool: Option<ThreadPool>,

    /// Shared page cache used by both the MMR journal and the log.
    pub page_cache: CacheRef,
}
79
80type Journal<E, V, H> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H>;
82
/// An authenticated, append-only store of values addressed by [`Location`]
/// rather than by key.
pub struct Keyless<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> {
    /// The authenticated journal holding all operations (appends and commits).
    journal: Journal<E, V, H>,

    /// Location of the most recent commit operation in the journal.
    last_commit_loc: Location,
}
91
impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> Keyless<E, V, H> {
    /// Returns the value of the operation at `loc`, or `None` if the operation
    /// there carries no value (e.g. a commit operation).
    ///
    /// # Errors
    ///
    /// Returns [`Error::LocationOutOfBounds`] if `loc` is at or past the end
    /// of the log.
    pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
        let reader = self.journal.reader().await;
        // The reader's end bound is the total operation count.
        let op_count = reader.bounds().end;
        if loc >= op_count {
            return Err(Error::LocationOutOfBounds(loc, Location::new(op_count)));
        }
        let op = reader.read(*loc).await?;

        // Non-value operations (commits) map to None.
        Ok(op.into_value())
    }

    /// Returns the location of the most recent commit operation.
    pub const fn last_commit_loc(&self) -> Location {
        self.last_commit_loc
    }

    /// Returns the metadata recorded by the last commit, or `None` if the
    /// operation at `last_commit_loc` is not a commit carrying metadata.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let op = self
            .journal
            .reader()
            .await
            .read(*self.last_commit_loc)
            .await?;
        let Operation::Commit(metadata) = op else {
            return Ok(None);
        };

        Ok(metadata)
    }

    /// Opens (or creates) the store from persistent storage.
    ///
    /// A brand-new log is seeded with an initial `Commit(None)` operation and
    /// synced, so the store always has at least one commit to anchor
    /// `last_commit_loc`. On reopen, `Journal::new` is given
    /// `Operation::is_commit` — presumably so recovery rewinds to the last
    /// commit boundary (TODO confirm against `authenticated::Journal` docs).
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            // Cloned because the log config below also needs the cache.
            page_cache: cfg.page_cache.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            page_cache: cfg.page_cache,
            write_buffer: cfg.log_write_buffer,
        };

        let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
        if journal.size().await == 0 {
            // Seed an empty log with an initial commit and persist it before
            // computing last_commit_loc.
            warn!("no operations found in log, creating initial commit");
            journal.append(&Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        // The last operation in a recovered log is the last commit.
        let last_commit_loc = journal
            .size()
            .await
            .checked_sub(1)
            .expect("at least one commit should exist");

        Ok(Self {
            journal,
            last_commit_loc,
        })
    }

    /// Returns the current root digest of the store.
    pub fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generates a proof for up to `max_ops` operations starting at
    /// `start_loc`, against the store's current size.
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        self.historical_proof(self.size().await, start_loc, max_ops)
            .await
    }

    /// Generates a proof for up to `max_ops` operations starting at
    /// `start_loc`, as of a historical size `op_count`.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Prunes operations strictly before `loc`.
    ///
    /// # Errors
    ///
    /// Returns [`Error::PruneBeyondMinRequired`] if `loc` is past the last
    /// commit location (the last commit must always remain readable).
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Flushes any pending writes to persistent storage.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.journal.sync().await.map_err(Into::into)
    }

    /// Destroys the store and removes all of its persistent data.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Creates a new (unmerkleized) batch based on the current committed
    /// state. Appends to the batch are not persisted until the batch is
    /// merkleized, finalized, and passed to [`Self::apply_batch`].
    #[allow(clippy::type_complexity)]
    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<'_, E, V, H, Mmr<E, H::Digest>> {
        // The committed size is one past the last commit location.
        let journal_size = *self.last_commit_loc + 1;
        batch::UnmerkleizedBatch {
            keyless: self,
            journal_batch: self.journal.new_batch(),
            appends: Vec::new(),
            base_operations: Vec::new(),
            base_size: journal_size,
            db_size: journal_size,
        }
    }

    /// Applies a finalized changeset: appends its operations, commits, and
    /// advances `last_commit_loc`.
    ///
    /// Returns the range of locations the changeset occupies (starting just
    /// past the previous commit).
    ///
    /// # Errors
    ///
    /// Returns [`Error::StaleChangeset`] if the changeset was built against a
    /// different db size than the current one.
    pub async fn apply_batch(
        &mut self,
        batch: batch::Changeset<H::Digest, V>,
    ) -> Result<core::ops::Range<Location>, Error> {
        let journal_size = *self.last_commit_loc + 1;
        if batch.db_size != journal_size {
            return Err(Error::StaleChangeset {
                expected: batch.db_size,
                actual: journal_size,
            });
        }
        let start_loc = self.last_commit_loc + 1;

        self.journal.apply_batch(batch.journal_finalized).await?;

        // Commit only after the batch has been applied; ordering matters for
        // crash recovery.
        self.journal.commit().await?;

        // The changeset's final operation is its commit.
        self.last_commit_loc = Location::new(batch.total_size - 1);

        let end_loc = Location::new(batch.total_size);
        debug!(size = ?end_loc, "applied batch");
        Ok(start_loc..end_loc)
    }
}
286
287impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> MerkleizedStore
288 for Keyless<E, V, H>
289{
290 type Digest = H::Digest;
291 type Operation = Operation<V>;
292
293 fn root(&self) -> Self::Digest {
294 self.journal.root()
295 }
296
297 async fn historical_proof(
298 &self,
299 historical_size: Location,
300 start_loc: Location,
301 max_ops: NonZeroU64,
302 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
303 Ok(self
304 .journal
305 .historical_proof(historical_size, start_loc, max_ops)
306 .await?)
307 }
308}
309
310impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> LogStore for Keyless<E, V, H> {
311 type Value = V;
312
313 async fn bounds(&self) -> std::ops::Range<Location> {
314 let bounds = self.journal.reader().await.bounds();
315 Location::new(bounds.start)..Location::new(bounds.end)
316 }
317
318 async fn get_metadata(&self) -> Result<Option<Self::Value>, Error> {
319 self.get_metadata().await
320 }
321}
322
323#[cfg(test)]
324mod test {
325 use super::*;
326 use crate::{
327 mmr::StandardHasher as Standard,
328 qmdb::{store::LogStore, verify_proof},
329 };
330 use commonware_cryptography::Sha256;
331 use commonware_macros::test_traced;
332 use commonware_runtime::{deterministic, BufferPooler, Metrics, Runner as _};
333 use commonware_utils::{NZUsize, NZU16, NZU64};
334 use rand::Rng;
335 use std::num::NonZeroU16;
336
    // Page size and cache capacity for the test buffer pool. Values are
    // small/odd — presumably to exercise paging edge cases; TODO confirm.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
340
341 fn db_config(
342 suffix: &str,
343 pooler: &impl BufferPooler,
344 ) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
345 Config {
346 mmr_journal_partition: format!("journal-{suffix}"),
347 mmr_metadata_partition: format!("metadata-{suffix}"),
348 mmr_items_per_blob: NZU64!(11),
349 mmr_write_buffer: NZUsize!(1024),
350 log_partition: format!("log-journal-{suffix}"),
351 log_write_buffer: NZUsize!(1024),
352 log_compression: None,
353 log_codec_config: ((0..=10000).into(), ()),
354 log_items_per_section: NZU64!(7),
355 thread_pool: None,
356 page_cache: CacheRef::from_pooler(pooler, PAGE_SIZE, PAGE_CACHE_SIZE),
357 }
358 }
359
360 type Db = Keyless<deterministic::Context, Vec<u8>, Sha256>;
361
362 async fn open_db(context: deterministic::Context) -> Db {
364 let cfg = db_config("partition", &context);
365 Db::init(context, cfg).await.unwrap()
366 }
367
    /// Exercises a freshly created db: initial commit at location 0, dropped
    /// batches discarded on restart, and a metadata-only commit round-trip.
    #[test_traced("INFO")]
    pub fn test_keyless_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db1")).await;
            // A new db contains exactly the initial Commit(None) operation.
            let bounds = db.bounds().await;
            assert_eq!(bounds.end, 1); assert_eq!(bounds.start, Location::new(0));

            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.last_commit_loc(), Location::new(0));

            let v1 = vec![1u8; 8];
            let root = db.root();
            // Append without merkleize/finalize: the batch is dropped and must
            // not affect persisted state.
            {
                let mut batch = db.new_batch();
                batch.append(v1);
            }
            drop(db);
            let mut db = open_db(context.with_label("db2")).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Commit carrying only metadata (no appends); it lands at loc 1.
            let metadata = vec![3u8; 10];
            let finalized = db.new_batch().merkleize(Some(metadata.clone())).finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.bounds().await.end, 2); assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            assert_eq!(
                db.get(Location::new(1)).await.unwrap(),
                Some(metadata.clone())
            ); let root = db.root();

            // Metadata, root, and commit location must survive a restart.
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().await.end, 2); assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new(1));

            db.destroy().await.unwrap();
        });
    }
416
    /// Basic build flow: appends get sequential locations, committed state
    /// survives restarts, and dropped batches change nothing.
    #[test_traced("WARN")]
    pub fn test_keyless_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db1")).await;

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            // Appends land at 1 and 2 (location 0 is the initial commit).
            let finalized = {
                let mut batch = db.new_batch();
                let loc1 = batch.append(v1.clone());
                let loc2 = batch.append(v2.clone());
                assert_eq!(loc1, Location::new(1));
                assert_eq!(loc2, Location::new(2));
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let loc1 = Location::new(1);
            let loc2 = Location::new(2);

            // Size 4 = initial commit + 2 appends + the batch commit at loc 3
            // (which carries no value, hence get(3) == None).
            assert_eq!(db.bounds().await.end, 4); assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.get(Location::new(3)).await.unwrap(), None); let root = db.root();
            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().await.end, 4);
            assert_eq!(db.root(), root);

            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Dropped (uncommitted) batch must not affect persisted state.
            {
                let mut batch = db.new_batch();
                batch.append(v2);
                batch.append(v1);
            }

            drop(db); let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().await.end, 4);
            assert_eq!(db.root(), root);

            // A second clean reopen is also stable.
            drop(db);
            let db = open_db(context.with_label("db4")).await;
            assert_eq!(db.bounds().await.end, 4);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
475
476 fn generate_values<T: Rng>(rng: &mut T, num_elements: usize) -> Vec<Vec<u8>> {
478 (0..num_elements)
479 .map(|_| vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8])
480 .collect()
481 }
482
    /// Verifies that uncommitted batches are discarded on restart while
    /// committed batches survive, across several commit/restart cycles.
    #[test_traced("WARN")]
    pub fn test_keyless_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: usize = 1000;
        executor.start(|mut context| async move {
            let db = open_db(context.with_label("db1")).await;
            let root = db.root();

            // Dropped (unfinalized) batch: must be rolled back on reopen.
            {
                let mut batch = db.new_batch();
                for value in generate_values(&mut context, ELEMENTS) {
                    batch.append(value);
                }
            }
            drop(db);
            let mut db = open_db(context.with_label("db2")).await;
            assert_eq!(root, db.root());

            // Committed batch: must persist.
            let finalized = {
                let mut batch = db.new_batch();
                for value in generate_values(&mut context, ELEMENTS) {
                    batch.append(value);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let root = db.root();

            // Another dropped batch on top of the committed state.
            {
                let mut batch = db.new_batch();
                for value in generate_values(&mut context, ELEMENTS) {
                    batch.append(value);
                }
            }
            drop(db);
            let mut db = open_db(context.with_label("db3")).await;
            assert_eq!(root, db.root());

            // Second committed batch.
            let finalized = {
                let mut batch = db.new_batch();
                for value in generate_values(&mut context, ELEMENTS) {
                    batch.append(value);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let root = db.root();

            drop(db);
            let db = open_db(context.with_label("db4")).await;
            // 2 * ELEMENTS appends + initial commit + two batch commits.
            assert_eq!(db.bounds().await.end, 2 * ELEMENTS as u64 + 3);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
548
    /// Recovery on a non-empty db: uncommitted appends roll back to the last
    /// commit, and pruning does not disturb the root or recovery behavior.
    #[test_traced("WARN")]
    fn test_keyless_db_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = open_db(context.with_label("db1")).await;

            // Establish a committed baseline.
            const ELEMENTS: usize = 200;
            let finalized = {
                let mut batch = db.new_batch();
                for value in generate_values(&mut context, ELEMENTS) {
                    batch.append(value);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let root = db.root();
            let op_count = db.bounds().await.end;

            // Reopen and confirm committed state is intact.
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().await.end, op_count);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), op_count - 1);
            drop(db);

            // Simulates a crash: appends are made but never committed, then
            // the db is reopened and must match the pre-crash state.
            async fn recover_from_failure(
                mut context: deterministic::Context,
                label1: &str,
                label2: &str,
                root: <Sha256 as Hasher>::Digest,
                op_count: Location,
            ) {
                let db = open_db(context.with_label(label1)).await;

                // Uncommitted batch, dropped without finalize.
                {
                    let mut batch = db.new_batch();
                    for value in generate_values(&mut context, ELEMENTS) {
                        batch.append(value);
                    }
                }
                drop(db);
                let db = open_db(context.with_label(label2)).await;
                assert_eq!(db.bounds().await.end, op_count);
                assert_eq!(db.root(), root);
            }

            recover_from_failure(context.with_label("recovery1"), "a", "b", root, op_count).await;

            // Pruning up to the last commit must leave root and size unchanged.
            let mut db = open_db(context.with_label("db3")).await;
            db.prune(db.last_commit_loc()).await.unwrap();
            assert_eq!(db.bounds().await.end, op_count);
            assert_eq!(db.root(), root);
            db.sync().await.unwrap();
            drop(db);

            // Recovery behavior must be unaffected by the prune.
            recover_from_failure(context.with_label("recovery2"), "c", "d", root, op_count).await;

            // A committed batch after all of the above must advance the state.
            let mut db = open_db(context.with_label("db4")).await;
            let finalized = {
                let mut batch = db.new_batch();
                for value in generate_values(&mut context, ELEMENTS) {
                    batch.append(value);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let db = open_db(context.with_label("db5")).await;
            let bounds = db.bounds().await;
            assert!(bounds.end > op_count);
            assert_ne!(db.root(), root);
            assert_eq!(db.last_commit_loc(), bounds.end - 1);

            db.destroy().await.unwrap();
        });
    }
632
    /// Recovery when nothing beyond the initial commit was ever committed:
    /// any amount of uncommitted appends must be discarded on reopen.
    #[test_traced("WARN")]
    fn test_keyless_db_empty_db_recovery() {
        const ELEMENTS: u64 = 1000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db1")).await;
            let root = db.root();

            // Reopening a fresh db preserves the initial state.
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().await.end, 1); assert_eq!(db.root(), root);

            // Deterministic values so repeated batches are identical.
            fn build_values() -> Vec<Vec<u8>> {
                (0..ELEMENTS)
                    .map(|i| vec![(i % 255) as u8; ((i % 17) + 13) as usize])
                    .collect()
            }

            // One uncommitted batch, dropped.
            {
                let mut batch = db.new_batch();
                for v in build_values() {
                    batch.append(v);
                }
            }
            drop(db);
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().await.end, 1); assert_eq!(db.root(), root);

            // Again, from the recovered db.
            {
                let mut batch = db.new_batch();
                for v in build_values() {
                    batch.append(v);
                }
            }
            drop(db);
            let db = open_db(context.with_label("db4")).await;
            assert_eq!(db.bounds().await.end, 1); assert_eq!(db.root(), root);

            // A much larger uncommitted batch (3x the values), still dropped.
            {
                let mut batch = db.new_batch();
                for v in build_values() {
                    batch.append(v);
                }
                for v in build_values() {
                    batch.append(v);
                }
                for v in build_values() {
                    batch.append(v);
                }
            }
            drop(db);
            let mut db = open_db(context.with_label("db5")).await;
            assert_eq!(db.bounds().await.end, 1); assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new(0));

            // Finally, a committed batch must change both size and root.
            let finalized = {
                let mut batch = db.new_batch();
                for v in build_values() {
                    batch.append(v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let db = open_db(context.with_label("db6")).await;
            assert!(db.bounds().await.end > 1);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
716
    /// Generates proofs over committed operations and verifies them, including
    /// negative cases (wrong root, wrong start location, out-of-range size).
    #[test_traced("INFO")]
    pub fn test_keyless_db_proof_generation_and_verification() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Commit ELEMENTS appends in a single batch.
            const ELEMENTS: u64 = 100;
            let mut values = Vec::new();
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                    values.push(v.clone());
                    batch.append(v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Requesting a proof against a size larger than the db must fail.
            assert!(matches!(
                db.historical_proof(db.bounds().await.end + 1, Location::new(5), NZU64!(10))
                    .await,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));

            let root = db.root();

            // (start_loc, max_ops) pairs covering interior, edge, and
            // end-of-log ranges.
            let test_cases = vec![
                (0, 10), (10, 5), (50, 20), (90, 15), (0, 1), (ELEMENTS - 1, 1), (ELEMENTS, 1), ];

            for (start_loc, max_ops) in test_cases {
                let (proof, ops) = db.proof(Location::new(start_loc), NZU64!(max_ops)).await.unwrap();

                assert!(
                    verify_proof(&mut hasher, &proof, Location::new(start_loc), &ops, &root),
                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
                );

                // Proofs are truncated at the end of the log.
                let expected_ops = std::cmp::min(max_ops, *db.bounds().await.end - start_loc);
                assert_eq!(
                    ops.len() as u64,
                    expected_ops,
                    "Expected {expected_ops} operations, got {}",
                    ops.len(),
                );

                // Operation type per location: initial commit at 0, appends at
                // 1..=ELEMENTS, the batch commit at ELEMENTS + 1.
                for (i, op) in ops.iter().enumerate() {
                    let loc = start_loc + i as u64;
                    if loc == 0 {
                        assert!(
                            matches!(op, Operation::Commit(None)),
                            "Expected Initial Commit operation at location {loc}, got {op:?}",
                        );
                    } else if loc <= ELEMENTS {
                        assert!(
                            matches!(op, Operation::Append(_)),
                            "Expected Append operation at location {loc}, got {op:?}",
                        );
                    } else if loc == ELEMENTS + 1 {
                        assert!(
                            matches!(op, Operation::Commit(_)),
                            "Expected Commit operation at location {loc}, got {op:?}",
                        );
                    }
                }

                // A proof must not verify against the wrong root...
                let wrong_root = Sha256::hash(&[0xFF; 32]);
                assert!(
                    !verify_proof(&mut hasher, &proof, Location::new(start_loc), &ops, &wrong_root),
                    "Proof should fail with wrong root"
                );

                // ...or the wrong start location.
                if start_loc > 0 {
                    assert!(
                        !verify_proof(&mut hasher, &proof, Location::new(start_loc - 1), &ops, &root),
                        "Proof should fail with wrong start location"
                    );
                }
            }

            db.destroy().await.unwrap();
        });
    }
818
    /// Proof generation and verification must keep working after progressively
    /// more aggressive pruning, and pruning must never change the root.
    #[test_traced("INFO")]
    pub fn test_keyless_db_proof_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.with_label("db1")).await;

            // Two committed batches of ELEMENTS appends each.
            const ELEMENTS: u64 = 100;
            let mut values = Vec::new();
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                    values.push(v.clone());
                    batch.append(v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            let finalized = {
                let mut batch = db.new_batch();
                for i in ELEMENTS..ELEMENTS * 2 {
                    let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
                    values.push(v.clone());
                    batch.append(v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let root = db.root();

            println!("last commit loc: {}", db.last_commit_loc());

            // Prune a prefix of the log.
            const PRUNE_LOC: u64 = 30;
            db.prune(Location::new(PRUNE_LOC)).await.unwrap();

            // Read back the actual boundary: assertions below only require
            // start <= PRUNE_LOC, so the store may retain earlier items.
            let oldest_retained = db.bounds().await.start;

            assert_eq!(
                db.root(),
                root,
                "Root should not change after pruning"
            );

            // Pruned state must survive a restart.
            db.sync().await.unwrap();
            drop(db);
            let mut db = open_db(context.with_label("db2")).await;
            assert_eq!(db.root(), root);
            let bounds = db.bounds().await;
            assert_eq!(bounds.end, 2 * ELEMENTS + 3);
            assert!(bounds.start <= PRUNE_LOC);

            // Pruned locations must not yield values (either None or an error
            // is acceptable, but never Some).
            for i in 0..*oldest_retained {
                let result = db.get(Location::new(i)).await;
                match result {
                    Ok(None) => {} Ok(Some(_)) => {
                        panic!("Should not be able to get pruned value at location {i}")
                    }
                    Err(_) => {} }
            }

            // Proof ranges at and after the retained boundary.
            let test_cases = vec![
                (oldest_retained, 10), (Location::new(50), 20), (Location::new(150), 10), (Location::new(190), 15), ];

            for (start_loc, max_ops) in test_cases {
                // Skip any case that fell below the actual retained boundary.
                if start_loc < oldest_retained {
                    continue;
                }

                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();

                assert!(
                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
                );

                // Proofs are truncated at the end of the log.
                let expected_ops = std::cmp::min(max_ops, *db.bounds().await.end - *start_loc);
                assert_eq!(
                    ops.len() as u64,
                    expected_ops,
                    "Expected {expected_ops} operations, got {}",
                    ops.len(),
                );
            }

            // Prune deeper and confirm proofs still verify.
            const AGGRESSIVE_PRUNE: Location = Location::new(150);
            db.prune(AGGRESSIVE_PRUNE).await.unwrap();

            let new_oldest = db.bounds().await.start;
            assert!(new_oldest <= AGGRESSIVE_PRUNE);

            let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
            assert!(
                verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
                "Proof should still verify after aggressive pruning"
            );

            // Prune almost everything; whatever remains must still be provable.
            let almost_all = db.bounds().await.end - 5;
            db.prune(almost_all).await.unwrap();

            let bounds = db.bounds().await;
            let final_oldest = bounds.start;

            if final_oldest < bounds.end {
                let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
                assert!(
                    verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
                    "Should be able to prove remaining operations after extensive pruning"
                );
            }

            db.destroy().await.unwrap();
        });
    }
955
    /// Restart with uncommitted trailing appends must rewind to the last
    /// commit, and subsequent appends reuse the rewound locations.
    #[test_traced("WARN")]
    fn test_keyless_db_replay_with_trailing_appends() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db1")).await;

            // Committed baseline.
            let finalized = {
                let mut batch = db.new_batch();
                for i in 0..10 {
                    let v = vec![i as u8; 10];
                    batch.append(v);
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let committed_root = db.root();
            let committed_size = db.bounds().await.end;

            // One trailing uncommitted append, dropped.
            {
                let mut batch = db.new_batch();
                batch.append(vec![99u8; 20]);
            }

            drop(db);

            let mut db = open_db(context.with_label("db2")).await;

            assert_eq!(
                db.bounds().await.end,
                committed_size,
                "Should rewind to last commit"
            );
            assert_eq!(db.root(), committed_root, "Root should match last commit");
            assert_eq!(
                db.last_commit_loc(),
                committed_size - 1,
                "Last commit location should be correct"
            );

            // A new append lands exactly where the discarded one would have.
            let new_value = vec![77u8; 15];
            let finalized = {
                let mut batch = db.new_batch();
                let loc = batch.append(new_value.clone());
                assert_eq!(
                    loc, committed_size,
                    "New append should get the expected location"
                );
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            assert_eq!(db.get(committed_size).await.unwrap(), Some(new_value));

            let new_committed_root = db.root();
            let new_committed_size = db.bounds().await.end;

            // Multiple trailing uncommitted appends this time.
            {
                let mut batch = db.new_batch();
                for i in 0..5 {
                    let v = vec![(200 + i) as u8; 10];
                    batch.append(v);
                }
            }

            drop(db);

            let db = open_db(context.with_label("db3")).await;
            assert_eq!(
                db.bounds().await.end,
                new_committed_size,
                "Should rewind to last commit with multiple trailing appends"
            );
            assert_eq!(
                db.root(),
                new_committed_root,
                "Root should match last commit after multiple appends"
            );
            assert_eq!(
                db.last_commit_loc(),
                new_committed_size - 1,
                "Last commit location should be correct after multiple appends"
            );

            db.destroy().await.unwrap();
        });
    }
1055
    /// `get` returns `None` for in-bounds commit locations and errors only
    /// for locations past the end of the log.
    #[test_traced("INFO")]
    pub fn test_keyless_db_get_out_of_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Location 0 is the initial commit: in bounds, but no value.
            let result = db.get(Location::new(0)).await.unwrap();
            assert!(result.is_none());

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            let finalized = {
                let mut batch = db.new_batch();
                batch.append(v1.clone());
                batch.append(v2.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            assert_eq!(db.get(Location::new(1)).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(Location::new(2)).await.unwrap().unwrap(), v2);

            // Location 3 is the batch commit: again None, not an error.
            let result = db.get(Location::new(3)).await.unwrap();
            assert!(result.is_none());

            // Location 4 is past the end (size is 4): out-of-bounds error.
            let result = db.get(Location::new(4)).await;
            assert!(
                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new(4) && size == Location::new(4))
            );
            db.destroy().await.unwrap();
        });
    }
1093
    /// Pruning past the last commit location must be rejected with
    /// `PruneBeyondMinRequired`.
    #[test_traced("INFO")]
    pub fn test_keyless_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Fresh db: last commit is at 0, so pruning to 1 must fail.
            let result = db.prune(Location::new(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new(1) && commit_loc == Location::new(0))
            );

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            let v3 = vec![3u8; 8];
            let finalized = {
                let mut batch = db.new_batch();
                batch.append(v1.clone());
                batch.append(v2.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Initial commit (0) + 2 appends => the batch commit lands at 3.
            let last_commit = db.last_commit_loc();
            assert_eq!(last_commit, Location::new(3));

            let finalized = {
                let mut batch = db.new_batch();
                batch.append(v3.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // Pruning up to an earlier commit location is allowed.
            assert!(db.prune(Location::new(3)).await.is_ok());

            // Pruning one past the latest commit is rejected.
            let new_last_commit = db.last_commit_loc();
            let beyond = Location::new(*new_last_commit + 1);
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
1145
1146 use crate::{
1147 kv::tests::assert_send,
1148 qmdb::store::tests::{assert_log_store, assert_merkleized_store},
1149 };
1150
1151 #[allow(dead_code)]
1152 fn assert_db_futures_are_send(db: &mut Db, loc: Location) {
1153 assert_log_store(db);
1154 assert_merkleized_store(db, loc);
1155 assert_send(db.sync());
1156 assert_send(db.get(loc));
1157 }
1158
    /// Batch `get` sees both committed values and values appended to the
    /// batch itself; reads past the batch's end return `None`.
    #[test_traced("INFO")]
    fn test_keyless_batch_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            // Commit a few base values, recording their locations.
            let base_vals: Vec<Vec<u8>> = (0..3).map(|i| vec![(10 + i) as u8; 12]).collect();
            let mut base_locs = Vec::new();
            let finalized = {
                let mut batch = db.new_batch();
                for v in &base_vals {
                    base_locs.push(batch.append(v.clone()));
                }
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();

            // A fresh batch reads through to the committed db.
            let mut batch = db.new_batch();
            for (i, loc) in base_locs.iter().enumerate() {
                assert_eq!(
                    batch.get(*loc).await.unwrap(),
                    Some(base_vals[i].clone()),
                    "base DB value at loc {loc} mismatch"
                );
            }

            // Values appended to the batch are visible to the same batch.
            let new_val = vec![99u8; 16];
            let new_loc = batch.append(new_val.clone());
            assert_eq!(batch.get(new_loc).await.unwrap(), Some(new_val));

            // One past the last append reads as None.
            let beyond = Location::new(*new_loc + 1);
            assert_eq!(batch.get(beyond).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1200
    /// A batch stacked on a merkleized parent batch reads through to the
    /// parent's appends as well as its own.
    #[test_traced("INFO")]
    fn test_keyless_batch_stacked_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db")).await;

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 16];

            // Parent batch with one append, merkleized but not applied.
            let mut parent = db.new_batch();
            let loc1 = parent.append(v1.clone());
            let parent_m = parent.merkleize(None);

            // The child sees the parent's append...
            let mut child = parent_m.new_batch();
            assert_eq!(child.get(loc1).await.unwrap(), Some(v1));

            // ...and its own.
            let loc2 = child.append(v2.clone());
            assert_eq!(child.get(loc2).await.unwrap(), Some(v2));

            // Far-out-of-range locations read as None.
            let nonexistent = Location::new(9999);
            assert_eq!(child.get(nonexistent).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1231
    /// Commit metadata reflects only the most recent commit: a later
    /// metadata-less commit clears it.
    #[test_traced("INFO")]
    fn test_keyless_batch_metadata() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            // Commit with metadata attached.
            let metadata = vec![42u8; 32];
            let finalized = {
                let mut batch = db.new_batch();
                batch.append(vec![1u8; 8]);
                batch.merkleize(Some(metadata.clone())).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // An empty commit without metadata clears it.
            let finalized = db.new_batch().merkleize(None).finalize();
            db.apply_batch(finalized).await.unwrap();
            assert_eq!(db.get_metadata().await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1257
    /// The root reported by a merkleized batch (before it is applied) must
    /// equal the db's root after that batch is applied.
    #[test_traced("INFO")]
    fn test_keyless_batch_speculative_root() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            let merkleized = {
                let mut batch = db.new_batch();
                for i in 0u8..10 {
                    batch.append(vec![i; 16]);
                }
                batch.merkleize(None)
            };

            // Speculative root is read before the batch is applied.
            let speculative = merkleized.root();
            let finalized = merkleized.finalize();
            db.apply_batch(finalized).await.unwrap();

            assert_eq!(db.root(), speculative);

            // Same check for a commit that carries metadata.
            let metadata = vec![55u8; 8];
            let merkleized = {
                let mut batch = db.new_batch();
                batch.append(vec![0xAA; 20]);
                batch.merkleize(Some(metadata))
            };
            let speculative = merkleized.root();
            let finalized = merkleized.finalize();
            db.apply_batch(finalized).await.unwrap();

            assert_eq!(db.root(), speculative);

            db.destroy().await.unwrap();
        });
    }
1296
    /// A merkleized (but not yet applied) batch reads through to committed
    /// values and exposes its own appends; past-the-end reads return `None`.
    #[test_traced("INFO")]
    fn test_keyless_merkleized_batch_get() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.with_label("db")).await;

            // Commit one base value; it lands at location 1 (after the initial
            // commit at 0), with the batch commit at 2.
            let base_val = vec![10u8; 12];
            let finalized = {
                let mut batch = db.new_batch();
                batch.append(base_val.clone());
                batch.merkleize(None).finalize()
            };
            db.apply_batch(finalized).await.unwrap();
            let new_val = vec![20u8; 16];
            let mut batch = db.new_batch();
            batch.append(new_val.clone());
            let merkleized = batch.merkleize(None);
            // Committed value is visible through the merkleized batch.
            assert_eq!(
                merkleized.get(Location::new(1)).await.unwrap(),
                Some(base_val),
            );

            // The batch's own append is at 3 (after the commit op at 2).
            assert_eq!(
                merkleized.get(Location::new(3)).await.unwrap(),
                Some(new_val),
            );

            assert_eq!(merkleized.get(Location::new(4)).await.unwrap(), None);

            db.destroy().await.unwrap();
        });
    }
1339
1340 #[test_traced("INFO")]
1346 fn test_keyless_batch_chained_apply() {
1347 let executor = deterministic::Runner::default();
1348 executor.start(|context| async move {
1349 let mut db = open_db(context.with_label("db")).await;
1350
1351 let v1 = vec![1u8; 8];
1352 let v2 = vec![2u8; 16];
1353 let v3 = vec![3u8; 24];
1354
1355 let mut parent = db.new_batch();
1357 let loc1 = parent.append(v1.clone());
1358 let parent_m = parent.merkleize(None);
1359 let parent_root = parent_m.root();
1360
1361 let mut child = parent_m.new_batch();
1363 let loc2 = child.append(v2.clone());
1364 let loc3 = child.append(v3.clone());
1365 let child_m = child.merkleize(None);
1366 let child_root = child_m.root();
1367
1368 assert_ne!(parent_root, child_root);
1370
1371 let child_finalized = child_m.finalize();
1373 db.apply_batch(child_finalized).await.unwrap();
1374 assert_eq!(db.root(), child_root);
1375 assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
1376 assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
1377 assert_eq!(db.get(loc3).await.unwrap(), Some(v3));
1378
1379 drop(db);
1381 let db = open_db(context.with_label("db2")).await;
1382 assert_eq!(db.root(), child_root);
1383
1384 db.destroy().await.unwrap();
1385 });
1386 }
1387
1388 #[test_traced("INFO")]
1390 fn test_keyless_batch_chained_apply_sequential() {
1391 let executor = deterministic::Runner::default();
1392 executor.start(|context| async move {
1393 let mut db = open_db(context.with_label("db")).await;
1394
1395 let v1 = vec![1u8; 8];
1396 let v2 = vec![2u8; 16];
1397
1398 let mut parent = db.new_batch();
1400 let loc1 = parent.append(v1.clone());
1401 let parent_m = parent.merkleize(None);
1402 let parent_root = parent_m.root();
1403
1404 let parent_finalized = parent_m.finalize();
1406 db.apply_batch(parent_finalized).await.unwrap();
1407 assert_eq!(db.root(), parent_root);
1408 assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
1409
1410 let mut batch2 = db.new_batch();
1412 let loc2 = batch2.append(v2.clone());
1413 let batch2_m = batch2.merkleize(None);
1414 let batch2_root = batch2_m.root();
1415 let batch2_finalized = batch2_m.finalize();
1416 db.apply_batch(batch2_finalized).await.unwrap();
1417 assert_eq!(db.root(), batch2_root);
1418 assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
1419
1420 db.destroy().await.unwrap();
1421 });
1422 }
1423
1424 #[test_traced("INFO")]
1426 fn test_keyless_batch_many_sequential() {
1427 let executor = deterministic::Runner::default();
1428 executor.start(|context| async move {
1429 let mut db = open_db(context.with_label("db")).await;
1430 let mut hasher = Standard::<Sha256>::new();
1431
1432 const BATCHES: u64 = 20;
1433 const APPENDS_PER_BATCH: u64 = 5;
1434
1435 let mut all_values: Vec<Vec<u8>> = Vec::new();
1436 let mut all_locs: Vec<Location> = Vec::new();
1437
1438 for batch_idx in 0..BATCHES {
1439 let finalized = {
1440 let mut batch = db.new_batch();
1441 for j in 0..APPENDS_PER_BATCH {
1442 let v = vec![(batch_idx * 10 + j) as u8; 8];
1443 let loc = batch.append(v.clone());
1444 all_values.push(v);
1445 all_locs.push(loc);
1446 }
1447 batch.merkleize(None).finalize()
1448 };
1449 db.apply_batch(finalized).await.unwrap();
1450 }
1451
1452 for (i, loc) in all_locs.iter().enumerate() {
1454 assert_eq!(
1455 db.get(*loc).await.unwrap(),
1456 Some(all_values[i].clone()),
1457 "mismatch at index {i}, loc {loc}"
1458 );
1459 }
1460
1461 let root = db.root();
1463 let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1464 assert!(verify_proof(
1465 &mut hasher,
1466 &proof,
1467 Location::new(0),
1468 &ops,
1469 &root
1470 ));
1471
1472 let expected = 1 + BATCHES * (APPENDS_PER_BATCH + 1);
1474 assert_eq!(db.bounds().await.end, expected);
1475
1476 db.destroy().await.unwrap();
1477 });
1478 }
1479
1480 #[test_traced("INFO")]
1482 fn test_keyless_batch_empty() {
1483 let executor = deterministic::Runner::default();
1484 executor.start(|context| async move {
1485 let mut db = open_db(context.with_label("db")).await;
1486
1487 let finalized = {
1489 let mut batch = db.new_batch();
1490 batch.append(vec![1u8; 8]);
1491 batch.merkleize(None).finalize()
1492 };
1493 db.apply_batch(finalized).await.unwrap();
1494 let root_before = db.root();
1495 let size_before = db.bounds().await.end;
1496
1497 let merkleized = db.new_batch().merkleize(None);
1499 let speculative = merkleized.root();
1500 let finalized = merkleized.finalize();
1501 db.apply_batch(finalized).await.unwrap();
1502
1503 assert_ne!(db.root(), root_before);
1505 assert_eq!(db.root(), speculative);
1506 assert_eq!(db.bounds().await.end, size_before + 1);
1508
1509 db.destroy().await.unwrap();
1510 });
1511 }
1512
1513 #[test_traced("INFO")]
1515 fn test_keyless_batch_chained_merkleized_get() {
1516 let executor = deterministic::Runner::default();
1517 executor.start(|context| async move {
1518 let mut db = open_db(context.with_label("db")).await;
1519
1520 let base_val = vec![10u8; 12];
1522 let finalized = {
1523 let mut batch = db.new_batch();
1524 batch.append(base_val.clone());
1525 batch.merkleize(None).finalize()
1526 };
1527 db.apply_batch(finalized).await.unwrap();
1528 let base_loc = Location::new(1);
1529
1530 let v1 = vec![1u8; 8];
1532 let mut parent = db.new_batch();
1533 let loc1 = parent.append(v1.clone());
1534 let parent_m = parent.merkleize(None);
1535
1536 let v2 = vec![2u8; 16];
1538 let mut child = parent_m.new_batch();
1539 let loc2 = child.append(v2.clone());
1540 let child_m = child.merkleize(None);
1541
1542 assert_eq!(
1545 child_m.get(base_loc).await.unwrap(),
1546 Some(base_val),
1547 "should read base DB value"
1548 );
1549 assert_eq!(
1551 child_m.get(loc1).await.unwrap(),
1552 Some(v1),
1553 "should read parent chain value"
1554 );
1555 assert_eq!(
1557 child_m.get(loc2).await.unwrap(),
1558 Some(v2),
1559 "should read child's own value"
1560 );
1561
1562 db.destroy().await.unwrap();
1563 });
1564 }
1565
1566 #[test_traced("INFO")]
1568 fn test_keyless_batch_large() {
1569 let executor = deterministic::Runner::default();
1570 executor.start(|context| async move {
1571 let mut db = open_db(context.with_label("db")).await;
1572 let mut hasher = Standard::<Sha256>::new();
1573
1574 const N: u64 = 500;
1575 let mut values = Vec::new();
1576 let mut locs = Vec::new();
1577
1578 let finalized = {
1579 let mut batch = db.new_batch();
1580 for i in 0..N {
1581 let v = vec![(i % 256) as u8; ((i % 29) + 3) as usize];
1582 locs.push(batch.append(v.clone()));
1583 values.push(v);
1584 }
1585 batch.merkleize(None).finalize()
1586 };
1587 db.apply_batch(finalized).await.unwrap();
1588
1589 for (i, loc) in locs.iter().enumerate() {
1591 assert_eq!(
1592 db.get(*loc).await.unwrap(),
1593 Some(values[i].clone()),
1594 "mismatch at index {i}"
1595 );
1596 }
1597
1598 let root = db.root();
1600 let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1601 assert!(verify_proof(
1602 &mut hasher,
1603 &proof,
1604 Location::new(0),
1605 &ops,
1606 &root
1607 ));
1608
1609 assert_eq!(db.bounds().await.end, 1 + N + 1);
1611
1612 db.destroy().await.unwrap();
1613 });
1614 }
1615
1616 #[test_traced]
1617 fn test_stale_changeset_rejected() {
1618 let executor = deterministic::Runner::default();
1619 executor.start(|context| async move {
1620 let mut db = open_db(context.with_label("db")).await;
1621
1622 let changeset_a = {
1624 let mut batch = db.new_batch();
1625 batch.append(vec![10]);
1626 batch.merkleize(None).finalize()
1627 };
1628 let changeset_b = {
1629 let mut batch = db.new_batch();
1630 batch.append(vec![20]);
1631 batch.merkleize(None).finalize()
1632 };
1633
1634 db.apply_batch(changeset_a).await.unwrap();
1636 let expected_root = db.root();
1637 let expected_bounds = db.bounds().await;
1638 let expected_last_commit = db.last_commit_loc();
1639 assert_eq!(db.get(Location::new(1)).await.unwrap(), Some(vec![10]));
1640 assert_eq!(db.get_metadata().await.unwrap(), None);
1641
1642 let result = db.apply_batch(changeset_b).await;
1644 assert!(
1645 matches!(result, Err(Error::StaleChangeset { .. })),
1646 "expected StaleChangeset error, got {result:?}"
1647 );
1648 assert_eq!(db.root(), expected_root);
1649 assert_eq!(db.bounds().await, expected_bounds);
1650 assert_eq!(db.last_commit_loc(), expected_last_commit);
1651 assert_eq!(db.get(Location::new(1)).await.unwrap(), Some(vec![10]));
1652 assert_eq!(db.get_metadata().await.unwrap(), None);
1653
1654 db.destroy().await.unwrap();
1655 });
1656 }
1657
1658 #[test_traced]
1659 fn test_stale_changeset_chained() {
1660 let executor = deterministic::Runner::default();
1661 executor.start(|context| async move {
1662 let mut db = open_db(context.with_label("db")).await;
1663
1664 let mut parent = db.new_batch();
1666 parent.append(vec![1]);
1667 let parent_m = parent.merkleize(None);
1668
1669 let child_a = {
1671 let mut batch = parent_m.new_batch();
1672 batch.append(vec![2]);
1673 batch.merkleize(None).finalize()
1674 };
1675 let child_b = {
1676 let mut batch = parent_m.new_batch();
1677 batch.append(vec![3]);
1678 batch.merkleize(None).finalize()
1679 };
1680
1681 db.apply_batch(child_a).await.unwrap();
1683
1684 let result = db.apply_batch(child_b).await;
1686 assert!(
1687 matches!(result, Err(Error::StaleChangeset { .. })),
1688 "expected StaleChangeset error, got {result:?}"
1689 );
1690
1691 db.destroy().await.unwrap();
1692 });
1693 }
1694
1695 #[test_traced]
1696 fn test_stale_changeset_parent_applied_before_child() {
1697 let executor = deterministic::Runner::default();
1698 executor.start(|context| async move {
1699 let mut db = open_db(context.with_label("db")).await;
1700
1701 let mut parent = db.new_batch();
1703 parent.append(vec![1]);
1704 let parent_m = parent.merkleize(None);
1705
1706 let mut child = parent_m.new_batch();
1708 child.append(vec![2]);
1709 let child_changeset = child.merkleize(None).finalize();
1710
1711 let parent_changeset = parent_m.finalize();
1713 db.apply_batch(parent_changeset).await.unwrap();
1714
1715 let result = db.apply_batch(child_changeset).await;
1718 assert!(
1719 matches!(result, Err(Error::StaleChangeset { .. })),
1720 "expected StaleChangeset error, got {result:?}"
1721 );
1722
1723 db.destroy().await.unwrap();
1724 });
1725 }
1726
1727 #[test_traced]
1728 fn test_stale_changeset_child_applied_before_parent() {
1729 let executor = deterministic::Runner::default();
1730 executor.start(|context| async move {
1731 let mut db = open_db(context.with_label("db")).await;
1732
1733 let mut parent = db.new_batch();
1735 parent.append(vec![1]);
1736 let parent_m = parent.merkleize(None);
1737
1738 let mut child = parent_m.new_batch();
1741 child.append(vec![2]);
1742 let child_changeset = child.merkleize(None).finalize();
1743 let parent_changeset = parent_m.finalize();
1744
1745 db.apply_batch(child_changeset).await.unwrap();
1747
1748 let result = db.apply_batch(parent_changeset).await;
1750 assert!(
1751 matches!(result, Err(Error::StaleChangeset { .. })),
1752 "expected StaleChangeset error, got {result:?}"
1753 );
1754
1755 db.destroy().await.unwrap();
1756 });
1757 }
1758}