1use crate::{
77 index::{unordered::Index, Unordered as _},
78 journal::contiguous::{
79 variable::{Config as JournalConfig, Journal},
80 MutableContiguous as _,
81 },
82 kv::{Batchable, Deletable, Updatable},
83 mmr::Location,
84 qmdb::{
85 any::{
86 unordered::{variable::Operation, Update},
87 VariableValue,
88 },
89 build_snapshot_from_log, create_key, delete_key,
90 operation::{Committable as _, Operation as _},
91 store::{Durable, LogStore, NonDurable, PrunableStore, State},
92 update_key, Error, FloorHelper,
93 },
94 translator::Translator,
95 Persistable,
96};
97use commonware_codec::Read;
98use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
99use commonware_utils::Array;
100use core::ops::Range;
101use std::num::{NonZeroU64, NonZeroUsize};
102use tracing::{debug, warn};
103
/// Configuration for initializing a [Db].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Storage partition backing the operation log's journal.
    pub log_partition: String,

    /// Size of the write buffer used when appending to the log.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression setting forwarded to the journal
    /// (`None` disables compression).
    pub log_compression: Option<u8>,

    /// Codec configuration used to (de)serialize log operations.
    pub log_codec_config: C,

    /// Number of items stored in each journal section.
    pub log_items_per_section: NonZeroU64,

    /// Translator used to build the in-memory snapshot index over keys.
    pub translator: T,

    /// Page cache shared with the journal for buffered reads.
    pub page_cache: CacheRef,
}
128
/// A key-value database backed by an append-only log of operations with an
/// in-memory snapshot index for lookups.
///
/// The `S` typestate tracks durability: [Durable] stores are read-only and
/// fully persisted; [NonDurable] stores accept mutations until committed.
pub struct Db<E, K, V, T, S = Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    // Append-only journal holding every operation applied to the db.
    log: Journal<E, Operation<K, V>>,

    // Maps translated keys to the log locations of their latest updates.
    // Translated keys may collide, so lookups verify the full key in the log.
    snapshot: Index<T, Location>,

    // Number of keys that currently have an active (non-deleted) value.
    active_keys: usize,

    /// Location below which every operation is inactive and may be pruned.
    pub inactivity_floor_loc: Location,

    /// Location of the most recent commit operation in the log.
    pub last_commit_loc: Location,

    /// Durability typestate marker ([Durable] or [NonDurable]).
    pub state: S,
}
167
168impl<E, K, V, T, S> Db<E, K, V, T, S>
169where
170 E: Storage + Clock + Metrics,
171 K: Array,
172 V: VariableValue,
173 T: Translator,
174 S: State,
175{
176 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
178 for &loc in self.snapshot.get(key) {
179 let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
180 unreachable!("location ({loc}) does not reference update operation");
181 };
182
183 if &k == key {
184 return Ok(Some(v));
185 }
186 }
187
188 Ok(None)
189 }
190
191 pub const fn is_empty(&self) -> bool {
193 self.active_keys == 0
194 }
195
196 async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
200 assert!(*loc < self.size());
201
202 self.log.read(*loc).await.map_err(|e| match e {
205 crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
206 e => Error::Journal(e),
207 })
208 }
209
210 pub const fn bounds(&self) -> std::ops::Range<Location> {
213 let bounds = self.log.bounds();
214 Location::new_unchecked(bounds.start)..Location::new_unchecked(bounds.end)
215 }
216
217 pub const fn size(&self) -> Location {
219 self.bounds().end
220 }
221
222 pub const fn inactivity_floor_loc(&self) -> Location {
225 self.inactivity_floor_loc
226 }
227
228 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
230 let Operation::CommitFloor(metadata, _) = self.log.read(*self.last_commit_loc).await?
231 else {
232 unreachable!("last commit should be a commit floor operation");
233 };
234
235 Ok(metadata)
236 }
237
238 pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
241 if prune_loc > self.inactivity_floor_loc {
242 return Err(Error::PruneBeyondMinRequired(
243 prune_loc,
244 self.inactivity_floor_loc,
245 ));
246 }
247
248 if !self.log.prune(*prune_loc).await? {
251 return Ok(());
252 }
253
254 debug!(
255 log_size = ?self.size(),
256 oldest_retained_loc = ?self.bounds().start,
257 ?prune_loc,
258 "pruned inactive ops"
259 );
260
261 Ok(())
262 }
263}
264
impl<E, K, V, T> Db<E, K, V, T, Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Opens (or creates) a db backed by the journal described in `cfg`,
    /// replaying the log to rebuild the in-memory snapshot.
    ///
    /// Recovery discards any operations appended after the last commit, so
    /// the returned db always reflects a committed state. An empty log is
    /// seeded with an initial commit operation.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut log = Journal::<E, Operation<K, V>>::init(
            context.with_label("log"),
            JournalConfig {
                partition: cfg.log_partition,
                items_per_section: cfg.log_items_per_section,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                page_cache: cfg.page_cache,
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Rewind to the last commit; if nothing remains, seed the log with an
        // initial commit so a "last commit" always exists.
        if log.rewind_to(|op| op.is_commit()).await? == 0 {
            warn!("Log is empty, initializing new db");
            log.append(Operation::CommitFloor(None, Location::new_unchecked(0)))
                .await?;
        }

        // Persist the rewind/seed before deriving in-memory state from it.
        log.sync().await?;

        // The last operation is now guaranteed to be a commit carrying the
        // inactivity floor.
        let last_commit_loc =
            Location::new_unchecked(log.size().checked_sub(1).expect("commit should exist"));
        let op = log.read(*last_commit_loc).await?;
        let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");

        // Rebuild the snapshot index (and active-key count) by replaying the
        // log from the inactivity floor upward.
        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
        let active_keys =
            build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;

        Ok(Self {
            log,
            snapshot,
            active_keys,
            inactivity_floor_loc,
            last_commit_loc,
            state: Durable,
        })
    }

    /// Converts this durable db into a mutable ([NonDurable]) one that can
    /// accept updates until the next commit.
    pub fn into_dirty(self) -> Db<E, K, V, T, NonDurable> {
        Db {
            log: self.log,
            snapshot: self.snapshot,
            active_keys: self.active_keys,
            inactivity_floor_loc: self.inactivity_floor_loc,
            last_commit_loc: self.last_commit_loc,
            state: NonDurable::default(),
        }
    }

    /// Flushes the log to durable storage.
    pub async fn sync(&mut self) -> Result<(), Error> {
        self.log.sync().await.map_err(Into::into)
    }

    /// Destroys the db, removing all underlying log storage.
    pub async fn destroy(self) -> Result<(), Error> {
        self.log.destroy().await.map_err(Into::into)
    }
}
345
impl<E, K, V, T> Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Borrows the snapshot and log together as a [FloorHelper] used to raise
    /// the inactivity floor during commit.
    const fn as_floor_helper(
        &mut self,
    ) -> FloorHelper<'_, Index<T, Location>, Journal<E, Operation<K, V>>> {
        FloorHelper {
            snapshot: &mut self.snapshot,
            log: &mut self.log,
        }
    }

    /// Sets `key` to `value`, creating the key if it does not exist.
    pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
        let new_loc = self.size();
        if update_key(&mut self.snapshot, &self.log, &key, new_loc)
            .await?
            .is_some()
        {
            // Key already existed: its previous update just became inactive,
            // so record an extra floor-raising step for the next commit.
            self.state.steps += 1;
        } else {
            self.active_keys += 1;
        }

        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(())
    }

    /// Creates `key` with `value`, returning `false` (and appending nothing)
    /// if the key already exists.
    pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
        let new_loc = self.size();
        if !create_key(&mut self.snapshot, &self.log, &key, new_loc).await? {
            return Ok(false);
        }

        self.active_keys += 1;
        self.log
            .append(Operation::Update(Update(key, value)))
            .await?;

        Ok(true)
    }

    /// Deletes `key`, returning `false` (and appending nothing) if the key
    /// does not exist.
    pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
        let r = delete_key(&mut self.snapshot, &self.log, &key).await?;
        if r.is_none() {
            return Ok(false);
        }

        self.log.append(Operation::Delete(key)).await?;
        // The deleted key's prior update became inactive: one more step.
        self.state.steps += 1;
        self.active_keys -= 1;

        Ok(true)
    }

    /// Commits all pending operations, raising the inactivity floor and
    /// appending a commit operation carrying the optional `metadata`.
    ///
    /// Returns the durable db together with the range of locations appended
    /// since the previous commit (including the commit op itself).
    pub async fn commit(
        mut self,
        metadata: Option<V>,
    ) -> Result<(Db<E, K, V, T, Durable>, Range<Location>), Error> {
        let start_loc = self.last_commit_loc + 1;

        if self.is_empty() {
            // No active keys: every retained op is inactive, so the floor can
            // jump straight to the tip of the log.
            self.inactivity_floor_loc = self.size();
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            // Advance the floor once per step accumulated since the last
            // commit, plus one. NOTE(review): the per-step behavior (e.g.
            // re-appending still-active ops) is implemented by
            // `FloorHelper::raise_floor` — confirm details there.
            let steps_to_take = self.state.steps + 1;
            for _ in 0..steps_to_take {
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
            }
        }

        // Record the new floor (and metadata) in a commit operation.
        self.last_commit_loc = Location::new_unchecked(
            self.log
                .append(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
                .await?,
        );

        let range = start_loc..self.size();

        // Make everything durable before handing back a Durable-typed db.
        self.log.commit().await?;

        Ok((
            Db {
                log: self.log,
                snapshot: self.snapshot,
                active_keys: self.active_keys,
                inactivity_floor_loc: self.inactivity_floor_loc,
                last_commit_loc: self.last_commit_loc,
                state: Durable,
            },
            range,
        ))
    }
}
471
impl<E, K, V, T> Persistable for Db<E, K, V, T, Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Error = Error;

    /// No-op: in the [Durable] typestate the log was already committed when
    /// the dirty db transitioned back to durable, so nothing remains to do.
    async fn commit(&mut self) -> Result<(), Error> {
        Ok(())
    }

    /// Delegates to [Db::sync].
    async fn sync(&mut self) -> Result<(), Error> {
        self.sync().await
    }

    /// Delegates to [Db::destroy].
    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
494
// Expose the db's log-level accessors through the [LogStore] trait. All
// methods delegate to the inherent implementations above.
impl<E, K, V, T, S> LogStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Value = V;

    fn bounds(&self) -> std::ops::Range<Location> {
        self.bounds()
    }

    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }

    fn is_empty(&self) -> bool {
        self.is_empty()
    }
}
521
// Expose pruning through the [PrunableStore] trait; delegates to [Db::prune].
impl<E, K, V, T, S> PrunableStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        self.prune(prune_loc).await
    }
}
534
// Expose lookups through the [crate::kv::Gettable] trait; delegates to
// [Db::get]. Available in both durability states.
impl<E, K, V, T, S> crate::kv::Gettable for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Key = K;
    type Value = V;
    type Error = Error;

    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        self.get(key).await
    }
}
551
// Updates are only possible on a dirty ([NonDurable]) db; delegates to
// [Db::update].
impl<E, K, V, T> Updatable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        self.update(key, value).await
    }
}
563
// Deletes are only possible on a dirty ([NonDurable]) db; delegates to
// [Db::delete].
impl<E, K, V, T> Deletable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        self.delete(key).await
    }
}
575
576impl<E, K, V, T> Batchable for Db<E, K, V, T, NonDurable>
577where
578 E: Storage + Clock + Metrics,
579 K: Array,
580 V: VariableValue,
581 T: Translator,
582{
583 async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Self::Error>
584 where
585 Iter: Iterator<Item = (Self::Key, Option<Self::Value>)> + Send + 'a,
586 {
587 for (key, value) in iter {
588 if let Some(value) = value {
589 self.update(key, value).await?;
590 } else {
591 self.delete(key).await?;
592 }
593 }
594 Ok(())
595 }
596}
597
598#[cfg(test)]
599mod test {
600 use super::*;
601 use crate::{
602 kv::{
603 tests::{
604 assert_batchable, assert_deletable, assert_gettable, assert_send, assert_updatable,
605 },
606 Gettable as _,
607 },
608 qmdb::store::tests::{assert_log_store, assert_prunable_store},
609 translator::TwoCap,
610 };
611 use commonware_cryptography::{
612 blake3::{Blake3, Digest},
613 Hasher as _,
614 };
615 use commonware_macros::test_traced;
616 use commonware_math::algebra::Random;
617 use commonware_runtime::{deterministic, Runner};
618 use commonware_utils::{NZUsize, NZU16, NZU64};
619 use std::num::NonZeroU16;
620
    // Page size used by the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    // Number of pages held by the test page cache.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);

    /// Concrete store type exercised throughout these tests.
    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap, Durable>;

    /// Builds a test store over `context` with small journal sections
    /// (7 items each) so pruning crosses section boundaries quickly.
    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_partition: "journal".to_string(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((0..=10000).into(), ()),
            log_items_per_section: NZU64!(7),
            translator: TwoCap,
            page_cache: CacheRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        TestStore::init(context, cfg).await.unwrap()
    }
639
    /// A freshly-initialized store contains exactly one (seed) commit op,
    /// rejects pruning beyond the floor, has no metadata, and discards
    /// uncommitted updates on restart.
    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.with_label("store_0")).await;
            // The seed commit is the only op, so size is 1.
            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.log.bounds().start, 0);
            assert_eq!(db.inactivity_floor_loc(), 0);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            // Pruning past the floor must fail.
            assert!(matches!(
                db.prune(Location::new_unchecked(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            // An uncommitted update must not survive a restart.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            let mut dirty = db.into_dirty();
            dirty.update(d1, v1).await.unwrap();
            drop(dirty);

            let db = create_test_store(context.with_label("store_1"))
                .await
                .into_dirty();
            assert_eq!(db.bounds().end, 1);

            // Commit on an empty db appends exactly one commit op with metadata.
            let metadata = vec![1, 2, 3];
            let (mut db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            assert_eq!(db.bounds().end, 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            let mut db = create_test_store(context.with_label("store_2"))
                .await
                .into_dirty();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            let (mut db, _) = db.commit(None).await.unwrap();
            // Repeated empty commits keep the floor within a small distance of
            // the tip and reset metadata to None.
            for _ in 1..100 {
                (db, _) = db.into_dirty().commit(None).await.unwrap();
                assert!(db.bounds().end - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }
698
    /// End-to-end create/commit/restart cycle: updates are visible before and
    /// after commit, metadata round-trips, and op counts/floors advance as
    /// expected across restarts.
    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Missing key returns None.
            let result = db.get(&key).await;
            assert!(result.unwrap().is_none());

            db.update(key, value.clone()).await.unwrap();

            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // Uncommitted update is immediately readable.
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Dropped without commit: the update is lost on restart.
            drop(db);

            let mut db = create_test_store(ctx.with_label("store_1"))
                .await
                .into_dirty();

            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            db.update(key, value.clone()).await.unwrap();

            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // Commit persists the update plus floor-raising ops and metadata.
            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Restart recovers the committed state.
            let mut db = create_test_store(ctx.with_label("store_2"))
                .await
                .into_dirty();

            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            db.update(k1, v1.clone()).await.unwrap();
            db.update(k2, v2.clone()).await.unwrap();

            assert_eq!(db.bounds().end, 6);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Metadata from the previous commit is still visible pre-commit.
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // Committing with None clears the stored metadata.
            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, db.bounds().end);
            assert_eq!(db.get_metadata().await.unwrap(), None);
            let mut db = db.into_dirty();

            assert_eq!(db.bounds().end, 8);
            assert_eq!(db.inactivity_floor_loc, 3);

            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            // Re-updating an existing key replaces its value.
            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
            v1_updated.push(7);
            db.update(k1, v1_updated).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);

            let mut db = db.into_dirty();
            let k3 = Digest::random(&mut ctx);
            db.update(k3, vec![8]).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);

            db.destroy().await.unwrap();
        });
    }
818
    /// Replaying a log of many updates to one key yields a single snapshot
    /// entry, and pruning stops at the section boundary below the floor.
    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            // Repeatedly update the same key; only the latest should be indexed.
            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                db.update(k, v.clone()).await.unwrap();
            }

            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            let (mut db, _) = db.commit(None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Restart replays the log; the snapshot still has one entry.
            let mut db = create_test_store(ctx.with_label("store_1")).await;
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            assert_eq!(db.bounds().end, UPDATES * 2 + 2);
            let expected_floor = UPDATES * 2;
            assert_eq!(db.inactivity_floor_loc, expected_floor);

            // Pruning is section-aligned (7 items per section in these tests).
            assert_eq!(db.log.bounds().start, expected_floor - expected_floor % 7);

            db.destroy().await.unwrap();
        });
    }
863
    /// Keys whose first bytes collide under the TwoCap translator must still
    /// resolve to their own values, both live and after a restart/replay.
    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Force a translated-key collision: TwoCap only sees two bytes.
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            db.update(k1, v1.clone()).await.unwrap();
            db.update(k2, v2.clone()).await.unwrap();

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            let (mut db, _) = db.commit(None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Rebuilt snapshot must disambiguate the colliding keys too.
            let db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.destroy().await.unwrap();
        });
    }
899
    /// Delete semantics: deleting an existing key returns true, repeat deletes
    /// and deletes of unknown keys return false, and deletions survive commit
    /// and restart.
    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            db.update(k, v.clone()).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();

            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            let mut db = db.into_dirty();
            assert!(db.delete(k).await.unwrap());

            // Deleted key is gone; a second delete is a no-op returning false.
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(!db.delete(k).await.unwrap());

            let _ = db.commit(None).await.unwrap();

            // The deletion survives a restart.
            let mut db = create_test_store(ctx.with_label("store_1"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // The key can be re-created after deletion.
            db.update(k, v.clone()).await.unwrap();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            let _ = db.commit(None).await.unwrap();

            let mut db = create_test_store(ctx.with_label("store_2"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Deleting a key that never existed appends nothing.
            let k_n = Digest::random(&mut ctx);
            db.delete(k_n).await.unwrap();

            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 9);
            assert_eq!(range.end, 11);

            assert!(db.get(&k_n).await.unwrap().is_none());
            assert!(db.get(&k).await.unwrap().is_some());

            db.destroy().await.unwrap();
        });
    }
969
    /// Commits raise the inactivity floor past superseded updates while the
    /// latest values for every key remain readable.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            // Empty values are legal for a variable-length store.
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            db.update(k_a, v_a.clone()).await.unwrap();
            db.update(k_b, v_b.clone()).await.unwrap();

            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.bounds().end, 5);
            assert_eq!(db.inactivity_floor_loc, 2);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);

            // Overwrite both keys; the old updates become inactive.
            let mut db = db.into_dirty();
            db.update(k_b, v_a.clone()).await.unwrap();
            db.update(k_a, v_c.clone()).await.unwrap();

            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.bounds().end, 11);
            assert_eq!(db.inactivity_floor_loc, 8);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);

            db.destroy().await.unwrap();
        });
    }
1008
    /// Large-scale recovery: uncommitted batches of updates/deletes are rolled
    /// back on restart, committed ones are recovered exactly, and the final
    /// op counts, floor, prune boundary, and snapshot size match expectations.
    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store_0"))
                .await
                .into_dirty();

            // 1000 uncommitted updates...
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // ...are all rolled back on restart.
            drop(db);
            let db = create_test_store(context.with_label("store_1")).await;
            assert_eq!(db.bounds().end, 1);

            // Re-apply and commit this time.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.bounds().end;

            // Uncommitted overwrites of every third key are rolled back too.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            drop(db);
            let mut db = create_test_store(context.with_label("store_2"))
                .await
                .into_dirty();
            assert_eq!(db.bounds().end, op_count);

            // Re-apply the overwrites and commit.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.bounds().end;
            assert_eq!(op_count, 1673);
            assert_eq!(db.snapshot.items(), 1000);

            // Uncommitted deletes are rolled back on restart.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            drop(db);
            let db = create_test_store(context.with_label("store_3")).await;
            assert_eq!(db.bounds().end, op_count);

            // sync() alone (without commit) must not change the recovered size.
            let mut db = db;
            db.sync().await.unwrap();
            drop(db);
            let mut db = create_test_store(context.with_label("store_4"))
                .await
                .into_dirty();
            assert_eq!(db.bounds().end, op_count);

            // Re-apply the deletes and commit them for real.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            let (mut db, _) = db.commit(None).await.unwrap();

            assert_eq!(db.bounds().end, 1961);
            assert_eq!(db.inactivity_floor_loc, 756);

            // 756 is a section boundary (multiple of 7), so pruning lands on it.
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.log.bounds().start, 756 );
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
1116
    /// Batched writes: a batch buffers updates (readable through the batch but
    /// not the db) until `write_batch` applies them; committed batch results
    /// survive restart.
    #[test_traced("DEBUG")]
    fn test_store_batchable() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0"))
                .await
                .into_dirty();

            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            let mut batch = db.start_batch();

            let result = batch.get(&key).await;
            assert!(result.unwrap().is_none());

            batch.update(key, value.clone()).await.unwrap();

            // The buffered update has not touched the underlying db yet.
            assert_eq!(db.bounds().end, 1); assert_eq!(db.inactivity_floor_loc, 0);

            // But it is visible through the batch itself.
            let fetched_value = batch.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);
            db.write_batch(batch.into_iter()).await.unwrap();
            drop(db);

            // Unc ommitted batch contents are lost on restart (nothing here —
            // the db above was dropped without commit).
            let mut db = create_test_store(ctx.with_label("store_1"))
                .await
                .into_dirty();

            assert_eq!(db.bounds().end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            let mut batch = db.start_batch();
            batch.update(key, value.clone()).await.unwrap();

            // Applying the batch appends its operations to the log.
            db.write_batch(batch.into_iter()).await.unwrap();
            assert_eq!(db.bounds().end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);
            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            drop(db);

            // Committed batch results survive a restart.
            let db = create_test_store(ctx.with_label("store_2")).await;

            assert_eq!(db.bounds().end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            db.destroy().await.unwrap();
        });
    }
1191
    /// Compile-time check that the durable db's trait futures are `Send`.
    #[allow(dead_code)]
    fn assert_durable_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_gettable(db, &key);
        assert_send(db.sync());
    }

    /// Compile-time check that the dirty db's mutation futures are `Send`.
    #[allow(dead_code)]
    fn assert_dirty_futures_are_send(
        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
        key: Digest,
        value: Vec<u8>,
    ) {
        assert_log_store(db);
        assert_gettable(db, &key);
        assert_updatable(db, key, value.clone());
        assert_deletable(db, key);
        assert_batchable(db, key, value);
    }

    /// Compile-time check that the consuming `commit` future is `Send`.
    #[allow(dead_code)]
    fn assert_dirty_commit_is_send(
        db: Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
    ) {
        assert_send(db.commit(None));
    }
1219}