1use crate::{
100 index::Index,
101 journal::{
102 fixed::{Config as FConfig, Journal as FJournal},
103 variable::{Config as VConfig, Journal as VJournal},
104 },
105 store::operation::Variable as Operation,
106 translator::Translator,
107};
108use commonware_codec::{Codec, Read};
109use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage as RStorage};
110use commonware_utils::{sequence::U32, Array, NZUsize};
111use futures::{pin_mut, try_join, StreamExt};
112use std::{
113 collections::HashMap,
114 num::{NonZeroU64, NonZeroUsize},
115};
116use tracing::{debug, warn};
117
/// Operation types (`Update`, `Delete`, `CommitFloor`, …) stored in the log.
pub mod operation;

/// Read buffer size (64 KiB) used when replaying the log to rebuild the
/// snapshot during [Store::init].
const SNAPSHOT_READ_BUFFER_SIZE: usize = 1 << 16;
123
/// Errors that can occur when interacting with a [Store].
#[derive(thiserror::Error, Debug)]
pub enum Error {
    /// An error from one of the underlying journals (log or locations).
    #[error(transparent)]
    Journal(#[from] crate::journal::Error),

    /// The requested key (or the operation a location points at) was not found.
    #[error("key not found")]
    KeyNotFound,
}
134
/// Configuration for initializing a [Store].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Storage partition for the operation log journal.
    pub log_journal_partition: String,

    /// Write buffer size for the log journal (also reused for the locations
    /// journal — there is no separate locations write-buffer setting).
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression level for log entries.
    pub log_compression: Option<u8>,

    /// Codec configuration used to encode/decode log operations.
    pub log_codec_config: C,

    /// Number of operations stored per section of the log journal.
    pub log_items_per_section: NonZeroU64,

    /// Storage partition for the locations journal.
    pub locations_journal_partition: String,

    /// Number of location entries per blob in the locations journal.
    pub locations_items_per_blob: NonZeroU64,

    /// Translator mapping full keys to (possibly colliding) index keys.
    pub translator: T,

    /// Shared buffer pool used by both journals.
    pub buffer_pool: PoolRef,
}
165
/// A key-value store backed by an append-only log of operations, with an
/// in-memory index (the snapshot) from translated keys to log locations.
pub struct Store<E, K, V, T>
where
    E: RStorage + Clock + Metrics,
    K: Array,
    V: Codec,
    T: Translator,
{
    /// Journal of all operations (updates, deletes, commit markers), in order.
    log: VJournal<E, Operation<K, V>>,

    /// Maps each translated key to candidate operation locations in the log.
    snapshot: Index<T, u64>,

    /// Number of operations per log section (from [Config]).
    log_items_per_section: u64,

    /// Maps each operation's location to its byte offset within its section.
    locations: FJournal<E, U32>,

    /// Locations below this are inactive (superseded or pruned-eligible).
    inactivity_floor_loc: u64,

    /// Location of the oldest retained operation (section-aligned).
    oldest_retained_loc: u64,

    /// Total operations ever appended; also the location of the next append.
    log_size: u64,

    /// Number of operations appended since the last commit.
    uncommitted_ops: u64,
}
205
206impl<E, K, V, T> Store<E, K, V, T>
207where
208 E: RStorage + Clock + Metrics,
209 K: Array,
210 V: Codec,
211 T: Translator,
212{
    /// Open (or create) a store with the given context and configuration.
    ///
    /// Initializes the snapshot index and both journals, then replays the
    /// log to rebuild in-memory state (rewinding any uncommitted tail).
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let snapshot: Index<T, u64> = Index::init(context.with_label("snapshot"), cfg.translator);

        // Variable-length journal holding the operations themselves.
        let log = VJournal::init(
            context.with_label("log"),
            VConfig {
                partition: cfg.log_journal_partition,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                buffer_pool: cfg.buffer_pool.clone(),
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Fixed-length journal mapping locations to log offsets. Note it
        // reuses `log_write_buffer`; the config has no dedicated setting.
        let locations = FJournal::init(
            context.with_label("locations"),
            FConfig {
                partition: cfg.locations_journal_partition,
                items_per_blob: cfg.locations_items_per_blob,
                write_buffer: cfg.log_write_buffer,
                buffer_pool: cfg.buffer_pool,
            },
        )
        .await?;

        let db = Self {
            log,
            snapshot,
            log_items_per_section: cfg.log_items_per_section.get(),
            locations,
            inactivity_floor_loc: 0,
            oldest_retained_loc: 0,
            log_size: 0,
            uncommitted_ops: 0,
        };
        // Replay the persisted log to restore the snapshot and counters.
        db.build_snapshot_from_log().await
    }
260
261 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
265 for location in self.snapshot.get(key) {
266 let Operation::Update(k, v) = self.get_op(*location).await? else {
267 panic!("location ({location}) does not reference set operation",);
268 };
269
270 if &k == key {
271 return Ok(Some(v));
272 }
273 }
274
275 Ok(None)
276 }
277
278 pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
284 let new_location = self.log_size;
285 if let Some(old_location) = self.get_loc(&key).await? {
286 Self::update_loc(&mut self.snapshot, &key, old_location, new_location);
287 } else {
288 self.snapshot.insert(&key, new_location);
289 };
290
291 self.apply_op(Operation::Update(key, value))
292 .await
293 .map(|_| ())
294 }
295
296 pub async fn delete(&mut self, key: K) -> Result<(), Error> {
299 let Some(old_loc) = self.get_loc(&key).await? else {
300 return Ok(());
302 };
303
304 Self::delete_loc(&mut self.snapshot, &key, old_loc);
305
306 self.apply_op(Operation::Delete(key)).await.map(|_| ())
307 }
308
309 pub async fn commit(&mut self) -> Result<(), Error> {
311 self.raise_inactivity_floor(self.uncommitted_ops + 1)
312 .await?;
313 self.uncommitted_ops = 0;
314
315 self.sync().await?;
316 self.prune_inactive().await
317 }
318
319 pub async fn close(self) -> Result<(), Error> {
322 if self.uncommitted_ops > 0 {
323 warn!(
324 log_size = self.log_size,
325 uncommitted_ops = self.uncommitted_ops,
326 "closing store with uncommitted operations"
327 );
328 }
329
330 try_join!(self.log.close(), self.locations.close())?;
331 Ok(())
332 }
333
    /// Test-only helper simulating a crash: optionally syncs one or both
    /// journals, then drops the store without a clean close. The flags model
    /// which writes survived the "crash".
    #[cfg(test)]
    pub async fn simulate_failure(
        mut self,
        sync_locations: bool,
        sync_log: bool,
    ) -> Result<(), Error> {
        if sync_locations {
            self.locations.sync().await?;
        }
        if sync_log {
            self.log
                .sync(self.log_size / self.log_items_per_section)
                .await?;
        }

        Ok(())
    }
352
353 pub async fn destroy(self) -> Result<(), Error> {
360 try_join!(self.log.destroy(), self.locations.destroy())?;
361 Ok(())
362 }
363
    /// Total number of operations ever appended to the log, including pruned
    /// and inactive ones — i.e. the location the next operation will occupy.
    pub fn op_count(&self) -> u64 {
        self.log_size
    }
369
370 async fn sync(&mut self) -> Result<(), Error> {
378 let current_section = self.log_size / self.log_items_per_section;
379 try_join!(self.log.sync(current_section), self.locations.sync())?;
380 Ok(())
381 }
382
    /// Rebuild the in-memory snapshot by replaying every operation in the
    /// log, reconciling the locations journal along the way.
    ///
    /// Operations after the last `CommitFloor` are rewound from the log, and
    /// locations entries beyond the log's end are discarded, so the store
    /// reopens in its last committed state.
    async fn build_snapshot_from_log(mut self) -> Result<Self, Error> {
        let mut locations_size = self.locations.size().await?;

        // Location / byte-offset of the end of the last committed operation.
        let mut end_loc = 0;
        let mut end_offset = 0;
        // Snapshot changes staged since the last CommitFloor, keyed by key:
        // (old location if the key was already indexed, new location — or
        // None for a staged delete).
        let mut uncommitted_ops = HashMap::new();
        let mut oldest_retained_loc_found = false;
        {
            let stream = self.log.replay(NZUsize!(SNAPSHOT_READ_BUFFER_SIZE)).await?;
            pin_mut!(stream);
            while let Some(result) = stream.next().await {
                match result {
                    Err(e) => {
                        return Err(Error::Journal(e));
                    }
                    Ok((section, offset, size, op)) => {
                        // The first replayed item tells us where pruning left
                        // off: sections have a fixed item count, so the
                        // starting location is derived from the section.
                        if !oldest_retained_loc_found {
                            self.log_size = section * self.log_items_per_section;
                            self.oldest_retained_loc = self.log_size;
                            oldest_retained_loc_found = true;
                        }
                        let loc = self.log_size; self.log_size += 1;

                        // Sanity check: the item's section must agree with
                        // the location we derived for it.
                        let expected = loc / self.log_items_per_section;
                        assert_eq!(section, expected,
                            "given section {section} did not match expected section {expected} from location {loc}");

                        // Backfill the locations journal if it fell behind
                        // the log (e.g. a crash between the two appends).
                        if self.log_size > locations_size {
                            warn!(section, offset, "operation was missing from location map");
                            self.locations.append(offset.into()).await?;
                            locations_size += 1;
                        }

                        match op {
                            Operation::Delete(key) => {
                                let result = self.get_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    // Key is in the committed snapshot:
                                    // stage a delete of its old location.
                                    uncommitted_ops.insert(key, (Some(old_loc), None));
                                } else {
                                    // Key existed only in staged ops: unstage.
                                    uncommitted_ops.remove(&key);
                                }
                            }
                            Operation::Update(key, _) => {
                                let result = self.get_loc(&key).await?;
                                if let Some(old_loc) = result {
                                    uncommitted_ops.insert(key, (Some(old_loc), Some(loc)));
                                } else {
                                    uncommitted_ops.insert(key, (None, Some(loc)));
                                }
                            }
                            // NOTE: the payload shadows the outer `loc`.
                            Operation::CommitFloor(loc) => {
                                self.inactivity_floor_loc = loc;

                                // A commit makes staged changes durable:
                                // fold them all into the snapshot.
                                for (key, (old_loc, new_loc)) in uncommitted_ops.iter() {
                                    if let Some(old_loc) = old_loc {
                                        if let Some(new_loc) = new_loc {
                                            Self::update_loc(
                                                &mut self.snapshot,
                                                key,
                                                *old_loc,
                                                *new_loc,
                                            );
                                        } else {
                                            Self::delete_loc(&mut self.snapshot, key, *old_loc);
                                        }
                                    } else {
                                        assert!(new_loc.is_some());
                                        self.snapshot.insert(key, new_loc.unwrap());
                                    }
                                }
                                uncommitted_ops.clear();
                                end_loc = self.log_size;
                                end_offset = offset + size;
                            }
                            _ => unreachable!(
                                "unexpected operation type at offset {offset} of section {section}"
                            ),
                        }
                    }
                }
            }
        }
        // Rewind the log past any operations that were never committed.
        if end_loc < self.log_size {
            warn!(
                op_count = uncommitted_ops.len(),
                log_size = end_loc,
                "rewinding over uncommitted operations at end of log"
            );
            let prune_to_section = end_loc.saturating_sub(1) / self.log_items_per_section;
            self.log
                .rewind_to_offset(prune_to_section, end_offset)
                .await?;
            self.log.sync(prune_to_section).await?;
            self.log_size = end_loc;
        }

        // Drop locations entries for operations that no longer exist.
        if locations_size > self.log_size {
            warn!(
                locations_size,
                log_size = self.log_size,
                "rewinding uncommitted locations"
            );
            self.locations.rewind(self.log_size).await?;
        }

        // Invariant: one locations entry per retained log operation.
        assert_eq!(self.log_size, self.locations.size().await?);

        debug!(log_size = self.log_size, "build_snapshot_from_log complete");

        Ok(self)
    }
509
510 async fn apply_op(&mut self, op: Operation<K, V>) -> Result<u32, Error> {
513 let current_section = self.log_size / self.log_items_per_section;
514
515 let (offset, _) = self.log.append(current_section, op).await?;
521 self.locations.append(offset.into()).await?;
522
523 self.uncommitted_ops += 1;
524 self.log_size += 1;
525
526 let new_section = self.log_size / self.log_items_per_section;
527
528 if new_section != current_section {
530 self.log.sync(current_section).await?;
531 }
532
533 Ok(offset)
534 }
535
536 async fn get_loc(&self, key: &K) -> Result<Option<u64>, Error> {
539 for loc in self.snapshot.get(key) {
540 match self.get_op(*loc).await {
541 Ok(Operation::Update(k, _)) => {
542 if k == *key {
543 return Ok(Some(*loc));
544 }
545 }
546 Err(Error::KeyNotFound) => return Ok(None),
547 _ => continue,
548 }
549 }
550
551 Ok(None)
552 }
553
554 async fn get_op(&self, location: u64) -> Result<Operation<K, V>, Error> {
556 let section = location / self.log_items_per_section;
557 let offset = self.locations.read(location).await?.into();
558
559 let Some(op) = self.log.get(section, offset).await? else {
561 return Err(Error::KeyNotFound);
562 };
563
564 Ok(op)
565 }
566
567 fn update_loc(snapshot: &mut Index<T, u64>, key: &K, old_location: u64, new_location: u64) {
569 let Some(mut cursor) = snapshot.get_mut(key) else {
570 return;
571 };
572
573 while let Some(location) = cursor.next() {
575 if *location == old_location {
576 cursor.update(new_location);
578 return;
579 }
580 }
581 }
582
583 fn delete_loc(snapshot: &mut Index<T, u64>, key: &K, old_location: u64) {
585 let Some(mut cursor) = snapshot.get_mut(key) else {
586 return;
587 };
588
589 while let Some(location) = cursor.next() {
591 if *location == old_location {
592 cursor.delete();
594 return;
595 }
596 }
597 }
598
    /// If `op` is still the active operation for its key (i.e. the snapshot
    /// points at `old_location`), re-append it at the tip of the log and
    /// repoint the snapshot there. Returns the old location when a move
    /// happened, `None` otherwise.
    async fn move_op_if_active(
        &mut self,
        op: Operation<K, V>,
        old_location: u64,
    ) -> Result<Option<u64>, Error> {
        // Keyless operations (e.g. commit markers) are never active.
        let Some(key) = op.to_key() else {
            return Ok(None);
        };

        let Some(mut cursor) = self.snapshot.get_mut(key) else {
            return Ok(None);
        };

        // The re-appended operation will land at the current log tip.
        let new_location = self.log_size;

        while let Some(&location) = cursor.next() {
            if location == old_location {
                cursor.update(new_location);
                // Release the snapshot borrow before `apply_op` re-borrows
                // `self` mutably.
                drop(cursor);

                self.apply_op(op).await?;
                return Ok(Some(old_location));
            }
        }

        Ok(None)
    }
634
635 async fn raise_inactivity_floor(&mut self, max_steps: u64) -> Result<(), Error> {
641 for _ in 0..max_steps {
642 if self.inactivity_floor_loc == self.log_size {
643 break;
644 }
645 let op = self.get_op(self.inactivity_floor_loc).await?;
646 self.move_op_if_active(op, self.inactivity_floor_loc)
647 .await?;
648 self.inactivity_floor_loc += 1;
649 }
650
651 self.apply_op(Operation::CommitFloor(self.inactivity_floor_loc))
652 .await
653 .map(|_| ())
654 }
655
656 async fn prune_inactive(&mut self) -> Result<(), Error> {
659 if self.log_size == 0 {
660 return Ok(());
661 }
662
663 let target_prune_loc = self.inactivity_floor_loc;
665 let ops_to_prune = target_prune_loc.saturating_sub(self.oldest_retained_loc);
666 if ops_to_prune == 0 {
667 return Ok(());
668 }
669 debug!(ops_to_prune, target_prune_loc, "pruning inactive ops");
670
671 let section = target_prune_loc / self.log_items_per_section;
677 self.log.prune(section).await?;
678 self.oldest_retained_loc = section * self.log_items_per_section;
679
680 self.locations
682 .prune(self.oldest_retained_loc)
683 .await
684 .map_err(Error::Journal)
685 }
686}
687
688#[cfg(test)]
689mod test {
690 use super::*;
691 use crate::translator::TwoCap;
692 use commonware_cryptography::{
693 blake3::{hash, Digest},
694 Digest as _,
695 };
696 use commonware_macros::test_traced;
697 use commonware_runtime::{deterministic, Runner};
698 use commonware_utils::{NZUsize, NZU64};
699
    // Buffer-pool page geometry for tests. NOTE(review): presumably chosen
    // small and odd to exercise paging edge cases — confirm intent.
    const PAGE_SIZE: usize = 77;
    const PAGE_CACHE_SIZE: usize = 9;

    /// Store specialization used throughout these tests.
    type TestStore = Store<deterministic::Context, Digest, Vec<u8>, TwoCap>;
705
    /// Build a [TestStore] with small section (7) and blob (11) sizes so
    /// tests cross section boundaries and exercise pruning quickly.
    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_journal_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            locations_journal_partition: "locations_journal".to_string(),
            locations_items_per_blob: NZU64!(11),
            translator: TwoCap,
            buffer_pool: PoolRef::new(NZUsize!(PAGE_SIZE), NZUsize!(PAGE_CACHE_SIZE)),
        };
        Store::init(context, cfg).await.unwrap()
    }
720
    /// An empty store reports zero ops; uncommitted ops are discarded on
    /// reopen; repeated commits keep the floor one behind the op count.
    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);
            assert_eq!(db.oldest_retained_loc, 0);
            // Pruning an empty store is a no-op.
            assert!(matches!(db.prune_inactive().await, Ok(())));

            // An update that is never committed is rewound on reopen.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.update(d1, v1).await.unwrap();
            db.close().await.unwrap();
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 0);

            // Committing an empty store appends just the commit marker.
            db.commit().await.unwrap();
            assert_eq!(db.op_count(), 1);
            assert!(matches!(db.prune_inactive().await, Ok(())));
            let mut db = create_test_store(context.clone()).await;

            for _ in 1..100 {
                db.commit().await.unwrap();
                // Only the latest commit marker sits above the floor.
                assert_eq!(db.op_count() - 1, db.inactivity_floor_loc);
            }

            db.destroy().await.unwrap();
        });
    }
753
    /// End-to-end basics: get/update round trip, sync-without-commit being
    /// rewound on reopen, commit persisting state, and multiple keys.
    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.op_count(), 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Missing key reads as None.
            let result = store.get(&key).await;
            assert!(result.unwrap().is_none());

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Sync alone does not commit: reopen discards the update.
            store.sync().await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.log_size, 0);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 0);

            store.update(key, value.clone()).await.unwrap();

            assert_eq!(store.log_size, 1);
            assert_eq!(store.uncommitted_ops, 1);
            assert_eq!(store.inactivity_floor_loc, 0);

            store.commit().await.unwrap();

            // Commit re-appends the active op and adds a commit marker.
            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            // Committed state survives reopen.
            let mut store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.log_size, 4);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 2);

            let fetched_value = store.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.log_size, 6);
            assert_eq!(store.uncommitted_ops, 2);
            assert_eq!(store.inactivity_floor_loc, 2);

            store.commit().await.unwrap();

            assert_eq!(store.log_size, 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 5);

            // All three keys readable after the second commit.
            assert_eq!(store.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }
849
    /// Many updates to one key collapse to a single snapshot entry, and
    /// replaying the log on reopen reproduces counts, floor, and pruning.
    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                store.update(k, v.clone()).await.unwrap();
            }

            // Superseded updates never accumulate in the snapshot.
            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            store.commit().await.unwrap();
            store.close().await.unwrap();

            let store = create_test_store(ctx.with_label("store")).await;

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // UPDATES ops + the moved active op + commit markers; the floor
            // sits just below the tip and pruning is section-aligned (7).
            assert_eq!(store.log_size, UPDATES + 3);
            assert_eq!(store.inactivity_floor_loc, UPDATES + 1);
            assert_eq!(store.oldest_retained_loc, UPDATES - UPDATES % 7);
            assert_eq!(store.uncommitted_ops, 0);

            store.destroy().await.unwrap();
        });
    }
890
    /// Two keys sharing their first two bytes collide under the TwoCap
    /// translator; both must remain retrievable, before and after replay.
    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Force an index collision by copying k1's two-byte prefix.
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            store.update(k1, v1.clone()).await.unwrap();
            store.update(k2, v2.clone()).await.unwrap();

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.commit().await.unwrap();
            store.close().await.unwrap();

            // Collisions must also survive snapshot rebuild from the log.
            let store = create_test_store(ctx.with_label("store")).await;

            assert_eq!(store.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(store.get(&k2).await.unwrap().unwrap(), v2);

            store.destroy().await.unwrap();
        });
    }
923
    /// Delete semantics: a deleted key reads as None (including across
    /// reopen), can be re-inserted, and deleting an absent key is a no-op.
    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            store.update(k, v.clone()).await.unwrap();

            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            store.delete(k).await.unwrap();

            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            store.commit().await.unwrap();

            // The delete persists across reopen.
            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert after delete.
            store.update(k, v.clone()).await.unwrap();
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            store.commit().await.unwrap();

            let mut store = create_test_store(ctx.with_label("store")).await;
            let fetched_value = store.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Deleting a key that was never set leaves the index untouched.
            let k_n = Digest::random(&mut ctx);
            store.delete(k_n).await.unwrap();

            let iter = store.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let iter = store.snapshot.get(&k_n);
            assert_eq!(iter.count(), 0);

            store.destroy().await.unwrap();
        });
    }
982
    /// Commits raise the floor and prune superseded ops while the latest
    /// value of every key stays readable.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut store = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            store.update(k_a, v_a.clone()).await.unwrap();
            store.update(k_b, v_b.clone()).await.unwrap();

            // 2 updates + 2 moved ops + commit markers.
            store.commit().await.unwrap();
            assert_eq!(store.op_count(), 6);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 3);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_a);

            // Overwrite both keys; old versions become inactive.
            store.update(k_b, v_a.clone()).await.unwrap();
            store.update(k_a, v_c.clone()).await.unwrap();

            store.commit().await.unwrap();
            assert_eq!(store.op_count(), 9);
            assert_eq!(store.uncommitted_ops, 0);
            assert_eq!(store.inactivity_floor_loc, 6);
            assert_eq!(store.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(store.get(&k_b).await.unwrap().unwrap(), v_a);

            store.destroy().await.unwrap();
        });
    }
1020
    /// Crash-recovery: after `simulate_failure` (no syncs), reopening must
    /// roll back to the last committed state; exact op counts are pinned to
    /// catch replay/rewind regressions.
    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store")).await;

            // Uncommitted updates followed by a crash: all are lost.
            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 0);

            // Same updates, but committed this time.
            for i in 0u64..ELEMENTS {
                let k = hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit().await.unwrap();
            let op_count = db.op_count();

            // Uncommitted overwrites of every third key, then a crash.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Redo the overwrites and commit them.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            db.commit().await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 2561);
            assert_eq!(db.snapshot.items(), 1000);

            // Uncommitted deletes of every seventh key, then a crash.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            db.simulate_failure(false, false).await.unwrap();
            let mut db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Redo the deletes and commit them.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            db.commit().await.unwrap();

            assert_eq!(db.op_count(), 2787);
            assert_eq!(db.inactivity_floor_loc, 1480);
            // Pruning is aligned to the 7-item section boundary.
            assert_eq!(db.oldest_retained_loc, 1480 - 1480 % 7);
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
1110}