use crate::{
    journal::{
        authenticated,
        contiguous::variable::{Config as JournalConfig, Journal as ContiguousJournal},
    },
    mmr::{
        journaled::Config as MmrConfig,
        mem::{Clean, Dirty, State},
        Location, Proof,
    },
    qmdb::{any::VariableValue, operation::Committable, Error},
};
use commonware_cryptography::{DigestOf, Hasher};
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage, ThreadPool};
use core::ops::Range;
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

mod operation;
pub use operation::Operation;

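/// Configuration for initializing a [Keyless] database, covering both the journaled MMR
/// and the operation log it authenticates.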
#[derive(Clone)]
pub struct Config<C> {
    /// The name of the storage partition backing the MMR's journal.
    pub mmr_journal_partition: String,

    /// The number of MMR items to store in each blob of the MMR journal.
    pub mmr_items_per_blob: NonZeroU64,

    /// The size of the write buffer used by the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// The name of the storage partition backing the MMR's metadata.
    pub mmr_metadata_partition: String,

    /// The name of the storage partition backing the operation log.
    pub log_partition: String,

    /// The size of the write buffer used by the operation log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level for log data; `None` disables compression.
    pub log_compression: Option<u8>,

    /// The codec configuration used to encode and decode log values.
    pub log_codec_config: C,

    /// The number of operations to store in each section of the operation log.
    pub log_items_per_section: NonZeroU64,

    /// An optional thread pool used to parallelize MMR operations.
    pub thread_pool: Option<ThreadPool>,

    /// The buffer pool shared by the MMR journal and the operation log.
    pub buffer_pool: PoolRef,
}

/// The authenticated journal type underlying a [Keyless] database: a contiguous,
/// variable-length operation log authenticated by a journaled MMR.
type Journal<E, V, H, S> = authenticated::Journal<E, ContiguousJournal<E, Operation<V>>, H, S>;

/// An append-only, authenticated database of variable-length values without keys.
///
/// Values are added with `append` and only become durable once `commit` is called;
/// uncommitted operations are discarded on restart.
pub struct Keyless<
    E: Storage + Clock + Metrics,
    V: VariableValue,
    H: Hasher,
    S: State<DigestOf<H>> = Dirty,
> {
    /// The authenticated journal of operations.
    journal: Journal<E, V, H, S>,

    /// The location of the most recent commit operation.
    last_commit_loc: Location,
}

impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, S: State<DigestOf<H>>>
    Keyless<E, V, H, S>
{
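    /// Return the value of the operation at location `loc`, or `None` if that operation
    /// carries no value (e.g. a commit without metadata).
    ///
    /// # Errors
    ///
    /// Returns [Error::LocationOutOfBounds] if `loc` is beyond the end of the log.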
    pub async fn get(&self, loc: Location) -> Result<Option<V>, Error> {
        let op_count = self.op_count();
        if loc >= op_count {
            return Err(Error::LocationOutOfBounds(loc, op_count));
        }
        let op = self.journal.read(loc).await?;

        Ok(op.into_value())
    }

    /// Return the total number of operations in the log, including any that have been
    /// pruned and any that are not yet committed.
    pub fn op_count(&self) -> Location {
        self.journal.size()
    }

    /// Return the location of the most recent commit operation.
    pub const fn last_commit_loc(&self) -> Location {
        self.last_commit_loc
    }

    /// Return the location of the oldest operation that has not been pruned.
    pub fn oldest_retained_loc(&self) -> Location {
        self.journal
            .oldest_retained_loc()
            .expect("at least one operation should exist")
    }

    /// Append `value` to the log, returning the location it was assigned. The append is
    /// not durable until the next `commit`.
    pub async fn append(&mut self, value: V) -> Result<Location, Error> {
        self.journal
            .append(Operation::Append(value))
            .await
            .map_err(Into::into)
    }

    /// Return the metadata stored with the last commit, or `None` if it carried no
    /// metadata.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let op = self.journal.read(self.last_commit_loc).await?;
        let Operation::Commit(metadata) = op else {
            return Ok(None);
        };

        Ok(metadata)
    }
}

impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> Keyless<E, V, H, Clean<H::Digest>> {
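    /// Open (or create) a [Keyless] database using the given runtime `context` and `cfg`.
    ///
    /// If the underlying log is empty, an initial commit (with no metadata) is appended
    /// and synced so the database always contains at least one commit operation.
    ///
    /// # Example
    ///
    /// An illustrative sketch (mirroring the `db_config`/`open_db` helpers in the test
    /// module below; partition names, sizes, and the codec config are arbitrary, and
    /// `context` is a runtime context such as one obtained from `deterministic::Runner`):
    ///
    /// ```ignore
    /// type Db = Keyless<deterministic::Context, Vec<u8>, Sha256, Clean<<Sha256 as Hasher>::Digest>>;
    ///
    /// let cfg = Config {
    ///     mmr_journal_partition: "mmr_journal".into(),
    ///     mmr_metadata_partition: "mmr_metadata".into(),
    ///     mmr_items_per_blob: NZU64!(11),
    ///     mmr_write_buffer: NZUsize!(1024),
    ///     log_partition: "log".into(),
    ///     log_write_buffer: NZUsize!(1024),
    ///     log_compression: None,
    ///     log_codec_config: ((0..=10000).into(), ()),
    ///     log_items_per_section: NZU64!(7),
    ///     thread_pool: None,
    ///     buffer_pool: PoolRef::new(NZUsize!(101), NZUsize!(11)),
    /// };
    /// let mut db = Db::init(context, cfg).await?;
    /// let loc = db.append(vec![1, 2, 3]).await?;
    /// db.commit(None).await?;
    /// ```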
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let mmr_cfg = MmrConfig {
            journal_partition: cfg.mmr_journal_partition,
            metadata_partition: cfg.mmr_metadata_partition,
            items_per_blob: cfg.mmr_items_per_blob,
            write_buffer: cfg.mmr_write_buffer,
            thread_pool: cfg.thread_pool,
            buffer_pool: cfg.buffer_pool.clone(),
        };

        let journal_cfg = JournalConfig {
            partition: cfg.log_partition,
            items_per_section: cfg.log_items_per_section,
            compression: cfg.log_compression,
            codec_config: cfg.log_codec_config,
            buffer_pool: cfg.buffer_pool,
            write_buffer: cfg.log_write_buffer,
        };

        let mut journal = Journal::new(context, mmr_cfg, journal_cfg, Operation::is_commit).await?;
        if journal.size() == 0 {
            warn!("no operations found in log, creating initial commit");
            journal.append(Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        let last_commit_loc = journal
            .size()
            .checked_sub(1)
            .expect("at least one commit should exist");

        Ok(Self {
            journal,
            last_commit_loc,
        })
    }

    /// Return the current root digest of the database.
    pub const fn root(&self) -> H::Digest {
        self.journal.root()
    }

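    /// Generate a proof of the `max_ops` operations starting at `start_loc` (fewer if the
    /// end of the log is reached), along with those operations.
    ///
    /// # Example
    ///
    /// A minimal verification sketch, following the test module below (`verify_proof` is
    /// the helper from `crate::qmdb` and `hasher` is a `StandardHasher`):
    ///
    /// ```ignore
    /// let (proof, ops) = db.proof(start_loc, NZU64!(10)).await?;
    /// assert!(verify_proof(&mut hasher, &proof, start_loc, &ops, &db.root()));
    /// ```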
    pub async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        self.historical_proof(self.op_count(), start_loc, max_ops)
            .await
    }

    /// Generate a proof against the historical state of the database when it contained
    /// `op_count` operations, starting at `start_loc` and covering at most `max_ops`
    /// operations.
    pub async fn historical_proof(
        &self,
        op_count: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

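    /// Commit all uncommitted operations by appending a commit operation carrying the
    /// optional `metadata`, then persisting the log.
    ///
    /// Returns the range of locations committed by this call, ending with the location of
    /// the new commit operation.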
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
        let start_loc = self.last_commit_loc + 1;
        self.last_commit_loc = self.journal.append(Operation::Commit(metadata)).await?;
        self.journal.commit().await?;
        debug!(size = ?self.op_count(), "committed db");

        Ok(start_loc..self.op_count())
    }

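    /// Prune historical operations prior to `loc`. The log may retain some operations
    /// before `loc`; consult [Self::oldest_retained_loc] for the exact boundary.
    ///
    /// # Errors
    ///
    /// Returns [Error::PruneBeyondMinRequired] if `loc` is beyond the last commit.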
    pub async fn prune(&mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Durably persist any buffered writes.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.journal.sync().await.map_err(Into::into)
    }

    /// Close the database, releasing its resources.
    pub async fn close(self) -> Result<(), Error> {
        Ok(self.journal.close().await?)
    }

    /// Destroy the database, removing its underlying storage.
    pub async fn destroy(self) -> Result<(), Error> {
        Ok(self.journal.destroy().await?)
    }

    /// Simulate a crash by consuming the database without a clean close, optionally
    /// syncing the operation log and/or the MMR first.
    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_failure(mut self, sync_log: bool, sync_mmr: bool) -> Result<(), Error> {
        if sync_log {
            self.journal.journal.sync().await.map_err(Error::Journal)?;
        }
        if sync_mmr {
            self.journal.mmr.sync().await.map_err(Error::Mmr)?;
        }

        Ok(())
    }

    /// Simulate a failure partway through pruning to `loc`: the MMR is synced and the
    /// underlying log is pruned, but nothing else is updated before the database is
    /// dropped.
    #[cfg(test)]
    pub(super) async fn simulate_prune_failure(mut self, loc: Location) -> Result<(), Error> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.mmr.sync().await.map_err(Error::Mmr)?;
        assert!(
            self.journal
                .journal
                .prune(*loc)
                .await
                .map_err(Error::Journal)?,
            "nothing was pruned, so could not simulate failure"
        );

        Ok(())
    }

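    /// Convert this database into its `Dirty` state, which does not expose a root until
    /// it is merkleized back into the `Clean` state.
    ///
    /// # Example
    ///
    /// A minimal sketch of the typestate round trip (the `Dirty` state is returned to
    /// `Clean` via `merkleize`):
    ///
    /// ```ignore
    /// let mut dirty = db.into_dirty();
    /// dirty.append(vec![1, 2, 3]).await?;
    /// let db = dirty.merkleize();
    /// let root = db.root();
    /// ```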
    pub fn into_dirty(self) -> Keyless<E, V, H, Dirty> {
        Keyless {
            journal: self.journal.into_dirty(),
            last_commit_loc: self.last_commit_loc,
        }
    }
}

impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> Keyless<E, V, H, Dirty> {
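    /// Merkleize any pending operations and return the database in its `Clean` state,
    /// from which a root and proofs can once again be produced.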
    pub fn merkleize(self) -> Keyless<E, V, H, Clean<H::Digest>> {
        Keyless {
            journal: self.journal.merkleize(),
            last_commit_loc: self.last_commit_loc,
        }
    }
}

impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher, S: State<DigestOf<H>>>
    crate::qmdb::store::LogStore for Keyless<E, V, H, S>
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.journal.pruning_boundary()
    }

    fn is_empty(&self) -> bool {
        self.op_count() == 0
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }
}

impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> crate::qmdb::store::CleanStore
    for Keyless<E, V, H, Clean<H::Digest>>
{
    type Digest = H::Digest;
    type Operation = Operation<V>;
    type Dirty = Keyless<E, V, H, Dirty>;

    fn root(&self) -> Self::Digest {
        self.root()
    }

    async fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.proof(start_loc, max_ops).await
    }

    async fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error> {
        self.historical_proof(historical_size, start_loc, max_ops)
            .await
    }

    fn into_dirty(self) -> Self::Dirty {
        self.into_dirty()
    }
}

impl<E: Storage + Clock + Metrics, V: VariableValue, H: Hasher> crate::qmdb::store::DirtyStore
    for Keyless<E, V, H, Dirty>
{
    type Digest = H::Digest;
    type Operation = Operation<V>;
    type Clean = Keyless<E, V, H, Clean<H::Digest>>;

    async fn merkleize(self) -> Result<Self::Clean, Error> {
        Ok(self.merkleize())
    }
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{mmr::StandardHasher as Standard, qmdb::verify_proof};
    use commonware_cryptography::Sha256;
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner as _};
    use commonware_utils::{NZUsize, NZU64};
    use rand::Rng;

    const PAGE_SIZE: usize = 101;
    const PAGE_CACHE_SIZE: usize = 11;

    fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg<usize>, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_partition: format!("log_journal_{suffix}"),
            log_write_buffer: NZUsize!(1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }

    type Db = Keyless<deterministic::Context, Vec<u8>, Sha256, Clean<<Sha256 as Hasher>::Digest>>;

    /// Open (or reopen) a test database in a fixed partition.
    async fn open_db(context: deterministic::Context) -> Db {
        Db::init(context, db_config("partition")).await.unwrap()
    }

    #[test_traced("INFO")]
    pub fn test_keyless_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            // A freshly created db contains only the initial commit.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.oldest_retained_loc(), Location::new_unchecked(0));

            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // An uncommitted append should be discarded on restart.
            let v1 = vec![1u8; 8];
            let root = db.root();
            db.append(v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Commit with metadata; the metadata is readable via get_metadata and get.
            let metadata = vec![3u8; 10];
            db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(db.op_count(), 2);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            assert_eq!(
                db.get(Location::new_unchecked(1)).await.unwrap(),
                Some(metadata.clone())
            );
            let root = db.root();

            // The committed state should survive a restart.
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(1));

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_keyless_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            let loc1 = db.append(v1.clone()).await.unwrap();
            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);

            let loc2 = db.append(v2.clone()).await.unwrap();
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Commit without metadata: op_count = initial commit + 2 appends + commit.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.get_metadata().await.unwrap(), None);
            // The commit operation itself carries no value.
            assert_eq!(db.get(Location::new_unchecked(3)).await.unwrap(), None);
            let root = db.root();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Uncommitted appends should be discarded across restarts.
            db.append(v2).await.unwrap();
            db.append(v1).await.unwrap();

            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    /// Append `num_elements` random two-byte values to `db`.
    async fn append_elements<T: Rng>(db: &mut Db, rng: &mut T, num_elements: usize) {
        for _ in 0..num_elements {
            let value = vec![(rng.next_u32() % 255) as u8, (rng.next_u32() % 255) as u8];
            db.append(value).await.unwrap();
        }
    }

    #[test_traced("WARN")]
    pub fn test_keyless_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: usize = 1000;
        executor.start(|mut context| async move {
            let mut db = open_db(context.clone()).await;
            let root = db.root();

            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Fail without syncing anything: all uncommitted appends are lost.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            append_elements(&mut db, &mut context, ELEMENTS).await;
            db.commit(None).await.unwrap();
            let root = db.root();

            append_elements(&mut db, &mut context, ELEMENTS).await;

            // Uncommitted appends are discarded regardless of which stores were synced.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            append_elements(&mut db, &mut context, ELEMENTS).await;
            db.simulate_failure(true, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            append_elements(&mut db, &mut context, ELEMENTS).await;
            db.simulate_failure(false, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root());

            append_elements(&mut db, &mut context, ELEMENTS).await;
            db.commit(None).await.unwrap();
            let root = db.root();

            // A clean close preserves the committed operations.
            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2 * ELEMENTS as u64 + 3);
            assert_eq!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_keyless_db_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = open_db(context.clone()).await;

            const ELEMENTS: usize = 200;
            append_elements(&mut db, &mut context, ELEMENTS).await;
            db.commit(None).await.unwrap();
            let root = db.root();
            let op_count = db.op_count();

            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), op_count - 1);
            db.close().await.unwrap();

            // Append uncommitted elements, then simulate failures with different
            // combinations of synced stores and confirm recovery to the last commit.
            async fn recover_from_failure(
                mut context: deterministic::Context,
                root: <Sha256 as Hasher>::Digest,
                op_count: Location,
            ) {
                let mut db = open_db(context.clone()).await;

                append_elements(&mut db, &mut context, ELEMENTS).await;
                db.simulate_failure(false, false).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(), root);

                append_elements(&mut db, &mut context, ELEMENTS).await;
                db.simulate_failure(true, false).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(), root);

                append_elements(&mut db, &mut context, ELEMENTS).await;
                db.simulate_failure(false, true).await.unwrap();
                let db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(), root);
            }

            recover_from_failure(context.clone(), root, op_count).await;

            // A failure in the middle of pruning should not affect the recovered state.
            let db = open_db(context.clone()).await;
            let last_commit_loc = db.last_commit_loc();
            db.simulate_prune_failure(last_commit_loc).await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(), root);
            db.close().await.unwrap();

            // A successful prune up to the last commit leaves op_count and root unchanged.
            let mut db = open_db(context.clone()).await;
            db.prune(db.last_commit_loc()).await.unwrap();
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(), root);
            db.close().await.unwrap();

            recover_from_failure(context.clone(), root, op_count).await;

            // A new commit advances the state.
            let mut db = open_db(context.clone()).await;
            append_elements(&mut db, &mut context, ELEMENTS).await;
            db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > op_count);
            assert_ne!(db.root(), root);
            assert_eq!(db.last_commit_loc(), db.op_count() - 1);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_keyless_db_empty_db_recovery() {
        const ELEMENTS: u64 = 1000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let db = open_db(context.clone()).await;
            let root = db.root();

            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            async fn apply_ops(db: &mut Db) {
                for i in 0..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                    db.append(v).await.unwrap();
                }
            }

            // Without a commit, the db should recover to its empty state no matter which
            // combination of stores was synced before the failure.
            apply_ops(&mut db).await;
            db.simulate_failure(false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            apply_ops(&mut db).await;
            db.simulate_failure(true, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            apply_ops(&mut db).await;
            db.simulate_failure(false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            apply_ops(&mut db).await;
            db.simulate_failure(false, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            apply_ops(&mut db).await;
            db.simulate_failure(true, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            apply_ops(&mut db).await;
            db.simulate_failure(true, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            apply_ops(&mut db).await;
            db.simulate_failure(false, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);

            // Even a large number of uncommitted operations is discarded on reopen.
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.root(), root);
            assert_eq!(db.last_commit_loc(), Location::new_unchecked(0));

            // Committing finally changes the persisted state.
            apply_ops(&mut db).await;
            db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > 1);
            assert_ne!(db.root(), root);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_keyless_db_proof_generation_and_verification() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            const ELEMENTS: u64 = 100;
            let mut values = Vec::new();
            for i in 0u64..ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                values.push(v.clone());
                db.append(v).await.unwrap();
            }
            db.commit(None).await.unwrap();

            // Requesting a proof against a larger-than-actual historical size should fail.
            assert!(matches!(
                db.historical_proof(db.op_count() + 1, Location::new_unchecked(5), NZU64!(10))
                    .await,
                Err(Error::Mmr(crate::mmr::Error::RangeOutOfBounds(_)))
            ));

            let root = db.root();

            // (start_loc, max_ops) pairs covering the start, middle, and end of the log.
            let test_cases = vec![
                (0, 10),
                (10, 5),
                (50, 20),
                (90, 15),
                (0, 1),
                (ELEMENTS - 1, 1),
                (ELEMENTS, 1),
            ];

            for (start_loc, max_ops) in test_cases {
                let (proof, ops) = db
                    .proof(Location::new_unchecked(start_loc), NZU64!(max_ops))
                    .await
                    .unwrap();

                assert!(
                    verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &root),
                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
                );

                let expected_ops = std::cmp::min(max_ops, *db.op_count() - start_loc);
                assert_eq!(
                    ops.len() as u64,
                    expected_ops,
                    "Expected {expected_ops} operations, got {}",
                    ops.len(),
                );

                for (i, op) in ops.iter().enumerate() {
                    let loc = start_loc + i as u64;
                    if loc == 0 {
                        assert!(
                            matches!(op, Operation::Commit(None)),
                            "Expected Initial Commit operation at location {loc}, got {op:?}",
                        );
                    } else if loc <= ELEMENTS {
                        assert!(
                            matches!(op, Operation::Append(_)),
                            "Expected Append operation at location {loc}, got {op:?}",
                        );
                    } else if loc == ELEMENTS + 1 {
                        assert!(
                            matches!(op, Operation::Commit(_)),
                            "Expected Commit operation at location {loc}, got {op:?}",
                        );
                    }
                }

                // Verification must fail against the wrong root or the wrong start location.
                let wrong_root = Sha256::hash(&[0xFF; 32]);
                assert!(
                    !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc), &ops, &wrong_root),
                    "Proof should fail with wrong root"
                );

                if start_loc > 0 {
                    assert!(
                        !verify_proof(&mut hasher, &proof, Location::new_unchecked(start_loc - 1), &ops, &root),
                        "Proof should fail with wrong start location"
                    );
                }
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_keyless_db_proof_with_pruning() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Build a db with two committed batches of elements.
            const ELEMENTS: u64 = 100;
            let mut values = Vec::new();
            for i in 0u64..ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                values.push(v.clone());
                db.append(v).await.unwrap();
            }
            db.commit(None).await.unwrap();

            for i in ELEMENTS..ELEMENTS * 2 {
                let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
                values.push(v.clone());
                db.append(v).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root();

            println!("last commit loc: {}", db.last_commit_loc());

            const PRUNE_LOC: u64 = 30;
            db.prune(Location::new_unchecked(PRUNE_LOC)).await.unwrap();

            let oldest_retained = db.oldest_retained_loc();

            assert_eq!(db.root(), root, "Root should not change after pruning");

            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(), root);
            assert_eq!(db.op_count(), 2 * ELEMENTS + 3);
            assert!(db.oldest_retained_loc() <= PRUNE_LOC);

            // Pruned locations must no longer return values.
            for i in 0..*oldest_retained {
                let result = db.get(Location::new_unchecked(i)).await;
                match result {
                    Ok(None) => {}
                    Ok(Some(_)) => {
                        panic!("Should not be able to get pruned value at location {i}")
                    }
                    Err(_) => {}
                }
            }

            // (start_loc, max_ops) pairs over the retained portion of the log.
            let test_cases = vec![
                (oldest_retained, 10),
                (Location::new_unchecked(50), 20),
                (Location::new_unchecked(150), 10),
                (Location::new_unchecked(190), 15),
            ];

            for (start_loc, max_ops) in test_cases {
                if start_loc < oldest_retained {
                    continue;
                }

                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();

                assert!(
                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
                );

                let expected_ops = std::cmp::min(max_ops, *db.op_count() - *start_loc);
                assert_eq!(
                    ops.len() as u64,
                    expected_ops,
                    "Expected {expected_ops} operations, got {}",
                    ops.len(),
                );
            }

            // Prune more aggressively and confirm proofs still verify against the same root.
            const AGGRESSIVE_PRUNE: Location = Location::new_unchecked(150);
            db.prune(AGGRESSIVE_PRUNE).await.unwrap();

            let new_oldest = db.oldest_retained_loc();
            assert!(new_oldest <= AGGRESSIVE_PRUNE);

            let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
            assert!(
                verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
                "Proof should still verify after aggressive pruning"
            );

            // Prune almost everything and prove whatever remains.
            let almost_all = db.op_count() - 5;
            db.prune(almost_all).await.unwrap();

            let final_oldest = db.oldest_retained_loc();

            if final_oldest < db.op_count() {
                let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
                assert!(
                    verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
                    "Should be able to prove remaining operations after extensive pruning"
                );
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    fn test_keyless_db_replay_with_trailing_appends() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Commit a batch of values.
            for i in 0..10 {
                let v = vec![i as u8; 10];
                db.append(v).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let committed_root = db.root();
            let committed_size = db.op_count();

            // Append (but do not commit) one more value, then fail with only the log synced.
            let uncommitted_value = vec![99u8; 20];
            db.append(uncommitted_value.clone()).await.unwrap();

            db.simulate_failure(true, false).await.unwrap();

            let mut db = open_db(context.clone()).await;

            assert_eq!(
                db.op_count(),
                committed_size,
                "Should rewind to last commit"
            );
            assert_eq!(db.root(), committed_root, "Root should match last commit");
            assert_eq!(
                db.last_commit_loc(),
                committed_size - 1,
                "Last commit location should be correct"
            );

            // The rewound db should hand out the same location the discarded append used.
            let new_value = vec![77u8; 15];
            let loc = db.append(new_value.clone()).await.unwrap();
            assert_eq!(
                loc, committed_size,
                "New append should get the expected location"
            );

            assert_eq!(db.get(loc).await.unwrap(), Some(new_value));

            db.commit(None).await.unwrap();
            let new_committed_root = db.root();
            let new_committed_size = db.op_count();

            // Multiple trailing appends should also be rewound after a failure.
            for i in 0..5 {
                let v = vec![(200 + i) as u8; 10];
                db.append(v).await.unwrap();
            }

            db.simulate_failure(true, false).await.unwrap();

            let db = open_db(context.clone()).await;
            assert_eq!(
                db.op_count(),
                new_committed_size,
                "Should rewind to last commit with multiple trailing appends"
            );
            assert_eq!(
                db.root(),
                new_committed_root,
                "Root should match last commit after multiple appends"
            );
            assert_eq!(
                db.last_commit_loc(),
                new_committed_size - 1,
                "Last commit location should be correct after multiple appends"
            );

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_keyless_db_get_out_of_bounds() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Location 0 holds the initial commit, which has no value.
            let result = db.get(Location::new_unchecked(0)).await.unwrap();
            assert!(result.is_none());

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            db.append(v1.clone()).await.unwrap();
            db.append(v2.clone()).await.unwrap();
            db.commit(None).await.unwrap();

            assert_eq!(db.get(Location::new_unchecked(1)).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(Location::new_unchecked(2)).await.unwrap().unwrap(), v2);

            // Location 3 is the commit operation (no value).
            let result = db.get(Location::new_unchecked(3)).await.unwrap();
            assert!(result.is_none());

            // Location 4 is past the end of the log.
            let result = db.get(Location::new_unchecked(4)).await;
            assert!(
                matches!(result, Err(Error::LocationOutOfBounds(loc, size)) if loc == Location::new_unchecked(4) && size == Location::new_unchecked(4))
            );

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("INFO")]
    pub fn test_keyless_db_prune_beyond_commit() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;

            // Pruning past the only commit (at location 0) must fail.
            let result = db.prune(Location::new_unchecked(1)).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == Location::new_unchecked(1) && commit_loc == Location::new_unchecked(0))
            );

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 8];
            let v3 = vec![3u8; 8];
            db.append(v1.clone()).await.unwrap();
            db.append(v2.clone()).await.unwrap();
            db.commit(None).await.unwrap();
            db.append(v3.clone()).await.unwrap();

            let last_commit = db.last_commit_loc();
            assert_eq!(last_commit, Location::new_unchecked(3));

            // Pruning exactly up to the last commit is allowed.
            assert!(db.prune(last_commit).await.is_ok());

            db.commit(None).await.unwrap();
            let new_last_commit = db.last_commit_loc();

            // Pruning beyond the latest commit must fail.
            let beyond = Location::new_unchecked(*new_last_commit + 1);
            let result = db.prune(beyond).await;
            assert!(
                matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                    if prune_loc == beyond && commit_loc == new_last_commit)
            );

            db.destroy().await.unwrap();
        });
    }
}