1use crate::{
80 index::{unordered::Index, Unordered as _},
81 journal::contiguous::{
82 variable::{Config as JournalConfig, Journal},
83 Mutable as _, Reader,
84 },
85 merkle::mmr::Location,
86 qmdb::{
87 any::{
88 unordered::{variable::Operation, Update},
89 VariableValue,
90 },
91 build_snapshot_from_log, delete_key,
92 operation::{Committable as _, Key, Operation as _},
93 update_key, FloorHelper,
94 },
95 translator::Translator,
96 Context, Persistable,
97};
98use commonware_codec::{CodecShared, Read};
99use commonware_utils::Array;
100use core::ops::Range;
101use std::collections::BTreeMap;
102use tracing::{debug, warn};
103
104type Error = crate::qmdb::Error<crate::mmr::Family>;
105
106#[derive(Clone)]
108pub struct Config<T: Translator, C> {
109 pub log: JournalConfig<C>,
111
112 pub translator: T,
114}
115
116pub struct Changeset<K: Key, V: CodecShared + Clone> {
118 diff: BTreeMap<K, Option<V>>,
119 metadata: Option<V>,
120}
121
122impl<K: Key, V: CodecShared + Clone> Changeset<K, V> {
123 fn into_parts(self) -> (BTreeMap<K, Option<V>>, Option<V>) {
124 (self.diff, self.metadata)
125 }
126}
127
128impl<K: Key, V: CodecShared + Clone> FromIterator<(K, Option<V>)> for Changeset<K, V> {
129 fn from_iter<TIter: IntoIterator<Item = (K, Option<V>)>>(iter: TIter) -> Self {
130 Self {
131 diff: iter.into_iter().collect(),
132 metadata: None,
133 }
134 }
135}
136
137impl<K: Key, V: CodecShared + Clone, const N: usize> From<[(K, Option<V>); N]> for Changeset<K, V> {
138 fn from(items: [(K, Option<V>); N]) -> Self {
139 items.into_iter().collect()
140 }
141}
142
143pub struct Batch<'a, E, K, V, T>
145where
146 E: Context,
147 K: Array,
148 V: VariableValue,
149 T: Translator,
150{
151 db: &'a Db<E, K, V, T>,
152 diff: BTreeMap<K, Option<V>>,
153}
154
155impl<'a, E, K, V, T> Batch<'a, E, K, V, T>
156where
157 E: Context,
158 K: Array,
159 V: VariableValue,
160 T: Translator,
161{
162 const fn new(db: &'a Db<E, K, V, T>) -> Self {
163 Self {
164 db,
165 diff: BTreeMap::new(),
166 }
167 }
168
169 pub fn finalize(self, metadata: Option<V>) -> Changeset<K, V> {
171 Changeset {
172 diff: self.diff,
173 metadata,
174 }
175 }
176
177 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
180 if let Some(value) = self.diff.get(key) {
181 return Ok(value.clone());
182 }
183 self.db.get(key).await
184 }
185
186 pub fn update(mut self, key: K, value: V) -> Self {
188 self.diff.insert(key, Some(value));
189 self
190 }
191
192 pub fn delete(mut self, key: K) -> Self {
194 self.diff.insert(key, None);
195 self
196 }
197}
198
199pub struct Db<E, K, V, T>
201where
202 E: Context,
203 K: Array,
204 V: VariableValue,
205 T: Translator,
206{
207 log: Journal<E, Operation<crate::mmr::Family, K, V>>,
214
215 snapshot: Index<T, Location>,
222
223 active_keys: usize,
225
226 pub inactivity_floor_loc: Location,
229
230 pub last_commit_loc: Location,
232
233 pub steps: u64,
236}
237
238impl<E, K, V, T> Db<E, K, V, T>
239where
240 E: Context,
241 K: Array,
242 V: VariableValue,
243 T: Translator,
244{
245 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
247 for &loc in self.snapshot.get(key) {
248 let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
249 unreachable!("location ({loc}) does not reference update operation");
250 };
251
252 if &k == key {
253 return Ok(Some(v));
254 }
255 }
256
257 Ok(None)
258 }
259
260 pub const fn new_batch(&self) -> Batch<'_, E, K, V, T> {
262 Batch::new(self)
263 }
264
265 pub const fn is_empty(&self) -> bool {
267 self.active_keys == 0
268 }
269
270 async fn get_op(&self, loc: Location) -> Result<Operation<crate::mmr::Family, K, V>, Error> {
274 let reader = self.log.reader().await;
275 assert!(*loc < reader.bounds().end);
276 reader.read(*loc).await.map_err(|e| match e {
277 crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
278 e => Error::Journal(e),
279 })
280 }
281
282 pub async fn bounds(&self) -> std::ops::Range<Location> {
285 let bounds = self.log.reader().await.bounds();
286 Location::new(bounds.start)..Location::new(bounds.end)
287 }
288
289 pub async fn size(&self) -> Location {
291 Location::new(self.log.size().await)
292 }
293
294 pub const fn inactivity_floor_loc(&self) -> Location {
297 self.inactivity_floor_loc
298 }
299
300 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
302 let Operation::CommitFloor(metadata, _) =
303 self.log.reader().await.read(*self.last_commit_loc).await?
304 else {
305 unreachable!("last commit should be a commit floor operation");
306 };
307
308 Ok(metadata)
309 }
310
311 pub async fn prune(&self, prune_loc: Location) -> Result<(), Error> {
314 if prune_loc > self.inactivity_floor_loc {
315 return Err(Error::PruneBeyondMinRequired(
316 prune_loc,
317 self.inactivity_floor_loc,
318 ));
319 }
320
321 if !self.log.prune(*prune_loc).await? {
324 return Ok(());
325 }
326
327 let bounds = self.log.reader().await.bounds();
328 let log_size = Location::new(bounds.end);
329 let oldest_retained_loc = Location::new(bounds.start);
330 debug!(
331 ?log_size,
332 ?oldest_retained_loc,
333 ?prune_loc,
334 "pruned inactive ops"
335 );
336
337 Ok(())
338 }
339
340 pub async fn init(
342 context: E,
343 cfg: Config<T, <Operation<crate::mmr::Family, K, V> as Read>::Cfg>,
344 ) -> Result<Self, Error> {
345 let mut log =
346 Journal::<E, Operation<crate::mmr::Family, K, V>>::init(context.child("log"), cfg.log)
347 .await?;
348
349 if log.rewind_to(|op| op.is_commit()).await? == 0 {
351 warn!("Log is empty, initializing new db");
352 log.append(&Operation::CommitFloor(None, Location::new(0)))
353 .await?;
354 }
355
356 log.sync().await?;
359
360 let last_commit_loc = Location::new(
361 log.size()
362 .await
363 .checked_sub(1)
364 .expect("commit should exist"),
365 );
366
367 let mut snapshot = Index::new(context.child("snapshot"), cfg.translator);
369 let (inactivity_floor_loc, active_keys) = {
370 let reader = log.reader().await;
371 let op = reader.read(*last_commit_loc).await?;
372 let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
373 if inactivity_floor_loc > last_commit_loc {
374 return Err(crate::qmdb::Error::DataCorrupted(
375 "inactivity floor exceeds last commit",
376 ));
377 }
378 let active_keys =
379 build_snapshot_from_log(inactivity_floor_loc, &reader, &mut snapshot, |_, _| {})
380 .await?;
381 (inactivity_floor_loc, active_keys)
382 };
383
384 Ok(Self {
385 log,
386 snapshot,
387 active_keys,
388 inactivity_floor_loc,
389 last_commit_loc,
390 steps: 0,
391 })
392 }
393
394 pub async fn sync(&self) -> Result<(), Error> {
398 self.log.sync().await.map_err(Into::into)
399 }
400
401 pub async fn destroy(self) -> Result<(), Error> {
403 self.log.destroy().await.map_err(Into::into)
404 }
405
406 #[allow(clippy::type_complexity)]
407 const fn as_floor_helper(
408 &mut self,
409 ) -> FloorHelper<
410 '_,
411 crate::mmr::Family,
412 Index<T, Location>,
413 Journal<E, Operation<crate::mmr::Family, K, V>>,
414 > {
415 FloorHelper {
416 snapshot: &mut self.snapshot,
417 log: &mut self.log,
418 }
419 }
420
421 pub async fn apply_batch(&mut self, batch: Changeset<K, V>) -> Result<Range<Location>, Error> {
427 let start_loc = self.last_commit_loc + 1;
428 let (diff, metadata) = batch.into_parts();
429
430 for (key, value) in diff {
431 if let Some(value) = value {
432 let updated = {
433 let reader = self.log.reader().await;
434 let new_loc = reader.bounds().end;
435 update_key::<crate::mmr::Family, _, _>(
436 &mut self.snapshot,
437 &reader,
438 &key,
439 Location::new(new_loc),
440 )
441 .await?
442 };
443 if updated.is_some() {
444 self.steps += 1;
445 } else {
446 self.active_keys += 1;
447 }
448 self.log
449 .append(&Operation::Update(Update(key, value)))
450 .await?;
451 } else {
452 let deleted = {
453 let reader = self.log.reader().await;
454 delete_key::<crate::mmr::Family, _, _>(&mut self.snapshot, &reader, &key)
455 .await?
456 };
457 if deleted.is_some() {
458 self.log.append(&Operation::Delete(key)).await?;
459 self.steps += 1;
460 self.active_keys -= 1;
461 }
462 }
463 }
464
465 if self.is_empty() {
468 self.inactivity_floor_loc = self.size().await;
469 debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
470 } else {
471 let steps_to_take = self.steps + 1;
472 for _ in 0..steps_to_take {
473 let loc = self.inactivity_floor_loc;
474 self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
475 }
476 }
477
478 self.last_commit_loc = Location::new(
480 self.log
481 .append(&Operation::CommitFloor(metadata, self.inactivity_floor_loc))
482 .await?,
483 );
484
485 self.steps = 0;
486
487 let end_loc = self.size().await;
488 Ok(start_loc..end_loc)
489 }
490
491 pub async fn commit(&self) -> Result<(), Error> {
493 self.log.commit().await.map_err(Into::into)
494 }
495}
496
497impl<E, K, V, T> Persistable for Db<E, K, V, T>
498where
499 E: Context,
500 K: Array,
501 V: VariableValue,
502 T: Translator,
503{
504 type Error = Error;
505
506 async fn commit(&self) -> Result<(), Error> {
507 Self::commit(self).await
508 }
509
510 async fn sync(&self) -> Result<(), Error> {
511 self.sync().await
512 }
513
514 async fn destroy(self) -> Result<(), Error> {
515 self.destroy().await
516 }
517}
518
519#[cfg(test)]
520mod test {
521 use super::*;
522 use crate::translator::TwoCap;
523 use commonware_cryptography::{
524 blake3::{Blake3, Digest},
525 Hasher as _,
526 };
527 use commonware_macros::test_traced;
528 use commonware_math::algebra::Random;
529 use commonware_runtime::{buffer::paged::CacheRef, deterministic, Runner, Supervisor as _};
530 use commonware_utils::{NZUsize, NZU16, NZU64};
531 use std::num::{NonZeroU16, NonZeroUsize};
532
533 const PAGE_SIZE: NonZeroU16 = NZU16!(77);
534 const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
535
536 type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap>;
538
539 async fn create_test_store(context: deterministic::Context) -> TestStore {
540 let cfg = Config {
541 log: JournalConfig {
542 partition: "journal".into(),
543 write_buffer: NZUsize!(64 * 1024),
544 compression: None,
545 codec_config: ((), ((0..=10000).into(), ())),
546 items_per_section: NZU64!(7),
547 page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
548 },
549 translator: TwoCap,
550 };
551 TestStore::init(context, cfg).await.unwrap()
552 }
553
554 async fn apply_entries(
555 db: &mut TestStore,
556 iter: impl IntoIterator<Item = (Digest, Option<Vec<u8>>)> + Send,
557 ) -> Range<Location> {
558 db.apply_batch(iter.into_iter().collect()).await.unwrap()
559 }
560
561 #[test_traced("DEBUG")]
562 pub fn test_store_construct_empty() {
563 let executor = deterministic::Runner::default();
564 executor.start(|mut context| async move {
565 let mut db = create_test_store(context.child("store").with_attribute("index", 0)).await;
566 assert_eq!(db.bounds().await.end, 1);
567 assert_eq!(db.log.bounds().await.start, 0);
568 assert_eq!(db.inactivity_floor_loc(), 0);
569 assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
570 assert!(matches!(
571 db.prune(Location::new(1)).await,
572 Err(Error::PruneBeyondMinRequired(_, _))
573 ));
574 assert!(db.get_metadata().await.unwrap().is_none());
575
576 let d1 = Digest::random(&mut context);
578 let v1 = vec![1, 2, 3];
579 apply_entries(&mut db, [(d1, Some(v1))]).await;
580 drop(db);
581
582 let mut db = create_test_store(context.child("store").with_attribute("index", 1)).await;
583 assert_eq!(db.bounds().await.end, 1);
584
585 let metadata = vec![1, 2, 3];
587 let batch = db.new_batch().finalize(Some(metadata.clone()));
588 let range = db.apply_batch(batch).await.unwrap();
589 assert_eq!(range.start, 1);
590 assert_eq!(range.end, 2);
591 db.commit().await.unwrap();
592 assert_eq!(db.bounds().await.end, 2);
593 assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
594 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
595
596 let mut db = create_test_store(context.child("store").with_attribute("index", 2)).await;
597 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
598
599 apply_entries(
602 &mut db,
603 [(Digest::random(&mut context), Some(vec![1, 2, 3]))],
604 )
605 .await;
606 db.commit().await.unwrap();
607 for _ in 1..100 {
608 db.apply_batch(db.new_batch().finalize(None)).await.unwrap();
609 db.commit().await.unwrap();
610 assert!(db.bounds().await.end - db.inactivity_floor_loc <= 3);
613 assert!(db.get_metadata().await.unwrap().is_none());
614 }
615
616 db.destroy().await.unwrap();
617 });
618 }
619
620 #[test_traced("DEBUG")]
621 fn test_store_construct_basic() {
622 let executor = deterministic::Runner::default();
623
624 executor.start(|mut ctx| async move {
625 let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
626
627 assert_eq!(db.bounds().await.end, 1);
629 assert_eq!(db.inactivity_floor_loc, 0);
630
631 let key = Digest::random(&mut ctx);
632 let value = vec![2, 3, 4, 5];
633
634 let result = db.get(&key).await;
636 assert!(result.unwrap().is_none());
637
638 apply_entries(&mut db, [(key, Some(value.clone()))]).await;
641
642 assert_eq!(*db.bounds().await.end, 4);
643 assert_eq!(*db.inactivity_floor_loc, 2);
644
645 let fetched_value = db.get(&key).await.unwrap();
647 assert_eq!(fetched_value.unwrap(), value);
648
649 drop(db);
652
653 let mut db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
655
656 assert_eq!(*db.bounds().await.end, 1);
658 assert_eq!(*db.inactivity_floor_loc, 0);
659 assert!(db.get_metadata().await.unwrap().is_none());
660
661 let metadata = vec![99, 100];
663 let range = db
664 .apply_batch(
665 db.new_batch()
666 .update(key, value.clone())
667 .finalize(Some(metadata.clone())),
668 )
669 .await
670 .unwrap();
671 assert_eq!(*range.start, 1);
672 assert_eq!(*range.end, 4);
673 db.commit().await.unwrap();
674 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
675
676 assert_eq!(*db.bounds().await.end, 4);
677 assert_eq!(*db.inactivity_floor_loc, 2);
678
679 let mut db = create_test_store(ctx.child("store").with_attribute("index", 2)).await;
681
682 assert_eq!(*db.bounds().await.end, 4);
684 assert_eq!(*db.inactivity_floor_loc, 2);
685
686 let fetched_value = db.get(&key).await.unwrap();
688 assert_eq!(fetched_value.unwrap(), value);
689
690 let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
692 let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
693 apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
694 apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;
695
696 assert_eq!(*db.bounds().await.end, 10);
697 assert_eq!(*db.inactivity_floor_loc, 5);
698
699 assert_eq!(db.get_metadata().await.unwrap(), None);
702
703 db.commit().await.unwrap();
704 assert_eq!(db.get_metadata().await.unwrap(), None);
705
706 assert_eq!(*db.bounds().await.end, 10);
708 assert_eq!(*db.inactivity_floor_loc, 5);
709
710 assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
712 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
713 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
714
715 let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
717 v1_updated.push(7);
718 apply_entries(&mut db, [(k1, Some(v1_updated))]).await;
719 db.commit().await.unwrap();
720 assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);
721
722 let k3 = Digest::random(&mut ctx);
724 apply_entries(&mut db, [(k3, Some(vec![8]))]).await;
725 db.commit().await.unwrap();
726 assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);
727
728 db.destroy().await.unwrap();
730 });
731 }
732
733 #[test_traced("DEBUG")]
734 fn test_store_log_replay() {
735 let executor = deterministic::Runner::default();
736
737 executor.start(|mut ctx| async move {
738 let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
739
740 const UPDATES: u64 = 100;
742 let k = Digest::random(&mut ctx);
743 for _ in 0..UPDATES {
744 let v = vec![1, 2, 3, 4, 5];
745 apply_entries(&mut db, [(k, Some(v.clone()))]).await;
746 }
747
748 let iter = db.snapshot.get(&k);
749 assert_eq!(iter.count(), 1);
750
751 db.commit().await.unwrap();
752 db.sync().await.unwrap();
753 drop(db);
754
755 let db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
757 db.prune(db.inactivity_floor_loc()).await.unwrap();
758
759 let iter = db.snapshot.get(&k);
760 assert_eq!(iter.count(), 1);
761
762 assert_eq!(*db.bounds().await.end, 400);
765 assert_eq!(*db.inactivity_floor_loc, 398);
767 let floor = db.inactivity_floor_loc;
768
769 assert_eq!(db.log.bounds().await.start, *floor - *floor % 7);
772
773 db.destroy().await.unwrap();
774 });
775 }
776
777 #[test_traced("DEBUG")]
778 fn test_store_build_snapshot_keys_with_shared_prefix() {
779 let executor = deterministic::Runner::default();
780
781 executor.start(|mut ctx| async move {
782 let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
783
784 let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
785 let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);
786
787 k2.0[0..2].copy_from_slice(&k1.0[0..2]);
789
790 apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
791 apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;
792
793 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
794 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
795
796 db.commit().await.unwrap();
797 db.sync().await.unwrap();
798 drop(db);
799
800 let db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
803
804 assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
805 assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
806
807 db.destroy().await.unwrap();
808 });
809 }
810
811 #[test_traced("DEBUG")]
812 fn test_store_delete() {
813 let executor = deterministic::Runner::default();
814
815 executor.start(|mut ctx| async move {
816 let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
817
818 let k = Digest::random(&mut ctx);
820 let v = vec![1, 2, 3, 4, 5];
821 apply_entries(&mut db, [(k, Some(v.clone()))]).await;
822 db.commit().await.unwrap();
823
824 let fetched_value = db.get(&k).await.unwrap();
826 assert_eq!(fetched_value.unwrap(), v);
827
828 assert!(db.get(&k).await.unwrap().is_some());
830 apply_entries(&mut db, [(k, None)]).await;
831
832 let fetched_value = db.get(&k).await.unwrap();
834 assert!(fetched_value.is_none());
835 assert!(db.get(&k).await.unwrap().is_none());
836
837 db.commit().await.unwrap();
839
840 let mut db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
842 let fetched_value = db.get(&k).await.unwrap();
843 assert!(fetched_value.is_none());
844
845 apply_entries(&mut db, [(k, Some(v.clone()))]).await;
847 let fetched_value = db.get(&k).await.unwrap();
848 assert_eq!(fetched_value.unwrap(), v);
849
850 db.commit().await.unwrap();
852
853 let mut db = create_test_store(ctx.child("store").with_attribute("index", 2)).await;
856 let fetched_value = db.get(&k).await.unwrap();
857 assert_eq!(fetched_value.unwrap(), v);
858
859 let k_n = Digest::random(&mut ctx);
861 let range = apply_entries(&mut db, [(k_n, None)]).await;
862 assert_eq!(range.start, 9);
863 assert_eq!(range.end, 11);
864 db.commit().await.unwrap();
865
866 assert!(db.get(&k_n).await.unwrap().is_none());
867 assert!(db.get(&k).await.unwrap().is_some());
869
870 db.destroy().await.unwrap();
871 });
872 }
873
874 #[test_traced("DEBUG")]
876 fn test_store_pruning() {
877 let executor = deterministic::Runner::default();
878
879 executor.start(|mut ctx| async move {
880 let mut db = create_test_store(ctx.child("store")).await;
881
882 let k_a = Digest::random(&mut ctx);
883 let k_b = Digest::random(&mut ctx);
884
885 let v_a = vec![1];
886 let v_b = vec![];
887 let v_c = vec![4, 5, 6];
888
889 apply_entries(&mut db, [(k_a, Some(v_a.clone()))]).await;
890 apply_entries(&mut db, [(k_b, Some(v_b.clone()))]).await;
891
892 db.commit().await.unwrap();
893 assert_eq!(*db.bounds().await.end, 7);
894 assert_eq!(*db.inactivity_floor_loc, 3);
895 assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);
896
897 apply_entries(&mut db, [(k_b, Some(v_a.clone()))]).await;
898 apply_entries(&mut db, [(k_a, Some(v_c.clone()))]).await;
899
900 db.commit().await.unwrap();
901 assert_eq!(*db.bounds().await.end, 15);
902 assert_eq!(*db.inactivity_floor_loc, 12);
903 assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
904 assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);
905
906 db.destroy().await.unwrap();
907 });
908 }
909
910 #[test_traced("WARN")]
911 pub fn test_store_db_recovery() {
912 let executor = deterministic::Runner::default();
913 const ELEMENTS: u64 = 1000;
915 executor.start(|context| async move {
916 let db = create_test_store(context.child("store").with_attribute("index", 0)).await;
917
918 {
920 let mut batch = db.new_batch();
921 for i in 0u64..ELEMENTS {
922 let k = Blake3::hash(&i.to_be_bytes());
923 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
924 batch = batch.update(k, v);
925 }
926 }
928 drop(db);
929 let mut db = create_test_store(context.child("store").with_attribute("index", 1)).await;
930 assert_eq!(*db.bounds().await.end, 1);
931
932 for i in 0u64..ELEMENTS {
934 let k = Blake3::hash(&i.to_be_bytes());
935 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
936 apply_entries(&mut db, [(k, Some(v.clone()))]).await;
937 }
938 db.commit().await.unwrap();
939
940 for i in 0u64..ELEMENTS {
942 if i % 3 != 0 {
943 continue;
944 }
945 let k = Blake3::hash(&i.to_be_bytes());
946 let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
947 apply_entries(&mut db, [(k, Some(v.clone()))]).await;
948 }
949 db.commit().await.unwrap();
950 assert_eq!(db.snapshot.items(), 1000);
951
952 for i in 0u64..ELEMENTS {
954 if i % 7 != 1 {
955 continue;
956 }
957 let k = Blake3::hash(&i.to_be_bytes());
958 apply_entries(&mut db, [(k, None)]).await;
959 }
960 db.commit().await.unwrap();
961 let final_count = db.bounds().await.end;
962 let final_floor = db.inactivity_floor_loc;
963
964 db.sync().await.unwrap();
966 drop(db);
967 let db = create_test_store(context.child("store").with_attribute("index", 2)).await;
968 assert_eq!(db.bounds().await.end, final_count);
969 assert_eq!(db.inactivity_floor_loc, final_floor);
970
971 db.prune(db.inactivity_floor_loc()).await.unwrap();
972 assert_eq!(db.log.bounds().await.start, *final_floor - *final_floor % 7);
973 assert_eq!(db.snapshot.items(), 857);
974
975 db.destroy().await.unwrap();
976 });
977 }
978
979 #[test_traced("DEBUG")]
980 fn test_store_batch() {
981 let executor = deterministic::Runner::default();
982
983 executor.start(|mut ctx| async move {
984 let mut db = create_test_store(ctx.child("store").with_attribute("index", 0)).await;
985
986 assert_eq!(db.bounds().await.end, 1);
988 assert_eq!(db.inactivity_floor_loc, 0);
989
990 let key = Digest::random(&mut ctx);
991 let value = vec![2, 3, 4, 5];
992
993 let batch = db.new_batch();
994
995 let result = batch.get(&key).await;
997 assert!(result.unwrap().is_none());
998
999 let batch = batch.update(key, value.clone());
1001
1002 assert_eq!(db.bounds().await.end, 1); assert_eq!(db.inactivity_floor_loc, 0);
1004
1005 let fetched_value = batch.get(&key).await.unwrap();
1007 assert_eq!(fetched_value.unwrap(), value);
1008 db.apply_batch(batch.finalize(None)).await.unwrap();
1009 drop(db);
1010
1011 let mut db = create_test_store(ctx.child("store").with_attribute("index", 1)).await;
1013
1014 assert_eq!(db.bounds().await.end, 1);
1016 assert_eq!(db.inactivity_floor_loc, 0);
1017 assert!(db.get_metadata().await.unwrap().is_none());
1018
1019 let metadata = vec![99, 100];
1021 let range = db
1022 .apply_batch(
1023 db.new_batch()
1024 .update(key, value.clone())
1025 .finalize(Some(metadata.clone())),
1026 )
1027 .await
1028 .unwrap();
1029 assert_eq!(range.start, 1);
1030 assert_eq!(range.end, 4);
1031 db.commit().await.unwrap();
1032 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
1033 drop(db);
1034
1035 let db = create_test_store(ctx.child("store").with_attribute("index", 2)).await;
1037
1038 assert_eq!(db.bounds().await.end, 4);
1040 assert_eq!(db.inactivity_floor_loc, 2);
1041
1042 let fetched_value = db.get(&key).await.unwrap();
1044 assert_eq!(fetched_value.unwrap(), value);
1045
1046 db.destroy().await.unwrap();
1048 });
1049 }
1050
1051 fn is_send<T: Send>(_: T) {}
1052
1053 #[allow(dead_code)]
1054 fn assert_read_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
1055 is_send(db.get(&key));
1056 is_send(db.get_metadata());
1057 is_send(db.prune(loc));
1058 is_send(db.sync());
1059 }
1060
1061 #[allow(dead_code)]
1062 fn assert_write_futures_are_send(
1063 db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>,
1064 key: Digest,
1065 value: Vec<u8>,
1066 ) {
1067 is_send(db.get(&key));
1068 is_send(db.apply_batch(Changeset::from([(key, Some(value))])));
1069 is_send(db.apply_batch(Changeset::from([(key, None)])));
1070 let batch = db.new_batch();
1071 is_send(batch.get(&key));
1072 }
1073
1074 #[allow(dead_code)]
1075 fn assert_commit_is_send(db: &Db<deterministic::Context, Digest, Vec<u8>, TwoCap>) {
1076 is_send(db.commit());
1077 }
1078}