use crate::{
index::{unordered::Index, Unordered as _},
journal::contiguous::{
variable::{Config as JournalConfig, Journal},
Mutable as _, Reader,
},
    mmr::Location,
qmdb::{
any::{
unordered::{variable::Operation, Update},
VariableValue,
},
build_snapshot_from_log, delete_key,
operation::{Committable as _, Key, Operation as _},
update_key, FloorHelper,
},
translator::Translator,
Context, Persistable,
};
use commonware_codec::{CodecShared, Read};
use commonware_utils::Array;
use core::ops::Range;
use std::collections::BTreeMap;
use tracing::{debug, warn};
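/// The error type returned by [Db] operations.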
type Error = crate::qmdb::Error<crate::mmr::Family>;
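/// Configuration for initializing a [Db].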
#[derive(Clone)]
pub struct Config<T: Translator, C> {
    /// Configuration of the underlying operation log.
    pub log: JournalConfig<C>,
    /// The translator used by the snapshot index to translate keys.
    pub translator: T,
}
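/// An atomic set of changes to apply to a [Db] via [Db::apply_batch].
///
/// Each entry maps a key to `Some(value)` (update) or `None` (delete), and the
/// changeset may carry optional metadata to store with the resulting commit.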
pub struct Changeset<K: Key, V: CodecShared + Clone> {
    /// The pending change for each key: `Some(value)` to update, `None` to delete.
    diff: BTreeMap<K, Option<V>>,
    /// Optional metadata to store alongside the resulting commit operation.
    metadata: Option<V>,
}
impl<K: Key, V: CodecShared + Clone> Changeset<K, V> {
fn into_parts(self) -> (BTreeMap<K, Option<V>>, Option<V>) {
(self.diff, self.metadata)
}
}
impl<K: Key, V: CodecShared + Clone> FromIterator<(K, Option<V>)> for Changeset<K, V> {
fn from_iter<TIter: IntoIterator<Item = (K, Option<V>)>>(iter: TIter) -> Self {
Self {
diff: iter.into_iter().collect(),
metadata: None,
}
}
}
impl<K: Key, V: CodecShared + Clone, const N: usize> From<[(K, Option<V>); N]> for Changeset<K, V> {
fn from(items: [(K, Option<V>); N]) -> Self {
items.into_iter().collect()
}
}
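/// A builder for a [Changeset] over a [Db].
///
/// Reads through [Batch::get] observe the batch's staged changes layered over the
/// db's current state.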
pub struct Batch<'a, E, K, V, T>
where
E: Context,
K: Array,
V: VariableValue,
T: Translator,
{
    /// The db this batch reads through.
    db: &'a Db<E, K, V, T>,
    /// The staged changes: `Some(value)` to update, `None` to delete.
    diff: BTreeMap<K, Option<V>>,
}
impl<'a, E, K, V, T> Batch<'a, E, K, V, T>
where
E: Context,
K: Array,
V: VariableValue,
T: Translator,
{
const fn new(db: &'a Db<E, K, V, T>) -> Self {
Self {
db,
diff: BTreeMap::new(),
}
}
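    /// Consume the batch, producing a [Changeset] carrying the given metadata.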
pub fn finalize(self, metadata: Option<V>) -> Changeset<K, V> {
Changeset {
diff: self.diff,
metadata,
}
}
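    /// Get the value of `key`, consulting staged changes before the underlying db.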
pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
if let Some(value) = self.diff.get(key) {
return Ok(value.clone());
}
self.db.get(key).await
}
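    /// Stage an update of `key` to `value`.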
pub fn update(mut self, key: K, value: V) -> Self {
self.diff.insert(key, Some(value));
self
}
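    /// Stage a deletion of `key`.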
pub fn delete(mut self, key: K) -> Self {
self.diff.insert(key, None);
self
}
}
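/// A key-value db whose state is an append-only log of operations, with an
/// in-memory snapshot index mapping each active key to the location of its
/// latest update in the log.
///
/// A usage sketch (assuming a runtime `context` and a populated [Config] `cfg`):
///
/// ```ignore
/// let mut db = Db::init(context, cfg).await?;
/// let batch = db.new_batch().update(key, value).finalize(None);
/// db.apply_batch(batch).await?;
/// db.commit().await?; // the applied batch is not durable until committed
/// ```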
pub struct Db<E, K, V, T>
where
E: Context,
K: Array,
V: VariableValue,
T: Translator,
{
    /// Append-only log of all operations applied to the db.
    log: Journal<E, Operation<crate::mmr::Family, K, V>>,
    /// Maps each active key (after translation) to the location of its latest
    /// update in the log.
    snapshot: Index<T, Location>,
    /// The number of currently active keys.
    active_keys: usize,
    /// The location below which all operations are inactive.
    pub inactivity_floor_loc: Location,
    /// The location of the most recent commit operation in the log.
    pub last_commit_loc: Location,
    /// The number of operations applied since the last commit that deactivated an
    /// earlier operation; determines how far to raise the floor at the next commit.
    pub steps: u64,
}
impl<E, K, V, T> Db<E, K, V, T>
where
E: Context,
K: Array,
V: VariableValue,
T: Translator,
{
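    /// Get the value of `key`, or `None` if it is not currently set. Reflects all
    /// applied batches, whether or not they have been committed.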
pub async fn get(&self, key: &K) -> Result<Option<V>, Error> {
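        // Translated keys may collide in the snapshot, so confirm the full key
        // matches before returning the value.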
for &loc in self.snapshot.get(key) {
let Operation::Update(Update(k, v)) = self.get_op(loc).await? else {
unreachable!("location ({loc}) does not reference update operation");
};
if &k == key {
return Ok(Some(v));
}
}
Ok(None)
}
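    /// Start a new empty [Batch] that reads through this db.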
pub const fn new_batch(&self) -> Batch<'_, E, K, V, T> {
Batch::new(self)
}
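    /// Whether the db has no active keys.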
pub const fn is_empty(&self) -> bool {
self.active_keys == 0
}
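    /// Read the operation at `loc`. Panics if `loc` is beyond the end of the log,
    /// and returns [Error::OperationPruned] if the operation has been pruned.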
async fn get_op(&self, loc: Location) -> Result<Operation<crate::mmr::Family, K, V>, Error> {
let reader = self.log.reader().await;
assert!(*loc < reader.bounds().end);
reader.read(*loc).await.map_err(|e| match e {
crate::journal::Error::ItemPruned(_) => Error::OperationPruned(loc),
e => Error::Journal(e),
})
}
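    /// The range of operation locations currently retained in the log.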
    pub async fn bounds(&self) -> Range<Location> {
let bounds = self.log.reader().await.bounds();
Location::new(bounds.start)..Location::new(bounds.end)
}
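    /// The total number of operations appended to the log, including pruned ones.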
pub async fn size(&self) -> Location {
Location::new(self.log.size().await)
}
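    /// The location below which all operations are inactive.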
pub const fn inactivity_floor_loc(&self) -> Location {
self.inactivity_floor_loc
}
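    /// Get the metadata stored by the most recent commit, if any.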
pub async fn get_metadata(&self) -> Result<Option<V>, Error> {
let Operation::CommitFloor(metadata, _) =
self.log.reader().await.read(*self.last_commit_loc).await?
else {
unreachable!("last commit should be a commit floor operation");
};
Ok(metadata)
}
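    /// Prune all operations before `prune_loc`, which must not exceed the
    /// inactivity floor. The underlying journal may round the prune point down to
    /// a section boundary.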
pub async fn prune(&self, prune_loc: Location) -> Result<(), Error> {
if prune_loc > self.inactivity_floor_loc {
return Err(Error::PruneBeyondMinRequired(
prune_loc,
self.inactivity_floor_loc,
));
}
if !self.log.prune(*prune_loc).await? {
return Ok(());
}
let bounds = self.log.reader().await.bounds();
let log_size = Location::new(bounds.end);
let oldest_retained_loc = Location::new(bounds.start);
debug!(
?log_size,
?oldest_retained_loc,
?prune_loc,
"pruned inactive ops"
);
Ok(())
}
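    /// Initialize the db in `context` from the log described by `cfg`, discarding
    /// any operations after the most recent commit and rebuilding the snapshot
    /// from the retained log.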
pub async fn init(
context: E,
cfg: Config<T, <Operation<crate::mmr::Family, K, V> as Read>::Cfg>,
) -> Result<Self, Error> {
let mut log = Journal::<E, Operation<crate::mmr::Family, K, V>>::init(
context.with_label("log"),
cfg.log,
)
.await?;
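        // Rewind any operations appended after the last commit (e.g. from an
        // unclean shutdown). If no commit remains, the log is empty, so seed it
        // with an initial commit to ensure a last commit always exists.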
if log.rewind_to(|op| op.is_commit()).await? == 0 {
warn!("Log is empty, initializing new db");
log.append(&Operation::CommitFloor(None, Location::new(0)))
.await?;
}
log.sync().await?;
let last_commit_loc = Location::new(
log.size()
.await
.checked_sub(1)
.expect("commit should exist"),
);
let mut snapshot = Index::new(context.with_label("snapshot"), cfg.translator);
let (inactivity_floor_loc, active_keys) = {
let reader = log.reader().await;
let op = reader.read(*last_commit_loc).await?;
let inactivity_floor_loc = op.has_floor().expect("last op should be a commit");
let active_keys =
build_snapshot_from_log(inactivity_floor_loc, &reader, &mut snapshot, |_, _| {})
.await?;
(inactivity_floor_loc, active_keys)
};
Ok(Self {
log,
snapshot,
active_keys,
inactivity_floor_loc,
last_commit_loc,
steps: 0,
})
}
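    /// Flush any buffered log operations to storage.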
pub async fn sync(&self) -> Result<(), Error> {
self.log.sync().await.map_err(Into::into)
}
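    /// Destroy the db, removing all of its underlying storage.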
pub async fn destroy(self) -> Result<(), Error> {
self.log.destroy().await.map_err(Into::into)
}
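    /// Borrow the snapshot and log as a [FloorHelper] for raising the inactivity
    /// floor.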
#[allow(clippy::type_complexity)]
const fn as_floor_helper(
&mut self,
) -> FloorHelper<
'_,
crate::mmr::Family,
Index<T, Location>,
Journal<E, Operation<crate::mmr::Family, K, V>>,
> {
FloorHelper {
snapshot: &mut self.snapshot,
log: &mut self.log,
}
}
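    /// Atomically apply the changes in `batch`, raise the inactivity floor, and
    /// append a commit operation carrying the batch's metadata. Returns the range
    /// of locations appended to the log. The new state is not durable until
    /// [Db::commit] is called.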
pub async fn apply_batch(&mut self, batch: Changeset<K, V>) -> Result<Range<Location>, Error> {
let start_loc = self.last_commit_loc + 1;
let (diff, metadata) = batch.into_parts();
for (key, value) in diff {
if let Some(value) = value {
let updated = {
let reader = self.log.reader().await;
let new_loc = reader.bounds().end;
update_key::<crate::mmr::Family, _, _>(
&mut self.snapshot,
&reader,
&key,
Location::new(new_loc),
)
.await?
};
if updated.is_some() {
self.steps += 1;
} else {
self.active_keys += 1;
}
self.log
.append(&Operation::Update(Update(key, value)))
.await?;
} else {
let deleted = {
let reader = self.log.reader().await;
delete_key::<crate::mmr::Family, _, _>(&mut self.snapshot, &reader, &key)
.await?
};
if deleted.is_some() {
self.log.append(&Operation::Delete(key)).await?;
self.steps += 1;
self.active_keys -= 1;
}
}
}
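        // Raise the inactivity floor: if the db has no active keys, move it to the
        // tip of the log; otherwise take one floor-raising step for each operation
        // in this batch that deactivated an earlier one, plus one.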
if self.is_empty() {
self.inactivity_floor_loc = self.size().await;
debug!(tip = ?self.inactivity_floor_loc, "db is empty, raising floor to tip");
} else {
let steps_to_take = self.steps + 1;
for _ in 0..steps_to_take {
let loc = self.inactivity_floor_loc;
self.inactivity_floor_loc = self.as_floor_helper().raise_floor(loc).await?;
}
}
self.last_commit_loc = Location::new(
self.log
.append(&Operation::CommitFloor(metadata, self.inactivity_floor_loc))
.await?,
);
self.steps = 0;
let end_loc = self.size().await;
Ok(start_loc..end_loc)
}
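    /// Durably persist all applied batches. On restart, the db is restored to the
    /// state as of the most recent commit.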
pub async fn commit(&self) -> Result<(), Error> {
self.log.commit().await.map_err(Into::into)
}
}
impl<E, K, V, T> Persistable for Db<E, K, V, T>
where
E: Context,
K: Array,
V: VariableValue,
T: Translator,
{
type Error = Error;
    async fn commit(&self) -> Result<(), Error> {
        Self::commit(self).await
    }
    async fn sync(&self) -> Result<(), Error> {
        Self::sync(self).await
    }
    async fn destroy(self) -> Result<(), Error> {
        Self::destroy(self).await
    }
}
#[cfg(test)]
mod test {
use super::*;
use crate::translator::TwoCap;
use commonware_cryptography::{
blake3::{Blake3, Digest},
Hasher as _,
};
use commonware_macros::test_traced;
use commonware_math::algebra::Random;
use commonware_runtime::{buffer::paged::CacheRef, deterministic, Metrics, Runner};
use commonware_utils::{NZUsize, NZU16, NZU64};
use std::num::{NonZeroU16, NonZeroUsize};
const PAGE_SIZE: NonZeroU16 = NZU16!(77);
const PAGE_CACHE_SIZE: NonZeroUsize = NZUsize!(9);
type TestStore = Db<deterministic::Context, Digest, Vec<u8>, TwoCap>;
async fn create_test_store(context: deterministic::Context) -> TestStore {
let cfg = Config {
log: JournalConfig {
partition: "journal".into(),
write_buffer: NZUsize!(64 * 1024),
compression: None,
codec_config: ((), ((0..=10000).into(), ())),
items_per_section: NZU64!(7),
page_cache: CacheRef::from_pooler(&context, PAGE_SIZE, PAGE_CACHE_SIZE),
},
translator: TwoCap,
};
TestStore::init(context, cfg).await.unwrap()
}
async fn apply_entries(
db: &mut TestStore,
iter: impl IntoIterator<Item = (Digest, Option<Vec<u8>>)> + Send,
) -> Range<Location> {
db.apply_batch(iter.into_iter().collect()).await.unwrap()
}
#[test_traced("DEBUG")]
    fn test_store_construct_empty() {
let executor = deterministic::Runner::default();
executor.start(|mut context| async move {
let mut db = create_test_store(context.with_label("store_0")).await;
            assert_eq!(*db.bounds().await.end, 1);
            assert_eq!(db.log.bounds().await.start, 0);
            assert_eq!(*db.inactivity_floor_loc(), 0);
assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
assert!(matches!(
db.prune(Location::new(1)).await,
Err(Error::PruneBeyondMinRequired(_, _))
));
assert!(db.get_metadata().await.unwrap().is_none());
let d1 = Digest::random(&mut context);
let v1 = vec![1, 2, 3];
apply_entries(&mut db, [(d1, Some(v1))]).await;
drop(db);
let mut db = create_test_store(context.with_label("store_1")).await;
            assert_eq!(*db.bounds().await.end, 1);
let metadata = vec![1, 2, 3];
let batch = db.new_batch().finalize(Some(metadata.clone()));
let range = db.apply_batch(batch).await.unwrap();
            assert_eq!(*range.start, 1);
            assert_eq!(*range.end, 2);
db.commit().await.unwrap();
            assert_eq!(*db.bounds().await.end, 2);
assert!(matches!(db.prune(db.inactivity_floor_loc()).await, Ok(())));
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
let mut db = create_test_store(context.with_label("store_2")).await;
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));
apply_entries(
&mut db,
[(Digest::random(&mut context), Some(vec![1, 2, 3]))],
)
.await;
db.commit().await.unwrap();
for _ in 1..100 {
db.apply_batch(db.new_batch().finalize(None)).await.unwrap();
db.commit().await.unwrap();
                assert!(*db.bounds().await.end - *db.inactivity_floor_loc <= 3);
assert!(db.get_metadata().await.unwrap().is_none());
}
db.destroy().await.unwrap();
});
}
#[test_traced("DEBUG")]
fn test_store_construct_basic() {
let executor = deterministic::Runner::default();
executor.start(|mut ctx| async move {
let mut db = create_test_store(ctx.with_label("store_0")).await;
            assert_eq!(*db.bounds().await.end, 1);
            assert_eq!(*db.inactivity_floor_loc, 0);
let key = Digest::random(&mut ctx);
let value = vec![2, 3, 4, 5];
let result = db.get(&key).await;
assert!(result.unwrap().is_none());
apply_entries(&mut db, [(key, Some(value.clone()))]).await;
assert_eq!(*db.bounds().await.end, 4);
assert_eq!(*db.inactivity_floor_loc, 2);
let fetched_value = db.get(&key).await.unwrap();
assert_eq!(fetched_value.unwrap(), value);
drop(db);
let mut db = create_test_store(ctx.with_label("store_1")).await;
assert_eq!(*db.bounds().await.end, 1);
assert_eq!(*db.inactivity_floor_loc, 0);
assert!(db.get_metadata().await.unwrap().is_none());
let metadata = vec![99, 100];
let range = db
.apply_batch(
db.new_batch()
.update(key, value.clone())
.finalize(Some(metadata.clone())),
)
.await
.unwrap();
assert_eq!(*range.start, 1);
assert_eq!(*range.end, 4);
db.commit().await.unwrap();
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
assert_eq!(*db.bounds().await.end, 4);
assert_eq!(*db.inactivity_floor_loc, 2);
let mut db = create_test_store(ctx.with_label("store_2")).await;
assert_eq!(*db.bounds().await.end, 4);
assert_eq!(*db.inactivity_floor_loc, 2);
let fetched_value = db.get(&key).await.unwrap();
assert_eq!(fetched_value.unwrap(), value);
let (k1, v1) = (Digest::random(&mut ctx), vec![2, 3, 4, 5, 6]);
let (k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8]);
apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;
assert_eq!(*db.bounds().await.end, 10);
assert_eq!(*db.inactivity_floor_loc, 5);
assert_eq!(db.get_metadata().await.unwrap(), None);
db.commit().await.unwrap();
assert_eq!(db.get_metadata().await.unwrap(), None);
assert_eq!(*db.bounds().await.end, 10);
assert_eq!(*db.inactivity_floor_loc, 5);
assert_eq!(db.get(&key).await.unwrap().unwrap(), value);
assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
let mut v1_updated = db.get(&k1).await.unwrap().unwrap();
v1_updated.push(7);
apply_entries(&mut db, [(k1, Some(v1_updated))]).await;
db.commit().await.unwrap();
assert_eq!(db.get(&k1).await.unwrap().unwrap(), vec![2, 3, 4, 5, 6, 7]);
let k3 = Digest::random(&mut ctx);
apply_entries(&mut db, [(k3, Some(vec![8]))]).await;
db.commit().await.unwrap();
assert_eq!(db.get(&k3).await.unwrap().unwrap(), vec![8]);
db.destroy().await.unwrap();
});
}
#[test_traced("DEBUG")]
fn test_store_log_replay() {
let executor = deterministic::Runner::default();
executor.start(|mut ctx| async move {
let mut db = create_test_store(ctx.with_label("store_0")).await;
const UPDATES: u64 = 100;
let k = Digest::random(&mut ctx);
for _ in 0..UPDATES {
let v = vec![1, 2, 3, 4, 5];
apply_entries(&mut db, [(k, Some(v.clone()))]).await;
}
let iter = db.snapshot.get(&k);
assert_eq!(iter.count(), 1);
db.commit().await.unwrap();
db.sync().await.unwrap();
drop(db);
let db = create_test_store(ctx.with_label("store_1")).await;
db.prune(db.inactivity_floor_loc()).await.unwrap();
let iter = db.snapshot.get(&k);
assert_eq!(iter.count(), 1);
assert_eq!(*db.bounds().await.end, 400);
assert_eq!(*db.inactivity_floor_loc, 398);
let floor = db.inactivity_floor_loc;
assert_eq!(db.log.bounds().await.start, *floor - *floor % 7);
db.destroy().await.unwrap();
});
}
#[test_traced("DEBUG")]
fn test_store_build_snapshot_keys_with_shared_prefix() {
let executor = deterministic::Runner::default();
executor.start(|mut ctx| async move {
let mut db = create_test_store(ctx.with_label("store_0")).await;
let (k1, v1) = (Digest::random(&mut ctx), vec![1, 2, 3, 4, 5]);
let (mut k2, v2) = (Digest::random(&mut ctx), vec![6, 7, 8, 9, 10]);
k2.0[0..2].copy_from_slice(&k1.0[0..2]);
apply_entries(&mut db, [(k1, Some(v1.clone()))]).await;
apply_entries(&mut db, [(k2, Some(v2.clone()))]).await;
assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
db.commit().await.unwrap();
db.sync().await.unwrap();
drop(db);
let db = create_test_store(ctx.with_label("store_1")).await;
assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
db.destroy().await.unwrap();
});
}
#[test_traced("DEBUG")]
fn test_store_delete() {
let executor = deterministic::Runner::default();
executor.start(|mut ctx| async move {
let mut db = create_test_store(ctx.with_label("store_0")).await;
let k = Digest::random(&mut ctx);
let v = vec![1, 2, 3, 4, 5];
apply_entries(&mut db, [(k, Some(v.clone()))]).await;
db.commit().await.unwrap();
let fetched_value = db.get(&k).await.unwrap();
assert_eq!(fetched_value.unwrap(), v);
assert!(db.get(&k).await.unwrap().is_some());
apply_entries(&mut db, [(k, None)]).await;
let fetched_value = db.get(&k).await.unwrap();
assert!(fetched_value.is_none());
assert!(db.get(&k).await.unwrap().is_none());
db.commit().await.unwrap();
let mut db = create_test_store(ctx.with_label("store_1")).await;
let fetched_value = db.get(&k).await.unwrap();
assert!(fetched_value.is_none());
apply_entries(&mut db, [(k, Some(v.clone()))]).await;
let fetched_value = db.get(&k).await.unwrap();
assert_eq!(fetched_value.unwrap(), v);
db.commit().await.unwrap();
let mut db = create_test_store(ctx.with_label("store_2")).await;
let fetched_value = db.get(&k).await.unwrap();
assert_eq!(fetched_value.unwrap(), v);
let k_n = Digest::random(&mut ctx);
let range = apply_entries(&mut db, [(k_n, None)]).await;
            assert_eq!(*range.start, 9);
            assert_eq!(*range.end, 11);
db.commit().await.unwrap();
assert!(db.get(&k_n).await.unwrap().is_none());
assert!(db.get(&k).await.unwrap().is_some());
db.destroy().await.unwrap();
});
}
#[test_traced("DEBUG")]
fn test_store_pruning() {
let executor = deterministic::Runner::default();
executor.start(|mut ctx| async move {
let mut db = create_test_store(ctx.with_label("store")).await;
let k_a = Digest::random(&mut ctx);
let k_b = Digest::random(&mut ctx);
let v_a = vec![1];
let v_b = vec![];
let v_c = vec![4, 5, 6];
apply_entries(&mut db, [(k_a, Some(v_a.clone()))]).await;
apply_entries(&mut db, [(k_b, Some(v_b.clone()))]).await;
db.commit().await.unwrap();
assert_eq!(*db.bounds().await.end, 7);
assert_eq!(*db.inactivity_floor_loc, 3);
assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_a);
apply_entries(&mut db, [(k_b, Some(v_a.clone()))]).await;
apply_entries(&mut db, [(k_a, Some(v_c.clone()))]).await;
db.commit().await.unwrap();
assert_eq!(*db.bounds().await.end, 15);
assert_eq!(*db.inactivity_floor_loc, 12);
assert_eq!(db.get(&k_a).await.unwrap().unwrap(), v_c);
assert_eq!(db.get(&k_b).await.unwrap().unwrap(), v_a);
db.destroy().await.unwrap();
});
}
#[test_traced("WARN")]
    fn test_store_db_recovery() {
let executor = deterministic::Runner::default();
const ELEMENTS: u64 = 1000;
executor.start(|context| async move {
let db = create_test_store(context.with_label("store_0")).await;
{
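                // Stage a large batch but never apply it; dropping the db must
                // leave no trace of these updates.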
let mut batch = db.new_batch();
for i in 0u64..ELEMENTS {
let k = Blake3::hash(&i.to_be_bytes());
let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
batch = batch.update(k, v);
}
}
drop(db);
let mut db = create_test_store(context.with_label("store_1")).await;
assert_eq!(*db.bounds().await.end, 1);
for i in 0u64..ELEMENTS {
let k = Blake3::hash(&i.to_be_bytes());
let v = vec![(i % 255) as u8; ((i % 13) + 7) as usize];
apply_entries(&mut db, [(k, Some(v.clone()))]).await;
}
db.commit().await.unwrap();
for i in 0u64..ELEMENTS {
if i % 3 != 0 {
continue;
}
let k = Blake3::hash(&i.to_be_bytes());
let v = vec![((i + 1) % 255) as u8; ((i % 13) + 8) as usize];
apply_entries(&mut db, [(k, Some(v.clone()))]).await;
}
db.commit().await.unwrap();
assert_eq!(db.snapshot.items(), 1000);
for i in 0u64..ELEMENTS {
if i % 7 != 1 {
continue;
}
let k = Blake3::hash(&i.to_be_bytes());
apply_entries(&mut db, [(k, None)]).await;
}
db.commit().await.unwrap();
let final_count = db.bounds().await.end;
let final_floor = db.inactivity_floor_loc;
db.sync().await.unwrap();
drop(db);
let db = create_test_store(context.with_label("store_2")).await;
assert_eq!(db.bounds().await.end, final_count);
assert_eq!(db.inactivity_floor_loc, final_floor);
db.prune(db.inactivity_floor_loc()).await.unwrap();
assert_eq!(db.log.bounds().await.start, *final_floor - *final_floor % 7);
assert_eq!(db.snapshot.items(), 857);
db.destroy().await.unwrap();
});
}
#[test_traced("DEBUG")]
fn test_store_batch() {
let executor = deterministic::Runner::default();
executor.start(|mut ctx| async move {
let mut db = create_test_store(ctx.with_label("store_0")).await;
            assert_eq!(*db.bounds().await.end, 1);
            assert_eq!(*db.inactivity_floor_loc, 0);
let key = Digest::random(&mut ctx);
let value = vec![2, 3, 4, 5];
let batch = db.new_batch();
let result = batch.get(&key).await;
assert!(result.unwrap().is_none());
let batch = batch.update(key, value.clone());
            assert_eq!(*db.bounds().await.end, 1);
            assert_eq!(*db.inactivity_floor_loc, 0);
let fetched_value = batch.get(&key).await.unwrap();
assert_eq!(fetched_value.unwrap(), value);
db.apply_batch(batch.finalize(None)).await.unwrap();
drop(db);
let mut db = create_test_store(ctx.with_label("store_1")).await;
            assert_eq!(*db.bounds().await.end, 1);
            assert_eq!(*db.inactivity_floor_loc, 0);
assert!(db.get_metadata().await.unwrap().is_none());
let metadata = vec![99, 100];
let range = db
.apply_batch(
db.new_batch()
.update(key, value.clone())
.finalize(Some(metadata.clone())),
)
.await
.unwrap();
            assert_eq!(*range.start, 1);
            assert_eq!(*range.end, 4);
db.commit().await.unwrap();
assert_eq!(db.get_metadata().await.unwrap(), Some(metadata.clone()));
drop(db);
let db = create_test_store(ctx.with_label("store_2")).await;
            assert_eq!(*db.bounds().await.end, 4);
            assert_eq!(*db.inactivity_floor_loc, 2);
let fetched_value = db.get(&key).await.unwrap();
assert_eq!(fetched_value.unwrap(), value);
db.destroy().await.unwrap();
});
}
fn is_send<T: Send>(_: T) {}
#[allow(dead_code)]
fn assert_read_futures_are_send(db: &mut TestStore, key: Digest, loc: Location) {
is_send(db.get(&key));
is_send(db.get_metadata());
is_send(db.prune(loc));
is_send(db.sync());
}
#[allow(dead_code)]
    fn assert_write_futures_are_send(db: &mut TestStore, key: Digest, value: Vec<u8>) {
is_send(db.get(&key));
is_send(db.apply_batch(Changeset::from([(key, Some(value))])));
is_send(db.apply_batch(Changeset::from([(key, None)])));
let batch = db.new_batch();
is_send(batch.get(&key));
}
#[allow(dead_code)]
    fn assert_commit_is_send(db: &TestStore) {
is_send(db.commit());
}
}