use crate::{
index::{unordered::Index, Unordered as _},
journal::{
authenticated,
contiguous::{Contiguous, Mutable, Reader},
Error as JournalError,
},
merkle::{journaled::Config as MmrConfig, Family, Location, Proof},
qmdb::{any::ValueEncoding, build_snapshot_from_log, delete_known_loc, operation::Key, Error},
translator::Translator,
Context, Persistable,
};
use commonware_codec::EncodeShared;
use commonware_cryptography::Hasher as CHasher;
use std::{collections::BTreeSet, num::NonZeroU64, ops::Range, sync::Arc};
use tracing::warn;
// Sub-modules of the immutable database implementation.
pub mod batch;
pub mod fixed;
// Private: the log operation encoding; its `Operation` enum is re-exported below.
mod operation;
pub mod sync;
pub mod variable;
// Re-export so callers can name the log's operation type directly.
pub use operation::Operation;
/// Configuration for opening an [`Immutable`] database.
#[derive(Clone)]
pub struct Config<T: Translator, J> {
    /// Configuration for the journaled MMR that authenticates the log.
    pub merkle_config: MmrConfig,
    /// Configuration for the operation-log journal.
    pub log: J,
    /// Translator used to derive snapshot-index keys from operation keys.
    pub translator: T,
}
/// An authenticated key-value database whose operations are recorded in an
/// append-only, merkleized log. Keys are never overwritten ("immutable"):
/// lookups resolve through a translated-key snapshot index into the log.
pub struct Immutable<
    F: Family,
    E: Context,
    K: Key,
    V: ValueEncoding,
    C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
    H: CHasher,
    T: Translator,
> where
    C::Item: EncodeShared,
{
    /// Authenticated journal pairing the operation log with its MMR.
    pub(crate) journal: authenticated::Journal<F, E, C, H>,
    /// In-memory index from translated keys to candidate log locations.
    /// Translated keys may collide, so entries are verified against the log.
    pub(crate) snapshot: Index<T, Location<F>>,
    /// Location of the most recent `Commit` operation in the log.
    pub(crate) last_commit_loc: Location<F>,
}
impl<F, E, K, V, C, H, T> Immutable<F, E, K, V, C, H, T>
where
    F: Family,
    E: Context,
    K: Key,
    V: ValueEncoding,
    C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
    H: CHasher,
    T: Translator,
{
    /// Initialize a database from an existing authenticated journal,
    /// rebuilding the in-memory snapshot index by replaying the log.
    ///
    /// If the journal is empty, a `Commit(None)` operation is appended and
    /// synced so the log always ends in a commit.
    pub(crate) async fn init_from_journal(
        mut journal: authenticated::Journal<F, E, C, H>,
        context: E,
        translator: T,
    ) -> Result<Self, Error<F>> {
        if journal.size().await == 0 {
            warn!("Authenticated log is empty, initialized new db.");
            journal.append(&Operation::Commit(None)).await?;
            journal.sync().await?;
        }
        let mut snapshot = Index::new(context.with_label("snapshot"), translator);
        let last_commit_loc = {
            let reader = journal.journal.reader().await;
            // Replay from the oldest retained operation. The per-operation
            // callback is a no-op: this db needs no extra bookkeeping.
            let start_loc = Location::new(reader.bounds().start);
            build_snapshot_from_log::<F, _, _, _>(start_loc, &reader, &mut snapshot, |_, _| {})
                .await?;
            // The final operation is expected to be a commit (guaranteed
            // above for a fresh log; assumed from recovery otherwise).
            Location::new(
                reader
                    .bounds()
                    .end
                    .checked_sub(1)
                    .expect("commit should exist"),
            )
        };
        Ok(Self {
            journal,
            snapshot,
            last_commit_loc,
        })
    }

    /// Total number of operations in the log (one past the last location).
    pub async fn size(&self) -> Location<F> {
        self.bounds().await.end
    }

    /// Range of retained operation locations: `oldest_retained..size`.
    pub async fn bounds(&self) -> Range<Location<F>> {
        let bounds = self.journal.reader().await.bounds();
        Location::new(bounds.start)..Location::new(bounds.end)
    }

    /// Get the value for `key`, or `None` if it was never set or its set
    /// operation has been pruned.
    pub async fn get(&self, key: &K) -> Result<Option<V::Value>, Error<F>> {
        // Candidate locations from the (possibly colliding) translated index.
        let iter = self.snapshot.get(key);
        let reader = self.journal.reader().await;
        let oldest = reader.bounds().start;
        for &loc in iter {
            // Skip stale index entries below the pruning boundary.
            if loc < oldest {
                continue;
            }
            if let Some(v) = Self::get_from_loc(&reader, key, loc).await? {
                return Ok(Some(v));
            }
        }
        Ok(None)
    }

    /// Read the operation at `loc`, returning its value only if it is a
    /// `Set` whose key matches `key` exactly (translator collisions yield
    /// `Ok(None)`; a non-`Set` operation is treated as corruption).
    async fn get_from_loc(
        reader: &impl Reader<Item = Operation<K, V>>,
        key: &K,
        loc: Location<F>,
    ) -> Result<Option<V::Value>, Error<F>> {
        if loc < reader.bounds().start {
            return Err(Error::OperationPruned(loc));
        }
        let Operation::Set(k, v) = reader.read(*loc).await? else {
            return Err(Error::UnexpectedData(loc));
        };
        if k != *key {
            Ok(None)
        } else {
            Ok(Some(v))
        }
    }

    /// Return the metadata attached to the most recent commit, if any.
    pub async fn get_metadata(&self) -> Result<Option<V::Value>, Error<F>> {
        let last_commit_loc = self.last_commit_loc;
        // `last_commit_loc` is maintained to always reference a Commit op.
        let Operation::Commit(metadata) = self
            .journal
            .journal
            .reader()
            .await
            .read(*last_commit_loc)
            .await?
        else {
            unreachable!("no commit operation at location of last commit {last_commit_loc}");
        };
        Ok(metadata)
    }

    /// Generate a proof of up to `max_ops` operations starting at
    /// `start_loc`, against the log as it was when it had `op_count` ops.
    pub async fn historical_proof(
        &self,
        op_count: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<K, V>>), Error<F>> {
        Ok(self
            .journal
            .historical_proof(op_count, start_loc, max_ops)
            .await?)
    }

    /// Generate a proof of up to `max_ops` operations starting at
    /// `start_index`, against the current log size.
    pub async fn proof(
        &self,
        start_index: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<K, V>>), Error<F>> {
        let op_count = self.bounds().await.end;
        self.historical_proof(op_count, start_index, max_ops).await
    }

    /// Prune history up to (but not including) `loc`. Pruning past the last
    /// commit is rejected. NOTE(review): the underlying journal may prune to
    /// a coarser (e.g. section-aligned) boundary than `loc` — confirm.
    pub async fn prune(&mut self, loc: Location<F>) -> Result<(), Error<F>> {
        if loc > self.last_commit_loc {
            return Err(Error::PruneBeyondMinRequired(loc, self.last_commit_loc));
        }
        self.journal.prune(loc).await?;
        Ok(())
    }

    /// Rewind the db to exactly `size` operations. The operation at
    /// `size - 1` must be a commit, and `size` must be within the retained,
    /// non-empty range. Snapshot entries for rewound `Set` ops are removed.
    pub async fn rewind(&mut self, size: Location<F>) -> Result<(), Error<F>> {
        let rewind_size = *size;
        let current_size = *self.last_commit_loc + 1;
        if rewind_size == current_size {
            return Ok(());
        }
        // Rewinding to zero or past the current size is invalid.
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(crate::journal::Error::InvalidRewind(
                rewind_size,
            )));
        }
        // Validate the target and collect rewound Set keys BEFORE mutating
        // the journal, so a failed validation leaves the db untouched.
        let (rewind_last_loc, rewound_sets) = {
            let reader = self.journal.reader().await;
            let bounds = reader.bounds();
            let rewind_last_loc = Location::new(rewind_size - 1);
            if rewind_size <= bounds.start {
                return Err(Error::Journal(crate::journal::Error::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            // The new final operation must be a commit boundary.
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            if !matches!(rewind_last_op, Operation::Commit(_)) {
                return Err(Error::UnexpectedData(rewind_last_loc));
            }
            let mut rewound_sets = Vec::new();
            for loc in rewind_size..current_size {
                if let Operation::Set(key, _) = reader.read(loc).await? {
                    rewound_sets.push((Location::new(loc), key));
                }
            }
            (rewind_last_loc, rewound_sets)
        };
        self.journal.rewind(rewind_size).await?;
        // Drop index entries that now point past the end of the log.
        for (loc, key) in rewound_sets {
            delete_known_loc(&mut self.snapshot, &key, loc);
        }
        self.last_commit_loc = rewind_last_loc;
        Ok(())
    }

    /// Root digest of the authenticated log.
    pub fn root(&self) -> H::Digest {
        self.journal.root()
    }

    /// Fetch the MMR nodes that must be pinned for a log pruned to `loc`,
    /// concurrently. Errors if `loc` is out of range or a node was pruned.
    pub async fn pinned_nodes_at(&self, loc: Location<F>) -> Result<Vec<H::Digest>, Error<F>> {
        if !loc.is_valid() {
            return Err(crate::merkle::Error::LocationOverflow(loc).into());
        }
        let futs: Vec<_> = F::nodes_to_pin(loc)
            .map(|p| async move {
                self.journal
                    .merkle
                    .get_node(p)
                    .await?
                    .ok_or(crate::merkle::Error::ElementPruned(p).into())
            })
            .collect();
        futures::future::try_join_all(futs).await
    }

    /// Flush buffered state to durable storage.
    pub async fn sync(&self) -> Result<(), Error<F>> {
        Ok(self.journal.sync().await?)
    }

    /// Durably commit the log (distinct from appending a Commit operation,
    /// which is done via batches).
    pub async fn commit(&self) -> Result<(), Error<F>> {
        Ok(self.journal.commit().await?)
    }

    /// Destroy the database, removing all persistent state.
    pub async fn destroy(self) -> Result<(), Error<F>> {
        Ok(self.journal.destroy().await?)
    }

    /// Start a new batch of updates positioned at the current db size.
    #[allow(clippy::type_complexity)]
    pub fn new_batch(&self) -> batch::UnmerkleizedBatch<F, H, K, V> {
        let journal_size = *self.last_commit_loc + 1;
        batch::UnmerkleizedBatch::new(self, journal_size)
    }

    /// Apply a merkleized batch, appending its operations to the log and
    /// merging its key diffs into the snapshot. The batch must be based on
    /// the current db size, the batch's base size, or one of its recorded
    /// ancestor states; otherwise it is stale.
    pub async fn apply_batch(
        &mut self,
        batch: Arc<batch::MerkleizedBatch<F, H::Digest, K, V>>,
    ) -> Result<Range<Location<F>>, Error<F>> {
        let db_size = *self.last_commit_loc + 1;
        let valid = db_size == batch.db_size
            || db_size == batch.base_size
            || batch.ancestor_diff_ends.contains(&db_size);
        if !valid {
            return Err(Error::StaleBatch {
                db_size,
                batch_db_size: batch.db_size,
                batch_base_size: batch.base_size,
            });
        }
        let start_loc = Location::new(db_size);
        self.journal.apply_batch(&batch.journal_batch).await?;
        let bounds = self.journal.reader().await.bounds();
        // Insert the newest diff first; `seen` prevents older ancestor
        // entries from being inserted for keys the newest diff already set.
        let mut seen = BTreeSet::new();
        for (key, entry) in batch.diff.iter() {
            seen.insert(key.clone());
            self.snapshot
                .insert_and_prune(key, entry.loc, |v| *v < bounds.start);
        }
        for (i, ancestor_diff) in batch.ancestor_diffs.iter().enumerate() {
            // Skip ancestor diffs already reflected in the db's state.
            if batch.ancestor_diff_ends[i] <= db_size {
                continue;
            }
            for (key, entry) in ancestor_diff.iter() {
                if seen.insert(key.clone()) {
                    self.snapshot
                        .insert_and_prune(key, entry.loc, |v| *v < bounds.start);
                }
            }
        }
        self.last_commit_loc = Location::new(batch.total_size - 1);
        Ok(start_loc..Location::new(batch.total_size))
    }
}
#[cfg(test)]
pub(super) mod test {
use super::*;
use crate::{
merkle::{Family, Location},
qmdb::verify_proof,
translator::TwoCap,
};
use commonware_codec::EncodeShared;
use commonware_cryptography::{sha256, sha256::Digest, Sha256};
use commonware_runtime::{deterministic, Metrics};
use commonware_utils::NZU64;
use core::{future::Future, pin::Pin};
use std::ops::Range;
/// MMR hasher used when verifying proofs in tests.
type StandardHasher<H> = crate::merkle::hasher::Standard<H>;
/// Items per journal section — assumed to match the "small sections" db
/// configurations passed by callers (see pruning tests); TODO confirm.
const ITEMS_PER_SECTION: u64 = 5;
/// Test db specialized to the deterministic runtime, SHA-256 digest keys,
/// and the two-byte `TwoCap` translator.
type TestDb<F, V, C> = Immutable<F, deterministic::Context, Digest, V, C, Sha256, TwoCap>;
/// A fresh db holds only the initial commit; dropped (unapplied) batches
/// leave no trace across reopen; an empty batch still appends a commit.
pub(crate) async fn test_immutable_empty<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let db = open_db(context.with_label("first")).await;
    // A new db contains exactly the initial Commit(None) operation.
    let bounds = db.bounds().await;
    assert_eq!(bounds.end, 1);
    assert_eq!(bounds.start, Location::new(0));
    assert!(db.get_metadata().await.unwrap().is_none());
    let k1 = Sha256::fill(1u8);
    let v1 = Sha256::fill(2u8);
    let root = db.root();
    // Build a batch but drop it without applying — it must have no effect.
    {
        let _batch = db.new_batch().set(k1, v1);
    }
    drop(db);
    let mut db = open_db(context.with_label("second")).await;
    assert_eq!(db.root(), root);
    assert_eq!(db.bounds().await.end, 1);
    // Applying an empty batch appends only a commit operation.
    db.apply_batch(db.new_batch().merkleize(&db, None))
        .await
        .unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.bounds().await.end, 2);
    let root = db.root();
    drop(db);
    let db = open_db(context.with_label("third")).await;
    assert_eq!(db.root(), root);
    db.destroy().await.unwrap();
}
/// Basic build: two committed sets are readable, metadata reflects the most
/// recent commit, and uncommitted work is lost on reopen.
pub(crate) async fn test_immutable_build_basic<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("first")).await;
    let k1 = Sha256::fill(1u8);
    let k2 = Sha256::fill(2u8);
    let v1 = Sha256::fill(3u8);
    let v2 = Sha256::fill(4u8);
    assert!(db.get(&k1).await.unwrap().is_none());
    assert!(db.get(&k2).await.unwrap().is_none());
    let metadata = Some(Sha256::fill(99u8));
    db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, metadata))
        .await
        .unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
    assert!(db.get(&k2).await.unwrap().is_none());
    // Size 3 = initial commit + set(k1) + batch commit.
    assert_eq!(db.bounds().await.end, 3);
    assert_eq!(db.get_metadata().await.unwrap(), Some(Sha256::fill(99u8)));
    db.apply_batch(db.new_batch().set(k2, v2).merkleize(&db, None))
        .await
        .unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
    assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
    assert_eq!(db.bounds().await.end, 5);
    // Metadata tracks the LATEST commit, which carried none.
    assert_eq!(db.get_metadata().await.unwrap(), None);
    let root = db.root();
    let k3 = Sha256::fill(5u8);
    let v3 = Sha256::fill(6u8);
    // Drop an unapplied batch; it must not survive reopen.
    {
        let _batch = db.new_batch().set(k3, v3);
    }
    drop(db);
    let db = open_db(context.with_label("second")).await;
    assert!(db.get(&k3).await.unwrap().is_none());
    assert_eq!(db.root(), root);
    assert_eq!(db.get(&k1).await.unwrap().unwrap(), v1);
    assert_eq!(db.get(&k2).await.unwrap().unwrap(), v2);
    assert_eq!(db.bounds().await.end, 5);
    assert_eq!(db.get_metadata().await.unwrap(), None);
    db.destroy().await.unwrap();
}
/// A proof covering the entire log must verify against the db root.
pub(crate) async fn test_immutable_proof_verify<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("first")).await;

    // Commit a single set operation.
    let key = Sha256::fill(1u8);
    let value = Sha256::fill(10u8);
    let batch = db.new_batch().set(key, value).merkleize(&db, None);
    db.apply_batch(batch).await.unwrap();
    db.commit().await.unwrap();

    // Prove every operation from the start of the log and verify it.
    let root = db.root();
    let (proof, ops) = db.proof(Location::new(0), NZU64!(100)).await.unwrap();
    let hasher = StandardHasher::<Sha256>::new();
    assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));

    db.destroy().await.unwrap();
}
/// Pruning keeps the root stable, makes old keys unreadable, and preserves
/// keys whose set operations remain retained.
pub(crate) async fn test_immutable_prune<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("first")).await;
    // 20 single-set commits, each 2 operations (set + commit).
    for i in 0..20u8 {
        let key = Sha256::fill(i);
        let value = Sha256::fill(i.wrapping_add(100));
        db.apply_batch(db.new_batch().set(key, value).merkleize(&db, None))
            .await
            .unwrap();
        db.commit().await.unwrap();
    }
    let root_before = db.root();
    let bounds_before = db.bounds().await;
    let prune_loc = Location::new(*bounds_before.end - 5);
    db.prune(prune_loc).await.unwrap();
    // Pruning history must not change the authenticated root.
    assert_eq!(db.root(), root_before);
    // The earliest key's set op is gone; the latest key's survives.
    let key_0 = Sha256::fill(0u8);
    assert!(db.get(&key_0).await.unwrap().is_none());
    let key_19 = Sha256::fill(19u8);
    assert_eq!(
        db.get(&key_19).await.unwrap(),
        Some(Sha256::fill(19u8.wrapping_add(100)))
    );
    db.destroy().await.unwrap();
}
/// A child batch stacked on a merkleized parent sees both layers of writes,
/// and applying the child applies the whole chain.
pub(crate) async fn test_immutable_batch_chain<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("first")).await;
    let k1 = Sha256::fill(1u8);
    let k2 = Sha256::fill(2u8);
    let k3 = Sha256::fill(3u8);
    let v1 = Sha256::fill(11u8);
    let v2 = Sha256::fill(12u8);
    let v3 = Sha256::fill(13u8);
    // Parent batch is merkleized but not applied; child builds on it.
    let parent = db.new_batch().set(k1, v1).merkleize(&db, None);
    let child = parent
        .new_batch::<Sha256>()
        .set(k2, v2)
        .merkleize(&db, None);
    // Child reads see both the parent's and its own writes.
    assert_eq!(child.get(&k1, &db).await.unwrap(), Some(v1));
    assert_eq!(child.get(&k2, &db).await.unwrap(), Some(v2));
    assert!(child.get(&k3, &db).await.unwrap().is_none());
    // Applying the child applies the parent's operations too.
    db.apply_batch(child).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.get(&k1).await.unwrap(), Some(v1));
    assert_eq!(db.get(&k2).await.unwrap(), Some(v2));
    db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None))
        .await
        .unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.get(&k3).await.unwrap(), Some(v3));
    db.destroy().await.unwrap();
}
/// Build a 2000-key db in one batch, reopen it, and verify every key plus a
/// sliding window of proofs across the entire log.
pub(crate) async fn test_immutable_build_and_authenticate<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let hasher = StandardHasher::<Sha256>::new();
    let mut db = open_db(context.with_label("first")).await;
    let mut batch = db.new_batch();
    for i in 0u64..2_000 {
        let k = Sha256::hash(&i.to_be_bytes());
        let v = Sha256::fill(i as u8);
        batch = batch.set(k, v);
    }
    let merkleized = batch.merkleize(&db, None);
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    // 2000 sets + initial commit + batch commit.
    assert_eq!(db.bounds().await.end, 2_000 + 2);
    let root = db.root();
    drop(db);
    let db = open_db(context.with_label("second")).await;
    assert_eq!(root, db.root());
    assert_eq!(db.bounds().await.end, 2_000 + 2);
    for i in 0u64..2_000 {
        let k = Sha256::hash(&i.to_be_bytes());
        let v = Sha256::fill(i as u8);
        assert_eq!(db.get(&k).await.unwrap().unwrap(), v);
    }
    // Verify a proof starting at every location in the log.
    let max_ops = NZU64!(5);
    for i in 0..*db.bounds().await.end {
        let (proof, log) = db.proof(Location::new(i), max_ops).await.unwrap();
        assert!(verify_proof(&hasher, &proof, Location::new(i), &log, &root));
    }
    db.destroy().await.unwrap();
}
/// After two committed batches, reopening (twice) must recover to the same
/// size and root — exercising recovery of merkle state from the log.
pub(crate) async fn test_immutable_recovery_from_failed_merkle_sync<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    const ELEMENTS: u64 = 1000;
    let mut db = open_db(context.with_label("first")).await;
    let mut batch = db.new_batch();
    for i in 0u64..ELEMENTS {
        let k = Sha256::hash(&i.to_be_bytes());
        let v = Sha256::fill(i as u8);
        batch = batch.set(k, v);
    }
    let merkleized = batch.merkleize(&db, None);
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    assert_eq!(db.bounds().await.end, ELEMENTS + 2);
    db.sync().await.unwrap();
    let halfway_root = db.root();
    // Second batch repeats the SAME key/value pairs; the log still grows,
    // so the root must change.
    let mut batch = db.new_batch();
    for i in 0u64..ELEMENTS {
        let k = Sha256::hash(&i.to_be_bytes());
        let v = Sha256::fill(i as u8);
        batch = batch.set(k, v);
    }
    let merkleized = batch.merkleize(&db, None);
    db.apply_batch(merkleized).await.unwrap();
    db.commit().await.unwrap();
    drop(db);
    let db = open_db(context.with_label("second")).await;
    // 2 * 1000 sets + initial commit + 2 batch commits = 2003.
    assert_eq!(db.bounds().await.end, 2003);
    let root = db.root();
    assert_ne!(root, halfway_root);
    drop(db);
    // A second reopen must be stable.
    let db = open_db(context.with_label("third")).await;
    assert_eq!(db.bounds().await.end, 2003);
    assert_eq!(db.root(), root);
    db.destroy().await.unwrap();
}
/// A committed single-set batch survives reopen with the same size and root.
pub(crate) async fn test_immutable_recovery_from_failed_log_sync<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("first")).await;
    let k1 = Sha256::fill(1u8);
    let v1 = Sha256::fill(3u8);
    db.apply_batch(db.new_batch().set(k1, v1).merkleize(&db, None))
        .await
        .unwrap();
    db.commit().await.unwrap();
    let first_commit_root = db.root();
    drop(db);
    let db = open_db(context.with_label("second")).await;
    // Initial commit + set + batch commit.
    assert_eq!(db.bounds().await.end, 3);
    let root = db.root();
    assert_eq!(root, first_commit_root);
    db.destroy().await.unwrap();
}
/// Pruning behavior across reopens: root stability, section-aligned prune
/// boundaries, and `ItemPruned` errors for proofs over pruned history.
/// Expects a db configured with ITEMS_PER_SECTION items per journal section.
pub(crate) async fn test_immutable_pruning<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    const ELEMENTS: u64 = 2_000;
    let mut db = open_db(context.with_label("first")).await;
    // Sorted key list mirrors log order — assumes the batch appends sets in
    // key-sorted order; TODO confirm against the batch implementation.
    let mut sorted_keys: Vec<sha256::Digest> = (1u64..ELEMENTS + 1)
        .map(|i| Sha256::hash(&i.to_be_bytes()))
        .collect();
    sorted_keys.sort();
    let mut batch = db.new_batch();
    for i in 1u64..ELEMENTS + 1 {
        let k = Sha256::hash(&i.to_be_bytes());
        let v = Sha256::fill(i as u8);
        batch = batch.set(k, v);
    }
    let merkleized = batch.merkleize(&db, None);
    db.apply_batch(merkleized).await.unwrap();
    assert_eq!(db.bounds().await.end, ELEMENTS + 2);
    // Prune the first half of the log.
    db.prune(Location::new((ELEMENTS + 2) / 2)).await.unwrap();
    let bounds = db.bounds().await;
    assert_eq!(bounds.end, ELEMENTS + 2);
    let oldest_retained_loc = bounds.start;
    assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
    // Keys just below the boundary are gone; just above, retained.
    // (-2 offset accounts for the initial commit occupying location 0.)
    let pruned_key = sorted_keys[*oldest_retained_loc as usize - 2];
    assert!(db.get(&pruned_key).await.unwrap().is_none());
    let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
    assert!(db.get(&unpruned_key).await.unwrap().is_some());
    let root = db.root();
    db.sync().await.unwrap();
    drop(db);
    let mut db = open_db(context.with_label("second")).await;
    assert_eq!(root, db.root());
    let bounds = db.bounds().await;
    assert_eq!(bounds.end, ELEMENTS + 2);
    let oldest_retained_loc = bounds.start;
    assert_eq!(oldest_retained_loc, Location::new(ELEMENTS / 2));
    // Request a prune mid-section: the journal only prunes whole sections,
    // so the boundary lands at the section start.
    let loc = Location::new(ELEMENTS / 2 + (ITEMS_PER_SECTION * 2 - 1));
    db.prune(loc).await.unwrap();
    let oldest_retained_loc = db.bounds().await.start;
    assert_eq!(
        oldest_retained_loc,
        Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
    );
    db.sync().await.unwrap();
    drop(db);
    let db = open_db(context.with_label("third")).await;
    let oldest_retained_loc = db.bounds().await.start;
    assert_eq!(
        oldest_retained_loc,
        Location::new(ELEMENTS / 2 + ITEMS_PER_SECTION)
    );
    let pruned_key = sorted_keys[*oldest_retained_loc as usize - 4];
    assert!(db.get(&pruned_key).await.unwrap().is_none());
    let unpruned_key = sorted_keys[*oldest_retained_loc as usize - 1];
    assert!(db.get(&unpruned_key).await.unwrap().is_some());
    // Proofs over pruned history must fail with ItemPruned.
    let pruned_pos = ELEMENTS / 2;
    let proof_result = db
        .proof(Location::new(pruned_pos), NZU64!(pruned_pos + 100))
        .await;
    assert!(
        matches!(proof_result, Err(Error::Journal(crate::journal::Error::ItemPruned(pos))) if pos == pruned_pos)
    );
    db.destroy().await.unwrap();
}
/// Pruning past the last commit location is rejected with
/// `PruneBeyondMinRequired`, both on a fresh db and after uncommitted work.
pub(crate) async fn test_immutable_prune_beyond_commit<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("test")).await;
    // Fresh db: last commit is at 0, so pruning to 1 must fail.
    let result = db.prune(Location::new(1)).await;
    assert!(
        matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
        if prune_loc == Location::new(1) && commit_loc == Location::new(0))
    );
    let k1 = Digest::from(*b"12345678901234567890123456789012");
    let k2 = Digest::from(*b"abcdefghijklmnopqrstuvwxyz123456");
    let k3 = Digest::from(*b"99999999999999999999999999999999");
    let v1 = Sha256::fill(1u8);
    let v2 = Sha256::fill(2u8);
    let v3 = Sha256::fill(3u8);
    db.apply_batch(db.new_batch().set(k1, v1).set(k2, v2).merkleize(&db, None))
        .await
        .unwrap();
    // Initial commit (0) + 2 sets + batch commit => last commit at 3.
    assert_eq!(*db.last_commit_loc, 3);
    db.apply_batch(db.new_batch().set(k3, v3).merkleize(&db, None))
        .await
        .unwrap();
    // Pruning up to the previous commit is allowed...
    assert!(db.prune(Location::new(3)).await.is_ok());
    // ...but one past the current last commit is not.
    let new_last_commit = db.last_commit_loc;
    let beyond = new_last_commit + 1;
    let result = db.prune(beyond).await;
    assert!(
        matches!(result, Err(Error::PruneBeyondMinRequired(prune_loc, commit_loc))
        if prune_loc == beyond && commit_loc == new_last_commit)
    );
    db.destroy().await.unwrap();
}
/// Apply a batch containing `sets` (with `metadata` on its commit) and
/// durably commit, returning the range of locations that were appended.
async fn commit_sets<F: Family, V, C>(
    db: &mut TestDb<F, V, C>,
    sets: impl IntoIterator<Item = (Digest, V::Value)>,
    metadata: Option<V::Value>,
) -> Range<Location<F>>
where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut batch = db.new_batch();
    for (key, value) in sets {
        batch = batch.set(key, value);
    }
    let range = db.apply_batch(batch.merkleize(db, metadata)).await.unwrap();
    db.commit().await.unwrap();
    range
}
/// Rewinding to a previous commit restores the earlier root, metadata, and
/// key visibility, and the rewound state survives commit + reopen.
pub(crate) async fn test_immutable_rewind_recovery<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&1u64.to_be_bytes());
    let key2 = Sha256::hash(&2u64.to_be_bytes());
    let key3 = Sha256::hash(&3u64.to_be_bytes());
    let key4 = Sha256::hash(&4u64.to_be_bytes());
    let value1 = Sha256::fill(11u8);
    let value2 = Sha256::fill(22u8);
    let value3 = Sha256::fill(33u8);
    let value4 = Sha256::fill(66u8);
    let metadata_a = Sha256::fill(44u8);
    // First commit: snapshot the state we will rewind back to.
    let first_range =
        commit_sets(&mut db, [(key1, value1), (key2, value2)], Some(metadata_a)).await;
    let size_before = db.bounds().await.end;
    let root_before = db.root();
    let last_commit_before = db.last_commit_loc;
    assert_eq!(size_before, first_range.end);
    // Second commit adds key3/key4 under different metadata.
    let metadata_b = Sha256::fill(55u8);
    let second_range =
        commit_sets(&mut db, [(key3, value3), (key4, value4)], Some(metadata_b)).await;
    assert_eq!(second_range.start, size_before);
    assert_ne!(db.root(), root_before);
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_b));
    assert_eq!(db.get(&key3).await.unwrap(), Some(value3));
    assert_eq!(db.get(&key4).await.unwrap(), Some(value4));
    // Rewind to the first commit boundary: all second-commit effects vanish.
    db.rewind(size_before).await.unwrap();
    assert_eq!(db.root(), root_before);
    assert_eq!(db.bounds().await.end, size_before);
    assert_eq!(db.last_commit_loc, last_commit_before);
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
    assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
    assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
    assert_eq!(db.get(&key3).await.unwrap(), None);
    assert_eq!(db.get(&key4).await.unwrap(), None);
    // The rewound state must be durable across reopen.
    db.commit().await.unwrap();
    drop(db);
    let db = open_db(context.with_label("reopen")).await;
    assert_eq!(db.root(), root_before);
    assert_eq!(db.bounds().await.end, size_before);
    assert_eq!(db.last_commit_loc, last_commit_before);
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata_a));
    assert_eq!(db.get(&key1).await.unwrap(), Some(value1));
    assert_eq!(db.get(&key2).await.unwrap(), Some(value2));
    assert_eq!(db.get(&key3).await.unwrap(), None);
    assert_eq!(db.get(&key4).await.unwrap(), None);
    db.destroy().await.unwrap();
}
/// Rewinding to a size at or below the pruning boundary must fail with
/// `ItemPruned`. Requires a db with small journal sections so pruning
/// actually advances the retained boundary.
pub(crate) async fn test_immutable_rewind_pruned_target_errors<F: Family, V, C>(
    context: deterministic::Context,
    open_small_sections_db: impl Fn(
        deterministic::Context,
    )
    -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_small_sections_db(context.with_label("db")).await;
    let first_range = commit_sets(
        &mut db,
        (0u64..16).map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8))),
        None,
    )
    .await;
    // Keep committing and pruning until the first commit's range is actually
    // pruned (section-aligned pruning may lag the requested location).
    let mut round = 0u64;
    loop {
        round += 1;
        assert!(
            round <= 64,
            "failed to prune enough history for rewind test"
        );
        commit_sets(
            &mut db,
            (0u64..16).map(|i| {
                let seed = round * 100 + i;
                (Sha256::hash(&seed.to_be_bytes()), Sha256::fill(seed as u8))
            }),
            None,
        )
        .await;
        db.prune(db.last_commit_loc).await.unwrap();
        if db.bounds().await.start > first_range.start {
            break;
        }
    }
    // Rewinding exactly to the retained boundary is already too far: the
    // target commit at `oldest_retained - 1` is gone.
    let oldest_retained = db.bounds().await.start;
    let boundary_err = db.rewind(oldest_retained).await.unwrap_err();
    assert!(
        matches!(
            boundary_err,
            Error::Journal(crate::journal::Error::ItemPruned(_))
        ),
        "unexpected rewind error at retained boundary: {boundary_err:?}"
    );
    // Rewinding further into pruned history fails the same way.
    let err = db.rewind(first_range.start).await.unwrap_err();
    assert!(
        matches!(err, Error::Journal(crate::journal::Error::ItemPruned(_))),
        "unexpected rewind error: {err:?}"
    );
    db.destroy().await.unwrap();
}
/// Batch `get` reads through to the db for keys the batch has not staged,
/// sees its own staged writes, and reports unknown keys as absent.
pub(crate) async fn test_immutable_batch_get_read_through<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;

    let key_a = Sha256::hash(&0u64.to_be_bytes());
    let key_b = Sha256::hash(&1u64.to_be_bytes());
    let key_c = Sha256::hash(&2u64.to_be_bytes());
    let val_a = Sha256::fill(1u8);
    let val_b = Sha256::fill(2u8);

    // Persist key_a so a later batch must read it through the db.
    let applied = db.new_batch().set(key_a, val_a).merkleize(&db, None);
    db.apply_batch(applied).await.unwrap();

    let mut batch = db.new_batch();
    // Read-through: key_a lives in the db, not in this batch.
    assert_eq!(batch.get(&key_a, &db).await.unwrap(), Some(val_a));
    // Staged write: key_b is visible from the batch itself.
    batch = batch.set(key_b, val_b);
    assert_eq!(batch.get(&key_b, &db).await.unwrap(), Some(val_b));
    // Untouched key: absent everywhere.
    assert_eq!(batch.get(&key_c, &db).await.unwrap(), None);

    db.destroy().await.unwrap();
}
/// A child batch stacked on a merkleized (unapplied) parent sees the
/// parent's writes, its own writes, and misses for untouched keys.
pub(crate) async fn test_immutable_batch_stacked_get<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let db = open_db(context.with_label("db")).await;

    let key_a = Sha256::hash(&0u64.to_be_bytes());
    let key_b = Sha256::hash(&1u64.to_be_bytes());
    let key_c = Sha256::hash(&2u64.to_be_bytes());
    let val_a = Sha256::fill(10u8);
    let val_b = Sha256::fill(20u8);

    // Merkleize the parent without ever applying it to the db.
    let parent = db.new_batch().set(key_a, val_a).merkleize(&db, None);

    // The child reads the parent's write through the stack.
    let mut child = parent.new_batch::<Sha256>();
    assert_eq!(child.get(&key_a, &db).await.unwrap(), Some(val_a));

    // The child's own write is visible; an untouched key is not.
    child = child.set(key_b, val_b);
    assert_eq!(child.get(&key_b, &db).await.unwrap(), Some(val_b));
    assert_eq!(child.get(&key_c, &db).await.unwrap(), None);

    db.destroy().await.unwrap();
}
/// Applying a child batch stacked on an unapplied parent lands both layers'
/// writes in the db, at the root the child predicted.
pub(crate) async fn test_immutable_batch_stacked_apply<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    // Sorted key/value sets — presumably matching the batch's internal
    // ordering so location arithmetic stays deterministic.
    let mut kvs_first: Vec<(Digest, Digest)> = (0u64..5)
        .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
        .collect();
    kvs_first.sort_by(|a, b| a.0.cmp(&b.0));
    let mut kvs_second: Vec<(Digest, Digest)> = (5u64..10)
        .map(|i| (Sha256::hash(&i.to_be_bytes()), Sha256::fill(i as u8)))
        .collect();
    kvs_second.sort_by(|a, b| a.0.cmp(&b.0));
    let mut parent = db.new_batch();
    for (k, v) in &kvs_first {
        parent = parent.set(*k, *v);
    }
    let parent_m = parent.merkleize(&db, None);
    // Stack a child on the merkleized parent; only the child is applied.
    let mut child = parent_m.new_batch::<Sha256>();
    for (k, v) in &kvs_second {
        child = child.set(*k, *v);
    }
    let child_m = child.merkleize(&db, None);
    let expected_root = child_m.root();
    db.apply_batch(child_m).await.unwrap();
    assert_eq!(db.root(), expected_root);
    // Both the parent's and the child's writes must be readable.
    for (k, v) in kvs_first.iter().chain(kvs_second.iter()) {
        assert_eq!(db.get(k).await.unwrap(), Some(*v));
    }
    db.destroy().await.unwrap();
}
/// A merkleized batch's speculative root must equal the db root after the
/// batch is applied — both with and without commit metadata.
pub(crate) async fn test_immutable_batch_speculative_root<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;

    // First batch: ten keyed fills, no commit metadata.
    let mut staged = db.new_batch();
    for i in 0u8..10 {
        staged = staged.set(Sha256::hash(&[i]), Sha256::fill(i));
    }
    let first = staged.merkleize(&db, None);
    let predicted = first.root();
    db.apply_batch(first).await.unwrap();
    assert_eq!(db.root(), predicted);

    // Second batch: a single key, with metadata on the commit.
    let metadata = Some(Sha256::fill(55u8));
    let second = db
        .new_batch()
        .set(Sha256::hash(&[0xAA]), Sha256::fill(0xAA))
        .merkleize(&db, metadata);
    let predicted = second.root();
    db.apply_batch(second).await.unwrap();
    assert_eq!(db.root(), predicted);

    db.destroy().await.unwrap();
}
/// `get` on a merkleized-but-unapplied batch resolves staged writes first,
/// falls back to the db for applied state, and misses unknown keys.
pub(crate) async fn test_immutable_merkleized_batch_get<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;

    // key_a is applied to the db itself.
    let key_a = Sha256::hash(&0u64.to_be_bytes());
    let val_a = Sha256::fill(10u8);
    let applied = db.new_batch().set(key_a, val_a).merkleize(&db, None);
    db.apply_batch(applied).await.unwrap();

    // key_b is only staged inside a merkleized batch that is never applied.
    let key_b = Sha256::hash(&1u64.to_be_bytes());
    let val_b = Sha256::fill(20u8);
    let staged = db.new_batch().set(key_b, val_b).merkleize(&db, None);

    assert_eq!(staged.get(&key_a, &db).await.unwrap(), Some(val_a));
    assert_eq!(staged.get(&key_b, &db).await.unwrap(), Some(val_b));
    let key_c = Sha256::hash(&2u64.to_be_bytes());
    assert_eq!(staged.get(&key_c, &db).await.unwrap(), None);

    db.destroy().await.unwrap();
}
/// Two single-set batches applied back-to-back each leave the db at the
/// root the batch predicted, with the written value readable.
pub(crate) async fn test_immutable_batch_sequential_apply<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;

    // Same key/value pairs as before, expressed as one loop over both rounds.
    for (i, fill) in [(0u64, 1u8), (1u64, 2u8)] {
        let key = Sha256::hash(&i.to_be_bytes());
        let value = Sha256::fill(fill);
        let batch = db.new_batch().set(key, value).merkleize(&db, None);
        let predicted = batch.root();
        db.apply_batch(batch).await.unwrap();
        assert_eq!(db.root(), predicted);
        assert_eq!(db.get(&key).await.unwrap(), Some(value));
    }

    db.destroy().await.unwrap();
}
/// Applies many small batches in sequence, then verifies that every written
/// key is readable, that a range proof over the whole op log verifies
/// against the final root, and that the op count matches expectations.
pub(crate) async fn test_immutable_batch_many_sequential<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    const BATCHES: u64 = 20;
    const KEYS_PER_BATCH: u64 = 5;

    let mut db = open_db(context.with_label("db")).await;
    let hasher = StandardHasher::<Sha256>::new();

    // Apply BATCHES batches of KEYS_PER_BATCH distinct keys each, remembering
    // every pair written for later verification.
    let mut written: Vec<(Digest, Digest)> = Vec::new();
    for batch_idx in 0..BATCHES {
        let mut staged = db.new_batch();
        for j in 0..KEYS_PER_BATCH {
            // Seeds are disjoint across batches (stride 100 > KEYS_PER_BATCH).
            let seed = batch_idx * 100 + j;
            let key = Sha256::hash(&seed.to_be_bytes());
            let value = Sha256::fill(seed as u8);
            staged = staged.set(key, value);
            written.push((key, value));
        }
        db.apply_batch(staged.merkleize(&db, None)).await.unwrap();
    }

    // Every written pair must be readable.
    for (key, value) in &written {
        assert_eq!(db.get(key).await.unwrap(), Some(*value));
    }

    // A proof covering the entire op log must verify against the root.
    let root = db.root();
    let (proof, ops) = db.proof(Location::new(0), NZU64!(10000)).await.unwrap();
    assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));

    // Expected ops: 1 initial commit, plus per batch its sets and one commit.
    let expected = 1 + BATCHES * (KEYS_PER_BATCH + 1);
    assert_eq!(db.bounds().await.end, expected);

    db.destroy().await.unwrap();
}
/// Applying a batch with no operations still appends exactly one op (the
/// commit): the root changes, matches the batch's speculative root, and the
/// op count grows by one.
pub(crate) async fn test_immutable_batch_empty_batch<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;

    // Seed the db with a single key so it is non-empty.
    let key = Sha256::hash(&[1u8]);
    db.apply_batch(db.new_batch().set(key, Sha256::fill(1u8)).merkleize(&db, None))
        .await
        .unwrap();

    let root_before = db.root();
    let size_before = db.bounds().await.end;

    // Apply a batch containing no operations at all.
    let empty = db.new_batch().merkleize(&db, None);
    let speculative = empty.root();
    db.apply_batch(empty).await.unwrap();

    // The lone appended op changes the root and grows the log by one.
    assert_ne!(db.root(), root_before);
    assert_eq!(db.root(), speculative);
    assert_eq!(db.bounds().await.end, size_before + 1);

    db.destroy().await.unwrap();
}
/// Builds a child batch on top of an unapplied merkleized parent and checks
/// that reads through the child resolve all three layers — committed db
/// state, the parent's write, and the child's own write — while unknown
/// keys resolve to `None`.
pub(crate) async fn test_immutable_batch_chained_merkleized_get<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;

    // Layer 1: key_a committed directly to the db.
    let key_a = Sha256::hash(&0u64.to_be_bytes());
    let val_a = Sha256::fill(10u8);
    db.apply_batch(db.new_batch().set(key_a, val_a).merkleize(&db, None))
        .await
        .unwrap();

    // Layer 2: key_b staged in an unapplied parent batch.
    let key_b = Sha256::hash(&1u64.to_be_bytes());
    let val_b = Sha256::fill(1u8);
    let parent = db.new_batch().set(key_b, val_b).merkleize(&db, None);

    // Layer 3: key_c staged in a child chained off the parent.
    let key_c = Sha256::hash(&2u64.to_be_bytes());
    let val_c = Sha256::fill(2u8);
    let child = parent
        .new_batch::<Sha256>()
        .set(key_c, val_c)
        .merkleize(&db, None);

    // The child resolves all three layers.
    assert_eq!(child.get(&key_a, &db).await.unwrap(), Some(val_a));
    assert_eq!(child.get(&key_b, &db).await.unwrap(), Some(val_b));
    assert_eq!(child.get(&key_c, &db).await.unwrap(), Some(val_c));

    // A key written nowhere is absent.
    let key_d = Sha256::hash(&3u64.to_be_bytes());
    assert_eq!(child.get(&key_d, &db).await.unwrap(), None);

    db.destroy().await.unwrap();
}
/// Writes 500 keys in a single batch, then verifies every read, a proof over
/// the full op log, and the resulting op count.
pub(crate) async fn test_immutable_batch_large<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    const N: u64 = 500;

    let mut db = open_db(context.with_label("db")).await;
    let hasher = StandardHasher::<Sha256>::new();

    // Stage N distinct pairs in one batch, remembering each pair.
    let mut written: Vec<(Digest, Digest)> = Vec::new();
    let mut staged = db.new_batch();
    for i in 0..N {
        let key = Sha256::hash(&i.to_be_bytes());
        let value = Sha256::fill((i % 256) as u8);
        staged = staged.set(key, value);
        written.push((key, value));
    }
    db.apply_batch(staged.merkleize(&db, None)).await.unwrap();

    // Every pair must be readable after the single apply.
    for (key, value) in &written {
        assert_eq!(db.get(key).await.unwrap(), Some(*value));
    }

    // A proof over all ops must verify against the current root.
    let root = db.root();
    let (proof, ops) = db.proof(Location::new(0), NZU64!(1000)).await.unwrap();
    assert!(verify_proof(&hasher, &proof, Location::new(0), &ops, &root));

    // Expected ops: 1 initial commit + N sets + 1 batch commit.
    assert_eq!(db.bounds().await.end, 1 + N + 1);

    db.destroy().await.unwrap();
}
/// A child batch's write to a key shadows the parent batch's write to the
/// same key — before merkleization, after merkleization, and in the db once
/// the child is applied.
pub(crate) async fn test_immutable_batch_chained_key_override<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key = Sha256::hash(&0u64.to_be_bytes());
    let parent_val = Sha256::fill(1u8);
    let child_val = Sha256::fill(2u8);

    // Parent stages one value; the chained child overwrites the same key.
    let parent = db.new_batch().set(key, parent_val).merkleize(&db, None);
    let mut child = parent.new_batch::<Sha256>();
    child = child.set(key, child_val);

    // The child's value wins in the un-merkleized batch...
    assert_eq!(child.get(&key, &db).await.unwrap(), Some(child_val));

    // ...in the merkleized batch...
    let merkleized = child.merkleize(&db, None);
    assert_eq!(merkleized.get(&key, &db).await.unwrap(), Some(child_val));

    // ...and in the db after applying the child.
    db.apply_batch(merkleized).await.unwrap();
    assert_eq!(db.get(&key).await.unwrap(), Some(child_val));

    db.destroy().await.unwrap();
}
/// Sets the same key in two sequentially applied batches and exercises the
/// first-write-wins lookup of the immutable db: the earlier value keeps
/// winning until the operation holding it is pruned, after which the later
/// value becomes visible.
///
/// NOTE(review): the `open_db_small_sections` name suggests the db is
/// configured with small journal sections so that pruning can actually
/// discard the earlier operation — confirm against callers.
pub(crate) async fn test_immutable_batch_sequential_key_override<F: Family, V, C>(
context: deterministic::Context,
open_db_small_sections: impl Fn(
deterministic::Context,
)
-> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
V: ValueEncoding<Value = Digest>,
C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
{
let mut db = open_db_small_sections(context.with_label("db")).await;
let key = Sha256::hash(&0u64.to_be_bytes());
let v1 = Sha256::fill(1u8);
let v2 = Sha256::fill(2u8);
// First batch writes v1; it is immediately visible.
db.apply_batch(db.new_batch().set(key, v1).merkleize(&db, None))
.await
.unwrap();
assert_eq!(db.get(&key).await.unwrap(), Some(v1));
// Second batch writes v2 for the same key...
db.apply_batch(db.new_batch().set(key, v2).merkleize(&db, None))
.await
.unwrap();
// ...but the earlier write still wins: lookups resolve the key to its
// first recorded operation, not the most recent one.
assert_eq!(db.get(&key).await.unwrap(), Some(v1));
// Prune everything before op 2 — dropping the initial commit and the v1
// set — so the v2 operation becomes the earliest surviving write.
db.prune(Location::new(2)).await.unwrap();
assert_eq!(db.get(&key).await.unwrap(), Some(v2));
db.sync().await.unwrap();
db.destroy().await.unwrap();
}
/// Metadata supplied at merkleize time is readable after the batch is
/// applied, and a subsequent commit without metadata clears it.
pub(crate) async fn test_immutable_batch_metadata<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let metadata = Sha256::fill(42u8);

    // Commit a batch that carries metadata.
    let key = Sha256::hash(&[1u8]);
    let staged = db
        .new_batch()
        .set(key, Sha256::fill(1u8))
        .merkleize(&db, Some(metadata));
    db.apply_batch(staged).await.unwrap();
    assert_eq!(db.get_metadata().await.unwrap(), Some(metadata));

    // A later (empty) commit without metadata resets it to None.
    db.apply_batch(db.new_batch().merkleize(&db, None))
        .await
        .unwrap();
    assert_eq!(db.get_metadata().await.unwrap(), None);

    db.destroy().await.unwrap();
}
/// Two sibling batches are built off the same db state; after one is
/// applied, applying the other must fail with `StaleBatch` and leave every
/// observable piece of db state untouched.
pub(crate) async fn test_immutable_stale_batch_rejected<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&[1]);
    let key2 = Sha256::hash(&[2]);
    let v1 = Sha256::fill(10u8);
    let v2 = Sha256::fill(20u8);

    // Two independent batches built against the same base state.
    let batch_a = db.new_batch().set(key1, v1).merkleize(&db, None);
    let batch_b = db.new_batch().set(key2, v2).merkleize(&db, None);

    // Applying the first succeeds; capture the resulting state.
    db.apply_batch(batch_a).await.unwrap();
    let expected_root = db.root();
    let expected_bounds = db.bounds().await;
    assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
    assert_eq!(db.get(&key2).await.unwrap(), None);
    assert_eq!(db.get_metadata().await.unwrap(), None);

    // The sibling is now stale and must be rejected.
    let result = db.apply_batch(batch_b).await;
    assert!(
        matches!(result, Err(Error::StaleBatch { .. })),
        "expected StaleBatch error, got {result:?}"
    );

    // The failed apply must not have changed any observable state.
    assert_eq!(db.root(), expected_root);
    assert_eq!(db.bounds().await, expected_bounds);
    assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
    assert_eq!(db.get(&key2).await.unwrap(), None);
    assert_eq!(db.get_metadata().await.unwrap(), None);

    db.destroy().await.unwrap();
}
/// Two children chained off the same parent are siblings: once one of them
/// is applied, applying the other must fail with `StaleBatch`.
pub(crate) async fn test_immutable_stale_batch_chained<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&[1]);
    let key2 = Sha256::hash(&[2]);
    let key3 = Sha256::hash(&[3]);

    // One parent with two children chained off it.
    let parent = db
        .new_batch()
        .set(key1, Sha256::fill(1u8))
        .merkleize(&db, None);
    let child_a = parent
        .new_batch::<Sha256>()
        .set(key2, Sha256::fill(2u8))
        .merkleize(&db, None);
    let child_b = parent
        .new_batch::<Sha256>()
        .set(key3, Sha256::fill(3u8))
        .merkleize(&db, None);

    // Applying one sibling invalidates the other.
    db.apply_batch(child_a).await.unwrap();
    let result = db.apply_batch(child_b).await;
    assert!(
        matches!(result, Err(Error::StaleBatch { .. })),
        "expected StaleBatch error, got {result:?}"
    );

    db.destroy().await.unwrap();
}
/// Applying the head of a batch chain (c) after only its oldest ancestor (a)
/// was applied also commits the skipped middle batch (b): all three writes
/// land and the db root matches c's speculative root.
pub(crate) async fn test_immutable_partial_ancestor_commit<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&[1]);
    let key2 = Sha256::hash(&[2]);
    let key3 = Sha256::hash(&[3]);
    let v1 = Sha256::fill(1u8);
    let v2 = Sha256::fill(2u8);
    let v3 = Sha256::fill(3u8);

    // Build the chain a -> b -> c, one key per link.
    let a = db.new_batch().set(key1, v1).merkleize(&db, None);
    let b = a.new_batch::<Sha256>().set(key2, v2).merkleize(&db, None);
    let c = b.new_batch::<Sha256>().set(key3, v3).merkleize(&db, None);
    let expected_root = c.root();

    // Apply a, then jump straight to c (skipping b).
    db.apply_batch(a).await.unwrap();
    db.apply_batch(c).await.unwrap();

    // All three writes are present and the root matches c's speculation.
    assert_eq!(db.root(), expected_root);
    assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
    assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
    assert_eq!(db.get(&key3).await.unwrap(), Some(v3));

    db.destroy().await.unwrap();
}
/// Applying a parent batch and then its chained child, in chain order,
/// succeeds and leaves both writes visible.
pub(crate) async fn test_immutable_sequential_commit_parent_then_child<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&[1]);
    let key2 = Sha256::hash(&[2]);
    let v1 = Sha256::fill(1u8);
    let v2 = Sha256::fill(2u8);

    // Chain the child off the parent before either is applied.
    let parent = db.new_batch().set(key1, v1).merkleize(&db, None);
    let child = parent
        .new_batch::<Sha256>()
        .set(key2, v2)
        .merkleize(&db, None);

    // Applying in chain order is valid; both writes land.
    db.apply_batch(parent).await.unwrap();
    db.apply_batch(child).await.unwrap();
    assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
    assert_eq!(db.get(&key2).await.unwrap(), Some(v2));

    db.destroy().await.unwrap();
}
/// A child batch merkleized against a still-pending parent yields the same
/// root as an identical batch built after that parent has been applied and
/// committed.
pub(crate) async fn test_immutable_child_root_matches_pending_and_committed<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&[1]);
    let key2 = Sha256::hash(&[2]);

    // Child chained off a parent that has not been applied yet.
    let parent = db
        .new_batch()
        .set(key1, Sha256::fill(1u8))
        .merkleize(&db, None);
    let pending_child = parent
        .new_batch::<Sha256>()
        .set(key2, Sha256::fill(2u8))
        .merkleize(&db, None);

    // Apply and commit the parent, then rebuild an identical child batch
    // directly against the committed db.
    db.apply_batch(parent).await.unwrap();
    db.commit().await.unwrap();
    let committed_child = db
        .new_batch()
        .set(key2, Sha256::fill(2u8))
        .merkleize(&db, None);

    // Both construction paths must agree on the root.
    assert_eq!(pending_child.root(), committed_child.root());

    db.destroy().await.unwrap();
}
/// Applying a child batch first advances the db past its parent, so a later
/// attempt to apply the parent must fail with `StaleBatch`.
pub(crate) async fn test_immutable_stale_batch_child_applied_before_parent<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&[1]);
    let key2 = Sha256::hash(&[2]);

    // Parent and a child chained off it.
    let parent = db
        .new_batch()
        .set(key1, Sha256::fill(1u8))
        .merkleize(&db, None);
    let child = parent
        .new_batch::<Sha256>()
        .set(key2, Sha256::fill(2u8))
        .merkleize(&db, None);

    // Apply the child first...
    db.apply_batch(child).await.unwrap();

    // ...so the parent is now behind the db and must be rejected.
    let result = db.apply_batch(parent).await;
    assert!(
        matches!(result, Err(Error::StaleBatch { .. })),
        "expected StaleBatch error, got {result:?}"
    );

    db.destroy().await.unwrap();
}
/// `to_batch` snapshots the db's current state as a batch: its root matches
/// the db root, and children chained from it apply back to the db cleanly.
pub(crate) async fn test_immutable_to_batch<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;

    // Commit an initial key so the snapshot covers real state.
    let key1 = Sha256::hash(&[1]);
    let v1 = Sha256::fill(10u8);
    db.apply_batch(db.new_batch().set(key1, v1).merkleize(&db, None))
        .await
        .unwrap();

    // Snapshot the committed state; roots must agree.
    let snapshot = db.to_batch();
    assert_eq!(snapshot.root(), db.root());

    // A child chained off the snapshot applies cleanly.
    let key2 = Sha256::hash(&[2]);
    let v2 = Sha256::fill(20u8);
    let child = snapshot
        .new_batch::<Sha256>()
        .set(key2, v2)
        .merkleize(&db, None);
    db.apply_batch(child).await.unwrap();

    // Both the pre-snapshot write and the chained write are visible.
    assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
    assert_eq!(db.get(&key2).await.unwrap(), Some(v2));

    db.destroy().await.unwrap();
}
/// Dropping a batch's ancestors before applying it must not matter: the head
/// of the chain can still be applied, and all three writes land in the db.
pub(crate) async fn test_immutable_apply_after_ancestor_dropped<F: Family, V, C>(
    context: deterministic::Context,
    open_db: impl Fn(
        deterministic::Context,
    ) -> Pin<Box<dyn Future<Output = TestDb<F, V, C>> + Send>>,
) where
    V: ValueEncoding<Value = Digest>,
    C: Mutable<Item = Operation<Digest, V>> + Persistable<Error = JournalError>,
    C::Item: EncodeShared,
{
    let mut db = open_db(context.with_label("db")).await;
    let key1 = Sha256::hash(&[1]);
    let key2 = Sha256::hash(&[2]);
    let key3 = Sha256::hash(&[3]);
    let v1 = Sha256::fill(1u8);
    let v2 = Sha256::fill(2u8);
    let v3 = Sha256::fill(3u8);

    // Build the chain a -> b -> c, then discard both ancestors.
    let a = db.new_batch().set(key1, v1).merkleize(&db, None);
    let b = a.new_batch::<Sha256>().set(key2, v2).merkleize(&db, None);
    let c = b.new_batch::<Sha256>().set(key3, v3).merkleize(&db, None);
    drop(a);
    drop(b);

    // Applying only the head still lands all three writes.
    db.apply_batch(c).await.unwrap();
    assert_eq!(db.get(&key1).await.unwrap(), Some(v1));
    assert_eq!(db.get(&key2).await.unwrap(), Some(v2));
    assert_eq!(db.get(&key3).await.unwrap(), Some(v3));

    db.destroy().await.unwrap();
}
}