use crate::{Config, Scheme, ValidatingScheme};
use bytes::BufMut;
use commonware_codec::{Encode, EncodeSize, FixedSize, RangeCfg, Read, ReadExt, Write};
use commonware_cryptography::{
    transcript::{Summary, Transcript},
    Hasher,
};
use commonware_math::{
    fields::goldilocks::F,
    ntt::{EvaluationVector, Matrix},
};
use commonware_storage::mmr::{
    mem::DirtyMmr, verification::multi_proof, Error as MmrError, Location, Proof, StandardHasher,
};
use futures::executor::block_on;
use rand::seq::SliceRandom as _;
use rayon::{iter::ParallelIterator, prelude::IntoParallelIterator, ThreadPoolBuilder};
use std::{marker::PhantomData, sync::Arc};
use thiserror::Error;

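/// Iterate over the contents of a byte buffer as little-endian `u64` words.
///
/// The final partial word, if any, is zero-padded. For example, the 10-byte
/// buffer `[1, 0, 0, 0, 0, 0, 0, 0, 2, 3]` yields `1` followed by `0x0302`
/// (the two tail bytes padded with six zero bytes).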
fn iter_u64_le(data: impl bytes::Buf) -> impl Iterator<Item = u64> {
    struct Iter<B> {
        remaining_u64s: usize,
        tail: usize,
        inner: B,
    }

    impl<B: bytes::Buf> Iter<B> {
        fn new(inner: B) -> Self {
            let remaining_u64s = inner.remaining() / 8;
            let tail = inner.remaining() % 8;
            Self {
                remaining_u64s,
                tail,
                inner,
            }
        }
    }

    impl<B: bytes::Buf> Iterator for Iter<B> {
        type Item = u64;

        fn next(&mut self) -> Option<Self::Item> {
            if self.remaining_u64s > 0 {
                self.remaining_u64s -= 1;
                return Some(self.inner.get_u64_le());
            }
            if self.tail > 0 {
                let mut chunk = [0u8; 8];
                self.inner.copy_to_slice(&mut chunk[..self.tail]);
                self.tail = 0;
                return Some(u64::from_le_bytes(chunk));
            }
            None
        }
    }

    Iter::new(data)
}

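/// Collect little-endian `u64` words back into at most `max_length` bytes.
///
/// This inverts `iter_u64_le`: each word is serialized little-endian and the
/// result is truncated to `max_length`, dropping any zero padding that was
/// added to the final word.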
fn collect_u64_le(max_length: usize, data: impl Iterator<Item = u64>) -> Vec<u8> {
    let mut out = Vec::with_capacity(max_length);
    for d in data {
        out.extend_from_slice(&d.to_le_bytes());
    }
    out.truncate(max_length);
    out
}

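/// Digest a row of field elements by absorbing each element's little-endian
/// bytes in order.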
fn row_digest<H: Hasher>(row: &[F]) -> H::Digest {
    let mut h = H::new();
    for x in row {
        h.update(&x.to_le_bytes());
    }
    h.finalize()
}

mod topology {
    use super::Error;
    use crate::Config;
    use commonware_math::fields::goldilocks::F;
    use commonware_utils::BigRationalExt as _;
    use num_rational::BigRational;

    /// Target soundness, in bits.
    const SECURITY_BITS: usize = 126;
    /// Precision parameter passed to `log2_ceil` below
    /// (`126.next_power_of_two().trailing_zeros() == 7`).
    const LOG2_PRECISION: usize = SECURITY_BITS.next_power_of_two().trailing_zeros() as usize;

    #[derive(Debug, Clone, Copy, PartialEq)]
    pub struct Topology {
        /// Size of the original payload, in bytes.
        pub data_bytes: usize,
        /// Number of columns in the data matrix.
        pub data_cols: usize,
        /// Number of rows in the data matrix.
        pub data_rows: usize,
        /// Number of rows after encoding (rounded up to a power of two).
        pub encoded_rows: usize,
        /// Number of encoded rows carried by each shard.
        pub samples: usize,
        /// Width of the checksum matrix, in field elements.
        pub column_samples: usize,
        /// Minimum number of shards needed to decode.
        pub min_shards: usize,
        /// Total number of shards produced.
        pub total_shards: usize,
    }

    impl Topology {
        const fn with_cols(data_bytes: usize, n: usize, k: usize, cols: usize) -> Self {
            let data_els = F::bits_to_elements(8 * data_bytes);
            let data_rows = data_els.div_ceil(cols);
            let samples = data_rows.div_ceil(n);
            Self {
                data_bytes,
                data_cols: cols,
                data_rows,
                encoded_rows: ((n + k) * samples).next_power_of_two(),
                samples,
                column_samples: 0,
                min_shards: n,
                total_shards: n + k,
            }
        }

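        /// Number of row samples needed to reach `SECURITY_BITS` bits of
        /// soundness.
        ///
        /// The arithmetic below solves for the smallest `s` such that
        /// `(1 - (k + 1) / (2 * m))^s <= 2^-SECURITY_BITS`, where
        /// `k = encoded_rows - data_rows` and `m = encoded_rows`; the
        /// `(k + 1) / (2 * m)` term reads as the chance that a single sample
        /// lands on a row exposing a far-from-valid encoding (half the code's
        /// minimum distance of `k + 1` rows). If the log term is not
        /// negative, no sample count suffices and the requirement degenerates
        /// to `usize::MAX`.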
        pub(crate) fn required_samples(&self) -> usize {
            let k = BigRational::from_usize(self.encoded_rows - self.data_rows);
            let m = BigRational::from_usize(self.encoded_rows);
            let fraction = (&k + BigRational::from_u64(1)) / (BigRational::from_usize(2) * &m);

            let one_minus = BigRational::from_usize(1) - &fraction;
            let log_term = one_minus.log2_ceil(LOG2_PRECISION);
            if log_term >= BigRational::from_u64(0) {
                return usize::MAX;
            }

            let required = BigRational::from_usize(SECURITY_BITS) / -log_term;
            required.ceil_to_u128().unwrap_or(u128::MAX) as usize
        }

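        /// Set `column_samples` so the checksum stays sound when each shard
        /// carries fewer row samples than `required_samples` demands: the
        /// width grows with the ratio of required to provided samples.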
        fn correct_column_samples(&mut self) {
            self.column_samples = F::bits_to_elements(SECURITY_BITS)
                * self.required_samples().div_ceil(self.samples);
        }

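        /// Compute the topology for `data_bytes` of payload under `config`.
        ///
        /// Starting from a single column, this widens the data matrix while
        /// the total number of required row samples still fits within the
        /// encoded rows; wider rows mean fewer rows overall and therefore
        /// fewer samples per shard.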
        pub fn reckon(config: &Config, data_bytes: usize) -> Self {
            let n = config.minimum_shards as usize;
            let k = config.extra_shards as usize;
            // Compute with at least one byte so the matrix is never empty.
            let corrected_data_bytes = data_bytes.max(1);
            let mut out = Self::with_cols(corrected_data_bytes, n, k, 1);
            loop {
                let attempt = Self::with_cols(corrected_data_bytes, n, k, out.data_cols + 1);
                let required_samples = attempt.required_samples();
                // Keep widening while every shard's chunk of samples still
                // fits within the encoded rows.
                if required_samples.saturating_mul(n + k) <= attempt.encoded_rows {
                    out = Self {
                        samples: required_samples.max(attempt.samples),
                        ..attempt
                    };
                } else {
                    break;
                }
            }
            out.correct_column_samples();
            // Restore the true (possibly zero) payload size.
            out.data_bytes = data_bytes;
            out
        }

        pub fn check_index(&self, i: u16) -> Result<(), Error> {
            if (0..self.total_shards).contains(&(i as usize)) {
                return Ok(());
            }
            Err(Error::InvalidIndex(i))
        }
    }
}
use topology::Topology;

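/// A shard of encoded data, as produced by `Zoda::encode`.
///
/// Alongside its rows, each shard carries the commitment root, a
/// multi-inclusion proof for those rows, and a shared checksum of the full
/// data matrix so a recipient can validate the shard against the commitment.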
#[derive(Clone)]
pub struct Shard<H: Hasher> {
    data_bytes: usize,
    root: H::Digest,
    inclusion_proof: Proof<H::Digest>,
    rows: Matrix,
    checksum: Arc<Matrix>,
}

impl<H: Hasher> PartialEq for Shard<H> {
    fn eq(&self, other: &Self) -> bool {
        self.data_bytes == other.data_bytes
            && self.root == other.root
            && self.inclusion_proof == other.inclusion_proof
            && self.rows == other.rows
            && self.checksum == other.checksum
    }
}

impl<H: Hasher> Eq for Shard<H> {}

impl<H: Hasher> EncodeSize for Shard<H> {
    fn encode_size(&self) -> usize {
        self.data_bytes.encode_size()
            + self.root.encode_size()
            + self.inclusion_proof.encode_size()
            + self.rows.encode_size()
            + self.checksum.encode_size()
    }
}

impl<H: Hasher> Write for Shard<H> {
    fn write(&self, buf: &mut impl BufMut) {
        self.data_bytes.write(buf);
        self.root.write(buf);
        self.inclusion_proof.write(buf);
        self.rows.write(buf);
        self.checksum.write(buf);
    }
}

impl<H: Hasher> Read for Shard<H> {
    type Cfg = crate::CodecConfig;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let data_bytes = usize::read_cfg(buf, &RangeCfg::from(..=cfg.maximum_shard_size))?;
        let max_els = cfg.maximum_shard_size / F::SIZE;
        Ok(Self {
            data_bytes,
            root: ReadExt::read(buf)?,
            inclusion_proof: Read::read_cfg(buf, &max_els)?,
            rows: Read::read_cfg(buf, &max_els)?,
            checksum: Arc::new(Read::read_cfg(buf, &max_els)?),
        })
    }
}

#[cfg(feature = "arbitrary")]
impl<H: Hasher> arbitrary::Arbitrary<'_> for Shard<H>
where
    H::Digest: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            data_bytes: u.arbitrary::<u32>()? as usize,
            root: u.arbitrary()?,
            inclusion_proof: u.arbitrary()?,
            rows: u.arbitrary()?,
            checksum: Arc::new(u.arbitrary()?),
        })
    }
}

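/// The compact form of a `Shard` forwarded to other participants.
///
/// A reshard omits the fields already bound to the commitment (payload size,
/// root, checksum), carrying only the rows and their inclusion proof.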
#[derive(Clone, Debug)]
pub struct ReShard<H: Hasher> {
    inclusion_proof: Proof<H::Digest>,
    shard: Matrix,
}

impl<H: Hasher> PartialEq for ReShard<H> {
    fn eq(&self, other: &Self) -> bool {
        self.inclusion_proof == other.inclusion_proof && self.shard == other.shard
    }
}

impl<H: Hasher> Eq for ReShard<H> {}

impl<H: Hasher> EncodeSize for ReShard<H> {
    fn encode_size(&self) -> usize {
        self.inclusion_proof.encode_size() + self.shard.encode_size()
    }
}

impl<H: Hasher> Write for ReShard<H> {
    fn write(&self, buf: &mut impl BufMut) {
        self.inclusion_proof.write(buf);
        self.shard.write(buf);
    }
}

impl<H: Hasher> Read for ReShard<H> {
    type Cfg = crate::CodecConfig;

    fn read_cfg(
        buf: &mut impl bytes::Buf,
        cfg: &Self::Cfg,
    ) -> Result<Self, commonware_codec::Error> {
        let max_data_bits = cfg.maximum_shard_size.saturating_mul(8);
        let max_data_els = F::bits_to_elements(max_data_bits).max(1);
        Ok(Self {
            inclusion_proof: Read::read_cfg(buf, &max_data_els)?,
            shard: Read::read_cfg(buf, &max_data_els)?,
        })
    }
}

#[cfg(feature = "arbitrary")]
impl<H: Hasher> arbitrary::Arbitrary<'_> for ReShard<H>
where
    H::Digest: for<'a> arbitrary::Arbitrary<'a>,
{
    fn arbitrary(u: &mut arbitrary::Unstructured<'_>) -> arbitrary::Result<Self> {
        Ok(Self {
            inclusion_proof: u.arbitrary()?,
            shard: u.arbitrary()?,
        })
    }
}

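/// A shard whose rows have been verified against the commitment; the input
/// to `Zoda::decode`.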
pub struct CheckedShard {
    /// The shard index these rows were verified against.
    index: usize,
    shard: Matrix,
}

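/// Deterministically shuffle the encoded row indices using transcript noise.
///
/// The encoder and every checker derive the same permutation from the
/// commitment transcript, so shard `i` always maps to the same set of rows.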
fn shuffle_indices(transcript: &Transcript, total: usize) -> Vec<Location> {
    let mut out = (0..total as u64).map(Location::from).collect::<Vec<_>>();
    out.shuffle(&mut transcript.noise(b"shuffle"));
    out
}

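/// Derive the random checking matrix from the commitment transcript.
///
/// Multiplying the data matrix (or any claimed encoding of it) by this
/// matrix compresses each row to `column_samples` elements for spot checking.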
fn checking_matrix(transcript: &Transcript, topology: &Topology) -> Matrix {
    Matrix::rand(
        &mut transcript.noise(b"checking matrix"),
        topology.data_cols,
        topology.column_samples,
    )
}

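/// Everything a participant needs to validate reshards against a commitment.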
#[derive(Clone)]
pub struct CheckingData<H: Hasher> {
    topology: Topology,
    root: H::Digest,
    checking_matrix: Matrix,
    encoded_checksum: Matrix,
    shuffled_indices: Vec<Location>,
}

impl<H: Hasher> CheckingData<H> {
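    /// Rebuild the checking data from a shard's claimed parameters.
    ///
    /// This recomputes the commitment from `data_bytes` and `root` and
    /// rejects the shard if it does not match, then re-derives the checking
    /// matrix, the encoded checksum, and the shuffled row indices from the
    /// commitment transcript.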
    fn reckon(
        config: &Config,
        commitment: &Summary,
        data_bytes: usize,
        root: H::Digest,
        checksum: &Matrix,
    ) -> Result<Self, Error> {
        let topology = Topology::reckon(config, data_bytes);
        let mut transcript = Transcript::new(NAMESPACE);
        transcript.commit((topology.data_bytes as u64).encode());
        transcript.commit(root.encode());
        let expected_commitment = transcript.summarize();
        if *commitment != expected_commitment {
            return Err(Error::InvalidShard);
        }
        let transcript = Transcript::resume(expected_commitment);
        let checking_matrix = checking_matrix(&transcript, &topology);
        if checksum.rows() != topology.data_rows || checksum.cols() != topology.column_samples {
            return Err(Error::InvalidShard);
        }
        // Extend the checksum with the same code as the data so each encoded
        // row can be checked against its compressed counterpart.
        let encoded_checksum = checksum
            .as_polynomials(topology.encoded_rows)
            .expect("checksum has too many rows")
            .evaluate()
            .data();
        let shuffled_indices = shuffle_indices(&transcript, topology.encoded_rows);

        Ok(Self {
            topology,
            root,
            checking_matrix,
            encoded_checksum,
            shuffled_indices,
        })
    }

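    /// Verify a reshard claimed to sit at `index`, producing a `CheckedShard`.
    ///
    /// The rows must match the shard's slice of the shuffled indices, carry a
    /// valid multi-inclusion proof against the root, and agree with the
    /// encoded checksum under the checking matrix.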
    fn check(&self, index: u16, reshard: &ReShard<H>) -> Result<CheckedShard, Error> {
        self.topology.check_index(index)?;
        if reshard.shard.rows() != self.topology.samples
            || reshard.shard.cols() != self.topology.data_cols
        {
            return Err(Error::InvalidReShard);
        }
        let index = index as usize;
        let these_shuffled_indices = &self.shuffled_indices
            [index * self.topology.samples..(index + 1) * self.topology.samples];
        let proof_elements = these_shuffled_indices
            .iter()
            .zip(reshard.shard.iter())
            .map(|(&i, row)| (row_digest::<H>(row), i))
            .collect::<Vec<_>>();
        if !reshard.inclusion_proof.verify_multi_inclusion(
            &mut StandardHasher::<H>::new(),
            &proof_elements,
            &self.root,
        ) {
            return Err(Error::InvalidReShard);
        }
        let shard_checksum = reshard.shard.mul(&self.checking_matrix);
        // Each verified row, compressed by the checking matrix, must match
        // the corresponding row of the encoded checksum.
        for (row, &i) in shard_checksum.iter().zip(these_shuffled_indices) {
            if row != &self.encoded_checksum[u64::from(i) as usize] {
                return Err(Error::InvalidReShard);
            }
        }
        Ok(CheckedShard {
            index,
            shard: reshard.shard.clone(),
        })
    }
}

#[derive(Debug, Error)]
pub enum Error {
    #[error("invalid shard")]
    InvalidShard,
    #[error("invalid reshard")]
    InvalidReShard,
    #[error("invalid index {0}")]
    InvalidIndex(u16),
    #[error("insufficient shards {0} < {1}")]
    InsufficientShards(usize, usize),
    #[error("insufficient unique rows {0} < {1}")]
    InsufficientUniqueRows(usize, usize),
    #[error("failed to create inclusion proof: {0}")]
    FailedToCreateInclusionProof(MmrError),
}

const NAMESPACE: &[u8] = b"commonware-zoda";

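/// An implementation of `Scheme` based on ZODA (Zero-Overhead Data
/// Availability) over the Goldilocks field, generic over the hash function
/// `H` used for row digests and commitments.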
#[derive(Clone, Copy)]
pub struct Zoda<H> {
    _marker: PhantomData<H>,
}

impl<H> std::fmt::Debug for Zoda<H> {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Zoda")
    }
}

impl<H: Hasher> Scheme for Zoda<H> {
    type Commitment = Summary;

    type Shard = Shard<H>;

    type ReShard = ReShard<H>;

    type CheckingData = CheckingData<H>;

    type CheckedShard = CheckedShard;

    type Error = Error;

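    /// Encode `data` into a commitment and one shard per participant.
    ///
    /// The payload is packed into a matrix of field elements, extended to
    /// `encoded_rows` rows, and committed via an MMR over row digests. Each
    /// shard receives its chunk of the shuffled encoded rows, a
    /// multi-inclusion proof for them, and a shared checksum of the data
    /// matrix.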
    fn encode(
        config: &Config,
        data: impl bytes::Buf,
        concurrency: usize,
    ) -> Result<(Self::Commitment, Vec<Self::Shard>), Self::Error> {
        let data_bytes = data.remaining();
        let topology = Topology::reckon(config, data_bytes);
        // Pack the payload into a matrix of field elements.
        let data = Matrix::init(
            topology.data_rows,
            topology.data_cols,
            F::stream_from_u64s(iter_u64_le(data)),
        );

        // Extend the data to `encoded_rows` rows by polynomial evaluation.
        let encoded_data = data
            .as_polynomials(topology.encoded_rows)
            .expect("data has too many rows")
            .evaluate()
            .data();

        // Commit to every encoded row with an MMR over row digests.
        let mut hasher = StandardHasher::<H>::new();
        let mut mmr = DirtyMmr::new();
        if concurrency > 1 {
            let pool = ThreadPoolBuilder::new()
                .num_threads(concurrency)
                .build()
                .expect("failed to build thread pool");
            let row_hashes = pool.install(|| {
                (0..encoded_data.rows())
                    .into_par_iter()
                    .map(|i| row_digest::<H>(&encoded_data[i]))
                    .collect::<Vec<_>>()
            });
            for hash in &row_hashes {
                mmr.add(&mut hasher, hash);
            }
        } else {
            for row in encoded_data.iter() {
                mmr.add(&mut hasher, &row_digest::<H>(row));
            }
        }
        let mmr = mmr.merkleize(&mut hasher, None);
        let root = *mmr.root();

        // Bind the payload size and the root into the commitment transcript.
        let mut transcript = Transcript::new(NAMESPACE);
        transcript.commit((topology.data_bytes as u64).encode());
        transcript.commit(root.encode());
        let commitment = transcript.summarize();

        // Derive the checking matrix and the row shuffle from the commitment.
        let transcript = Transcript::resume(commitment);
        let checking_matrix = checking_matrix(&transcript, &topology);
        let shuffled_indices = shuffle_indices(&transcript, encoded_data.rows());

        // Every shard carries a copy of the compressed data matrix.
        let checksum = Arc::new(data.mul(&checking_matrix));

        // Shard `i` receives the `i`-th chunk of the shuffled row indices,
        // along with a multi-inclusion proof for those rows.
        let index_chunks = (0..topology.total_shards)
            .map(|i| &shuffled_indices[i * topology.samples..(i + 1) * topology.samples]);
        let shards = index_chunks
            .map(|indices| {
                let rows = Matrix::init(
                    indices.len(),
                    topology.data_cols,
                    indices
                        .iter()
                        .flat_map(|&i| encoded_data[u64::from(i) as usize].iter().copied()),
                );
                let inclusion_proof = block_on(multi_proof(&mmr, indices))
                    .map_err(Error::FailedToCreateInclusionProof)?;
                Ok(Shard {
                    data_bytes,
                    root,
                    inclusion_proof,
                    rows,
                    checksum: checksum.clone(),
                })
            })
            .collect::<Result<Vec<_>, Error>>()?;
        Ok((commitment, shards))
    }

    fn reshard(
        config: &Config,
        commitment: &Self::Commitment,
        index: u16,
        shard: Self::Shard,
    ) -> Result<(Self::CheckingData, Self::CheckedShard, Self::ReShard), Self::Error> {
        let reshard = ReShard {
            inclusion_proof: shard.inclusion_proof,
            shard: shard.rows,
        };
        let checking_data = CheckingData::reckon(
            config,
            commitment,
            shard.data_bytes,
            shard.root,
            shard.checksum.as_ref(),
        )?;
        let checked_shard = checking_data.check(index, &reshard)?;
        Ok((checking_data, checked_shard, reshard))
    }

    fn check(
        _config: &Config,
        _commitment: &Self::Commitment,
        checking_data: &Self::CheckingData,
        index: u16,
        reshard: Self::ReShard,
    ) -> Result<Self::CheckedShard, Self::Error> {
        checking_data.check(index, &reshard)
    }

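    /// Recover the original payload from at least `min_shards` checked
    /// shards.
    ///
    /// Rows from each shard are written into an evaluation vector at their
    /// shuffled positions; once at least `data_rows` distinct rows are
    /// filled, interpolation recovers the coefficient rows and the payload
    /// bytes are re-packed from them.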
    fn decode(
        _config: &Config,
        _commitment: &Self::Commitment,
        checking_data: Self::CheckingData,
        shards: &[Self::CheckedShard],
        _concurrency: usize,
    ) -> Result<Vec<u8>, Self::Error> {
        let Topology {
            encoded_rows,
            data_cols,
            samples,
            data_rows,
            data_bytes,
            min_shards,
            ..
        } = checking_data.topology;
        if shards.len() < min_shards {
            return Err(Error::InsufficientShards(shards.len(), min_shards));
        }
        let mut evaluation = EvaluationVector::empty(encoded_rows.ilog2() as usize, data_cols);
        for shard in shards {
            let indices =
                &checking_data.shuffled_indices[shard.index * samples..(shard.index + 1) * samples];
            for (&i, row) in indices.iter().zip(shard.shard.iter()) {
                evaluation.fill_row(u64::from(i) as usize, row);
            }
        }
        // Duplicate shards fill the same rows, so count distinct filled rows
        // rather than trusting `shards.len()`.
        let filled_rows = evaluation.filled_rows();
        if filled_rows < data_rows {
            return Err(Error::InsufficientUniqueRows(filled_rows, data_rows));
        }
        Ok(collect_u64_le(
            data_bytes,
            F::stream_to_u64s(
                evaluation
                    .recover()
                    .coefficients_up_to(data_rows)
                    .flatten()
                    .copied(),
            ),
        ))
    }
}

impl<H: Hasher> ValidatingScheme for Zoda<H> {}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::{CodecConfig, Config};
    use commonware_cryptography::Sha256;

    const CONCURRENCY: usize = 1;

    #[test]
    fn topology_reckon_handles_small_extra_shards() {
        let config = Config {
            minimum_shards: 3,
            extra_shards: 1,
        };
        let topology = Topology::reckon(&config, 16);
        assert_eq!(topology.min_shards, 3);
        assert_eq!(topology.total_shards, 4);

        assert_eq!(topology.data_cols, 1);
        // `column_samples` counts field elements, two per sample
        // (`F::bits_to_elements(SECURITY_BITS) == 2`), hence the halving.
        let required = topology.required_samples();
        let provided = topology.samples * (topology.column_samples / 2);
        assert!(
            provided >= required,
            "security invariant violated: provided {provided} < required {required}"
        );
    }

    #[test]
    fn reshard_roundtrip_handles_field_packing() {
        use bytes::BytesMut;

        let config = Config {
            minimum_shards: 3,
            extra_shards: 2,
        };
        let data = vec![0xAA; 64];

        let (commitment, shards) =
            Zoda::<Sha256>::encode(&config, data.as_slice(), CONCURRENCY).unwrap();
        let shard = shards.into_iter().next().unwrap();

        let (_, _, reshard) = Zoda::<Sha256>::reshard(&config, &commitment, 0, shard).unwrap();

        let mut buf = BytesMut::new();
        reshard.write(&mut buf);
        let mut bytes = buf.freeze();
        let decoded = ReShard::<Sha256>::read_cfg(
            &mut bytes,
            &CodecConfig {
                maximum_shard_size: data.len(),
            },
        )
        .unwrap();

        assert_eq!(decoded, reshard);
    }

    #[test]
    fn decode_rejects_duplicate_indices() {
        let config = Config {
            minimum_shards: 2,
            extra_shards: 0,
        };
        let data = b"duplicate shard coverage";
        let (commitment, shards) = Zoda::<Sha256>::encode(&config, &data[..], CONCURRENCY).unwrap();
        let shard0 = shards[0].clone();
        let (checking_data, checked_shard0, _reshard0) =
            Zoda::<Sha256>::reshard(&config, &commitment, 0, shard0).unwrap();
        let duplicate = CheckedShard {
            index: checked_shard0.index,
            shard: checked_shard0.shard.clone(),
        };
        let shards = vec![checked_shard0, duplicate];
        let result =
            Zoda::<Sha256>::decode(&config, &commitment, checking_data, &shards, CONCURRENCY);
        match result {
            Err(Error::InsufficientUniqueRows(actual, expected)) => {
                assert!(actual < expected);
            }
            other => panic!("expected insufficient unique rows error, got {other:?}"),
        }
    }
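
    // A sketch of the full pipeline exercised piecemeal by the tests above:
    // encode a payload, reshard the minimum number of shards, and decode.
    // Assumes `minimum_shards` distinct checked shards supply enough unique
    // rows to recover the payload exactly, which holds for this topology.
    #[test]
    fn encode_decode_roundtrip_recovers_data() {
        let config = Config {
            minimum_shards: 3,
            extra_shards: 2,
        };
        let data = b"zoda end-to-end roundtrip".to_vec();
        let (commitment, shards) =
            Zoda::<Sha256>::encode(&config, data.as_slice(), CONCURRENCY).unwrap();

        // Check the first `minimum_shards` shards, keeping one CheckingData.
        let mut checking_data = None;
        let mut checked = Vec::new();
        for (i, shard) in shards.into_iter().enumerate().take(3) {
            let (cd, checked_shard, _reshard) =
                Zoda::<Sha256>::reshard(&config, &commitment, i as u16, shard).unwrap();
            checking_data = Some(cd);
            checked.push(checked_shard);
        }

        let decoded = Zoda::<Sha256>::decode(
            &config,
            &commitment,
            checking_data.unwrap(),
            &checked,
            CONCURRENCY,
        )
        .unwrap();
        assert_eq!(decoded, data);
    }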

    #[cfg(feature = "arbitrary")]
    mod conformance {
        use super::*;
        use commonware_codec::conformance::CodecConformance;

        commonware_conformance::conformance_tests! {
            CodecConformance<Shard<Sha256>>,
            CodecConformance<ReShard<Sha256>>,
        }
    }
}