1use crate::{
13 adb::{align_mmr_and_locations, Error},
14 journal::{
15 fixed::{Config as FConfig, Journal as FJournal},
16 variable::{Config as VConfig, Journal as VJournal},
17 },
18 mmr::{
19 hasher::Standard,
20 iterator::leaf_num_to_pos,
21 journaled::{Config as MmrConfig, Mmr},
22 verification::Proof,
23 },
24 store::operation::Keyless as Operation,
25};
26use commonware_codec::{Codec, Encode as _};
27use commonware_cryptography::Hasher as CHasher;
28use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage, ThreadPool};
29use commonware_utils::NZUsize;
30use futures::{future::TryFutureExt, pin_mut, try_join, StreamExt as _};
31use std::num::{NonZeroU64, NonZeroUsize};
32use tracing::{debug, warn};
33
/// Buffer size (in bytes) used when replaying the operation log on startup.
const REPLAY_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 14);

/// Configuration for initializing a [Keyless] authenticated database.
///
/// `C` is the codec configuration type used to (de)serialize logged values.
#[derive(Clone)]
pub struct Config<C> {
    /// Storage partition backing the MMR journal.
    pub mmr_journal_partition: String,

    /// Maximum number of MMR items stored per journal blob.
    pub mmr_items_per_blob: NonZeroU64,

    /// Write buffer size for the MMR journal.
    pub mmr_write_buffer: NonZeroUsize,

    /// Storage partition backing the MMR metadata.
    pub mmr_metadata_partition: String,

    /// Storage partition backing the operation log.
    pub log_journal_partition: String,

    /// Write buffer size for the operation log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level for log entries (`None` disables it).
    pub log_compression: Option<u8>,

    /// Codec configuration used to decode logged values.
    pub log_codec_config: C,

    /// Number of operations stored per log section; pruning and sync happen
    /// at section granularity.
    pub log_items_per_section: NonZeroU64,

    /// Storage partition backing the locations journal.
    pub locations_journal_partition: String,

    /// Maximum number of location entries stored per journal blob.
    pub locations_items_per_blob: NonZeroU64,

    /// Write buffer size for the locations journal.
    pub locations_write_buffer: NonZeroUsize,

    /// Optional thread pool for parallelizing MMR work.
    pub thread_pool: Option<ThreadPool>,

    /// Shared buffer pool used by all journals.
    pub buffer_pool: PoolRef,
}
82
/// An append-only authenticated database of keyless (location-addressed)
/// operations. Values are appended to a variable-length log, each operation's
/// byte offset is recorded in the fixed-length `locations` journal, and every
/// encoded operation becomes a leaf of a journaled MMR so operation ranges
/// can be proven against the MMR root.
pub struct Keyless<E: Storage + Clock + Metrics, V: Codec, H: CHasher> {
    /// MMR over the encoded operations; source of the root digest and proofs.
    mmr: Mmr<E, H>,

    /// Variable-length journal holding the operations themselves.
    log: VJournal<E, Operation<V>>,

    /// Total number of operations in the db (committed or not).
    size: u64,

    /// Number of operations per log section (copied from [Config]).
    log_items_per_section: u64,

    /// Maps each operation location to its byte offset within its log section.
    locations: FJournal<E, u32>,

    /// Reusable hasher for MMR updates.
    hasher: Standard<H>,

    /// Location of the most recent commit operation, if any.
    last_commit_loc: Option<u64>,
}
116
117impl<E: Storage + Clock + Metrics, V: Codec, H: CHasher> Keyless<E, V, H> {
    /// Walk backwards from `aligned_size` until a locations entry whose
    /// corresponding log operation can actually be read is found.
    ///
    /// Returns the number of leading locations that remain valid and, if a
    /// readable operation was found, the `(section, offset)` where it resides.
    /// A result of `(0, None)` means no readable operation exists.
    async fn find_last_operation(
        locations: &FJournal<E, u32>,
        log: &VJournal<E, Operation<V>>,
        aligned_size: u64,
        log_items_per_section: u64,
    ) -> Result<(u64, Option<(u64, u32)>), Error> {
        let mut valid_size = aligned_size;
        let mut section_offset = None;

        while valid_size > 0 {
            let loc = valid_size - 1;
            let offset = locations.read(loc).await?;
            let section = loc / log_items_per_section;
            match log.get(section, offset).await {
                Ok(_) => {
                    // Found a readable operation: everything up to and
                    // including `loc` is considered valid.
                    section_offset = Some((section, offset));
                    break;
                }
                Err(e) => {
                    // The log is missing this operation (e.g. after an
                    // unclean shutdown); keep walking back.
                    warn!(loc, err=?e, "log operation missing");
                }
            };
            warn!(loc, offset, section, "walking back locations");
            valid_size -= 1;
        }

        Ok((valid_size, section_offset))
    }
150
    /// Replay log operations starting at `section_offset` (or from the very
    /// beginning when `None`), adding to the MMR and locations map any
    /// operation present in the log but missing from them.
    ///
    /// When `section_offset` is `Some`, that operation is already reflected
    /// in the MMR/locations map and is only used as the replay start point.
    ///
    /// Returns the `(offset, operation)` of the last operation found in the
    /// log, or `None` if the log is empty.
    async fn replay_operations(
        mmr: &mut Mmr<E, H>,
        hasher: &mut Standard<H>,
        locations: &mut FJournal<E, u32>,
        log: &VJournal<E, Operation<V>>,
        section_offset: Option<(u64, u32)>,
    ) -> Result<Option<(u32, Operation<V>)>, Error> {
        // `skip_first` marks that the first replayed op is already accounted
        // for in the MMR/locations map.
        let (section, offset, skip_first) = match section_offset {
            Some((s, o)) => (s, o, true),
            None => (0, 0, false),
        };
        let stream = log.replay(section, offset, REPLAY_BUFFER_SIZE).await?;
        pin_mut!(stream);

        let first_op = stream.next().await;
        let (mut last_offset, mut last_op) = if skip_first {
            // The starting op was already validated by `find_last_operation`,
            // so it must be present in the stream.
            let first_op = first_op.expect("operation known to exist")?;
            (offset, first_op.3)
        } else {
            let Some(first_op) = first_op else {
                debug!("no starting log operation found, returning empty db");
                return Ok(None);
            };
            let first_op = first_op?;
            let encoded_op = first_op.3.encode();
            // First operation is not yet reflected anywhere; add it.
            mmr.add_batched(hasher, &encoded_op).await?;
            locations.append(first_op.1).await?;
            (first_op.1, first_op.3)
        };

        // Every remaining operation in the log is missing from the
        // MMR/locations map; add them all.
        while let Some(result) = stream.next().await {
            let (section, offset, _, next_op) = result?;
            let encoded_op = next_op.encode();
            last_offset = offset;
            last_op = next_op;
            warn!(
                location = mmr.leaves(),
                section, offset, "adding missing operation to MMR/location map"
            );
            mmr.add_batched(hasher, &encoded_op).await?;
            locations.append(offset).await?;
        }

        // Persist anything recovered above before proceeding.
        if mmr.is_dirty() {
            mmr.sync(hasher).await?;
            locations.sync().await?;
        }

        Ok(Some((last_offset, last_op)))
    }
213
    /// Walk backwards from the last log operation and rewind the log, MMR,
    /// and locations map to the most recent commit operation, discarding any
    /// uncommitted operations that follow it.
    ///
    /// Returns the resulting operation count (the rewound db size).
    ///
    /// # Panics
    ///
    /// Panics if the walk reaches the oldest retained location without
    /// finding a commit and that location is not 0 (a pruned db must always
    /// retain a commit).
    async fn rewind_to_last_commit(
        mmr: &mut Mmr<E, H>,
        locations: &mut FJournal<E, u32>,
        log: &mut VJournal<E, Operation<V>>,
        last_log_op: Operation<V>,
        op_count: u64,
        last_offset: u32,
        log_items_per_section: u64,
    ) -> Result<u64, Error> {
        // Earliest uncommitted operation seen so far while walking back.
        let mut first_uncommitted: Option<(u64, u32)> = None;
        let mut op_index = op_count - 1;
        let mut op = last_log_op;
        let mut offset = last_offset;
        let oldest_retained_loc = locations
            .oldest_retained_pos()
            .await?
            .expect("location should be nonempty");

        loop {
            match op {
                Operation::Commit(_) => {
                    // Found the most recent commit; stop scanning.
                    break;
                }
                Operation::Append(_) => {
                    first_uncommitted = Some((op_index, offset));
                }
            }
            if op_index == oldest_retained_loc {
                // Only a never-pruned db may legitimately lack a commit.
                assert_eq!(op_index, 0, "no commit operation found");
                break;
            }
            op_index -= 1;
            offset = locations.read(op_index).await?;
            let section = op_index / log_items_per_section;
            op = log.get(section, offset).await?;
        }

        let Some((rewind_size, rewind_offset)) = first_uncommitted else {
            // Last operation was already a commit; nothing to rewind.
            return Ok(op_index + 1);
        };

        let ops_to_rewind = (op_count - rewind_size) as usize;
        warn!(ops_to_rewind, rewind_size, "rewinding log to last commit");
        locations.rewind(rewind_size).await?;
        locations.sync().await?;
        mmr.pop(ops_to_rewind).await?;
        let section = rewind_size / log_items_per_section;
        log.rewind_to_offset(section, rewind_offset).await?;
        log.sync(section).await?;

        Ok(rewind_size)
    }
276
    /// Initialize the database from `cfg`, recovering a consistent state
    /// after any unclean shutdown:
    ///
    /// 1. Align the MMR and locations journals with one another.
    /// 2. Trim any locations/MMR entries with no readable log operation.
    /// 3. Replay log operations missing from the MMR/locations map.
    /// 4. Rewind everything back to the most recent commit.
    pub async fn init(context: E, cfg: Config<V::Cfg>) -> Result<Self, Error> {
        let mut hasher = Standard::<H>::new();

        let mut mmr = Mmr::init(
            context.with_label("mmr"),
            &mut hasher,
            MmrConfig {
                journal_partition: cfg.mmr_journal_partition,
                metadata_partition: cfg.mmr_metadata_partition,
                items_per_blob: cfg.mmr_items_per_blob,
                write_buffer: cfg.mmr_write_buffer,
                thread_pool: cfg.thread_pool,
                buffer_pool: cfg.buffer_pool.clone(),
            },
        )
        .await?;

        let mut locations = FJournal::init(
            context.with_label("locations"),
            FConfig {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.locations_write_buffer,
                buffer_pool: cfg.buffer_pool.clone(),
            },
        )
        .await?;

        // Make the MMR and locations journal agree on a common size.
        let aligned_size = align_mmr_and_locations(&mut mmr, &mut locations).await?;

        let mut log = VJournal::<E, Operation<V>>::init(
            context.with_label("log"),
            VConfig {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        let log_items_per_section = cfg.log_items_per_section.get();
        // Find the last locations entry that has a readable log operation.
        let (valid_size, section_offset) =
            Self::find_last_operation(&locations, &log, aligned_size, log_items_per_section)
                .await?;

        // Drop MMR/locations entries that ran ahead of the log.
        if aligned_size != valid_size {
            warn!(
                size = aligned_size,
                new_size = valid_size,
                "trimming locations & mmr elements ahead of log"
            );
            locations.rewind(valid_size).await?;
            locations.sync().await?;
            mmr.pop((aligned_size - valid_size) as usize).await?;
        }
        assert_eq!(mmr.leaves(), locations.size().await?);

        // Add any log operations missing from the MMR/locations map.
        let replay_result =
            Self::replay_operations(&mut mmr, &mut hasher, &mut locations, &log, section_offset)
                .await?;
        let Some((last_offset, last_op)) = replay_result else {
            // Empty log: return an empty database.
            return Ok(Self {
                mmr,
                log,
                size: 0,
                locations,
                log_items_per_section,
                hasher,
                last_commit_loc: None,
            });
        };

        // Discard trailing uncommitted operations.
        let op_count = mmr.leaves();
        let size = Self::rewind_to_last_commit(
            &mut mmr,
            &mut locations,
            &mut log,
            last_op,
            op_count,
            last_offset,
            log_items_per_section,
        )
        .await?;
        assert_eq!(size, mmr.leaves());
        assert_eq!(size, locations.size().await?);

        Ok(Self {
            mmr,
            log,
            size,
            locations,
            log_items_per_section,
            hasher,
            // After the rewind, the last operation (if any) is a commit.
            last_commit_loc: size.checked_sub(1),
        })
    }
386
387 pub async fn get(&self, loc: u64) -> Result<Option<V>, Error> {
389 assert!(loc < self.size);
390 let offset = self.locations.read(loc).await?;
391
392 let section = loc / self.log_items_per_section;
393 let op = self.log.get(section, offset).await?;
394
395 Ok(op.into_value())
396 }
397
    /// Return the total number of operations in the db, including commit
    /// markers.
    pub fn op_count(&self) -> u64 {
        self.size
    }
403
    /// Return the location of the most recent commit operation, or `None` if
    /// nothing has been committed.
    pub fn last_commit_loc(&self) -> Option<u64> {
        self.last_commit_loc
    }
408
    /// Return the log section that the next appended operation will go to.
    fn current_section(&self) -> u64 {
        self.size / self.log_items_per_section
    }
413
414 pub async fn oldest_retained_loc(&self) -> Result<Option<u64>, Error> {
416 if let Some(oldest_section) = self.log.oldest_section() {
417 Ok(Some(oldest_section * self.log_items_per_section))
418 } else {
419 Ok(None)
420 }
421 }
422
    /// Prune all operations strictly before the log section containing `loc`.
    /// Pruning happens at section granularity, so operations between the
    /// section start and `loc` itself are retained.
    ///
    /// # Panics
    ///
    /// Panics if `loc` is beyond the last commit location.
    pub async fn prune(&mut self, loc: u64) -> Result<(), Error> {
        assert!(loc <= self.last_commit_loc.unwrap_or(0));

        // Sync MMR and locations first so the log is never pruned ahead of
        // durable MMR/locations state.
        try_join!(
            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
            self.locations.sync().map_err(Error::Journal),
        )?;

        let section = loc / self.log_items_per_section;
        if !self.log.prune(section).await? {
            // Nothing was pruned; leave the MMR/locations untouched.
            return Ok(());
        }

        let prune_loc = section * self.log_items_per_section;
        debug!(size = self.size, loc = prune_loc, "pruned log");

        // Bring the MMR and locations map in line with the pruned log.
        try_join!(
            self.mmr
                .prune_to_pos(&mut self.hasher, leaf_num_to_pos(prune_loc))
                .map_err(Error::Mmr),
            self.locations.prune(prune_loc).map_err(Error::Journal),
        )?;

        Ok(())
    }
461
    /// Append `value` to the db and return its assigned location. The
    /// operation is not durable until the next `commit` (or `sync`).
    pub async fn append(&mut self, value: V) -> Result<u64, Error> {
        let loc = self.size;
        let section = self.current_section();
        let operation = Operation::Append(value);
        let encoded_operation = operation.encode();

        // Write the operation to the log and record its byte offset.
        let log_loc_fut = async {
            let (offset, _) = self.log.append(section, operation).await?;
            self.locations.append(offset).await?;
            Ok::<(), Error>(())
        };

        // Concurrently add the encoded operation to the MMR.
        let mmr_fut = async {
            self.mmr
                .add_batched(&mut self.hasher, &encoded_operation)
                .await?;
            Ok::<(), Error>(())
        };

        try_join!(log_loc_fut, mmr_fut)?;
        self.size += 1;

        // If this append filled the current section, sync that section.
        if section != self.current_section() {
            self.log.sync(section).await?;
        }

        Ok(loc)
    }
496
    /// Durably commit all pending operations by appending a commit operation
    /// (optionally carrying `metadata`) and syncing the log. Returns the
    /// location of the commit operation itself.
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<u64, Error> {
        let loc = self.size;
        let section = self.current_section();
        self.last_commit_loc = Some(loc);

        let operation = Operation::Commit(metadata);
        let encoded_operation = operation.encode();

        // Append the commit to the log, then sync the log while recording
        // the offset in the locations map.
        let log_loc_fut = async {
            let (offset, _) = self.log.append(section, operation).await?;
            try_join!(
                self.log.sync(section).map_err(Error::Journal),
                self.locations.append(offset).map_err(Error::Journal),
            )?;

            Ok::<(), Error>(())
        };

        // Concurrently add the commit to the MMR and flush batched updates.
        let mmr_fut = async {
            self.mmr
                .add_batched(&mut self.hasher, &encoded_operation)
                .await?;
            self.mmr.process_updates(&mut self.hasher);

            Ok::<(), Error>(())
        };

        try_join!(log_loc_fut, mmr_fut)?;
        self.size += 1;

        debug!(size = self.size, "committed db");

        Ok(loc)
    }
541
    /// Sync all db components (locations, MMR, and the current log section)
    /// to persistent storage concurrently.
    pub async fn sync(&mut self) -> Result<(), Error> {
        let section = self.current_section();
        try_join!(
            self.locations.sync().map_err(Error::Journal),
            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
            self.log.sync(section).map_err(Error::Journal),
        )?;

        Ok(())
    }
555
556 pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
559 let Some(loc) = self.last_commit_loc else {
560 return Ok(None);
561 };
562 let offset = self.locations.read(loc).await?;
563 let section = loc / self.log_items_per_section;
564 let op = self.log.get(section, offset).await?;
565 let Operation::Commit(metadata) = op else {
566 return Ok(None);
567 };
568
569 Ok(Some((loc, metadata)))
570 }
571
    /// Return the current root digest of the MMR over all operations.
    pub fn root(&self, hasher: &mut Standard<H>) -> H::Digest {
        self.mmr.root(hasher)
    }
580
    /// Generate a proof over up to `max_ops` operations starting at
    /// `start_loc`, plus the operations themselves, against the current db
    /// state.
    pub async fn proof(
        &self,
        start_loc: u64,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
        self.historical_proof(self.size, start_loc, max_ops).await
    }
598
599 pub async fn historical_proof(
601 &self,
602 size: u64,
603 start_loc: u64,
604 max_ops: NonZeroU64,
605 ) -> Result<(Proof<H::Digest>, Vec<Operation<V>>), Error> {
606 let start_pos = leaf_num_to_pos(start_loc);
607 let end_index = std::cmp::min(size - 1, start_loc + max_ops.get() - 1);
608 let end_pos = leaf_num_to_pos(end_index);
609 let mmr_size = leaf_num_to_pos(size);
610
611 let proof = self
612 .mmr
613 .historical_range_proof(mmr_size, start_pos, end_pos)
614 .await?;
615 let mut ops = Vec::with_capacity((end_index - start_loc + 1) as usize);
616 for loc in start_loc..=end_index {
617 let offset = self.locations.read(loc).await?;
618 let section = loc / self.log_items_per_section;
619 let value = self.log.get(section, offset).await?;
620 ops.push(value);
621 }
622
623 Ok((proof, ops))
624 }
625
    /// Close the db, releasing all resources.
    pub async fn close(mut self) -> Result<(), Error> {
        // NOTE(review): `locations` is closed before the MMR/log are closed
        // concurrently — presumably so a crash mid-close leaves state that
        // init-time recovery handles; confirm before reordering.
        self.locations.close().await?;

        try_join!(
            self.mmr.close(&mut self.hasher).map_err(Error::Mmr),
            self.log.close().map_err(Error::Journal),
        )?;

        Ok(())
    }
639
    /// Destroy the db, removing all of its persisted data from storage.
    pub async fn destroy(self) -> Result<(), Error> {
        try_join!(
            self.mmr.destroy().map_err(Error::Mmr),
            self.log.destroy().map_err(Error::Journal),
            self.locations.destroy().map_err(Error::Journal),
        )?;

        Ok(())
    }
650
    /// Simulate a crash by dropping the db after syncing only the selected
    /// subset of components, leaving the others possibly unsynced.
    #[cfg(test)]
    pub(super) async fn simulate_failure(
        mut self,
        sync_log: bool,
        sync_locations: bool,
        sync_mmr: bool,
    ) -> Result<(), Error> {
        if sync_log {
            let section = self.current_section();
            self.log.sync(section).await?;
        }
        if sync_locations {
            self.locations.sync().await?;
        }
        if sync_mmr {
            self.mmr.sync(&mut self.hasher).await?;
        }

        Ok(())
    }
672
    /// Simulate a crash mid-prune: sync the MMR and locations map, prune the
    /// log, then drop without pruning the MMR/locations to match.
    ///
    /// # Panics
    ///
    /// Panics if `loc` exceeds the last commit, or if no log section was
    /// actually pruned (in which case no failure state can be simulated).
    #[cfg(test)]
    pub(super) async fn simulate_prune_failure(mut self, loc: u64) -> Result<(), Error> {
        assert!(loc <= self.last_commit_loc.unwrap_or(0));
        try_join!(
            self.mmr.sync(&mut self.hasher).map_err(Error::Mmr),
            self.locations.sync().map_err(Error::Journal),
        )?;
        let section = loc / self.log_items_per_section;
        assert!(
            self.log.prune(section).await?,
            "nothing was pruned, so could not simulate failure"
        );

        Ok(())
    }
691}
692
693#[cfg(test)]
694mod test {
695 use super::*;
696 use crate::{
697 adb::verify_proof,
698 mmr::{hasher::Standard, mem::Mmr as MemMmr},
699 };
700 use commonware_cryptography::Sha256;
701 use commonware_macros::test_traced;
702 use commonware_runtime::{deterministic, Runner as _};
703 use commonware_utils::{NZUsize, NZU64};
704
    /// Page size for the shared buffer pool (odd on purpose, presumably to
    /// exercise unaligned paths — confirm).
    const PAGE_SIZE: usize = 101;
    /// Number of pages cached by the shared buffer pool.
    const PAGE_CACHE_SIZE: usize = 11;

    /// Build a test [Config] with all partitions suffixed by `suffix`.
    fn db_config(suffix: &str) -> Config<(commonware_codec::RangeCfg, ())> {
        Config {
            mmr_journal_partition: format!("journal_{suffix}"),
            mmr_metadata_partition: format!("metadata_{suffix}"),
            mmr_items_per_blob: NZU64!(11),
            mmr_write_buffer: NZUsize!(1024),
            log_journal_partition: format!("log_journal_{suffix}"),
            log_write_buffer: NZUsize!(1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            locations_journal_partition: format!("locations_journal_{suffix}"),
            locations_items_per_blob: NZU64!(13),
            locations_write_buffer: NZUsize!(1024),
            thread_pool: None,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        }
    }
727
    /// Concrete db type used throughout these tests.
    type Db = Keyless<deterministic::Context, Vec<u8>, Sha256>;

    /// Open (or re-open) the test database within the given context.
    async fn open_db(context: deterministic::Context) -> Db {
        Db::init(context, db_config("partition")).await.unwrap()
    }
735
    /// An empty db has the empty-MMR root, drops an uncommitted append on
    /// restart, and round-trips a metadata-carrying commit.
    #[test_traced("INFO")]
    pub fn test_keyless_db_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut db = open_db(context.clone()).await;
            let mut hasher = Standard::<Sha256>::new();
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc().await.unwrap(), None);
            // Empty db root must equal an empty in-memory MMR's root.
            assert_eq!(db.root(&mut hasher), MemMmr::default().root(&mut hasher));
            assert_eq!(db.get_metadata().await.unwrap(), None);
            assert_eq!(db.last_commit_loc(), None);

            // An uncommitted append must not survive a close/reopen.
            let v1 = vec![1u8; 8];
            let root = db.root(&mut hasher);
            db.append(v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            // Commit with metadata; it should round-trip across restart.
            let metadata = Some(vec![3u8; 10]);
            db.commit(metadata.clone()).await.unwrap();
            assert_eq!(db.op_count(), 1);
            assert_eq!(
                db.get_metadata().await.unwrap(),
                Some((0, metadata.clone()))
            );
            assert_eq!(db.get(0).await.unwrap(), metadata);
            let root = db.root(&mut hasher);

            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.get_metadata().await.unwrap(), Some((0, metadata)));
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.last_commit_loc(), Some(0));

            db.destroy().await.unwrap();
        });
    }
779
    /// Build a small db, verify gets/roots across restarts, and confirm that
    /// uncommitted appends are discarded on reopen.
    #[test_traced("WARN")]
    pub fn test_keyless_db_build_basic() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            let v1 = vec![1u8; 8];
            let v2 = vec![2u8; 20];

            let loc1 = db.append(v1.clone()).await.unwrap();
            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);

            let loc2 = db.append(v2.clone()).await.unwrap();
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Two appends plus the commit marker itself = 3 operations.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.get_metadata().await.unwrap(), Some((2, None)));
            // The commit operation carries no value.
            assert_eq!(db.get(2).await.unwrap(), None);
            let root = db.root(&mut hasher);
            db.close().await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.root(&mut hasher), root);

            assert_eq!(db.get(loc1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(loc2).await.unwrap().unwrap(), v2);

            // Uncommitted appends must be discarded on close/reopen.
            db.append(v2).await.unwrap();
            db.append(v1).await.unwrap();

            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.root(&mut hasher), root);

            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 3);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
829
    /// Crash recovery: after each simulated failure (syncing every subset of
    /// log/locations/MMR), reopening must restore state as of the last commit.
    #[test_traced("WARN")]
    pub fn test_keyless_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;
            let root = db.root(&mut hasher);

            for i in 0u64..ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.append(v.clone()).await.unwrap();
            }

            // Nothing synced: all appends must be lost on reopen.
            db.simulate_failure(false, false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            for i in 0u64..ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.append(v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);

            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }

            // Post-commit appends without any sync are dropped.
            db.simulate_failure(false, false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }
            // Only the log synced: recovery must still rewind to the commit.
            db.simulate_failure(true, false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }
            // Log + locations synced.
            db.simulate_failure(true, true, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }
            // Log + MMR synced.
            db.simulate_failure(true, false, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }
            // Locations only synced.
            db.simulate_failure(false, true, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }
            // MMR only synced.
            db.simulate_failure(false, false, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }
            // Locations + MMR synced (log not).
            db.simulate_failure(false, true, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(root, db.root(&mut hasher));

            // Finally, commit the second batch and confirm it persists.
            for i in ELEMENTS..2 * ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);

            db.close().await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 2 * ELEMENTS + 2);
            assert_eq!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
939
    /// Crash recovery from a non-empty (committed and pruned) db: every
    /// failure-sync combination must recover the last committed state.
    #[test_traced("WARN")]
    fn test_keyless_db_non_empty_db_recovery() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Build a committed, partially-pruned baseline db.
            const ELEMENTS: u64 = 200;
            for i in 0u64..ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                db.append(v).await.unwrap();
            }
            db.commit(None).await.unwrap();
            db.prune(10).await.unwrap();
            let root = db.root(&mut hasher);
            let op_count = db.op_count();

            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.last_commit_loc(), Some(op_count - 1));
            db.close().await.unwrap();

            // Append ELEMENTS uncommitted operations to `db`.
            async fn apply_more_ops(db: &mut Db) {
                for i in 0..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                    db.append(v).await.unwrap();
                }
            }

            // Apply uncommitted ops, simulate every failure mode, and verify
            // the db recovers to the committed root each time.
            async fn recover_from_failure(
                context: deterministic::Context,
                root: <Sha256 as CHasher>::Digest,
                hasher: &mut Standard<Sha256>,
                op_count: u64,
            ) {
                let mut db = open_db(context.clone()).await;
                apply_more_ops(&mut db).await;
                db.simulate_failure(false, false, false).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(hasher), root);

                apply_more_ops(&mut db).await;
                db.simulate_failure(true, false, false).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(hasher), root);

                apply_more_ops(&mut db).await;
                db.simulate_failure(false, true, false).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(hasher), root);

                apply_more_ops(&mut db).await;
                db.simulate_failure(false, false, true).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(hasher), root);

                apply_more_ops(&mut db).await;
                db.simulate_failure(true, true, false).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(hasher), root);

                apply_more_ops(&mut db).await;
                db.simulate_failure(true, false, true).await.unwrap();
                let mut db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(hasher), root);

                apply_more_ops(&mut db).await;
                db.simulate_failure(false, true, true).await.unwrap();
                let db = open_db(context.clone()).await;
                assert_eq!(db.op_count(), op_count);
                assert_eq!(db.root(hasher), root);
                assert_eq!(db.last_commit_loc(), Some(op_count - 1));
            }

            recover_from_failure(context.clone(), root, &mut hasher, op_count).await;

            // A failed prune must also recover cleanly.
            let db = open_db(context.clone()).await;
            let last_commit_loc = db.last_commit_loc().unwrap();
            db.simulate_prune_failure(last_commit_loc).await.unwrap();
            let db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(&mut hasher), root);
            db.close().await.unwrap();

            // A successful prune up to the last commit preserves the root.
            let mut db = open_db(context.clone()).await;
            db.prune(db.last_commit_loc().unwrap()).await.unwrap();
            assert_eq!(db.op_count(), op_count);
            assert_eq!(db.root(&mut hasher), root);
            db.close().await.unwrap();

            recover_from_failure(context.clone(), root, &mut hasher, op_count).await;

            // A real commit after recovery advances the state.
            let mut db = open_db(context.clone()).await;
            apply_more_ops(&mut db).await;
            db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > op_count);
            assert_ne!(db.root(&mut hasher), root);
            assert_eq!(db.last_commit_loc(), Some(db.op_count() - 1));

            db.destroy().await.unwrap();
        });
    }
1058
    /// Crash recovery from an empty (never-committed) db: every failure-sync
    /// combination must recover the empty state.
    #[test_traced("WARN")]
    fn test_keyless_db_empty_db_recovery() {
        const ELEMENTS: u64 = 1000;
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let db = open_db(context.clone()).await;
            let root = db.root(&mut hasher);

            // Reopening after a plain drop keeps the empty state.
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            // Append ELEMENTS uncommitted operations to `db`.
            async fn apply_ops(db: &mut Db) {
                for i in 0..ELEMENTS {
                    let v = vec![(i % 255) as u8; ((i % 17) + 13) as usize];
                    db.append(v).await.unwrap();
                }
            }

            apply_ops(&mut db).await;
            db.simulate_failure(false, false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            apply_ops(&mut db).await;
            db.simulate_failure(true, false, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            apply_ops(&mut db).await;
            db.simulate_failure(false, true, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            apply_ops(&mut db).await;
            db.simulate_failure(false, false, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            apply_ops(&mut db).await;
            db.simulate_failure(true, true, false).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            apply_ops(&mut db).await;
            db.simulate_failure(true, false, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            apply_ops(&mut db).await;
            db.simulate_failure(false, true, true).await.unwrap();
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);

            // Several unpersisted batches in a row are still all dropped.
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            apply_ops(&mut db).await;
            let mut db = open_db(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.root(&mut hasher), root);
            assert_eq!(db.last_commit_loc(), None);

            // A commit finally persists state.
            apply_ops(&mut db).await;
            db.commit(None).await.unwrap();
            let db = open_db(context.clone()).await;
            assert!(db.op_count() > 0);
            assert_ne!(db.root(&mut hasher), root);

            db.destroy().await.unwrap();
        });
    }
1144
    /// Generate proofs over various ranges and verify them against the root,
    /// including expected failure with a wrong root or start location.
    #[test_traced("INFO")]
    pub fn test_keyless_db_proof_generation_and_verification() {
        let executor = deterministic::Runner::default();
        executor.start(|context| async move {
            let mut hasher = Standard::<Sha256>::new();
            let mut db = open_db(context.clone()).await;

            // Build a committed db of ELEMENTS appends (+1 commit op).
            const ELEMENTS: u64 = 100;
            let mut values = Vec::new();
            for i in 0u64..ELEMENTS {
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                values.push(v.clone());
                db.append(v).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let root = db.root(&mut hasher);

            // (start_loc, max_ops) pairs covering interior, boundary, and
            // clamped ranges.
            let test_cases = vec![
                (0, 10),
                (10, 5),
                (50, 20),
                (90, 15),
                (0, 1),
                (ELEMENTS - 1, 1),
                (ELEMENTS, 1),
            ];

            for (start_loc, max_ops) in test_cases {
                let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();

                assert!(
                    verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
                    "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
                );

                // The proof range is clamped to the db size.
                let expected_ops = std::cmp::min(max_ops, db.op_count() - start_loc);
                assert_eq!(
                    ops.len() as u64,
                    expected_ops,
                    "Expected {expected_ops} operations, got {}",
                    ops.len(),
                );

                for (i, op) in ops.iter().enumerate() {
                    let loc = start_loc + i as u64;
                    if loc < ELEMENTS {
                        assert!(
                            matches!(op, Operation::Append(_)),
                            "Expected Append operation at location {loc}, got {op:?}",
                        );
                    } else if loc == ELEMENTS {
                        assert!(
                            matches!(op, Operation::Commit(_)),
                            "Expected Commit operation at location {loc}, got {op:?}",
                        );
                    }
                }

                // Verification must fail against a bogus root ...
                let wrong_root = Sha256::hash(&[0xFF; 32]);
                assert!(
                    !verify_proof(&mut hasher, &proof, start_loc, &ops, &wrong_root),
                    "Proof should fail with wrong root"
                );

                // ... and against a shifted start location.
                if start_loc > 0 {
                    assert!(
                        !verify_proof(&mut hasher, &proof, start_loc - 1, &ops, &root),
                        "Proof should fail with wrong start location"
                    );
                }
            }

            db.destroy().await.unwrap();
        });
    }
1229
1230 #[test_traced("INFO")]
1231 pub fn test_keyless_db_proof_with_pruning() {
1232 let executor = deterministic::Runner::default();
1233 executor.start(|context| async move {
1234 let mut hasher = Standard::<Sha256>::new();
1235 let mut db = open_db(context.clone()).await;
1236
1237 const ELEMENTS: u64 = 100;
1239 let mut values = Vec::new();
1240 for i in 0u64..ELEMENTS {
1241 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
1242 values.push(v.clone());
1243 db.append(v).await.unwrap();
1244 }
1245 db.commit(None).await.unwrap();
1246
1247 for i in ELEMENTS..ELEMENTS * 2 {
1249 let v = vec![(i % 255) as u8; ((i % 17) + 5) as usize];
1250 values.push(v.clone());
1251 db.append(v).await.unwrap();
1252 }
1253 db.commit(None).await.unwrap();
1254 let root = db.root(&mut hasher);
1255
1256 println!("last commit loc: {}", db.last_commit_loc.unwrap());
1257
1258 const PRUNE_LOC: u64 = 30;
1260 db.prune(PRUNE_LOC).await.unwrap();
1261
1262 let oldest_retained = db.oldest_retained_loc().await.unwrap();
1264 assert!(
1265 oldest_retained.is_some(),
1266 "Should have oldest retained location after pruning"
1267 );
1268
1269 assert_eq!(
1271 db.root(&mut hasher),
1272 root,
1273 "Root should not change after pruning"
1274 );
1275
1276 db.close().await.unwrap();
1277 let mut db = open_db(context.clone()).await;
1278 assert_eq!(db.root(&mut hasher), root);
1279 assert_eq!(db.op_count(), 2 * ELEMENTS + 2);
1280 assert!(db.oldest_retained_loc().await.unwrap().unwrap() <= PRUNE_LOC);
1281
1282 for i in 0..oldest_retained.unwrap() {
1284 let result = db.get(i).await;
1285 match result {
1287 Ok(None) => {} Ok(Some(_)) => {
1289 panic!("Should not be able to get pruned value at location {i}")
1290 }
1291 Err(_) => {} }
1293 }
1294
1295 let test_cases = vec![
1297 (oldest_retained.unwrap(), 10), (50, 20), (150, 10), (190, 15), ];
1302
1303 for (start_loc, max_ops) in test_cases {
1304 if start_loc < oldest_retained.unwrap() {
1306 continue;
1307 }
1308
1309 let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
1310
1311 assert!(
1313 verify_proof(&mut hasher, &proof, start_loc, &ops, &root),
1314 "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops after pruning",
1315 );
1316
1317 let expected_ops = std::cmp::min(max_ops, db.op_count() - start_loc);
1319 assert_eq!(
1320 ops.len() as u64,
1321 expected_ops,
1322 "Expected {expected_ops} operations, got {}",
1323 ops.len(),
1324 );
1325 }
1326
1327 const AGGRESSIVE_PRUNE: u64 = 150;
1329 db.prune(AGGRESSIVE_PRUNE).await.unwrap();
1330
1331 let new_oldest = db.oldest_retained_loc().await.unwrap().unwrap();
1332 assert!(new_oldest <= AGGRESSIVE_PRUNE);
1333
1334 let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
1336 assert!(
1337 verify_proof(&mut hasher, &proof, new_oldest, &ops, &root),
1338 "Proof should still verify after aggressive pruning"
1339 );
1340
1341 let almost_all = db.op_count() - 5;
1343 db.prune(almost_all).await.unwrap();
1344
1345 let final_oldest = db.oldest_retained_loc().await.unwrap().unwrap();
1346
1347 if final_oldest < db.op_count() {
1349 let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
1350 assert!(
1351 verify_proof(&mut hasher, &final_proof, final_oldest, &final_ops, &root),
1352 "Should be able to prove remaining operations after extensive pruning"
1353 );
1354 }
1355
1356 db.destroy().await.unwrap();
1357 });
1358 }
1359
1360 #[test_traced("WARN")]
1361 fn test_keyless_db_replay_with_trailing_appends() {
1362 let executor = deterministic::Runner::default();
1363 executor.start(|context| async move {
1364 let mut hasher = Standard::<Sha256>::new();
1365
1366 let mut db = open_db(context.clone()).await;
1368
1369 for i in 0..10 {
1371 let v = vec![i as u8; 10];
1372 db.append(v).await.unwrap();
1373 }
1374 db.commit(None).await.unwrap();
1375 let committed_root = db.root(&mut hasher);
1376 let committed_size = db.op_count();
1377
1378 let uncommitted_value = vec![99u8; 20];
1380 db.append(uncommitted_value.clone()).await.unwrap();
1381
1382 db.simulate_failure(true, false, false).await.unwrap();
1384
1385 let mut db = open_db(context.clone()).await;
1387
1388 assert_eq!(
1390 db.op_count(),
1391 committed_size,
1392 "Should rewind to last commit"
1393 );
1394 assert_eq!(
1395 db.root(&mut hasher),
1396 committed_root,
1397 "Root should match last commit"
1398 );
1399 assert_eq!(
1400 db.last_commit_loc(),
1401 Some(committed_size - 1),
1402 "Last commit location should be correct"
1403 );
1404
1405 let new_value = vec![77u8; 15];
1408 let loc = db.append(new_value.clone()).await.unwrap();
1409 assert_eq!(
1410 loc, committed_size,
1411 "New append should get the expected location"
1412 );
1413
1414 assert_eq!(db.get(loc).await.unwrap(), Some(new_value));
1416
1417 db.commit(None).await.unwrap();
1419 let new_committed_root = db.root(&mut hasher);
1420 let new_committed_size = db.op_count();
1421
1422 for i in 0..5 {
1424 let v = vec![(200 + i) as u8; 10];
1425 db.append(v).await.unwrap();
1426 }
1427
1428 db.simulate_failure(true, false, false).await.unwrap();
1430
1431 let db = open_db(context.clone()).await;
1433 assert_eq!(
1434 db.op_count(),
1435 new_committed_size,
1436 "Should rewind to last commit with multiple trailing appends"
1437 );
1438 assert_eq!(
1439 db.root(&mut hasher),
1440 new_committed_root,
1441 "Root should match last commit after multiple appends"
1442 );
1443 assert_eq!(
1444 db.last_commit_loc(),
1445 Some(new_committed_size - 1),
1446 "Last commit location should be correct after multiple appends"
1447 );
1448
1449 db.destroy().await.unwrap();
1450 });
1451 }
1452}