1use crate::{
77 index::{unordered::Index, Unordered as _},
78 journal::contiguous::{
79 variable::{Config as JournalConfig, Journal},
80 MutableContiguous as _,
81 },
82 kv::{Batchable, Deletable, Updatable},
83 mmr::Location,
84 qmdb::{
85 any::{
86 unordered::{variable::Operation, Update},
87 VariableValue,
88 },
89 build_snapshot_from_log, create_key, delete_key,
90 operation::{Committable as _, Operation as _},
91 store::{Durable, LogStore, NonDurable, PrunableStore, State},
92 update_key, Error, FloorHelper,
93 },
94 translator::Translator,
95 Persistable,
96};
97use commonware_codec::Read;
98use commonware_runtime::{buffer::PoolRef, Clock, Metrics, Storage};
99use commonware_utils::Array;
100use core::ops::Range;
101use std::num::{NonZeroU64, NonZeroUsize};
102use tracing::{debug, warn};
103
/// Configuration consumed by [Db::init].
///
/// All `log_*` fields and `buffer_pool` are forwarded to the configuration of the journal that
/// stores the operation log; `translator` is handed to the snapshot index.
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Name of the storage partition used by the operation log's journal.
    pub log_partition: String,

    /// Write buffer size passed to the operation log's journal.
    pub log_write_buffer: NonZeroUsize,

    /// Compression setting passed to the operation log's journal.
    ///
    /// NOTE(review): presumably a compression level, with `None` disabling compression — confirm
    /// against the journal's documentation.
    pub log_compression: Option<u8>,

    /// Codec configuration the journal uses when decoding logged operations.
    pub log_codec_config: C,

    /// Number of items the journal stores per section.
    pub log_items_per_section: NonZeroU64,

    /// Translator used to construct the in-memory snapshot index.
    pub translator: T,

    /// Buffer pool passed to the operation log's journal.
    pub buffer_pool: PoolRef,
}
128
/// A key-value store made of an append-only log of operations plus an in-memory snapshot index
/// that maps keys to the log locations of their current update operations.
///
/// The `S` parameter is a type-state marker: [Db::init] yields a [Durable] store, `into_dirty`
/// converts it to a [NonDurable] store that accepts mutations, and the [NonDurable] `commit`
/// consumes the store and hands back a [Durable] one.
pub struct Db<E, K, V, T, S = Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    /// The log of operations (updates, deletes and commit floors) applied to the store.
    log: Journal<E, Operation<K, V>>,

    /// Index from keys to candidate log locations. Every location stored here is expected to
    /// reference an update operation; lookups compare the full key of the referenced operation
    /// because one index entry may yield locations for more than one key.
    snapshot: Index<T, Location>,

    /// Number of keys that currently have a value (incremented when a new key is written,
    /// decremented when a key is deleted).
    active_keys: usize,

    /// The inactivity floor. `prune` rejects any location greater than this value, and
    /// `commit` raises it.
    pub inactivity_floor_loc: Location,

    /// Location of the most recent commit operation in the log.
    pub last_commit_loc: Location,

    /// Type-state value. For [NonDurable] it carries the `steps` counter that is bumped by
    /// updates of existing keys and by deletes, and consumed by `commit`.
    pub state: S,
}
167
168impl<E, K, V, T, S> Db<E, K, V, T, S>
169where
170 E: Storage + Clock + Metrics,
171 K: Array,
172 V: VariableValue,
173 T: Translator,
174 S: State,
175{
176 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
178 for &loc in self.snapshot.get(key) {
179 let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
180 unreachable!("location ({loc}) does not reference update operation");
181 };
182
183 if &k == key {
184 return Ok(Some(v));
185 }
186 }
187
188 Ok(None)
189 }
190
191 pub const fn is_empty(&self) -> bool {
193 self.active_keys == 0
194 }
195
196 async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
200 assert!(loc < self.op_count());
201
202 self.log.read(*loc).await.map_err(|e| match e {
205 crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
206 e => Error::Journal(e),
207 })
208 }
209
210 pub const fn op_count(&self) -> Location {
213 Location::new_unchecked(self.log.size())
214 }
215
216 pub const fn inactivity_floor_loc(&self) -> Location {
219 self.inactivity_floor_loc
220 }
221
222 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
224 let Operation::CommitFloor(metadata, _) = self.log.read(*self.last_commit_loc).await?
225 else {
226 unreachable!("last commit should be a commit floor operation");
227 };
228
229 Ok(metadata)
230 }
231
232 pub async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
235 if prune_loc > self.inactivity_floor_loc {
236 return Err(Error::PruneBeyondMinRequired(
237 prune_loc,
238 self.inactivity_floor_loc,
239 ));
240 }
241
242 if !self.log.prune(*prune_loc).await? {
245 return Ok(());
246 }
247
248 debug!(
249 log_size = ?self.op_count(),
250 oldest_retained_loc = ?self.log.oldest_retained_pos(),
251 ?prune_loc,
252 "pruned inactive ops"
253 );
254
255 Ok(())
256 }
257}
258
259impl<E, K, V, T> Db<E, K, V, T, Durable>
260where
261 E: Storage + Clock + Metrics,
262 K: Array,
263 V: VariableValue,
264 T: Translator,
265{
266 pub async fn init(
268 context: E,
269 cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
270 ) -> Result<Self, Error> {
271 let mut log = Journal::<E, Operation<K, V>>::init(
272 context.with_label("log"),
273 JournalConfig {
274 partition: cfg.log_partition,
275 items_per_section: cfg.log_items_per_section,
276 compression: cfg.log_compression,
277 codec_config: cfg.log_codec_config,
278 buffer_pool: cfg.buffer_pool,
279 write_buffer: cfg.log_write_buffer,
280 },
281 )
282 .await?;
283
284 if log.rewind_to(|op| op.is_commit()).await? == 0 {
286 warn!("Log is empty, initializing new db");
287 log.append(Operation::CommitFloor(None, Location::new_unchecked(0)))
288 .await?;
289 }
290
291 log.sync().await?;
294
295 let last_commit_loc =
296 Location::new_unchecked(log.size().checked_sub(1).expect("commit should exist"));
297 let op = log.read(*last_commit_loc).await?;
298 let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
299
300 let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
302 let active_keys =
303 build_snapshot_from_log(inactivity_floor_loc, &log, &mut snapshot, |_, _| {}).await?;
304
305 Ok(Self {
306 log,
307 snapshot,
308 active_keys,
309 inactivity_floor_loc,
310 last_commit_loc,
311 state: Durable,
312 })
313 }
314
315 pub fn into_dirty(self) -> Db<E, K, V, T, NonDurable> {
317 Db {
318 log: self.log,
319 snapshot: self.snapshot,
320 active_keys: self.active_keys,
321 inactivity_floor_loc: self.inactivity_floor_loc,
322 last_commit_loc: self.last_commit_loc,
323 state: NonDurable::default(),
324 }
325 }
326
327 pub async fn sync(&mut self) -> Result<(), Error> {
331 self.log.sync().await.map_err(Into::into)
332 }
333
334 pub async fn destroy(self) -> Result<(), Error> {
336 self.log.destroy().await.map_err(Into::into)
337 }
338}
339
340impl<E, K, V, T> Db<E, K, V, T, NonDurable>
341where
342 E: Storage + Clock + Metrics,
343 K: Array,
344 V: VariableValue,
345 T: Translator,
346{
347 const fn as_floor_helper(
348 &mut self,
349 ) -> FloorHelper<'_, Index<T, Location>, Journal<E, Operation<K, V>>> {
350 FloorHelper {
351 snapshot: &mut self.snapshot,
352 log: &mut self.log,
353 }
354 }
355
356 pub async fn update(&mut self, key: K, value: V) -> Result<(), Error> {
359 let new_loc = self.op_count();
360 if update_key(&mut self.snapshot, &self.log, &key, new_loc)
361 .await?
362 .is_some()
363 {
364 self.state.steps += 1;
365 } else {
366 self.active_keys += 1;
367 }
368
369 self.log
370 .append(Operation::Update(Update(key, value)))
371 .await?;
372
373 Ok(())
374 }
375
376 pub async fn create(&mut self, key: K, value: V) -> Result<bool, Error> {
380 let new_loc = self.op_count();
381 if !create_key(&mut self.snapshot, &self.log, &key, new_loc).await? {
382 return Ok(false);
383 }
384
385 self.active_keys += 1;
386 self.log
387 .append(Operation::Update(Update(key, value)))
388 .await?;
389
390 Ok(true)
391 }
392
393 pub async fn delete(&mut self, key: K) -> Result<bool, Error> {
397 let r = delete_key(&mut self.snapshot, &self.log, &key).await?;
398 if r.is_none() {
399 return Ok(false);
400 }
401
402 self.log.append(Operation::Delete(key)).await?;
403 self.state.steps += 1;
404 self.active_keys -= 1;
405
406 Ok(true)
407 }
408
409 pub async fn commit(
422 mut self,
423 metadata: Option<V>,
424 ) -> Result<(Db<E, K, V, T, Durable>, Range<Location>), Error> {
425 let start_loc = self.last_commit_loc + 1;
426
427 if self.is_empty() {
430 self.inactivity_floor_loc = self.op_count();
431 debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
432 } else {
433 let steps_to_take = self.state.steps + 1;
434 for _ in 0..steps_to_take {
435 let loc = self.inactivity_floor_loc;
436 self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
437 }
438 }
439
440 self.last_commit_loc = Location::new_unchecked(
442 self.log
443 .append(Operation::CommitFloor(metadata, self.inactivity_floor_loc))
444 .await?,
445 );
446
447 let range = start_loc..self.op_count();
448
449 self.log.commit().await?;
451
452 Ok((
453 Db {
454 log: self.log,
455 snapshot: self.snapshot,
456 active_keys: self.active_keys,
457 inactivity_floor_loc: self.inactivity_floor_loc,
458 last_commit_loc: self.last_commit_loc,
459 state: Durable,
460 },
461 range,
462 ))
463 }
464}
465
/// [Persistable] for the [Durable] store; `sync` and `destroy` forward to the inherent methods.
impl<E, K, V, T> Persistable for Db<E, K, V, T, Durable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Error = Error;

    /// Returns `Ok(())` without touching the log.
    async fn commit(&mut self) -> Result<(), Error> {
        Ok(())
    }

    /// Syncs the store by delegating to the inherent `sync` method.
    async fn sync(&mut self) -> Result<(), Error> {
        // Method-call syntax prefers the inherent method over this trait method, so this does
        // not recurse.
        self.sync().await
    }

    /// Destroys the store by delegating to the inherent `destroy` method.
    async fn destroy(self) -> Result<(), Error> {
        // Resolves to the inherent method, not to this trait method.
        self.destroy().await
    }
}
488
/// [LogStore] for every store state; each method forwards to the inherent method of the same
/// name (inherent methods win method-call resolution, so none of these calls recurse).
impl<E, K, V, T, S> LogStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Value = V;

    /// The number of operations in the log.
    fn op_count(&self) -> Location {
        self.op_count()
    }

    /// The current inactivity floor.
    fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc()
    }

    /// The metadata stored in the most recent commit operation.
    async fn get_metadata(&self) -> Result<Option<V>, Error> {
        self.get_metadata().await
    }

    /// Whether the store currently holds no keys.
    fn is_empty(&self) -> bool {
        self.is_empty()
    }
}
515
/// [PrunableStore] for every store state, forwarding to the inherent `prune`.
impl<E, K, V, T, S> PrunableStore for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    /// Prunes the log up to `prune_loc`; fails if `prune_loc` is above the inactivity floor.
    async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
        // Resolves to the inherent method, not to this trait method.
        self.prune(prune_loc).await
    }
}
528
/// [crate::kv::Gettable] for every store state, forwarding to the inherent `get`.
impl<E, K, V, T, S> crate::kv::Gettable for Db<E, K, V, T, S>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
    S: State,
{
    type Key = K;
    type Value = V;
    type Error = Error;

    /// Returns the value currently stored under `key`, if any.
    async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
        // Resolves to the inherent method, not to this trait method.
        self.get(key).await
    }
}
545
/// [Updatable] for the [NonDurable] store, forwarding to the inherent `update`.
///
/// NOTE(review): `Self::Key`, `Self::Value` and `Self::Error` are not declared in this impl;
/// presumably they come from a supertrait such as `Gettable` — confirm in `crate::kv`.
impl<E, K, V, T> Updatable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Sets `key` to `value`.
    async fn update(&mut self, key: Self::Key, value: Self::Value) -> Result<(), Self::Error> {
        // Resolves to the inherent method, not to this trait method.
        self.update(key, value).await
    }
}
557
/// [Deletable] for the [NonDurable] store, forwarding to the inherent `delete`.
///
/// NOTE(review): `Self::Key` and `Self::Error` are not declared in this impl; presumably they
/// come from a supertrait such as `Gettable` — confirm in `crate::kv`.
impl<E, K, V, T> Deletable for Db<E, K, V, T, NonDurable>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Deletes `key`, returning `true` when something was deleted and `false` otherwise.
    async fn delete(&mut self, key: Self::Key) -> Result<bool, Self::Error> {
        // Resolves to the inherent method, not to this trait method.
        self.delete(key).await
    }
}
569
570impl<E, K, V, T> Batchable for Db<E, K, V, T, NonDurable>
571where
572 E: Storage + Clock + Metrics,
573 K: Array,
574 V: VariableValue,
575 T: Translator,
576{
577 async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Self::Error>
578 where
579 Iter: Iterator<Item = (Self::Key, Option<Self::Value>)> + Send + 'a,
580 {
581 for (key, value) in iter {
582 if let Some(value) = value {
583 self.update(key, value).await?;
584 } else {
585 self.delete(key).await?;
586 }
587 }
588 Ok(())
589 }
590}
591
592#[cfg(test)]
593mod test {
594 use super::*;
595 use crate::{
596 kv::{
597 tests::{
598 assert_batchable, assert_deletable, assert_gettable, assert_send, assert_updatable,
599 },
600 Gettable as _,
601 },
602 qmdb::store::tests::{assert_log_store, assert_prunable_store},
603 translator::TwoCap,
604 };
605 use commonware_cryptography::{
606 blake3::{Blake3, Digest},
607 Hasher as _,
608 };
609 use commonware_macros::test_traced;
610 use commonware_math::algebra::Random;
611 use commonware_runtime::{deterministic, Runner};
612 use commonware_utils::{NZUsize, NZU16, NZU64};
613 use std::num::NonZeroU16;
614
    /// Page size used for the buffer pool in tests.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    /// Second argument to `PoolRef::new` in tests (presumably the page cache capacity —
    /// confirm against `PoolRef`).
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);

    /// The durable store type under test: `Digest` keys, `Vec<u8>` values, `TwoCap` translator.
    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap, Durable>;
620
621 async fn create_test_store(context: deterministic::Context) -> TestStore {
622 let cfg = Config {
623 log_partition: "journal".to_string(),
624 log_write_buffer: NZUsize!(64 * 1024),
625 log_compression: None,
626 log_codec_config: ((0..=10000).into(), ()),
627 log_items_per_section: NZU64!(7),
628 translator: TwoCap,
629 buffer_pool: PoolRef::new(PAGE_SIZE, PAGE_CACHE_SIZE),
630 };
631 TestStore::init(context, cfg).await.unwrap()
632 }
633
    /// Exercises a freshly created store: initial state, pruning bounds, loss of uncommitted
    /// writes on reopen, metadata persistence, and floor progress across repeated commits.
    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            // A new store contains exactly one operation and has its floor at 0.
            let mut db = create_test_store(context.clone()).await;
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.log.oldest_retained_pos(), Some(0));
            assert_eq!(db.inactivity_floor_loc(), 0);
            // Pruning at the floor is accepted; pruning above it is rejected.
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(matches!(
                db.prune(Location::new_unchecked(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Write one key and drop the store without committing.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            let mut dirty = db.into_dirty();
            dirty.update(d1, v1).await.unwrap();
            drop(dirty);

            // After reopening, the uncommitted update is gone.
            let db = create_test_store(context.clone()).await.into_dirty();
            assert_eq!(db.op_count(), 1);

            // Committing with metadata on the empty store appends exactly one operation.
            let metadata = vec![1, 2, 3];
            let (mut db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            assert_eq!(db.op_count(), 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            // The committed metadata is still readable after reopening.
            let mut db = create_test_store(context.clone()).await.into_dirty();
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // With one active key, repeated commits without further writes must keep the floor
            // within 3 operations of the tip, and each commit clears the metadata.
            db.update(Digest::random(&mut context), vec![1, 2, 3])
                .await
                .unwrap();
            let (mut db, _) = db.commit(None).await.unwrap();
            for _ in 1..100 {
                (db, _) = db.into_dirty().commit(None).await.unwrap();
                assert!(db.op_count() - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }
688
    /// Walks through basic usage: reads of missing keys, updates, loss of uncommitted state on
    /// reopen, commits with and without metadata, and the resulting op counts and floors.
    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Fresh store: one operation, floor at 0.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // A key that was never written reads as `None`.
            let result = db.get(&key).await;
            assert!(result.unwrap().is_none());

            db.update(key, value.clone()).await.unwrap();

            // The update appended one operation but did not move the floor.
            assert_eq!(db.op_count(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // The value is readable before commit.
            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Drop without committing to simulate a restart with uncommitted state.
            drop(db);

            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // The uncommitted update did not survive the reopen.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Re-apply the update, this time followed by a commit.
            db.update(key, value.clone()).await.unwrap();

            assert_eq!(db.op_count(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            // The commit added two operations on top of the update and raised the floor to 2.
            assert_eq!(db.op_count(), 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Reopen: the committed state is recovered unchanged.
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            assert_eq!(db.op_count(), 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Insert two new keys.
            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            db.update(k1, v1.clone()).await.unwrap();
            db.update(k2, v2.clone()).await.unwrap();

            assert_eq!(db.op_count(), 6);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Metadata still reflects the previous commit until the next commit happens.
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, db.op_count());
            assert_eq!(db.get_metadata().await.unwrap(), None);
            let mut db = db.into_dirty();

            assert_eq!(db.op_count(), 8);
            assert_eq!(db.inactivity_floor_loc, 3);

            // All three keys remain readable after the commit.
            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            // Overwrite an existing key and confirm the new value after commit.
            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
            v1_updated.push(7);
            db.update(k1, v1_updated).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);

            // Add one more brand-new key and confirm it after commit.
            let mut db = db.into_dirty();
            let k3 = Digest::random(&mut ctx);
            db.update(k3, vec![8]).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();
            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);

            db.destroy().await.unwrap();
        });
    }
808
809 #[test_traced("DEBUG")]
810 fn test_store_log_replay() {
811 let executor = deterministic::Runner::default();
812
813 executor.start(|mut ctx| async move {
814 let mut db = create_test_store(ctx.with_label("store"))
815 .await
816 .into_dirty();
817
818 const UPDATES: u64 = 100;
820 let k = Digest::random(&mut ctx);
821 for _ in 0..UPDATES {
822 let v = vec![1, 2, 3, 4, 5];
823 db.update(k, v.clone()).await.unwrap();
824 }
825
826 let iter = db.snapshot.get(&k);
827 assert_eq!(iter.count(), 1);
828
829 let (mut db, _) = db.commit(None).await.unwrap();
830 db.sync().await.unwrap();
831 drop(db);
832
833 let mut db = create_test_store(ctx.with_label("store")).await;
835 db.prune(db.inactivity_floor_loc()).await.unwrap();
836
837 let iter = db.snapshot.get(&k);
838 assert_eq!(iter.count(), 1);
839
840 assert_eq!(db.op_count(), UPDATES * 2 + 2);
842 let expected_floor = UPDATES * 2;
844 assert_eq!(db.inactivity_floor_loc, expected_floor);
845
846 assert_eq!(
849 db.log.oldest_retained_pos(),
850 Some(expected_floor - expected_floor % 7)
851 );
852
853 db.destroy().await.unwrap();
854 });
855 }
856
857 #[test_traced("DEBUG")]
858 fn test_store_build_snapshot_keys_with_shared_prefix() {
859 let executor = deterministic::Runner::default();
860
861 executor.start(|mut ctx| async move {
862 let mut db = create_test_store(ctx.with_label("store"))
863 .await
864 .into_dirty();
865
866 let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
867 let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);
868
869 k2.0[0..2].copy_from_slice(&k1.0[0..2]);
871
872 db.update(k1, v1.clone()).await.unwrap();
873 db.update(k2, v2.clone()).await.unwrap();
874
875 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
876 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
877
878 let (mut db, _) = db.commit(None).await.unwrap();
879 db.sync().await.unwrap();
880 drop(db);
881
882 let db = create_test_store(ctx.with_label("store")).await;
885
886 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
887 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
888
889 db.destroy().await.unwrap();
890 });
891 }
892
    /// Covers deletion: deleting an existing key, deleting it again, persistence of the delete
    /// across a reopen, re-inserting the key, and deleting a key that never existed.
    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Insert a key and commit it.
            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            db.update(k, v.clone()).await.unwrap();
            let (db, _) = db.commit(None).await.unwrap();

            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // The first delete reports that something was removed.
            let mut db = db.into_dirty();
            assert!(db.delete(k).await.unwrap());

            // The key is gone, and a second delete reports that nothing was removed.
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(!db.delete(k).await.unwrap());

            let _ = db.commit(None).await.unwrap();

            // The committed delete is still in effect after reopening.
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the key.
            db.update(k, v.clone()).await.unwrap();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            let _ = db.commit(None).await.unwrap();

            // The re-inserted value survives a reopen.
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Deleting a key that was never written is not an error.
            let k_n = Digest::random(&mut ctx);
            db.delete(k_n).await.unwrap();

            // The commit range shows that only two operations were appended by this commit.
            let (db, range) = db.commit(None).await.unwrap();
            assert_eq!(range.start, 9);
            assert_eq!(range.end, 11);

            assert!(db.get(&k_n).await.unwrap().is_none());
            assert!(db.get(&k).await.unwrap().is_some());

            db.destroy().await.unwrap();
        });
    }
962
963 #[test_traced("DEBUG")]
965 fn test_store_pruning() {
966 let executor = deterministic::Runner::default();
967
968 executor.start(|mut ctx| async move {
969 let mut db = create_test_store(ctx.with_label("store"))
970 .await
971 .into_dirty();
972
973 let k_a = Digest::random(&mut ctx);
974 let k_b = Digest::random(&mut ctx);
975
976 let v_a = vec![1];
977 let v_b = vec![];
978 let v_c = vec![4, 5, 6];
979
980 db.update(k_a, v_a.clone()).await.unwrap();
981 db.update(k_b, v_b.clone()).await.unwrap();
982
983 let (db, _) = db.commit(None).await.unwrap();
984 assert_eq!(db.op_count(), 5);
985 assert_eq!(db.inactivity_floor_loc, 2);
986 assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);
987
988 let mut db = db.into_dirty();
989 db.update(k_b, v_a.clone()).await.unwrap();
990 db.update(k_a, v_c.clone()).await.unwrap();
991
992 let (db, _) = db.commit(None).await.unwrap();
993 assert_eq!(db.op_count(), 11);
994 assert_eq!(db.inactivity_floor_loc, 8);
995 assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
996 assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);
997
998 db.destroy().await.unwrap();
999 });
1000 }
1001
    /// Simulates repeated restarts with uncommitted inserts, updates and deletes, checking
    /// that each reopen returns the store to its last committed operation count, and verifies
    /// the final op count, floor, pruning position and snapshot size.
    #[test_traced("WARN")]
    pub fn test_store_db_recovery() {
        let executor = deterministic::Runner::default();
        const ELEMENTS: u64 = 1000;
        executor.start(|context| async move {
            let mut db = create_test_store(context.with_label("store"))
                .await
                .into_dirty();

            // Insert ELEMENTS keys without committing.
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a restart: the uncommitted inserts are discarded.
            drop(db);
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), 1);

            // Re-apply the inserts and commit this time.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.op_count();

            // Update every third key without committing.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }

            // Simulate a restart: the store is back at the last committed op count.
            drop(db);
            let mut db = create_test_store(context.with_label("store"))
                .await
                .into_dirty();
            assert_eq!(db.op_count(), op_count);

            // Re-apply the updates and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 3 != 0 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
                db.update(k, v.clone()).await.unwrap();
            }
            let (db, _) = db.commit(None).await.unwrap();
            let op_count = db.op_count();
            assert_eq!(op_count, 1673);
            assert_eq!(db.snapshot.items(), 1000);

            // Delete every key with `i % 7 == 1` without committing.
            let mut db = db.into_dirty();
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }

            // Simulate a restart: the uncommitted deletes are discarded.
            drop(db);
            let db = create_test_store(context.with_label("store")).await;
            assert_eq!(db.op_count(), op_count);

            // Syncing and reopening once more leaves the op count unchanged.
            let mut db = db;
            db.sync().await.unwrap();
            drop(db);
            let mut db = create_test_store(context.with_label("store"))
                .await
                .into_dirty();
            assert_eq!(db.op_count(), op_count);

            // Re-apply the deletes and commit this time.
            for i in 0u64..ELEMENTS {
                if i % 7 != 1 {
                    continue;
                }
                let k = Blake3::hash(&i.to_be_bytes());
                db.delete(k).await.unwrap();
            }
            let (mut db, _) = db.commit(None).await.unwrap();

            assert_eq!(db.op_count(), 1961);
            assert_eq!(db.inactivity_floor_loc, 756);

            // 756 is a multiple of 7 (the items-per-section setting), so pruning at the floor
            // leaves exactly the floor as the oldest retained position.
            db.prune(db.inactivity_floor_loc()).await.unwrap();
            assert_eq!(db.log.oldest_retained_pos(), Some(756));
            assert_eq!(db.snapshot.items(), 857);

            db.destroy().await.unwrap();
        });
    }
1109
    /// Exercises the batch API: staging writes in a batch, applying them with `write_batch`,
    /// losing them on reopen when not committed, and persisting them when committed.
    #[test_traced("DEBUG")]
    fn test_store_batchable() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // Fresh store: one operation, floor at 0.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            let mut batch = db.start_batch();

            // The key is not visible through the batch before it is written.
            let result = batch.get(&key).await;
            assert!(result.unwrap().is_none());

            batch.update(key, value.clone()).await.unwrap();

            // Updating the batch did not append anything to the store itself.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            // The staged value is readable through the batch.
            let fetched_value = batch.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);
            // Apply the batch, then drop the store without committing.
            db.write_batch(batch.into_iter()).await.unwrap();
            drop(db);

            let mut db = create_test_store(ctx.with_label("store"))
                .await
                .into_dirty();

            // The applied-but-uncommitted batch did not survive the reopen.
            assert_eq!(db.op_count(), 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            // Stage and apply the same write again, this time followed by a commit.
            let mut batch = db.start_batch();
            batch.update(key, value.clone()).await.unwrap();

            db.write_batch(batch.into_iter()).await.unwrap();
            assert_eq!(db.op_count(), 2);
            assert_eq!(db.inactivity_floor_loc, 0);
            let metadata = vec![99, 100];
            let (db, range) = db.commit(Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
            drop(db);

            // The committed batch is recovered after reopening.
            let db = create_test_store(ctx.with_label("store")).await;

            assert_eq!(db.op_count(), 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            db.destroy().await.unwrap();
        });
    }
1184
    /// Compile-time check (never called, hence `dead_code`) that the futures produced by the
    /// durable store's log-store, prunable-store, gettable and `sync` APIs are `Send`.
    #[allow(dead_code)]
    fn assert_durable_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_gettable(db, &key);
        assert_send(db.sync());
    }
1192
    /// Compile-time check (never called, hence `dead_code`) that the non-durable store
    /// satisfies the log-store, gettable, updatable, deletable and batchable assertion helpers.
    #[allow(dead_code)]
    fn assert_dirty_futures_are_send(
        db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
        key: Digest,
        value: Vec<u8>,
    ) {
        assert_log_store(db);
        assert_gettable(db, &key);
        assert_updatable(db, key, value.clone());
        assert_deletable(db, key);
        assert_batchable(db, key, value);
    }
1205
    /// Compile-time check (never called, hence `dead_code`) that the future returned by the
    /// non-durable store's consuming `commit` is `Send`.
    #[allow(dead_code)]
    fn assert_dirty_commit_is_send(
        db: Db<deterministic::Context, Digest, Vec<u8>, TwoCap, NonDurable>,
    ) {
        assert_send(db.commit(None));
    }
1212}