use crate::{
index::{Cursor, Unordered as Index},
journal::contiguous::{Mutable, Reader},
merkle::{Family, Location},
qmdb::operation::Operation,
};
use commonware_utils::NZUsize;
use core::num::NonZeroUsize;
use futures::{pin_mut, StreamExt as _};
use thiserror::Error;
pub mod any;
pub mod current;
pub mod immutable;
pub mod keyless;
pub mod operation;
pub mod store;
pub mod sync;
pub mod verify;
pub use verify::{
create_multi_proof, create_proof_store, verify_multi_proof, verify_proof,
verify_proof_and_extract_digests, verify_proof_and_pinned_nodes,
};
/// Errors returned by the qmdb module, generic over the merkle [`Family`] `F`.
#[derive(Error, Debug)]
pub enum Error<F: Family> {
/// Data was found to be corrupted; carries a static description of the problem.
#[error("data corrupted: {0}")]
DataCorrupted(&'static str),
/// Wraps a `crate::merkle::Error` (converted automatically via `#[from]`).
#[error("merkle error: {0}")]
Merkle(#[from] crate::merkle::Error<F>),
/// Wraps a `crate::metadata::Error` (converted automatically via `#[from]`).
#[error("metadata error: {0}")]
Metadata(#[from] crate::metadata::Error),
/// Wraps a `crate::journal::Error` (converted automatically via `#[from]`).
#[error("journal error: {0}")]
Journal(#[from] crate::journal::Error),
/// Wraps a `commonware_runtime::Error` (converted automatically via `#[from]`).
#[error("runtime error: {0}")]
Runtime(#[from] commonware_runtime::Error),
/// The operation at the given location has been pruned.
#[error("operation pruned: {0}")]
OperationPruned(Location<F>),
/// The key was not found.
#[error("key not found")]
KeyNotFound,
/// The key already exists.
#[error("key exists")]
KeyExists,
/// Unexpected data was encountered at the given location.
#[error("unexpected data at location: {0}")]
UnexpectedData(Location<F>),
/// A location (first field) was at or beyond its bound (second field), per the `{0} >= {1}`
/// message.
#[error("location out of bounds: {0} >= {1}")]
LocationOutOfBounds(Location<F>, Location<F>),
/// A prune location (first field) lies beyond the minimum required location (second field).
#[error("prune location {0} beyond minimum required location {1}")]
PruneBeyondMinRequired(Location<F>, Location<F>),
/// A batch is stale relative to the database: the database holds `db_size` operations, while
/// the batch requires either `batch_db_size` or `batch_base_size`.
#[error(
"stale batch: db has {db_size} ops, batch requires {batch_db_size} or {batch_base_size}"
)]
StaleBatch {
// Number of operations the database currently has.
db_size: u64,
// First of the two operation counts the batch would accept.
batch_db_size: u64,
// Second of the two operation counts the batch would accept.
batch_base_size: u64,
},
}
impl<F: Family> From<crate::journal::authenticated::Error<F>> for Error<F> {
fn from(e: crate::journal::authenticated::Error<F>) -> Self {
match e {
crate::journal::authenticated::Error::Journal(j) => Self::Journal(j),
crate::journal::authenticated::Error::Merkle(m) => Self::Merkle(m),
}
}
}
/// Buffer size (`1 << 16` = 65,536) passed to the log reader's `replay` when the snapshot is
/// rebuilt from the log.
const SNAPSHOT_READ_BUFFER_SIZE: NonZeroUsize = NZUsize!(1 << 16);
/// Populates `snapshot` by replaying the log through `reader`, starting at
/// `inactivity_floor_loc`.
///
/// Each replayed operation is handled as follows:
/// - A keyed delete removes the key's entry from the snapshot (if one exists) and invokes
///   `callback(false, old_loc)`, where `old_loc` is the location that was removed, if any.
/// - A keyed update points the key's snapshot entry at the operation's location and invokes
///   `callback(true, old_loc)`, where `old_loc` is the location previously recorded, if any.
/// - A keyless operation for which `has_floor()` returns `Some` invokes
///   `callback(is_last, None)`, where `is_last` is whether the operation's location equals
///   `reader.bounds().end - 1` (saturating).
/// - Any other operation is skipped without invoking the callback.
///
/// Returns the number of active keys: incremented for every update that had no previous
/// location, decremented for every delete that removed one.
///
/// # Errors
///
/// Propagates any error produced while starting or consuming the replay stream, or while reading
/// operations from the log during key lookups.
pub(super) async fn build_snapshot_from_log<F, C, I, Fn>(
    inactivity_floor_loc: crate::merkle::Location<F>,
    reader: &C,
    snapshot: &mut I,
    mut callback: Fn,
) -> Result<usize, Error<F>>
where
    F: crate::merkle::Family,
    C: Reader<Item: Operation<F>>,
    I: Index<Value = crate::merkle::Location<F>>,
    Fn: FnMut(bool, Option<crate::merkle::Location<F>>),
{
    // Captured before replay begins: the position just below the end of the reader's bounds.
    let last_commit_loc = reader.bounds().end.saturating_sub(1);

    let replay = reader
        .replay(SNAPSHOT_READ_BUFFER_SIZE, *inactivity_floor_loc)
        .await?;
    pin_mut!(replay);

    let mut active_keys: usize = 0;
    while let Some(item) = replay.next().await {
        let (loc, op) = item?;

        // Keyless operations never touch the snapshot; only floor-carrying ones are reported.
        let Some(key) = op.key() else {
            if op.has_floor().is_some() {
                callback(loc == last_commit_loc, None);
            }
            continue;
        };

        if op.is_delete() {
            let removed = delete_key(snapshot, reader, key).await?;
            callback(false, removed);
            if removed.is_some() {
                active_keys -= 1;
            }
            continue;
        }

        if op.is_update() {
            let new_loc = crate::merkle::Location::new(loc);
            let replaced = update_key(snapshot, reader, key, new_loc).await?;
            callback(true, replaced);
            if replaced.is_none() {
                active_keys += 1;
            }
        }
    }

    Ok(active_keys)
}
/// Removes `key` from `snapshot` and returns the location that was recorded for it.
///
/// Returns `Ok(None)` without modifying the snapshot when it has no entry for `key`, or when
/// none of the locations in that entry refers to an operation in the log with this exact key.
///
/// # Errors
///
/// Propagates errors from reading operations through `reader`.
async fn delete_key<F, I, R>(
    snapshot: &mut I,
    reader: &R,
    key: &<R::Item as Operation<F>>::Key,
) -> Result<Option<Location<F>>, Error<F>>
where
    F: Family,
    I: Index<Value = Location<F>>,
    R: Reader,
    R::Item: Operation<F>,
{
    let Some(mut cursor) = snapshot.get_mut(key) else {
        return Ok(None);
    };

    // The cursor may yield locations of operations with other keys, so each candidate is
    // checked against the log before anything is deleted.
    let found = find_update_op::<F, _>(reader, &mut cursor, key).await?;
    if found.is_some() {
        cursor.delete();
    }
    Ok(found)
}
/// Records `new_loc` as the location of `key` in `snapshot`.
///
/// Returns the location previously recorded for `key`, or `Ok(None)` if the key had no location
/// in the snapshot before this call.
///
/// # Panics
///
/// Panics if a location already recorded for `key` is not strictly less than `new_loc`.
///
/// # Errors
///
/// Propagates errors from reading operations through `reader`.
async fn update_key<F, I, R>(
    snapshot: &mut I,
    reader: &R,
    key: &<R::Item as Operation<F>>::Key,
    new_loc: Location<F>,
) -> Result<Option<Location<F>>, Error<F>>
where
    F: Family,
    I: Index<Value = Location<F>>,
    R: Reader,
    R::Item: Operation<F>,
{
    // No cursor is handed back when, per the method's name, `new_loc` was inserted directly.
    let Some(mut cursor) = snapshot.get_mut_or_insert(key, new_loc) else {
        return Ok(None);
    };

    let existing = find_update_op::<F, _>(reader, &mut cursor, key).await?;
    match existing {
        Some(prev_loc) => {
            assert!(new_loc > prev_loc);
            cursor.update(new_loc);
            Ok(Some(prev_loc))
        }
        None => {
            // Every location under this entry belongs to a different key: add a new one.
            cursor.insert(new_loc);
            Ok(None)
        }
    }
}
/// Advances `cursor` until it yields a location whose operation in the log has a key equal to
/// `key`.
///
/// Returns that location as soon as it is found (callers then call `update`/`delete` on the
/// cursor), or `Ok(None)` once the cursor is exhausted.
///
/// # Panics
///
/// Panics if an operation read from the log has no key.
///
/// # Errors
///
/// Propagates errors from reading operations through `reader`.
async fn find_update_op<F, R>(
    reader: &R,
    cursor: &mut impl Cursor<Value = Location<F>>,
    key: &<R::Item as Operation<F>>::Key,
) -> Result<Option<Location<F>>, Error<F>>
where
    F: Family,
    R: Reader,
    R::Item: Operation<F>,
{
    loop {
        let Some(&candidate) = cursor.next() else {
            return Ok(None);
        };

        // Compare against the key actually stored in the log at the candidate location.
        let op = reader.read(*candidate).await?;
        let stored_key = op.key().expect("operation without key");
        if *stored_key == *key {
            return Ok(Some(candidate));
        }
    }
}
/// Replaces `old_loc` with `new_loc` in the snapshot entry for `key`.
///
/// Unlike `update_key`, no log lookup is performed: the entry is located purely by comparing
/// the stored locations against `old_loc`.
///
/// # Panics
///
/// Panics if the snapshot has no entry for `key`, or if that entry does not contain `old_loc`.
fn update_known_loc<F: Family, I: Index<Value = Location<F>>>(
    snapshot: &mut I,
    key: &[u8],
    old_loc: Location<F>,
    new_loc: Location<F>,
) {
    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");

    let located = cursor.find(|&loc| *loc == old_loc);
    assert!(
        located,
        "known key with given old_loc should have been found"
    );

    cursor.update(new_loc);
}
/// Removes `old_loc` from the snapshot entry for `key`.
///
/// Unlike `delete_key`, no log lookup is performed: the entry is located purely by comparing
/// the stored locations against `old_loc`.
///
/// # Panics
///
/// Panics if the snapshot has no entry for `key`, or if that entry does not contain `old_loc`.
fn delete_known_loc<F: Family, I: Index<Value = Location<F>>>(
    snapshot: &mut I,
    key: &[u8],
    old_loc: Location<F>,
) {
    let mut cursor = snapshot.get_mut(key).expect("key should be known to exist");

    let located = cursor.find(|&loc| *loc == old_loc);
    assert!(
        located,
        "known key with given old_loc should have been found"
    );

    cursor.delete();
}
/// Borrows a snapshot index and an operations log together so that active operations can be
/// re-appended at the end of the log while the inactivity floor is raised (see `raise_floor`).
pub(crate) struct FloorHelper<
'a,
F: Family,
I: Index<Value = Location<F>>,
C: Mutable<Item: Operation<F>>,
> {
/// Index from keys to the log locations of their operations.
pub snapshot: &'a mut I,
/// The operations log that the snapshot's locations refer to.
pub log: &'a mut C,
}
impl<F, I, C> FloorHelper<'_, F, I, C>
where
    F: Family,
    I: Index<Value = Location<F>>,
    C: Mutable<Item: Operation<F>>,
{
    /// Re-appends `op` at the end of the log if the snapshot still records it at `old_loc`.
    ///
    /// Returns `Ok(true)` when the operation was moved. Returns `Ok(false)`, leaving both the
    /// snapshot and the log untouched, when `op` has no key, when the snapshot has no entry for
    /// the key, or when that entry does not contain `old_loc`.
    ///
    /// # Errors
    ///
    /// Propagates errors from appending to the log. The snapshot entry is updated before the
    /// append, so on an append error it already points at the intended new location.
    async fn move_op_if_active(
        &mut self,
        op: C::Item,
        old_loc: Location<F>,
    ) -> Result<bool, Error<F>> {
        let Some(key) = op.key() else {
            return Ok(false);
        };

        // The cursor lives only inside this scope, so it is gone before the log is appended to.
        {
            let Some(mut cursor) = self.snapshot.get_mut(key) else {
                return Ok(false);
            };

            let still_current = cursor.find(|&loc| loc == old_loc);
            if !still_current {
                return Ok(false);
            }

            // The current log size is the location the appended operation will occupy.
            let tip = Location::<F>::new(self.log.size().await);
            cursor.update(tip);
        }

        self.log.append(&op).await?;
        Ok(true)
    }

    /// Scans forward from `inactivity_floor_loc` for the first operation that
    /// `move_op_if_active` reports as active, moves it to the end of the log, and returns the
    /// location immediately after the one it was moved from.
    ///
    /// The returned location is therefore always at least `inactivity_floor_loc + 1`.
    ///
    /// # Panics
    ///
    /// Panics if the scan reaches the log size observed on entry without finding an active
    /// operation.
    ///
    /// # Errors
    ///
    /// Propagates errors from reading from or appending to the log.
    async fn raise_floor(
        &mut self,
        mut inactivity_floor_loc: Location<F>,
    ) -> Result<Location<F>, Error<F>> {
        let tip_loc: Location<F> = Location::new(self.log.size().await);

        loop {
            assert!(
                *inactivity_floor_loc < tip_loc,
                "no active operations above the inactivity floor"
            );

            // The reader is confined to this scope so it is released before `self` is
            // borrowed mutably again below.
            let op = {
                let reader = self.log.reader().await;
                reader.read(*inactivity_floor_loc).await?
            };

            let examined_loc = inactivity_floor_loc;
            inactivity_floor_loc += 1;

            if self.move_op_if_active(op, examined_loc).await? {
                return Ok(inactivity_floor_loc);
            }
        }
    }
}