1use crate::{
5 journal::{
6 authenticated,
7 contiguous::variable::{Config as JournalConfig, Journal as ContiguousJournal},
8 },
9 mmr::{journaled::Config as MmrConfig, Location, Proof},
10 qmdb::{
11 any::VariableValue,
12 operation::Committable,
13 store::{LogStore, MerkleizedStore, PrunableStore},
14 DurabilityState, Durable, Error, MerkleizationState, Merkleized, NonDurable, Unmerkleized,
15 },
16};
17use commonware_cryptography::{DigestOf, Hasher};
18use commonware_parallel::ThreadPool;
19use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage};
20use core::{marker::PhantomData, ops::Range};
21use std::num::{NonZeroU64, NonZeroUsize};
22use tracing::{debug, warn};
23
24mod operation;
25pub use operation::Operation;
26
/// Configuration for a [Keyless] database.
///
/// `C` is the codec configuration type for the operations log; [Keyless::init] takes a
/// `Config<V::Cfg>`. The `mmr_*` fields and `thread_pool` are forwarded to the MMR
/// configuration, the `log_*` fields to the log journal configuration, and `buffer_pool` to both.
#[derive(Clone)]
pub struct Config<C> {
    /// Forwarded by [Keyless::init] as the MMR configuration's `journal_partition`.
    pub mmr_journal_partition: String,

    /// Forwarded as the MMR configuration's `items_per_blob`.
    pub mmr_items_per_blob: NonZeroU64,

    /// Forwarded as the MMR configuration's `write_buffer`.
    pub mmr_write_buffer: NonZeroUsize,

    /// Forwarded as the MMR configuration's `metadata_partition`.
    pub mmr_metadata_partition: String,

    /// Forwarded as the log journal configuration's `partition`.
    pub log_partition: String,

    /// Forwarded as the log journal configuration's `write_buffer`.
    pub log_write_buffer: NonZeroUsize,

    /// Forwarded as the log journal configuration's `compression`.
    ///
    /// NOTE(review): `None` presumably disables compression; confirm against the journal docs.
    pub log_compression: Option<u8>,

    /// Forwarded as the log journal configuration's `codec_config`.
    pub log_codec_config: C,

    /// Forwarded as the log journal configuration's `items_per_section`.
    pub log_items_per_section: NonZeroU64,

    /// Optional thread pool, forwarded to the MMR configuration.
    pub thread_pool: Option<ThreadPool>,

    /// Buffer pool handed to both the MMR and the log journal configurations.
    pub buffer_pool: PoolRef,
}
63
/// Shorthand for the journal backing a [Keyless] database: an [authenticated::Journal] over a
/// `contiguous::variable` journal of [Operation]s, parameterized by the hasher `H` and the
/// merkleization state `S`.
type Journal<E, V, H, S> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H, S>;
66
/// A keyless database: values are appended to an authenticated journal and retrieved by the
/// [Location] at which they were appended.
///
/// The `M` (merkleization) and `D` (durability) type parameters select which operations are
/// available; they default to `Merkleized<H>` and `Durable`, the state returned by
/// [Keyless::init].
pub struct Keyless<
    E: Storage + Clock + Metrics,
    V: VariableValue,
    H: Hasher,
    M: MerkleizationState<DigestOf<H>> = Merkleized<H>,
    D: DurabilityState = Durable,
> {
    /// Authenticated journal holding every operation (appends and commits).
    journal: Journal<E, V, H, M>,

    /// Location of the most recent commit operation in the journal.
    last_commit_loc: Location,

    /// Zero-sized marker carrying the durability state `D`.
    _durability: PhantomData<D>,
}
84
85impl<
87 E: Storage + Clock + Metrics,
88 V: VariableValue,
89 H: Hasher,
90 M: MerkleizationState<DigestOf<H>>,
91 D: DurabilityState,
92 > Keyless<E, V, H, M, D>
93{
94 pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
100 let op_count = self.op_count();
101 if loc >= op_count {
102 return Err(Error::LocationOutOfBounds(loc, op_count));
103 }
104 let op = self.journal.read(loc).await?;
105
106 Ok(op.into_value())
107 }
108
109 pub fn op_count(&self) -> Location {
112 self.journal.size()
113 }
114
115 pub const fn last_commit_loc(&self) -> Location {
117 self.last_commit_loc
118 }
119
120 pub fn oldest_retained_loc(&self) -> Location {
122 self.journal
123 .oldest_retained_loc()
124 .expect("at least one operation should exist")
125 }
126
127 pub fn inactivity_floor_loc(&self) -> Location {
129 self.journal.pruning_boundary()
130 }
131
132 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
134 let op = self.journal.read(self.last_commit_loc).await?;
135 let Operation::Commit(metadata) = op else {
136 return Ok(None);
137 };
138
139 Ok(metadata)
140 }
141}
142
143impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
145 Keyless<E, V, H, Merkleized<H>, Durable>
146{
147 pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
150 let mmr_cfg = MmrConfig {
151 journal_partition: cfg.mmr_journal_partition,
152 metadata_partition: cfg.mmr_metadata_partition,
153 items_per_blob: cfg.mmr_items_per_blob,
154 write_buffer: cfg.mmr_write_buffer,
155 thread_pool: cfg.thread_pool,
156 buffer_pool: cfg.buffer_pool.clone(),
157 };
158
159 let journal_cfg = JournalConfig {
160 partition: cfg.log_partition,
161 items_per_section: cfg.log_items_per_section,
162 compression: cfg.log_compression,
163 codec_config: cfg.log_codec_config,
164 buffer_pool: cfg.buffer_pool,
165 write_buffer: cfg.log_write_buffer,
166 };
167
168 let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
169 if journal.size() == 0 {
170 warn!("no operations found in log, creating initial commit");
171 journal.append(Operation::Commit(None)).await?;
172 journal.sync().await?;
173 }
174
175 let last_commit_loc = journal
176 .size()
177 .checked_sub(1)
178 .expect("at least one commit should exist");
179
180 Ok(Self {
181 journal,
182 last_commit_loc,
183 _durability: PhantomData,
184 })
185 }
186
187 pub const fn root(&self) -> H::Digest {
189 self.journal.root()
190 }
191
192 pub async fn proof(
199 &self,
200 start_loc: Location,
201 max_ops: NonZeroU64,
202 ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
203 self.historical_proof(self.op_count(), start_loc, max_ops)
204 .await
205 }
206
207 pub async fn historical_proof(
217 &self,
218 op_count: Location,
219 start_loc: Location,
220 max_ops: NonZeroU64,
221 ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
222 Ok(self
223 .journal
224 .historical_proof(op_count, start_loc, max_ops)
225 .await?)
226 }
227
228 pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
235 if loc > self.last_commit_loc {
236 return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
237 }
238 self.journal.prune(loc).await?;
239
240 Ok(())
241 }
242
243 pub async fn sync(&mut self) -> Result<(), Error> {
247 self.journal.sync().await.map_err(Into::into)
248 }
249
250 pub async fn destroy(self) -> Result<(), Error> {
252 Ok(self.journal.destroy().await?)
253 }
254
255 pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
257 Keyless {
258 journal: self.journal.into_dirty(),
259 last_commit_loc: self.last_commit_loc,
260 _durability: PhantomData,
261 }
262 }
263}
264
265impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
267 Keyless<E, V, H, Unmerkleized, NonDurable>
268{
269 pub async fn append(&mut self, value: V) -> Result<Location, Error> {
271 self.journal
272 .append(Operation::Append(value))
273 .await
274 .map_err(Into::into)
275 }
276
277 pub async fn commit(
283 mut self,
284 metadata: Option<V>,
285 ) -> Result<(Keyless<E, V, H, Unmerkleized, Durable>, Range<Location>), Error> {
286 let start_loc = self.last_commit_loc + 1;
287 self.last_commit_loc = self.journal.append(Operation::Commit(metadata)).await?;
288 self.journal.commit().await?;
289 debug!(size = ?self.op_count(), "committed db");
290
291 let op_count = self.op_count();
292 let durable = Keyless {
293 journal: self.journal,
294 last_commit_loc: self.last_commit_loc,
295 _durability: PhantomData,
296 };
297
298 Ok((durable, start_loc..op_count))
299 }
300
301 pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
302 Keyless {
303 journal: self.journal.merkleize(),
304 last_commit_loc: self.last_commit_loc,
305 _durability: PhantomData,
306 }
307 }
308}
309
310impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher>
312 Keyless<E, V, H, Unmerkleized, Durable>
313{
314 pub fn into_mutable(self) -> Keyless<E, V, H, Unmerkleized, NonDurable> {
317 Keyless {
318 journal: self.journal,
319 last_commit_loc: self.last_commit_loc,
320 _durability: PhantomData,
321 }
322 }
323
324 pub fn into_merkleized(self) -> Keyless<E, V, H, Merkleized<H>, Durable> {
326 Keyless {
327 journal: self.journal.merkleize(),
328 last_commit_loc: self.last_commit_loc,
329 _durability: PhantomData,
330 }
331 }
332}
333
334impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> MerkleizedStore
336 for Keyless<E, V, H, Merkleized<H>, D>
337{
338 type Digest = H::Digest;
339 type Operation = Operation<V>;
340
341 fn root(&self) -> Self::Digest {
342 self.journal.root()
343 }
344
345 async fn historical_proof(
346 &self,
347 historical_size: Location,
348 start_loc: Location,
349 max_ops: NonZeroU64,
350 ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
351 Ok(self
352 .journal
353 .historical_proof(historical_size, start_loc, max_ops)
354 .await?)
355 }
356}
357
358impl<
360 E: Storage + Clock + Metrics,
361 V: VariableValue,
362 H: Hasher,
363 M: MerkleizationState<DigestOf<H>>,
364 D: DurabilityState,
365 > LogStore for Keyless<E, V, H, M, D>
366{
367 type Value = V;
368
369 fn is_empty(&self) -> bool {
370 self.op_count() <= 1
373 }
374
375 fn op_count(&self) -> Location {
376 self.op_count()
377 }
378
379 fn inactivity_floor_loc(&self) -> Location {
380 self.inactivity_floor_loc()
381 }
382
383 async fn get_metadata(&self) -> Result<Option<Self::Value>, Error> {
384 self.get_metadata().await
385 }
386}
387
388impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, D: DurabilityState> PrunableStore
390 for Keyless<E, V, H, Merkleized<H>, D>
391{
392 async fn prune(&mut self, loc: Location) -> Result<(), Error> {
393 if loc > self.last_commit_loc {
394 return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
395 }
396 self.journal.prune(loc).await?;
397 Ok(())
398 }
399}
400
401#[cfg(test)]
402mod test {
403 use super::*;
404 use crate::{mmr::StandardHasher as Standard, qmdb::verify_proof};
405 use commonware_cryptography::Sha256;
406 use commonware_macros::test_traced;
407 use commonware_runtime::{deterministic, Runner as _};
408 use commonware_utils::{NZUsize, NZU16, NZU64};
409 use rand::Rng;
410 use std::num::NonZeroU16;
411
    // Arguments for the `PoolRef` built in `db_config`.
    // NOTE(review): the small, odd values presumably exercise page-boundary handling; confirm.
    const PAGE_SIZE: NonZeroU16 = NZU16!(101);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(11);
415
416 fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
417 Config {
418 mmr_journal_partition: format!("journal_{suffix}"),
419 mmr_metadata_partition: format!("metadata_{suffix}"),
420 mmr_items_per_blob: NZU64!(11),
421 mmr_write_buffer: NZUsize!(1024),
422 log_partition: format!("log_journal_{suffix}"),
423 log_write_buffer: NZUsize!(1024),
424 log_compression: None,
425 log_codec_config: ((0..=10000).into(), ()),
426 log_items_per_section: NZU64!(7),
427 thread_pool: None,
428 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
429 }
430 }
431
    /// Merkleized, durable database over `Vec<u8>` values (the state returned by `init`).
    type CleanDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Merkleized<Sha256>, Durable>;

    /// Unmerkleized, non-durable database over `Vec<u8>` values (the state that accepts appends).
    type MutableDb = Keyless<deterministic::Context, Vec<u8>, Sha256, Unmerkleized, NonDurable>;
437
438 async fn open_db(context: deterministic::Context) -> CleanDb {
440 CleanDb::init(context, db_config("partition"))
441 .await
442 .unwrap()
443 }
444
    /// Checks the state of a freshly created database, that an uncommitted append does not
    /// survive a re-open, and that a commit with metadata on an otherwise empty database does.
    #[test_traced("INFO")]
    pub fn test_keyless_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            // A new database holds exactly one operation: the initial commit at location 0.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));

            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Append without committing, drop, and re-open: the append must be gone.
            let v1 = vec![1u8; 8];
            let root = db.root();
            let mut db = db.into_mutable();
            db.append(v1).await.unwrap();
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Commit with metadata but no appends: this adds one (commit) operation.
            let metadata = vec![3u8; 10];
            let db = db.into_mutable();
            let (durable, _) = db.commit(Some(metadata.clone())).await.unwrap();
            let db = durable.into_merkleized();
            assert_eq!(db.op_count(), 2);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            // Reading the commit operation's location yields its metadata.
            assert_eq!(
                db.get(Location::new_unchecked(1)).await.unwrap(),
                Some(metadata.clone())
            );
            let root = db.root();

            // Re-open and confirm the committed state was persisted.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(1));

            db.destroy().await.unwrap();
        });
    }
490
    /// Appends two values, commits, and checks that the committed state survives re-opens while
    /// later uncommitted appends do not.
    #[test_traced("WARN")]
    pub fn test_keyless_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            // Appended values are readable before any commit.
            let loc1 = db.append(v1.clone()).await.unwrap();
            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);

            let loc2 = db.append(v2.clone()).await.unwrap();
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Commit: initial commit + 2 appends + this commit = 4 operations.
            let (durable, _) = db.commit(None).await.unwrap();
            let mut db = durable.into_merkleized();
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.get_metadata().await.unwrap(), None);
            // Location 3 is the metadata-free commit operation.
            assert_eq!(db.get(Location::new_unchecked(3)).await.unwrap(), None);
            let root = db.root();

            // Sync, drop, and re-open: the committed state must be intact.
            db.sync().await.unwrap();
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Append more values but drop without committing.
            let mut db = db.into_mutable();
            db.append(v2).await.unwrap();
            db.append(v1).await.unwrap();

            // Re-open: the uncommitted appends must have been discarded.
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            // A second re-open must observe the same state.
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
543
544 async fn append_elements<T: Rng>(db: &mut MutableDb, rng: &mut T, num_elements: usize) {
546 for _ in 0..num_elements {
547 let value = vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8];
548 db.append(value).await.unwrap();
549 }
550 }
551
    /// Alternates uncommitted and committed batches of appends, checking after each re-open that
    /// the root matches the last committed state.
    #[test_traced("WARN")]
    pub fn test_keyless_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: usize = 1000;
        executor.start(|mut context| async move {
            let db = open_db(context.clone()).await;
            let root = db.root();
            let mut db = db.into_mutable();

            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Drop without committing; re-opening must restore the initial root.
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            // Append again and commit this time, recording the committed root.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            // Append more elements on top of the commit without committing them.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Drop and re-open; the root must be that of the last commit.
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            // Append and commit a second batch.
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();

            // Final re-open: two committed batches plus three commit operations
            // (the initial commit and one per batch).
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2 * ELEMENTS as u64 + 3);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
602
    /// Starting from a database with committed data, checks that uncommitted appends are
    /// discarded on re-open, both before and after pruning, and that a subsequent commit sticks.
    #[test_traced("WARN")]
    fn test_keyless_db_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let db = open_db(context.clone()).await;

            // Populate the database and commit to establish the baseline state.
            const ELEMENTS: usize = 200;
            let mut db = db.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let root = db.root();
            let op_count = db.op_count();

            // Re-open and confirm the baseline was persisted.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), op_count - 1);
            drop(db);

            // Appends without committing, drops the database, and asserts that re-opening
            // yields the given baseline `root` and `op_count`.
            async fn recover_from_failure(
                mut context: deterministic::Context,
                root: <Sha256 as Hasher>::Digest,
                op_count: Location,
            ) {
                let mut db = open_db(context.clone()).await.into_mutable();

                append_elements(&mut db, &mut context, ELEMENTS).await;
                drop(db);
                let db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(), root);
            }

            recover_from_failure(context.clone(), root, op_count).await;

            // Prune up to the last commit; this must not change the op count or root.
            let mut db = open_db(context.clone()).await;
            db.prune(db.last_commit_loc()).await.unwrap();
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(), root);
            db.sync().await.unwrap();
            drop(db);

            // Recovery must behave the same on the pruned database.
            recover_from_failure(context.clone(), root, op_count).await;

            // Finally, append and commit; the re-opened database must reflect the new state.
            let mut db = open_db(context.clone()).await.into_mutable();
            append_elements(&mut db, &mut context, ELEMENTS).await;
            let (_durable, _) = db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > op_count);
            assert_ne!(db.root(), root);
            assert_eq!(db.last_commit_loc(), db.op_count() - 1);

            db.destroy().await.unwrap();
        });
    }
667
    /// Starting from an empty database, checks that repeated rounds of uncommitted appends are
    /// always discarded on re-open, and that a final committed round is retained.
    #[test_traced("WARN")]
    fn test_keyless_db_empty_db_recovery() {
        const ELEMENTS: u64 = 1000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            let root = db.root();

            // Re-opening an untouched database yields the same empty state.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            // Appends `ELEMENTS` values of varying length and content, without committing.
            async fn apply_ops(db: &mut MutableDb) {
                for i in 0..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                    db.append(v).await.unwrap();
                }
            }

            // Round 1: uncommitted appends are discarded on re-open.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            // Round 2: same again.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            // Round 3: a larger uncommitted batch is discarded as well.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            drop(db);
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Final round: append and commit; the re-opened database must reflect the change.
            let mut db = db.into_mutable();
            apply_ops(&mut db).await;
            let (_db, _) = db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > 1);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }
728
729 #[test_traced("INFO")]
730 pub fn test_keyless_db_proof_generation_and_verification() {
731 let executor = deterministic::Runner::default();
732 executor.start(|context| async move {
733 let mut hasher = Standard::<Sha256>::new();
734 let db = open_db(context.clone()).await;
735 let mut db = db.into_mutable();
736
737 const ELEMENTS: u64 = 100;
739 let mut values = Vec::new();
740 for i in 0u64..ELEMENTS {
741 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
742 values.push(v.clone());
743 db.append(v).await.unwrap();
744 }
745 let (durable, _) = db.commit(None).await.unwrap();
746 let db = durable.into_merkleized();
747
748 assert!(matches!(
750 db.historical_proof(db.op_count() + 1, Location::new_unchecked(5), NZU64!(10))
751 .await,
752 Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
753 ));
754
755 let root = db.root();
756
757 let test_cases = vec![
759 (0, 10), (10, 5), (50, 20), (90, 15), (0, 1), (ELEMENTS - 1, 1), (ELEMENTS, 1), ];
767
768 for (start_loc, max_ops) in test_cases {
769 let (proof, ops) = db.proof(Location::new_unchecked(start_loc), NZU64!(max_ops)).await.unwrap();
770
771 assert!(
773 verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &root),
774 "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
775 );
776
777 let expected_ops = std::cmp::min(max_ops, *db.op_count() - start_loc);
779 assert_eq!(
780 ops.len() as u64,
781 expected_ops,
782 "Expected {expected_ops} operations, got {}",
783 ops.len(),
784 );
785
786 for (i, op) in ops.iter().enumerate() {
788 let loc = start_loc + i as u64;
789 if loc == 0 {
790 assert!(
791 matches!(op, Operation::Commit(None)),
792 "Expected Initial Commit operation at location {loc}, got {op:?}",
793 );
794 } else if loc <= ELEMENTS {
795 assert!(
797 matches!(op, Operation::Append(_)),
798 "Expected Append operation at location {loc}, got {op:?}",
799 );
800 } else if loc == ELEMENTS + 1 {
801 assert!(
803 matches!(op, Operation::Commit(_)),
804 "Expected Commit operation at location {loc}, got {op:?}",
805 );
806 }
807 }
808
809 let wrong_root = Sha256::hash(&[0xFF; 32]);
811 assert!(
812 !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &wrong_root),
813 "Proof should fail with wrong root"
814 );
815
816 if start_loc > 0 {
818 assert!(
819 !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc - 1), &ops, &root),
820 "Proof should fail with wrong start location"
821 );
822 }
823 }
824
825 db.destroy().await.unwrap();
826 });
827 }
828
829 #[test_traced("INFO")]
830 pub fn test_keyless_db_proof_with_pruning() {
831 let executor = deterministic::Runner::default();
832 executor.start(|context| async move {
833 let mut hasher = Standard::<Sha256>::new();
834 let db = open_db(context.clone()).await;
835 let mut db = db.into_mutable();
836
837 const ELEMENTS: u64 = 100;
839 let mut values = Vec::new();
840 for i in 0u64..ELEMENTS {
841 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
842 values.push(v.clone());
843 db.append(v).await.unwrap();
844 }
845 let (durable, _) = db.commit(None).await.unwrap();
846
847 let mut db = durable.into_mutable();
849 for i in ELEMENTS..ELEMENTS * 2 {
850 let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
851 values.push(v.clone());
852 db.append(v).await.unwrap();
853 }
854 let (durable, _) = db.commit(None).await.unwrap();
855 let mut db = durable.into_merkleized();
856 let root = db.root();
857
858 println!("last commit loc: {}", db.last_commit_loc());
859
860 const PRUNE_LOC: u64 = 30;
862 db.prune(Location::new_unchecked(PRUNE_LOC)).await.unwrap();
863
864 let oldest_retained = db.oldest_retained_loc();
866
867 assert_eq!(
869 db.root(),
870 root,
871 "Root should not change after pruning"
872 );
873
874 db.sync().await.unwrap();
875 drop(db);
876 let mut db = open_db(context.clone()).await;
877 assert_eq!(db.root(), root);
878 assert_eq!(db.op_count(), 2 * ELEMENTS + 3);
879 assert!(db.oldest_retained_loc() <= PRUNE_LOC);
880
881 for i in 0..*oldest_retained {
883 let result = db.get(Location::new_unchecked(i)).await;
884 match result {
886 Ok(None) => {} Ok(Some(_)) => {
888 panic!("Should not be able to get pruned value at location {i}")
889 }
890 Err(_) => {} }
892 }
893
894 let test_cases = vec![
896 (oldest_retained, 10), (Location::new_unchecked(50), 20), (Location::new_unchecked(150), 10), (Location::new_unchecked(190), 15), ];
901
902 for (start_loc, max_ops) in test_cases {
903 if start_loc < oldest_retained {
905 continue;
906 }
907
908 let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
909
910 assert!(
912 verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
913 "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
914 );
915
916 let expected_ops = std::cmp::min(max_ops, *db.op_count() - *start_loc);
918 assert_eq!(
919 ops.len() as u64,
920 expected_ops,
921 "Expected {expected_ops} operations, got {}",
922 ops.len(),
923 );
924 }
925
926 const AGGRESSIVE_PRUNE: Location = Location::new_unchecked(150);
928 db.prune(AGGRESSIVE_PRUNE).await.unwrap();
929
930 let new_oldest = db.oldest_retained_loc();
931 assert!(new_oldest <= AGGRESSIVE_PRUNE);
932
933 let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
935 assert!(
936 verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
937 "Proof should still verify after aggressive pruning"
938 );
939
940 let almost_all = db.op_count() - 5;
942 db.prune(almost_all).await.unwrap();
943
944 let final_oldest = db.oldest_retained_loc();
945
946 if final_oldest < db.op_count() {
948 let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
949 assert!(
950 verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
951 "Should be able to prove remaining operations after extensive pruning"
952 );
953 }
954
955 db.destroy().await.unwrap();
956 });
957 }
958
    /// Checks that appends trailing the last commit are discarded on re-open, that the database
    /// remains usable afterwards, and that the same holds for multiple trailing appends.
    #[test_traced("WARN")]
    fn test_keyless_db_replay_with_trailing_appends() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            let mut db = db.into_mutable();

            // Append some values and commit to establish a committed baseline.
            for i in 0..10 {
                let v = vec![i as u8; 10];
                db.append(v).await.unwrap();
            }
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let committed_root = db.root();
            let committed_size = db.op_count();

            // Add a single append after the commit, without committing it.
            let uncommitted_value = vec![99u8; 20];
            let mut db = db.into_mutable();
            db.append(uncommitted_value.clone()).await.unwrap();

            // Drop the database without committing.
            drop(db);

            // Re-open; the assertions below require the trailing append to be gone.
            let db = open_db(context.clone()).await;

            assert_eq!(
                db.op_count(),
                committed_size,
                "Should rewind to last commit"
            );
            assert_eq!(db.root(), committed_root, "Root should match last commit");
            assert_eq!(
                db.last_commit_loc(),
                committed_size - 1,
                "Last commit location should be correct"
            );

            // The database must still accept appends, at the location freed by the rewind.
            let mut db = db.into_mutable();
            let new_value = vec![77u8; 15];
            let loc = db.append(new_value.clone()).await.unwrap();
            assert_eq!(
                loc, committed_size,
                "New append should get the expected location"
            );

            // The newly appended value must be readable.
            assert_eq!(db.get(loc).await.unwrap(), Some(new_value));

            // Commit the new value and record the new committed state.
            let (durable, _) = db.commit(None).await.unwrap();
            let db = durable.into_merkleized();
            let new_committed_root = db.root();
            let new_committed_size = db.op_count();

            // Add several trailing appends without committing.
            let mut db = db.into_mutable();
            for i in 0..5 {
                let v = vec![(200 + i) as u8; 10];
                db.append(v).await.unwrap();
            }

            // Drop the database without committing.
            drop(db);

            // Re-open and confirm all trailing appends were discarded.
            let db = open_db(context.clone()).await;
            assert_eq!(
                db.op_count(),
                new_committed_size,
                "Should rewind to last commit with multiple trailing appends"
            );
            assert_eq!(
                db.root(),
                new_committed_root,
                "Root should match last commit after multiple appends"
            );
            assert_eq!(
                db.last_commit_loc(),
                new_committed_size - 1,
                "Last commit location should be correct after multiple appends"
            );

            db.destroy().await.unwrap();
        });
    }
1051
    /// Checks `get` at commit locations, append locations, and one location past the end.
    #[test_traced("INFO")]
    pub fn test_keyless_db_get_out_of_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;

            // Location 0 holds the initial commit, which carries no value.
            let result = db.get(Location::new_unchecked(0)).await.unwrap();
            assert!(result.is_none());

            // Append two values and commit.
            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            let mut db = db.into_mutable();
            db.append(v1.clone()).await.unwrap();
            db.append(v2.clone()).await.unwrap();
            let (durable, _) = db.commit(None).await.unwrap();

            // Locations 1 and 2 hold the appended values.
            assert_eq!(durable.get(Location::new_unchecked(1)).await.unwrap().unwrap(), v1);
            assert_eq!(durable.get(Location::new_unchecked(2)).await.unwrap().unwrap(), v2);

            // Location 3 is the metadata-free commit operation.
            let result = durable.get(Location::new_unchecked(3)).await.unwrap();
            assert!(result.is_none());

            // Location 4 equals the op count and is therefore out of bounds.
            let result = durable.get(Location::new_unchecked(4)).await;
            assert!(
                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new_unchecked(4) && size == Location::new_unchecked(4))
            );

            let db = durable.into_merkleized();
            db.destroy().await.unwrap();
        });
    }
1088
    /// Checks that pruning past the last commit location is rejected, while pruning at or
    /// before it succeeds.
    #[test_traced("INFO")]
    pub fn test_keyless_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // On an empty database the last commit is at location 0, so pruning to 1 must fail.
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            // Append two values, commit, then append a third without committing yet.
            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            let v3 = vec![3u8; 8];
            let mut db = db.into_mutable();
            db.append(v1.clone()).await.unwrap();
            db.append(v2.clone()).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            let mut db = db.into_mutable();
            db.append(v3.clone()).await.unwrap();

            // The last commit is still the one at location 3 (after two appends).
            let last_commit = db.last_commit_loc();
            assert_eq!(last_commit, Location::new_unchecked(3));

            // Commit again; pruning up to the previous commit location is allowed.
            let (durable, _) = db.commit(None).await.unwrap();
            let mut db = durable.into_merkleized();
            assert!(db.prune(Location::new_unchecked(3)).await.is_ok());

            // Pruning one past the new last commit must fail with both locations reported.
            let new_last_commit = db.last_commit_loc();
            let beyond = Location::new_unchecked(*new_last_commit + 1);
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
1134
1135 use crate::{
1136 kv::tests::assert_send,
1137 qmdb::store::tests::{assert_log_store, assert_merkleized_store, assert_prunable_store},
1138 };
1139
    // Compile-time check only (never called): passes a `CleanDb` and the futures returned by
    // its `sync` and `get` methods to the imported `assert_*` helpers.
    #[allow(dead_code)]
    fn assert_clean_db_futures_are_send(db: &mut CleanDb, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_merkleized_store(db, loc);
        assert_send(db.sync());
        assert_send(db.get(loc));
    }
1148
    // Compile-time check only (never called): passes a `MutableDb` and the futures returned by
    // its `get` and `append` methods to the imported `assert_*` helpers.
    #[allow(dead_code)]
    fn assert_mutable_db_futures_are_send(db: &mut MutableDb, loc: Location, value: Vec<u8>) {
        assert_log_store(db);
        assert_send(db.get(loc));
        assert_send(db.append(value));
    }
1155
    // Compile-time check only (never called): passes the future returned by
    // `MutableDb::commit` (which consumes the database) to `assert_send`.
    #[allow(dead_code)]
    fn assert_mutable_db_commit_is_send(db: MutableDb) {
        assert_send(db.commit(None));
    }
1160}