use super::Immutable;
use crate::{
journal::{authenticated, contiguous::Mutable, Error as JournalError},
merkle::{Family, Location},
qmdb::{any::ValueEncoding, immutable::operation::Operation, operation::Key, Error},
translator::Translator,
Context, Persistable,
};
use commonware_codec::EncodeShared;
use commonware_cryptography::{Digest, Hasher as CHasher};
use core::iter;
use std::{
collections::BTreeMap,
sync::{Arc, Weak},
};
#[derive(Clone)]
pub(crate) struct DiffEntry<F: Family, V> {
pub(crate) value: V,
pub(crate) loc: Location<F>,
}
#[allow(clippy::type_complexity)]
pub struct UnmerkleizedBatch<F, H, K, V>
where
F: Family,
K: Key,
V: ValueEncoding,
H: CHasher,
{
journal_batch: authenticated::UnmerkleizedBatch<F, H, Operation<K, V>>,
mutations: BTreeMap<K, V::Value>,
parent: Option<Arc<MerkleizedBatch<F, H::Digest, K, V>>>,
base_size: u64,
db_size: u64,
}
#[derive(Clone)]
pub struct MerkleizedBatch<F: Family, D: Digest, K: Key, V: ValueEncoding> {
pub(super) journal_batch: Arc<authenticated::MerkleizedBatch<F, D, Operation<K, V>>>,
pub(super) diff: Arc<BTreeMap<K, DiffEntry<F, V::Value>>>,
pub(super) parent: Option<Weak<Self>>,
pub(super) base_size: u64,
pub(super) total_size: u64,
pub(super) db_size: u64,
#[allow(clippy::type_complexity)]
pub(super) ancestor_diffs: Vec<Arc<BTreeMap<K, DiffEntry<F, V::Value>>>>,
pub(super) ancestor_diff_ends: Vec<u64>,
}
impl<F, H, K, V> UnmerkleizedBatch<F, H, K, V>
where
F: Family,
K: Key,
V: ValueEncoding,
H: CHasher,
Operation<K, V>: EncodeShared,
{
pub(super) fn new<E, C, T>(
immutable: &Immutable<F, E, K, V, C, H, T>,
journal_size: u64,
) -> Self
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
T: Translator,
{
Self {
journal_batch: immutable.journal.new_batch(),
mutations: BTreeMap::new(),
parent: None,
base_size: journal_size,
db_size: journal_size,
}
}
pub fn set(mut self, key: K, value: V::Value) -> Self {
self.mutations.insert(key, value);
self
}
pub async fn get<E, C, T>(
&self,
key: &K,
db: &Immutable<F, E, K, V, C, H, T>,
) -> Result<Option<V::Value>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
T: Translator,
{
if let Some(value) = self.mutations.get(key) {
return Ok(Some(value.clone()));
}
if let Some(parent) = self.parent.as_ref() {
if let Some(entry) = parent.diff.get(key) {
return Ok(Some(entry.value.clone()));
}
for batch in parent.ancestors() {
if let Some(entry) = batch.diff.get(key) {
return Ok(Some(entry.value.clone()));
}
}
}
db.get(key).await
}
pub fn merkleize<E, C, T>(
self,
db: &Immutable<F, E, K, V, C, H, T>,
metadata: Option<V::Value>,
) -> Arc<MerkleizedBatch<F, H::Digest, K, V>>
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
T: Translator,
{
let base = self.base_size;
let mut ops: Vec<Operation<K, V>> = Vec::with_capacity(self.mutations.len() + 1);
let mut diff: BTreeMap<K, DiffEntry<F, V::Value>> = BTreeMap::new();
for (key, value) in self.mutations {
let loc = Location::new(base + ops.len() as u64);
ops.push(Operation::Set(key.clone(), value.clone()));
diff.insert(key, DiffEntry { value, loc });
}
ops.push(Operation::Commit(metadata));
let total_size = base + ops.len() as u64;
let mut journal_batch = self.journal_batch;
for op in &ops {
journal_batch = journal_batch.add(op.clone());
}
let journal_merkleized = db.journal.with_mem(|mem| journal_batch.merkleize(mem));
let mut ancestor_diffs = Vec::new();
let mut ancestor_diff_ends = Vec::new();
if let Some(parent) = &self.parent {
ancestor_diffs.push(Arc::clone(&parent.diff));
ancestor_diff_ends.push(parent.total_size);
for batch in parent.ancestors() {
ancestor_diffs.push(Arc::clone(&batch.diff));
ancestor_diff_ends.push(batch.total_size);
}
}
Arc::new(MerkleizedBatch {
journal_batch: journal_merkleized,
diff: Arc::new(diff),
parent: self.parent.as_ref().map(Arc::downgrade),
base_size: self.base_size,
total_size,
db_size: self.db_size,
ancestor_diffs,
ancestor_diff_ends,
})
}
}
impl<F: Family, D: Digest, K: Key, V: ValueEncoding> MerkleizedBatch<F, D, K, V>
where
Operation<K, V>: EncodeShared,
{
pub fn root(&self) -> D {
self.journal_batch.root()
}
pub(super) fn ancestors(&self) -> impl Iterator<Item = Arc<Self>> {
let mut next = self.parent.as_ref().and_then(Weak::upgrade);
iter::from_fn(move || {
let batch = next.take()?;
next = batch.parent.as_ref().and_then(Weak::upgrade);
Some(batch)
})
}
pub async fn get<E, C, H, T>(
&self,
key: &K,
db: &Immutable<F, E, K, V, C, H, T>,
) -> Result<Option<V::Value>, Error<F>>
where
E: Context,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
H: CHasher<Digest = D>,
T: Translator,
{
if let Some(entry) = self.diff.get(key) {
return Ok(Some(entry.value.clone()));
}
for batch in self.ancestors() {
if let Some(entry) = batch.diff.get(key) {
return Ok(Some(entry.value.clone()));
}
}
db.get(key).await
}
pub fn new_batch<H>(self: &Arc<Self>) -> UnmerkleizedBatch<F, H, K, V>
where
H: CHasher<Digest = D>,
{
UnmerkleizedBatch {
journal_batch: self.journal_batch.new_batch::<H>(),
mutations: BTreeMap::new(),
parent: Some(Arc::clone(self)),
base_size: self.total_size,
db_size: self.db_size,
}
}
}
impl<F, E, K, V, C, H, T> Immutable<F, E, K, V, C, H, T>
where
F: Family,
E: Context,
K: Key,
V: ValueEncoding,
C: Mutable<Item = Operation<K, V>> + Persistable<Error = JournalError>,
C::Item: EncodeShared,
H: CHasher,
T: Translator,
{
pub fn to_batch(&self) -> Arc<MerkleizedBatch<F, H::Digest, K, V>> {
let journal_size = *self.last_commit_loc + 1;
Arc::new(MerkleizedBatch {
journal_batch: self.journal.to_merkleized_batch(),
diff: Arc::new(BTreeMap::new()),
parent: None,
base_size: journal_size,
total_size: journal_size,
db_size: journal_size,
ancestor_diffs: Vec::new(),
ancestor_diff_ends: Vec::new(),
})
}
}