use super::{operation::Operation, Keyless};
use crate::{
journal::{authenticated, contiguous::Mutable, Error as JournalError},
merkle::{Family, Location},
qmdb::{any::value::ValueEncoding, Error},
Context, Persistable,
};
use commonware_codec::EncodeShared;
use commonware_cryptography::{Digest, Hasher};
use core::iter;
use std::sync::{Arc, Weak};
/// A speculative batch that is still accepting appends and has not yet been merkleized.
///
/// Values are buffered in memory by `append` and only become journal operations when
/// `merkleize` consumes the batch and produces a [`MerkleizedBatch`].
pub struct UnmerkleizedBatch<F: Family, H: Hasher, V: ValueEncoding>
where
    Operation<V>: EncodeShared,
{
    /// Underlying authenticated-journal batch that receives the operations at merkleize time.
    journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<V>>,

    /// Values appended so far, in insertion order. Entry `i` is read back at location
    /// `base_size + i`.
    appends: Vec<V::Value>,

    /// The merkleized batch this one was created from. `None` when the batch was created
    /// directly through `new`.
    parent: Option<Arc<MerkleizedBatch<F, H::Digest, V>>>,

    /// Location of this batch's first pending append: the `journal_size` passed to `new`,
    /// or the parent's `total_size` when created via `MerkleizedBatch::new_batch`.
    base_size: u64,

    /// The `journal_size` the chain was started with (inherited unchanged from the parent).
    /// Reads at locations below this value go straight to the database.
    db_size: u64,
}
/// A speculative batch whose operations have been merkleized, so its root can be queried
/// and further batches can be stacked on top of it.
#[derive(Clone)]
pub struct MerkleizedBatch<F, D, V>
where
    F: Family,
    D: Digest,
    V: ValueEncoding,
    Operation<V>: EncodeShared,
{
    /// Merkleized journal batch; supplies the root and the in-memory items used for reads.
    pub(super) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<V>>>,

    /// Weak handle to the batch this one was built on, if any. Being weak, it does not keep
    /// the parent alive; once the parent is dropped the ancestor walk ends here.
    pub(super) parent: Option<Weak<Self>>,

    /// `base_size` of the unmerkleized batch this was produced from.
    pub(super) base_size: u64,

    /// `base_size` plus the number of operations in this batch (every append and the
    /// trailing commit).
    pub(super) total_size: u64,

    /// `db_size` of the unmerkleized batch this was produced from. Reads at locations below
    /// this value bypass the batch chain and go to the database.
    pub(super) db_size: u64,

    /// `total_size` of every ancestor that was still reachable when this batch was
    /// merkleized, nearest parent first.
    // NOTE(review): the consumer of this list is not visible in this file.
    pub(super) ancestor_batch_ends: Vec<u64>,
}
impl<F: Family, D: Digest, V: ValueEncoding> MerkleizedBatch<F, D, V>
where
    Operation<V>: EncodeShared,
{
    /// Iterates over this batch's ancestors, nearest parent first, yielding strong handles.
    ///
    /// The walk ends at the first batch that has no parent or whose parent has already been
    /// dropped (its weak handle no longer upgrades). The returned iterator owns its state
    /// and does not borrow `self`.
    pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
        iter::successors(
            self.parent.as_ref().and_then(Weak::upgrade),
            // The next link is resolved as soon as the current one is yielded, so each
            // yielded batch's parent is upgraded while that batch is still held.
            |current: &Arc<Self>| current.parent.as_ref().and_then(Weak::upgrade),
        )
    }
}
/// Looks up the operation stored at `loc` in `batch` or in one of its live ancestors.
///
/// `batch` itself is checked first, then each ancestor from nearest to farthest. Returns a
/// clone of the first matching operation, or `None` when no batch in the reachable chain
/// covers `loc`.
fn read_chain_op<F: Family, D: Digest, V: ValueEncoding>(
    batch: &MerkleizedBatch<F, D, V>,
    loc: u64,
) -> Option<Operation<V>>
where
    Operation<V>: EncodeShared,
{
    /// Returns the operation at `loc` if it is one of `candidate`'s own items.
    fn own_op<F: Family, D: Digest, V: ValueEncoding>(
        candidate: &MerkleizedBatch<F, D, V>,
        loc: u64,
    ) -> Option<Operation<V>>
    where
        Operation<V>: EncodeShared,
    {
        // The batch's items occupy the half-open range `[start, end)`, where `end` is the
        // journal batch size and `start` lies one item-count below it.
        let end = candidate.journal_batch.size();
        let items = candidate.journal_batch.items();
        let start = end - items.len() as u64;
        if loc < start || loc >= end {
            return None;
        }
        Some(items[(loc - start) as usize].clone())
    }

    own_op(batch, loc)
        .or_else(|| batch.ancestors().find_map(|ancestor| own_op(&ancestor, loc)))
}
impl<F, H, V> UnmerkleizedBatch<F, H, V>
where
F: Family,
V: ValueEncoding,
H: Hasher,
Operation<V>: EncodeShared,
{
pub(super) fn new<E, C>(keyless: &Keyless<F, E, V, C, H>, journal_size: u64) -> Self
where
E: Context,
C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
{
Self {
journal_batch: keyless.journal.new_batch(),
appends: Vec::new(),
parent: None,
base_size: journal_size,
db_size: journal_size,
}
}
pub const fn size(&self) -> Location<F> {
Location::new(self.base_size + self.appends.len() as u64)
}
pub fn append(mut self, value: V::Value) -> Self {
self.appends.push(value);
self
}
pub async fn get<E, C>(
&self,
loc: Location<F>,
db: &Keyless<F, E, V, C, H>,
) -> Result<Option<V::Value>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
{
let loc_val = *loc;
if loc_val >= self.base_size {
let idx = (loc_val - self.base_size) as usize;
return if idx < self.appends.len() {
Ok(Some(self.appends[idx].clone()))
} else {
Ok(None)
};
}
if let Some(parent) = self.parent.as_ref() {
if loc_val >= self.db_size {
if let Some(op) = read_chain_op(parent, loc_val) {
return Ok(op.into_value());
}
}
}
db.get(loc).await
}
pub fn merkleize<E, C>(
self,
db: &Keyless<F, E, V, C, H>,
metadata: Option<V::Value>,
) -> Arc<MerkleizedBatch<F, H::Digest, V>>
where
E: Context,
C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
{
let base = self.base_size;
let mut ops: Vec<Operation<V>> = Vec::with_capacity(self.appends.len() + 1);
for value in self.appends {
ops.push(Operation::Append(value));
}
ops.push(Operation::Commit(metadata));
let total_size = base + ops.len() as u64;
let mut journal_batch = self.journal_batch;
for op in &ops {
journal_batch = journal_batch.add(op.clone());
}
let journal = db.journal.with_mem(|mem| journal_batch.merkleize(mem));
let mut ancestor_batch_ends = Vec::new();
if let Some(parent) = &self.parent {
ancestor_batch_ends.push(parent.total_size);
for batch in parent.ancestors() {
ancestor_batch_ends.push(batch.total_size);
}
}
Arc::new(MerkleizedBatch {
journal_batch: journal,
parent: self.parent.as_ref().map(Arc::downgrade),
base_size: self.base_size,
total_size,
db_size: self.db_size,
ancestor_batch_ends,
})
}
}
impl<F: Family, D: Digest, V: ValueEncoding> MerkleizedBatch<F, D, V>
where
    Operation<V>: EncodeShared,
{
    /// Returns the root digest reported by the underlying merkleized journal batch.
    pub fn root(&self) -> D {
        self.journal_batch.root()
    }

    /// Resolves `position` against this batch and its live ancestors only.
    ///
    /// The outer `None` means the chain cannot answer (the position is below `db_size`, or
    /// no reachable batch covers it); `Some(v)` carries the covering operation's value.
    fn chain_value(&self, position: u64) -> Option<Option<V::Value>> {
        if position < self.db_size {
            return None;
        }
        read_chain_op(self, position).map(|op| op.into_value())
    }

    /// Reads the value at `loc` as seen through this batch.
    ///
    /// Locations at or above `db_size` are first looked up in this batch and its live
    /// ancestors; anything the chain cannot answer is delegated to `db.get(loc)`.
    ///
    /// # Errors
    ///
    /// Propagates whatever `db.get` returns when the read falls through to the database.
    pub async fn get<E, H, C>(
        &self,
        loc: Location<F>,
        db: &Keyless<F, E, V, C, H>,
    ) -> Result<Option<V::Value>, Error<F>>
    where
        E: Context,
        H: Hasher<Digest = D>,
        C: Mutable<Item = Operation<V>> + Persistable<Error = JournalError>,
    {
        if let Some(value) = self.chain_value(*loc) {
            return Ok(value);
        }
        db.get(loc).await
    }

    /// Starts a new, empty unmerkleized batch stacked on top of this one.
    ///
    /// The child keeps a strong handle to this batch, begins at this batch's `total_size`,
    /// and inherits `db_size` unchanged.
    pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, V>
    where
        H: Hasher<Digest = D>,
    {
        let journal_batch = self.journal_batch.new_batch::<H>();
        let (base_size, db_size) = (self.total_size, self.db_size);
        UnmerkleizedBatch {
            journal_batch,
            appends: Vec::new(),
            parent: Some(Arc::clone(self)),
            base_size,
            db_size,
        }
    }
}