use crate::{
index::unordered::Index,
journal::{
authenticated,
contiguous::{Mutable, Reader as _},
Error as JournalError,
},
merkle::{
full::{self, Merkle},
Family, Location,
},
qmdb::{
self,
any::ValueEncoding,
build_snapshot_from_log,
immutable::{self, CompactDb, Metrics, Operation},
operation::Key,
sync::{self},
Error,
},
translator::Translator,
Context, Persistable,
};
use commonware_codec::{Encode, EncodeShared, Read};
use commonware_cryptography::Hasher;
use commonware_parallel::Strategy;
use commonware_utils::range::NonEmptyRange;
impl<F, E, K, V, C, H, T, S> sync::Database for immutable::Immutable<F, E, K, V, C, H, T, S>
where
F: Family,
E: Context,
K: Key,
V: ValueEncoding,
C: Mutable<Item = Operation<F, K, V>>
+ Persistable<Error = JournalError>
+ sync::Journal<F, Context = E, Op = Operation<F, K, V>>,
C::Item: EncodeShared,
C::Config: Clone + Send,
H: Hasher,
T: Translator,
S: Strategy,
{
type Family = F;
type Op = Operation<F, K, V>;
type Journal = C;
type Hasher = H;
type Config = immutable::Config<T, C::Config, S>;
type Digest = H::Digest;
type Context = E;
async fn from_sync_result(
context: Self::Context,
db_config: Self::Config,
log: Self::Journal,
pinned_nodes: Option<Vec<Self::Digest>>,
range: NonEmptyRange<Location<F>>,
apply_batch_size: usize,
) -> Result<Self, Error<F>> {
let hasher = qmdb::hasher::<H>();
let merkle = Merkle::<F, _, _, S>::init_sync(
context.child("merkle"),
full::SyncConfig {
config: db_config.merkle_config.clone(),
range: range.clone(),
pinned_nodes,
},
)
.await?;
let journal = authenticated::Journal::<_, _, _, _, S>::from_components(
merkle,
log,
hasher,
apply_batch_size as u64,
)
.await?;
let mut snapshot: Index<T, Location<F>> =
Index::new(context.child("snapshot"), db_config.translator.clone());
let (last_commit_loc, inactivity_floor_loc) = {
let reader = journal.journal.reader().await;
let bounds = reader.bounds();
let last_commit_loc = Location::<F>::new(
bounds
.end
.checked_sub(1)
.ok_or(Error::HistoricalFloorPruned(Location::new(bounds.end)))?,
);
let inactivity_floor_loc = crate::qmdb::find_inactivity_floor_at::<F, _>(
&reader,
Location::new(bounds.end),
|op| op.has_floor(),
)
.await?;
build_snapshot_from_log::<F, _, _, _>(
inactivity_floor_loc,
&reader,
&mut snapshot,
|_, _| {},
)
.await?;
(last_commit_loc, inactivity_floor_loc)
};
let inactive_peaks = F::inactive_peaks(
F::location_to_position(Location::new(*last_commit_loc + 1)),
inactivity_floor_loc,
);
let root = journal.root(inactive_peaks)?;
let metrics = Metrics::new(context);
let db = Self {
journal,
root,
snapshot,
last_commit_loc,
inactivity_floor_loc,
metrics,
};
db.update_metrics().await;
db.sync().await?;
Ok(db)
}
fn root(&self) -> Self::Digest {
self.root()
}
}
impl<F, E, K, V, H, Cfg, S> sync::compact::Database for CompactDb<F, E, K, V, H, Cfg, S>
where
F: Family,
E: Context,
K: Key,
V: ValueEncoding,
H: Hasher,
S: Strategy,
Operation<F, K, V>: EncodeShared,
Operation<F, K, V>: Read<Cfg = Cfg>,
Cfg: Clone + Send + Sync + 'static,
{
type Family = F;
type Op = Operation<F, K, V>;
type Config = immutable::CompactConfig<Cfg, S>;
type Digest = H::Digest;
type Context = E;
type Hasher = H;
async fn from_validated_state(
context: Self::Context,
config: Self::Config,
state: sync::compact::ValidatedState<Self::Family, Self::Op, Self::Digest>,
) -> Result<Self, Error<F>> {
let sync::compact::ValidatedState {
state,
root,
inactivity_floor: inactivity_floor_loc,
} = state;
let sync::compact::State {
leaf_count,
pinned_nodes,
last_commit_op,
last_commit_proof,
} = state;
let last_commit_loc = Location::new(*leaf_count - 1);
let Operation::Commit(last_commit_metadata, op_floor) = last_commit_op else {
return Err(Error::UnexpectedData(last_commit_loc));
};
assert_eq!(op_floor, inactivity_floor_loc, "inactivity floor mismatch");
let commit_codec_config = config.commit_codec_config.clone();
let last_commit_op_bytes =
Operation::<F, K, V>::Commit(last_commit_metadata.clone(), inactivity_floor_loc)
.encode()
.to_vec();
let merkle = crate::merkle::compact::Merkle::init_from_compact_state(
context.child("merkle"),
config.merkle,
leaf_count,
pinned_nodes.clone(),
)
.await?;
Self::init_from_verified_state(
merkle,
commit_codec_config,
last_commit_metadata,
inactivity_floor_loc,
root,
last_commit_op_bytes,
last_commit_proof,
pinned_nodes,
)
}
fn inactivity_floor(op: &Self::Op) -> Option<Location<Self::Family>> {
op.has_floor()
}
fn root(&self) -> Self::Digest {
self.root()
}
async fn persist_compact_state(&self) -> Result<(), Error<F>> {
self.persist_cached_witness().await
}
}
#[cfg(test)]
mod tests;