1use crate::{
5 journal::{
6 authenticated,
7 contiguous::variable::{Config as JournalConfig, Journal as ContiguousJournal},
8 },
9 mmr::{journaled::Config as MmrConfig, Location, Proof},
10 qmdb::{
11 any::VariableValue,
12 operation::Committable,
13 store::{LogStore, MerkleizedStore, PrunableStore},
14 DurabilityState, Durable, Error, MerkleizationState, Merkleized, NonDurable, Unmerkleized,
15 },
16};
17use commonware_cryptography::{DigestOf, Hasher};
18use commonware_parallel::ThreadPool;
19use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
20use core::{marker::PhantomData, ops::Range};
21use std::num::{NonZeroU64, NonZeroUsize};
22use tracing::{debug, warn};
23
24mod operation;
25pub use operation::Operation;
26
/// Configuration for initializing a [Keyless] database.
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition backing the MMR's journal.
    pub mmr_journal_partition: String,

    /// Number of MMR items stored per blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Size of the MMR journal's write buffer.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition backing the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition backing the operation log.
    pub log_partition: String,

    /// Size of the operation log's write buffer.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression setting for log entries (`None` disables it).
    pub log_compression: Option<u8>,

    /// Codec configuration used to (de)serialize logged values.
    pub log_codec_config: C,

    /// Number of log items stored per section.
    pub log_items_per_section: NonZeroU64,

    /// Optional thread pool used during merkleization.
    pub thread_pool: Option<ThreadPool>,

    /// Shared page cache for buffered reads (shared by the MMR and the log).
    pub page_cache: CacheRef,
}
63
/// The authenticated journal type used to store and merkleize operations.
type Journal<E, V, H, S> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H, S>;
66
/// A keyless, append-only authenticated database.
///
/// Values are addressed by the [Location] at which they were appended. The
/// `M` and `D` type parameters encode, at the type level, whether the
/// database is currently merkleized and whether its state is durable.
pub struct Keyless<
    E: Storage + Clock + Metrics,
    V: VariableValue,
    H: Hasher,
    M: MerkleizationState<DigestOf<H>> = Merkleized<H>,
    D: DurabilityState = Durable,
> {
    // Authenticated journal of operations backing this database.
    journal: Journal<E, V, H, M>,

    // Location of the most recent commit operation.
    last_commit_loc: Location,

    // Marker tying the durability type-state `D` to this value.
    _durability: PhantomData<D>,
}
84
85impl<
87 E: Storage + Clock + Metrics,
88 V: VariableValue,
89 H: Hasher,
90 M: MerkleizationState<DigestOf<H>>,
91 D: DurabilityState,
92 > Keyless<E, V, H, M, D>
93{
94 pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
100 let op_count = self.journal.bounds().end;
101 if loc >= op_count {
102 return Err(Error::LocationOutOfBounds(loc, op_count));
103 }
104 let op = self.journal.read(loc).await?;
105
106 Ok(op.into_value())
107 }
108
109 pub const fn last_commit_loc(&self) -> Location {
111 self.last_commit_loc
112 }
113
114 pub fn inactivity_floor_loc(&self) -> Location {
116 self.bounds().start
117 }
118
119 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
121 let op = self.journal.read(self.last_commit_loc).await?;
122 let Operation::Commit(metadata) = op else {
123 return Ok(None);
124 };
125
126 Ok(metadata)
127 }
128}
129
impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
    Keyless<E, V, H, Merkleized<H>, Durable>
{
    /// Open (or create) a database from `cfg`, recovering from any unclean
    /// shutdown.
    ///
    /// If the log is empty after replay, an initial commit operation is
    /// appended and synced so that a last-commit location always exists.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            // The page cache is shared between the MMR and the log.
            page_cache: cfg.page_cache.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            page_cache: cfg.page_cache,
            write_buffer: cfg.log_write_buffer,
        };

        // `Operation::is_commit` lets recovery identify the last committed
        // operation; trailing uncommitted appends are rewound (see tests).
        let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
        if journal.size() == 0 {
            warn!("no operations found in log, creating initial commit");
            let mut dirty_journal = journal.into_dirty();
            dirty_journal.append(Operation::Commit(None)).await?;
            journal = dirty_journal.merkleize();
            journal.sync().await?;
        }

        // After replay/initialization the last operation is always a commit.
        let last_commit_loc = journal
            .size()
            .checked_sub(1)
            .expect("at least one commit should exist");

        Ok(Self {
            journal,
            last_commit_loc,
            _durability: PhantomData,
        })
    }

    /// Root digest over all operations in the database.
    pub const fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generate a proof of up to `max_ops` operations starting at
    /// `start_loc` against the current root, returning the proof together
    /// with the proven operations.
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        self.historical_proof(self.size(), start_loc, max_ops).await
    }

    /// Generate a proof of up to `max_ops` operations starting at
    /// `start_loc`, evaluated against the historical state of the database
    /// when it contained `op_count` operations.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Prune operations prior to `loc`.
    ///
    /// Fails with [Error::PruneBeyondMinRequired] when `loc` is beyond the
    /// last commit, since the last commit must remain readable.
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Flush all pending writes to durable storage.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.journal.sync().await.map_err(Into::into)
    }

    /// Destroy the database, removing its underlying storage.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Convert into a mutable (unmerkleized, non-durable) database that
    /// accepts appends.
    pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
        Keyless {
            journal: self.journal.into_dirty(),
            last_commit_loc: self.last_commit_loc,
            _durability: PhantomData,
        }
    }
}
252
impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
    Keyless<E, V, H, Unmerkleized, NonDurable>
{
    /// Append `value` to the database, returning the location it was
    /// assigned. The append is not durable until [Self::commit] succeeds.
    pub async fn append(&mut self, value: V) -> Result<Location, Error> {
        self.journal
            .append(Operation::Append(value))
            .await
            .map_err(Into::into)
    }

    /// Commit all pending operations, making them durable.
    ///
    /// Appends a commit operation carrying the optional `metadata`, then
    /// commits the journal. Returns the durable database together with the
    /// range of locations committed by this call (everything since the
    /// prior commit, including the new commit operation).
    pub async fn commit(
        mut self,
        metadata: Option<V>,
    ) -> Result<(Keyless<E, V, H, Unmerkleized, Durable>, Range<Location>), Error> {
        let start_loc = self.last_commit_loc + 1;
        // Order matters for crash safety: record the commit operation first,
        // then commit the journal.
        self.last_commit_loc = self.journal.append(Operation::Commit(metadata)).await?;
        self.journal.commit().await?;
        let op_count = self.bounds().end;
        debug!(size = ?op_count, "committed db");

        let durable = Keyless {
            journal: self.journal,
            last_commit_loc: self.last_commit_loc,
            _durability: PhantomData,
        };

        Ok((durable, start_loc..op_count))
    }

    /// Merkleize all appended operations, yielding a [Merkleized] database.
    pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
        Keyless {
            journal: self.journal.merkleize(),
            last_commit_loc: self.last_commit_loc,
            _durability: PhantomData,
        }
    }
}
297
298impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
300 Keyless<E, V, H, Unmerkleized, Durable>
301{
302 pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
305 Keyless {
306 journal: self.journal,
307 last_commit_loc: self.last_commit_loc,
308 _durability: PhantomData,
309 }
310 }
311
312 pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
314 Keyless {
315 journal: self.journal.merkleize(),
316 last_commit_loc: self.last_commit_loc,
317 _durability: PhantomData,
318 }
319 }
320}
321
322impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> MerkleizedStore
324 for Keyless<E, V, H, Merkleized<H>, D>
325{
326 type Digest = H::Digest;
327 type Operation = Operation<V>;
328
329 fn root(&self) -> Self::Digest {
330 self.journal.root()
331 }
332
333 async fn historical_proof(
334 &self,
335 historical_size: Location,
336 start_loc: Location,
337 max_ops: NonZeroU64,
338 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
339 Ok(self
340 .journal
341 .historical_proof(historical_size, start_loc, max_ops)
342 .await?)
343 }
344}
345
346impl<
348 E: Storage + Clock + Metrics,
349 V: VariableValue,
350 H: Hasher,
351 M: MerkleizationState<DigestOf<H>>,
352 D: DurabilityState,
353 > LogStore for Keyless<E, V, H, M, D>
354{
355 type Value = V;
356
357 fn is_empty(&self) -> bool {
358 self.bounds().end <= 1
361 }
362
363 fn bounds(&self) -> std::ops::Range<Location> {
364 self.journal.bounds()
365 }
366
367 fn inactivity_floor_loc(&self) -> Location {
368 self.inactivity_floor_loc()
369 }
370
371 async fn get_metadata(&self) -> Result<Option<Self::Value>, Error> {
372 self.get_metadata().await
373 }
374}
375
376impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> PrunableStore
378 for Keyless<E, V, H, Merkleized<H>, D>
379{
380 async fn prune(&mut self, loc: Location) -> Result<(), Error> {
381 if loc > self.last_commit_loc {
382 return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
383 }
384 self.journal.prune(loc).await?;
385 Ok(())
386 }
387}
388
389#[cfg(test)]
390mod test {
391 use super::*;
392 use crate::{
393 mmr::StandardHasher as Standard,
394 qmdb::{store::LogStore, verify_proof},
395 };
396 use commonware_cryptography::Sha256;
397 use commonware_macros::test_traced;
398 use commonware_runtime::{deterministic, Metrics, Runner as _};
399 use commonware_utils::{NZUsize, NZU16, NZU64};
400 use rand::Rng;
401 use std::num::NonZeroU16;
402
403 const PAGE_SIZE: NonZeroU16 = NZU16!(101);
405 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
406
    /// Build a test [Config] whose storage partitions are namespaced by
    /// `suffix`, using small blob/section sizes to exercise rollover.
    fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_journal_{suffix}"),
            log_write_buffer: NZUsize!(1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            thread_pool: None,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        }
    }
422
423 type CleanDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Merkleized<Sha256>, Durable>;
425
426 type MutableDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Unmerkleized, NonDurable>;
428
    /// Open (or create) a [CleanDb] backed by the shared test partition.
    async fn open_db(context: deterministic::Context) -> CleanDb {
        CleanDb::init(context, db_config("partition"))
            .await
            .unwrap()
    }
435
    /// An empty db contains only the initial commit; uncommitted appends are
    /// discarded on restart, and committed metadata persists.
    #[test_traced("INFO")]
    pub fn test_keyless_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db1")).await;
            // A fresh db holds exactly the initial commit operation.
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.journal.bounds().start, Location::new_unchecked(0));

            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // An uncommitted append is rewound on restart.
            let v1 = vec![1u8; 8];
            let root = db.root();
            let mut db = db.into_mutable();
            db.append(v1).await.unwrap();
            drop(db);
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Commit with metadata; the metadata is readable afterwards.
            let metadata = vec![3u8; 10];
            let db = db.into_mutable();
            let (durable, _) = db.commit(Some(metadata.clone())).await.unwrap();
            let db = durable.into_merkleized();
            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            assert_eq!(
                db.get(Location::new_unchecked(1)).await.unwrap(),
                Some(metadata.clone())
            );
            let root = db.root();

            // The committed state survives a restart.
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(1));

            db.destroy().await.unwrap();
        });
    }
481
    /// Basic build/commit/reopen flow: committed values are readable across
    /// restarts while trailing uncommitted appends are rewound.
    #[test_traced("WARN")]
    pub fn test_keyless_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db1")).await;
            let mut db = db.into_mutable();

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            let loc1 = db.append(v1.clone()).await.unwrap();
            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);

            let loc2 = db.append(v2.clone()).await.unwrap();
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Initial commit + 2 appends + this commit = 4 operations.
            let (durable, _) = db.commit(None).await.unwrap();
            let mut db = durable.into_merkleized();
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), None);
            // The commit operation itself carries no value.
            assert_eq!(db.get(Location::new_unchecked(3)).await.unwrap(), None);
            let root = db.root();
            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.root(), root);

            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Uncommitted appends are rewound on restart.
            let mut db = db.into_mutable();
            db.append(v2).await.unwrap();
            db.append(v1).await.unwrap();

            drop(db);
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.root(), root);

            // Reopening again is stable.
            drop(db);
            let db = open_db(context.with_label("db4")).await;
            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
534
    /// Append `num_elements` small random values to `db`.
    async fn append_elements<T: Rng>(db: &mut MutableDb, rng: &mut T, num_elements: usize) {
        for _ in 0..num_elements {
            // Two random bytes per value keeps the test fast.
            let value = vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8];
            db.append(value).await.unwrap();
        }
    }
542
    /// Recovery across restarts: uncommitted batches are rewound and
    /// committed batches persist, over several crash/reopen cycles.
    #[test_traced("WARN")]
    pub fn test_keyless_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: usize = 1000;
        executor.start(|mut context| async move {
            let db = open_db(context.with_label("db1")).await;
            let root = db.root();
            let mut db = db.into_mutable();

            // An uncommitted batch is lost on restart; root is unchanged.
            append_elements(&mut db, &mut context, ELEMENTS).await;

            drop(db);
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(root, db.root());

            // A committed batch survives...
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            // ...while a subsequent uncommitted batch is rewound.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;

            drop(db);
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(root, db.root());

            // Commit a second batch and confirm it persists across restart.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            drop(db);
            let db = open_db(context.with_label("db4")).await;
            // Initial commit + two committed batches + two commit ops.
            assert_eq!(db.bounds().end, 2 * ELEMENTS as u64 + 3);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
593
    /// Recovery of a db that already has committed state: crashes with
    /// uncommitted appends always rewind back to the committed baseline,
    /// including after pruning to the last commit.
    #[test_traced("WARN")]
    fn test_keyless_db_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let db = open_db(context.with_label("db1")).await;

            // Establish a committed baseline of ELEMENTS appends.
            const ELEMENTS: usize = 200;
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();
            let op_count = db.bounds().end;

            // The baseline survives a clean restart.
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().end, op_count);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), op_count - 1);
            drop(db);

            // Simulate a crash with uncommitted appends, then confirm the db
            // recovers back to the committed baseline.
            async fn recover_from_failure(
                mut context: deterministic::Context,
                label1: &str,
                label2: &str,
                root: <Sha256 as Hasher>::Digest,
                op_count: Location,
            ) {
                let mut db = open_db(context.with_label(label1)).await.into_mutable();

                append_elements(&mut db, &mut context, ELEMENTS).await;
                drop(db);
                let db = open_db(context.with_label(label2)).await;
                assert_eq!(db.bounds().end, op_count);
                assert_eq!(db.root(), root);
            }

            recover_from_failure(context.with_label("recovery1"), "a", "b", root, op_count).await;

            // Pruning to the last commit must not affect root or op count.
            let mut db = open_db(context.with_label("db3")).await;
            db.prune(db.last_commit_loc()).await.unwrap();
            assert_eq!(db.bounds().end, op_count);
            assert_eq!(db.root(), root);
            db.sync().await.unwrap();
            drop(db);

            recover_from_failure(context.with_label("recovery2"), "c", "d", root, op_count).await;

            // A new commit advances the state past the baseline.
            let mut db = open_db(context.with_label("db4")).await.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (_durable, _) = db.commit(None).await.unwrap();
            let db = open_db(context.with_label("db5")).await;
            assert!(db.bounds().end > op_count);
            assert_ne!(db.root(), root);
            assert_eq!(db.last_commit_loc(), db.bounds().end - 1);

            db.destroy().await.unwrap();
        });
    }
660
    /// Recovery of a never-committed db: any number of uncommitted batches
    /// are fully rewound to the initial commit until a commit happens.
    #[test_traced("WARN")]
    fn test_keyless_db_empty_db_recovery() {
        const ELEMENTS: u64 = 1000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.with_label("db1")).await;
            let root = db.root();

            // Reopening preserves the initial (empty) state.
            let db = open_db(context.with_label("db2")).await;
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.root(), root);

            // Appends ELEMENTS deterministic values without committing.
            async fn apply_ops(db: &mut MutableDb) {
                for i in 0..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                    db.append(v).await.unwrap();
                }
            }

            // A single uncommitted batch is rewound on restart.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.with_label("db3")).await;
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.root(), root);

            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.with_label("db4")).await;
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.root(), root);

            // Even several uncommitted batches are fully rewound.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.with_label("db5")).await;
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Once committed, the operations persist.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            let (_db, _) = db.commit(None).await.unwrap();
            let db = open_db(context.with_label("db6")).await;
            assert!(db.bounds().end > 1);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
721
722 #[test_traced("INFO")]
723 pub fn test_keyless_db_proof_generation_and_verification() {
724 let executor = deterministic::Runner::default();
725 executor.start(|context| async move {
726 let mut hasher = Standard::<Sha256>::new();
727 let db = open_db(context.clone()).await;
728 let mut db = db.into_mutable();
729
730 const ELEMENTS: u64 = 100;
732 let mut values = Vec::new();
733 for i in 0u64..ELEMENTS {
734 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
735 values.push(v.clone());
736 db.append(v).await.unwrap();
737 }
738 let (durable, _) = db.commit(None).await.unwrap();
739 let db = durable.into_merkleized();
740
741 assert!(matches!(
743 db.historical_proof(db.bounds().end + 1, Location::new_unchecked(5), NZU64!(10))
744 .await,
745 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
746 ));
747
748 let root = db.root();
749
750 let test_cases = vec![
752 (0, 10), (10, 5), (50, 20), (90, 15), (0, 1), (ELEMENTS - 1, 1), (ELEMENTS, 1), ];
760
761 for (start_loc, max_ops) in test_cases {
762 let (proof, ops) = db.proof(Location::new_unchecked(start_loc), NZU64!(max_ops)).await.unwrap();
763
764 assert!(
766 verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &root),
767 "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
768 );
769
770 let expected_ops = std::cmp::min(max_ops, *db.bounds().end - start_loc);
772 assert_eq!(
773 ops.len() as u64,
774 expected_ops,
775 "Expected {expected_ops} operations, got {}",
776 ops.len(),
777 );
778
779 for (i, op) in ops.iter().enumerate() {
781 let loc = start_loc + i as u64;
782 if loc == 0 {
783 assert!(
784 matches!(op, Operation::Commit(None)),
785 "Expected Initial Commit operation at location {loc}, got {op:?}",
786 );
787 } else if loc <= ELEMENTS {
788 assert!(
790 matches!(op, Operation::Append(_)),
791 "Expected Append operation at location {loc}, got {op:?}",
792 );
793 } else if loc == ELEMENTS + 1 {
794 assert!(
796 matches!(op, Operation::Commit(_)),
797 "Expected Commit operation at location {loc}, got {op:?}",
798 );
799 }
800 }
801
802 let wrong_root = Sha256::hash(&[0xFF; 32]);
804 assert!(
805 !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &wrong_root),
806 "Proof should fail with wrong root"
807 );
808
809 if start_loc > 0 {
811 assert!(
812 !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc - 1), &ops, &root),
813 "Proof should fail with wrong start location"
814 );
815 }
816 }
817
818 db.destroy().await.unwrap();
819 });
820 }
821
822 #[test_traced("INFO")]
823 pub fn test_keyless_db_proof_with_pruning() {
824 let executor = deterministic::Runner::default();
825 executor.start(|context| async move {
826 let mut hasher = Standard::<Sha256>::new();
827 let db = open_db(context.with_label("db1")).await;
828 let mut db = db.into_mutable();
829
830 const ELEMENTS: u64 = 100;
832 let mut values = Vec::new();
833 for i in 0u64..ELEMENTS {
834 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
835 values.push(v.clone());
836 db.append(v).await.unwrap();
837 }
838 let (durable, _) = db.commit(None).await.unwrap();
839
840 let mut db = durable.into_mutable();
842 for i in ELEMENTS..ELEMENTS * 2 {
843 let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
844 values.push(v.clone());
845 db.append(v).await.unwrap();
846 }
847 let (durable, _) = db.commit(None).await.unwrap();
848 let mut db = durable.into_merkleized();
849 let root = db.root();
850
851 println!("last commit loc: {}", db.last_commit_loc());
852
853 const PRUNE_LOC: u64 = 30;
855 db.prune(Location::new_unchecked(PRUNE_LOC)).await.unwrap();
856
857 let oldest_retained = db.journal.bounds().start;
859
860 assert_eq!(
862 db.root(),
863 root,
864 "Root should not change after pruning"
865 );
866
867 db.sync().await.unwrap();
868 drop(db);
869 let mut db = open_db(context.with_label("db2")).await;
870 assert_eq!(db.root(), root);
871 assert_eq!(db.bounds().end, 2 * ELEMENTS + 3);
872 assert!(db.journal.bounds().start <= PRUNE_LOC);
873
874 for i in 0..*oldest_retained {
876 let result = db.get(Location::new_unchecked(i)).await;
877 match result {
879 Ok(None) => {} Ok(Some(_)) => {
881 panic!("Should not be able to get pruned value at location {i}")
882 }
883 Err(_) => {} }
885 }
886
887 let test_cases = vec![
889 (oldest_retained, 10), (Location::new_unchecked(50), 20), (Location::new_unchecked(150), 10), (Location::new_unchecked(190), 15), ];
894
895 for (start_loc, max_ops) in test_cases {
896 if start_loc < oldest_retained {
898 continue;
899 }
900
901 let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
902
903 assert!(
905 verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
906 "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
907 );
908
909 let expected_ops = std::cmp::min(max_ops, *db.bounds().end - *start_loc);
911 assert_eq!(
912 ops.len() as u64,
913 expected_ops,
914 "Expected {expected_ops} operations, got {}",
915 ops.len(),
916 );
917 }
918
919 const AGGRESSIVE_PRUNE: Location = Location::new_unchecked(150);
921 db.prune(AGGRESSIVE_PRUNE).await.unwrap();
922
923 let new_oldest = db.journal.bounds().start;
924 assert!(new_oldest <= AGGRESSIVE_PRUNE);
925
926 let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
928 assert!(
929 verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
930 "Proof should still verify after aggressive pruning"
931 );
932
933 let almost_all = db.bounds().end - 5;
935 db.prune(almost_all).await.unwrap();
936
937 let final_oldest = db.journal.bounds().start;
938
939 if final_oldest < db.bounds().end {
941 let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
942 assert!(
943 verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
944 "Should be able to prove remaining operations after extensive pruning"
945 );
946 }
947
948 db.destroy().await.unwrap();
949 });
950 }
951
    /// After a crash with trailing uncommitted appends, replay rewinds to
    /// the last commit and new appends reuse the rewound locations.
    #[test_traced("WARN")]
    fn test_keyless_db_replay_with_trailing_appends() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // Commit a batch of values.
            let db = open_db(context.with_label("db1")).await;
            let mut db = db.into_mutable();

            for i in 0..10 {
                let v = vec![i as u8; 10];
                db.append(v).await.unwrap();
            }
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let committed_root = db.root();
            let committed_size = db.bounds().end;

            // Append one value without committing, then simulate a crash.
            let uncommitted_value = vec![99u8; 20];
            let mut db = db.into_mutable();
            db.append(uncommitted_value.clone()).await.unwrap();

            drop(db);

            let db = open_db(context.with_label("db2")).await;

            assert_eq!(
                db.bounds().end,
                committed_size,
                "Should rewind to last commit"
            );
            assert_eq!(db.root(), committed_root, "Root should match last commit");
            assert_eq!(
                db.last_commit_loc(),
                committed_size - 1,
                "Last commit location should be correct"
            );

            // The rewound slot is reused by the next append.
            let mut db = db.into_mutable();
            let new_value = vec![77u8; 15];
            let loc = db.append(new_value.clone()).await.unwrap();
            assert_eq!(
                loc, committed_size,
                "New append should get the expected location"
            );

            assert_eq!(db.get(loc).await.unwrap(), Some(new_value));

            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let new_committed_root = db.root();
            let new_committed_size = db.bounds().end;

            // Multiple trailing uncommitted appends are also rewound.
            let mut db = db.into_mutable();
            for i in 0..5 {
                let v = vec![(200 + i) as u8; 10];
                db.append(v).await.unwrap();
            }

            drop(db);

            let db = open_db(context.with_label("db3")).await;
            assert_eq!(
                db.bounds().end,
                new_committed_size,
                "Should rewind to last commit with multiple trailing appends"
            );
            assert_eq!(
                db.root(),
                new_committed_root,
                "Root should match last commit after multiple appends"
            );
            assert_eq!(
                db.last_commit_loc(),
                new_committed_size - 1,
                "Last commit location should be correct after multiple appends"
            );

            db.destroy().await.unwrap();
        });
    }
1044
    /// `get` returns `None` for value-less operations (commits) and errors
    /// with [Error::LocationOutOfBounds] past the end of the log.
    #[test_traced("INFO")]
    pub fn test_keyless_db_get_out_of_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;

            // Location 0 holds the initial commit, which carries no value.
            let result = db.get(Location::new_unchecked(0)).await.unwrap();
            assert!(result.is_none());

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            let mut db = db.into_mutable();
            db.append(v1.clone()).await.unwrap();
            db.append(v2.clone()).await.unwrap();
            let (durable, _) = db.commit(None).await.unwrap();

            assert_eq!(durable.get(Location::new_unchecked(1)).await.unwrap().unwrap(), v1);
            assert_eq!(durable.get(Location::new_unchecked(2)).await.unwrap().unwrap(), v2);

            // Location 3 is the commit operation (no value).
            let result = durable.get(Location::new_unchecked(3)).await.unwrap();
            assert!(result.is_none());

            // Location 4 is past the end and must error.
            let result = durable.get(Location::new_unchecked(4)).await;
            assert!(
                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new_unchecked(4) && size == Location::new_unchecked(4))
            );

            let db = durable.into_merkleized();
            db.destroy().await.unwrap();
        });
    }
1081
    /// Pruning up to the last commit succeeds; pruning beyond it fails with
    /// [Error::PruneBeyondMinRequired].
    #[test_traced("INFO")]
    pub fn test_keyless_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // On a fresh db the last commit is at location 0, so pruning to
            // location 1 must be rejected.
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            let v3 = vec![3u8; 8];
            let mut db = db.into_mutable();
            db.append(v1.clone()).await.unwrap();
            db.append(v2.clone()).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db = db.into_mutable();
            db.append(v3.clone()).await.unwrap();

            // Initial commit + 2 appends puts the commit op at location 3.
            let last_commit = db.last_commit_loc();
            assert_eq!(last_commit, Location::new_unchecked(3));

            // Pruning up to the last commit is allowed...
            let (durable, _) = db.commit(None).await.unwrap();
            let mut db = durable.into_merkleized();
            assert!(db.prune(Location::new_unchecked(3)).await.is_ok());

            // ...but pruning beyond it is rejected.
            let new_last_commit = db.last_commit_loc();
            let beyond = Location::new_unchecked(*new_last_commit + 1);
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
1127
1128 use crate::{
1129 kv::tests::assert_send,
1130 qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
1131 };
1132
    /// Compile-time check that futures produced by a [CleanDb] are `Send`.
    #[allow(dead_code)]
    fn assert_clean_db_futures_are_send(db: &mut CleanDb, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
        assert_send(db.get(loc));
    }
1141
    /// Compile-time check that futures produced by a [MutableDb] are `Send`.
    #[allow(dead_code)]
    fn assert_mutable_db_futures_are_send(db: &mut MutableDb, loc: Location, value: Vec<u8>) {
        assert_log_store(db);
        assert_send(db.get(loc));
        assert_send(db.append(value));
    }
1148
    /// Compile-time check that the consuming `commit` future is `Send`.
    #[allow(dead_code)]
    fn assert_mutable_db_commit_is_send(db: MutableDb) {
        assert_send(db.commit(None));
    }
1153}