1use crate::{
89 adb::operation::variable::Operation,
90 index::{Cursor, Index as _, Unordered as Index},
91 journal::{
92 contiguous::fixed::{Config as FConfig, Journal as FJournal},
93 segmented::variable::{Config as VConfig, Journal as VJournal},
94 },
95 mmr::Location,
96 translator::Translator,
97};
98use commonware_codec::{Codec, Read};
99use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
100use commonware_utils::{Array, NZUsize};
101use core::future::Future;
102use futures::{pin_mut, try_join, StreamExt};
103use std::{
104 collections::HashMap,
105 num::{NonZeroU64, NonZeroUsize},
106};
107use tracing::{debug, warn};
108
/// Read buffer size used when replaying the log to rebuild the snapshot.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
112
/// Errors that can occur when interacting with the store.
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// The operation at the given location has been pruned from the log.
    #[error("operation pruned")]
    OperationPruned(Location),

    /// An error from the underlying journal storage.
    #[error(transparent)]
    Journal(#[from] crate::journal::Error),

    /// An error from the adb layer.
    #[error(transparent)]
    Adb(#[from] crate::adb::Error),
}
126
/// Configuration for initializing a [Store].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Partition name for the operation log journal.
    pub log_journal_partition: String,

    /// Size of the write buffer used for journal writes.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression setting for log entries (`None` for no compression).
    pub log_compression: Option<u8>,

    /// Codec configuration used to encode/decode log operations.
    pub log_codec_config: C,

    /// Number of operations stored per log section.
    pub log_items_per_section: NonZeroU64,

    /// Partition name for the locations journal.
    pub locations_journal_partition: String,

    /// Number of location entries stored per blob in the locations journal.
    pub locations_items_per_blob: NonZeroU64,

    /// Translator used to map keys into the snapshot index.
    pub translator: T,

    /// Buffer pool shared by the journals.
    pub buffer_pool: PoolRef,
}
157
/// A key-value database interface backed by an append-only log of operations.
pub trait Db<E: RStorage + Clock + Metrics, K: Array, V: Codec, T: Translator> {
    /// Returns the total number of operations in the log.
    fn op_count(&self) -> Location;

    /// Returns the location below which all operations are inactive.
    fn inactivity_floor_loc(&self) -> Location;

    /// Retrieves the current value for `key`, if any.
    fn get(&self, key: &K) -> impl Future<Output = Result<Option<V>, Error>>;

    /// Sets `key` to `value`.
    fn update(&mut self, key: K, value: V) -> impl Future<Output = Result<(), Error>>;

    /// Removes `key` from the store, if present.
    fn delete(&mut self, key: K) -> impl Future<Output = Result<(), Error>>;

    /// Commits all pending operations, making them durable.
    fn commit(&mut self) -> impl Future<Output = Result<(), Error>>;

    /// Flushes buffered data to storage without committing.
    fn sync(&mut self) -> impl Future<Output = Result<(), Error>>;

    /// Prunes operations preceding `target_prune_loc`.
    fn prune(&mut self, target_prune_loc: Location) -> impl Future<Output = Result<(), Error>>;

    /// Closes the store, flushing outstanding writes.
    fn close(self) -> impl Future<Output = Result<(), Error>>;

    /// Destroys the store, removing all persisted data.
    fn destroy(self) -> impl Future<Output = Result<(), Error>>;
}
203
/// A key-value store backed by an append-only operation log, with an in-memory
/// index (rebuilt on startup by replaying the log) mapping keys to locations.
pub struct Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    // Append-only journal of operations (updates, deletes, commit floors).
    log: VJournal<E, Operation<K, V>>,

    // Index mapping translated keys to candidate operation locations.
    snapshot: Index<T, Location>,

    // Number of operations per log section (cached from config).
    log_items_per_section: u64,

    // Journal mapping each operation's location to its byte offset within its
    // log section.
    locations: FJournal<E, u32>,

    // Location below which all operations are inactive.
    inactivity_floor_loc: Location,

    // Oldest operation location still retained (everything before is pruned).
    oldest_retained_loc: Location,

    // Total number of operations appended to the log.
    log_size: Location,

    // Number of "steps" (key replacements/deletions) since the last commit;
    // used by `commit` to decide how far to raise the inactivity floor.
    pub(crate) steps: u64,

    // Location of the most recent commit operation, if any.
    pub(crate) last_commit: Option<Location>,
}
247
248impl<E, K, V, T> Store<E, K, V, T>
249where
250 E: RStorage + Clock + Metrics,
251 K: Array,
252 V: Codec,
253 T: Translator,
254{
    /// Initializes a [Store] from `cfg`, replaying the persisted log to
    /// rebuild in-memory state and rewinding any uncommitted suffix.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let snapshot = Index::init(context.with_label("snapshot"), cfg.translator);

        // Journal of raw operations.
        let log = VJournal::init(
            context.with_label("log"),
            VConfig {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Journal of per-operation offsets, indexed by location.
        let locations = FJournal::init(
            context.with_label("locations"),
            FConfig {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.log_write_buffer,
                buffer_pool: cfg.buffer_pool,
            },
        )
        .await?;

        let db = Self {
            log,
            snapshot,
            log_items_per_section: cfg.log_items_per_section.get(),
            locations,
            inactivity_floor_loc: Location::new_unchecked(0),
            oldest_retained_loc: Location::new_unchecked(0),
            log_size: Location::new_unchecked(0),
            steps: 0,
            last_commit: None,
        };

        // Replay the log to restore sizes, floors, and the snapshot index.
        db.build_snapshot_from_log().await
    }
304
305 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
309 for &loc in self.snapshot.get(key) {
310 let Operation::Update(k, v) = self.get_op(loc).await? else {
311 unreachable!("location ({loc}) does not reference update operation");
312 };
313
314 if &k == key {
315 return Ok(Some(v));
316 }
317 }
318
319 Ok(None)
320 }
321
322 pub async fn get_loc(&self, loc: Location) -> Result<Option<V>, Error> {
326 assert!(loc < self.log_size);
327 let op = self.get_op(loc).await?;
328
329 Ok(op.into_value())
330 }
331
    /// Sets `key` to `value` by appending an update operation to the log.
    ///
    /// If the key is already present, its snapshot entry is redirected to the
    /// new operation's location and an inactivity "step" is recorded;
    /// otherwise a fresh snapshot entry is inserted.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        // The new operation will land at the current end of the log.
        let new_loc = self.log_size;
        if let Some(old_loc) = self.get_key_loc(&key).await? {
            Self::update_loc(&mut self.snapshot, &key, old_loc, new_loc);
            self.steps += 1;
        } else {
            self.snapshot.insert(&key, new_loc);
        };

        self.apply_op(Operation::Update(key, value))
            .await
            .map(|_| ())
    }
350
351 pub async fn upsert(&mut self, key: K, update: impl FnOnce(&mut V)) -> Result<(), Error>
358 where
359 V: Default,
360 {
361 let mut value = self.get(&key).await?.unwrap_or_default();
362 update(&mut value);
363
364 self.update(key, value).await
365 }
366
367 pub async fn delete(&mut self, key: K) -> Result<(), Error> {
370 let Some(old_loc) = self.get_key_loc(&key).await? else {
371 return Ok(());
373 };
374
375 Self::delete_loc(&mut self.snapshot, &key, old_loc);
376 self.steps += 1;
377
378 self.apply_op(Operation::Delete(key)).await.map(|_| ())
379 }
380
    /// Commits all pending operations, making them durable.
    ///
    /// Raises the inactivity floor by `steps + 1` (re-appending still-active
    /// operations encountered along the way), appends a commit-floor operation
    /// carrying the optional `metadata`, and syncs the current log section.
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
        if self.is_empty() {
            // No active keys: the floor can jump straight to the tip.
            self.inactivity_floor_loc = self.op_count();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            // One floor-raise per recorded step since the last commit, plus one.
            let steps_to_take = self.steps + 1;
            for _ in 0..steps_to_take {
                self.raise_floor().await?;
            }
        }
        self.steps = 0;

        self.apply_op(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
            .await?;
        self.last_commit = Some(self.op_count() - 1);

        // Persist the section containing the commit operation.
        let section = self.current_section();
        self.log.sync(section).await?;

        debug!(log_size = ?self.log_size, "commit complete");

        Ok(())
    }
413
    /// Advances the inactivity floor past the next active operation.
    ///
    /// Inactive operations at the floor are skipped; the first active one
    /// found is re-appended to the tip via [Self::move_op_if_active] so the
    /// floor can then move past it.
    async fn raise_floor(&mut self) -> Result<(), Error> {
        // Scan forward until an operation is actually moved (i.e., was active).
        let mut op = self.get_op(self.inactivity_floor_loc).await?;
        while self
            .move_op_if_active(op, self.inactivity_floor_loc)
            .await?
            .is_none()
        {
            self.inactivity_floor_loc += 1;
            op = self.get_op(self.inactivity_floor_loc).await?;
        }

        // Step past the operation that was just moved.
        self.inactivity_floor_loc += 1;

        Ok(())
    }
442
    /// Returns the log section that the next appended operation belongs to.
    fn current_section(&self) -> u64 {
        *self.log_size / self.log_items_per_section
    }
446
447 pub async fn sync(&mut self) -> Result<(), Error> {
451 let current_section = *self.log_size / self.log_items_per_section;
452 try_join!(self.log.sync(current_section), self.locations.sync())?;
453
454 Ok(())
455 }
456
    /// Prunes operations preceding the section containing `target_prune_loc`.
    /// Pruning is section-aligned, so some operations before the target may be
    /// retained.
    ///
    /// # Panics
    ///
    /// Panics if `target_prune_loc` exceeds the inactivity floor.
    pub async fn prune(&mut self, target_prune_loc: Location) -> Result<(), Error> {
        assert!(target_prune_loc <= self.inactivity_floor_loc);
        if target_prune_loc <= self.oldest_retained_loc {
            // Nothing new to prune.
            return Ok(());
        }

        // Sync locations before pruning the log — presumably so a crash
        // mid-prune cannot leave locations behind the pruned log; NOTE(review)
        // confirm against the journal's crash-consistency contract.
        self.locations.sync().await?;

        let section_with_target = *target_prune_loc / self.log_items_per_section;
        if !self.log.prune(section_with_target).await? {
            // Log was already pruned at least this far.
            return Ok(());
        }
        self.oldest_retained_loc =
            Location::new_unchecked(section_with_target * self.log_items_per_section);
        debug!(
            log_size = ?self.log_size,
            oldest_retained_loc = ?self.oldest_retained_loc,
            ?target_prune_loc,
            "pruned inactive ops"
        );

        // Keep the locations journal aligned with the log's retained range.
        self.locations
            .prune(*self.oldest_retained_loc)
            .await
            .map_err(Error::Journal)?;

        Ok(())
    }
500
501 pub async fn get_metadata(&self) -> Result<Option<(Location, Option<V>)>, Error> {
508 let Some(last_commit) = self.last_commit else {
509 return Ok(None);
510 };
511
512 let Operation::CommitFloor(metadata, _) = self.get_op(last_commit).await? else {
513 unreachable!("last commit should be a commit floor operation");
514 };
515
516 Ok(Some((last_commit, metadata)))
517 }
518
519 pub async fn close(self) -> Result<(), Error> {
522 try_join!(self.log.close(), self.locations.close())?;
523
524 Ok(())
525 }
526
    /// Simulates a crash by consuming the store without a clean shutdown,
    /// optionally syncing the locations journal and/or the log first.
    ///
    /// Only available in test and fuzzing builds.
    #[cfg(any(test, feature = "fuzzing"))]
    pub async fn simulate_failure(
        mut self,
        sync_locations: bool,
        sync_log: bool,
    ) -> Result<(), Error> {
        if sync_locations {
            self.locations.sync().await?;
        }
        if sync_log {
            let section = self.current_section();
            self.log.sync(section).await?;
        }

        Ok(())
    }
544
545 pub async fn destroy(self) -> Result<(), Error> {
552 try_join!(self.log.destroy(), self.locations.destroy())?;
553 Ok(())
554 }
555
    /// Returns the total number of operations appended to the log.
    pub fn op_count(&self) -> Location {
        self.log_size
    }
561
    /// Returns true if the store currently has no active keys.
    pub fn is_empty(&self) -> bool {
        self.snapshot.keys() == 0
    }
566
    /// Returns the location below which all operations are inactive.
    pub fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }
572
    /// Rebuilds in-memory state (snapshot index, sizes, floors) by replaying
    /// the persisted log, then rewinds any uncommitted suffix from both the
    /// log and the locations journal.
    async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
        let mut locations_size = self.locations.size().await;

        // Location/offset of the first operation after the last commit (if
        // any); used to rewind uncommitted operations once replay finishes.
        let mut after_last_commit = None;
        // Pending per-key changes since the last commit: key -> (old, new) locations.
        let mut uncommitted_ops = HashMap::new();
        let mut oldest_retained_loc_found = false;
        {
            let stream = self
                .log
                .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
                .await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Err(e) => {
                        return Err(Error::Journal(e));
                    }
                    Ok((section, offset, _, op)) => {
                        if !oldest_retained_loc_found {
                            // The first replayed section tells us how much of
                            // the log was previously pruned.
                            self.log_size =
                                Location::new_unchecked(section * self.log_items_per_section);
                            self.oldest_retained_loc = self.log_size;
                            oldest_retained_loc_found = true;
                        }

                        let loc = self.log_size;
                        if after_last_commit.is_none() {
                            after_last_commit = Some((loc, offset));
                        }

                        self.log_size += 1;

                        // Sanity check: replayed section must match the section
                        // derived from the operation's location.
                        let expected = *loc / self.log_items_per_section;
                        assert_eq!(section, expected,
                            "given section {section} did not match expected section {expected} from location {loc}");

                        if self.log_size > locations_size {
                            // Repair the locations journal if it is missing an
                            // entry for this operation.
                            warn!(section, offset, "operation was missing from location map");
                            self.locations.append(offset).await?;
                            locations_size += 1;
                        }

                        match op {
                            Operation::Delete(key) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    // Key exists in the committed snapshot:
                                    // record a pending deletion.
                                    uncommitted_ops.insert(key, (Some(old_loc), None));
                                } else {
                                    // Key only touched since the last commit:
                                    // the pending change cancels out.
                                    uncommitted_ops.remove(&key);
                                }
                            }
                            Operation::Update(key, _) => {
                                let result = self.get_key_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
                                } else {
                                    uncommitted_ops.insert(key, (None, Some(loc)));
                                }
                            }
                            Operation::CommitFloor(_, loc) => {
                                self.inactivity_floor_loc = loc;

                                // Flush all pending changes into the snapshot.
                                for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
                                    if let Some(old_loc) = old_loc {
                                        if let Some(new_loc) = new_loc {
                                            Self::update_loc(
                                                &mut self.snapshot,
                                                key,
                                                *old_loc,
                                                *new_loc,
                                            );
                                        } else {
                                            Self::delete_loc(&mut self.snapshot, key, *old_loc);
                                        }
                                    } else {
                                        assert!(new_loc.is_some());
                                        self.snapshot.insert(key, new_loc.unwrap());
                                    }
                                }
                                uncommitted_ops.clear();
                                after_last_commit = None;
                            }
                            _ => unreachable!(
                                "unexpected operation type at offset {offset} of section {section}"
                            ),
                        }
                    }
                }
            }
        }

        // Rewind the log over any operations appended after the last commit.
        if let Some((end_loc, end_offset)) = after_last_commit {
            assert!(!uncommitted_ops.is_empty());
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = ?end_loc,
                end_offset,
                "rewinding over uncommitted operations at end of log"
            );
            let prune_to_section = *end_loc / self.log_items_per_section;
            self.log
                .rewind_to_offset(prune_to_section, end_offset)
                .await?;
            self.log.sync(prune_to_section).await?;
            self.log_size = end_loc;
        }

        // The locations journal may also extend past the (possibly rewound)
        // log; trim it to match.
        if locations_size > self.log_size {
            warn!(
                locations_size,
                log_size = ?self.log_size,
                "rewinding uncommitted locations"
            );
            self.locations.rewind(*self.log_size).await?;
            self.locations.sync().await?;
        }

        assert_eq!(self.log_size, self.locations.size().await);
        // After rewinding, the last retained operation (if any) must be a
        // commit floor.
        self.last_commit = self
            .locations
            .size()
            .await
            .checked_sub(1)
            .map(Location::new_unchecked);
        assert!(
            self.last_commit.is_none()
                || matches!(
                    self.get_op(self.last_commit.unwrap()).await?,
                    Operation::CommitFloor(_, _)
                )
        );

        debug!(log_size = ?self.log_size, "build_snapshot_from_log complete");

        Ok(self)
    }
722
    /// Appends `op` to the log and records its offset in the locations
    /// journal, returning the offset within the current section.
    async fn apply_op(&mut self, op: Operation<K, V>) -> Result<u32, Error> {
        // Append to the section derived from the current log size.
        let section = self.current_section();
        let (offset, _) = self.log.append(section, op).await?;

        self.locations.append(offset).await?;

        self.log_size += 1;

        // If the append crossed a section boundary, sync the section we just
        // finished filling.
        if self.current_section() != section {
            self.log.sync(section).await?;
        }

        Ok(offset)
    }
743
744 async fn get_key_loc(&self, key: &K) -> Result<Option<Location>, Error> {
747 for loc in self.snapshot.get(key) {
748 match self.get_op(*loc).await {
749 Ok(Operation::Update(k, _)) => {
750 if k == *key {
751 return Ok(Some(*loc));
752 }
753 }
754 Err(Error::OperationPruned(_)) => {
755 unreachable!("invalid location in snapshot: loc={loc}")
756 }
757 _ => unreachable!("non-update operation referenced by snapshot: loc={loc}"),
758 }
759 }
760
761 Ok(None)
762 }
763
    /// Fetches the operation at `loc` from the log.
    ///
    /// Returns [Error::OperationPruned] if `loc` precedes the oldest retained
    /// location.
    ///
    /// # Panics
    ///
    /// Panics if `loc` is not less than the current log size.
    async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
        assert!(loc < self.log_size);
        if loc < self.oldest_retained_loc {
            return Err(Error::OperationPruned(loc));
        }

        // Derive the section from the location; look up the exact byte offset
        // within that section from the locations journal.
        let section = *loc / self.log_items_per_section;
        let offset = self.locations.read(*loc).await?;

        self.log.get(section, offset).await.map_err(Error::Journal)
    }
779
780 fn update_loc(
782 snapshot: &mut Index<T, Location>,
783 key: &K,
784 old_loc: Location,
785 new_loc: Location,
786 ) {
787 let Some(mut cursor) = snapshot.get_mut(key) else {
788 return;
789 };
790
791 if cursor.find(|&loc| loc == old_loc) {
793 cursor.update(new_loc);
794 }
795 }
796
797 fn delete_loc(snapshot: &mut Index<T, Location>, key: &K, old_loc: Location) {
799 let Some(mut cursor) = snapshot.get_mut(key) else {
800 return;
801 };
802
803 if cursor.find(|&loc| loc == old_loc) {
805 cursor.delete();
806 }
807 }
808
    /// Re-appends `op` to the tip of the log if it is still active (i.e., the
    /// snapshot still references `old_loc`), updating the snapshot to point at
    /// the new copy.
    ///
    /// Returns `Some(old_loc)` if the operation was moved, `None` if it was
    /// inactive (keyless, key absent, or superseded).
    async fn move_op_if_active(
        &mut self,
        op: Operation<K, V>,
        old_loc: Location,
    ) -> Result<Option<Location>, Error> {
        // Keyless operations (e.g. commits) are never active.
        let Some(key) = op.key() else {
            return Ok(None);
        };

        let Some(mut cursor) = self.snapshot.get_mut(key) else {
            return Ok(None);
        };

        // The moved copy will land at the current end of the log.
        let new_loc = self.log_size;

        if cursor.find(|&loc| loc == old_loc) {
            cursor.update(new_loc);
            // Release the cursor's borrow of the snapshot before appending.
            drop(cursor);

            self.apply_op(op).await?;
            Ok(Some(old_loc))
        } else {
            Ok(None)
        }
    }
841}
842
/// [Db] implementation that delegates to the inherent methods on [Store].
impl<E, K, V, T> Db<E, K, V, T> for Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    fn op_count(&self) -> Location {
        self.op_count()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get(&self, key: &K) -> Result<Option<V>, Error> {
        self.get(key).await
    }

    async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        self.update(key, value).await
    }

    async fn delete(&mut self, key: K) -> Result<(), Error> {
        self.delete(key).await
    }

    // The trait's commit carries no metadata.
    async fn commit(&mut self) -> Result<(), Error> {
        self.commit(None).await
    }

    async fn sync(&mut self) -> Result<(), Error> {
        self.sync().await
    }

    async fn prune(&mut self, target_prune_loc: Location) -> Result<(), Error> {
        self.prune(target_prune_loc).await
    }

    async fn close(self) -> Result<(), Error> {
        self.close().await
    }

    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
890
891#[cfg(test)]
892mod test {
893 use super::*;
894 use crate::translator::TwoCap;
895 use commonware_cryptography::{
896 blake3::{Blake3, Digest},
897 Digest as _, Hasher as _,
898 };
899 use commonware_macros::test_traced;
900 use commonware_runtime::{deterministic, Runner};
901 use commonware_utils::{NZUsize, NZU64};
902
903 const PAGE_SIZE: usize = 77;
904 const PAGE_CACHE_SIZE: usize = 9;
905
906 type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;
908
    /// Builds a [Store] over the deterministic runtime with small section and
    /// blob sizes so section-boundary and pruning paths are exercised quickly.
    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_journal_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            locations_journal_partition: "locations_journal".to_string(),
            locations_items_per_blob: NZU64!(11),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }
923
    /// An empty store supports prune/metadata queries, and uncommitted
    /// operations do not survive a close/reopen.
    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc, 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(db.get_metadata().await.unwrap().is_none());

            // An update without a commit should not survive close/reopen.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);

            // Committing an empty db appends only the commit operation.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            let mut db = create_test_store(context.clone()).await;

            // Repeated commits keep the tip close to the inactivity floor.
            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                assert!(db.op_count() - db.inactivity_floor_loc <= 3);
            }

            db.destroy().await.unwrap();
        });
    }
963
    /// Basic update/commit/reopen flow: committed values survive restarts,
    /// metadata is tracked per commit, and a sync alone does not commit.
    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.op_count(), 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Missing key reads as None.
            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Sync without commit: the update is rewound on reopen.
            store.sync().await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.log_size, 0);
            assert_eq!(store.inactivity_floor_loc, 0);
            assert!(store.get_metadata().await.unwrap().is_none());

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Commit with metadata attached.
            let metadata = Some(vec![99, 100]);
            store.commit(metadata.clone()).await.unwrap();
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(2), metadata.clone()))
            );

            assert_eq!(store.log_size, 3);
            assert_eq!(store.inactivity_floor_loc, 1);

            // Committed state survives a reopen.
            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.log_size, 3);
            assert_eq!(store.inactivity_floor_loc, 1);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.log_size, 5);
            assert_eq!(store.inactivity_floor_loc, 1);

            // Metadata from the previous commit is still visible.
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(2), metadata))
            );

            store.commit(None).await.unwrap();
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((Location::new_unchecked(6), None))
            );

            assert_eq!(store.log_size, 7);
            assert_eq!(store.inactivity_floor_loc, 2);

            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }
1067
    /// Repeated updates of one key leave a single snapshot entry, and sizes,
    /// floor, and the pruning boundary are restored after replay.
    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            // Only one snapshot entry exists despite 100 updates.
            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;
            store.prune(store.inactivity_floor_loc()).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            assert_eq!(store.log_size, UPDATES * 2 + 1);
            let expected_floor = UPDATES * 2 - 1;
            assert_eq!(store.inactivity_floor_loc, expected_floor);

            // Pruning is aligned to the 7-op section boundary.
            assert_eq!(
                store.oldest_retained_loc,
                expected_floor - expected_floor % 7
            );

            store.destroy().await.unwrap();
        });
    }
1112
    /// Keys sharing a two-byte prefix collide under the [TwoCap] translator;
    /// both must remain retrievable, including after replay.
    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Force a translator collision by sharing the first two bytes.
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Both colliding keys must survive a snapshot rebuild.
            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }
1145
    /// Deletes remove values (including across restarts), deleting a missing
    /// key is a no-op, and a key can be re-added after deletion.
    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            store.delete(k).await.unwrap();

            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            store.commit(None).await.unwrap();

            // The committed delete survives a reopen.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            store.commit(None).await.unwrap();

            // The re-added value also survives a reopen.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Deleting an unknown key leaves the snapshot untouched.
            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }
1204
    /// Commits raise the inactivity floor as active operations are rewritten
    /// above it, while current values stay readable.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 4);
            assert_eq!(store.inactivity_floor_loc, 1);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 10);
            assert_eq!(store.inactivity_floor_loc, 7);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }
1240
    /// Crash-recovery: unsynced, uncommitted operations are discarded on
    /// restart, while committed state is fully restored.
    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulated crash with no syncing drops everything uncommitted.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 0);

            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();

            // Updates made after the commit are lost by the next crash.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 1672);
            assert_eq!(db.snapshot.items(), 1000);

            // Uncommitted deletes are likewise dropped by a crash.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            db.simulate_failure(false, false).await.unwrap();
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // A clean close/reopen preserves the committed op count.
            db.close().await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 1960);
            assert_eq!(db.inactivity_floor_loc, 755);

            // Pruning is aligned to the 7-op section boundary.
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.oldest_retained_loc, 755 - 755 % 7);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
1337}