//! This module implements the [ZODA](https://eprint.iacr.org/2025/034) coding scheme.
//!
//! At a high level, the scheme works like any other coding scheme: you start with
//! a piece of data, and split it into shards along with a commitment. Each shard can
//! be checked to belong to the commitment, and, given enough shards, the data can
//! be reconstructed.
//!
//! What makes ZODA interesting is that upon receiving and checking one shard,
//! you become convinced that there exists an original piece of data that will
//! be reconstructable given enough shards. This fails in the case of, e.g.,
//! plain Reed-Solomon coding. For example, if you give people random shards,
//! instead of actually encoding data, then when they attempt to reconstruct the
//! data, they can come to different results depending on which shards they use.
//!
//! Ultimately, this stems from the fact that, with plain Reed-Solomon, you can't
//! know if your shard comes from a valid encoding of the data until you have
//! enough shards to reconstruct the data. With ZODA, you know that the shard
//! comes from a valid encoding as soon as you've checked it.
//!
//! # Variant
//!
//! ZODA supports different configurations based on the coding scheme you use
//! for sharding data, and for checking it.
//!
//! We use the Reed-Solomon and Hadamard variant of ZODA: in essence, this means
//! that the shards are Reed-Solomon encoded, and we include additional checksum
//! data which does not help reconstruct the data.
//!
//! ## Deviations
//!
//! In the paper, a sample consists of rows chosen at random from the encoding of
//! the data. With multiple participants receiving samples, they might receive
//! overlapping samples, which we don't want. Instead, we shuffle the rows of
//! the encoded data, and each participant receives a different segment.
//! From that participant's perspective, they've received a completely random
//! choice of rows. The other participants' rows are less random, since they're
//! guaranteed to not overlap. However, no guarantee on the randomness of the other
//! rows is required: each sample is large enough to guarantee that the data
//! has been validly encoded.
//!
//! We also use a Fiat-Shamir transform to sample all randomness
//! non-interactively, based on the commitment to the encoded data.
//!
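/*!
The segment assignment described above can be sketched in isolation. This is a
minimal, self-contained illustration using only the standard library, not this
module's implementation: `SplitMix64`, `shuffled_indices`, and `segment` are
hypothetical names, and hashing the commitment bytes with `DefaultHasher` is an
assumed stand-in for the transcript-derived randomness used here.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

// Hypothetical stand-in for transcript randomness: a SplitMix64 generator
// seeded by hashing the commitment bytes.
struct SplitMix64(u64);

impl SplitMix64 {
    fn from_commitment(commitment: &[u8]) -> Self {
        let mut hasher = DefaultHasher::new();
        commitment.hash(&mut hasher);
        Self(hasher.finish())
    }

    fn next_u64(&mut self) -> u64 {
        self.0 = self.0.wrapping_add(0x9E37_79B9_7F4A_7C15);
        let mut z = self.0;
        z = (z ^ (z >> 30)).wrapping_mul(0xBF58_476D_1CE4_E5B9);
        z = (z ^ (z >> 27)).wrapping_mul(0x94D0_49BB_1331_11EB);
        z ^ (z >> 31)
    }
}

// Fisher-Yates shuffle of the row indices 0..total. The modulo reduction has
// a slight bias, which is acceptable for a sketch but not for production use.
fn shuffled_indices(commitment: &[u8], total: usize) -> Vec<usize> {
    let mut rng = SplitMix64::from_commitment(commitment);
    let mut out: Vec<usize> = (0..total).collect();
    for i in (1..total).rev() {
        let j = (rng.next_u64() % (i as u64 + 1)) as usize;
        out.swap(i, j);
    }
    out
}

// The ith participant receives a contiguous segment of the shuffled indices,
// so segments for different participants never overlap.
fn segment(indices: &[usize], i: usize, samples: usize) -> &[usize] {
    &indices[i * samples..(i + 1) * samples]
}
```

Since every participant derives the same permutation from the same
commitment, each one can recompute which row indices any segment covers
without further interaction.
*/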
//! # Protocol
//!
//! Let n denote the minimum number of shards needed to recover the data.
//! Let k denote the number of extra shards to generate.
//!
//! We consider the data as being an array of elements in a field F, of 64 bits.
//!
//! Given n and k, we have a certain number of required samples R.
//! We can split these into row samples S, and column samples S',
//! such that S * S' >= R.
//!
//! Given a choice of S, our data will need to be arranged into a matrix of size
//!
//!   n S x c
//!
//! with c being >= 1.
//!
//! We choose S as close to R as possible without padding the data. We then
//! choose S' so that S * S' >= R.
//!
//! We also then double S', because the field over which we compute checksums
//! only has 64 bits. This effectively makes the checksum calculated over the
//! extension field F^2. Because we don't actually need to multiply elements
//! in F^2 together, but only ever take linear combinations with elements in F,
//! we can effectively compute over the larger field simply by using 2 "virtual"
//! checksum columns per required column.
//!
//! For technical reasons, the encoded data will not have (n + k) S rows,
//! but pad((n + k) S) rows, where pad returns the next power of two.
//! This is to our advantage, in that given n shards, we will be able to reconstruct
//! the data, but these shards consist of rows sampled at random from
//! pad((n + k) S) rows, thus requiring fewer samples.
//!
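/*!
As a worked instance of the padding rule above, the following minimal sketch
computes pad((n + k) S) with the standard library's `next_power_of_two`. The
function names `encoded_rows` and `unassigned_rows` are hypothetical and only
illustrate the arithmetic; they are not part of this module's API.

```rust
// Number of rows in the encoded matrix: pad((n + k) * s), where pad rounds
// up to the next power of two.
fn encoded_rows(n: usize, k: usize, s: usize) -> usize {
    ((n + k) * s).next_power_of_two()
}

// Encoded rows that are not handed out in any shard: the difference between
// the padded row count and the (n + k) * s rows split across all shards.
fn unassigned_rows(n: usize, k: usize, s: usize) -> usize {
    encoded_rows(n, k, s) - (n + k) * s
}
```

For example, with n = 3, k = 2 and S = 5, the shards cover 25 rows, while the
encoding has 32 rows, so 7 encoded rows are never distributed.
*/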
//! ## Encoding
//!
//! 1. The data is arranged as a matrix X of size n S x c.
//! 2. The data is Reed-Solomon encoded, turning it into a matrix X' of size pad((n + k) S) x c.
//! 3. The rows of X' are committed to using a vector commitment V (concretely, a Merkle Tree).
//! 4. V, along with the size of the data in bytes, is committed to, producing Com.
//! 5. Com is hashed to create randomness, first to generate a matrix H of size c x S',
//!    and then to shuffle the rows of X', giving Y.
//! 6. Z := X H, a matrix of size n S x S', is computed.
//! 7. The ith shard (starting from 0) then consists of:
//!    - the size of the data, in bytes,
//!    - the vector commitment, V,
//!    - the checksum Z,
//!    - rows i * S..(i + 1) * S of Y, along with a proof of inclusion in V, at the original index.
//!
//! ## Re-Sharding
//!
//! When re-transmitting a shard to other people, only the following are transmitted:
//! - rows i * S..(i + 1) * S of Y, along with the inclusion proofs.
//!
//! ## Checking
//!
//! Let A_{S} denote the matrix formed by taking the rows of a matrix A whose
//! indices are in a given subset S.
//!
//! 1. Check that Com is the hash of V and the size of the data, in bytes.
//! 2. Use Com to compute H of size c x S', and recompute the ith row sample S_i.
//! 3. Check that Z is of size n S x S'.
//! 4. Encode Z to get Z', a matrix of size pad((n + k) S) x S'.
//!
//! These steps now depend on the particular shard.
//!
//! 5. Check that X'_{S_i} (the shard's data) is a matrix of size S x c.
//! 6. Use the inclusion proofs to check that each row of X'_{S_i} is included in V,
//!    at the correct index.
//! 7. Check that X'_{S_i} H = Z'_{S_i}.
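/*!
The final check works because encoding is linear: encoding X and then
multiplying by H gives the same result as encoding Z = X H. The sketch below
illustrates this with a toy setup, under loudly stated assumptions: it uses
the prime field of order 2^31 - 1 instead of this module's 64 bit field, and
it evaluates at the points 0, 1, 2, ... instead of using an NTT. `P`, `Mat`,
`mat_mul`, `encode`, and `check_rows` are hypothetical names.

```rust
// Toy prime modulus (2^31 - 1); products of two reduced values fit in a u64.
const P: u64 = 2_147_483_647;

type Mat = Vec<Vec<u64>>;

// Matrix product over the toy field. Entries are assumed to be reduced mod P,
// and `b` is assumed to be non-empty.
fn mat_mul(a: &Mat, b: &Mat) -> Mat {
    let inner = b.len();
    let cols = b[0].len();
    a.iter()
        .map(|row| {
            (0..cols)
                .map(|j| (0..inner).fold(0u64, |acc, t| (acc + row[t] * b[t][j]) % P))
                .collect()
        })
        .collect()
}

// Toy Reed-Solomon encoding: each column of `x` holds polynomial
// coefficients, and encoded row j holds the evaluations at the point j.
fn encode(x: &Mat, m: usize) -> Mat {
    let vandermonde: Mat = (0..m as u64)
        .map(|point| {
            let mut power = 1u64;
            (0..x.len())
                .map(|_| {
                    let current = power;
                    power = power * point % P;
                    current
                })
                .collect()
        })
        .collect();
    mat_mul(&vandermonde, x)
}

// Step 7: every received row, multiplied by H, must equal the row of the
// encoded checksum at the same index.
fn check_rows(rows: &[(usize, Vec<u64>)], h: &Mat, z_encoded: &Mat) -> bool {
    rows.iter()
        .all(|(index, row)| mat_mul(&vec![row.clone()], h)[0] == z_encoded[*index])
}
```

A checker holding only Z and H can therefore validate rows of X' without ever
seeing X: it encodes Z once, and compares each received row times H against
the corresponding row of the encoded checksum.
*/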
//!
//! ## Decoding
//!
//! 1. Given n checked shards, you have n S encoded rows, which can be Reed-Solomon decoded.
use crate::{Config, Scheme, ValidatingScheme};
use bytes::BufMut;
use commonware_codec::{Encode, EncodeSize, FixedSize, RangeCfg, Read, ReadExt, Write};
use commonware_cryptography::{
    transcript::{Summary, Transcript},
    Hasher,
};
use commonware_math::{
    fields::goldilocks::F,
    ntt::{EvaluationVector, Matrix},
};
use commonware_storage::mmr::{
    mem::DirtyMmr, verification::multi_proof, Error as MmrError, Location, Proof, StandardHasher,
};
use futures::executor::block_on;
use rand::seq::SliceRandom as _;
use rayon::{iter::ParallelIterator, prelude::IntoParallelIterator, ThreadPoolBuilder};
use std::{marker::PhantomData, sync::Arc};
use thiserror::Error;

/// Create an iterator over the data of a buffer, interpreted as little-endian u64s.
///
/// If the buffer length is not a multiple of 8, the final u64 is formed from
/// the remaining bytes, padded with zeros.
fn iter_u64_le(data: impl bytes::Buf) -> impl Iterator<Item = u64> {
    struct Iter<B> {
        remaining_u64s: usize,
        tail: usize,
        inner: B,
    }

    impl<B: bytes::Buf> Iter<B> {
        fn new(inner: B) -> Self {
            let remaining_u64s = inner.remaining() / 8;
            let tail = inner.remaining() % 8;
            Self {
                remaining_u64s,
                tail,
                inner,
            }
        }
    }

    impl<B: bytes::Buf> Iterator for Iter<B> {
        type Item = u64;

        fn next(&mut self) -> Option<Self::Item> {
            if self.remaining_u64s > 0 {
                self.remaining_u64s -= 1;
                return Some(self.inner.get_u64_le());
            }
            if self.tail > 0 {
                let mut chunk = [0u8; 8];
                self.inner.copy_to_slice(&mut chunk[..self.tail]);
                self.tail = 0;
                return Some(u64::from_le_bytes(chunk));
            }
            None
        }
    }
    Iter::new(data)
}

/// Collect u64s into bytes, written as little-endian, keeping at most `max_length` bytes.
fn collect_u64_le(max_length: usize, data: impl Iterator<Item = u64>) -> Vec<u8> {
    let mut out = Vec::with_capacity(max_length);
    for d in data {
        out.extend_from_slice(&d.to_le_bytes());
    }
    out.truncate(max_length);
    out
}
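
/*
The two helpers above are inverses of each other once the original length is
known. The following is a minimal standalone sketch of that round trip using
only slices from the standard library; `bytes_to_u64s` and `u64s_to_bytes` are
hypothetical names, and the sketch does not cover the later repacking of u64s
into field elements.

```rust
// Interpret bytes as little-endian u64s, zero-padding the final chunk.
fn bytes_to_u64s(data: &[u8]) -> Vec<u64> {
    data.chunks(8)
        .map(|chunk| {
            let mut buf = [0u8; 8];
            buf[..chunk.len()].copy_from_slice(chunk);
            u64::from_le_bytes(buf)
        })
        .collect()
}

// Write u64s back as little-endian bytes, dropping the zero padding by
// truncating to the original length.
fn u64s_to_bytes(words: &[u64], len: usize) -> Vec<u8> {
    let mut out: Vec<u8> = words.iter().flat_map(|w| w.to_le_bytes()).collect();
    out.truncate(len);
    out
}
```

Truncating to the recorded length is what makes the zero padding harmless,
which is one reason the size of the data in bytes travels with each shard.
*/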

/// Hash a row of field elements, feeding each element's little-endian bytes to `H`.
fn row_digest<H: Hasher>(row: &[F]) -> H::Digest {
    let mut h = H::new();
    for x in row {
        h.update(&x.to_le_bytes());
    }
    h.finalize()
}

mod topology {
    use super::Error;
    use crate::Config;
    use commonware_math::fields::goldilocks::F;
    use commonware_utils::BigRationalExt as _;
    use num_rational::BigRational;

    const SECURITY_BITS: usize = 126;
    // Fractional precision for log2 calculations when computing required samples.
    // We use the next power of 2 above SECURITY_BITS (128 = 2^7), which provides
    // 1/128 fractional precision, sufficient for these security calculations.
    const LOG2_PRECISION: usize = SECURITY_BITS.next_power_of_two().trailing_zeros() as usize;

    /// Contains the sizes of various objects in the protocol.
    #[derive(Debug, Clone, Copy, PartialEq)]
    pub struct Topology {
        /// How many bytes the data has.
        pub data_bytes: usize,
        /// How many columns the data has.
        pub data_cols: usize,
        /// How many rows the data has.
        pub data_rows: usize,
        /// How many rows the encoded data has.
        pub encoded_rows: usize,
        /// How many samples each shard has.
        pub samples: usize,
        /// How many column samples we need.
        pub column_samples: usize,
        /// How many shards we need to recover.
        pub min_shards: usize,
        /// How many shards there are in total (each shard containing multiple rows).
        pub total_shards: usize,
    }

    impl Topology {
        const fn with_cols(data_bytes: usize, n: usize, k: usize, cols: usize) -> Self {
            let data_els = F::bits_to_elements(8 * data_bytes);
            let data_rows = data_els.div_ceil(cols);
            let samples = data_rows.div_ceil(n);
            Self {
                data_bytes,
                data_cols: cols,
                data_rows,
                encoded_rows: ((n + k) * samples).next_power_of_two(),
                samples,
                column_samples: 0,
                min_shards: n,
                total_shards: n + k,
            }
        }

        pub(crate) fn required_samples(&self) -> usize {
            let k = BigRational::from_usize(self.encoded_rows - self.data_rows);
            let m = BigRational::from_usize(self.encoded_rows);
            let fraction = (&k + BigRational::from_u64(1)) / (BigRational::from_usize(2) * &m);

            // Compute log2(one_minus). When m is close to n, one_minus is close to 1, making log2(one_minus)
            // a small negative value that requires sufficient precision to correctly capture the sign.
            let one_minus = BigRational::from_usize(1) - &fraction;
            let log_term = one_minus.log2_ceil(LOG2_PRECISION);
            if log_term >= BigRational::from_u64(0) {
                return usize::MAX;
            }

            let required = BigRational::from_usize(SECURITY_BITS) / -log_term;
            required.ceil_to_u128().unwrap_or(u128::MAX) as usize
        }

        fn correct_column_samples(&mut self) {
            // We make sure we have enough column samples to get 126 bits of security.
            //
            // This effectively does two elements per column. To get strictly greater
            // than 128 bits, we would need to add another column per column_sample.
            // We also have less than 128 bits in other places because of the bounds
            // on the encoded size of messages.
            self.column_samples =
                F::bits_to_elements(SECURITY_BITS) * self.required_samples().div_ceil(self.samples);
        }

        /// Figure out what size different values will have, based on the config and the data.
        pub fn reckon(config: &Config, data_bytes: usize) -> Self {
            let n = config.minimum_shards as usize;
            let k = config.extra_shards as usize;
            // The following calculations don't tolerate data_bytes = 0, so we
            // temporarily correct that to be at least 1, then make sure to adjust
            // it back again to 0.
            let corrected_data_bytes = data_bytes.max(1);
            // The goal here is to try and maximize the number of columns in the
            // data. ZODA is more efficient the more columns there are. However,
            // we need to make sure that every shard has enough samples to guarantee
            // correct encoding, and that the number of encoded rows can contain
            // all of the samples in each shard, without overlap.
            //
            // To determine if a column configuration is good, we need to choose
            // the number of encoded rows. To do this, we pick a number of samples
            // `S` such that `S * n >= data_rows`. Then, our encoded rows will
            // equal `((n + k) * S).next_power_of_two()`. If the number of required
            // samples `R` for this configuration satisfies `(n + k) * R <= encoded_rows`,
            // then this configuration is valid, using `R` as the necessary number
            // of samples.
            //
            // We try increasing column counts, picking the configuration that's good.
            // It's possible that the first configuration, with one column, is not good.
            // To correct for that, we need to add extra checksum columns to guarantee
            // security.
            let mut out = Self::with_cols(corrected_data_bytes, n, k, 1);
            loop {
                let attempt = Self::with_cols(corrected_data_bytes, n, k, out.data_cols + 1);
                let required_samples = attempt.required_samples();
                if required_samples.saturating_mul(n + k) <= attempt.encoded_rows {
                    out = Self {
                        samples: required_samples.max(attempt.samples),
                        ..attempt
                    };
                } else {
                    break;
                }
            }
            out.correct_column_samples();
            out.data_bytes = data_bytes;
            out
        }

        pub fn check_index(&self, i: u16) -> Result<(), Error> {
            if (0..self.total_shards).contains(&(i as usize)) {
                return Ok(());
            }
            Err(Error::InvalidIndex(i))
        }
    }
}
use topology::Topology;
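
/*
The sample count computed in `required_samples` follows the formula
R = ceil(SECURITY_BITS / -log2(1 - (k + 1) / (2 m))), where m is the number of
encoded rows and k is the number of encoded rows beyond the data rows. The
sketch below restates that formula with `f64` arithmetic purely for intuition.
It is an approximation, not a replacement: the function name
`required_samples_approx` is hypothetical, it returns `None` where the code
above returns `usize::MAX`, and floating point rounding means its results may
differ from the exact rational computation.

```rust
const SECURITY_BITS: f64 = 126.0;

// Approximate the number of row samples needed, given the number of encoded
// rows `m` and the number of data rows. Inputs outside the range
// 1 <= data_rows <= encoded_rows are rejected.
fn required_samples_approx(encoded_rows: usize, data_rows: usize) -> Option<usize> {
    if data_rows == 0 || data_rows > encoded_rows {
        return None;
    }
    let m = encoded_rows as f64;
    let k = (encoded_rows - data_rows) as f64;
    let one_minus = 1.0 - (k + 1.0) / (2.0 * m);
    let log_term = one_minus.log2();
    // A non-negative logarithm would mean a single sample gives no security.
    if !(log_term < 0.0) {
        return None;
    }
    Some((SECURITY_BITS / -log_term).ceil() as usize)
}
```

The formula shows why more redundancy helps: a larger gap between encoded rows
and data rows makes the logarithm more negative, so fewer samples are needed
to reach the same security level.
*/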

/// A shard of data produced by the encoding scheme.
#[derive(Clone)]
pub struct Shard<H: Hasher> {
    data_bytes: usize,
    root: H::Digest,
    inclusion_proof: Proof<H::Digest>,
    rows: Matrix,
    checksum: Arc<Matrix>,
}

impl<H: Hasher> PartialEq for Shard<H> {
    fn eq(&self, other: &Self) -> bool {
        self.data_bytes == other.data_bytes
            && self.root == other.root
            && self.inclusion_proof == other.inclusion_proof
            && self.rows == other.rows
            && self.checksum == other.checksum
    }
}

impl<H: Hasher> Eq for Shard<H> {}

impl<H: Hasher> EncodeSize for Shard<H> {
    fn encode_size(&self) -> usize {
        self.data_bytes.encode_size()
            + self.root.encode_size()
            + self.inclusion_proof.encode_size()
            + self.rows.encode_size()
            + self.checksum.encode_size()
    }
}

impl<H: Hasher> Write for Shard<H> {
    fn write(&self, buf: &mut impl BufMut) {
        self.data_bytes.write(buf);
        self.root.write(buf);
        self.inclusion_proof.write(buf);
        self.rows.write(buf);
        self.checksum.write(buf);
    }
}

impl<H: Hasher> Read for Shard<H> {
    type Cfg = crate::CodecConfig;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let data_bytes = usize::read_cfg(buf, &RangeCfg::from(..=cfg.maximum_shard_size))?;
        let max_els = cfg.maximum_shard_size / F::SIZE;
        Ok(Self {
            data_bytes,
            root: ReadExt::read(buf)?,
            inclusion_proof: Read::read_cfg(buf, &max_els)?,
            rows: Read::read_cfg(buf, &max_els)?,
            checksum: Arc::new(Read::read_cfg(buf, &max_els)?),
        })
    }
}

#[cfg(feature = "arbitrary")]
impl<H: Hasher> arbitrary::Arbitrary<'_> for Shard<H>
where
    H::Digest: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            data_bytes: u.arbitrary::<u32>()? as usize,
            root: u.arbitrary()?,
            inclusion_proof: u.arbitrary()?,
            rows: u.arbitrary()?,
            checksum: Arc::new(u.arbitrary()?),
        })
    }
}

/// The part of a [Shard] that is re-transmitted to other participants:
/// the shard's rows, along with their inclusion proof.
#[derive(Clone, Debug)]
pub struct ReShard<H: Hasher> {
    inclusion_proof: Proof<H::Digest>,
    shard: Matrix,
}

impl<H: Hasher> PartialEq for ReShard<H> {
    fn eq(&self, other: &Self) -> bool {
        self.inclusion_proof == other.inclusion_proof && self.shard == other.shard
    }
}

impl<H: Hasher> Eq for ReShard<H> {}

impl<H: Hasher> EncodeSize for ReShard<H> {
    fn encode_size(&self) -> usize {
        self.inclusion_proof.encode_size() + self.shard.encode_size()
    }
}

impl<H: Hasher> Write for ReShard<H> {
    fn write(&self, buf: &mut impl BufMut) {
        self.inclusion_proof.write(buf);
        self.shard.write(buf);
    }
}

impl<H: Hasher> Read for ReShard<H> {
    type Cfg = crate::CodecConfig;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let max_data_bits = cfg.maximum_shard_size.saturating_mul(8);
        let max_data_els = F::bits_to_elements(max_data_bits).max(1);
        Ok(Self {
            // Worst case: every row is one data element, and the sample size is all rows.
            // TODO (#2506): use correct bounds on inclusion proof size
            inclusion_proof: Read::read_cfg(buf, &max_data_els)?,
            shard: Read::read_cfg(buf, &max_data_els)?,
        })
    }
}

#[cfg(feature = "arbitrary")]
impl<H: Hasher> arbitrary::Arbitrary<'_> for ReShard<H>
where
    H::Digest: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            inclusion_proof: u.arbitrary()?,
            shard: u.arbitrary()?,
        })
    }
}

/// A ZODA shard that has been checked for integrity already.
pub struct CheckedShard {
    index: usize,
    shard: Matrix,
}

/// Take indices up to `total`, and shuffle them.
///
/// The shuffle depends, deterministically, on the transcript.
fn shuffle_indices(transcript: &Transcript, total: usize) -> Vec<Location> {
    let mut out = (0..total as u64).map(Location::from).collect::<Vec<_>>();
    out.shuffle(&mut transcript.noise(b"shuffle"));
    out
}

/// Create a checking matrix of the right shape.
///
/// This matrix is random, using the transcript as a deterministic source of randomness.
fn checking_matrix(transcript: &Transcript, topology: &Topology) -> Matrix {
    Matrix::rand(
        &mut transcript.noise(b"checking matrix"),
        topology.data_cols,
        topology.column_samples,
    )
}

/// Data used to check [ReShard]s.
#[derive(Clone)]
pub struct CheckingData<H: Hasher> {
    topology: Topology,
    root: H::Digest,
    checking_matrix: Matrix,
    encoded_checksum: Matrix,
    shuffled_indices: Vec<Location>,
}

impl<H: Hasher> CheckingData<H> {
    /// Calculate the values of this struct, based on information received.
    ///
    /// We control `config`.
    ///
    /// We're provided with `commitment`, which should hash over `root`,
    /// and `data_bytes`.
    ///
    /// We're also given a `checksum` matrix used to check the shards we receive.
    fn reckon(
        config: &Config,
        commitment: &Summary,
        data_bytes: usize,
        root: H::Digest,
        checksum: &Matrix,
    ) -> Result<Self, Error> {
        let topology = Topology::reckon(config, data_bytes);
        let mut transcript = Transcript::new(NAMESPACE);
        transcript.commit((topology.data_bytes as u64).encode());
        transcript.commit(root.encode());
        let expected_commitment = transcript.summarize();
        if *commitment != expected_commitment {
            return Err(Error::InvalidShard);
        }
        let transcript = Transcript::resume(expected_commitment);
        let checking_matrix = checking_matrix(&transcript, &topology);
        if checksum.rows() != topology.data_rows || checksum.cols() != topology.column_samples {
            return Err(Error::InvalidShard);
        }
        let encoded_checksum = checksum
            .as_polynomials(topology.encoded_rows)
            .expect("checksum has too many rows")
            .evaluate()
            .data();
        let shuffled_indices = shuffle_indices(&transcript, topology.encoded_rows);

        Ok(Self {
            topology,
            root,
            checking_matrix,
            encoded_checksum,
            shuffled_indices,
        })
    }

    fn check(&self, index: u16, reshard: &ReShard<H>) -> Result<CheckedShard, Error> {
        self.topology.check_index(index)?;
        if reshard.shard.rows() != self.topology.samples
            || reshard.shard.cols() != self.topology.data_cols
        {
            return Err(Error::InvalidReShard);
        }
        let index = index as usize;
        let these_shuffled_indices = &self.shuffled_indices
            [index * self.topology.samples..(index + 1) * self.topology.samples];
        let proof_elements = {
            these_shuffled_indices
                .iter()
                .zip(reshard.shard.iter())
                .map(|(&i, row)| (row_digest::<H>(row), i))
                .collect::<Vec<_>>()
        };
        if !reshard.inclusion_proof.verify_multi_inclusion(
            &mut StandardHasher::<H>::new(),
            &proof_elements,
            &self.root,
        ) {
            return Err(Error::InvalidReShard);
        }
        let shard_checksum = reshard.shard.mul(&self.checking_matrix);
        // Check that the shard checksum rows match the encoded checksums
        for (row, &i) in shard_checksum.iter().zip(these_shuffled_indices) {
            if row != &self.encoded_checksum[u64::from(i) as usize] {
                return Err(Error::InvalidReShard);
            }
        }
        Ok(CheckedShard {
            index,
            shard: reshard.shard.clone(),
        })
    }
}

/// Errors that can occur when encoding, checking, or decoding shards.
#[derive(Debug, Error)]
pub enum Error {
    #[error("invalid shard")]
    InvalidShard,
    #[error("invalid reshard")]
    InvalidReShard,
    #[error("invalid index {0}")]
    InvalidIndex(u16),
    #[error("insufficient shards {0} < {1}")]
    InsufficientShards(usize, usize),
    #[error("insufficient unique rows {0} < {1}")]
    InsufficientUniqueRows(usize, usize),
    #[error("failed to create inclusion proof: {0}")]
    FailedToCreateInclusionProof(MmrError),
}

// TODO (#2506): rename this to `_COMMONWARE_CODING_ZODA`
const NAMESPACE: &[u8] = b"commonware-zoda";

/// The ZODA coding scheme, parameterized by the hasher `H` used for row
/// digests and for the vector commitment.
#[derive(Clone, Copy)]
pub struct Zoda<H> {
    _marker: PhantomData<H>,
}

impl<H> std::fmt::Debug for Zoda<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Zoda")
    }
}

impl<H: Hasher> Scheme for Zoda<H> {
    type Commitment = Summary;

    type Shard = Shard<H>;

    type ReShard = ReShard<H>;

    type CheckingData = CheckingData<H>;

    type CheckedShard = CheckedShard;

    type Error = Error;

    fn encode(
        config: &Config,
        data: impl bytes::Buf,
        concurrency: usize,
    ) -> Result<(Self::Commitment, Vec<Self::Shard>), Self::Error> {
        // Step 1: arrange the data as a matrix.
        let data_bytes = data.remaining();
        let topology = Topology::reckon(config, data_bytes);
        let data = Matrix::init(
            topology.data_rows,
            topology.data_cols,
            F::stream_from_u64s(iter_u64_le(data)),
        );

        // Step 2: Encode the data.
        let encoded_data = data
            .as_polynomials(topology.encoded_rows)
            .expect("data has too many rows")
            .evaluate()
            .data();

        // Step 3: Commit to the rows of the data.
        let mut hasher = StandardHasher::<H>::new();
        let mut mmr = DirtyMmr::new();
        if concurrency > 1 {
            let pool = ThreadPoolBuilder::new()
                .num_threads(concurrency)
                .build()
                .expect("failed to build thread pool");
            let row_hashes = pool.install(|| {
                (0..encoded_data.rows())
                    .into_par_iter()
                    .map(|i| row_digest::<H>(&encoded_data[i]))
                    .collect::<Vec<_>>()
            });
            for hash in &row_hashes {
                mmr.add(&mut hasher, hash);
            }
        } else {
            for row in encoded_data.iter() {
                mmr.add(&mut hasher, &row_digest::<H>(row));
            }
        }
        let mmr = mmr.merkleize(&mut hasher, None);
        let root = *mmr.root();

        // Step 4: Commit to the root, and the size of the data.
        let mut transcript = Transcript::new(NAMESPACE);
        transcript.commit((topology.data_bytes as u64).encode());
        transcript.commit(root.encode());
        let commitment = transcript.summarize();

        // Step 5: Generate a checking matrix, and a shuffling with the commitment.
        let transcript = Transcript::resume(commitment);
        let checking_matrix = checking_matrix(&transcript, &topology);
        let shuffled_indices = shuffle_indices(&transcript, encoded_data.rows());

        // Step 6: Multiply the data with the checking matrix.
        let checksum = Arc::new(data.mul(&checking_matrix));

        // Step 7: Produce the shards.
        // We can't use "chunks" because we need to handle a sample size of 0
        let index_chunks = (0..topology.total_shards)
            .map(|i| &shuffled_indices[i * topology.samples..(i + 1) * topology.samples]);
        let shards = index_chunks
            .map(|indices| {
                let rows = Matrix::init(
                    indices.len(),
                    topology.data_cols,
                    indices
                        .iter()
                        .flat_map(|&i| encoded_data[u64::from(i) as usize].iter().copied()),
                );
                let inclusion_proof = block_on(multi_proof(&mmr, indices))
                    .map_err(Error::FailedToCreateInclusionProof)?;
                Ok(Shard {
                    data_bytes,
                    root,
                    inclusion_proof,
                    rows,
                    checksum: checksum.clone(),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok((commitment, shards))
    }

    fn reshard(
        config: &Config,
        commitment: &Self::Commitment,
        index: u16,
        shard: Self::Shard,
    ) -> Result<(Self::CheckingData, Self::CheckedShard, Self::ReShard), Self::Error> {
        let reshard = ReShard {
            inclusion_proof: shard.inclusion_proof,
            shard: shard.rows,
        };
        let checking_data = CheckingData::reckon(
            config,
            commitment,
            shard.data_bytes,
            shard.root,
            shard.checksum.as_ref(),
        )?;
        let checked_shard = checking_data.check(index, &reshard)?;
        Ok((checking_data, checked_shard, reshard))
    }

    fn check(
        _config: &Config,
        _commitment: &Self::Commitment,
        checking_data: &Self::CheckingData,
        index: u16,
        reshard: Self::ReShard,
    ) -> Result<Self::CheckedShard, Self::Error> {
        checking_data.check(index, &reshard)
    }

    fn decode(
        _config: &Config,
        _commitment: &Self::Commitment,
        checking_data: Self::CheckingData,
        shards: &[Self::CheckedShard],
        _concurrency: usize,
    ) -> Result<Vec<u8>, Self::Error> {
        let Topology {
            encoded_rows,
            data_cols,
            samples,
            data_rows,
            data_bytes,
            min_shards,
            ..
        } = checking_data.topology;
        if shards.len() < min_shards {
            return Err(Error::InsufficientShards(shards.len(), min_shards));
        }
        let mut evaluation = EvaluationVector::empty(encoded_rows.ilog2() as usize, data_cols);
        for shard in shards {
            let indices =
                &checking_data.shuffled_indices[shard.index * samples..(shard.index + 1) * samples];
            for (&i, row) in indices.iter().zip(shard.shard.iter()) {
                evaluation.fill_row(u64::from(i) as usize, row);
            }
        }
        // This should never happen, because we check each shard, and the shards
        // should have distinct rows. But, as a sanity check, this doesn't hurt.
        let filled_rows = evaluation.filled_rows();
        if filled_rows < data_rows {
            return Err(Error::InsufficientUniqueRows(filled_rows, data_rows));
        }
        Ok(collect_u64_le(
            data_bytes,
            F::stream_to_u64s(
                evaluation
                    .recover()
                    .coefficients_up_to(data_rows)
                    .flatten()
                    .copied(),
            ),
        ))
    }
}

impl<H: Hasher> ValidatingScheme for Zoda<H> {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{CodecConfig, Config};
    use commonware_cryptography::Sha256;

    const CONCURRENCY: usize = 1;

    #[test]
    fn topology_reckon_handles_small_extra_shards() {
        let config = Config {
            minimum_shards: 3,
            extra_shards: 1,
        };
        let topology = Topology::reckon(&config, 16);
        assert_eq!(topology.min_shards, 3);
        assert_eq!(topology.total_shards, 4);

        // Verify we hit the 1-column fallback and the security invariant holds.
        // When the loop in reckon() exits without finding a multi-column config,
        // correct_column_samples() must compensate by adding column samples.
        assert_eq!(topology.data_cols, 1);
        let required = topology.required_samples();
        let provided = topology.samples * (topology.column_samples / 2);
        assert!(
            provided >= required,
            "security invariant violated: provided {provided} < required {required}"
        );
    }

    #[test]
    fn reshard_roundtrip_handles_field_packing() {
        use bytes::BytesMut;
        use commonware_cryptography::Sha256;

        let config = Config {
            minimum_shards: 3,
            extra_shards: 2,
        };
        let data = vec![0xAA; 64];

        let (commitment, shards) =
            Zoda::<Sha256>::encode(&config, data.as_slice(), CONCURRENCY).unwrap();
        let shard = shards.into_iter().next().unwrap();

        let (_, _, reshard) = Zoda::<Sha256>::reshard(&config, &commitment, 0, shard).unwrap();

        let mut buf = BytesMut::new();
        reshard.write(&mut buf);
        let mut bytes = buf.freeze();
        let decoded = ReShard::<Sha256>::read_cfg(
            &mut bytes,
            &CodecConfig {
                maximum_shard_size: data.len(),
            },
        )
        .unwrap();

        assert_eq!(decoded, reshard);
    }

    #[test]
    fn decode_rejects_duplicate_indices() {
        let config = Config {
            minimum_shards: 2,
            extra_shards: 0,
        };
        let data = b"duplicate shard coverage";
        let (commitment, shards) = Zoda::<Sha256>::encode(&config, &data[..], CONCURRENCY).unwrap();
        let shard0 = shards[0].clone();
        let (checking_data, checked_shard0, _reshard0) =
            Zoda::<Sha256>::reshard(&config, &commitment, 0, shard0).unwrap();
        let duplicate = CheckedShard {
            index: checked_shard0.index,
            shard: checked_shard0.shard.clone(),
        };
        let shards = vec![checked_shard0, duplicate];
        let result =
            Zoda::<Sha256>::decode(&config, &commitment, checking_data, &shards, CONCURRENCY);
        match result {
            Err(Error::InsufficientUniqueRows(actual, expected)) => {
                assert!(actual < expected);
            }
            other => panic!("expected insufficient unique rows error, got {other:?}"),
        }
    }

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Shard<Sha256>>,
            CodecConformance<ReShard<Sha256>>,
        }
    }
}
884        }
885    }
886}