use crate::{
    index::{unordered::Index, Unordered as _},
    journal::contiguous::{
        variable::{Config as JournalConfig, Journal},
        MutableContiguous as _,
    },
    mmr::{Location, Proof},
    qmdb::{
        any::{
            unordered::{variable::Operation, Update},
            VariableValue,
        },
        build_snapshot_from_log, create_key, delete_key,
        operation::{Committable as _, Operation as _},
        update_key, Error, FloorHelper,
    },
    translator::Translator,
};
use commonware_codec::{Codec, Read};
use commonware_cryptography::Digest;
use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage};
use commonware_utils::Array;
use core::{future::Future, ops::Range};
use std::num::{NonZeroU64, NonZeroUsize};
use tracing::{debug, warn};

mod batch;
#[cfg(test)]
pub use batch::tests as batch_tests;
pub use batch::{Batch, Batchable, Getter};

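/// Configuration for initializing a [Store].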
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// The name of the storage partition used for the operations log.
    pub log_partition: String,

    /// The size of the write buffer to use when appending to the log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level to apply to log items.
    pub log_compression: Option<u8>,

    /// The codec configuration used to encode and decode log items.
    pub log_codec_config: C,

    /// The number of items to store in each section of the log journal.
    pub log_items_per_section: NonZeroU64,

    /// The translator used by the snapshot index.
    pub translator: T,

    /// The buffer pool used to cache recently accessed pages of the log.
    pub buffer_pool: PoolRef,
}

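/// Common read-only operations for a store backed by an operation log.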
pub trait LogStore {
    /// The type of values stored.
    type Value: Codec + Clone;

    /// Whether the store currently contains no active keys.
    fn is_empty(&self) -> bool;

    /// The total number of operations that have been applied to the log.
    fn op_count(&self) -> Location;

    /// The location of the inactivity floor. All operations before this location are inactive.
    fn inactivity_floor_loc(&self) -> Location;

    /// The metadata associated with the most recent commit, if any.
    fn get_metadata(&self) -> impl Future<Output = Result<Option<Self::Value>, Error>>;
}

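/// A [LogStore] whose inactive operations can be pruned.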
pub trait LogStorePrunable: LogStore {
    /// Prune operations before `prune_loc`. `prune_loc` must not exceed the inactivity floor.
    fn prune(&mut self, prune_loc: Location) -> impl Future<Output = Result<(), Error>>;
}

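/// A [LogStore] with un-merkleized state that can be merkleized into its [CleanStore]
/// counterpart.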
pub trait DirtyStore: LogStore {
    /// The digest type produced when the store is merkleized.
    type Digest: Digest;

    /// The type of operation stored in the log.
    type Operation;

    /// The merkleized counterpart of this store.
    type Clean: CleanStore<
        Digest = Self::Digest,
        Operation = Self::Operation,
        Dirty = Self,
        Value = Self::Value,
    >;

    /// Merkleize the store, producing its clean counterpart.
    fn merkleize(self) -> impl Future<Output = Result<Self::Clean, Error>>;
}

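/// A merkleized [LogStore] that can produce a root digest and proofs over its operations.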
pub trait CleanStore: LogStore {
    /// The digest type of the store's root.
    type Digest: Digest;

    /// The type of operation stored in the log.
    type Operation;

    /// The un-merkleized (mutable) counterpart of this store.
    type Dirty: DirtyStore<
        Digest = Self::Digest,
        Operation = Self::Operation,
        Clean = Self,
        Value = Self::Value,
    >;

    /// The root digest of the store.
    fn root(&self) -> Self::Digest;

    /// Generate a proof over the operations starting at `start_loc`, covering at most `max_ops`
    /// operations, along with the operations themselves.
    #[allow(clippy::type_complexity)]
    fn proof(
        &self,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> impl Future<Output = Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error>>;

    /// Like [CleanStore::proof], but generated against the store's state when it contained
    /// `historical_size` operations.
    #[allow(clippy::type_complexity)]
    fn historical_proof(
        &self,
        historical_size: Location,
        start_loc: Location,
        max_ops: NonZeroU64,
    ) -> impl Future<Output = Result<(Proof<Self::Digest>, Vec<Self::Operation>), Error>>;

    /// Convert this store back into its dirty (mutable) counterpart.
    fn into_dirty(self) -> Self::Dirty;
}

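/// A mutable key-value store backed by an append-only log of operations and an in-memory
/// snapshot index over the currently active keys.
///
/// A minimal usage sketch (not compiled as a doc-test; assumes a runtime `context` and a `cfg`
/// like the one built in the tests below):
///
/// ```ignore
/// let mut store = Store::init(context, cfg).await?;
/// store.update(key, value.clone()).await?;
/// let committed = store.commit(None).await?;
/// assert_eq!(store.get(&key).await?, Some(value));
/// store.close().await?;
/// ```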
pub struct Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// The (pruned) log of all operations applied to the store.
    log: Journal<E, Operation<K, V>>,

    /// Maps translated keys to the locations of their latest update operations. Multiple keys
    /// may share a translated key, so lookups must confirm the full key.
    snapshot: Index<T, Location>,

    /// The number of currently active keys.
    active_keys: usize,

    /// The location before which all operations are inactive.
    inactivity_floor_loc: Location,

    /// The number of operations made inactive since the last commit, used to determine how many
    /// floor-raising steps to take at the next commit.
    steps: u64,

    /// The location of the most recent commit operation.
    last_commit_loc: Location,
}

type FloorHelperState<'a, E, K, V, T> =
    FloorHelper<'a, Index<T, Location>, Journal<E, Operation<K, V>>>;

impl<E, K, V, T> Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Initialize the store from the log in the configured partition, rewinding any operations
    /// appended after the most recent commit and rebuilding the snapshot from the log.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut log = Journal::<E, Operation<K, V>>::init(
            context.with_label("log"),
            JournalConfig {
                partition: cfg.log_partition,
                items_per_section: cfg.log_items_per_section,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool,
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Rewind any uncommitted operations. If the log is empty, append an initial commit so
        // the last operation is always a commit.
        if log.rewind_to(|op| op.is_commit()).await? == 0 {
            warn!("Log is empty, initializing new db");
            log.append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                .await?;
        }

        log.sync().await?;

        // The last operation is always a commit, from which we recover the inactivity floor.
        let last_commit_loc =
            Location::new_unchecked(log.size().checked_sub(1).expect("commit should exist"));
        let op = log.read(*last_commit_loc).await?;
        let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");

        // Rebuild the snapshot by replaying the log from the inactivity floor.
        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
        let active_keys =
            build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;

        Ok(Self {
            log,
            snapshot,
            active_keys,
            inactivity_floor_loc,
            steps: 0,
            last_commit_loc,
        })
    }

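    /// Get the value of `key` in the store, or None if the key has no active value.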
    pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        // The snapshot may yield multiple candidate locations when translated keys collide, so
        // confirm the full key matches before returning a value.
        for &loc in self.snapshot.get(key) {
            let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
                unreachable!("location ({loc}) does not reference update operation");
            };

            if &k == key {
                return Ok(Some(v));
            }
        }

        Ok(None)
    }

    /// Returns a [FloorHelper] wrapping the store's snapshot and log.
    const fn as_floor_helper(&mut self) -> FloorHelperState<'_, E, K, V, T> {
        FloorHelper {
            snapshot: &mut self.snapshot,
            log: &mut self.log,
        }
    }

    /// Whether the store currently contains no active keys.
    pub const fn is_empty(&self) -> bool {
        self.active_keys == 0
    }

    /// Get the operation at location `loc` in the log.
    ///
    /// # Panics
    ///
    /// Panics if `loc` is not less than the current operation count.
    async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
        assert!(loc < self.op_count());

        self.log.read(*loc).await.map_err(|e| match e {
            crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
            e => Error::Journal(e),
        })
    }

    /// The total number of operations that have been applied to the store.
    pub const fn op_count(&self) -> Location {
        Location::new_unchecked(self.log.size())
    }

    /// The location of the inactivity floor. All operations before this location are inactive.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }

    /// The metadata associated with the most recent commit, if any.
    pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
        let Operation::CommitFloor(metadata, _) = self.log.read(*self.last_commit_loc).await?
        else {
            unreachable!("last commit should be a commit floor operation");
        };

        Ok(metadata)
    }

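    /// Update `key` to have value `value`. The key is created if it does not already exist.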
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        // If the key already had a value, its previous update operation becomes inactive,
        // contributing a step toward raising the inactivity floor at the next commit.
        let new_loc = self.op_count();
        if update_key(&mut self.snapshot, &self.log, &key, new_loc)
            .await?
            .is_some()
        {
            self.steps += 1;
        } else {
            self.active_keys += 1;
        }

        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(())
    }

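    /// Create `key` with value `value`. Returns false (and appends nothing) if the key already
    /// exists.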
    pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
        let new_loc = self.op_count();
        if !create_key(&mut self.snapshot, &self.log, &key, new_loc).await? {
            return Ok(false);
        }

        self.active_keys += 1;
        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(true)
    }

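    /// Delete `key` from the store. Returns false (and appends nothing) if the key does not
    /// exist.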
    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
        let r = delete_key(&mut self.snapshot, &self.log, &key).await?;
        if r.is_none() {
            return Ok(false);
        }

        self.log.append(Operation::Delete(key)).await?;
        self.steps += 1;
        self.active_keys -= 1;

        Ok(true)
    }

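    /// Commit all uncommitted operations to the log along with the provided `metadata`, raising
    /// the inactivity floor. Returns the range of locations committed by this call (including
    /// the commit operation itself).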
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
        let start_loc = self.last_commit_loc + 1;

        if self.is_empty() {
            // With no active keys, every operation below the tip is inactive.
            self.inactivity_floor_loc = self.op_count();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            // Raise the inactivity floor once for each operation made inactive since the last
            // commit, plus one additional step.
            let steps_to_take = self.steps + 1;
            for _ in 0..steps_to_take {
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
            }
        }
        self.steps = 0;

        self.last_commit_loc = Location::new_unchecked(
            self.log
                .append(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
                .await?,
        );

        self.log.commit().await?;

        Ok(start_loc..self.op_count())
    }

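    /// Sync the log to disk without committing.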
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.log.sync().await.map_err(Into::into)
    }

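    /// Prune operations before `prune_loc` from the log. Fails if `prune_loc` is beyond the
    /// inactivity floor. Pruning is aligned to log sections, so some operations before
    /// `prune_loc` may be retained.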
    pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        if prune_loc > self.inactivity_floor_loc {
            return Err(Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }

        // Nothing to report if the log didn't actually prune anything.
        if !self.log.prune(*prune_loc).await? {
            return Ok(());
        }

        debug!(
            log_size = ?self.op_count(),
            oldest_retained_loc = ?self.log.oldest_retained_pos(),
            ?prune_loc,
            "pruned inactive ops"
        );

        Ok(())
    }

    /// Close the store. Operations that have not been committed are rolled back the next time
    /// the store is initialized.
    pub async fn close(self) -> Result<(), Error> {
        self.log.close().await.map_err(Into::into)
    }

    /// Destroy the store, removing all of its data from disk.
    pub async fn destroy(self) -> Result<(), Error> {
        self.log.destroy().await.map_err(Into::into)
    }
}

impl<E, K, V, T> LogStorePrunable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}

impl<E, K, V, T> crate::store::StorePersistable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn commit(&mut self) -> Result<(), Error> {
        self.commit(None).await.map(|_| ())
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}

impl<E, K, V, T> LogStore for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Value = V;

    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }

    fn is_empty(&self) -> bool {
        self.is_empty()
    }
}

impl<E, K, V, T> crate::store::Store for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}

impl<E, K, V, T> crate::store::StoreMut for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.update(key, value).await
    }
}

impl<E, K, V, T> crate::store::StoreDeletable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        self.delete(key).await
    }
}

impl<E, K, V, T> Batchable for Store<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
}

#[cfg(test)]
mod test {
    use super::*;
    use crate::{qmdb::store::batch_tests, store::StoreMut as _, translator::TwoCap};
    use commonware_cryptography::{
        blake3::{Blake3, Digest},
        Hasher as _,
    };
    use commonware_macros::test_traced;
    use commonware_math::algebra::Random;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU64};

    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;

    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.log.oldest_retained_pos(), Some(0));
            assert_eq!(db.inactivity_floor_loc(), 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(matches!(
                db.prune(Location::new_unchecked(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 1);

            let metadata = vec![1, 2, 3];
            let range = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            assert_eq!(db.op_count(), 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                assert!(db.op_count() - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.op_count(), 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.op_count(), 2);
            assert_eq!(store.inactivity_floor_loc, 0);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            store.sync().await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.op_count(), 1);
            assert_eq!(store.inactivity_floor_loc, 0);
            assert!(store.get_metadata().await.unwrap().is_none());

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.op_count(), 2);
            assert_eq!(store.inactivity_floor_loc, 0);

            let metadata = vec![99, 100];
            let range = store.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(store.get_metadata().await.unwrap(), Some(metadata.clone()));

            assert_eq!(store.op_count(), 4);
            assert_eq!(store.inactivity_floor_loc, 2);

            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.op_count(), 4);
            assert_eq!(store.inactivity_floor_loc, 2);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.op_count(), 6);
            assert_eq!(store.inactivity_floor_loc, 2);

            assert_eq!(store.get_metadata().await.unwrap(), Some(metadata));

            let range = store.commit(None).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, store.op_count());
            assert_eq!(store.get_metadata().await.unwrap(), None);

            assert_eq!(store.op_count(), 8);
            assert_eq!(store.inactivity_floor_loc, 3);

            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.upsert(k1, |v| v.push(7)).await.unwrap();
            assert_eq!(
                store.get(&k1).await.unwrap().unwrap(),
                vec![2, 3, 4, 5, 6, 7]
            );

            let k3 = Digest::random(&mut ctx);
            store.upsert(k3, |v| v.push(8)).await.unwrap();
            assert_eq!(store.get(&k3).await.unwrap().unwrap(), vec![8]);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;
            store.prune(store.inactivity_floor_loc()).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            assert_eq!(store.op_count(), UPDATES * 2 + 2);
            let expected_floor = UPDATES * 2;
            assert_eq!(store.inactivity_floor_loc, expected_floor);

            assert_eq!(
                store.log.oldest_retained_pos(),
                Some(expected_floor - expected_floor % 7)
            );

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Give k2 the same two-byte prefix as k1 so they collide under the TwoCap translator.
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            assert!(store.delete(k).await.unwrap());

            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(!store.delete(k).await.unwrap());

            store.commit(None).await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            store.commit(None).await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 5);
            assert_eq!(store.inactivity_floor_loc, 2);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 11);
            assert_eq!(store.inactivity_floor_loc, 8);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Dropping the db without committing should discard the uncommitted operations.
            drop(db);
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 1);

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();

            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            drop(db);
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 1673);
            assert_eq!(db.snapshot.items(), 1000);

            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            drop(db);
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            db.close().await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 1961);
            assert_eq!(db.inactivity_floor_loc, 756);

            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.log.oldest_retained_pos(), Some(756));
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_batch() {
        batch_tests::test_batch(
            |ctx| async move { create_test_store(ctx.with_label("batch")).await },
        );
    }
}