1use crate::{
64 index::{unordered::Index, Unordered as _},
65 journal::contiguous::{
66 variable::{Config as JournalConfig, Journal},
67 Mutable as _, Reader,
68 },
69 kv::Batchable,
70 mmr::Location,
71 qmdb::{
72 any::{
73 unordered::{variable::Operation, Update},
74 VariableValue,
75 },
76 build_snapshot_from_log, delete_key,
77 operation::{Committable as _, Operation as _},
78 store::{LogStore, PrunableStore},
79 update_key, Error, FloorHelper,
80 },
81 translator::Translator,
82 Persistable,
83};
84use commonware_codec::Read;
85use commonware_runtime::{buffer::paged::CacheRef, Clock, Metrics, Storage};
86use commonware_utils::Array;
87use core::ops::Range;
88use std::num::{NonZeroU64, NonZeroUsize};
89use tracing::{debug, warn};
90
/// Configuration for initializing a [Db].
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Storage partition name under which the operation log is kept.
    pub log_partition: String,

    /// Size of the log's write buffer.
    pub log_write_buffer: NonZeroUsize,

    /// Optional compression setting for log entries (`None` disables compression).
    pub log_compression: Option<u8>,

    /// Codec configuration used to (de)serialize log operations.
    pub log_codec_config: C,

    /// Number of log items stored per journal section.
    pub log_items_per_section: NonZeroU64,

    /// Translator used to map full keys into the snapshot index.
    pub translator: T,

    /// Page cache shared with the journal's paged buffer.
    pub page_cache: CacheRef,
}
115
/// A key-value database backed by an append-only operation log, with an
/// in-memory snapshot index and a prunable inactivity floor.
pub struct Db<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    /// Append-only journal of operations (updates, deletes, commit markers).
    log: Journal<E, Operation<K, V>>,

    /// Maps translated keys to candidate log locations of active update
    /// operations; collisions are resolved by reading the log (see `get`).
    snapshot: Index<T, Location>,

    /// Number of keys that currently have a live value.
    active_keys: usize,

    /// Location below which all operations are inactive and eligible for pruning.
    pub inactivity_floor_loc: Location,

    /// Location of the most recent commit operation in the log.
    pub last_commit_loc: Location,

    /// Floor-raising steps accrued since the last commit (incremented when an
    /// existing key is superseded or deleted; reset to 0 by `commit`).
    pub steps: u64,
}
154
155impl<E, K, V, T> Db<E, K, V, T>
156where
157 E: Storage + Clock + Metrics,
158 K: Array,
159 V: VariableValue,
160 T: Translator,
161{
162 pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
164 for &loc in self.snapshot.get(key) {
165 let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
166 unreachable!("location ({loc}) does not reference update operation");
167 };
168
169 if &k == key {
170 return Ok(Some(v));
171 }
172 }
173
174 Ok(None)
175 }
176
177 pub const fn is_empty(&self) -> bool {
179 self.active_keys == 0
180 }
181
182 async fn get_op(&self, loc: Location) -> Result<Operation<K, V>, Error> {
186 let reader = self.log.reader().await;
187 assert!(*loc < reader.bounds().end);
188 reader.read(*loc).await.map_err(|e| match e {
189 crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
190 e => Error::Journal(e),
191 })
192 }
193
194 pub async fn bounds(&self) -> std::ops::Range<Location> {
197 let bounds = self.log.reader().await.bounds();
198 Location::new(bounds.start)..Location::new(bounds.end)
199 }
200
201 pub async fn size(&self) -> Location {
203 Location::new(self.log.size().await)
204 }
205
    /// Return the inactivity floor: all operations before this location are
    /// inactive and may be pruned.
    pub const fn inactivity_floor_loc(&self) -> Location {
        self.inactivity_floor_loc
    }
211
212 pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
214 let Operation::CommitFloor(metadata, _) =
215 self.log.reader().await.read(*self.last_commit_loc).await?
216 else {
217 unreachable!("last commit should be a commit floor operation");
218 };
219
220 Ok(metadata)
221 }
222
223 pub async fn prune(&self, prune_loc: Location) -> Result<(), Error> {
226 if prune_loc > self.inactivity_floor_loc {
227 return Err(Error::PruneBeyondMinRequired(
228 prune_loc,
229 self.inactivity_floor_loc,
230 ));
231 }
232
233 if !self.log.prune(*prune_loc).await? {
236 return Ok(());
237 }
238
239 let bounds = self.log.reader().await.bounds();
240 let log_size = Location::new(bounds.end);
241 let oldest_retained_loc = Location::new(bounds.start);
242 debug!(
243 ?log_size,
244 ?oldest_retained_loc,
245 ?prune_loc,
246 "pruned inactive ops"
247 );
248
249 Ok(())
250 }
251
    /// Open (or create) a db backed by the given runtime context and config.
    ///
    /// Recovery procedure: rewind the log to the most recent commit operation
    /// (discarding any uncommitted suffix), seed a fresh log with an initial
    /// empty commit, then rebuild the in-memory snapshot by replaying the log
    /// from the recovered inactivity floor.
    pub async fn init(
        context: E,
        cfg: Config<T, <Operation<K, V> as Read>::Cfg>,
    ) -> Result<Self, Error> {
        let mut log = Journal::<E, Operation<K, V>>::init(
            context.with_label("log"),
            JournalConfig {
                partition: cfg.log_partition,
                items_per_section: cfg.log_items_per_section,
                compression: cfg.log_compression,
                codec_config: cfg.log_codec_config,
                page_cache: cfg.page_cache,
                write_buffer: cfg.log_write_buffer,
            },
        )
        .await?;

        // Rewind discards everything after the last commit. A return of 0
        // means no commit exists, so seed the log with an empty commit at 0.
        if log.rewind_to(|op| op.is_commit()).await? == 0 {
            warn!("Log is empty, initializing new db");
            log.append(&Operation::CommitFloor(None, Location::new(0)))
                .await?;
        }

        // Persist the (possibly rewound or freshly seeded) log before building
        // in-memory state on top of it.
        log.sync().await?;

        // After rewinding, the last operation in the log is the last commit.
        let last_commit_loc = Location::new(
            log.size()
                .await
                .checked_sub(1)
                .expect("commit should exist"),
        );

        let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
        let (inactivity_floor_loc, active_keys) = {
            let reader = log.reader().await;
            let op = reader.read(*last_commit_loc).await?;
            let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
            // Replay the log from the floor to rebuild the key -> location index.
            let active_keys =
                build_snapshot_from_log(inactivity_floor_loc, &reader, &mut snapshot, |_, _| {})
                    .await?;
            (inactivity_floor_loc, active_keys)
        };

        Ok(Self {
            log,
            snapshot,
            active_keys,
            inactivity_floor_loc,
            last_commit_loc,
            steps: 0,
        })
    }
309
310 pub async fn sync(&self) -> Result<(), Error> {
314 self.log.sync().await.map_err(Into::into)
315 }
316
317 pub async fn destroy(self) -> Result<(), Error> {
319 self.log.destroy().await.map_err(Into::into)
320 }
321
322 const fn as_floor_helper(
323 &mut self,
324 ) -> FloorHelper<'_, Index<T, Location>, Journal<E, Operation<K, V>>> {
325 FloorHelper {
326 snapshot: &mut self.snapshot,
327 log: &mut self.log,
328 }
329 }
330
    /// Apply a batch of key writes to the log and snapshot.
    ///
    /// `Some(value)` entries insert or update the key; `None` entries delete
    /// it. Deleting a key that is not active appends nothing to the log.
    pub async fn write_batch(
        &mut self,
        iter: impl IntoIterator<Item = (K, Option<V>)> + Send,
    ) -> Result<(), Error> {
        for (key, value) in iter {
            if let Some(value) = value {
                // Point the snapshot at the location this update will occupy:
                // the current end of the log, since we append immediately after.
                let updated = {
                    let reader = self.log.reader().await;
                    let new_loc = reader.bounds().end;
                    update_key(&mut self.snapshot, &reader, &key, Location::new(new_loc)).await?
                };
                if updated.is_some() {
                    // Existing key superseded: count a floor-raising step for
                    // the next commit.
                    self.steps += 1;
                } else {
                    // Brand-new key.
                    self.active_keys += 1;
                }
                self.log
                    .append(&Operation::Update(Update(key, value)))
                    .await?;
            } else {
                let deleted = {
                    let reader = self.log.reader().await;
                    delete_key(&mut self.snapshot, &reader, &key).await?
                };
                // Only log a delete if the key was actually active.
                if deleted.is_some() {
                    self.log.append(&Operation::Delete(key)).await?;
                    self.steps += 1;
                    self.active_keys -= 1;
                }
            }
        }
        Ok(())
    }
369
    /// Commit all pending operations: raise the inactivity floor, append a
    /// [Operation::CommitFloor] carrying the optional `metadata`, and persist
    /// the log.
    ///
    /// Returns the range of locations this commit covers, starting just after
    /// the previous commit.
    pub async fn commit(&mut self, metadata: Option<V>) -> Result<Range<Location>, Error> {
        let start_loc = self.last_commit_loc + 1;

        if self.is_empty() {
            // No active keys: every prior operation is inactive, so the floor
            // can jump straight to the tip.
            self.inactivity_floor_loc = self.size().await;
            debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
        } else {
            // Raise the floor once per accrued step, plus one for this commit.
            let steps_to_take = self.steps + 1;
            for _ in 0..steps_to_take {
                let loc = self.inactivity_floor_loc;
                self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
            }
        }

        // Append the commit marker recording the new floor (and metadata).
        self.last_commit_loc = Location::new(
            self.log
                .append(&Operation::CommitFloor(metadata, self.inactivity_floor_loc))
                .await?,
        );

        let range = start_loc..self.size().await;

        // Persist everything up to and including the commit marker.
        self.log.commit().await?;

        self.steps = 0;

        Ok(range)
    }
413}
414
impl<E, K, V, T> Persistable for Db<E, K, V, T>
where
    E: Storage + Clock + Metrics,
    K: Array,
    V: VariableValue,
    T: Translator,
{
    type Error = Error;

    // NOTE(review): intentionally a no-op — it does NOT invoke the inherent
    // `Db::commit`, which takes `&mut self` and metadata. Presumably callers
    // of this trait are expected to have committed already; confirm this is
    // the intended contract.
    async fn commit(&self) -> Result<(), Error> {
        Ok(())
    }

    // Delegates to the inherent `Db::sync`.
    async fn sync(&self) -> Result<(), Error> {
        self.sync().await
    }

    // Delegates to the inherent `Db::destroy`.
    async fn destroy(self) -> Result<(), Error> {
        self.destroy().await
    }
}
437
438impl<E, K, V, T> LogStore for Db<E, K, V, T>
439where
440 E: Storage + Clock + Metrics,
441 K: Array,
442 V: VariableValue,
443 T: Translator,
444{
445 type Value = V;
446
447 async fn bounds(&self) -> std::ops::Range<Location> {
448 self.bounds().await
449 }
450
451 async fn get_metadata(&self) -> Result<Option<V>, Error> {
452 self.get_metadata().await
453 }
454}
455
456impl<E, K, V, T> PrunableStore for Db<E, K, V, T>
457where
458 E: Storage + Clock + Metrics,
459 K: Array,
460 V: VariableValue,
461 T: Translator,
462{
463 async fn prune(&mut self, prune_loc: Location) -> Result<(), Error> {
464 Self::prune(self, prune_loc).await
465 }
466
467 async fn inactivity_floor_loc(&self) -> Location {
468 self.inactivity_floor_loc()
469 }
470}
471
472impl<E, K, V, T> crate::kv::Gettable for Db<E, K, V, T>
473where
474 E: Storage + Clock + Metrics,
475 K: Array,
476 V: VariableValue,
477 T: Translator,
478{
479 type Key = K;
480 type Value = V;
481 type Error = Error;
482
483 async fn get(&self, key: &Self::Key) -> Result<Option<Self::Value>, Self::Error> {
484 self.get(key).await
485 }
486}
487
488impl<E, K, V, T> Batchable for Db<E, K, V, T>
489where
490 E: Storage + Clock + Metrics,
491 K: Array,
492 V: VariableValue,
493 T: Translator,
494{
495 async fn write_batch<'a, Iter>(&'a mut self, iter: Iter) -> Result<(), Self::Error>
496 where
497 Iter: IntoIterator<Item = (Self::Key, Option<Self::Value>)> + Send + 'a,
498 Iter::IntoIter: Send,
499 {
500 self.write_batch(iter).await
501 }
502}
503
504#[cfg(test)]
505mod test {
506 use super::*;
507 use crate::{
508 kv::{
509 tests::{assert_batchable, assert_gettable, assert_send},
510 Gettable as _, Updatable as _,
511 },
512 qmdb::store::tests::{assert_log_store, assert_prunable_store},
513 translator::TwoCap,
514 };
515 use commonware_cryptography::{
516 blake3::{Blake3, Digest},
517 Hasher as _,
518 };
519 use commonware_macros::test_traced;
520 use commonware_math::algebra::Random;
521 use commonware_runtime::{deterministic, Runner};
522 use commonware_utils::{NZUsize, NZU16, NZU64};
523 use std::num::NonZeroU16;
524
    // Page size passed to the test page cache.
    const PAGE_SIZE: NonZeroU16 = NZU16!(77);
    // Cache capacity passed to `CacheRef::from_pooler` — presumably a page
    // count; confirm against the CacheRef API.
    const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);

    // Concrete Db instantiation shared by all tests in this module.
    type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap>;
530
    // Build a TestStore with a small, deterministic configuration (7 items per
    // section so pruning boundaries are easy to reason about in assertions).
    async fn create_test_store(context: deterministic::Context) -> TestStore {
        let cfg = Config {
            log_partition: "journal".into(),
            log_write_buffer: NZUsize!(64 * 1024),
            log_compression: None,
            log_codec_config: ((), ((0..=10000).into(), ())),
            log_items_per_section: NZU64!(7),
            translator: TwoCap,
            page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
        };
        TestStore::init(context, cfg).await.unwrap()
    }
543
    /// Empty-store behavior: initial bounds, prune limits, metadata handling,
    /// and that uncommitted writes are discarded on restart.
    #[test_traced("DEBUG")]
    pub fn test_store_construct_empty() {
        let executor = deterministic::Runner::default();
        executor.start(|mut context| async move {
            let mut db = create_test_store(context.with_label("store_0")).await;
            // A fresh db contains exactly the initial commit operation.
            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.log.bounds().await.start, 0);
            assert_eq!(db.inactivity_floor_loc(), 0);
            // Pruning to the floor is allowed; pruning beyond it errors.
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert!(matches!(
                db.prune(Location::new(1)).await,
                Err(Error::PruneBeyondMinRequired(_, _))
            ));
            assert!(db.get_metadata().await.unwrap().is_none());

            // Write without committing, then drop: the write must not survive.
            let d1 = Digest::random(&mut context);
            let v1 = vec![1, 2, 3];
            db.write_batch([(d1, Some(v1))]).await.unwrap();
            drop(db);

            let mut db = create_test_store(context.with_label("store_1")).await;
            assert_eq!(db.bounds().await.end, 1);

            // Committing on an empty db appends a single commit with metadata.
            let metadata = vec![1, 2, 3];
            let range = Db::commit(&mut db, Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 2);
            assert_eq!(db.bounds().await.end, 2);
            assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            // Metadata survives a restart.
            let mut db = create_test_store(context.with_label("store_2")).await;
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // Repeated empty commits keep the log close behind the floor.
            db.write_batch([(Digest::random(&mut context), Some(vec![1, 2, 3]))])
                .await
                .unwrap();
            Db::commit(&mut db, None).await.unwrap();
            for _ in 1..100 {
                Db::commit(&mut db, None).await.unwrap();
                assert!(db.bounds().await.end - db.inactivity_floor_loc <= 3);
                assert!(db.get_metadata().await.unwrap().is_none());
            }

            db.destroy().await.unwrap();
        });
    }
597
    /// Basic update/commit lifecycle: gets before and after writes, loss of
    /// uncommitted data, persistence across restarts, and floor advancement.
    #[test_traced("DEBUG")]
    fn test_store_construct_basic() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);

            let key = Digest::random(&mut ctx);
            let value = vec![2, 3, 4, 5];

            // Key is absent before any write.
            let result = db.get(&key).await;
            assert!(result.unwrap().is_none());

            db.write_batch([(key, Some(value.clone()))]).await.unwrap();

            assert_eq!(db.bounds().await.end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            // Uncommitted write is discarded by restart.
            drop(db);

            let mut db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(db.bounds().await.end, 1);
            assert_eq!(db.inactivity_floor_loc, 0);
            assert!(db.get_metadata().await.unwrap().is_none());

            db.write_batch([(key, Some(value.clone()))]).await.unwrap();

            assert_eq!(db.bounds().await.end, 2);
            assert_eq!(db.inactivity_floor_loc, 0);

            // Commit persists the update and advances the floor.
            let metadata = vec![99, 100];
            let range = Db::commit(&mut db, Some(metadata.clone())).await.unwrap();
            assert_eq!(range.start, 1);
            assert_eq!(range.end, 4);
            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));

            assert_eq!(db.bounds().await.end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            // Committed state survives a restart.
            let mut db = create_test_store(ctx.with_label("store_2")).await;

            assert_eq!(db.bounds().await.end, 4);
            assert_eq!(db.inactivity_floor_loc, 2);

            let fetched_value = db.get(&key).await.unwrap();
            assert_eq!(fetched_value.unwrap(), value);

            let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
            let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
            db.write_batch([(k1, Some(v1.clone()))]).await.unwrap();
            db.write_batch([(k2, Some(v2.clone()))]).await.unwrap();

            assert_eq!(db.bounds().await.end, 6);
            assert_eq!(db.inactivity_floor_loc, 2);

            assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

            // A commit with no metadata clears previously stored metadata.
            let range = Db::commit(&mut db, None).await.unwrap();
            assert_eq!(range.start, 4);
            assert_eq!(range.end, db.bounds().await.end);
            assert_eq!(db.get_metadata().await.unwrap(), None);

            assert_eq!(db.bounds().await.end, 8);
            assert_eq!(db.inactivity_floor_loc, 3);

            assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            // Update an existing key's value.
            let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
            v1_updated.push(7);
            db.write_batch([(k1, Some(v1_updated))]).await.unwrap();
            Db::commit(&mut db, None).await.unwrap();
            assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);

            // Insert a brand-new key after prior commits.
            let k3 = Digest::random(&mut ctx);
            db.write_batch([(k3, Some(vec![8]))]).await.unwrap();
            Db::commit(&mut db, None).await.unwrap();
            assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);

            db.destroy().await.unwrap();
        });
    }
709
    /// Repeated updates to one key keep a single snapshot entry, and replaying
    /// the log on restart reconstructs the same snapshot; pruning is aligned
    /// to journal sections (7 items each in the test config).
    #[test_traced("DEBUG")]
    fn test_store_log_replay() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            const UPDATES: u64 = 100;
            let k = Digest::random(&mut ctx);
            for _ in 0..UPDATES {
                let v = vec![1, 2, 3, 4, 5];
                db.write_batch([(k, Some(v.clone()))]).await.unwrap();
            }

            // Only the latest update's location is indexed for the key.
            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            Db::commit(&mut db, None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            let db = create_test_store(ctx.with_label("store_1")).await;
            db.prune(db.inactivity_floor_loc()).await.unwrap();

            // Snapshot rebuilt from the log still has exactly one entry.
            let iter = db.snapshot.get(&k);
            assert_eq!(iter.count(), 1);

            // Expected size and floor after commit-time floor raising.
            assert_eq!(db.bounds().await.end, UPDATES * 2 + 2);
            let expected_floor = UPDATES * 2;
            assert_eq!(db.inactivity_floor_loc, expected_floor);

            // Pruning removes whole sections only, hence the modulo-7 rounding.
            assert_eq!(
                db.log.bounds().await.start,
                expected_floor - expected_floor % 7
            );

            db.destroy().await.unwrap();
        });
    }
755
    /// Keys whose translated forms collide (first two bytes equal under the
    /// TwoCap translator) must both be retrievable, before and after a
    /// restart-driven snapshot rebuild.
    #[test_traced("DEBUG")]
    fn test_store_build_snapshot_keys_with_shared_prefix() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
            let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);

            // Force a translated-key collision by copying the first two bytes.
            k2.0[0..2].copy_from_slice(&k1.0[0..2]);

            db.write_batch([(k1, Some(v1.clone()))]).await.unwrap();
            db.write_batch([(k2, Some(v2.clone()))]).await.unwrap();

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            Db::commit(&mut db, None).await.unwrap();
            db.sync().await.unwrap();
            drop(db);

            // Rebuild the snapshot from the log and re-check both keys.
            let db = create_test_store(ctx.with_label("store_1")).await;

            assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
            assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);

            db.destroy().await.unwrap();
        });
    }
789
    /// Delete semantics: deleted keys read as absent, deletes persist across
    /// restart, keys can be re-inserted, and deleting a non-existent key
    /// appends nothing.
    #[test_traced("DEBUG")]
    fn test_store_delete() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store_0")).await;

            let k = Digest::random(&mut ctx);
            let v = vec![1, 2, 3, 4, 5];
            db.write_batch([(k, Some(v.clone()))]).await.unwrap();
            Db::commit(&mut db, None).await.unwrap();

            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Delete via a `None` batch entry.
            assert!(db.get(&k).await.unwrap().is_some());
            db.write_batch([(k, None)]).await.unwrap();

            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());
            assert!(db.get(&k).await.unwrap().is_none());

            Db::commit(&mut db, None).await.unwrap();

            // The delete persists across restart.
            let mut db = create_test_store(ctx.with_label("store_1")).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert!(fetched_value.is_none());

            // Re-insert the previously deleted key.
            db.write_batch([(k, Some(v.clone()))]).await.unwrap();
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            Db::commit(&mut db, None).await.unwrap();

            // The re-insert also persists across restart.
            let mut db = create_test_store(ctx.with_label("store_2")).await;
            let fetched_value = db.get(&k).await.unwrap();
            assert_eq!(fetched_value.unwrap(), v);

            // Deleting a key that was never written is a no-op in the log.
            let k_n = Digest::random(&mut ctx);
            db.write_batch([(k_n, None)]).await.unwrap();

            let range = Db::commit(&mut db, None).await.unwrap();
            assert_eq!(range.start, 9);
            assert_eq!(range.end, 11);

            assert!(db.get(&k_n).await.unwrap().is_none());
            assert!(db.get(&k).await.unwrap().is_some());

            db.destroy().await.unwrap();
        });
    }
853
    /// Floor raising across multiple commits with interleaved re-updates of
    /// two keys (including an empty value), verifying sizes, floor positions,
    /// and final key values.
    #[test_traced("DEBUG")]
    fn test_store_pruning() {
        let executor = deterministic::Runner::default();

        executor.start(|mut ctx| async move {
            let mut db = create_test_store(ctx.with_label("store")).await;

            let k_a = Digest::random(&mut ctx);
            let k_b = Digest::random(&mut ctx);

            let v_a = vec![1];
            // Empty values are valid.
            let v_b = vec![];
            let v_c = vec![4, 5, 6];

            db.write_batch([(k_a, Some(v_a.clone()))]).await.unwrap();
            db.write_batch([(k_b, Some(v_b.clone()))]).await.unwrap();

            Db::commit(&mut db, None).await.unwrap();
            assert_eq!(db.bounds().await.end, 5);
            assert_eq!(db.inactivity_floor_loc, 2);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);

            // Overwrite both keys (cross-assigning values) and commit again.
            db.write_batch([(k_b, Some(v_a.clone()))]).await.unwrap();
            db.write_batch([(k_a, Some(v_c.clone()))]).await.unwrap();

            Db::commit(&mut db, None).await.unwrap();
            assert_eq!(db.bounds().await.end, 11);
            assert_eq!(db.inactivity_floor_loc, 8);
            assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
            assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);

            db.destroy().await.unwrap();
        });
    }
889
890 #[test_traced("WARN")]
891 pub fn test_store_db_recovery() {
892 let executor = deterministic::Runner::default();
893 const ELEMENTS: u64 = 1000;
895 executor.start(|context| async move {
896 let mut db = create_test_store(context.with_label("store_0")).await;
897
898 for i in 0u64..ELEMENTS {
899 let k = Blake3::hash(&i.to_be_bytes());
900 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
901 db.write_batch([(k, Some(v.clone()))]).await.unwrap();
902 }
903
904 drop(db);
906 let mut db = create_test_store(context.with_label("store_1")).await;
907 assert_eq!(db.bounds().await.end, 1);
908
909 for i in 0u64..ELEMENTS {
911 let k = Blake3::hash(&i.to_be_bytes());
912 let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
913 db.write_batch([(k, Some(v.clone()))]).await.unwrap();
914 }
915 Db::commit(&mut db, None).await.unwrap();
916 let op_count = db.bounds().await.end;
917
918 for i in 0u64..ELEMENTS {
920 if i % 3 != 0 {
921 continue;
922 }
923 let k = Blake3::hash(&i.to_be_bytes());
924 let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
925 db.write_batch([(k, Some(v.clone()))]).await.unwrap();
926 }
927
928 drop(db);
930 let mut db = create_test_store(context.with_label("store_2")).await;
931 assert_eq!(db.bounds().await.end, op_count);
932
933 for i in 0u64..ELEMENTS {
935 if i % 3 != 0 {
936 continue;
937 }
938 let k = Blake3::hash(&i.to_be_bytes());
939 let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
940 db.write_batch([(k, Some(v.clone()))]).await.unwrap();
941 }
942 Db::commit(&mut db, None).await.unwrap();
943 let op_count = db.bounds().await.end;
944 assert_eq!(op_count, 1673);
945 assert_eq!(db.snapshot.items(), 1000);
946
947 for i in 0u64..ELEMENTS {
949 if i % 7 != 1 {
950 continue;
951 }
952 let k = Blake3::hash(&i.to_be_bytes());
953 db.write_batch([(k, None)]).await.unwrap();
954 }
955
956 drop(db);
958 let db = create_test_store(context.with_label("store_3")).await;
959 assert_eq!(db.bounds().await.end, op_count);
960
961 db.sync().await.unwrap();
963 drop(db);
964 let mut db = create_test_store(context.with_label("store_4")).await;
965 assert_eq!(db.bounds().await.end, op_count);
966
967 for i in 0u64..ELEMENTS {
969 if i % 7 != 1 {
970 continue;
971 }
972 let k = Blake3::hash(&i.to_be_bytes());
973 db.write_batch([(k, None)]).await.unwrap();
974 }
975 Db::commit(&mut db, None).await.unwrap();
976
977 assert_eq!(db.bounds().await.end, 1961);
978 assert_eq!(db.inactivity_floor_loc, 756);
979
980 db.prune(db.inactivity_floor_loc()).await.unwrap();
981 assert_eq!(db.log.bounds().await.start, 756 );
982 assert_eq!(db.snapshot.items(), 857);
983
984 db.destroy().await.unwrap();
985 });
986 }
987
988 #[test_traced("DEBUG")]
989 fn test_store_batchable() {
990 let executor = deterministic::Runner::default();
991
992 executor.start(|mut ctx| async move {
993 let mut db = create_test_store(ctx.with_label("store_0")).await;
994
995 assert_eq!(db.bounds().await.end, 1);
997 assert_eq!(db.inactivity_floor_loc, 0);
998
999 let key = Digest::random(&mut ctx);
1000 let value = vec![2, 3, 4, 5];
1001
1002 let mut batch = db.new_batch();
1003
1004 let result = batch.get(&key).await;
1006 assert!(result.unwrap().is_none());
1007
1008 batch.update(key, value.clone()).await.unwrap();
1010
1011 assert_eq!(db.bounds().await.end, 1); assert_eq!(db.inactivity_floor_loc, 0);
1013
1014 let fetched_value = batch.get(&key).await.unwrap();
1016 assert_eq!(fetched_value.unwrap(), value);
1017 db.write_batch(batch.into_iter()).await.unwrap();
1018 drop(db);
1019
1020 let mut db = create_test_store(ctx.with_label("store_1")).await;
1022
1023 assert_eq!(db.bounds().await.end, 1);
1025 assert_eq!(db.inactivity_floor_loc, 0);
1026 assert!(db.get_metadata().await.unwrap().is_none());
1027
1028 let mut batch = db.new_batch();
1030 batch.update(key, value.clone()).await.unwrap();
1031
1032 db.write_batch(batch.into_iter()).await.unwrap();
1034 assert_eq!(db.bounds().await.end, 2);
1035 assert_eq!(db.inactivity_floor_loc, 0);
1036 let metadata = vec![99, 100];
1037 let range = Db::commit(&mut db, Some(metadata.clone())).await.unwrap();
1038 assert_eq!(range.start, 1);
1039 assert_eq!(range.end, 4);
1040 assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
1041 drop(db);
1042
1043 let db = create_test_store(ctx.with_label("store_2")).await;
1045
1046 assert_eq!(db.bounds().await.end, 4);
1048 assert_eq!(db.inactivity_floor_loc, 2);
1049
1050 let fetched_value = db.get(&key).await.unwrap();
1052 assert_eq!(fetched_value.unwrap(), value);
1053
1054 db.destroy().await.unwrap();
1056 });
1057 }
1058
    // Compile-time-only check that all read-path futures are `Send`.
    #[allow(dead_code)]
    fn assert_read_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
        assert_log_store(db);
        assert_prunable_store(db, loc);
        assert_gettable(db, &key);
        assert_send(db.sync());
    }
1066
1067 #[allow(dead_code)]
1068 fn assert_write_futures_are_send(
1069 db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>,
1070 key: Digest,
1071 value: Vec<u8>,
1072 ) {
1073 assert_log_store(db);
1074 assert_gettable(db, &key);
1075 assert_send(db.write_batch([(key, Some(value.clone()))]));
1076 assert_send(db.write_batch([(key, None)]));
1077 assert_batchable(db, key, value);
1078 }
1079
1080 #[allow(dead_code)]
1081 fn assert_commit_is_send(db: &mut Db<deterministic::Context, Digest, Vec<u8>, TwoCap>) {
1082 assert_send(Db::commit(db, None));
1083 }
1084}