use crate::{
merkle::{compact, Family, Location, Proof},
metadata::Metadata,
qmdb::{self, sync::compact::Target, Error},
Context,
};
use commonware_codec::{Decode as _, Encode as _, FixedSize, Read};
use commonware_cryptography::{Digest, Hasher};
use commonware_parallel::Strategy;
use commonware_utils::{sequence::prefixed_u64::U64, sync::RwLock};
const SLOT_A_LAST_COMMIT_OP_PREFIX: u8 = 5;
const SLOT_A_LAST_COMMIT_PROOF_PREFIX: u8 = 6;
const SLOT_B_LAST_COMMIT_OP_PREFIX: u8 = 7;
const SLOT_B_LAST_COMMIT_PROOF_PREFIX: u8 = 8;
const fn last_commit_op_prefix(slot: u8) -> u8 {
if slot == 0 {
SLOT_A_LAST_COMMIT_OP_PREFIX
} else {
SLOT_B_LAST_COMMIT_OP_PREFIX
}
}
const fn last_commit_proof_prefix(slot: u8) -> u8 {
if slot == 0 {
SLOT_A_LAST_COMMIT_PROOF_PREFIX
} else {
SLOT_B_LAST_COMMIT_PROOF_PREFIX
}
}
pub(crate) const fn last_commit_op_key(slot: u8) -> U64 {
U64::new(last_commit_op_prefix(slot), 0)
}
pub(crate) const fn last_commit_proof_key(slot: u8) -> U64 {
U64::new(last_commit_proof_prefix(slot), 0)
}
#[derive(Clone)]
pub(crate) struct ServeState<F: Family, D: Digest> {
pub(crate) root: D,
pub(crate) leaf_count: Location<F>,
pub(crate) pinned_nodes: Vec<D>,
pub(crate) last_commit_op_bytes: Vec<u8>,
pub(crate) last_commit_proof: Proof<F, D>,
}
impl<F: Family, D: Digest> ServeState<F, D> {
pub(crate) const fn target(&self) -> Target<F, D> {
Target {
root: self.root,
leaf_count: self.leaf_count,
}
}
}
pub(crate) struct Cache<F: Family, D: Digest> {
witness: RwLock<ServeState<F, D>>,
}
impl<F: Family, D: Digest> Cache<F, D> {
pub(crate) const fn new(witness: ServeState<F, D>) -> Self {
Self {
witness: RwLock::new(witness),
}
}
pub(crate) fn with<R>(&self, f: impl FnOnce(&ServeState<F, D>) -> R) -> R {
f(&self.witness.read())
}
pub(crate) fn replace(&self, witness: ServeState<F, D>) {
*self.witness.write() = witness;
}
#[cfg(test)]
pub(crate) fn mutate(&self, f: impl FnOnce(&mut ServeState<F, D>)) {
f(&mut self.witness.write());
}
}
pub(crate) fn write_witness_metadata<E, F, D>(
metadata: &mut Metadata<E, U64, Vec<u8>>,
slot: u8,
witness: &ServeState<F, D>,
) where
E: Context,
F: Family,
D: Digest,
{
metadata.put(
last_commit_op_key(slot),
witness.last_commit_op_bytes.clone(),
);
metadata.put(
last_commit_proof_key(slot),
witness.last_commit_proof.encode().to_vec(),
);
}
pub(crate) fn validate_inactivity_floor<F: Family>(
inactivity_floor_loc: Location<F>,
last_commit_loc: Location<F>,
) -> Result<(), Error<F>> {
if inactivity_floor_loc > last_commit_loc {
return Err(Error::DataCorrupted("invalid compact witness"));
}
Ok(())
}
#[allow(clippy::type_complexity)]
pub(crate) fn witness_from_authenticated_state<F, E, D, S>(
merkle: &compact::Merkle<F, E, D, S>,
root: D,
inactivity_floor_loc: Location<F>,
last_commit_op_bytes: Vec<u8>,
last_commit_proof: Proof<F, D>,
pinned_nodes: Vec<D>,
) -> Result<(Location<F>, ServeState<F, D>), Error<F>>
where
F: Family,
E: Context,
D: Digest,
S: Strategy,
{
if merkle.leaves() == 0 {
return Err(Error::DataCorrupted("missing final commit"));
}
let leaf_count = merkle.leaves();
let last_commit_loc = Location::<F>::new(*leaf_count - 1);
validate_inactivity_floor(inactivity_floor_loc, last_commit_loc)?;
let witness = ServeState {
root,
leaf_count,
pinned_nodes,
last_commit_op_bytes,
last_commit_proof,
};
Ok((last_commit_loc, witness))
}
pub(crate) async fn load_active_witness<F, E, H, S, C, Op, LastCommitFloor>(
merkle: &compact::Merkle<F, E, H::Digest, S>,
commit_codec_config: &C,
last_commit_floor: LastCommitFloor,
) -> Result<(ServeState<F, H::Digest>, Op), Error<F>>
where
F: Family,
E: Context,
H: Hasher,
S: Strategy,
Op: Read<Cfg = C>,
LastCommitFloor: FnOnce(&Op) -> Option<Location<F>>,
{
let slot = merkle.active_slot();
let last_commit_op_bytes = merkle
.read_metadata_key(&last_commit_op_key(slot))
.await
.ok_or(Error::DataCorrupted("missing compact witness"))?;
let last_commit_proof_bytes = merkle
.read_metadata_key(&last_commit_proof_key(slot))
.await
.ok_or(Error::DataCorrupted("missing compact witness"))?;
let max_digests = last_commit_proof_bytes.len() / H::Digest::SIZE;
let last_commit_proof =
Proof::<F, H::Digest>::decode_cfg(last_commit_proof_bytes.as_ref(), &max_digests)
.map_err(|_| Error::DataCorrupted("invalid compact witness"))?;
let leaf_count = last_commit_proof.leaves;
if leaf_count == 0 {
return Err(Error::DataCorrupted("invalid compact witness"));
}
let last_commit_loc = Location::new(*leaf_count - 1);
let last_commit_op = Op::decode_cfg(last_commit_op_bytes.as_ref(), commit_codec_config)
.map_err(|_| Error::DataCorrupted("invalid commit operation"))?;
let inactivity_floor_loc = last_commit_floor(&last_commit_op)
.ok_or(Error::DataCorrupted("last operation was not a commit"))?;
validate_inactivity_floor(inactivity_floor_loc, last_commit_loc)?;
let inactive_peaks =
F::inactive_peaks(F::location_to_position(leaf_count), inactivity_floor_loc);
let hasher = qmdb::hasher::<H>();
let root = merkle
.root(&hasher, inactive_peaks)
.map_err(|_| Error::DataCorrupted("failed to compute compact witness root"))?;
if !last_commit_proof.verify_range_inclusion(
&hasher,
&[last_commit_op_bytes.as_slice()],
last_commit_loc,
&root,
) {
return Err(Error::DataCorrupted("invalid compact witness"));
}
let pinned_nodes = merkle.with_mem(|mem| {
F::nodes_to_pin(leaf_count)
.map(|pos| *mem.get_node_unchecked(pos))
.collect::<Vec<_>>()
});
let witness = ServeState {
root,
leaf_count,
pinned_nodes,
last_commit_op_bytes,
last_commit_proof,
};
Ok((witness, last_commit_op))
}
pub(crate) async fn bootstrap_initial_commit<F, E, H, S>(
merkle: &mut compact::Merkle<F, E, H::Digest, S>,
last_commit_op_bytes: Vec<u8>,
) -> Result<(), Error<F>>
where
F: Family,
E: Context,
H: Hasher,
S: Strategy,
{
let hasher = qmdb::hasher::<H>();
let batch = {
let batch = merkle.new_batch().add(&hasher, &last_commit_op_bytes);
merkle.with_mem(|mem| batch.merkleize(mem, &hasher))
};
merkle.apply_batch(&batch)?;
let leaf_count = merkle.leaves();
let inactive_peaks = F::inactive_peaks(F::location_to_position(leaf_count), Location::new(0));
merkle
.sync_with_witness(
|mem| {
let root = mem.root(&hasher, inactive_peaks)?;
let last_commit_proof = mem.proof(&hasher, Location::new(0), inactive_peaks)?;
Ok(ServeState {
root,
leaf_count: Location::new(1),
pinned_nodes: Vec::new(),
last_commit_op_bytes: last_commit_op_bytes.clone(),
last_commit_proof,
})
},
|metadata, slot, witness| {
write_witness_metadata(metadata, slot, &witness);
Ok(())
},
)
.await?;
Ok(())
}
pub(crate) async fn persist_witness<F, E, H, S>(
merkle: &compact::Merkle<F, E, H::Digest, S>,
cache: &Cache<F, H::Digest>,
last_commit_loc: Location<F>,
inactivity_floor_loc: Location<F>,
last_commit_op_bytes: Vec<u8>,
) -> Result<(), Error<F>>
where
F: Family,
E: Context,
H: Hasher,
S: Strategy,
{
let hasher = qmdb::hasher::<H>();
merkle
.sync_with_witness(
|mem| {
let mem_leaves = mem.leaves();
if let Some(cached) = cache
.with(|witness| (witness.leaf_count == mem_leaves).then(|| witness.clone()))
{
return Ok(cached);
}
let inactive_peaks =
F::inactive_peaks(F::location_to_position(mem_leaves), inactivity_floor_loc);
let mem_root = mem.root(&hasher, inactive_peaks)?;
let pinned_nodes = F::nodes_to_pin(mem_leaves)
.map(|pos| *mem.get_node_unchecked(pos))
.collect::<Vec<_>>();
let last_commit_proof = mem.proof(&hasher, last_commit_loc, inactive_peaks)?;
Ok(ServeState {
root: mem_root,
leaf_count: mem_leaves,
pinned_nodes,
last_commit_op_bytes: last_commit_op_bytes.clone(),
last_commit_proof,
})
},
|metadata, slot, witness| {
write_witness_metadata(metadata, slot, &witness);
cache.replace(witness);
Ok(())
},
)
.await?;
Ok(())
}
pub(crate) async fn persist_cached_witness<F, E, H, S>(
merkle: &compact::Merkle<F, E, H::Digest, S>,
cache: &Cache<F, H::Digest>,
) -> Result<(), Error<F>>
where
F: Family,
E: Context,
H: Hasher,
S: Strategy,
{
let witness = cache.with(Clone::clone);
merkle
.sync_with_witness(
|_| Ok(witness),
|metadata, slot, witness| {
write_witness_metadata(metadata, slot, &witness);
Ok(())
},
)
.await
.map(|_| ())
.map_err(Into::into)
}