use crate::{
index::Factory as IndexFactory,
journal::{
authenticated,
contiguous::{fixed, variable, Mutable, Reader as _},
},
merkle::{
full::{self, Merkle},
Graftable, Location,
},
qmdb::{
self,
any::{
db::{Db as AnyDb, Metrics as AnyMetrics},
operation::{update::Update, Operation},
ordered::{
fixed::{Operation as OrderedFixedOp, Update as OrderedFixedUpdate},
variable::{Operation as OrderedVariableOp, Update as OrderedVariableUpdate},
},
unordered::{
fixed::{Operation as UnorderedFixedOp, Update as UnorderedFixedUpdate},
variable::{Operation as UnorderedVariableOp, Update as UnorderedVariableUpdate},
},
FixedValue, VariableValue,
},
bitmap::Shared,
current::{
db, grafting,
ordered::{
fixed::Db as CurrentOrderedFixedDb, variable::Db as CurrentOrderedVariableDb,
},
unordered::{
fixed::Db as CurrentUnorderedFixedDb, variable::Db as CurrentUnorderedVariableDb,
},
FixedConfig, VariableConfig,
},
operation::{Committable, Key, Operation as _},
sync::{resolver::fetch_operations, Database, DatabaseConfig as Config},
},
translator::Translator,
Context, Persistable,
};
use commonware_codec::{Codec, CodecShared, Read as CodecRead};
use commonware_cryptography::{DigestOf, Hasher};
use commonware_parallel::Strategy;
use commonware_utils::{
bitmap::Prunable as BitMap, channel::oneshot, range::NonEmptyRange, sync::AsyncMutex, Array,
};
use std::sync::Arc;
#[cfg(test)]
pub(crate) mod tests;
/// Lets the sync engine obtain the journal configuration from the parent module's `Config`.
impl<T: Translator, J: Clone, S: Strategy> Config for super::Config<T, J, S> {
    type JournalConfig = J;

    /// Returns an owned clone of the journal configuration stored in this config.
    fn journal_config(&self) -> Self::JournalConfig {
        J::clone(&self.journal_config)
    }
}
/// Assembles a `current` database ([`db::Db`]) from the components produced by a sync.
///
/// The steps, in order:
/// 1. Initialize the operations Merkle structure for `range` via `init_sync`, handing it
///    `pinned_nodes`.
/// 2. Create the index and combine the Merkle structure with `log` into an authenticated journal.
/// 3. Create the bitmap with every whole chunk below `range.start()` marked as pruned and pass
///    index, journal and bitmap to `AnyDb::init_from_log`.
/// 4. Read the grafted pinned nodes out of the operations Merkle structure, build the grafted
///    tree and combine the ops root, grafted root and pending/partial chunk digests into the
///    database root.
/// 5. Initialize the metadata store, construct the database, update its metrics and sync its
///    metadata.
///
/// # Parameters
/// - `context`: source of the child contexts (`"merkle"`, `"index"`, `"any"`, `"metadata"`);
///   the context itself is consumed by the database metrics.
/// - `merkle_config`: configuration for the operations Merkle structure.
/// - `log`: the journal holding the synced operations.
/// - `translator`: key translator handed to the index factory.
/// - `pinned_nodes`: optional pinned node digests forwarded to `init_sync`.
/// - `range`: the synced range of operation locations.
/// - `apply_batch_size`: forwarded (as `u64`) to the authenticated journal constructor.
/// - `metadata_partition`: partition name handed to `db::init_metadata`.
/// - `strategy`: used to build the grafted tree and then stored in the database.
///
/// # Errors
/// Propagates every error of the underlying components. Returns
/// `DataCorrupted("pruned chunks overflow")` when the pruned chunk count does not fit in `usize`
/// or is rejected by the bitmap, and `DataCorrupted("missing ops pinned node")` when a required
/// node is absent from the operations Merkle structure.
// One argument per synced component; bundling them would only move the list elsewhere.
#[allow(clippy::too_many_arguments)]
async fn build_db<F, E, U, I, H, J, T, const N: usize, S>(
    context: E,
    merkle_config: full::Config<S>,
    log: J,
    translator: T,
    pinned_nodes: Option<Vec<H::Digest>>,
    range: NonEmptyRange<Location<F>>,
    apply_batch_size: usize,
    metadata_partition: String,
    strategy: S,
) -> Result<db::Db<F, E, J, I, H, U, N, S>, qmdb::Error<F>>
where
    F: Graftable,
    E: Context,
    U: Update + Send + Sync + 'static,
    I: IndexFactory<T, Value = Location<F>>,
    H: Hasher,
    T: Translator,
    J: Mutable<Item = Operation<F, U>> + Persistable<Error = crate::journal::Error>,
    S: Strategy,
    Operation<F, U>: Codec + Committable + CodecShared,
{
    // This hasher is moved into the authenticated journal below; a second one is created later
    // for the grafting and root computations.
    let hasher = qmdb::hasher::<H>();
    let merkle = Merkle::<F, _, _, S>::init_sync(
        context.child("merkle"),
        full::SyncConfig {
            config: merkle_config,
            range: range.clone(),
            pinned_nodes,
        },
    )
    .await?;
    let index = I::new(context.child("index"), translator);
    let log = authenticated::Journal::<F, _, _, _, S>::from_components(
        merkle,
        log,
        hasher,
        apply_batch_size as u64,
    )
    .await?;

    // Integer division: only chunks lying entirely below `range.start()` count as pruned.
    // The conversion is checked so that a count exceeding `usize` is reported as an error
    // rather than silently truncated on targets with a narrower `usize`.
    let pruned_chunks = usize::try_from(*range.start() / BitMap::<N>::CHUNK_SIZE_BITS)
        .map_err(|_| qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
    let bitmap = BitMap::<N>::new_with_pruned_chunks(pruned_chunks)
        .map_err(|_| qmdb::Error::<F>::DataCorrupted("pruned chunks overflow"))?;
    let bitmap = Arc::new(Shared::<N>::new(bitmap));

    let any_metrics = AnyMetrics::new(context.child("any"));
    let any: AnyDb<F, E, J, I, H, U, N, S> =
        AnyDb::init_from_log(index, log, Some(bitmap), any_metrics).await?;

    // Depends only on the const parameter `N`, so compute it once for all uses below.
    let grafting_height = grafting::height::<N>();

    // The digests the grafted tree needs at the pruning boundary are read from the operations
    // Merkle structure, after mapping each grafted position to its ops position.
    let grafted_pinned_nodes = {
        let grafted_boundary = Location::<F>::new(pruned_chunks as u64);
        let mut pins = Vec::new();
        for grafted_pos in F::nodes_to_pin(grafted_boundary) {
            let ops_pos = grafting::grafted_to_ops_pos::<F>(grafted_pos, grafting_height);
            let digest = any
                .log
                .merkle
                .get_node(ops_pos)
                .await?
                .ok_or(qmdb::Error::<F>::DataCorrupted("missing ops pinned node"))?;
            pins.push(digest);
        }
        pins
    };

    let hasher = qmdb::hasher::<H>();
    let ops_size = any.log.merkle.size();
    let ops_leaves = Location::<F>::try_from(ops_size)?;
    let grafted_tree = db::build_grafted_tree::<F, H, S, N>(
        &hasher,
        any.bitmap.as_ref(),
        &grafted_pinned_nodes,
        &any.log.merkle,
        ops_leaves,
        &strategy,
    )
    .await?;

    // Compute the individual root components and fold them into the database root.
    let storage = grafting::Storage::new(
        &grafted_tree,
        grafting_height,
        &any.log.merkle,
        hasher.clone(),
    );
    let partial = db::partial_chunk(any.bitmap.as_ref());
    let grafted_root = db::compute_grafted_root(
        &hasher,
        any.bitmap.as_ref(),
        &storage,
        ops_leaves,
        any.inactivity_floor_loc,
    )
    .await?;
    let ops_root = any.root();
    let partial_digest = partial.map(|(chunk, next_bit)| {
        let digest = hasher.digest(&chunk);
        (next_bit, digest)
    });
    let pending_digest =
        db::pending_chunk::<F, _, N>(any.bitmap.as_ref(), ops_leaves, grafting_height)?
            .map(|chunk| hasher.digest(&chunk));
    let root = db::combine_roots(
        &hasher,
        &ops_root,
        &grafted_root,
        pending_digest.as_ref(),
        partial_digest.as_ref().map(|(nb, d)| (*nb, d)),
    );

    // Only the metadata store itself is needed; the other two values returned by
    // `init_metadata` are discarded.
    let (metadata, _, _) =
        db::init_metadata::<F, E, DigestOf<H>>(context.child("metadata"), &metadata_partition)
            .await?;
    let metrics = db::Metrics::new(context);
    let current_db = db::Db {
        any,
        grafted_tree,
        metadata: AsyncMutex::new(metadata),
        strategy,
        root,
        metrics,
    };
    current_db.update_metrics();
    current_db.sync_metadata().await?;
    Ok(current_db)
}
// Generates the `qmdb::sync::Database` implementation for one `current` database type.
//
// Arguments, in order: the database type, its operation type, its update type, the journal
// type, the config type, the bound required of keys, the bound required of values and,
// optionally after a `;`, additional `where` predicates.
macro_rules! impl_current_sync_database {
    ($db:ident, $op:ident, $update:ident,
     $journal:ty, $config:ty,
     $key_bound:path, $value_bound:ident
     $(; $($where_extra:tt)+)?) => {
        impl<F, E, K, V, H, T, const N: usize, S> Database for $db<F, E, K, V, H, T, N, S>
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $value_bound + 'static,
            H: Hasher,
            T: Translator,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Context = E;
            type Op = $op<F, K, V>;
            type Journal = $journal;
            type Hasher = H;
            type Config = $config;
            type Digest = H::Digest;

            /// Builds the database from the synced journal by delegating to `build_db` with
            /// the Merkle config, translator, metadata partition and strategy taken from
            /// `config`.
            async fn from_sync_result(
                context: Self::Context,
                config: Self::Config,
                log: Self::Journal,
                pinned_nodes: Option<Vec<Self::Digest>>,
                range: NonEmptyRange<Location<F>>,
                apply_batch_size: usize,
            ) -> Result<Self, qmdb::Error<F>> {
                let strategy = config.merkle_config.strategy.clone();
                build_db::<F, _, $update<K, V>, _, H, _, T, N, _>(
                    context,
                    config.merkle_config.clone(),
                    log,
                    config.translator.clone(),
                    pinned_nodes,
                    range,
                    apply_batch_size,
                    config.grafted_metadata_partition.clone(),
                    strategy,
                )
                .await
            }

            /// Tries to obtain the pinned nodes at the start of `target.range` from local
            /// state.
            ///
            /// Returns `Ok(None)` when the target range starts at location 0, when the local
            /// journal or Merkle structure does not reach back to the target start or does not
            /// end exactly at the target end, or when the locally computed root differs from
            /// `target.root`. Otherwise returns the pinned nodes reported by the local Merkle
            /// structure for the target start.
            async fn local_boundary_nodes(
                context: Self::Context,
                config: &Self::Config,
                target: &qmdb::sync::Target<Self::Family, Self::Digest>,
                journal: &Self::Journal,
            ) -> Result<Option<Vec<Self::Digest>>, qmdb::Error<F>> {
                if target.range.start() == Location::new(0) {
                    return Ok(None);
                }

                // The journal must begin at or before the target start and end exactly at the
                // target end.
                let journal_reader = journal.reader().await;
                let journal_bounds = journal_reader.bounds();
                let journal_starts_late =
                    Location::new(journal_bounds.start) > target.range.start();
                let journal_end_differs =
                    Location::new(journal_bounds.end) != target.range.end();
                if journal_starts_late || journal_end_differs {
                    return Ok(None);
                }

                let inactivity_floor = qmdb::find_inactivity_floor_at::<F, _>(
                    &journal_reader,
                    target.range.end(),
                    |op| op.has_floor(),
                )
                .await?;
                // Release the reader before opening the Merkle structure.
                drop(journal_reader);

                let hasher = qmdb::hasher::<H>();
                let merkle = Merkle::<F, _, _, S>::init(
                    context.child("local_boundary_merkle"),
                    &hasher,
                    config.merkle_config.clone(),
                )
                .await?;

                // The same coverage requirement applies to the Merkle structure.
                let merkle_bounds = merkle.bounds();
                let merkle_starts_late = merkle_bounds.start > target.range.start();
                let merkle_end_differs = merkle_bounds.end != target.range.end();
                if merkle_starts_late || merkle_end_differs {
                    return Ok(None);
                }

                // Local state is only usable when it reproduces the target root.
                let inactive_peaks = F::inactive_peaks(
                    F::location_to_position(target.range.end()),
                    inactivity_floor,
                );
                if merkle.root(&hasher, inactive_peaks)? != target.root {
                    return Ok(None);
                }

                match merkle.pinned_nodes_at(target.range.start()).await {
                    Ok(nodes) => Ok(Some(nodes)),
                    Err(err) => Err(err.into()),
                }
            }

            /// Returns the root reported by the wrapped `any` database.
            fn root(&self) -> Self::Digest {
                self.any.root()
            }
        }
    };
}
// `Database` implementation for the unordered database with fixed-size operations
// (`current::unordered::fixed::Db`), backed by a fixed journal and `FixedConfig`.
impl_current_sync_database!(
CurrentUnorderedFixedDb, UnorderedFixedOp, UnorderedFixedUpdate,
fixed::Journal<E, Self::Op>, FixedConfig<T, S>,
Array, FixedValue
);
// `Database` implementation for the unordered database with variable-size operations
// (`current::unordered::variable::Db`), backed by a variable journal and `VariableConfig`.
// The extra predicate after `;` requires the operation type to be `CodecShared`.
impl_current_sync_database!(
CurrentUnorderedVariableDb, UnorderedVariableOp, UnorderedVariableUpdate,
variable::Journal<E, Self::Op>,
VariableConfig<T, <UnorderedVariableOp<F, K, V> as CodecRead>::Cfg, S>,
Key, VariableValue;
UnorderedVariableOp<F, K, V>: CodecShared
);
// `Database` implementation for the ordered database with fixed-size operations
// (`current::ordered::fixed::Db`), backed by a fixed journal and `FixedConfig`.
impl_current_sync_database!(
CurrentOrderedFixedDb, OrderedFixedOp, OrderedFixedUpdate,
fixed::Journal<E, Self::Op>, FixedConfig<T, S>,
Array, FixedValue
);
// `Database` implementation for the ordered database with variable-size operations
// (`current::ordered::variable::Db`), backed by a variable journal and `VariableConfig`.
// The extra predicate after `;` requires the operation type to be `CodecShared`.
impl_current_sync_database!(
CurrentOrderedVariableDb, OrderedVariableOp, OrderedVariableUpdate,
variable::Journal<E, Self::Op>,
VariableConfig<T, <OrderedVariableOp<F, K, V> as CodecRead>::Cfg, S>,
Key, VariableValue;
OrderedVariableOp<F, K, V>: CodecShared
);
// Generates `qmdb::sync::Resolver` implementations for one `current` database type, covering
// three ways of sharing it: `Arc<Db>`, `Arc<AsyncRwLock<Db>>` and `Arc<AsyncRwLock<Option<Db>>>`.
//
// Arguments, in order: the database type, its operation type, the bound required of values,
// the bound required of keys and, optionally after a `;`, additional `where` predicates.
macro_rules! impl_current_resolver {
    ($db:ident, $op:ident, $val_bound:ident, $key_bound:path $(; $($where_extra:tt)+)?) => {
        impl<F, E, K, V, H, T, const N: usize, S> qmdb::sync::Resolver
            for Arc<$db<F, E, K, V, H, T, N, S>>
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = qmdb::Error<F>;

            /// Serves the request from the shared database's inner `any` database via
            /// `fetch_operations`. The cancellation receiver is not used.
            async fn get_operations(
                &self,
                op_count: Location<F>,
                start_loc: Location<F>,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, Self::Error> {
                let any_db = &self.any;
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |size, start, limit| any_db.historical_proof(size, start, limit),
                    |start| any_db.pinned_nodes_at(start),
                )
                .await
            }
        }

        impl<F, E, K, V, H, T, const N: usize, S> qmdb::sync::Resolver
            for Arc<commonware_utils::sync::AsyncRwLock<$db<F, E, K, V, H, T, N, S>>>
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = qmdb::Error<F>;

            /// Takes the read lock and serves the request from the locked database's inner
            /// `any` database via `fetch_operations`. The guard is held until the fetch
            /// completes. The cancellation receiver is not used.
            async fn get_operations(
                &self,
                op_count: Location<F>,
                start_loc: Location<F>,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let any_db = &guard.any;
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |size, start, limit| any_db.historical_proof(size, start, limit),
                    |start| any_db.pinned_nodes_at(start),
                )
                .await
            }
        }

        impl<F, E, K, V, H, T, const N: usize, S> qmdb::sync::Resolver
            for Arc<commonware_utils::sync::AsyncRwLock<Option<$db<F, E, K, V, H, T, N, S>>>>
        where
            F: Graftable,
            E: Context,
            K: $key_bound,
            V: $val_bound + Send + Sync + 'static,
            H: Hasher,
            T: Translator + Send + Sync + 'static,
            T::Key: Send + Sync,
            S: Strategy,
            $($($where_extra)+)?
        {
            type Family = F;
            type Digest = H::Digest;
            type Op = $op<F, K, V>;
            type Error = qmdb::Error<F>;

            /// Takes the read lock and, when a database is present, serves the request from
            /// its inner `any` database via `fetch_operations`. Fails with `KeyNotFound` when
            /// the slot holds `None`. The cancellation receiver is not used.
            async fn get_operations(
                &self,
                op_count: Location<F>,
                start_loc: Location<F>,
                max_ops: std::num::NonZeroU64,
                include_pinned_nodes: bool,
                _cancel_rx: oneshot::Receiver<()>,
            ) -> Result<qmdb::sync::FetchResult<F, Self::Op, Self::Digest>, Self::Error> {
                let guard = self.read().await;
                let Some(db) = guard.as_ref() else {
                    return Err(qmdb::Error::<F>::KeyNotFound);
                };
                let any_db = &db.any;
                fetch_operations(
                    op_count,
                    start_loc,
                    max_ops,
                    include_pinned_nodes,
                    |size, start, limit| any_db.historical_proof(size, start, limit),
                    |start| any_db.pinned_nodes_at(start),
                )
                .await
            }
        }
    };
}
// `Resolver` implementations for the unordered database with fixed-size operations.
impl_current_resolver!(CurrentUnorderedFixedDb, UnorderedFixedOp, FixedValue, Array);
// `Resolver` implementations for the unordered database with variable-size operations; the
// extra predicate after `;` requires the operation type to be `CodecShared`.
impl_current_resolver!(
CurrentUnorderedVariableDb, UnorderedVariableOp, VariableValue, Key;
UnorderedVariableOp<F, K, V>: CodecShared,
);
// `Resolver` implementations for the ordered database with fixed-size operations.
impl_current_resolver!(CurrentOrderedFixedDb, OrderedFixedOp, FixedValue, Array);
// `Resolver` implementations for the ordered database with variable-size operations; the
// extra predicate after `;` requires the operation type to be `CodecShared`.
impl_current_resolver!(
CurrentOrderedVariableDb, OrderedVariableOp, VariableValue, Key;
OrderedVariableOp<F, K, V>: CodecShared,
);