use super::operation::{update::Update, Operation};
use crate::{
index::Unordered as UnorderedIndex,
journal::{
authenticated,
contiguous::{Contiguous, Mutable, Reader},
Error as JournalError,
},
merkle::{Family, Location, Proof},
qmdb::{
build_snapshot_from_log, delete_known_loc, operation::Operation as OperationTrait,
update_known_loc, Error,
},
Context, Persistable,
};
use commonware_codec::{Codec, CodecShared};
use commonware_cryptography::Hasher;
use core::num::NonZeroU64;
use std::collections::HashMap;
/// Shorthand for the authenticated journal backing a [`Db`]'s operation log.
pub(crate) type AuthenticatedLog<F, E, C, H> = authenticated::Journal<F, E, C, H>;
/// A single inverse edit recorded while scanning operations that are about to
/// be rewound, so the in-memory snapshot index can be restored to its
/// pre-operation state after the log itself is truncated (see `Db::rewind`).
enum SnapshotUndo<F: Family, K> {
    /// An update superseded an earlier location for `key`: redirect the index
    /// entry at `old_loc` back to `new_loc` (the prior location).
    Replace {
        key: K,
        old_loc: Location<F>,
        new_loc: Location<F>,
    },
    /// An update introduced `key` for the first time (within the scanned
    /// range): remove the index entry pointing at `old_loc`.
    Remove {
        key: K,
        old_loc: Location<F>,
    },
    /// A delete removed an active `key`: re-insert it at `new_loc`, its
    /// location prior to the delete.
    Insert {
        key: K,
        new_loc: Location<F>,
    },
}
/// An authenticated key/value database built from an append-only,
/// Merkle-authenticated operation log plus an in-memory key index
/// (`snapshot`) that is rebuilt from the log on startup.
pub struct Db<
    F: Family,
    E: Context,
    C: Contiguous<Item: CodecShared>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    U: Send + Sync,
> {
    // The authenticated journal of operations (the durable source of state).
    pub(crate) log: AuthenticatedLog<F, E, C, H>,
    // Location below which all operations are known inactive (safe to prune).
    pub(crate) inactivity_floor_loc: Location<F>,
    // Location of the most recent CommitFloor operation in `log`.
    pub(crate) last_commit_loc: Location<F>,
    // In-memory index from key to candidate log location(s); entries may
    // collide, so lookups re-check the key against the referenced operation.
    pub(crate) snapshot: I,
    // Count of currently-active keys tracked by `snapshot`.
    pub(crate) active_keys: usize,
    // Marker tying this Db to its update payload type `U`.
    pub(crate) _update: core::marker::PhantomData<U>,
}
impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
where
F: Family,
E: Context,
U: Update,
C: Contiguous<Item = Operation<F, U>>,
I: UnorderedIndex<Value = Location<F>>,
H: Hasher,
Operation<F, U>: Codec,
{
pub const fn inactivity_floor_loc(&self) -> Location<F> {
self.inactivity_floor_loc
}
pub const fn is_empty(&self) -> bool {
self.active_keys == 0
}
pub async fn get_metadata(&self) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
match self.log.reader().await.read(*self.last_commit_loc).await? {
Operation::CommitFloor(metadata, _) => Ok(metadata),
_ => unreachable!("last commit is not a CommitFloor operation"),
}
}
pub fn root(&self) -> H::Digest {
self.log.root()
}
pub async fn get(&self, key: &U::Key) -> Result<Option<U::Value>, crate::qmdb::Error<F>> {
let locs: Vec<Location<F>> = self.snapshot.get(key).copied().collect();
let reader = self.log.reader().await;
for loc in locs {
let op = reader.read(*loc).await?;
let Operation::Update(data) = op else {
panic!("location does not reference update operation. loc={loc}");
};
if data.key() == key {
return Ok(Some(data.value().clone()));
}
}
Ok(None)
}
pub async fn bounds(&self) -> std::ops::Range<Location<F>> {
let bounds = self.log.reader().await.bounds();
Location::new(bounds.start)..Location::new(bounds.end)
}
pub async fn pinned_nodes_at(
&self,
loc: Location<F>,
) -> Result<Vec<H::Digest>, crate::qmdb::Error<F>> {
if !loc.is_valid() {
return Err(crate::merkle::Error::LocationOverflow(loc).into());
}
let futs: Vec<_> = F::nodes_to_pin(loc)
.map(|p| async move {
self.log
.merkle
.get_node(p)
.await?
.ok_or(crate::merkle::Error::ElementPruned(p).into())
})
.collect();
futures::future::try_join_all(futs).await
}
}
impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Prunes all operations before `prune_loc` from the log.
    ///
    /// # Errors
    ///
    /// Returns `PruneBeyondMinRequired` if `prune_loc` exceeds the inactivity
    /// floor (operations there may still be active).
    pub async fn prune(&mut self, prune_loc: Location<F>) -> Result<(), crate::qmdb::Error<F>> {
        if prune_loc > self.inactivity_floor_loc {
            return Err(crate::qmdb::Error::PruneBeyondMinRequired(
                prune_loc,
                self.inactivity_floor_loc,
            ));
        }
        self.log.prune(prune_loc).await?;
        Ok(())
    }

    /// Generates a proof over up to `max_ops` operations starting at
    /// `start_loc`, evaluated against a historical log of `historical_size`
    /// operations. Returns the proof together with the covered operations.
    pub async fn historical_proof(
        &self,
        historical_size: Location<F>,
        start_loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
        self.log
            .historical_proof(historical_size, start_loc, max_ops)
            .await
            .map_err(Into::into)
    }

    /// Generates a proof over up to `max_ops` operations starting at `loc`,
    /// evaluated against the log's current size.
    pub async fn proof(
        &self,
        loc: Location<F>,
        max_ops: NonZeroU64,
    ) -> Result<(Proof<F, H::Digest>, Vec<Operation<F, U>>), crate::qmdb::Error<F>> {
        self.historical_proof(self.log.size().await, loc, max_ops)
            .await
    }

    /// Rewinds the database so the log contains exactly `size` operations,
    /// rolling the in-memory snapshot index (and `active_keys`) back to match.
    ///
    /// Returns the locations — all below the new size — of operations that
    /// become active again as a result of the rewind.
    ///
    /// # Errors
    ///
    /// Returns `InvalidRewind` if `size` is zero or larger than the current
    /// size, `ItemPruned` if operations needed to compute the undo set have
    /// already been pruned, and `UnexpectedData` if the operation at the
    /// rewind boundary is not a commit.
    pub async fn rewind(&mut self, size: Location<F>) -> Result<Vec<Location<F>>, Error<F>> {
        let rewind_size = *size;
        let current_size = *self.last_commit_loc + 1;
        // Already at the requested size: nothing to do.
        if rewind_size == current_size {
            return Ok(Vec::new());
        }
        // Cannot rewind to an empty log, nor "rewind" forward.
        if rewind_size == 0 || rewind_size > current_size {
            return Err(Error::Journal(JournalError::InvalidRewind(rewind_size)));
        }
        let (rewind_floor, undos, active_keys_delta) = {
            let reader = self.log.reader().await;
            let bounds = reader.bounds();
            let rewind_last_loc = Location::new(rewind_size - 1);
            // The operation that will become the last one must still be
            // readable (not pruned).
            if rewind_size <= bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(
                    *rewind_last_loc,
                )));
            }
            // It must also be a commit carrying an inactivity floor; that
            // floor becomes the database's floor after the rewind.
            let rewind_last_op = reader.read(*rewind_last_loc).await?;
            let Some(rewind_floor) = rewind_last_op.has_floor() else {
                return Err(Error::UnexpectedData(rewind_last_loc));
            };
            if *rewind_floor < bounds.start {
                return Err(Error::<F>::Journal(JournalError::ItemPruned(*rewind_floor)));
            }
            let mut undos = Vec::with_capacity((current_size - rewind_size) as usize);
            let mut active_keys_delta = 0isize;
            // For each key seen during the replay below: the location of its
            // latest update (`Some`), or `None` if it was last deleted.
            let mut prior_state_by_key: HashMap<U::Key, Option<Location<F>>> = HashMap::new();
            // Replay from the post-rewind floor to the current end. Operations
            // below `rewind_size` only seed `prior_state_by_key`; operations at
            // or after it are the ones being discarded, so each of those also
            // records the snapshot edit required to undo it.
            for loc in *rewind_floor..current_size {
                let op = reader.read(loc).await?;
                let op_loc = Location::new(loc);
                match op {
                    // Commits carry no key state; nothing to undo.
                    Operation::CommitFloor(_, _) => {}
                    Operation::Update(update) => {
                        let key = update.key().clone();
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();
                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                // The update superseded an earlier location:
                                // point the index back at it.
                                undos.push(SnapshotUndo::Replace {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                    new_loc: previous_loc,
                                });
                            } else {
                                // The update introduced the key: remove it.
                                active_keys_delta -= 1;
                                undos.push(SnapshotUndo::Remove {
                                    key: key.clone(),
                                    old_loc: op_loc,
                                });
                            }
                        }
                        prior_state_by_key.insert(key, Some(op_loc));
                    }
                    Operation::Delete(key) => {
                        let previous_loc = prior_state_by_key.get(&key).copied().flatten();
                        if loc >= rewind_size {
                            if let Some(previous_loc) = previous_loc {
                                // The delete removed an active key: re-insert
                                // it at its pre-delete location.
                                active_keys_delta += 1;
                                undos.push(SnapshotUndo::Insert {
                                    key: key.clone(),
                                    new_loc: previous_loc,
                                });
                            }
                        }
                        prior_state_by_key.insert(key, None);
                    }
                }
            }
            // Apply undos newest-first so chained edits to the same key
            // unwind in the correct order.
            undos.reverse();
            (rewind_floor, undos, active_keys_delta)
        };
        self.log.rewind(rewind_size).await?;
        let mut restored_locs = Vec::new();
        for undo in undos {
            match undo {
                SnapshotUndo::Replace {
                    key,
                    old_loc,
                    new_loc,
                } => {
                    // Only locations that survive the rewind count as
                    // restored; discarded intermediate locations are skipped.
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    update_known_loc(&mut self.snapshot, &key, old_loc, new_loc);
                }
                SnapshotUndo::Remove { key, old_loc } => {
                    delete_known_loc(&mut self.snapshot, &key, old_loc)
                }
                SnapshotUndo::Insert { key, new_loc } => {
                    if new_loc < rewind_size {
                        restored_locs.push(new_loc);
                    }
                    self.snapshot.insert(&key, new_loc);
                }
            }
        }
        // Net change in live keys; underflow would indicate a corrupt log.
        self.active_keys = self
            .active_keys
            .checked_add_signed(active_keys_delta)
            .ok_or(Error::DataCorrupted(
                "active_keys underflow while rewinding",
            ))?;
        // The operation at `rewind_size - 1` was verified above to carry a
        // floor (i.e. to be a commit), so it becomes the new last commit.
        self.last_commit_loc = Location::new(rewind_size - 1);
        self.inactivity_floor_loc = rewind_floor;
        Ok(restored_locs)
    }
}
impl<F, E, U, C, I, H> Db<F, E, C, I, H, U>
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    /// Builds a [`Db`] from an already-opened authenticated log.
    ///
    /// Replays the log to populate `index` with the location of every active
    /// key. When `known_inactivity_floor` is provided, `callback(false, None)`
    /// is additionally invoked once for each location between it and the
    /// floor recorded in the log's final commit.
    ///
    /// # Panics
    ///
    /// Panics if the log is empty or its final operation does not carry an
    /// inactivity floor (i.e. is not a commit).
    pub async fn init_from_log<Cb>(
        mut index: I,
        log: AuthenticatedLog<F, E, C, H>,
        known_inactivity_floor: Option<Location<F>>,
        mut callback: Cb,
    ) -> Result<Self, crate::qmdb::Error<F>>
    where
        Cb: FnMut(bool, Option<Location<F>>),
    {
        let reader = log.reader().await;
        // The final operation must exist and be a commit carrying the floor.
        let last_commit_raw = reader
            .bounds()
            .end
            .checked_sub(1)
            .expect("commit should exist");
        let final_op = reader.read(last_commit_raw).await?;
        let inactivity_floor_loc = final_op.has_floor().expect("should be a commit");
        // Report the (inactive) gap between the caller's known floor and the
        // floor recorded in the log.
        if let Some(known_floor) = known_inactivity_floor {
            for _ in *known_floor..*inactivity_floor_loc {
                callback(false, None);
            }
        }
        let active_keys =
            build_snapshot_from_log(inactivity_floor_loc, &reader, &mut index, callback).await?;
        // Release the reader's borrow of `log` before moving `log` into `Self`.
        drop(reader);
        Ok(Self {
            log,
            inactivity_floor_loc,
            last_commit_loc: Location::new(last_commit_raw),
            snapshot: index,
            active_keys,
            _update: core::marker::PhantomData,
        })
    }

    /// Syncs all pending log data to durable storage.
    pub async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
        match self.log.sync().await {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }

    /// Durably commits the log.
    pub async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
        match self.log.commit().await {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }

    /// Destroys the database, consuming it and releasing the log's storage.
    pub async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
        match self.log.destroy().await {
            Ok(()) => Ok(()),
            Err(err) => Err(err.into()),
        }
    }
}
impl<F, E, U, C, I, H> Persistable for Db<F, E, C, I, H, U>
where
    F: Family,
    E: Context,
    U: Update,
    C: Mutable<Item = Operation<F, U>> + Persistable<Error = JournalError>,
    I: UnorderedIndex<Value = Location<F>>,
    H: Hasher,
    Operation<F, U>: Codec,
{
    type Error = crate::qmdb::Error<F>;

    /// Delegates to the inherent `Db::commit`.
    async fn commit(&self) -> Result<(), crate::qmdb::Error<F>> {
        Self::commit(self).await
    }

    /// Delegates to the inherent `Db::sync`.
    async fn sync(&self) -> Result<(), crate::qmdb::Error<F>> {
        Self::sync(self).await
    }

    /// Delegates to the inherent `Db::destroy`.
    async fn destroy(self) -> Result<(), crate::qmdb::Error<F>> {
        // Use the `Self::`-qualified form, consistent with `commit`/`sync`
        // above. `self.destroy()` also resolved to the inherent method
        // (inherent methods take precedence over trait methods), but it read
        // like infinite recursion; this makes the delegation explicit.
        Self::destroy(self).await
    }
}