1use crate::{
78 index::{unordered::Index, Unordered as _},
79 journal::contiguous::{
80 variable::{Config as JournalConfig, Journal},
81 Mutable as _, Reader,
82 },
83 merkle::mmr::Location,
84 qmdb::{
85 any::{
86 unordered::{variable::Operation, Update},
87 VariableValue,
88 },
89 build_snapshot_from_log, delete_key,
90 operation::{Committable as _, Key, Operation as _},
91 update_key, FloorHelper,
92 },
93 translator::Translator,
94 Context, Persistable,
95};
96use commonware_codec::{CodecShared, Read};
97use commonware_utils::Array;
98use core::ops::Range;
99use std::collections::BTreeMap;
100use tracing::{debug, warn};
101
/// Convenience alias for the QMDB error type specialized to this crate's MMR family.
type Error = crate::qmdb::Error<crate::mmr::Family>;
103
/// Configuration for initializing a [Db].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Configuration for the underlying operation-log journal.
    pub log: JournalConfig<C>,

    /// Translator used to map full keys into (possibly colliding) index keys.
    pub translator: T,
}
113
/// An immutable set of key changes plus optional commit metadata, ready to be
/// applied atomically via [Db::apply_batch].
pub struct Changeset<K: Key, V: CodecShared + Clone> {
    // Per-key change: `Some(v)` assigns `v`, `None` deletes the key.
    diff: BTreeMap<K, Option<V>>,
    // Optional metadata to record alongside the resulting commit operation.
    metadata: Option<V>,
}
119
120impl<K: Key, V: CodecShared + Clone> Changeset<K, V> {
121 fn into_parts(self) -> (BTreeMap<K, Option<V>>, Option<V>) {
122 (self.diff, self.metadata)
123 }
124}
125
126impl<K: Key, V: CodecShared + Clone> FromIterator<(K, Option<V>)> for Changeset<K, V> {
127 fn from_iter<TIter: IntoIterator<Item = (K, Option<V>)>>(iter: TIter) -> Self {
128 Self {
129 diff: iter.into_iter().collect(),
130 metadata: None,
131 }
132 }
133}
134
135impl<K: Key, V: CodecShared + Clone, const N: usize> From<[(K, Option<V>); N]> for Changeset<K, V> {
136 fn from(items: [(K, Option<V>); N]) -> Self {
137 items.into_iter().collect()
138 }
139}
140
/// A mutable batch of staged changes layered over a [Db].
///
/// Reads through the batch observe staged changes before falling back to the
/// underlying db; nothing is written until the batch is finalized into a
/// [Changeset] and applied via [Db::apply_batch].
pub struct Batch<'a, E, K, V, T>
where
    E: Context,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    // The db this batch reads through for keys it has not staged.
    db: &'a Db<E, K, V, T>,
    // Staged changes: `Some(v)` assigns `v`, `None` deletes the key.
    diff: BTreeMap<K, Option<V>>,
}
152
153impl<'a, E, K, V, T> Batch<'a, E, K, V, T>
154where
155 E: Context,
156 K: Array,
157 V: VariableValue,
158 T: Translator,
159{
160 const fn new(db: &'a Db<E, K, V, T>) -> Self {
161 Self {
162 db,
163 diff: BTreeMap::new(),
164 }
165 }
166
167 pub fn finalize(self, metadata: Option<V>) -> Changeset<K, V> {
169 Changeset {
170 diff: self.diff,
171 metadata,
172 }
173 }
174
175 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
178 if let Some(value) = self.diff.get(key) {
179 return Ok(value.clone());
180 }
181 self.db.get(key).await
182 }
183
184 pub fn update(mut self, key: K, value: V) -> Self {
186 self.diff.insert(key, Some(value));
187 self
188 }
189
190 pub fn delete(mut self, key: K) -> Self {
192 self.diff.insert(key, None);
193 self
194 }
195}
196
/// An unordered key-value db backed by an append-only log of operations, with
/// an in-memory index (the snapshot) locating the active operation for each
/// assigned key.
pub struct Db<E, K, V, T>
where
    E: Context,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    // Append-only journal of all operations (updates, deletes, commits).
    log: Journal<E, Operation<crate::mmr::Family, K, V>>,

    // Maps each translated key to candidate log locations of update
    // operations; collisions are resolved by comparing the full key.
    snapshot: Index<T, Location>,

    // Number of distinct keys currently assigned a value.
    active_keys: usize,

    /// All operations strictly before this location are inactive and may be
    /// pruned.
    pub inactivity_floor_loc: Location,

    /// Location of the most recent commit operation in the log.
    pub last_commit_loc: Location,

    /// Count of operations made inactive since the last commit; used by
    /// `apply_batch` to decide how many floor-raising steps to take.
    pub steps: u64,
}
235
236impl<E, K, V, T> Db<E, K, V, T>
237where
238 E: Context,
239 K: Array,
240 V: VariableValue,
241 T: Translator,
242{
243 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
245 for &loc in self.snapshot.get(key) {
246 let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
247 unreachable!("location ({loc}) does not reference update operation");
248 };
249
250 if &k == key {
251 return Ok(Some(v));
252 }
253 }
254
255 Ok(None)
256 }
257
258 pub const fn new_batch(&self) -> Batch<'_, E, K, V, T> {
260 Batch::new(self)
261 }
262
263 pub const fn is_empty(&self) -> bool {
265 self.active_keys == 0
266 }
267
268 async fn get_op(&self, loc: Location) -> Result<Operation<crate::mmr::Family, K, V>, Error> {
272 let reader = self.log.reader().await;
273 assert!(*loc < reader.bounds().end);
274 reader.read(*loc).await.map_err(|e| match e {
275 crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
276 e => Error::Journal(e),
277 })
278 }
279
280 pub async fn bounds(&self) -> std::ops::Range<Location> {
283 let bounds = self.log.reader().await.bounds();
284 Location::new(bounds.start)..Location::new(bounds.end)
285 }
286
287 pub async fn size(&self) -> Location {
289 Location::new(self.log.size().await)
290 }
291
292 pub const fn inactivity_floor_loc(&self) -> Location {
295 self.inactivity_floor_loc
296 }
297
298 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
300 let Operation::CommitFloor(metadata, _) =
301 self.log.reader().await.read(*self.last_commit_loc).await?
302 else {
303 unreachable!("last commit should be a commit floor operation");
304 };
305
306 Ok(metadata)
307 }
308
309 pub async fn prune(&self, prune_loc: Location) -> Result<(), Error> {
312 if prune_loc > self.inactivity_floor_loc {
313 return Err(Error::PruneBeyondMinRequired(
314 prune_loc,
315 self.inactivity_floor_loc,
316 ));
317 }
318
319 if !self.log.prune(*prune_loc).await? {
322 return Ok(());
323 }
324
325 let bounds = self.log.reader().await.bounds();
326 let log_size = Location::new(bounds.end);
327 let oldest_retained_loc = Location::new(bounds.start);
328 debug!(
329 ?log_size,
330 ?oldest_retained_loc,
331 ?prune_loc,
332 "pruned inactive ops"
333 );
334
335 Ok(())
336 }
337
338 pub async fn init(
340 context: E,
341 cfg: Config<T, <Operation<crate::mmr::Family, K, V> as Read>::Cfg>,
342 ) -> Result<Self, Error> {
343 let mut log = Journal::<E, Operation<crate::mmr::Family, K, V>>::init(
344 context.with_label("log"),
345 cfg.log,
346 )
347 .await?;
348
349 if log.rewind_to(|op| op.is_commit()).await? == 0 {
351 warn!("Log is empty, initializing new db");
352 log.append(&Operation::CommitFloor(None, Location::new(0)))
353 .await?;
354 }
355
356 log.sync().await?;
359
360 let last_commit_loc = Location::new(
361 log.size()
362 .await
363 .checked_sub(1)
364 .expect("commit should exist"),
365 );
366
367 let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
369 let (inactivity_floor_loc, active_keys) = {
370 let reader = log.reader().await;
371 let op = reader.read(*last_commit_loc).await?;
372 let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
373 let active_keys =
374 build_snapshot_from_log(inactivity_floor_loc, &reader, &mut snapshot, |_, _| {})
375 .await?;
376 (inactivity_floor_loc, active_keys)
377 };
378
379 Ok(Self {
380 log,
381 snapshot,
382 active_keys,
383 inactivity_floor_loc,
384 last_commit_loc,
385 steps: 0,
386 })
387 }
388
389 pub async fn sync(&self) -> Result<(), Error> {
393 self.log.sync().await.map_err(Into::into)
394 }
395
396 pub async fn destroy(self) -> Result<(), Error> {
398 self.log.destroy().await.map_err(Into::into)
399 }
400
401 #[allow(clippy::type_complexity)]
402 const fn as_floor_helper(
403 &mut self,
404 ) -> FloorHelper<
405 '_,
406 crate::mmr::Family,
407 Index<T, Location>,
408 Journal<E, Operation<crate::mmr::Family, K, V>>,
409 > {
410 FloorHelper {
411 snapshot: &mut self.snapshot,
412 log: &mut self.log,
413 }
414 }
415
416 pub async fn apply_batch(&mut self, batch: Changeset<K, V>) -> Result<Range<Location>, Error> {
422 let start_loc = self.last_commit_loc + 1;
423 let (diff, metadata) = batch.into_parts();
424
425 for (key, value) in diff {
426 if let Some(value) = value {
427 let updated = {
428 let reader = self.log.reader().await;
429 let new_loc = reader.bounds().end;
430 update_key::<crate::mmr::Family, _, _>(
431 &mut self.snapshot,
432 &reader,
433 &key,
434 Location::new(new_loc),
435 )
436 .await?
437 };
438 if updated.is_some() {
439 self.steps += 1;
440 } else {
441 self.active_keys += 1;
442 }
443 self.log
444 .append(&Operation::Update(Update(key, value)))
445 .await?;
446 } else {
447 let deleted = {
448 let reader = self.log.reader().await;
449 delete_key::<crate::mmr::Family, _, _>(&mut self.snapshot, &reader, &key)
450 .await?
451 };
452 if deleted.is_some() {
453 self.log.append(&Operation::Delete(key)).await?;
454 self.steps += 1;
455 self.active_keys -= 1;
456 }
457 }
458 }
459
460 if self.is_empty() {
463 self.inactivity_floor_loc = self.size().await;
464 debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
465 } else {
466 let steps_to_take = self.steps + 1;
467 for _ in 0..steps_to_take {
468 let loc = self.inactivity_floor_loc;
469 self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
470 }
471 }
472
473 self.last_commit_loc = Location::new(
475 self.log
476 .append(&Operation::CommitFloor(metadata, self.inactivity_floor_loc))
477 .await?,
478 );
479
480 self.steps = 0;
481
482 let end_loc = self.size().await;
483 Ok(start_loc..end_loc)
484 }
485
486 pub async fn commit(&self) -> Result<(), Error> {
488 self.log.commit().await.map_err(Into::into)
489 }
490}
491
impl<E, K, V, T> Persistable for Db<E, K, V, T>
where
    E: Context,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Error = Error;

    // Each trait method delegates to the corresponding inherent method on Db.
    async fn commit(&self) -> Result<(), Error> {
        Self::commit(self).await
    }

    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
513
514#[cfg(test)]
515mod test {
516 use super::*;
517 use crate::translator::TwoCap;
518 use commonware_cryptography::{
519 blake3::{Blake3, Digest},
520 Hasher as _,
521 };
522 use commonware_macros::test_traced;
523 use commonware_math::algebra::Random;
524 use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
525 use commonware_utils::{NZUsize, NZU16, NZU64};
526 use std::num::{NonZeroU16, NonZeroUsize};
527
    // Deliberately small paging parameters — presumably chosen to exercise
    // page-boundary paths in these tests; the exact values are arbitrary.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);

    /// The store type under test throughout this module.
    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap>;
533
534 async fn create_test_store(context: deterministic::Context) -> TestStore {
535 let cfg = Config {
536 log: JournalConfig {
537 partition: "journal".into(),
538 write_buffer: NZUsize!(64 * 1024),
539 compression: None,
540 codec_config: ((), ((0..=10000).into(), ())),
541 items_per_section: NZU64!(7),
542 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
543 },
544 translator: TwoCap,
545 };
546 TestStore::init(context, cfg).await.unwrap()
547 }
548
549 async fn apply_entries(
550 db: &mut TestStore,
551 iter: impl IntoIterator<Item = (Digest, Option<Vec<u8>>)> + Send,
552 ) -> Range<Location> {
553 db.apply_batch(iter.into_iter().collect()).await.unwrap()
554 }
555
    // Exercises construction of an empty db, pruning limits, metadata-only
    // commits, and that the floor tracks the tip for empty batches.
    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.with_label("store_0")).await;
            // A fresh db contains exactly one op: the initial commit.
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.log.bounds().await.start, 0);
            assert_eq!(db.inactivity_floor_loc(), 0);
            // Pruning up to the floor is fine; beyond it must fail.
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(matches!(
                db.prune(Location::new(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Apply (but never commit) an update; it must not survive restart.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            apply_entries(&mut db, [(d1, Some(v1))]).await;
            drop(db);

            let mut db = create_test_store(context.with_label("store_1")).await;
            assert_eq!(db.bounds().await.end, 1);

            // An empty batch carrying only metadata appends just a commit op.
            let metadata = vec![1, 2, 3];
            let batch = db.new_batch().finalize(Some(metadata.clone()));
            let range = db.apply_batch(batch).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            db.commit().await.unwrap();
            assert_eq!(db.bounds().await.end, 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            // Committed metadata survives restart.
            let mut db = create_test_store(context.with_label("store_2")).await;
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            apply_entries(
                &mut db,
                [(Digest::random(&mut context), Some(vec![1, 2, 3]))],
            )
            .await;
            db.commit().await.unwrap();
            // Repeated empty commits must keep the floor close to the tip
            // (log growth stays bounded) and clear the stored metadata.
            for _ in 1..100 {
                db.apply_batch(db.new_batch().finalize(None)).await.unwrap();
                db.commit().await.unwrap();
                assert!(db.bounds().await.end - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }
614
    // End-to-end happy path: updates, commits, restarts, metadata handling,
    // and overwriting existing keys.
    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Unassigned keys read back as None.
            let result = db.get(&key).await;
            assert!(result.unwrap().is_none());

            apply_entries(&mut db, [(key, Some(value.clone()))]).await;

            assert_eq!(*db.bounds().await.end, 4);
            assert_eq!(*db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Dropped without commit: the batch must not persist.
            drop(db);

            let mut db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(*db.bounds().await.end, 1);
            assert_eq!(*db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Re-apply the update, this time with metadata and a commit.
            let metadata = vec![99, 100];
            let range = db
                .apply_batch(
                    db.new_batch()
                        .update(key, value.clone())
                        .finalize(Some(metadata.clone())),
                )
                .await
                .unwrap();
            assert_eq!(*range.start, 1);
            assert_eq!(*range.end, 4);
            db.commit().await.unwrap();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            assert_eq!(*db.bounds().await.end, 4);
            assert_eq!(*db.inactivity_floor_loc, 2);

            // Committed state survives restart.
            let mut db = create_test_store(ctx.with_label("store_2")).await;

            assert_eq!(*db.bounds().await.end, 4);
            assert_eq!(*db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
            apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;

            assert_eq!(*db.bounds().await.end, 10);
            assert_eq!(*db.inactivity_floor_loc, 5);

            // Batches applied without metadata reset the stored metadata.
            assert_eq!(db.get_metadata().await.unwrap(), None);

            db.commit().await.unwrap();
            assert_eq!(db.get_metadata().await.unwrap(), None);

            assert_eq!(*db.bounds().await.end, 10);
            assert_eq!(*db.inactivity_floor_loc, 5);

            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            // Overwriting an existing key replaces its value.
            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
            v1_updated.push(7);
            apply_entries(&mut db, [(k1, Some(v1_updated))]).await;
            db.commit().await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);

            let k3 = Digest::random(&mut ctx);
            apply_entries(&mut db, [(k3, Some(vec![8]))]).await;
            db.commit().await.unwrap();
            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);

            db.destroy().await.unwrap();
        });
    }
727
    // Repeatedly updating one key must leave exactly one snapshot entry, both
    // live and after restart+replay, and pruning must respect section
    // alignment (items_per_section = 7 in `create_test_store`).
    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            }

            // Only the latest update for the key should be indexed.
            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            db.commit().await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            let db = create_test_store(ctx.with_label("store_1")).await;
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            // Replay must reproduce the single-entry snapshot.
            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            assert_eq!(*db.bounds().await.end, 400);
            assert_eq!(*db.inactivity_floor_loc, 398);
            let floor = db.inactivity_floor_loc;

            // Pruning rounds down to the start of the floor's section.
            assert_eq!(db.log.bounds().await.start, *floor - *floor % 7);

            db.destroy().await.unwrap();
        });
    }
771
    // Keys that collide in the snapshot index (shared translated prefix) must
    // still resolve to their own values, both live and after replay.
    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Force the keys to share their first two bytes so they collide
            // under the TwoCap translator (which appears to key on a
            // two-byte prefix — confirm against its definition).
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
            apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.commit().await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Colliding keys must still resolve after snapshot rebuild.
            let db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.destroy().await.unwrap();
        });
    }
805
    // Exercises deletion: delete round-trips through restart, re-adding a
    // deleted key works, and deleting a never-assigned key is a no-op.
    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            db.commit().await.unwrap();

            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete the key and confirm it reads back as unassigned.
            assert!(db.get(&k).await.unwrap().is_some());
            apply_entries(&mut db, [(k, None)]).await;

            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(db.get(&k).await.unwrap().is_none());

            db.commit().await.unwrap();

            // The deletion survives restart.
            let mut db = create_test_store(ctx.with_label("store_1")).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-adding a previously deleted key works.
            apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            db.commit().await.unwrap();

            let mut db = create_test_store(ctx.with_label("store_2")).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Deleting a never-assigned key appends no delete op — only the
            // floor-raising and commit ops account for the range.
            let k_n = Digest::random(&mut ctx);
            let range = apply_entries(&mut db, [(k_n, None)]).await;
            assert_eq!(range.start, 9);
            assert_eq!(range.end, 11);
            db.commit().await.unwrap();

            assert!(db.get(&k_n).await.unwrap().is_none());
            assert!(db.get(&k).await.unwrap().is_some());

            db.destroy().await.unwrap();
        });
    }
868
    // Verifies the inactivity floor advances as keys are overwritten, making
    // earlier operations prunable.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![]; // empty values are valid
            let v_c = vec![4, 5, 6];

            apply_entries(&mut db, [(k_a, Some(v_a.clone()))]).await;
            apply_entries(&mut db, [(k_b, Some(v_b.clone()))]).await;

            db.commit().await.unwrap();
            assert_eq!(*db.bounds().await.end, 7);
            assert_eq!(*db.inactivity_floor_loc, 3);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);

            // Overwrite both keys; the floor should advance past the
            // now-inactive original updates.
            apply_entries(&mut db, [(k_b, Some(v_a.clone()))]).await;
            apply_entries(&mut db, [(k_a, Some(v_c.clone()))]).await;

            db.commit().await.unwrap();
            assert_eq!(*db.bounds().await.end, 15);
            assert_eq!(*db.inactivity_floor_loc, 12);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);

            db.destroy().await.unwrap();
        });
    }
904
    // Recovery test at larger scale: a never-applied batch leaves no trace,
    // and log size/floor/snapshot contents are reproduced after restart.
    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let db = create_test_store(context.with_label("store_0")).await;

            // Build a large batch but drop it without applying — the db must
            // be unaffected.
            {
                let mut batch = db.new_batch();
                for i in 0u64..ELEMENTS {
                    let k = Blake3::hash(&i.to_be_bytes());
                    let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                    batch = batch.update(k, v);
                }
            }
            drop(db);
            let mut db = create_test_store(context.with_label("store_1")).await;
            assert_eq!(*db.bounds().await.end, 1);

            // Populate all keys, one batch per key.
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            }
            db.commit().await.unwrap();

            // Overwrite every third key with a different value.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                apply_entries(&mut db, [(k, Some(v.clone()))]).await;
            }
            db.commit().await.unwrap();
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every key with i % 7 == 1 (143 keys; 1000 - 143 = 857).
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                apply_entries(&mut db, [(k, None)]).await;
            }
            db.commit().await.unwrap();
            let final_count = db.bounds().await.end;
            let final_floor = db.inactivity_floor_loc;

            db.sync().await.unwrap();
            drop(db);
            // Restart must reproduce the exact log size and floor.
            let db = create_test_store(context.with_label("store_2")).await;
            assert_eq!(db.bounds().await.end, final_count);
            assert_eq!(db.inactivity_floor_loc, final_floor);

            db.prune(db.inactivity_floor_loc()).await.unwrap();
            // Pruning rounds down to a section boundary (7 items/section).
            assert_eq!(db.log.bounds().await.start, *final_floor - *final_floor % 7);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
973
    // Exercises the Batch API: staged reads shadow the db, staging does not
    // mutate the db, and applied-but-uncommitted batches are lost on restart.
    #[test_traced("DEBUG")]
    fn test_store_batch() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            let batch = db.new_batch();

            // A read through an empty batch falls back to the (empty) db.
            let result = batch.get(&key).await;
            assert!(result.unwrap().is_none());

            let batch = batch.update(key, value.clone());

            // Staging a change does not touch the db itself.
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            // But the batch observes its own staged value.
            let fetched_value = batch.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);
            db.apply_batch(batch.finalize(None)).await.unwrap();
            drop(db);

            // Applied but never committed: lost on restart.
            let mut db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            let metadata = vec![99, 100];
            let range = db
                .apply_batch(
                    db.new_batch()
                        .update(key, value.clone())
                        .finalize(Some(metadata.clone())),
                )
                .await
                .unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            db.commit().await.unwrap();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            drop(db);

            // Committed state survives restart.
            let db = create_test_store(ctx.with_label("store_2")).await;

            assert_eq!(db.bounds().await.end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            db.destroy().await.unwrap();
        });
    }
1045
1046 fn is_send<T: Send>(_: T) {}
1047
    // Compile-time check that read-path futures are `Send`; never executed.
    #[allow(dead_code)]
    fn assert_read_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
        is_send(db.get(&key));
        is_send(db.get_metadata());
        is_send(db.prune(loc));
        is_send(db.sync());
    }
1055
    // Compile-time check that write-path futures (and batch reads) are
    // `Send`; never executed.
    #[allow(dead_code)]
    fn assert_write_futures_are_send(
        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>,
        key: Digest,
        value: Vec<u8>,
    ) {
        is_send(db.get(&key));
        is_send(db.apply_batch(Changeset::from([(key, Some(value))])));
        is_send(db.apply_batch(Changeset::from([(key, None)])));
        let batch = db.new_batch();
        is_send(batch.get(&key));
    }
1068
    // Compile-time check that the commit future is `Send`; never executed.
    #[allow(dead_code)]
    fn assert_commit_is_send(db: &Db<deterministic::Context, Digest, Vec<u8>, TwoCap>) {
        is_send(db.commit());
    }
1073}