1use crate::{
45 journal::{
46 authenticated,
47 contiguous::{Contiguous, Mutable, Reader},
48 Error as JournalError,
49 },
50 merkle::{journaled::Config as MerkleConfig, Family, Location, Proof},
51 qmdb::{any::value::ValueEncoding, Error},
52 Context, Persistable,
53};
54use commonware_codec::EncodeShared;
55use commonware_cryptography::Hasher;
56use std::{num::NonZeroU64, sync::Arc};
57use tracing::{debug, warn};
58
59pub mod batch;
60pub mod fixed;
61mod operation;
62pub mod variable;
63pub use operation::Operation;
64
/// Configuration for initializing a keyless authenticated database.
#[derive(Clone)]
pub struct Config<J> {
    /// Configuration for the merkle tree maintained over the operation log.
    pub merkle: MerkleConfig,

    /// Configuration for the underlying operation log journal (type supplied
    /// by the caller; see the concrete `fixed`/`variable` submodules).
    pub log: J,
}
74
/// An authenticated, append-only database addressed by [Location] rather than
/// by key. Operations are stored in an authenticated journal whose merkle root
/// commits to the entire history.
pub struct Keyless<F, E, V, C, H>
where
    F: Family,
    E: Context,
    V: ValueEncoding,
    C: Contiguous<Item = Operation<V>>,
    H: Hasher,
    Operation<V>: EncodeShared,
{
    /// The authenticated journal holding every operation.
    journal: authenticated::Journal<F, E, C, H>,

    // Location of the most recent commit operation. Initialization guarantees
    // the journal always ends with a commit, so this is always in bounds.
    last_commit_loc: Location<F>,
}
91
impl<F, E, V, C, H> Keyless<F, E, V, C, H>
where
    F: Family,
    E: Context,
    V: ValueEncoding,
    C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
    H: Hasher,
    Operation<V>: EncodeShared,
{
    /// Builds a [Keyless] db on top of an already-opened authenticated journal.
    ///
    /// If the journal is empty, a genesis `Commit(None)` operation is appended
    /// and synced so the invariant "the log ends with a commit" always holds.
    pub(crate) async fn init_from_journal(
        mut journal: authenticated::Journal<F, E, C, H>,
    ) -> Result<Self, Error<F>> {
        if journal.size().await == 0 {
            warn!("no operations found in log, creating initial commit");
            journal.append(&Operation::Commit(None)).await?;
            journal.sync().await?;
        }

        // The last operation in the journal is the most recent commit
        // (recovery in the journal layer trims any trailing uncommitted data).
        let last_commit_loc = journal
            .size()
            .await
            .checked_sub(1)
            .expect("at least one commit should exist");

        Ok(Self {
            journal,
            last_commit_loc,
        })
    }

    /// Returns the value of the operation at `loc`, or `Ok(None)` when the
    /// operation carries no value (e.g. a metadata-free commit).
    ///
    /// # Errors
    ///
    /// Returns [Error::LocationOutOfBounds] when `loc` is past the end of the
    /// log.
    pub async fn get(&self, loc: Location<F>) -> Result<Option<V::Value>, Error<F>> {
        let reader = self.journal.reader().await;
        let op_count = reader.bounds().end;
        if loc >= op_count {
            return Err(Error::LocationOutOfBounds(loc, Location::new(op_count)));
        }
        let op = reader.read(*loc).await?;

        Ok(op.into_value())
    }

    /// Returns the location of the most recent commit operation.
    pub const fn last_commit_loc(&self) -> Location<F> {
        self.last_commit_loc
    }

    /// Returns the current location bounds of the log: `start` is the oldest
    /// retained (non-pruned) operation, `end` is one past the newest.
    pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
        let bounds = self.journal.reader().await.bounds();
        Location::new(bounds.start)..Location::new(bounds.end)
    }

    /// Returns the metadata attached to the most recent commit, or `None` if
    /// the last commit carried no metadata (or the operation at
    /// `last_commit_loc` is not a commit).
    pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error<F>> {
        let op = self
            .journal
            .reader()
            .await
            .read(*self.last_commit_loc)
            .await?;
        let Operation::Commit(metadata) = op else {
            return Ok(None);
        };

        Ok(metadata)
    }

    /// Returns the merkle root committing to the full operation history.
    pub fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Generates a proof (against the current root) for up to `max_ops`
    /// operations starting at `start_loc`, returning the proof together with
    /// the covered operations.
    pub async fn proof(
        &self,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<V>>), Error<F>> {
        self.historical_proof(self.bounds().await.end, start_loc, max_ops)
            .await
    }

    /// Like [Self::proof], but the proof is generated against the historical
    /// root the db had when it contained exactly `op_count` operations.
    pub async fn historical_proof(
        &self,
        op_count: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<V>>), Error<F>> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Prunes all operations strictly before `loc`.
    ///
    /// # Errors
    ///
    /// Returns [Error::PruneBeyondMinRequired] when `loc` is past the last
    /// commit (the last commit must always remain readable).
    pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;

        Ok(())
    }

    /// Rewinds the db so it contains exactly `size` operations.
    ///
    /// A no-op when `size` equals the current size. Fails with
    /// `InvalidRewind` when `size` is zero or larger than the current size,
    /// with `ItemPruned` when the target operation was already pruned, and
    /// with [Error::UnexpectedData] when the operation at `size - 1` is not a
    /// commit — a rewind may only land on a committed state.
    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
        let rewind_size = *size;
        let current_size = *self.last_commit_loc + 1;
        if rewind_size == current_size {
            return Ok(());
        }
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(crate::journal::Error::InvalidRewind(
                rewind_size,
            )));
        }

        let rewind_last_loc = Location::new(rewind_size - 1);
        // Validate the rewind target before mutating anything: it must still
        // be retained and it must be a commit operation.
        {
            let reader = self.journal.reader().await;
            let bounds = reader.bounds();
            if rewind_size <= bounds.start {
                return Err(Error::Journal(crate::journal::Error::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            if !matches!(rewind_last_op, Operation::Commit(_)) {
                return Err(Error::UnexpectedData(rewind_last_loc));
            }
        }

        self.journal.rewind(rewind_size).await?;
        self.last_commit_loc = rewind_last_loc;
        Ok(())
    }

    /// Durably persists all applied operations.
    pub async fn sync(&self) -> Result<(), Error<F>> {
        self.journal.sync().await.map_err(Into::into)
    }

    /// Commits the journal, making the current state recoverable.
    pub async fn commit(&self) -> Result<(), Error<F>> {
        self.journal.commit().await.map_err(Into::into)
    }

    /// Destroys the db, releasing all underlying storage.
    pub async fn destroy(self) -> Result<(), Error<F>> {
        Ok(self.journal.destroy().await?)
    }

    /// Starts a new (unmerkleized) batch of appends on top of the current
    /// committed state.
    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<F, H, V> {
        let journal_size = *self.last_commit_loc + 1;
        batch::UnmerkleizedBatch::new(self, journal_size)
    }

    /// Captures the current state as an (empty) merkleized batch, from which
    /// further child batches can be derived.
    pub fn to_batch(&self) -> Arc<batch::MerkleizedBatch<F, H::Digest, V>> {
        let journal_size = *self.last_commit_loc + 1;
        Arc::new(batch::MerkleizedBatch {
            journal_batch: self.journal.to_merkleized_batch(),
            parent: None,
            base_size: journal_size,
            total_size: journal_size,
            db_size: journal_size,
            ancestor_batch_ends: Vec::new(),
        })
    }

    /// Applies a merkleized batch, returning the location range of the newly
    /// applied operations.
    ///
    /// # Errors
    ///
    /// Returns [Error::StaleBatch] unless the batch was built against the
    /// db's current state: its recorded db size, its base size, or one of its
    /// ancestors' end sizes must match the current size (the latter allows
    /// applying a descendant batch after some of its ancestors already
    /// landed).
    pub async fn apply_batch(
        &mut self,
        batch: Arc<batch::MerkleizedBatch<F, H::Digest, V>>,
    ) -> Result<core::ops::Range<Location<F>>, Error<F>> {
        let db_size = *self.last_commit_loc + 1;
        let valid = db_size == batch.db_size
            || db_size == batch.base_size
            || batch.ancestor_batch_ends.contains(&db_size);
        if !valid {
            return Err(Error::StaleBatch {
                db_size,
                batch_db_size: batch.db_size,
                batch_base_size: batch.base_size,
            });
        }
        let start_loc = self.last_commit_loc + 1;

        self.journal.apply_batch(&batch.journal_batch).await?;

        self.last_commit_loc = Location::new(batch.total_size - 1);
        let end_loc = Location::new(batch.total_size);
        debug!(size = ?end_loc, "applied batch");
        Ok(start_loc..end_loc)
    }
}
353
354#[cfg(test)]
355pub(crate) mod tests {
356 use super::*;
357 use crate::{
358 journal::{contiguous::Mutable, Error as JournalError},
359 merkle::hasher::Standard,
360 qmdb::verify_proof,
361 Persistable,
362 };
363 use commonware_cryptography::Sha256;
364 use commonware_runtime::{deterministic, Metrics};
365 use commonware_utils::NZU64;
366 use std::{future::Future, pin::Pin};
367
    /// Factory used by tests to close and reopen a db against the same
    /// underlying (deterministic) storage, simulating a restart.
    pub(crate) type Reopen<D> =
        Box<dyn Fn(deterministic::Context) -> Pin<Box<dyn Future<Output = D> + Send>>>;
370
    /// A value type usable in the shared test drivers: deterministically
    /// constructible from an index so tests can predict stored contents.
    pub(crate) trait TestValue: Clone + PartialEq + std::fmt::Debug + Send + Sync {
        /// Produces a deterministic value derived from `i`.
        fn make(i: u64) -> Self;
    }
375
376 impl TestValue for Vec<u8> {
377 fn make(i: u64) -> Self {
378 vec![(i % 255) as u8; ((i % 13) + 7) as usize]
379 }
380 }
381
    impl TestValue for commonware_utils::sequence::U64 {
        /// Deterministic fixed-size value; the affine map keeps distinct
        /// indices mapped to distinct values.
        fn make(i: u64) -> Self {
            Self::new(i * 10 + 1)
        }
    }
387
    /// Exercises a freshly-initialized db: it should contain exactly the
    /// genesis commit, drop un-applied batch work across a reopen, and accept
    /// a first real commit carrying metadata.
    pub(crate) async fn test_keyless_db_empty<F: Family, V, C, H>(
        context: deterministic::Context,
        db: Keyless<F, deterministic::Context, V, C, H>,
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        // A new db holds only the genesis commit at location 0.
        let bounds = db.bounds().await;
        assert_eq!(bounds.end, 1);
        assert_eq!(bounds.start, Location::new(0));
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert_eq!(db.last_commit_loc(), Location::new(0));

        let root = db.root();
        // An appended-but-never-applied batch must not survive a restart.
        {
            db.new_batch().append(V::Value::make(1));
        }
        drop(db);

        let mut db = reopen(context.with_label("db2")).await;
        assert_eq!(db.root(), root);
        assert_eq!(db.bounds().await.end, 1);
        assert_eq!(db.get_metadata().await.unwrap(), None);

        // First real commit, carrying metadata.
        let metadata = V::Value::make(99);
        let merkleized = db.new_batch().merkleize(&db, Some(metadata.clone()));
        db.apply_batch(merkleized).await.unwrap();
        db.commit().await.unwrap();
        assert_eq!(db.bounds().await.end, 2);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
        // The commit operation itself exposes the metadata as its value.
        assert_eq!(
            db.get(Location::new(1)).await.unwrap(),
            Some(metadata.clone())
        );
        let root = db.root();

        // Committed state persists across another reopen.
        let db = reopen(context.with_label("db3")).await;
        assert_eq!(db.bounds().await.end, 2);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
        assert_eq!(db.root(), root);
        assert_eq!(db.last_commit_loc(), Location::new(1));

        db.destroy().await.unwrap();
    }
439
    /// Appends two values through a batch, verifies their locations and
    /// retrieval, and checks the state survives two reopens.
    pub(crate) async fn test_keyless_db_build_basic<F: Family, V, C, H>(
        context: deterministic::Context,
        mut db: Keyless<F, deterministic::Context, V, C, H>,
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        let v1 = V::Value::make(1);
        let v2 = V::Value::make(2);

        {
            let batch = db.new_batch();
            // Locations 1 and 2: location 0 is the genesis commit.
            let loc1 = batch.size();
            let batch = batch.append(v1.clone());
            let loc2 = batch.size();
            let batch = batch.append(v2.clone());
            assert_eq!(loc1, Location::new(1));
            assert_eq!(loc2, Location::new(2));
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }

        // genesis commit + 2 appends + batch commit = 4 operations.
        assert_eq!(db.bounds().await.end, 4);
        assert_eq!(db.get_metadata().await.unwrap(), None);
        // The metadata-free commit op at location 3 has no value.
        assert_eq!(db.get(Location::new(3)).await.unwrap(), None);
        let root = db.root();
        db.sync().await.unwrap();
        drop(db);

        let db = reopen(context.with_label("db2")).await;
        assert_eq!(db.bounds().await.end, 4);
        assert_eq!(db.root(), root);
        assert_eq!(db.get(Location::new(1)).await.unwrap().unwrap(), v1);
        assert_eq!(db.get(Location::new(2)).await.unwrap().unwrap(), v2);

        drop(db);
        let db = reopen(context.with_label("db3")).await;
        assert_eq!(db.bounds().await.end, 4);
        assert_eq!(db.root(), root);

        db.destroy().await.unwrap();
    }
487
    /// Interleaves applied+committed batches with abandoned (never-applied)
    /// batches and verifies that after each reopen only committed state is
    /// recovered.
    pub(crate) async fn test_keyless_db_recovery<F: Family, V, C, H>(
        context: deterministic::Context,
        db: Keyless<F, deterministic::Context, V, C, H>,
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        let root = db.root();
        const ELEMENTS: u64 = 100;

        // Batch is built but never applied — must not survive the reopen.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i));
            }
        }
        drop(db);
        let mut db = reopen(context.with_label("db2")).await;
        assert_eq!(root, db.root());

        // Applied and committed — must survive.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 100));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        db.commit().await.unwrap();
        let root = db.root();

        // Abandoned again — reopen must roll back to the committed root.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 200));
            }
        }
        drop(db);
        let mut db = reopen(context.with_label("db3")).await;
        assert_eq!(root, db.root());

        // A second applied+committed batch.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 300));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        db.commit().await.unwrap();
        let root = db.root();

        drop(db);
        let db = reopen(context.with_label("db4")).await;
        // genesis commit + 2 batches of ELEMENTS appends + 2 batch commits.
        assert_eq!(db.bounds().await.end, 2 * ELEMENTS + 3);
        assert_eq!(db.root(), root);

        db.destroy().await.unwrap();
    }
557
    /// Generates and verifies proofs over a populated db: one capped by the
    /// log size and one for an interior range.
    pub(crate) async fn test_keyless_db_proof<F: Family, V, C>(
        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        Operation<V>: EncodeShared + std::fmt::Debug,
    {
        let hasher = Standard::<Sha256>::new();
        const ELEMENTS: u64 = 50;

        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        let root = db.root();

        // max_ops exceeds the log size, so the proof covers everything:
        // genesis commit + ELEMENTS appends + batch commit.
        let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap();
        assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
        assert_eq!(ops.len() as u64, 1 + ELEMENTS + 1);

        // Interior range of exactly 5 operations.
        let (proof, ops) = db.proof(Location::new(10), NZU64!(5)).await.unwrap();
        assert!(verify_proof(
            &hasher,
            &proof,
            Location::new(10),
            &ops,
            &root
        ));
        assert_eq!(ops.len(), 5);

        db.destroy().await.unwrap();
    }
593
594 pub(crate) async fn test_keyless_db_metadata<F: Family, V, C, H>(
595 mut db: Keyless<F, deterministic::Context, V, C, H>,
596 ) where
597 V: ValueEncoding<Value: TestValue>,
598 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
599 H: Hasher,
600 Operation<V>: EncodeShared,
601 {
602 let metadata = V::Value::make(99);
603 let merkleized = db
604 .new_batch()
605 .append(V::Value::make(1))
606 .merkleize(&db, Some(metadata.clone()));
607 db.apply_batch(merkleized).await.unwrap();
608 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
609
610 let merkleized = db.new_batch().merkleize(&db, None);
611 db.apply_batch(merkleized).await.unwrap();
612 assert_eq!(db.get_metadata().await.unwrap(), None);
613
614 db.destroy().await.unwrap();
615 }
616
    /// Verifies pruning rules: pruning past the last commit is rejected,
    /// pruning a retained prefix succeeds, and pruning never changes the root.
    pub(crate) async fn test_keyless_db_pruning<F: Family, V, C, H>(
        mut db: Keyless<F, deterministic::Context, V, C, H>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        // On a fresh db the last commit is at 0; pruning to 1 must fail.
        let result = db.prune(Location::new(1)).await;
        assert!(
            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                if prune_loc == Location::new(1) && commit_loc == Location::new(0))
        );

        // Two appends + a commit move the last commit to location 3.
        let merkleized = db
            .new_batch()
            .append(V::Value::make(1))
            .append(V::Value::make(2))
            .merkleize(&db, None);
        db.apply_batch(merkleized).await.unwrap();

        let last_commit = db.last_commit_loc();
        assert_eq!(last_commit, Location::new(3));

        let merkleized = db
            .new_batch()
            .append(V::Value::make(3))
            .merkleize(&db, None);
        db.apply_batch(merkleized).await.unwrap();

        // Pruning up to the previous commit is allowed and root-preserving.
        let root = db.root();
        assert!(db.prune(Location::new(3)).await.is_ok());
        assert_eq!(db.root(), root);

        // Pruning one past the new last commit must fail.
        let new_last_commit = db.last_commit_loc();
        let beyond = Location::new(*new_last_commit + 1);
        let result = db.prune(beyond).await;
        assert!(
            matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
                if prune_loc == beyond && commit_loc == new_last_commit)
        );

        db.destroy().await.unwrap();
    }
666
    /// Repeatedly abandons un-applied batches on an otherwise-empty db and
    /// checks every reopen recovers the pristine genesis state, until a real
    /// commit finally changes it.
    pub(crate) async fn test_keyless_db_empty_db_recovery<F: Family, V, C, H>(
        context: deterministic::Context,
        db: Keyless<F, deterministic::Context, V, C, H>,
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        let root = db.root();
        const ELEMENTS: u64 = 200;

        // Reopen with no activity: still just the genesis commit.
        let db = reopen(context.with_label("db2")).await;
        assert_eq!(db.bounds().await.end, 1);
        assert_eq!(db.root(), root);

        // Abandoned batch #1.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i));
            }
        }
        drop(db);
        let db = reopen(context.with_label("db3")).await;
        assert_eq!(db.bounds().await.end, 1);
        assert_eq!(db.root(), root);

        // Abandoned batch #2.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 500));
            }
        }
        drop(db);
        let db = reopen(context.with_label("db4")).await;
        assert_eq!(db.bounds().await.end, 1);
        assert_eq!(db.root(), root);

        // Abandoned batch #3 (larger).
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS * 3 {
                batch = batch.append(V::Value::make(i + 1000));
            }
        }
        drop(db);
        let mut db = reopen(context.with_label("db5")).await;
        assert_eq!(db.bounds().await.end, 1);
        assert_eq!(db.root(), root);
        assert_eq!(db.last_commit_loc(), Location::new(0));

        // Finally apply and commit — state must now differ and persist.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 2000));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        db.commit().await.unwrap();
        let db = reopen(context.with_label("db6")).await;
        assert!(db.bounds().await.end > 1);
        assert_ne!(db.root(), root);

        db.destroy().await.unwrap();
    }
740
    /// Simulates crashes that leave un-applied appends behind a commit and
    /// verifies each reopen rewinds exactly to the last committed state, with
    /// new appends resuming at the expected location.
    pub(crate) async fn test_keyless_db_replay_with_trailing_appends<F: Family, V, C, H>(
        context: deterministic::Context,
        mut db: Keyless<F, deterministic::Context, V, C, H>,
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        // Establish a committed baseline of 10 values.
        {
            let mut batch = db.new_batch();
            for i in 0..10u64 {
                batch = batch.append(V::Value::make(i));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        db.commit().await.unwrap();
        let committed_root = db.root();
        let committed_size = db.bounds().await.end;

        // Trailing append that is never applied/committed.
        {
            db.new_batch().append(V::Value::make(99));
        }
        drop(db);

        let mut db = reopen(context.with_label("db2")).await;
        assert_eq!(
            db.bounds().await.end,
            committed_size,
            "Should rewind to last commit"
        );
        assert_eq!(db.root(), committed_root, "Root should match last commit");
        assert_eq!(
            db.last_commit_loc(),
            committed_size - 1,
            "Last commit location should be correct"
        );

        // After recovery, new appends must land where the commit left off.
        let new_value = V::Value::make(77);
        {
            let batch = db.new_batch();
            let loc = batch.size();
            let batch = batch.append(new_value.clone());
            assert_eq!(
                loc, committed_size,
                "New append should get the expected location"
            );
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        db.commit().await.unwrap();

        assert_eq!(db.get(committed_size).await.unwrap(), Some(new_value));

        let new_committed_root = db.root();
        let new_committed_size = db.bounds().await.end;

        // Multiple trailing appends this time.
        {
            let mut batch = db.new_batch();
            for i in 0..5u64 {
                batch = batch.append(V::Value::make(200 + i));
            }
        }
        drop(db);

        let db = reopen(context.with_label("db3")).await;
        assert_eq!(
            db.bounds().await.end,
            new_committed_size,
            "Should rewind to last commit with multiple trailing appends"
        );
        assert_eq!(
            db.root(),
            new_committed_root,
            "Root should match last commit after multiple appends"
        );
        assert_eq!(
            db.last_commit_loc(),
            new_committed_size - 1,
            "Last commit location should be correct after multiple appends"
        );

        db.destroy().await.unwrap();
    }
833
834 pub(crate) async fn test_keyless_batch_chained<F: Family, V, C>(
835 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
836 ) where
837 V: ValueEncoding<Value: TestValue>,
838 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
839 Operation<V>: EncodeShared,
840 {
841 let v1 = V::Value::make(10);
842 let v2 = V::Value::make(20);
843 let v3 = V::Value::make(30);
844
845 let parent = db.new_batch();
846 let loc1 = parent.size();
847 let parent = parent.append(v1.clone());
848 let parent_m = parent.merkleize(&db, None);
849
850 let child = parent_m.new_batch::<Sha256>();
851 let loc2 = child.size();
852 let child = child.append(v2.clone());
853 let loc3 = child.size();
854 let child = child.append(v3.clone());
855 let child_m = child.merkleize(&db, None);
856 let child_root = child_m.root();
857
858 db.apply_batch(child_m).await.unwrap();
859 db.commit().await.unwrap();
860
861 assert_eq!(db.root(), child_root);
862 assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
863 assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
864 assert_eq!(db.get(loc3).await.unwrap(), Some(v3));
865
866 db.destroy().await.unwrap();
867 }
868
869 pub(crate) async fn test_keyless_stale_batch<F: Family, V, C, H>(
870 mut db: Keyless<F, deterministic::Context, V, C, H>,
871 ) where
872 V: ValueEncoding<Value: TestValue>,
873 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
874 H: Hasher,
875 Operation<V>: EncodeShared,
876 {
877 let batch_a = db
878 .new_batch()
879 .append(V::Value::make(10))
880 .merkleize(&db, None);
881 let batch_b = db
882 .new_batch()
883 .append(V::Value::make(20))
884 .merkleize(&db, None);
885
886 db.apply_batch(batch_a).await.unwrap();
887
888 let result = db.apply_batch(batch_b).await;
889 assert!(matches!(result, Err(Error::StaleBatch { .. })));
890
891 db.destroy().await.unwrap();
892 }
893
894 pub(crate) async fn test_keyless_partial_ancestor_commit<F: Family, V, C, H>(
895 mut db: Keyless<F, deterministic::Context, V, C, H>,
896 ) where
897 V: ValueEncoding<Value: TestValue>,
898 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
899 H: Hasher,
900 Operation<V>: EncodeShared,
901 {
902 let a = db
904 .new_batch()
905 .append(V::Value::make(10))
906 .merkleize(&db, None);
907 let b = a
908 .new_batch::<H>()
909 .append(V::Value::make(20))
910 .merkleize(&db, None);
911 let c = b
912 .new_batch::<H>()
913 .append(V::Value::make(30))
914 .merkleize(&db, None);
915
916 let expected_root = c.root();
917
918 db.apply_batch(a).await.unwrap();
920 db.apply_batch(c).await.unwrap();
921
922 assert_eq!(db.root(), expected_root);
924
925 db.destroy().await.unwrap();
926 }
927
928 pub(crate) async fn test_keyless_to_batch<F: Family, V, C>(
929 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
930 ) where
931 V: ValueEncoding<Value: TestValue>,
932 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
933 Operation<V>: EncodeShared,
934 {
935 let batch = db.new_batch();
936 let loc1 = batch.size();
937 let batch = batch.append(V::Value::make(10));
938 db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
939
940 let snapshot = db.to_batch();
941 assert_eq!(snapshot.root(), db.root());
942
943 let child_batch = snapshot.new_batch::<Sha256>();
944 let loc2 = child_batch.size();
945 let child_batch = child_batch.append(V::Value::make(20));
946 db.apply_batch(child_batch.merkleize(&db, None))
947 .await
948 .unwrap();
949
950 assert_eq!(db.get(loc1).await.unwrap(), Some(V::Value::make(10)));
951 assert_eq!(db.get(loc2).await.unwrap(), Some(V::Value::make(20)));
952
953 db.destroy().await.unwrap();
954 }
955
    /// Recovery exercises on a db that already holds committed data:
    /// abandoned batches must be discarded on reopen, pruning must not change
    /// the root, and a final committed batch must persist.
    pub(crate) async fn test_keyless_db_non_empty_recovery<F: Family, V, C, H>(
        context: deterministic::Context,
        mut db: Keyless<F, deterministic::Context, V, C, H>,
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        const ELEMENTS: u64 = 200;
        // Committed baseline.
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        db.commit().await.unwrap();
        let root = db.root();
        let op_count = db.bounds().await.end;

        let db = reopen(context.with_label("db2")).await;
        assert_eq!(db.bounds().await.end, op_count);
        assert_eq!(db.root(), root);
        assert_eq!(db.last_commit_loc(), op_count - 1);
        drop(db);

        // Abandoned batch on top of committed data — discarded on reopen.
        let db = reopen(context.with_label("recovery_a")).await;
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 1000));
            }
        }
        drop(db);
        let db = reopen(context.with_label("recovery_b")).await;
        assert_eq!(db.bounds().await.end, op_count);
        assert_eq!(db.root(), root);
        drop(db);

        // Prune up to the last commit; size and root are unchanged.
        let mut db = reopen(context.with_label("db3")).await;
        db.prune(db.last_commit_loc()).await.unwrap();
        assert_eq!(db.bounds().await.end, op_count);
        assert_eq!(db.root(), root);
        db.sync().await.unwrap();
        drop(db);

        // Abandoned batch on the pruned db — still discarded on reopen.
        let db = reopen(context.with_label("recovery_c")).await;
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 2000));
            }
        }
        drop(db);
        let db = reopen(context.with_label("recovery_d")).await;
        assert_eq!(db.bounds().await.end, op_count);
        assert_eq!(db.root(), root);
        drop(db);

        // A new committed batch advances the state and persists.
        let mut db = reopen(context.with_label("db4")).await;
        {
            let mut batch = db.new_batch();
            for i in 0..ELEMENTS {
                batch = batch.append(V::Value::make(i + 3000));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        db.commit().await.unwrap();
        let db = reopen(context.with_label("db5")).await;
        let bounds = db.bounds().await;
        assert!(bounds.end > op_count);
        assert_ne!(db.root(), root);
        assert_eq!(db.last_commit_loc(), bounds.end - 1);

        db.destroy().await.unwrap();
    }
1040
    /// Sweeps proof generation across many (start, max_ops) combinations,
    /// verifying each proof, rejecting proofs against a wrong root or shifted
    /// start location, and checking out-of-range historical proofs fail.
    pub(crate) async fn test_keyless_db_proof_comprehensive<F: Family, V, C>(
        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        Operation<V>: EncodeShared + std::fmt::Debug,
    {
        let hasher = Standard::<Sha256>::new();

        const ELEMENTS: u64 = 100;
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                batch = batch.append(V::Value::make(i));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }

        // Asking for a historical size beyond the current size must fail.
        assert!(matches!(
            db.historical_proof(db.bounds().await.end + 1, Location::new(5), NZU64!(10))
                .await,
            Err(Error::<F>::Merkle(crate::merkle::Error::RangeOutOfBounds(
                _
            )))
        ));

        let root = db.root();

        // Representative (start, max_ops) pairs, including boundary cases at
        // the first and last operations.
        for (start_loc, max_ops) in [
            (0, 10),
            (10, 5),
            (50, 20),
            (90, 15),
            (0, 1),
            (ELEMENTS - 1, 1),
            (ELEMENTS, 1),
        ] {
            let (proof, ops) = db
                .proof(Location::new(start_loc), NZU64!(max_ops))
                .await
                .unwrap();
            assert!(
                verify_proof(&hasher, &proof, Location::new(start_loc), &ops, &root),
                "Failed to verify proof for range starting at {start_loc} with max {max_ops} ops",
            );
            // Returned op count is capped by the remaining log length.
            let expected_ops = std::cmp::min(max_ops, *db.bounds().await.end - start_loc);
            assert_eq!(ops.len() as u64, expected_ops);

            // A proof must not verify against a different root...
            let wrong_root = Sha256::hash(&[0xFF; 32]);
            assert!(!verify_proof(
                &hasher,
                &proof,
                Location::new(start_loc),
                &ops,
                &wrong_root
            ));
            // ...nor against a shifted start location.
            if start_loc > 0 {
                assert!(!verify_proof(
                    &hasher,
                    &proof,
                    Location::new(start_loc - 1),
                    &ops,
                    &root
                ));
            }
        }

        db.destroy().await.unwrap();
    }
1112
    /// Verifies proofs remain valid against the original root after pruning
    /// (including aggressive pruning close to the log's end) and across a
    /// reopen of the pruned db.
    pub(crate) async fn test_keyless_db_proof_with_pruning<F: Family, V, C>(
        context: deterministic::Context,
        mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, Sha256>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        Operation<V>: EncodeShared + std::fmt::Debug,
    {
        let hasher = Standard::<Sha256>::new();

        // Populate with two committed batches of ELEMENTS values each.
        const ELEMENTS: u64 = 100;
        {
            let mut batch = db.new_batch();
            for i in 0u64..ELEMENTS {
                batch = batch.append(V::Value::make(i));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }

        {
            let mut batch = db.new_batch();
            for i in ELEMENTS..ELEMENTS * 2 {
                batch = batch.append(V::Value::make(i));
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }
        let root = db.root();

        // Pruning never changes the root.
        const PRUNE_LOC: u64 = 30;
        db.prune(Location::new(PRUNE_LOC)).await.unwrap();
        let oldest_retained = db.bounds().await.start;
        assert_eq!(db.root(), root);

        // The pruned state must survive a restart.
        db.sync().await.unwrap();
        drop(db);
        let mut db = reopen(context).await;
        assert_eq!(db.root(), root);

        // Proofs over retained ranges still verify against the original root.
        for (start_loc, max_ops) in [
            (oldest_retained, 10),
            (Location::new(50), 20),
            (Location::new(150), 10),
            (Location::new(190), 15),
        ] {
            if start_loc < oldest_retained {
                continue;
            }
            let (proof, ops) = db.proof(start_loc, NZU64!(max_ops)).await.unwrap();
            assert!(verify_proof(&hasher, &proof, start_loc, &ops, &root));
        }

        // More aggressive pruning, then prove from the new oldest location.
        let aggressive_prune: Location<F> = Location::new(150);
        db.prune(aggressive_prune).await.unwrap();

        let new_oldest = db.bounds().await.start;
        let (proof, ops) = db.proof(new_oldest, NZU64!(20)).await.unwrap();
        assert!(verify_proof(&hasher, &proof, new_oldest, &ops, &root));

        // Prune everything but the final few operations and prove over them.
        let almost_all = db.bounds().await.end - 5;
        db.prune(almost_all).await.unwrap();
        let final_oldest = db.bounds().await.start;
        if final_oldest < db.bounds().await.end {
            let (final_proof, final_ops) = db.proof(final_oldest, NZU64!(10)).await.unwrap();
            assert!(verify_proof(
                &hasher,
                &final_proof,
                final_oldest,
                &final_ops,
                &root
            ));
        }

        db.destroy().await.unwrap();
    }
1188
1189 pub(crate) async fn test_keyless_db_get_out_of_bounds<F: Family, V, C, H>(
1190 mut db: Keyless<F, deterministic::Context, V, C, H>,
1191 ) where
1192 V: ValueEncoding<Value: TestValue>,
1193 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1194 H: Hasher,
1195 Operation<V>: EncodeShared,
1196 {
1197 assert!(db.get(Location::new(0)).await.unwrap().is_none());
1198
1199 let merkleized = db
1200 .new_batch()
1201 .append(V::Value::make(1))
1202 .append(V::Value::make(2))
1203 .merkleize(&db, None);
1204 db.apply_batch(merkleized).await.unwrap();
1205
1206 assert_eq!(
1207 db.get(Location::new(1)).await.unwrap(),
1208 Some(V::Value::make(1))
1209 );
1210 assert!(db.get(Location::new(3)).await.unwrap().is_none());
1211 assert!(matches!(
1212 db.get(Location::new(4)).await,
1213 Err(Error::LocationOutOfBounds(loc, size))
1214 if loc == Location::new(4) && size == Location::new(4)
1215 ));
1216
1217 db.destroy().await.unwrap();
1218 }
1219
    /// Verifies a batch's `get` sees both already-applied db values and the
    /// batch's own pending appends, and returns `None` past its end.
    pub(crate) async fn test_keyless_batch_get<F: Family, V, C, H>(
        mut db: Keyless<F, deterministic::Context, V, C, H>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        // Apply a few values to the db and remember their locations.
        let base_vals: Vec<V::Value> = (0..3).map(|i| V::Value::make(10 + i)).collect();
        let mut base_locs = Vec::new();
        {
            let mut batch = db.new_batch();
            for v in &base_vals {
                let loc = batch.size();
                batch = batch.append(v.clone());
                base_locs.push(loc);
            }
            db.apply_batch(batch.merkleize(&db, None)).await.unwrap();
        }

        // A fresh batch reads through to the underlying db.
        let batch = db.new_batch();
        for (i, loc) in base_locs.iter().enumerate() {
            assert_eq!(
                batch.get(*loc, &db).await.unwrap(),
                Some(base_vals[i].clone()),
            );
        }

        // The batch also sees its own pending append, and nothing beyond it.
        let new_val = V::Value::make(99);
        let new_loc = batch.size();
        let batch = batch.append(new_val.clone());
        assert_eq!(batch.get(new_loc, &db).await.unwrap(), Some(new_val));
        assert_eq!(
            batch.get(Location::new(*new_loc + 1), &db).await.unwrap(),
            None
        );

        db.destroy().await.unwrap();
    }
1259
1260 pub(crate) async fn test_keyless_batch_stacked_get<F: Family, V, C>(
1261 db: Keyless<F, deterministic::Context, V, C, Sha256>,
1262 ) where
1263 V: ValueEncoding<Value: TestValue>,
1264 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1265 Operation<V>: EncodeShared,
1266 {
1267 let v1 = V::Value::make(1);
1268 let v2 = V::Value::make(2);
1269
1270 let parent = db.new_batch();
1271 let loc1 = parent.size();
1272 let parent = parent.append(v1.clone());
1273 let parent_m = parent.merkleize(&db, None);
1274
1275 let child = parent_m.new_batch::<Sha256>();
1276 assert_eq!(child.get(loc1, &db).await.unwrap(), Some(v1));
1277
1278 let loc2 = child.size();
1279 let child = child.append(v2.clone());
1280 assert_eq!(child.get(loc2, &db).await.unwrap(), Some(v2));
1281 assert_eq!(child.get(Location::new(9999), &db).await.unwrap(), None);
1282
1283 db.destroy().await.unwrap();
1284 }
1285
1286 pub(crate) async fn test_keyless_batch_speculative_root<F: Family, V, C, H>(
1287 mut db: Keyless<F, deterministic::Context, V, C, H>,
1288 ) where
1289 V: ValueEncoding<Value: TestValue>,
1290 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1291 H: Hasher,
1292 Operation<V>: EncodeShared,
1293 {
1294 let mut batch = db.new_batch();
1295 for i in 0u64..10 {
1296 batch = batch.append(V::Value::make(i));
1297 }
1298 let merkleized = batch.merkleize(&db, None);
1299 let speculative = merkleized.root();
1300 db.apply_batch(merkleized).await.unwrap();
1301 assert_eq!(db.root(), speculative);
1302
1303 let merkleized = db
1304 .new_batch()
1305 .append(V::Value::make(100))
1306 .merkleize(&db, Some(V::Value::make(55)));
1307 let speculative = merkleized.root();
1308 db.apply_batch(merkleized).await.unwrap();
1309 assert_eq!(db.root(), speculative);
1310
1311 db.destroy().await.unwrap();
1312 }
1313
1314 pub(crate) async fn test_keyless_merkleized_batch_get<F: Family, V, C>(
1315 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1316 ) where
1317 V: ValueEncoding<Value: TestValue>,
1318 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1319 Operation<V>: EncodeShared,
1320 {
1321 let base_val = V::Value::make(10);
1322 let merkleized = db.new_batch().append(base_val.clone()).merkleize(&db, None);
1323 db.apply_batch(merkleized).await.unwrap();
1324
1325 let new_val = V::Value::make(20);
1326 let merkleized = db.new_batch().append(new_val.clone()).merkleize(&db, None);
1327
1328 assert_eq!(
1329 merkleized.get(Location::new(1), &db).await.unwrap(),
1330 Some(base_val),
1331 );
1332 assert_eq!(
1333 merkleized.get(Location::new(3), &db).await.unwrap(),
1334 Some(new_val),
1335 );
1336 assert_eq!(merkleized.get(Location::new(4), &db).await.unwrap(), None);
1337
1338 db.destroy().await.unwrap();
1339 }
1340
1341 pub(crate) async fn test_keyless_batch_chained_apply_sequential<F: Family, V, C, H>(
1342 mut db: Keyless<F, deterministic::Context, V, C, H>,
1343 ) where
1344 V: ValueEncoding<Value: TestValue>,
1345 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1346 H: Hasher,
1347 Operation<V>: EncodeShared,
1348 {
1349 let v1 = V::Value::make(1);
1350 let v2 = V::Value::make(2);
1351
1352 let parent = db.new_batch();
1353 let loc1 = parent.size();
1354 let parent = parent.append(v1.clone());
1355 let parent_m = parent.merkleize(&db, None);
1356 let parent_root = parent_m.root();
1357
1358 db.apply_batch(parent_m).await.unwrap();
1359 assert_eq!(db.root(), parent_root);
1360 assert_eq!(db.get(loc1).await.unwrap(), Some(v1));
1361
1362 let batch2 = db.new_batch();
1363 let loc2 = batch2.size();
1364 let batch2 = batch2.append(v2.clone());
1365 let batch2_m = batch2.merkleize(&db, None);
1366 let batch2_root = batch2_m.root();
1367 db.apply_batch(batch2_m).await.unwrap();
1368 assert_eq!(db.root(), batch2_root);
1369 assert_eq!(db.get(loc2).await.unwrap(), Some(v2));
1370
1371 db.destroy().await.unwrap();
1372 }
1373
1374 pub(crate) async fn test_keyless_batch_many_sequential<F: Family, V, C>(
1375 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1376 ) where
1377 V: ValueEncoding<Value: TestValue>,
1378 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1379 Operation<V>: EncodeShared + std::fmt::Debug,
1380 {
1381 let hasher = Standard::<Sha256>::new();
1382
1383 const BATCHES: u64 = 20;
1384 const APPENDS_PER_BATCH: u64 = 5;
1385 let mut all_values: Vec<V::Value> = Vec::new();
1386 let mut all_locs: Vec<Location<F>> = Vec::new();
1387
1388 for batch_idx in 0..BATCHES {
1389 let mut batch = db.new_batch();
1390 for j in 0..APPENDS_PER_BATCH {
1391 let v = V::Value::make(batch_idx * 10 + j);
1392 let loc = batch.size();
1393 batch = batch.append(v.clone());
1394 all_values.push(v);
1395 all_locs.push(loc);
1396 }
1397 let merkleized = batch.merkleize(&db, None);
1398 db.apply_batch(merkleized).await.unwrap();
1399 }
1400
1401 for (i, loc) in all_locs.iter().enumerate() {
1402 assert_eq!(db.get(*loc).await.unwrap(), Some(all_values[i].clone()));
1403 }
1404
1405 let root = db.root();
1406 let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1407 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1408 assert_eq!(db.bounds().await.end, 1 + BATCHES * (APPENDS_PER_BATCH + 1));
1409
1410 db.destroy().await.unwrap();
1411 }
1412
1413 pub(crate) async fn test_keyless_batch_empty<F: Family, V, C, H>(
1414 mut db: Keyless<F, deterministic::Context, V, C, H>,
1415 ) where
1416 V: ValueEncoding<Value: TestValue>,
1417 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1418 H: Hasher,
1419 Operation<V>: EncodeShared,
1420 {
1421 let merkleized = db
1422 .new_batch()
1423 .append(V::Value::make(1))
1424 .merkleize(&db, None);
1425 db.apply_batch(merkleized).await.unwrap();
1426 let root_before = db.root();
1427 let size_before = db.bounds().await.end;
1428
1429 let merkleized = db.new_batch().merkleize(&db, None);
1430 let speculative = merkleized.root();
1431 db.apply_batch(merkleized).await.unwrap();
1432
1433 assert_ne!(db.root(), root_before);
1434 assert_eq!(db.root(), speculative);
1435 assert_eq!(db.bounds().await.end, size_before + 1);
1436
1437 db.destroy().await.unwrap();
1438 }
1439
1440 pub(crate) async fn test_keyless_batch_chained_merkleized_get<F: Family, V, C>(
1441 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1442 ) where
1443 V: ValueEncoding<Value: TestValue>,
1444 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1445 Operation<V>: EncodeShared,
1446 {
1447 let base_val = V::Value::make(10);
1448 db.apply_batch(db.new_batch().append(base_val.clone()).merkleize(&db, None))
1449 .await
1450 .unwrap();
1451
1452 let v1 = V::Value::make(1);
1453 let parent = db.new_batch();
1454 let loc1 = parent.size();
1455 let parent_m = parent.append(v1.clone()).merkleize(&db, None);
1456
1457 let v2 = V::Value::make(2);
1458 let child = parent_m.new_batch::<Sha256>();
1459 let loc2 = child.size();
1460 let child_m = child.append(v2.clone()).merkleize(&db, None);
1461
1462 assert_eq!(
1463 child_m.get(Location::new(1), &db).await.unwrap(),
1464 Some(base_val),
1465 );
1466 assert_eq!(child_m.get(loc1, &db).await.unwrap(), Some(v1));
1467 assert_eq!(child_m.get(loc2, &db).await.unwrap(), Some(v2));
1468
1469 db.destroy().await.unwrap();
1470 }
1471
1472 pub(crate) async fn test_keyless_batch_large<F: Family, V, C>(
1473 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1474 ) where
1475 V: ValueEncoding<Value: TestValue>,
1476 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1477 Operation<V>: EncodeShared + std::fmt::Debug,
1478 {
1479 let hasher = Standard::<Sha256>::new();
1480 const N: u64 = 500;
1481 let mut values = Vec::new();
1482 let mut locs = Vec::new();
1483
1484 let mut batch = db.new_batch();
1485 for i in 0..N {
1486 let v = V::Value::make(i);
1487 locs.push(batch.size());
1488 batch = batch.append(v.clone());
1489 values.push(v);
1490 }
1491 let merkleized = batch.merkleize(&db, None);
1492 db.apply_batch(merkleized).await.unwrap();
1493
1494 for (i, loc) in locs.iter().enumerate() {
1495 assert_eq!(db.get(*loc).await.unwrap(), Some(values[i].clone()));
1496 }
1497
1498 let root = db.root();
1499 let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
1500 assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));
1501 assert_eq!(db.bounds().await.end, 1 + N + 1);
1502
1503 db.destroy().await.unwrap();
1504 }
1505
1506 pub(crate) async fn test_keyless_stale_batch_chained<F: Family, V, C>(
1507 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1508 ) where
1509 V: ValueEncoding<Value: TestValue>,
1510 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1511 Operation<V>: EncodeShared,
1512 {
1513 let parent = db
1514 .new_batch()
1515 .append(V::Value::make(1))
1516 .merkleize(&db, None);
1517 let child_a = parent
1518 .new_batch::<Sha256>()
1519 .append(V::Value::make(2))
1520 .merkleize(&db, None);
1521 let child_b = parent
1522 .new_batch::<Sha256>()
1523 .append(V::Value::make(3))
1524 .merkleize(&db, None);
1525
1526 db.apply_batch(child_a).await.unwrap();
1527 assert!(matches!(
1528 db.apply_batch(child_b).await,
1529 Err(Error::StaleBatch { .. })
1530 ));
1531
1532 db.destroy().await.unwrap();
1533 }
1534
1535 pub(crate) async fn test_keyless_sequential_commit_parent_then_child<F: Family, V, C>(
1536 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1537 ) where
1538 V: ValueEncoding<Value: TestValue>,
1539 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1540 Operation<V>: EncodeShared,
1541 {
1542 let parent = db
1543 .new_batch()
1544 .append(V::Value::make(1))
1545 .merkleize(&db, None);
1546 let child = parent
1547 .new_batch::<Sha256>()
1548 .append(V::Value::make(2))
1549 .merkleize(&db, None);
1550
1551 db.apply_batch(parent).await.unwrap();
1552 db.apply_batch(child).await.unwrap();
1553
1554 db.destroy().await.unwrap();
1555 }
1556
1557 pub(crate) async fn test_keyless_stale_batch_child_before_parent<F: Family, V, C>(
1558 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1559 ) where
1560 V: ValueEncoding<Value: TestValue>,
1561 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1562 Operation<V>: EncodeShared,
1563 {
1564 let parent = db
1565 .new_batch()
1566 .append(V::Value::make(1))
1567 .merkleize(&db, None);
1568 let child = parent
1569 .new_batch::<Sha256>()
1570 .append(V::Value::make(2))
1571 .merkleize(&db, None);
1572
1573 db.apply_batch(child).await.unwrap();
1574 assert!(matches!(
1575 db.apply_batch(parent).await,
1576 Err(Error::StaleBatch { .. })
1577 ));
1578
1579 db.destroy().await.unwrap();
1580 }
1581
1582 pub(crate) async fn test_keyless_child_root_matches_pending_and_committed<F: Family, V, C>(
1583 mut db: Keyless<F, deterministic::Context, V, C, Sha256>,
1584 ) where
1585 V: ValueEncoding<Value: TestValue>,
1586 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1587 Operation<V>: EncodeShared,
1588 {
1589 let parent = db
1591 .new_batch()
1592 .append(V::Value::make(1))
1593 .merkleize(&db, None);
1594 let pending_child = parent
1595 .new_batch::<Sha256>()
1596 .append(V::Value::make(2))
1597 .merkleize(&db, None);
1598
1599 db.apply_batch(parent).await.unwrap();
1602 db.commit().await.unwrap();
1603
1604 let committed_child = db
1605 .new_batch()
1606 .append(V::Value::make(2))
1607 .merkleize(&db, None);
1608
1609 assert_eq!(pending_child.root(), committed_child.root());
1610
1611 db.destroy().await.unwrap();
1612 }
1613
1614 async fn commit_appends<F: Family, V, C, H>(
1615 db: &mut Keyless<F, deterministic::Context, V, C, H>,
1616 values: impl IntoIterator<Item = V::Value>,
1617 metadata: Option<V::Value>,
1618 ) -> core::ops::Range<Location<F>>
1619 where
1620 V: ValueEncoding<Value: TestValue>,
1621 C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
1622 H: Hasher,
1623 Operation<V>: EncodeShared,
1624 {
1625 let mut batch = db.new_batch();
1626 for value in values {
1627 batch = batch.append(value);
1628 }
1629 let range = db.apply_batch(batch.merkleize(db, metadata)).await.unwrap();
1630 db.commit().await.unwrap();
1631 range
1632 }
1633
    /// Exercises `rewind` and its persistence across restarts: after rewinding
    /// to an earlier commit boundary, the root, size, last-commit location,
    /// metadata, and readable values must all match the earlier state — both
    /// in memory and after a `commit` + reopen cycle. Finishes by rewinding
    /// all the way back to the database's initial state and re-verifying.
    pub(crate) async fn test_keyless_db_rewind_recovery<F: Family, V, C, H>(
        context: deterministic::Context,
        mut db: Keyless<F, deterministic::Context, V, C, H>,
        // Factory used to reopen the database from persisted storage.
        reopen: Reopen<Keyless<F, deterministic::Context, V, C, H>>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        // Snapshot the pristine state so we can rewind back to it at the end.
        let initial_root = db.root();
        let initial_size = db.bounds().await.end;

        // First committed batch: two values plus commit metadata.
        let value_a = V::Value::make(1);
        let value_b = V::Value::make(2);
        let metadata_a = V::Value::make(3);
        let first_range = commit_appends(
            &mut db,
            [value_a.clone(), value_b.clone()],
            Some(metadata_a.clone()),
        )
        .await;

        // Snapshot the state after the first commit — the rewind target.
        let root_before = db.root();
        let size_before = db.bounds().await.end;
        let commit_before = db.last_commit_loc();
        assert_eq!(size_before, first_range.end);

        // Second committed batch, which the rewind should erase.
        let value_c = V::Value::make(4);
        let metadata_b = V::Value::make(5);
        let second_range =
            commit_appends(&mut db, [value_c.clone()], Some(metadata_b.clone())).await;
        assert_eq!(second_range.start, size_before);
        assert_ne!(db.root(), root_before);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_b));

        // Rewind to the first commit boundary: all observable state must
        // revert to the earlier snapshot.
        db.rewind(size_before).await.unwrap();
        assert_eq!(db.root(), root_before);
        assert_eq!(db.bounds().await.end, size_before);
        assert_eq!(db.last_commit_loc(), commit_before);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a.clone()));
        assert_eq!(
            db.get(Location::new(1)).await.unwrap(),
            Some(value_a.clone())
        );
        assert_eq!(
            db.get(Location::new(2)).await.unwrap(),
            Some(value_b.clone())
        );
        // The rewound second-batch append is gone, not merely empty.
        assert!(
            matches!(
                db.get(Location::new(4)).await,
                Err(Error::LocationOutOfBounds(_, size)) if size == size_before
            ),
            "rewound append should be out of bounds",
        );

        // Persist the rewound state and reopen: everything must survive.
        db.commit().await.unwrap();
        drop(db);
        let mut db = reopen(context.with_label("reopen")).await;
        assert_eq!(db.root(), root_before);
        assert_eq!(db.bounds().await.end, size_before);
        assert_eq!(db.last_commit_loc(), commit_before);
        assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
        assert_eq!(
            db.get(Location::new(1)).await.unwrap(),
            Some(value_a.clone())
        );
        assert_eq!(
            db.get(Location::new(2)).await.unwrap(),
            Some(value_b.clone())
        );
        assert!(matches!(
            db.get(Location::new(4)).await,
            Err(Error::LocationOutOfBounds(_, size)) if size == size_before
        ));

        // Rewind all the way to the initial state (no metadata, no values).
        db.rewind(initial_size).await.unwrap();
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.bounds().await.end, initial_size);
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert!(matches!(
            db.get(Location::new(1)).await,
            Err(Error::LocationOutOfBounds(_, size)) if size == initial_size
        ));

        // Persist the initial-state rewind and verify it survives a reopen too.
        db.commit().await.unwrap();
        drop(db);
        let db = reopen(context.with_label("reopen_initial_boundary")).await;
        assert_eq!(db.root(), initial_root);
        assert_eq!(db.bounds().await.end, initial_size);
        assert_eq!(db.get_metadata().await.unwrap(), None);
        assert!(matches!(
            db.get(Location::new(1)).await,
            Err(Error::LocationOutOfBounds(_, size)) if size == initial_size
        ));

        db.destroy().await.unwrap();
    }
1733
    /// Verifies that `rewind` refuses targets that fall at or before the
    /// pruning boundary, returning `Journal(ItemPruned)` in both cases.
    pub(crate) async fn test_keyless_db_rewind_pruned_target_errors<F: Family, V, C, H>(
        mut db: Keyless<F, deterministic::Context, V, C, H>,
    ) where
        V: ValueEncoding<Value: TestValue>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
        H: Hasher,
        Operation<V>: EncodeShared,
    {
        // Commit an initial batch whose start location we will later try
        // (and fail) to rewind to once it has been pruned away.
        let first_range = commit_appends(&mut db, (0..16).map(V::Value::make), None).await;

        // Keep committing and pruning until the retained range has advanced
        // past the first batch's start. Pruning granularity is up to the
        // journal, so this may take several rounds — bail out after 64 to
        // avoid looping forever if pruning never advances far enough.
        let mut round = 0u64;
        loop {
            round += 1;
            assert!(
                round <= 64,
                "failed to prune enough history for rewind test"
            );

            commit_appends(
                &mut db,
                (0..16).map(|i| V::Value::make(round * 100 + i)),
                None,
            )
            .await;
            db.prune(db.last_commit_loc()).await.unwrap();

            if db.bounds().await.start > first_range.start {
                break;
            }
        }

        // Rewinding exactly to the oldest retained location is rejected: the
        // boundary itself is not a valid rewind target.
        let oldest_retained = db.bounds().await.start;
        let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
        assert!(
            matches!(
                boundary_err,
                Error::Journal(crate::journal::Error::ItemPruned(_))
            ),
            "unexpected rewind error at retained boundary: {boundary_err:?}"
        );

        // Rewinding to a fully pruned location fails the same way.
        let err = db.rewind(first_range.start).await.unwrap_err();
        assert!(
            matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
            "unexpected rewind error: {err:?}"
        );

        db.destroy().await.unwrap();
    }
1783}