1use crate::{
101 index::Index,
102 journal::{
103 fixed::{Config as FConfig, Journal as FJournal},
104 variable::{Config as VConfig, Journal as VJournal},
105 },
106 store::operation::Variable as Operation,
107 translator::Translator,
108};
109use commonware_codec::{Codec, Read};
110use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
111use commonware_utils::{Array, NZUsize};
112use futures::{pin_mut, try_join, StreamExt};
113use std::{
114 collections::HashMap,
115 num::{NonZeroU64, NonZeroUsize},
116};
117use tracing::{debug, warn};
118
/// Operation types stored in the log (updates, deletes, commit markers).
pub mod operation;

/// Read buffer size (64 KiB) used when replaying the log to rebuild the
/// in-memory snapshot during initialization.
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
124
125#[derive(thiserror::Error, Debug)]
127pub enum Error {
128 #[error(transparent)]
129 Journal(#[from] crate::journal::Error),
130
131 #[error("operation pruned")]
133 OperationPruned(u64),
134}
135
/// Configuration used to initialize a [Store].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Storage partition for the variable-length operation log journal.
    pub log_journal_partition: String,

    /// Write buffer size for the log journal (also reused for the locations
    /// journal in `init` — NOTE(review): confirm sharing one setting is intended).
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level for log entries; `None` disables compression.
    pub log_compression: Option<u8>,

    /// Codec configuration used to decode operations read from the log.
    pub log_codec_config: C,

    /// Number of operations stored per log section; sections are the unit of
    /// pruning and syncing.
    pub log_items_per_section: NonZeroU64,

    /// Storage partition for the fixed-length locations journal, which maps
    /// each operation's location to its byte offset in the log.
    pub locations_journal_partition: String,

    /// Number of location entries per blob in the locations journal.
    pub locations_items_per_blob: NonZeroU64,

    /// Translator used by the snapshot index to map keys to (possibly
    /// colliding) index entries.
    pub translator: T,

    /// Shared buffer pool used by both journals.
    pub buffer_pool: PoolRef,
}
166
/// A key-value store backed by an append-only operation log, with an
/// in-memory snapshot index mapping keys to the location of their latest
/// update.
pub struct Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    // Variable-length journal of all operations (updates, deletes, commits).
    log: VJournal<E, Operation<K, V>>,

    // In-memory index from (translated) key to candidate operation
    // locations; collisions are resolved by re-reading the log.
    snapshot: Index<T, u64>,

    // Copy of `Config::log_items_per_section` as a plain u64.
    log_items_per_section: u64,

    // Fixed-length journal mapping each operation location to its byte
    // offset within its log section.
    locations: FJournal<E, u32>,

    // All operations before this location are inactive (superseded or
    // committed away) and eligible for pruning.
    inactivity_floor_loc: u64,

    // Location of the oldest operation still retained in the log (pruning
    // happens at section boundaries).
    oldest_retained_loc: u64,

    // Total number of operations ever appended (i.e. the next location).
    log_size: u64,

    // Number of operations appended since the last commit.
    uncommitted_ops: u64,
}
206
207impl<E, K, V, T> Store<E, K, V, T>
208where
209 E: RStorage + Clock + Metrics,
210 K: Array,
211 V: Codec,
212 T: Translator,
213{
214 pub async fn init(
221 context: E,
222 cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
223 ) -> Result<Self, Error> {
224 let snapshot: Index<T, u64> = Index::init(context.with_label("snapshot"), cfg.translator);
225
226 let log = VJournal::init(
227 context.with_label("log"),
228 VConfig {
229 partition: cfg.log_journal_partition,
230 compression: cfg.log_compression,
231 codec_config: cfg.log_codec_config,
232 buffer_pool: cfg.buffer_pool.clone(),
233 write_buffer: cfg.log_write_buffer,
234 },
235 )
236 .await?;
237
238 let locations = FJournal::init(
239 context.with_label("locations"),
240 FConfig {
241 partition: cfg.locations_journal_partition,
242 items_per_blob: cfg.locations_items_per_blob,
243 write_buffer: cfg.log_write_buffer,
244 buffer_pool: cfg.buffer_pool,
245 },
246 )
247 .await?;
248
249 let db = Self {
250 log,
251 snapshot,
252 log_items_per_section: cfg.log_items_per_section.get(),
253 locations,
254 inactivity_floor_loc: 0,
255 oldest_retained_loc: 0,
256 log_size: 0,
257 uncommitted_ops: 0,
258 };
259
260 db.build_snapshot_from_log().await
261 }
262
263 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
267 for &loc in self.snapshot.get(key) {
268 let Operation::Update(k, v) = self.get_op(loc).await? else {
269 unreachable!("location ({loc}) does not reference update operation");
270 };
271
272 if &k == key {
273 return Ok(Some(v));
274 }
275 }
276
277 Ok(None)
278 }
279
280 pub async fn get_loc(&self, loc: u64) -> Result<Option<V>, Error> {
284 assert!(loc < self.log_size);
285 let op = self.get_op(loc).await?;
286
287 Ok(op.into_value())
288 }
289
290 pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
296 let new_loc = self.log_size;
297 if let Some(old_loc) = self.get_key_loc(&key).await? {
298 Self::update_loc(&mut self.snapshot, &key, old_loc, new_loc);
299 } else {
300 self.snapshot.insert(&key, new_loc);
301 };
302
303 self.apply_op(Operation::Update(key, value))
304 .await
305 .map(|_| ())
306 }
307
308 pub async fn delete(&mut self, key: K) -> Result<(), Error> {
311 let Some(old_loc) = self.get_key_loc(&key).await? else {
312 return Ok(());
314 };
315
316 Self::delete_loc(&mut self.snapshot, &key, old_loc);
317
318 self.apply_op(Operation::Delete(key)).await.map(|_| ())
319 }
320
321 pub async fn commit(&mut self, metadata: Option<V>) -> Result<(), Error> {
328 self.raise_inactivity_floor(metadata, self.uncommitted_ops + 1)
329 .await?;
330 self.uncommitted_ops = 0;
331
332 let section = self.current_section();
333 self.log.sync(section).await?;
334 debug!(log_size = self.log_size, "commit complete");
335
336 Ok(())
337 }
338
339 fn current_section(&self) -> u64 {
340 self.log_size / self.log_items_per_section
341 }
342
343 pub async fn sync(&mut self) -> Result<(), Error> {
347 let current_section = self.log_size / self.log_items_per_section;
348 try_join!(self.log.sync(current_section), self.locations.sync())?;
349
350 Ok(())
351 }
352
353 pub async fn prune(&mut self, target_prune_loc: u64) -> Result<(), Error> {
360 assert!(target_prune_loc <= self.inactivity_floor_loc);
362 if target_prune_loc <= self.oldest_retained_loc {
363 return Ok(());
364 }
365
366 self.locations.sync().await?;
370
371 let section_with_target = target_prune_loc / self.log_items_per_section;
376 if !self.log.prune(section_with_target).await? {
377 return Ok(());
378 }
379 self.oldest_retained_loc = section_with_target * self.log_items_per_section;
380 debug!(
381 log_size = self.log_size,
382 oldest_retained_loc = self.oldest_retained_loc,
383 target_prune_loc,
384 "pruned inactive ops"
385 );
386
387 self.locations
389 .prune(self.oldest_retained_loc)
390 .await
391 .map_err(Error::Journal)?;
392
393 Ok(())
394 }
395
396 pub async fn get_metadata(&self) -> Result<Option<(u64, Option<V>)>, Error> {
399 let mut last_commit = self.op_count() - self.uncommitted_ops;
400 if last_commit == 0 {
401 return Ok(None);
402 }
403 last_commit -= 1;
404 let section = last_commit / self.log_items_per_section;
405 let offset = self.locations.read(last_commit).await?;
406 let Some(Operation::CommitFloor(metadata, _)) = self.log.get(section, offset).await? else {
407 unreachable!("no commit operation at location of last commit {last_commit}");
408 };
409
410 Ok(Some((last_commit, metadata)))
411 }
412
413 pub async fn close(self) -> Result<(), Error> {
416 if self.uncommitted_ops > 0 {
417 warn!(
418 log_size = self.log_size,
419 uncommitted_ops = self.uncommitted_ops,
420 "closing store with uncommitted operations"
421 );
422 }
423
424 try_join!(self.log.close(), self.locations.close())?;
425 Ok(())
426 }
427
428 #[cfg(test)]
430 pub async fn simulate_failure(
431 mut self,
432 sync_locations: bool,
433 sync_log: bool,
434 ) -> Result<(), Error> {
435 if sync_locations {
436 self.locations.sync().await?;
437 }
438 if sync_log {
439 let section = self.current_section();
440 self.log.sync(section).await?;
441 }
442
443 Ok(())
444 }
445
446 pub async fn destroy(self) -> Result<(), Error> {
453 try_join!(self.log.destroy(), self.locations.destroy())?;
454 Ok(())
455 }
456
457 pub fn op_count(&self) -> u64 {
460 self.log_size
461 }
462
463 pub fn inactivity_floor_loc(&self) -> u64 {
466 self.inactivity_floor_loc
467 }
468
469 async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
475 let mut locations_size = self.locations.size().await?;
476
477 let mut after_last_commit = None;
479 let mut uncommitted_ops = HashMap::new();
481 let mut oldest_retained_loc_found = false;
482 {
483 let stream = self
484 .log
485 .replay(0, 0, NZUsize!(SNAPSHOT_READ_BUFFER_SIZE))
486 .await?;
487 pin_mut!(stream);
488 while let Some(result) = stream.next().await {
489 match result {
490 Err(e) => {
491 return Err(Error::Journal(e));
492 }
493 Ok((section, offset, _, op)) => {
494 if !oldest_retained_loc_found {
495 self.log_size = section * self.log_items_per_section;
496 self.oldest_retained_loc = self.log_size;
497 oldest_retained_loc_found = true;
498 }
499
500 let loc = self.log_size; if after_last_commit.is_none() {
502 after_last_commit = Some((loc, offset));
503 }
504
505 self.log_size += 1;
506
507 let expected = loc / self.log_items_per_section;
510 assert_eq!(section, expected,
511 "given section {section} did not match expected section {expected} from location {loc}");
512
513 if self.log_size > locations_size {
514 warn!(section, offset, "operation was missing from location map");
515 self.locations.append(offset).await?;
516 locations_size += 1;
517 }
518
519 match op {
520 Operation::Delete(key) => {
521 let result = self.get_key_loc(&key).await?;
522 if let Some(old_loc) = result {
523 uncommitted_ops.insert(key, (Some(old_loc), None));
524 } else {
525 uncommitted_ops.remove(&key);
526 }
527 }
528 Operation::Update(key, _) => {
529 let result = self.get_key_loc(&key).await?;
530 if let Some(old_loc) = result {
531 uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
532 } else {
533 uncommitted_ops.insert(key, (None, Some(loc)));
534 }
535 }
536 Operation::CommitFloor(_, loc) => {
537 self.inactivity_floor_loc = loc;
538
539 for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
541 if let Some(old_loc) = old_loc {
542 if let Some(new_loc) = new_loc {
543 Self::update_loc(
544 &mut self.snapshot,
545 key,
546 *old_loc,
547 *new_loc,
548 );
549 } else {
550 Self::delete_loc(&mut self.snapshot, key, *old_loc);
551 }
552 } else {
553 assert!(new_loc.is_some());
554 self.snapshot.insert(key, new_loc.unwrap());
555 }
556 }
557 uncommitted_ops.clear();
558 after_last_commit = None;
559 }
560 _ => unreachable!(
561 "unexpected operation type at offset {offset} of section {section}"
562 ),
563 }
564 }
565 }
566 }
567 }
568
569 if let Some((end_loc, end_offset)) = after_last_commit {
571 assert!(!uncommitted_ops.is_empty());
572 warn!(
573 op_count = uncommitted_ops.len(),
574 log_size = end_loc,
575 end_offset,
576 "rewinding over uncommitted operations at end of log"
577 );
578 let prune_to_section = end_loc / self.log_items_per_section;
579 self.log
580 .rewind_to_offset(prune_to_section, end_offset)
581 .await?;
582 self.log.sync(prune_to_section).await?;
583 self.log_size = end_loc;
584 }
585
586 if locations_size > self.log_size {
588 warn!(
589 locations_size,
590 log_size = self.log_size,
591 "rewinding uncommitted locations"
592 );
593 self.locations.rewind(self.log_size).await?;
594 self.locations.sync().await?;
595 }
596
597 assert_eq!(self.log_size, self.locations.size().await?);
599
600 debug!(log_size = self.log_size, "build_snapshot_from_log complete");
601
602 Ok(self)
603 }
604
605 async fn apply_op(&mut self, op: Operation<K, V>) -> Result<u32, Error> {
608 let section = self.current_section();
610 let (offset, _) = self.log.append(section, op).await?;
611
612 self.locations.append(offset).await?;
614
615 self.uncommitted_ops += 1;
617 self.log_size += 1;
618
619 if self.current_section() != section {
621 self.log.sync(section).await?;
622 }
623
624 Ok(offset)
625 }
626
627 async fn get_key_loc(&self, key: &K) -> Result<Option<u64>, Error> {
630 for loc in self.snapshot.get(key) {
631 match self.get_op(*loc).await {
632 Ok(Operation::Update(k, _)) => {
633 if k == *key {
634 return Ok(Some(*loc));
635 }
636 }
637 Err(Error::OperationPruned(_)) => {
638 unreachable!("invalid location in snapshot: loc={loc}")
639 }
640 _ => unreachable!("non-update operation referenced by snapshot: loc={loc}"),
641 }
642 }
643
644 Ok(None)
645 }
646
647 async fn get_op(&self, loc: u64) -> Result<Operation<K, V>, Error> {
651 assert!(loc < self.log_size);
652 if loc < self.oldest_retained_loc {
653 return Err(Error::OperationPruned(loc));
654 }
655
656 let section = loc / self.log_items_per_section;
657 let offset = self.locations.read(loc).await?;
658
659 let Some(op) = self.log.get(section, offset).await? else {
661 panic!("invalid location {loc}");
662 };
663
664 Ok(op)
665 }
666
667 fn update_loc(snapshot: &mut Index<T, u64>, key: &K, old_loc: u64, new_loc: u64) {
669 let Some(mut cursor) = snapshot.get_mut(key) else {
670 return;
671 };
672
673 while let Some(loc) = cursor.next() {
675 if *loc == old_loc {
676 cursor.update(new_loc);
678 return;
679 }
680 }
681 }
682
683 fn delete_loc(snapshot: &mut Index<T, u64>, key: &K, old_loc: u64) {
685 let Some(mut cursor) = snapshot.get_mut(key) else {
686 return;
687 };
688
689 while let Some(loc) = cursor.next() {
691 if *loc == old_loc {
692 cursor.delete();
694 return;
695 }
696 }
697 }
698
699 async fn move_op_if_active(
703 &mut self,
704 op: Operation<K, V>,
705 old_loc: u64,
706 ) -> Result<Option<u64>, Error> {
707 let Some(key) = op.key() else {
709 return Ok(None);
711 };
712
713 let Some(mut cursor) = self.snapshot.get_mut(key) else {
714 return Ok(None);
715 };
716
717 let new_loc = self.log_size;
718
719 while let Some(&loc) = cursor.next() {
721 if loc == old_loc {
722 cursor.update(new_loc);
724 drop(cursor);
725
726 self.apply_op(op).await?;
727 return Ok(Some(old_loc));
728 }
729 }
730
731 Ok(None)
733 }
734
735 async fn raise_inactivity_floor(
741 &mut self,
742 metadata: Option<V>,
743 max_steps: u64,
744 ) -> Result<(), Error> {
745 for _ in 0..max_steps {
746 if self.inactivity_floor_loc == self.log_size {
747 break;
748 }
749 let op = self.get_op(self.inactivity_floor_loc).await?;
750 self.move_op_if_active(op, self.inactivity_floor_loc)
751 .await?;
752 self.inactivity_floor_loc += 1;
753 }
754
755 self.apply_op(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
756 .await
757 .map(|_| ())
758 }
759}
760
#[cfg(test)]
mod test {
    use super::*;
    use crate::translator::TwoCap;
    use commonware_cryptography::{
        blake3::{hash, Digest},
        Digest as _,
    };
    use commonware_macros::test_traced;
    use commonware_runtime::{deterministic, Runner};
    use commonware_utils::{NZUsize, NZU64};

    // Deliberately small/odd page size and cache to exercise buffer-pool
    // paging behavior in tests.
    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;

    // Build a store with small section/blob sizes (7 ops per section,
    // 11 locations per blob) so section boundaries are hit quickly.
    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_journal_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            locations_journal_partition: "locations_journal".to_string(),
            locations_items_per_blob: NZU64!(11),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }

    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // A fresh store is empty and pruning it is a no-op.
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc, 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));

            // An uncommitted update is discarded on close/reopen.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);

            // Committing an empty store appends only the commit marker.
            db.commit(None).await.unwrap();
            assert_eq!(db.op_count(), 1);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            let mut db = create_test_store(context.clone()).await;

            // Repeated empty commits keep the floor just behind the tip.
            for _ in 1..100 {
                db.commit(None).await.unwrap();
                assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
            }

            db.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.op_count(), 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Unknown key reads as None.
            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Sync alone does not commit: the update is lost on reopen.
            store.sync().await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.log_size, 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);
            assert_eq!(store.get_metadata().await.unwrap(), None);

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            // Commit with metadata; the commit lands at location 3 because
            // raising the floor re-appends the still-active update.
            let metadata = Some(vec![99, 100]);
            store.commit(metadata.clone()).await.unwrap();
            assert_eq!(
                store.get_metadata().await.unwrap(),
                Some((3, metadata.clone()))
            );

            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Committed state survives a reopen.
            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.log_size, 6);
            assert_eq!(store.uncommitted_ops, 2);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Metadata still reflects the previous commit until we commit again.
            assert_eq!(store.get_metadata().await.unwrap(), Some((3, metadata)));

            store.commit(None).await.unwrap();
            assert_eq!(store.get_metadata().await.unwrap(), Some((8, None)));

            assert_eq!(store.log_size, 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 5);

            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            // Many updates to one key: the snapshot keeps a single entry.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Replay on reopen rebuilds the snapshot; then prune to the floor.
            let mut store = create_test_store(ctx.with_label("store")).await;
            store.prune(store.inactivity_floor_loc()).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // 100 updates + 1 moved active op + 2 (floor raise/commit marker).
            assert_eq!(store.log_size, UPDATES + 3);
            assert_eq!(store.inactivity_floor_loc, UPDATES + 1);

            // Pruning rounds down to a section boundary (7 ops per section).
            assert_eq!(store.oldest_retained_loc, UPDATES - UPDATES % 7);
            assert_eq!(store.uncommitted_ops, 0);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Force a translated-key collision: TwoCap only looks at the
            // first two bytes, so give k2 the same 2-byte prefix as k1.
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit(None).await.unwrap();
            store.close().await.unwrap();

            // Colliding keys must still resolve correctly after replay.
            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete hides the value immediately.
            store.delete(k).await.unwrap();

            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            store.commit(None).await.unwrap();

            // The delete survives a reopen.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-updating a deleted key restores it.
            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            store.commit(None).await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Deleting an absent key is a no-op and adds no snapshot entry.
            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            // Commit moves both active updates forward and appends a marker:
            // 2 updates + 2 moves + 1 floor step + 1 commit = 6 ops.
            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 6);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 3);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit(None).await.unwrap();
            assert_eq!(store.op_count(), 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 6);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }

    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Crash without syncing anything: all uncommitted ops are lost.
            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 0);

            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();

            // Re-update a third of the keys, then crash uncommitted: the
            // store must recover to the committed op count.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit(None).await.unwrap();
            let op_count = db.op_count();
            // Exact counts are deterministic under the deterministic runtime.
            assert_eq!(op_count, 2561);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete a seventh of the keys, crash uncommitted, and recover.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            db.simulate_failure(false, false).await.unwrap();
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // A clean close/reopen also preserves the committed count.
            db.close().await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 2787);
            assert_eq!(db.inactivity_floor_loc, 1480);

            // Pruning rounds down to the section boundary (7 ops/section).
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.oldest_retained_loc, 1480 - 1480 % 7);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
}
1202}