use crate::{Config, PhasedScheme, ValidatingScheme};
use bytes::BufMut;
use commonware_codec::{Encode, EncodeSize, FixedSize, RangeCfg, Read, ReadExt, Write};
use commonware_cryptography::{
transcript::{Summary, Transcript},
Digest, Hasher,
};
use commonware_math::{
fields::goldilocks::F,
ntt::{EvaluationVector, Matrix},
};
use commonware_parallel::Strategy;
use commonware_storage::bmt::{Builder as BmtBuilder, Error as BmtError, Proof};
use rand::seq::SliceRandom as _;
use std::{marker::PhantomData, sync::Arc};
use thiserror::Error;
/// Yields the contents of `data` as little-endian `u64` words.
///
/// Full 8-byte words are read directly; a trailing partial word (1-7 bytes)
/// is zero-padded in its high bytes before being emitted.
fn iter_u64_le(data: impl bytes::Buf) -> impl Iterator<Item = u64> {
    struct Words<B> {
        // Number of complete 8-byte words still to emit.
        full: usize,
        // Number of leftover bytes (< 8) forming the final padded word.
        partial: usize,
        buf: B,
    }
    impl<B: bytes::Buf> Iterator for Words<B> {
        type Item = u64;
        fn next(&mut self) -> Option<Self::Item> {
            if self.full > 0 {
                self.full -= 1;
                Some(self.buf.get_u64_le())
            } else if self.partial > 0 {
                // Read the tail into the low bytes; high bytes stay zero.
                let mut bytes = [0u8; 8];
                self.buf.copy_to_slice(&mut bytes[..self.partial]);
                self.partial = 0;
                Some(u64::from_le_bytes(bytes))
            } else {
                None
            }
        }
    }
    let len = data.remaining();
    Words {
        full: len / 8,
        partial: len % 8,
        buf: data,
    }
}
/// Serializes a stream of `u64`s as little-endian bytes, truncated to
/// `max_length` bytes.
///
/// Inverse of [`iter_u64_le`] for the first `max_length` bytes.
fn collect_u64_le(max_length: usize, data: impl Iterator<Item = u64>) -> Vec<u8> {
    // The final word may overshoot `max_length` by up to 7 bytes before the
    // truncate, so reserve that slack up front to avoid a reallocation.
    let mut out = Vec::with_capacity(max_length.saturating_add(7));
    for d in data {
        out.extend_from_slice(&d.to_le_bytes());
        // Stop once we have enough bytes; anything further would be discarded.
        if out.len() >= max_length {
            break;
        }
    }
    out.truncate(max_length);
    out
}
/// Digests one matrix row by absorbing each field element's little-endian
/// bytes, in order, into a fresh hasher.
fn row_digest<H: Hasher>(row: &[F]) -> H::Digest {
    let mut hasher = H::new();
    row.iter()
        .for_each(|el| hasher.update(&el.to_le_bytes()));
    hasher.finalize()
}
mod topology;
use topology::Topology;
/// A self-contained shard: the encoded rows plus all commitment metadata
/// (data length, Merkle root, checksum) needed to verify it from scratch.
#[derive(Clone, Debug)]
pub struct StrongShard<D: Digest> {
    // Length of the original (pre-encoding) data, in bytes.
    data_bytes: usize,
    // Root of the Merkle tree over the encoded row digests.
    root: D,
    // Multi-inclusion proof for this shard's rows against `root`.
    inclusion_proof: Proof<D>,
    // The encoded rows assigned to this shard.
    rows: Matrix<F>,
    // Checksum matrix (data x checking matrix), shared by all shards.
    checksum: Arc<Matrix<F>>,
}
/// Structural equality over every field (the `Arc`-held checksum is compared
/// by value, not by pointer).
impl<D: Digest> PartialEq for StrongShard<D> {
    fn eq(&self, other: &Self) -> bool {
        let lhs = (
            &self.data_bytes,
            &self.root,
            &self.inclusion_proof,
            &self.rows,
            &self.checksum,
        );
        let rhs = (
            &other.data_bytes,
            &other.root,
            &other.inclusion_proof,
            &other.rows,
            &other.checksum,
        );
        lhs == rhs
    }
}
impl<D: Digest> Eq for StrongShard<D> {}
/// Encoded size is the sum of each field's size, in write order.
impl<D: Digest> EncodeSize for StrongShard<D> {
    fn encode_size(&self) -> usize {
        let parts = [
            self.data_bytes.encode_size(),
            self.root.encode_size(),
            self.inclusion_proof.encode_size(),
            self.rows.encode_size(),
            self.checksum.encode_size(),
        ];
        parts.iter().sum()
    }
}
impl<D: Digest> Write for StrongShard<D> {
    /// Serializes every field in a fixed order; `Read::read_cfg` must consume
    /// them in exactly this order.
    fn write(&self, buf: &mut impl BufMut) {
        self.data_bytes.write(buf);
        self.root.write(buf);
        self.inclusion_proof.write(buf);
        self.rows.write(buf);
        self.checksum.write(buf);
    }
}
impl<D: Digest> Read for StrongShard<D> {
    type Cfg = crate::CodecConfig;
    /// Deserializes a `StrongShard`, bounding each variable-length field by
    /// `cfg.maximum_shard_size` so a hostile encoding cannot force huge
    /// allocations. Field order mirrors `Write::write`.
    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        // Reject claimed data lengths beyond the configured maximum.
        let data_bytes = usize::read_cfg(buf, &RangeCfg::from(..=cfg.maximum_shard_size))?;
        // Upper bound on the number of field elements any field may contain.
        let max_els = cfg.maximum_shard_size / F::SIZE;
        Ok(Self {
            data_bytes,
            root: ReadExt::read(buf)?,
            inclusion_proof: Read::read_cfg(buf, &max_els)?,
            rows: Read::read_cfg(buf, &(max_els, ()))?,
            checksum: Arc::new(Read::read_cfg(buf, &(max_els, ()))?),
        })
    }
}
#[cfg(feature = "arbitrary")]
impl<D: Digest> arbitrary::Arbitrary<'_> for StrongShard<D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a structurally random shard for fuzz/conformance testing.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            // Drawn as u32 to keep generated lengths within a sane range.
            data_bytes: u.arbitrary::<u32>()? as usize,
            root: u.arbitrary()?,
            inclusion_proof: u.arbitrary()?,
            rows: u.arbitrary()?,
            checksum: Arc::new(u.arbitrary()?),
        })
    }
}
/// A slimmed-down shard: only the rows and their inclusion proof. The
/// commitment metadata needed to validate it lives in [`CheckingData`].
#[derive(Clone, Debug)]
pub struct WeakShard<D: Digest> {
    // Multi-inclusion proof for `shard`'s rows against the committed root.
    inclusion_proof: Proof<D>,
    // The encoded rows assigned to this shard.
    shard: Matrix<F>,
}
/// Structural equality over both fields.
impl<D: Digest> PartialEq for WeakShard<D> {
    fn eq(&self, other: &Self) -> bool {
        (&self.inclusion_proof, &self.shard) == (&other.inclusion_proof, &other.shard)
    }
}
impl<D: Digest> Eq for WeakShard<D> {}
/// Encoded size is the sum of both fields' sizes.
impl<D: Digest> EncodeSize for WeakShard<D> {
    fn encode_size(&self) -> usize {
        let mut total = self.inclusion_proof.encode_size();
        total += self.shard.encode_size();
        total
    }
}
impl<D: Digest> Write for WeakShard<D> {
    /// Serializes the proof then the rows; `Read::read_cfg` consumes them in
    /// the same order.
    fn write(&self, buf: &mut impl BufMut) {
        self.inclusion_proof.write(buf);
        self.shard.write(buf);
    }
}
impl<D: Digest> Read for WeakShard<D> {
    type Cfg = crate::CodecConfig;
    /// Deserializes a `WeakShard`, bounding both fields by the number of field
    /// elements the configured maximum shard size could possibly hold.
    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        // Convert the byte budget into bits, saturating rather than overflowing.
        let max_data_bits = cfg.maximum_shard_size.saturating_mul(8);
        // At least one element, so a tiny config cannot yield a zero bound.
        let max_data_els = F::bits_to_elements(max_data_bits).max(1);
        Ok(Self {
            inclusion_proof: Read::read_cfg(buf, &max_data_els)?,
            shard: Read::read_cfg(buf, &(max_data_els, ()))?,
        })
    }
}
#[cfg(feature = "arbitrary")]
impl<D: Digest> arbitrary::Arbitrary<'_> for WeakShard<D>
where
    D: for<'a> arbitrary::Arbitrary<'a>,
{
    /// Generates a structurally random weak shard for fuzz/conformance testing.
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            inclusion_proof: u.arbitrary()?,
            shard: u.arbitrary()?,
        })
    }
}
/// A shard that has passed [`CheckingData::check`] and may be fed to `decode`.
#[derive(Clone)]
pub struct CheckedShard {
    // Shard index the rows were verified at.
    index: usize,
    // The verified rows.
    shard: Matrix<F>,
    // Commitment the shard was verified under; re-checked during `decode`.
    commitment: Summary,
}
/// Produces a transcript-derived random permutation of `0..total`, used to
/// assign encoded rows to shards deterministically from the commitment.
///
/// # Panics
/// Panics if `total` does not fit in a `u32`.
fn shuffle_indices(transcript: &Transcript, total: usize) -> Vec<u32> {
    let count = u32::try_from(total)
        .expect("encoded_rows exceeds u32::MAX; data too large for ZODA");
    let mut indices: Vec<u32> = (0..count).collect();
    indices.shuffle(&mut transcript.noise(b"shuffle"));
    indices
}
/// Derives the random checking matrix from the transcript. Deterministic for
/// a given commitment, so encoder and verifiers agree on the same matrix.
fn checking_matrix(transcript: &Transcript, topology: &Topology) -> Matrix<F> {
    Matrix::rand(
        &mut transcript.noise(b"checking matrix"),
        topology.data_cols,
        topology.column_samples,
    )
}
/// Everything needed to validate weak shards against a commitment without
/// holding the full data; reconstructed via [`CheckingData::reckon`].
#[derive(Clone, PartialEq)]
pub struct CheckingData<D: Digest> {
    // The commitment this checking data was derived from.
    commitment: Summary,
    // Derived dimensions of the encoding.
    topology: Topology,
    // Committed Merkle root over the encoded row digests.
    root: D,
    // Transcript-derived matrix used to compress rows into checksums.
    checking_matrix: Matrix<F>,
    // Committed checksum, extended (via polynomial evaluation) to all
    // encoded rows.
    encoded_checksum: Matrix<F>,
    // Transcript-derived permutation assigning encoded rows to shards.
    shuffled_indices: Vec<u32>,
}
impl<D: Digest> Eq for CheckingData<D> {}
impl<D: Digest> CheckingData<D> {
    /// Reconstructs checking data from a shard's claimed metadata, verifying
    /// along the way that this metadata actually hashes to `commitment`.
    ///
    /// Replays the same transcript schedule as `encode`, so a matching
    /// commitment implies `data_bytes`, `root`, and `checksum` are the values
    /// the encoder committed to.
    ///
    /// # Errors
    /// Returns [`Error::InvalidShard`] if the recomputed commitment differs or
    /// the checksum has unexpected dimensions.
    fn reckon(
        config: &Config,
        commitment: &Summary,
        data_bytes: usize,
        root: D,
        checksum: &Matrix<F>,
    ) -> Result<Self, Error> {
        let topology = Topology::reckon(config, data_bytes);
        // Recompute the commitment from the claimed length and root...
        let mut transcript = Transcript::new(NAMESPACE);
        transcript.commit((topology.data_bytes as u64).encode());
        transcript.commit(root.encode());
        let expected_commitment = transcript.summarize();
        // ...and reject the metadata if it does not match what we were given.
        if *commitment != expected_commitment {
            return Err(Error::InvalidShard);
        }
        // Resume the transcript to derive the same randomness as the encoder.
        let mut transcript = Transcript::resume(expected_commitment);
        let checking_matrix = checking_matrix(&transcript, &topology);
        // The checksum must have exactly the committed dimensions.
        if checksum.rows() != topology.data_rows || checksum.cols() != topology.column_samples {
            return Err(Error::InvalidShard);
        }
        transcript.commit(checksum.encode());
        // Extend the checksum columns over the full encoded domain so any
        // individual encoded row can later be checked against it.
        let encoded_checksum = checksum
            .as_polynomials(topology.encoded_rows)
            .expect("checksum has too many rows")
            .evaluate()
            .data();
        // NOTE: the shuffle must be derived *after* committing the checksum,
        // matching the order used in `encode`.
        let shuffled_indices = shuffle_indices(&transcript, topology.encoded_rows);
        Ok(Self {
            commitment: expected_commitment,
            topology,
            root,
            checking_matrix,
            encoded_checksum,
            shuffled_indices,
        })
    }
    /// Verifies a weak shard claimed to sit at `index`.
    ///
    /// Checks, in order: the commitment matches this checking data, the index
    /// is in range, the shard has the expected dimensions, the rows are
    /// included under the committed Merkle root at this shard's shuffled
    /// positions, and each row's checksum matches the committed (encoded)
    /// checksum.
    ///
    /// # Errors
    /// [`Error::InvalidShard`] on a commitment mismatch;
    /// [`Error::InvalidWeakShard`] on any dimension/proof/checksum failure.
    fn check<H: Hasher<Digest = D>>(
        &self,
        commitment: &Summary,
        index: u16,
        weak_shard: &WeakShard<D>,
    ) -> Result<CheckedShard, Error> {
        if self.commitment != *commitment {
            return Err(Error::InvalidShard);
        }
        self.topology.check_index(index)?;
        if weak_shard.shard.rows() != self.topology.samples
            || weak_shard.shard.cols() != self.topology.data_cols
        {
            return Err(Error::InvalidWeakShard);
        }
        let index = index as usize;
        // The encoded-row indices this shard is responsible for.
        let these_shuffled_indices = &self.shuffled_indices
            [index * self.topology.samples..(index + 1) * self.topology.samples];
        // Pair each row's digest with its position in the Merkle tree.
        let proof_elements: Vec<(H::Digest, u32)> = these_shuffled_indices
            .iter()
            .zip(weak_shard.shard.iter())
            .map(|(&i, row)| (row_digest::<H>(row), i))
            .collect();
        let mut hasher = H::new();
        if weak_shard
            .inclusion_proof
            .verify_multi_inclusion(&mut hasher, &proof_elements, &self.root)
            .is_err()
        {
            return Err(Error::InvalidWeakShard);
        }
        // Compress the shard's rows and compare against the committed,
        // encoded checksum at the corresponding positions.
        let shard_checksum = weak_shard.shard.mul(&self.checking_matrix);
        for (row, &i) in shard_checksum.iter().zip(these_shuffled_indices) {
            if row != &self.encoded_checksum[i as usize] {
                return Err(Error::InvalidWeakShard);
            }
        }
        Ok(CheckedShard {
            index,
            shard: weak_shard.shard.clone(),
            commitment: *commitment,
        })
    }
}
/// Errors produced by the ZODA scheme.
#[derive(Debug, Error)]
pub enum Error {
    /// Shard metadata does not hash to the expected commitment, or a
    /// commitment mismatch was detected during checking/decoding.
    #[error("invalid shard")]
    InvalidShard,
    /// A weak shard failed dimension, inclusion-proof, or checksum checks.
    #[error("invalid weak shard")]
    InvalidWeakShard,
    /// The shard index is outside the topology's valid range.
    #[error("invalid index {0}")]
    InvalidIndex(u16),
    /// Fewer shards supplied to `decode` than the minimum required.
    #[error("insufficient shards {0} < {1}")]
    InsufficientShards(usize, usize),
    /// Supplied shards did not cover enough distinct encoded rows to recover
    /// the data (e.g. duplicates were supplied).
    #[error("insufficient unique rows {0} < {1}")]
    InsufficientUniqueRows(usize, usize),
    /// The Merkle tree could not produce a multi-inclusion proof.
    #[error("failed to create inclusion proof: {0}")]
    FailedToCreateInclusionProof(BmtError),
}
/// Domain-separation namespace for all ZODA transcripts.
const NAMESPACE: &[u8] = b"_COMMONWARE_CODING_ZODA";
/// The ZODA coding scheme, generic over the hasher used for row digests and
/// the Merkle tree.
#[derive(Clone, Copy)]
pub struct Zoda<H> {
    // Zero-sized marker: `H` only selects hashing behavior; no state is stored.
    _marker: PhantomData<H>,
}
/// Manual `Debug`: prints just "Zoda", independent of `H` (which need not be
/// `Debug` itself).
impl<H> std::fmt::Debug for Zoda<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str("Zoda")
    }
}
impl<H: Hasher> PhasedScheme for Zoda<H> {
    type Commitment = Summary;
    type StrongShard = StrongShard<H::Digest>;
    type WeakShard = WeakShard<H::Digest>;
    type CheckingData = CheckingData<H::Digest>;
    type CheckedShard = CheckedShard;
    type Error = Error;
    /// Encodes `data` into a commitment and one strong shard per participant.
    ///
    /// Pipeline: pack bytes into a field-element matrix, extend the rows via
    /// polynomial evaluation, Merkle-commit the encoded row digests, derive a
    /// checking matrix and checksum from the transcript, then deal shuffled
    /// rows (with inclusion proofs) out to shards.
    fn encode(
        config: &Config,
        data: impl bytes::Buf,
        strategy: &impl Strategy,
    ) -> Result<(Self::Commitment, Vec<Self::StrongShard>), Self::Error> {
        let data_bytes = data.remaining();
        let topology = Topology::reckon(config, data_bytes);
        // Pack the input bytes into field elements, row by row.
        let data = Matrix::init(
            topology.data_rows,
            topology.data_cols,
            F::stream_from_u64s(iter_u64_le(data)),
        );
        // Extend each column's polynomial to the full encoded domain.
        let encoded_data = data
            .as_polynomials(topology.encoded_rows)
            .expect("data has too many rows")
            .evaluate()
            .data();
        // Hash every encoded row (parallelizable via `strategy`).
        let row_hashes: Vec<H::Digest> = strategy.map_collect_vec(0..encoded_data.rows(), |i| {
            row_digest::<H>(&encoded_data[i])
        });
        // Build the Merkle tree over the row digests.
        let mut bmt_builder = BmtBuilder::<H>::new(row_hashes.len());
        for hash in &row_hashes {
            bmt_builder.add(hash);
        }
        let bmt = bmt_builder.build();
        let root = bmt.root();
        // Commit the data length and root; the summary is the commitment.
        let mut transcript = Transcript::new(NAMESPACE);
        transcript.commit((topology.data_bytes as u64).encode());
        transcript.commit(root.encode());
        let commitment = transcript.summarize();
        // Derive randomness from the commitment (mirrored by `reckon`).
        let mut transcript = Transcript::resume(commitment);
        let checking_matrix = checking_matrix(&transcript, &topology);
        let checksum = Arc::new(data.mul(&checking_matrix));
        transcript.commit(checksum.encode());
        // NOTE: the shuffle is derived *after* committing the checksum;
        // `CheckingData::reckon` replays the same order.
        let shuffled_indices = shuffle_indices(&transcript, encoded_data.rows());
        // Assemble each shard: its shuffled rows plus a multi-inclusion proof.
        let shard_results: Vec<Result<StrongShard<H::Digest>, Error>> =
            strategy.map_collect_vec(0..topology.total_shards, |shard_idx| {
                let indices = &shuffled_indices
                    [shard_idx * topology.samples..(shard_idx + 1) * topology.samples];
                let rows = Matrix::init(
                    indices.len(),
                    topology.data_cols,
                    indices
                        .iter()
                        .flat_map(|&i| encoded_data[i as usize].iter().copied()),
                );
                let inclusion_proof = bmt
                    .multi_proof(indices)
                    .map_err(Error::FailedToCreateInclusionProof)?;
                Ok(StrongShard {
                    data_bytes,
                    root,
                    inclusion_proof,
                    rows,
                    checksum: checksum.clone(),
                })
            });
        let shards = shard_results
            .into_iter()
            .collect::<Result<Vec<_>, Error>>()?;
        Ok((commitment, shards))
    }
    /// Splits a strong shard into its weak form plus checking data, verifying
    /// it in the process.
    ///
    /// # Errors
    /// Propagates failures from `CheckingData::reckon` and
    /// `CheckingData::check`.
    fn weaken(
        config: &Config,
        commitment: &Self::Commitment,
        index: u16,
        shard: Self::StrongShard,
    ) -> Result<(Self::CheckingData, Self::CheckedShard, Self::WeakShard), Self::Error> {
        let weak_shard = WeakShard {
            inclusion_proof: shard.inclusion_proof,
            shard: shard.rows,
        };
        // Rebuild (and verify) the checking data from the shard's metadata.
        let checking_data = CheckingData::reckon(
            config,
            commitment,
            shard.data_bytes,
            shard.root,
            shard.checksum.as_ref(),
        )?;
        let checked_shard = checking_data.check::<H>(commitment, index, &weak_shard)?;
        Ok((checking_data, checked_shard, weak_shard))
    }
    /// Verifies a weak shard received from another participant against
    /// previously established checking data.
    fn check(
        _config: &Config,
        commitment: &Self::Commitment,
        checking_data: &Self::CheckingData,
        index: u16,
        weak_shard: Self::WeakShard,
    ) -> Result<Self::CheckedShard, Self::Error> {
        checking_data.check::<H>(commitment, index, &weak_shard)
    }
    /// Recovers the original bytes from enough checked shards.
    ///
    /// Fills an evaluation vector with each shard's rows (duplicates simply
    /// refill the same positions), then requires both a minimum shard count
    /// and a minimum number of *distinct* filled rows before interpolating
    /// back to coefficients and unpacking the bytes.
    ///
    /// # Errors
    /// [`Error::InvalidShard`] on commitment mismatch,
    /// [`Error::InsufficientShards`] / [`Error::InsufficientUniqueRows`] when
    /// recovery is impossible.
    fn decode<'a>(
        _config: &Config,
        commitment: &Self::Commitment,
        checking_data: Self::CheckingData,
        shards: impl Iterator<Item = &'a Self::CheckedShard>,
        _strategy: &impl Strategy,
    ) -> Result<Vec<u8>, Self::Error> {
        if checking_data.commitment != *commitment {
            return Err(Error::InvalidShard);
        }
        let Topology {
            encoded_rows,
            data_cols,
            samples,
            data_rows,
            data_bytes,
            min_shards,
            ..
        } = checking_data.topology;
        let mut evaluation = EvaluationVector::<F>::empty(encoded_rows.ilog2() as usize, data_cols);
        let mut shard_count = 0usize;
        for shard in shards {
            shard_count += 1;
            // Every shard must have been checked under the same commitment.
            if shard.commitment != *commitment {
                return Err(Error::InvalidShard);
            }
            // Place the shard's rows at their shuffled positions.
            let indices =
                &checking_data.shuffled_indices[shard.index * samples..(shard.index + 1) * samples];
            for (&i, row) in indices.iter().zip(shard.shard.iter()) {
                evaluation.fill_row(u64::from(i) as usize, row);
            }
        }
        if shard_count < min_shards {
            return Err(Error::InsufficientShards(shard_count, min_shards));
        }
        // Duplicate shards inflate `shard_count` but not `filled_rows`, so
        // this check is what actually guarantees recoverability.
        let filled_rows = evaluation.filled_rows();
        if filled_rows < data_rows {
            return Err(Error::InsufficientUniqueRows(filled_rows, data_rows));
        }
        // Interpolate back to coefficients and unpack into bytes.
        Ok(collect_u64_le(
            data_bytes,
            F::stream_to_u64s(
                evaluation
                    .recover()
                    .coefficients_up_to(data_rows)
                    .flatten()
                    .copied(),
            ),
        ))
    }
}
// Marker impl: opts `Zoda` into the crate's `ValidatingScheme` trait (no
// additional methods are required here).
impl<H: Hasher> ValidatingScheme for Zoda<H> {}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::{Config, PhasedScheme};
    use commonware_cryptography::Sha256;
    use commonware_math::{
        algebra::{FieldNTT as _, Ring as _},
        ntt::PolynomialVector,
    };
    use commonware_parallel::Sequential;
    use commonware_utils::NZU16;
    const STRATEGY: Sequential = Sequential;
    /// Submitting the same checked shard twice satisfies the raw shard-count
    /// check but fills no additional rows, so decode must fail with
    /// `InsufficientUniqueRows` rather than returning bad data.
    #[test]
    fn decode_rejects_duplicate_indices() {
        let config = Config {
            minimum_shards: NZU16!(2),
            extra_shards: NZU16!(1),
        };
        let data = b"duplicate shard coverage";
        let (commitment, shards) = Zoda::<Sha256>::encode(&config, &data[..], &STRATEGY).unwrap();
        let shard0 = shards[0].clone();
        let (checking_data, checked_shard0, _weak_shard0) =
            Zoda::<Sha256>::weaken(&config, &commitment, 0, shard0).unwrap();
        // A byte-for-byte copy of the first checked shard.
        let duplicate = CheckedShard {
            index: checked_shard0.index,
            shard: checked_shard0.shard.clone(),
            commitment: checked_shard0.commitment,
        };
        let shards = [checked_shard0, duplicate];
        let result = Zoda::<Sha256>::decode(
            &config,
            &commitment,
            checking_data,
            shards.iter(),
            &STRATEGY,
        );
        match result {
            Err(Error::InsufficientUniqueRows(actual, expected)) => {
                assert!(actual < expected);
            }
            other => panic!("expected insufficient unique rows error, got {other:?}"),
        }
    }
    /// Tampering with the committed checksum — even with a crafted shift that
    /// vanishes exactly on one shard's row indices — must still be rejected,
    /// because the checksum itself is bound into the transcript commitment.
    #[test]
    fn checksum_malleability() {
        /// Builds the polynomial (in coefficient form) that vanishes at the
        /// domain points selected by `vanish_indices`, by evaluating the
        /// product over its roots at every point of the size-2^lg_domain
        /// multiplicative domain and interpolating back.
        fn vanishing(lg_domain: u8, vanish_indices: &[u32]) -> PolynomialVector<F> {
            let w = F::root_of_unity(lg_domain).expect("domain too large for Goldilocks");
            // Enumerate the full evaluation domain: powers of w.
            let mut domain = Vec::with_capacity(1usize << lg_domain);
            let mut x = F::one();
            for _ in 0..(1usize << lg_domain) {
                domain.push(x);
                x *= &w;
            }
            let roots: Vec<F> = vanish_indices.iter().map(|&i| domain[i as usize]).collect();
            let mut out = EvaluationVector::empty(lg_domain as usize, 1);
            // Evaluate prod(x - root) at every domain point.
            domain.into_iter().enumerate().for_each(|(i, x)| {
                let mut acc = F::one();
                for root in &roots {
                    acc *= &(x - root);
                }
                out.fill_row(i, &[acc]);
            });
            out.recover()
        }
        let config = Config {
            minimum_shards: NZU16!(2),
            extra_shards: NZU16!(1),
        };
        let data = vec![0x5Au8; 256 * 1024];
        let (commitment, mut shards) =
            Zoda::<Sha256>::encode(&config, &data[..], &STRATEGY).unwrap();
        let leader_i = 0usize;
        let a_i = 1usize;
        let b_i = 2usize;
        {
            // Use an honest shard to learn the topology and shuffle, so we can
            // target shard `a`'s row indices with the vanishing shift.
            let (checking_data, _, _) = Zoda::<Sha256>::weaken(
                &config,
                &commitment,
                leader_i as u16,
                shards[leader_i].clone(),
            )
            .unwrap();
            let samples = checking_data.topology.samples;
            let a_indices =
                checking_data.shuffled_indices[a_i * samples..(a_i + 1) * samples].to_vec();
            let lg_rows = checking_data.topology.encoded_rows.ilog2() as usize;
            let shift = vanishing(lg_rows as u8, &a_indices);
            // Add the shift to every checksum column: shard a's own rows
            // still agree with the tampered checksum, others do not.
            let mut checksum = (*shards[1].checksum).clone();
            for (i, shift_i) in shift.coefficients_up_to(checksum.rows()).enumerate() {
                for j in 0..checksum.cols() {
                    checksum[(i, j)] += &shift_i[0];
                }
            }
            shards[1].checksum = Arc::new(checksum);
            shards[2].checksum = shards[1].checksum.clone();
        }
        // Shard b fails outright (its rows disagree with the shifted checksum).
        assert!(matches!(
            Zoda::<Sha256>::weaken(&config, &commitment, b_i as u16, shards[b_i].clone()),
            Err(Error::InvalidWeakShard)
        ));
        // Shard a must fail too: the tampered checksum breaks the commitment
        // replay even though its own rows are consistent with the shift.
        assert!(matches!(
            Zoda::<Sha256>::weaken(&config, &commitment, a_i as u16, shards[a_i].clone()),
            Err(Error::InvalidWeakShard)
        ));
    }
    // Round-trip codec conformance for both shard types (requires the
    // `arbitrary` feature for random instance generation).
    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;
        use commonware_cryptography::sha256::Digest as Sha256Digest;
        commonware_conformance::conformance_tests! {
            CodecConformance<StrongShard<Sha256Digest>>,
            CodecConformance<WeakShard<Sha256Digest>>,
        }
    }
}