use crate::{
merkle::{Family, Location, Proof},
qmdb::{
self,
any::{value::ValueEncoding, FixedValue, VariableValue},
immutable::{
fixed::{Db as ImmutableFixedDb, Operation as ImmutableFixedOp},
variable::{Db as ImmutableVariableDb, Operation as ImmutableVariableOp},
CompactDb as ImmutableCompactDb, Operation as ImmutableOp,
},
keyless::{
fixed::{Db as KeylessFixedDb, Operation as KeylessFixedOp},
variable::{Db as KeylessVariableDb, Operation as KeylessVariableOp},
CompactDb as KeylessCompactDb, Operation as KeylessOp,
},
operation::Key,
sync::{EngineError, Error},
verify_proof,
},
translator::Translator,
};
use commonware_codec::{
Encode, EncodeSize, Error as CodecError, RangeCfg, Read, ReadExt as _, Write,
};
use commonware_cryptography::{Digest, Hasher};
use commonware_parallel::Strategy;
use commonware_runtime::{Buf, BufMut, Clock, Metrics, Storage, Supervisor};
use commonware_utils::{channel::oneshot, sync::AsyncRwLock, Array};
use std::{future::Future, num::NonZeroU64, sync::Arc};
/// Sync target for a compact database: the root digest to reach and the leaf count at which
/// that root is expected.
#[derive(Debug)]
pub struct Target<F: Family, D: Digest> {
/// Root digest that fetched state is checked against.
pub root: D,
/// Leaf count of the target. [`Target::validate`] rejects zero and any value for which
/// `is_valid()` is false.
pub leaf_count: Location<F>,
}
impl<F: Family, D: Digest> Target<F, D> {
    /// Reason reported when `leaf_count` fails validation.
    const INVALID_LEAF_COUNT: &'static str = "leaf_count must be in 1..=MAX_LEAVES";

    /// Builds a target from its parts. No validation is performed here; call
    /// [`Target::validate`] before relying on the value.
    pub const fn new(root: D, leaf_count: Location<F>) -> Self {
        Self { root, leaf_count }
    }

    /// Checks that `leaf_count` is a valid location and is not zero.
    ///
    /// # Errors
    ///
    /// Returns a static description of the violated constraint.
    pub fn validate(&self) -> Result<(), &'static str> {
        let acceptable = self.leaf_count.is_valid() && self.leaf_count != 0;
        if acceptable {
            Ok(())
        } else {
            Err(Self::INVALID_LEAF_COUNT)
        }
    }
}
impl<F: Family, D: Digest> Clone for Target<F, D> {
    /// Field-wise copy of the target.
    fn clone(&self) -> Self {
        let Self { root, leaf_count } = self;
        Self {
            root: *root,
            leaf_count: *leaf_count,
        }
    }
}
impl<F: Family, D: Digest> PartialEq for Target<F, D> {
    /// Two targets are equal when both the root and the leaf count match.
    fn eq(&self, other: &Self) -> bool {
        let Self { root, leaf_count } = self;
        *root == other.root && *leaf_count == other.leaf_count
    }
}
// Marker impl: equality on `Target` is the field-wise comparison from its `PartialEq` impl.
impl<F: Family, D: Digest> Eq for Target<F, D> {}
impl<F: Family, D: Digest> Write for Target<F, D> {
fn write(&self, buf: &mut impl BufMut) {
self.root.write(buf);
self.leaf_count.write(buf);
}
}
impl<F: Family, D: Digest> EncodeSize for Target<F, D> {
    /// Encoded size is the sum of the sizes of the two fields.
    fn encode_size(&self) -> usize {
        let root_len = self.root.encode_size();
        let leaf_count_len = self.leaf_count.encode_size();
        root_len + leaf_count_len
    }
}
impl<F: Family, D: Digest> Read for Target<F, D> {
    type Cfg = ();

    /// Decodes a root and a leaf count (in that order) and rejects targets that fail
    /// [`Target::validate`].
    ///
    /// # Errors
    ///
    /// Propagates decoding errors from either field, or returns `CodecError::Invalid` carrying
    /// the validation reason.
    fn read_cfg(buf: &mut impl Buf, _: &()) -> Result<Self, CodecError> {
        let root = D::read(buf)?;
        let leaf_count = Location::<F>::read(buf)?;
        let decoded = Self { root, leaf_count };
        match decoded.validate() {
            Ok(()) => Ok(decoded),
            Err(reason) => Err(CodecError::Invalid(
                "storage::qmdb::sync::compact::Target",
                reason,
            )),
        }
    }
}
#[cfg(feature = "arbitrary")]
impl<F: Family, D: Digest> arbitrary::Arbitrary<'_> for Target<F, D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a target with an arbitrary root and a leaf count drawn from
    /// `1..=MAX_LEAVES`.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        let root = u.arbitrary()?;
        let leaves = u.int_in_range(1..=*F::MAX_LEAVES)?;
        Ok(Self::new(root, Location::new(leaves)))
    }
}
/// Compact state supplied by a [`Resolver`] in response to a requested [`Target`].
#[derive(Clone, Debug)]
pub struct State<F: Family, Op, D: Digest> {
/// Leaf count this state describes; validation requires it to equal the target's leaf count.
pub leaf_count: Location<F>,
/// Pinned node digests used to rebuild the in-memory merkle structure at `leaf_count`.
pub pinned_nodes: Vec<D>,
/// Operation at location `leaf_count - 1`; the inactivity floor is extracted from it.
pub last_commit_op: Op,
/// Proof that `last_commit_op` sits at location `leaf_count - 1` under the target root.
pub last_commit_proof: Proof<F, D>,
}
/// A [`State`] that passed validation against a target, together with the values established
/// while validating it.
#[derive(Clone, Debug)]
pub struct ValidatedState<F: Family, Op, D: Digest> {
/// The state exactly as it was supplied.
pub state: State<F, Op, D>,
/// Root of the target the state was validated against.
pub root: D,
/// Inactivity floor reported by `Database::inactivity_floor` for the last commit operation.
pub inactivity_floor: Location<F>,
}
impl<F: Family, Op, D: Digest> ValidatedState<F, Op, D> {
const fn new(state: State<F, Op, D>, root: D, inactivity_floor: Location<F>) -> Self {
Self {
state,
root,
inactivity_floor,
}
}
}
impl<F: Family, Op, D: Digest> Write for State<F, Op, D>
where
Op: Write,
{
fn write(&self, buf: &mut impl BufMut) {
self.leaf_count.write(buf);
self.pinned_nodes.write(buf);
self.last_commit_op.write(buf);
self.last_commit_proof.write(buf);
}
}
/// Result of a [`Resolver`] fetch: the compact state plus an optional feedback channel.
pub struct FetchResult<F: Family, Op, D: Digest> {
/// The fetched compact state (not yet validated).
pub state: State<F, Op, D>,
/// When present, `sync` sends `false` if the state fails validation and `true` once a
/// database has been built from it and its root matches the target.
pub callback: Option<oneshot::Sender<bool>>,
}
impl<F: Family, Op: std::fmt::Debug, D: Digest> std::fmt::Debug for FetchResult<F, Op, D> {
    /// Formats the state in full; the callback is shown only as a placeholder.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Only the presence of the sender is printed, never the sender itself.
        let callback = self.callback.as_ref().map(|_| "<callback>");
        let mut out = f.debug_struct("FetchResult");
        out.field("state", &self.state);
        out.field("callback", &callback);
        out.finish()
    }
}
impl<F: Family, Op, D: Digest> From<State<F, Op, D>> for FetchResult<F, Op, D> {
fn from(state: State<F, Op, D>) -> Self {
Self {
state,
callback: None,
}
}
}
impl<F: Family, Op, D: Digest> EncodeSize for State<F, Op, D>
where
    Op: EncodeSize,
{
    /// Encoded size is the sum of the sizes of the four fields.
    fn encode_size(&self) -> usize {
        let leaf_count_len = self.leaf_count.encode_size();
        let pinned_len = self.pinned_nodes.encode_size();
        let op_len = self.last_commit_op.encode_size();
        let proof_len = self.last_commit_proof.encode_size();
        leaf_count_len + pinned_len + op_len + proof_len
    }
}
impl<F: Family, Op, D: Digest> Read for State<F, Op, D>
where
    Op: Read,
{
    /// Decoding configuration: the allowed range for the pinned-node count, the
    /// configuration for decoding the operation, and the configuration handed to the proof
    /// decoder.
    type Cfg = (RangeCfg<usize>, Op::Cfg, usize);

    /// Decodes the fields in the same order they are written. No semantic validation of the
    /// decoded state happens here.
    fn read_cfg(buf: &mut impl Buf, cfg: &Self::Cfg) -> Result<Self, CodecError> {
        let (pinned_range, op_cfg, max_proof_digests) = cfg;
        let leaf_count = Location::<F>::read(buf)?;
        let pinned_nodes = Vec::<D>::read_cfg(buf, &(*pinned_range, ()))?;
        let last_commit_op = Op::read_cfg(buf, op_cfg)?;
        let last_commit_proof = Proof::<F, D>::read_cfg(buf, max_proof_digests)?;
        Ok(Self {
            leaf_count,
            pinned_nodes,
            last_commit_op,
            last_commit_proof,
        })
    }
}
/// Errors returned when serving compact state from a local database.
#[derive(Debug, thiserror::Error)]
pub enum ServeError<F: Family, D: Digest> {
/// The source database returned an error (or no operation came back with the proof).
#[error("compact source database error: {0}")]
Database(#[from] qmdb::Error<F>),
/// The requested target failed `Target::validate`; carries the reason.
#[error("invalid compact target: {0}")]
InvalidTarget(&'static str),
/// The optional source database slot was empty.
#[error("compact source missing")]
MissingSource,
/// The requested target differs from the source's current root or leaf count.
#[error("stale compact target - requested {requested:?}, current {current:?}")]
StaleTarget {
/// Target the caller asked for.
requested: Target<F, D>,
/// Target describing the source's current state.
current: Target<F, D>,
},
}
/// Source of compact state for [`sync`].
// The return type of `get_compact_state` nests several generic parameters, which is presumably
// what this lint allowance is for.
#[allow(clippy::type_complexity)]
pub trait Resolver: Send + Sync + Clone + 'static {
/// Merkle family of the served database.
type Family: Family;
/// Digest type of the served database.
type Digest: Digest;
/// Operation type carried in the served state.
type Op;
/// Error returned when the state cannot be fetched.
type Error: std::error::Error + Send + 'static;
/// Fetches the compact state for `target`.
///
/// The returned state is not trusted: `sync` validates it against `target` and reports the
/// outcome through the result's optional callback.
fn get_compact_state<'a>(
&'a self,
target: Target<Self::Family, Self::Digest>,
) -> impl Future<Output = Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error>>
+ Send
+ 'a;
}
/// Shorthand bound for a [`Resolver`] whose family, operation and digest types match those of
/// the [`Database`] `DB`. It declares no items of its own.
pub trait CompactDbResolver<DB: Database>:
Resolver<Family = DB::Family, Op = DB::Op, Digest = DB::Digest>
{
}
// Blanket impl: every resolver whose associated types line up with `DB` is a
// `CompactDbResolver<DB>`.
impl<DB, R> CompactDbResolver<DB> for R
where
DB: Database,
R: Resolver<Family = DB::Family, Op = DB::Op, Digest = DB::Digest>,
{
}
/// A database that [`sync`] can construct from validated compact state.
pub trait Database: Sized + Send {
/// Merkle family used by the database.
type Family: Family;
/// Operation type stored by the database.
type Op: Encode + Send;
/// Configuration passed to `from_validated_state`; `sync` clones it for each attempt.
type Config: Clone;
/// Digest type of the database root.
type Digest: Digest;
/// Runtime context handed to `from_validated_state`.
type Context: Storage + Clock + Metrics;
/// Hasher used when verifying proofs and recomputing the root.
type Hasher: Hasher<Digest = Self::Digest>;
/// Builds the database from state that has already been validated against a target.
fn from_validated_state(
context: Self::Context,
config: Self::Config,
state: ValidatedState<Self::Family, Self::Op, Self::Digest>,
) -> impl Future<Output = Result<Self, qmdb::Error<Self::Family>>> + Send;
/// Extracts the inactivity floor from `op`. Validation treats `None` as an invalid proof.
fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>>;
/// Current root of the database; `sync` asserts it equals the target root.
fn root(&self) -> Self::Digest;
/// Awaited by `sync` as its last step before returning the database.
fn persist_compact_state(
&self,
) -> impl Future<Output = Result<(), qmdb::Error<Self::Family>>> + Send;
}
/// Inputs to [`sync`].
pub struct Config<DB, R>
where
DB: Database,
R: CompactDbResolver<DB>,
{
/// Parent context; `sync` passes `context.child("compact")` to the database constructor.
pub context: DB::Context,
/// Source the compact state is fetched from.
pub resolver: R,
/// Root and leaf count to sync to.
pub target: Target<DB::Family, DB::Digest>,
/// Database configuration, cloned for each construction attempt.
pub db_config: DB::Config,
}
pub async fn sync<DB, R>(
config: Config<DB, R>,
) -> Result<DB, Error<DB::Family, R::Error, DB::Digest>>
where
DB: Database,
R: CompactDbResolver<DB>,
{
let target = config.target;
target
.validate()
.map_err(|reason| Error::Engine(EngineError::InvalidCompactTarget(reason)))?;
loop {
let FetchResult { state, callback } = config
.resolver
.get_compact_state(target.clone())
.await
.map_err(Error::Resolver)?;
let validated_state = match validate_compact_state::<DB>(&target, state) {
Ok(state) => state,
Err(err) => {
if let Some(callback) = callback {
let _ = callback.send(false);
}
tracing::debug!(error = ?err, "compact state failed validation, will retry");
continue;
}
};
let db = DB::from_validated_state(
config.context.child("compact"),
config.db_config.clone(),
validated_state,
)
.await
.map_err(Error::Database)?;
assert_eq!(
db.root(),
target.root,
"validated compact state reconstructed unexpected root",
);
if let Some(callback) = callback {
let _ = callback.send(true);
}
db.persist_compact_state().await?;
return Ok(db);
}
}
/// Validates fetched compact state against `target`.
///
/// Checks, in order: the state's leaf count equals the target's; the last commit proof verifies
/// `last_commit_op` at location `leaf_count - 1` under the target root; and the frontier checks
/// in [`validate_compact_frontier`] pass.
///
/// The caller must already have validated `target`, since `leaf_count - 1` is computed without
/// a zero check.
fn validate_compact_state<DB>(
    target: &Target<DB::Family, DB::Digest>,
    state: State<DB::Family, DB::Op, DB::Digest>,
) -> CompactFrontierValidation<DB>
where
    DB: Database,
{
    let expected = target.leaf_count;
    let actual = state.leaf_count;
    if actual != expected {
        return Err(EngineError::UnexpectedLeafCount { expected, actual });
    }

    let hasher = qmdb::hasher::<DB::Hasher>();
    let last_commit_loc = Location::new(*actual - 1);
    let proven = verify_proof(
        &hasher,
        &state.last_commit_proof,
        last_commit_loc,
        std::slice::from_ref(&state.last_commit_op),
        &target.root,
    );
    if proven {
        validate_compact_frontier::<DB>(target, state)
    } else {
        Err(EngineError::InvalidProof)
    }
}
/// Result of validating compact state for `DB`: the validated state on success, otherwise the
/// engine error describing why validation failed.
type CompactFrontierValidation<DB> = Result<
ValidatedState<<DB as Database>::Family, <DB as Database>::Op, <DB as Database>::Digest>,
EngineError<<DB as Database>::Family, <DB as Database>::Digest>,
>;
/// Checks that the pinned nodes and inactivity floor in `state` reproduce `target.root`.
///
/// The inactivity floor is taken from the last commit operation and must not lie beyond the
/// last commit location. An in-memory merkle structure is then initialised from the pinned
/// nodes alone, pruned up to `leaf_count`, and its root is compared with the target root.
///
/// # Errors
///
/// Returns `InvalidProof` when the floor is missing or out of range, or when the merkle
/// structure cannot be initialised or its root computed. Returns `RootMismatch` when the
/// recomputed root differs from the target root.
///
/// The caller must ensure `state.leaf_count` is non-zero, since `leaf_count - 1` is computed
/// without a zero check.
fn validate_compact_frontier<DB>(
    target: &Target<DB::Family, DB::Digest>,
    state: State<DB::Family, DB::Op, DB::Digest>,
) -> CompactFrontierValidation<DB>
where
    DB: Database,
{
    let last_commit_loc = Location::new(*state.leaf_count - 1);
    let inactivity_floor_loc = match DB::inactivity_floor(&state.last_commit_op) {
        Some(floor) if floor > last_commit_loc => return Err(EngineError::InvalidProof),
        Some(floor) => floor,
        None => return Err(EngineError::InvalidProof),
    };

    // The pinned nodes are cloned because `state` is returned intact on success.
    let mem_config = crate::merkle::mem::Config {
        nodes: Vec::new(),
        pruning_boundary: state.leaf_count,
        pinned_nodes: state.pinned_nodes.clone(),
    };
    let mem = match crate::merkle::mem::Mem::<DB::Family, DB::Digest>::init(mem_config) {
        Ok(mem) => mem,
        Err(_) => return Err(EngineError::InvalidProof),
    };

    let hasher = qmdb::hasher::<DB::Hasher>();
    let frontier_pos = DB::Family::location_to_position(state.leaf_count);
    let inactive_peaks = DB::Family::inactive_peaks(frontier_pos, inactivity_floor_loc);
    let actual = match mem.root(&hasher, inactive_peaks) {
        Ok(root) => root,
        Err(_) => return Err(EngineError::InvalidProof),
    };

    if actual == target.root {
        Ok(ValidatedState::new(
            state,
            target.root,
            inactivity_floor_loc,
        ))
    } else {
        Err(EngineError::RootMismatch {
            expected: target.root,
            actual,
        })
    }
}
async fn fetch_state_from_full_source<F, Op, D, Current, CurrentFut, Hist, HistFut, Pins, PinsFut>(
target: Target<F, D>,
current_target: Current,
historical_proof: Hist,
pinned_nodes_at: Pins,
) -> Result<State<F, Op, D>, ServeError<F, D>>
where
F: Family,
D: Digest,
Current: FnOnce() -> CurrentFut,
CurrentFut: Future<Output = Target<F, D>>,
Hist: FnOnce(Location<F>, Location<F>) -> HistFut,
HistFut: Future<Output = Result<(Proof<F, D>, Vec<Op>), qmdb::Error<F>>>,
Pins: FnOnce(Location<F>) -> PinsFut,
PinsFut: Future<Output = Result<Vec<D>, qmdb::Error<F>>>,
{
target.validate().map_err(ServeError::InvalidTarget)?;
let current = current_target().await;
if target.root != current.root || target.leaf_count != current.leaf_count {
return Err(ServeError::StaleTarget {
requested: target,
current,
});
}
let leaf_count = target.leaf_count;
let last_commit_loc = Location::new(*leaf_count - 1);
let (last_commit_proof, mut operations) = historical_proof(leaf_count, last_commit_loc)
.await
.map_err(ServeError::Database)?;
let last_commit_op =
operations
.pop()
.ok_or(ServeError::Database(qmdb::Error::DataCorrupted(
"missing last commit operation",
)))?;
let pinned_nodes = pinned_nodes_at(leaf_count)
.await
.map_err(ServeError::Database)?;
Ok(State {
leaf_count,
pinned_nodes,
last_commit_op,
last_commit_proof,
})
}
// Implements `Resolver` for a full keyless database type `$db` with operation type `$op` and
// value bound `$val_bound`, in three wrappers: `Arc<Db>`, `Arc<AsyncRwLock<Db>>`, and
// `Arc<AsyncRwLock<Option<Db>>>`. Every impl delegates to `fetch_state_from_full_source` and
// converts the resulting `State` into a `FetchResult` with no callback.
macro_rules! impl_compact_resolver_keyless {
($db:ident, $op:ident, $val_bound:ident) => {
// Shared database behind an `Arc`: read directly, no locking.
impl<F, E, V, H, S> Resolver for Arc<$db<F, E, V, H, S>>
where
F: Family,
E: crate::Context,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
fetch_state_from_full_source(
target,
// Current target: the database root and the end of its bounds.
|| async { Target::new(self.root(), self.bounds().await.end) },
|leaf_count, last_commit_loc| {
self.historical_proof(
leaf_count,
last_commit_loc,
// 1 is non-zero, so this unwrap cannot fail. Presumably the maximum number of
// operations to return — confirm against `historical_proof`.
NonZeroU64::new(1).unwrap(),
)
},
|leaf_count| self.pinned_nodes_at(leaf_count),
)
.await
.map(Into::into)
}
}
// Database behind an async read-write lock: the read guard is held for the whole fetch.
impl<F, E, V, H, S> Resolver for Arc<AsyncRwLock<$db<F, E, V, H, S>>>
where
F: Family,
E: crate::Context,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let db = self.read().await;
fetch_state_from_full_source(
target,
|| async { Target::new(db.root(), db.bounds().await.end) },
|leaf_count, last_commit_loc| {
db.historical_proof(
leaf_count,
last_commit_loc,
NonZeroU64::new(1).unwrap(),
)
},
|leaf_count| db.pinned_nodes_at(leaf_count),
)
.await
.map(Into::into)
}
}
// Optional database behind an async read-write lock: an empty slot yields `MissingSource`.
impl<F, E, V, H, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, V, H, S>>>>
where
F: Family,
E: crate::Context,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let guard = self.read().await;
let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
fetch_state_from_full_source(
target,
|| async { Target::new(db.root(), db.bounds().await.end) },
|leaf_count, last_commit_loc| {
db.historical_proof(
leaf_count,
last_commit_loc,
NonZeroU64::new(1).unwrap(),
)
},
|leaf_count| db.pinned_nodes_at(leaf_count),
)
.await
.map(Into::into)
}
}
};
}
// Implements `Resolver` for a full immutable (keyed) database type `$db` with operation type
// `$op`, value bound `$val_bound` and key bound `$key_bound`, in three wrappers: `Arc<Db>`,
// `Arc<AsyncRwLock<Db>>`, and `Arc<AsyncRwLock<Option<Db>>>`. Every impl delegates to
// `fetch_state_from_full_source` and converts the resulting `State` into a `FetchResult` with
// no callback.
macro_rules! impl_compact_resolver_immutable {
($db:ident, $op:ident, $val_bound:ident, $key_bound:path) => {
// Shared database behind an `Arc`: read directly, no locking.
impl<F, E, K, V, H, T, S> Resolver for Arc<$db<F, E, K, V, H, T, S>>
where
F: Family,
E: crate::Context,
K: $key_bound,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
T: Translator + Send + Sync + 'static,
T::Key: Send + Sync,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, K, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
fetch_state_from_full_source(
target,
// Current target: the database root and the end of its bounds.
|| async { Target::new(self.root(), self.bounds().await.end) },
|leaf_count, last_commit_loc| {
self.historical_proof(
leaf_count,
last_commit_loc,
// 1 is non-zero, so this unwrap cannot fail. Presumably the maximum number of
// operations to return — confirm against `historical_proof`.
NonZeroU64::new(1).unwrap(),
)
},
|leaf_count| self.pinned_nodes_at(leaf_count),
)
.await
.map(Into::into)
}
}
// Database behind an async read-write lock: the read guard is held for the whole fetch.
impl<F, E, K, V, H, T, S> Resolver for Arc<AsyncRwLock<$db<F, E, K, V, H, T, S>>>
where
F: Family,
E: crate::Context,
K: $key_bound,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
T: Translator + Send + Sync + 'static,
T::Key: Send + Sync,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, K, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let db = self.read().await;
fetch_state_from_full_source(
target,
|| async { Target::new(db.root(), db.bounds().await.end) },
|leaf_count, last_commit_loc| {
db.historical_proof(
leaf_count,
last_commit_loc,
NonZeroU64::new(1).unwrap(),
)
},
|leaf_count| db.pinned_nodes_at(leaf_count),
)
.await
.map(Into::into)
}
}
// Optional database behind an async read-write lock: an empty slot yields `MissingSource`.
impl<F, E, K, V, H, T, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, K, V, H, T, S>>>>
where
F: Family,
E: crate::Context,
K: $key_bound,
V: $val_bound + Send + Sync + 'static,
H: Hasher,
T: Translator + Send + Sync + 'static,
T::Key: Send + Sync,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, K, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let guard = self.read().await;
let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
fetch_state_from_full_source(
target,
|| async { Target::new(db.root(), db.bounds().await.end) },
|leaf_count, last_commit_loc| {
db.historical_proof(
leaf_count,
last_commit_loc,
NonZeroU64::new(1).unwrap(),
)
},
|leaf_count| db.pinned_nodes_at(leaf_count),
)
.await
.map(Into::into)
}
}
};
}
// Implements `Resolver` for a compact keyless database type `$db` with operation type `$op`,
// in three wrappers: `Arc<Db>`, `Arc<AsyncRwLock<Db>>`, and `Arc<AsyncRwLock<Option<Db>>>`.
// Unlike the full-database macros, these impls call the database's own `compact_state(target)`
// (not awaited) and convert its success value into a `FetchResult` via `Into`.
macro_rules! impl_compact_resolver_compact_keyless {
($db:ident, $op:ident) => {
// Shared database behind an `Arc`: read directly, no locking.
impl<F, E, V, H, C, S> Resolver for Arc<$db<F, E, V, H, C, S>>
where
F: Family,
E: crate::Context,
V: ValueEncoding + Send + Sync + 'static,
H: Hasher,
$op<F, V>: Encode + Read<Cfg = C>,
C: Clone + Send + Sync + 'static,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
self.compact_state(target).map(Into::into)
}
}
// Database behind an async read-write lock: a read guard is taken for the call.
impl<F, E, V, H, C, S> Resolver for Arc<AsyncRwLock<$db<F, E, V, H, C, S>>>
where
F: Family,
E: crate::Context,
V: ValueEncoding + Send + Sync + 'static,
H: Hasher,
$op<F, V>: Encode + Read<Cfg = C>,
C: Clone + Send + Sync + 'static,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let db = self.read().await;
db.compact_state(target).map(Into::into)
}
}
// Optional database behind an async read-write lock: an empty slot yields `MissingSource`.
impl<F, E, V, H, C, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, V, H, C, S>>>>
where
F: Family,
E: crate::Context,
V: ValueEncoding + Send + Sync + 'static,
H: Hasher,
$op<F, V>: Encode + Read<Cfg = C>,
C: Clone + Send + Sync + 'static,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let guard = self.read().await;
let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
db.compact_state(target).map(Into::into)
}
}
};
}
// Implements `Resolver` for a compact immutable (keyed) database type `$db` with operation type
// `$op`, in three wrappers: `Arc<Db>`, `Arc<AsyncRwLock<Db>>`, and
// `Arc<AsyncRwLock<Option<Db>>>`. Each impl calls the database's own `compact_state(target)`
// (not awaited) and converts its success value into a `FetchResult` via `Into`.
macro_rules! impl_compact_resolver_compact_immutable {
($db:ident, $op:ident) => {
// Shared database behind an `Arc`: read directly, no locking.
impl<F, E, K, V, H, C, S> Resolver for Arc<$db<F, E, K, V, H, C, S>>
where
F: Family,
E: crate::Context,
K: Key,
V: ValueEncoding + Send + Sync + 'static,
H: Hasher,
$op<F, K, V>: Encode + Read<Cfg = C>,
C: Clone + Send + Sync + 'static,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, K, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
self.compact_state(target).map(Into::into)
}
}
// Database behind an async read-write lock: a read guard is taken for the call.
impl<F, E, K, V, H, C, S> Resolver for Arc<AsyncRwLock<$db<F, E, K, V, H, C, S>>>
where
F: Family,
E: crate::Context,
K: Key,
V: ValueEncoding + Send + Sync + 'static,
H: Hasher,
$op<F, K, V>: Encode + Read<Cfg = C>,
C: Clone + Send + Sync + 'static,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, K, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let db = self.read().await;
db.compact_state(target).map(Into::into)
}
}
// Optional database behind an async read-write lock: an empty slot yields `MissingSource`.
impl<F, E, K, V, H, C, S> Resolver for Arc<AsyncRwLock<Option<$db<F, E, K, V, H, C, S>>>>
where
F: Family,
E: crate::Context,
K: Key,
V: ValueEncoding + Send + Sync + 'static,
H: Hasher,
$op<F, K, V>: Encode + Read<Cfg = C>,
C: Clone + Send + Sync + 'static,
S: Strategy,
{
type Family = F;
type Digest = H::Digest;
type Op = $op<F, K, V>;
type Error = ServeError<F, H::Digest>;
async fn get_compact_state(
&self,
target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
let guard = self.read().await;
let db = guard.as_ref().ok_or(ServeError::MissingSource)?;
db.compact_state(target).map(Into::into)
}
}
};
}
// Resolver impls for the compact databases, which serve their own compact state.
impl_compact_resolver_compact_keyless!(KeylessCompactDb, KeylessOp);
impl_compact_resolver_compact_immutable!(ImmutableCompactDb, ImmutableOp);
// Resolver impls for the full databases, which assemble compact state through
// `fetch_state_from_full_source`.
impl_compact_resolver_keyless!(KeylessFixedDb, KeylessFixedOp, FixedValue);
impl_compact_resolver_keyless!(KeylessVariableDb, KeylessVariableOp, VariableValue);
impl_compact_resolver_immutable!(ImmutableFixedDb, ImmutableFixedOp, FixedValue, Array);
impl_compact_resolver_immutable!(ImmutableVariableDb, ImmutableVariableOp, VariableValue, Key);
#[cfg(test)]
mod tests {
use super::{Config, Database, FetchResult, Resolver, State, Target};
use crate::{
merkle::{mmr, Location},
qmdb,
};
use commonware_codec::{DecodeExt as _, Encode as _, RangeCfg};
use commonware_cryptography::{sha256::Digest, Hasher as _, Sha256};
use commonware_parallel::Rayon;
use commonware_runtime::{deterministic, Runner as _};
use commonware_utils::sync::AsyncRwLock;
use std::{
collections::VecDeque,
convert::Infallible,
sync::{
atomic::{AtomicUsize, Ordering},
Arc,
},
};
// Compile-time check that all three wrapper forms of `$db` implement `Resolver`.
macro_rules! assert_resolver_variants {
($db:ty) => {
assert_resolver::<Arc<$db>>();
assert_resolver::<Arc<AsyncRwLock<$db>>>();
assert_resolver::<Arc<AsyncRwLock<Option<$db>>>>();
};
}
// Has no body; it exists only so the `Resolver` bound is checked when instantiated.
fn assert_resolver<R: super::Resolver>() {}
// Stub database that reports whatever root it was configured with.
struct TestDb {
root: Digest,
}
impl Database for TestDb {
type Family = mmr::Family;
type Op = u8;
// (root to report, counter incremented on every construction)
type Config = (Digest, Arc<AtomicUsize>);
type Digest = Digest;
type Context = deterministic::Context;
type Hasher = Sha256;
async fn from_validated_state(
_context: Self::Context,
(root, constructions): Self::Config,
_state: super::ValidatedState<Self::Family, Self::Op, Self::Digest>,
) -> Result<Self, qmdb::Error<Self::Family>> {
constructions.fetch_add(1, Ordering::SeqCst);
Ok(Self { root })
}
// Every operation reports an inactivity floor of 0.
fn inactivity_floor(_op: &Self::Op) -> Option<Location<Self::Family>> {
Some(Location::new(0))
}
fn root(&self) -> Self::Digest {
self.root
}
async fn persist_compact_state(&self) -> Result<(), qmdb::Error<Self::Family>> {
Ok(())
}
}
// Resolver that hands out queued fetch results in order, ignoring the requested target.
#[derive(Clone)]
struct SequenceResolver {
states: Arc<commonware_utils::sync::Mutex<VecDeque<FetchResult<mmr::Family, u8, Digest>>>>,
}
impl Resolver for SequenceResolver {
type Family = mmr::Family;
type Digest = Digest;
type Op = u8;
type Error = Infallible;
async fn get_compact_state(
&self,
_target: Target<Self::Family, Self::Digest>,
) -> Result<FetchResult<Self::Family, Self::Op, Self::Digest>, Self::Error> {
// Panics when the queue is exhausted, i.e. when sync fetched more often than expected.
Ok(self
.states
.lock()
.pop_front()
.expect("missing compact fetch result"))
}
}
// Builds a two-leaf merkle structure (operations 1u8 then 0u8) and returns the compact state
// for leaf count 2 together with the matching target.
fn valid_state_and_target() -> (State<mmr::Family, u8, Digest>, Target<mmr::Family, Digest>) {
let hasher = qmdb::hasher::<Sha256>();
let mut merkle = crate::merkle::mem::Mem::<mmr::Family, Digest>::new();
let op = 0u8;
let first_op = 1u8;
let batch = merkle
.new_batch()
.add(&hasher, &first_op.encode())
.add(&hasher, &op.encode());
let batch = batch.merkleize(&merkle, &hasher);
merkle.apply_batch(&batch).unwrap();
let root = merkle.root(&hasher, 0).unwrap();
let leaf_count = Location::new(2);
let pinned_nodes = merkle
.nodes_to_pin(leaf_count)
.into_values()
.collect::<Vec<_>>();
// Proof for the last operation, at location 1.
let proof = merkle.proof(&hasher, Location::new(1), 0).unwrap();
(
State {
leaf_count,
pinned_nodes,
last_commit_op: op,
last_commit_proof: proof,
},
Target::<mmr::Family, Digest> { root, leaf_count },
)
}
// Type-level test: it passes by compiling.
#[test]
fn test_all_compact_qmdb_variants_implement_strategy_resolvers() {
type KeylessFixedCompactDb = crate::qmdb::keyless::fixed::CompactDb<
mmr::Family,
deterministic::Context,
Digest,
commonware_cryptography::Sha256,
Rayon,
>;
type KeylessVariableCompactDb = crate::qmdb::keyless::variable::CompactDb<
mmr::Family,
deterministic::Context,
Vec<u8>,
commonware_cryptography::Sha256,
(RangeCfg<usize>, ()),
Rayon,
>;
type ImmutableFixedCompactDb = crate::qmdb::immutable::fixed::CompactDb<
mmr::Family,
deterministic::Context,
Digest,
Digest,
commonware_cryptography::Sha256,
Rayon,
>;
type ImmutableVariableCompactDb = crate::qmdb::immutable::variable::CompactDb<
mmr::Family,
deterministic::Context,
Digest,
Vec<u8>,
commonware_cryptography::Sha256,
((), (RangeCfg<usize>, ())),
Rayon,
>;
assert_resolver_variants!(KeylessFixedCompactDb);
assert_resolver_variants!(KeylessVariableCompactDb);
assert_resolver_variants!(ImmutableFixedCompactDb);
assert_resolver_variants!(ImmutableVariableCompactDb);
}
// A target with leaf count 0 can be encoded, but decoding it must fail validation.
#[test]
fn test_target_decode_rejects_zero_leaf_count() {
let unused_root = commonware_cryptography::Sha256::hash(b"unused");
let encoded = Target::<mmr::Family, Digest> {
root: unused_root,
leaf_count: crate::merkle::Location::new(0),
}
.encode();
assert!(Target::<mmr::Family, Digest>::decode(encoded).is_err());
}
// The first fetch result is invalid and carries no callback; the second is valid and carries
// one. Sync must retry, build the database exactly once, and report `true` on the callback.
#[test]
fn test_compact_sync_retries_invalid_state_without_feedback() {
deterministic::Runner::default().start(|context| async move {
let (good_state, target) = valid_state_and_target();
let mut bad_state = good_state.clone();
// An extra pinned node makes the state fail validation.
bad_state.pinned_nodes.push(Sha256::hash(b"extra pin"));
let (good_tx, good_rx) = commonware_utils::channel::oneshot::channel();
let constructions = Arc::new(AtomicUsize::new(0));
let db = super::sync::<TestDb, _>(Config {
context,
resolver: SequenceResolver {
states: Arc::new(commonware_utils::sync::Mutex::new(VecDeque::from([
FetchResult {
state: bad_state,
callback: None,
},
FetchResult {
state: good_state,
callback: Some(good_tx),
},
]))),
},
target: target.clone(),
db_config: (target.root, constructions.clone()),
})
.await
.unwrap();
assert!(good_rx.await.expect("valid feedback should arrive"));
assert_eq!(constructions.load(Ordering::SeqCst), 1);
assert_eq!(db.root(), target.root);
});
}
}
// Codec conformance tests for `Target` over both merkle families. Built only for tests with
// the `arbitrary` feature enabled.
#[cfg(all(test, feature = "arbitrary"))]
mod conformance {
use super::*;
use crate::merkle::{mmb, mmr};
use commonware_codec::conformance::CodecConformance;
use commonware_cryptography::sha256::Digest as Sha256Digest;
commonware_conformance::conformance_tests! {
CodecConformance<Target<mmr::Family, Sha256Digest>>,
CodecConformance<Target<mmb::Family, Sha256Digest>>,
}
}