1use fsqlite_error::{FrankenError, Result};
7use fsqlite_types::{ObjectId, gf256_inverse_byte, gf256_mul_byte};
8use tracing::{debug, error, info, warn};
9
/// Value attached as the `bead_id` field on the tracing events emitted by the encode/decode
/// paths below.
const INTER_OBJECT_BEAD_ID: &str = "bd-1hi.26";
/// Value attached as the `logging_standard` field on the same tracing events.
const INTER_OBJECT_LOGGING_STANDARD: &str = "bd-1fpm";
/// Prefix hashed ahead of an object's canonical bytes when deriving its ECS object id.
const ECS_OBJECT_ID_DOMAIN: &[u8] = b"fsqlite:ecs:v1";
/// Prefix hashed ahead of the group parameters when deriving a coding-group id.
const CODING_GROUP_ID_DOMAIN: &[u8] = b"fsqlite:coding-group:v1";
/// Default repair overhead in basis points of `k_source` (2_000 bps = 20%); consumed by
/// `default_repair_count`, which divides by 10_000 and rounds up.
const DEFAULT_REPAIR_OVERHEAD_BPS: u16 = 2_000;

/// An object identified by an [`ObjectId`] together with its canonical byte encoding.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EcsObject {
    /// Identifier of the object; `EcsObject::from_canonical` derives it from `canonical_bytes`.
    pub object_id: ObjectId,
    /// Canonical serialized bytes of the object.
    pub canonical_bytes: Vec<u8>,
}
24
25impl EcsObject {
26 #[must_use]
28 pub fn from_canonical(canonical_bytes: Vec<u8>) -> Self {
29 let object_id = derive_ecs_object_id(&canonical_bytes);
30 Self {
31 object_id,
32 canonical_bytes,
33 }
34 }
35
36 #[must_use]
38 pub const fn with_object_id(object_id: ObjectId, canonical_bytes: Vec<u8>) -> Self {
39 Self {
40 object_id,
41 canonical_bytes,
42 }
43 }
44}
45
/// Descriptor of a coding group: which objects were concatenated and how the concatenation
/// was split into source symbols.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CodingGroup {
    /// Identifier of the group; the encoder derives it with `derive_group_id`.
    pub group_id: ObjectId,
    /// Object ids of the members, in concatenation order.
    pub member_ids: Vec<ObjectId>,
    /// Total unpadded length in bytes of the concatenated member payloads.
    pub total_len: u64,
    /// Byte length of each member, parallel to `member_ids`.
    pub member_lens: Vec<u64>,
    /// Number of source symbols (`ceil(total_len / symbol_size)` in the encoder).
    pub k_source: u32,
    /// Size in bytes of every symbol.
    pub symbol_size: u32,
    /// Number of repair symbols the encoder emitted for this group.
    pub repair_symbol_count: u32,
}
64
/// One encoded symbol (source or repair) belonging to a coding group.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupSymbol {
    /// Group this symbol belongs to; the decoder ignores symbols whose id does not match.
    pub group_id: ObjectId,
    /// Encoding symbol id: `0..k_source` for source symbols, `k_source + i` for repair symbols.
    pub esi: u32,
    /// Symbol payload; the decoder requires exactly `symbol_size` bytes.
    pub data: Vec<u8>,
    /// One GF(256) coefficient per source symbol describing how `data` was combined
    /// (a unit vector for source symbols).
    pub coefficients: Vec<u8>,
}
77
/// Result of encoding: the group descriptor plus every symbol produced for it.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CodedCatchupBatch {
    /// Descriptor needed by `decode_coding_group` to reassemble the members.
    pub group: CodingGroup,
    /// Source symbols followed by repair symbols, in ascending ESI order as emitted.
    pub symbols: Vec<GroupSymbol>,
}
86
/// One row of the linear system solved during decode: a coefficient vector and the symbol
/// payload it produces.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LinearRow {
    /// Coefficients over the source symbols (one byte per source symbol).
    coefficients: Vec<u8>,
    /// Payload bytes associated with this row; transformed in lockstep with `coefficients`.
    payload: Vec<u8>,
}
92
/// Encodes `objects` into a coding group using the default number of repair symbols.
///
/// Equivalent to calling [`encode_coding_group_with_repair`] with
/// `repair_symbol_count = None`.
///
/// # Errors
///
/// Propagates every error of [`encode_coding_group_with_repair`].
pub fn encode_coding_group(objects: &[EcsObject], symbol_size: u32) -> Result<CodedCatchupBatch> {
    encode_coding_group_with_repair(objects, symbol_size, None)
}
102
103#[allow(clippy::too_many_lines)]
110pub fn encode_coding_group_with_repair(
111 objects: &[EcsObject],
112 symbol_size: u32,
113 repair_symbol_count: Option<u32>,
114) -> Result<CodedCatchupBatch> {
115 #[allow(clippy::too_many_lines)]
116 fn inner(
117 objects: &[EcsObject],
118 symbol_size: u32,
119 repair_symbol_count: Option<u32>,
120 ) -> Result<CodedCatchupBatch> {
121 if objects.is_empty() {
122 return Err(FrankenError::OutOfRange {
123 what: "coding_group.member_count".to_owned(),
124 value: "0".to_owned(),
125 });
126 }
127 if symbol_size == 0 {
128 return Err(FrankenError::OutOfRange {
129 what: "coding_group.symbol_size".to_owned(),
130 value: "0".to_owned(),
131 });
132 }
133
134 debug!(
135 bead_id = INTER_OBJECT_BEAD_ID,
136 logging_standard = INTER_OBJECT_LOGGING_STANDARD,
137 member_count = objects.len(),
138 symbol_size = symbol_size,
139 "encoding inter-object coding group"
140 );
141
142 let member_ids: Vec<ObjectId> = objects.iter().map(|object| object.object_id).collect();
143 let member_lens: Vec<u64> = objects
144 .iter()
145 .map(|object| u64::try_from(object.canonical_bytes.len()).unwrap_or(u64::MAX))
146 .collect();
147
148 let mut concatenated = Vec::new();
149 for object in objects {
150 concatenated.extend_from_slice(&object.canonical_bytes);
151 }
152 let total_len =
153 u64::try_from(concatenated.len()).map_err(|_| FrankenError::OutOfRange {
154 what: "coding_group.total_len".to_owned(),
155 value: concatenated.len().to_string(),
156 })?;
157
158 let symbol_size_usize =
159 usize::try_from(symbol_size).map_err(|_| FrankenError::OutOfRange {
160 what: "coding_group.symbol_size".to_owned(),
161 value: symbol_size.to_string(),
162 })?;
163 let k_source_usize = ceil_div_usize(concatenated.len(), symbol_size_usize);
164 let k_source = u32::try_from(k_source_usize).map_err(|_| FrankenError::OutOfRange {
165 what: "coding_group.k_source".to_owned(),
166 value: k_source_usize.to_string(),
167 })?;
168
169 if k_source == 0 {
170 return Err(FrankenError::OutOfRange {
171 what: "coding_group.k_source".to_owned(),
172 value: "0".to_owned(),
173 });
174 }
175
176 let group_id = derive_group_id(&member_ids, &member_lens, total_len, k_source, symbol_size);
177 let default_repairs = default_repair_count(k_source);
178 let repair_count = repair_symbol_count.unwrap_or(default_repairs);
179
180 let padded_len = k_source_usize
181 .checked_mul(symbol_size_usize)
182 .ok_or_else(|| FrankenError::OutOfRange {
183 what: "coding_group.padded_len".to_owned(),
184 value: format!("{k_source_usize}*{symbol_size_usize}"),
185 })?;
186 concatenated.resize(padded_len, 0);
187
188 let mut source_payloads = Vec::with_capacity(k_source_usize);
189 let mut symbols = Vec::with_capacity(
190 usize::try_from(k_source.saturating_add(repair_count)).unwrap_or(k_source_usize),
191 );
192
193 for source_idx in 0..k_source_usize {
194 let start = source_idx * symbol_size_usize;
195 let end = start + symbol_size_usize;
196 let data = concatenated[start..end].to_vec();
197 source_payloads.push(data.clone());
198 symbols.push(GroupSymbol {
199 group_id,
200 esi: u32::try_from(source_idx).unwrap_or(u32::MAX),
201 data,
202 coefficients: unit_vector(k_source_usize, source_idx),
203 });
204 }
205
206 for repair_idx in 0..repair_count {
207 let esi = k_source.saturating_add(repair_idx);
208 let coefficients = deterministic_coefficients(group_id, esi, k_source_usize);
209 let data = linear_combine(&source_payloads, &coefficients, symbol_size_usize);
210 symbols.push(GroupSymbol {
211 group_id,
212 esi,
213 data,
214 coefficients,
215 });
216 }
217
218 info!(
219 bead_id = INTER_OBJECT_BEAD_ID,
220 logging_standard = INTER_OBJECT_LOGGING_STANDARD,
221 group_id = %group_id,
222 member_count = member_ids.len(),
223 total_len = total_len,
224 k_source = k_source,
225 repair_symbol_count = repair_count,
226 symbol_count = symbols.len(),
227 "inter-object coding group encoded"
228 );
229
230 Ok(CodedCatchupBatch {
231 group: CodingGroup {
232 group_id,
233 member_ids,
234 total_len,
235 member_lens,
236 k_source,
237 symbol_size,
238 repair_symbol_count: repair_count,
239 },
240 symbols,
241 })
242 }
243 inner(objects, symbol_size, repair_symbol_count)
244}
245
246#[allow(clippy::too_many_lines)]
252pub fn decode_coding_group(group: &CodingGroup, symbols: &[GroupSymbol]) -> Result<Vec<EcsObject>> {
253 #[allow(clippy::too_many_lines)]
254 fn inner(group: &CodingGroup, symbols: &[GroupSymbol]) -> Result<Vec<EcsObject>> {
255 validate_group(group)?;
256 let symbol_size_usize =
257 usize::try_from(group.symbol_size).map_err(|_| FrankenError::DatabaseCorrupt {
258 detail: format!("invalid group symbol_size {}", group.symbol_size),
259 })?;
260 let k_source_usize =
261 usize::try_from(group.k_source).map_err(|_| FrankenError::DatabaseCorrupt {
262 detail: format!("invalid group k_source {}", group.k_source),
263 })?;
264
265 let mut relevant: Vec<&GroupSymbol> = symbols
266 .iter()
267 .filter(|symbol| symbol.group_id == group.group_id)
268 .collect();
269 relevant.sort_by_key(|symbol| symbol.esi);
270
271 if relevant.len() < k_source_usize {
272 warn!(
273 bead_id = INTER_OBJECT_BEAD_ID,
274 logging_standard = INTER_OBJECT_LOGGING_STANDARD,
275 group_id = %group.group_id,
276 required_symbols = k_source_usize,
277 received_symbols = relevant.len(),
278 "insufficient symbols for coding-group decode"
279 );
280 return Err(FrankenError::DatabaseCorrupt {
281 detail: format!(
282 "reason_code=inter_object_decode_insufficient_symbols group_id={} required={} received={}",
283 group.group_id,
284 k_source_usize,
285 relevant.len()
286 ),
287 });
288 }
289
290 debug!(
291 bead_id = INTER_OBJECT_BEAD_ID,
292 logging_standard = INTER_OBJECT_LOGGING_STANDARD,
293 group_id = %group.group_id,
294 required_symbols = k_source_usize,
295 candidate_symbols = relevant.len(),
296 "decoding inter-object coding group"
297 );
298
299 let mut rows = Vec::with_capacity(relevant.len());
300 for symbol in relevant {
301 if symbol.data.len() != symbol_size_usize {
302 return Err(FrankenError::DatabaseCorrupt {
303 detail: format!(
304 "symbol size mismatch for group {}: expected {}, got {}",
305 group.group_id,
306 symbol_size_usize,
307 symbol.data.len()
308 ),
309 });
310 }
311 let coefficients = if symbol.coefficients.len() == k_source_usize {
312 symbol.coefficients.clone()
313 } else if symbol.esi < group.k_source {
314 unit_vector(
315 k_source_usize,
316 usize::try_from(symbol.esi).unwrap_or(usize::MAX),
317 )
318 } else {
319 return Err(FrankenError::DatabaseCorrupt {
320 detail: format!(
321 "missing coefficient row for repair symbol esi={} group_id={}",
322 symbol.esi, group.group_id
323 ),
324 });
325 };
326 rows.push(LinearRow {
327 coefficients,
328 payload: symbol.data.clone(),
329 });
330 }
331
332 let source_payloads = solve_source_symbols(rows, k_source_usize, symbol_size_usize)
333 .map_err(|decode_error| {
334 error!(
335 bead_id = INTER_OBJECT_BEAD_ID,
336 logging_standard = INTER_OBJECT_LOGGING_STANDARD,
337 group_id = %group.group_id,
338 error = %decode_error,
339 "coding-group decode failed"
340 );
341 decode_error
342 })?;
343
344 let mut concatenated =
345 Vec::with_capacity(k_source_usize.checked_mul(symbol_size_usize).unwrap_or(0));
346 for payload in &source_payloads {
347 concatenated.extend_from_slice(payload);
348 }
349 let total_len_usize =
350 usize::try_from(group.total_len).map_err(|_| FrankenError::DatabaseCorrupt {
351 detail: format!("invalid group total_len {}", group.total_len),
352 })?;
353 if total_len_usize > concatenated.len() {
354 return Err(FrankenError::DatabaseCorrupt {
355 detail: format!(
356 "decoded payload shorter than total_len: decoded={} total_len={}",
357 concatenated.len(),
358 total_len_usize
359 ),
360 });
361 }
362 concatenated.truncate(total_len_usize);
363
364 let mut offset = 0_usize;
365 let mut recovered = Vec::with_capacity(group.member_lens.len());
366 for (member_idx, member_len) in group.member_lens.iter().enumerate() {
367 let member_len =
368 usize::try_from(*member_len).map_err(|_| FrankenError::DatabaseCorrupt {
369 detail: format!("invalid member length {}", member_len),
370 })?;
371 let end = offset.saturating_add(member_len);
372 if end > concatenated.len() {
373 return Err(FrankenError::DatabaseCorrupt {
374 detail: format!(
375 "demultiplex overflow at member {}: offset={} len={} decoded_len={}",
376 member_idx,
377 offset,
378 member_len,
379 concatenated.len()
380 ),
381 });
382 }
383 let object = EcsObject::from_canonical(concatenated[offset..end].to_vec());
384 let expected = group.member_ids[member_idx];
385 if object.object_id != expected {
386 return Err(FrankenError::DatabaseCorrupt {
387 detail: format!(
388 "object id mismatch at member {}: expected {}, got {}",
389 member_idx, expected, object.object_id
390 ),
391 });
392 }
393 recovered.push(object);
394 offset = end;
395 }
396
397 info!(
398 bead_id = INTER_OBJECT_BEAD_ID,
399 logging_standard = INTER_OBJECT_LOGGING_STANDARD,
400 group_id = %group.group_id,
401 recovered_objects = recovered.len(),
402 "inter-object coding group decoded"
403 );
404
405 Ok(recovered)
406 }
407 inner(group, symbols)
408}
409
/// Builds a catch-up batch for `missing_objects` by encoding them as a single coding group
/// with the default repair-symbol count.
///
/// # Errors
///
/// Propagates every error of [`encode_coding_group`].
pub fn build_replication_catchup_batch(
    missing_objects: &[EcsObject],
    symbol_size: u32,
) -> Result<CodedCatchupBatch> {
    encode_coding_group(missing_objects, symbol_size)
}
421
422fn validate_group(group: &CodingGroup) -> Result<()> {
423 if group.member_ids.is_empty() {
424 return Err(FrankenError::DatabaseCorrupt {
425 detail: "coding group has no members".to_owned(),
426 });
427 }
428 if group.member_ids.len() != group.member_lens.len() {
429 return Err(FrankenError::DatabaseCorrupt {
430 detail: format!(
431 "member id/length mismatch: ids={} lens={}",
432 group.member_ids.len(),
433 group.member_lens.len()
434 ),
435 });
436 }
437 let total: u64 = group.member_lens.iter().copied().sum();
438 if total != group.total_len {
439 return Err(FrankenError::DatabaseCorrupt {
440 detail: format!(
441 "total_len mismatch: declared={} computed={}",
442 group.total_len, total
443 ),
444 });
445 }
446 if group.k_source == 0 || group.symbol_size == 0 {
447 return Err(FrankenError::DatabaseCorrupt {
448 detail: format!(
449 "invalid group dimensions: k_source={} symbol_size={}",
450 group.k_source, group.symbol_size
451 ),
452 });
453 }
454 Ok(())
455}
456
457fn solve_source_symbols(
458 mut rows: Vec<LinearRow>,
459 k_source: usize,
460 symbol_size: usize,
461) -> Result<Vec<Vec<u8>>> {
462 let mut pivot_row = 0_usize;
463 for col in 0..k_source {
464 let Some(found) = rows
465 .iter()
466 .enumerate()
467 .skip(pivot_row)
468 .find(|(_, row)| row.coefficients[col] != 0)
469 .map(|(idx, _)| idx)
470 else {
471 return Err(FrankenError::DatabaseCorrupt {
472 detail: format!(
473 "reason_code=inter_object_decode_rank_deficient missing_pivot_col={col}"
474 ),
475 });
476 };
477
478 if found != pivot_row {
479 rows.swap(found, pivot_row);
480 }
481
482 let pivot = rows[pivot_row].coefficients[col];
483 let Some(inv_pivot) = gf256_inverse_byte(pivot) else {
484 return Err(FrankenError::DatabaseCorrupt {
485 detail: format!("non-invertible pivot in column {col}: value={pivot}"),
486 });
487 };
488
489 scale_row(&mut rows[pivot_row], inv_pivot);
490 let pivot_snapshot = rows[pivot_row].clone();
491
492 let mut row_idx = 0_usize;
493 while row_idx < rows.len() {
494 if row_idx == pivot_row {
495 row_idx += 1;
496 continue;
497 }
498 let factor = rows[row_idx].coefficients[col];
499 if factor == 0 {
500 row_idx += 1;
501 continue;
502 }
503 eliminate_row(
504 &mut rows[row_idx],
505 &pivot_snapshot,
506 factor,
507 col,
508 symbol_size,
509 );
510 row_idx += 1;
511 }
512
513 pivot_row += 1;
514 if pivot_row == k_source {
515 break;
516 }
517 }
518
519 if pivot_row < k_source {
520 return Err(FrankenError::DatabaseCorrupt {
521 detail: format!(
522 "reason_code=inter_object_decode_not_enough_independent_symbols rank={pivot_row} required={k_source}"
523 ),
524 });
525 }
526
527 Ok(rows
528 .into_iter()
529 .take(k_source)
530 .map(|row| row.payload)
531 .collect())
532}
533
534fn scale_row(row: &mut LinearRow, scalar: u8) {
535 for coeff in &mut row.coefficients {
536 *coeff = gf256_mul_byte(*coeff, scalar);
537 }
538 for byte in &mut row.payload {
539 *byte = gf256_mul_byte(*byte, scalar);
540 }
541}
542
543fn eliminate_row(
544 target: &mut LinearRow,
545 pivot: &LinearRow,
546 factor: u8,
547 col_start: usize,
548 symbol_size: usize,
549) {
550 for idx in col_start..target.coefficients.len() {
551 let scaled = gf256_mul_byte(pivot.coefficients[idx], factor);
552 target.coefficients[idx] ^= scaled;
553 }
554 for idx in 0..symbol_size {
555 let scaled = gf256_mul_byte(pivot.payload[idx], factor);
556 target.payload[idx] ^= scaled;
557 }
558}
559
560fn linear_combine(source_payloads: &[Vec<u8>], coefficients: &[u8], symbol_size: usize) -> Vec<u8> {
561 let mut out = vec![0_u8; symbol_size];
562 for (source, coefficient) in source_payloads.iter().zip(coefficients.iter()) {
563 if *coefficient == 0 {
564 continue;
565 }
566 for (dst, src) in out.iter_mut().zip(source.iter()) {
567 *dst ^= gf256_mul_byte(*coefficient, *src);
568 }
569 }
570 out
571}
572
/// Returns a zero vector of `len` bytes with a single 1 at `hot_index`.
///
/// If `hot_index` is out of range the vector is left all zeros.
fn unit_vector(len: usize, hot_index: usize) -> Vec<u8> {
    let mut basis = vec![0_u8; len];
    if let Some(slot) = basis.get_mut(hot_index) {
        *slot = 1;
    }
    basis
}
580
/// Divides `numerator` by `denominator`, rounding up.
///
/// Panics when `denominator` is zero.
fn ceil_div_usize(numerator: usize, denominator: usize) -> usize {
    numerator.div_ceil(denominator)
}
586
587fn default_repair_count(k_source: u32) -> u32 {
588 let k_source_u64 = u64::from(k_source);
589 let numerator = k_source_u64.saturating_mul(u64::from(DEFAULT_REPAIR_OVERHEAD_BPS));
590 let repair = numerator.div_ceil(10_000);
591 u32::try_from(repair.max(1)).unwrap_or(u32::MAX)
592}
593
594fn derive_ecs_object_id(canonical_bytes: &[u8]) -> ObjectId {
595 let mut hasher = blake3::Hasher::new();
596 hasher.update(ECS_OBJECT_ID_DOMAIN);
597 hasher.update(canonical_bytes);
598 let digest = hasher.finalize();
599 let mut bytes = [0_u8; 16];
600 bytes.copy_from_slice(&digest.as_bytes()[..16]);
601 ObjectId::from_bytes(bytes)
602}
603
604fn derive_group_id(
605 member_ids: &[ObjectId],
606 member_lens: &[u64],
607 total_len: u64,
608 k_source: u32,
609 symbol_size: u32,
610) -> ObjectId {
611 let mut hasher = blake3::Hasher::new();
612 hasher.update(CODING_GROUP_ID_DOMAIN);
613 hasher.update(&total_len.to_le_bytes());
614 hasher.update(&k_source.to_le_bytes());
615 hasher.update(&symbol_size.to_le_bytes());
616 for (member_id, member_len) in member_ids.iter().zip(member_lens.iter()) {
617 hasher.update(member_id.as_bytes());
618 hasher.update(&member_len.to_le_bytes());
619 }
620 let digest = hasher.finalize();
621 let mut bytes = [0_u8; 16];
622 bytes.copy_from_slice(&digest.as_bytes()[..16]);
623 ObjectId::from_bytes(bytes)
624}
625
626fn deterministic_coefficients(group_id: ObjectId, esi: u32, k_source: usize) -> Vec<u8> {
627 let mut seed_buf = [0_u8; 20];
628 seed_buf[..16].copy_from_slice(group_id.as_bytes());
629 seed_buf[16..].copy_from_slice(&esi.to_le_bytes());
630 let mut seed = xxhash_rust::xxh3::xxh3_64(&seed_buf);
631
632 let mut coefficients = Vec::with_capacity(k_source);
633 for _ in 0..k_source {
634 seed = xorshift64(seed);
635 let mut coefficient = seed.to_le_bytes()[0];
636 if coefficient == 0 {
637 coefficient = 1;
638 }
639 coefficients.push(coefficient);
640 }
641 coefficients
642}
643
/// Advances a 64-bit xorshift state by one step using the shift triple (13, 7, 17).
///
/// Zero is a fixed point: `xorshift64(0) == 0`.
fn xorshift64(mut value: u64) -> u64 {
    value ^= value << 13;
    value ^= value >> 7;
    value ^= value << 17;
    value
}
649
#[cfg(test)]
#[allow(clippy::too_many_lines)]
mod tests {
    use super::*;

    /// Builds a deterministic test object of `len` bytes: byte `idx` is
    /// `seed * 31 + (idx % 251)` with wrapping arithmetic.
    fn make_object(seed: u8, len: usize) -> EcsObject {
        let mut bytes = Vec::with_capacity(len);
        for idx in 0..len {
            let idx_u8 = u8::try_from(idx % 251).unwrap_or(0);
            bytes.push(seed.wrapping_mul(31).wrapping_add(idx_u8));
        }
        EcsObject::from_canonical(bytes)
    }

    /// Simulates symbol loss: returns clones of `symbols` without the positions in `drop`.
    fn drop_indices(symbols: &[GroupSymbol], drop: &[usize]) -> Vec<GroupSymbol> {
        symbols
            .iter()
            .enumerate()
            .filter(|(idx, _)| !drop.contains(idx))
            .map(|(_, symbol)| symbol.clone())
            .collect()
    }

    /// Lossless round trip of five objects.
    #[test]
    fn test_coding_group_encode_decode() {
        let objects = vec![
            make_object(1, 96),
            make_object(2, 64),
            make_object(3, 48),
            make_object(4, 80),
            make_object(5, 120),
        ];
        let encoded =
            encode_coding_group_with_repair(&objects, 64, Some(6)).expect("encode coding group");
        let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
        assert_eq!(decoded, objects);
    }

    /// Round trip after dropping a fifth of the symbols (rounded down).
    #[test]
    fn test_coding_group_with_loss() {
        let objects = vec![
            make_object(11, 72),
            make_object(12, 53),
            make_object(13, 88),
            make_object(14, 41),
            make_object(15, 97),
        ];
        let encoded =
            encode_coding_group_with_repair(&objects, 48, Some(8)).expect("encode coding group");
        let total = encoded.symbols.len();
        // 351 bytes at 48 bytes/symbol give 8 source + 8 repair = 16 symbols, so three
        // positions (0, 3, 6) are dropped below.
        let drop_count = total / 5;
        let mut drop = Vec::new();
        for idx in 0..drop_count {
            drop.push((idx * 3) % total);
        }
        let received = drop_indices(&encoded.symbols, &drop);
        let decoded = decode_coding_group(&encoded.group, &received).expect("decode with loss");
        assert_eq!(decoded, objects);
    }

    /// A descriptor with a tampered member id must be rejected with an id-mismatch error.
    #[test]
    fn test_coding_group_member_verification() {
        let objects = vec![
            make_object(21, 50),
            make_object(22, 70),
            make_object(23, 90),
        ];
        let encoded = encode_coding_group_with_repair(&objects, 32, Some(4)).expect("encode");
        let mut tampered_group = encoded.group.clone();
        tampered_group.member_ids[1] = ObjectId::from_bytes([0xFF; 16]);
        let err = decode_coding_group(&tampered_group, &encoded.symbols).expect_err("must fail");
        assert!(
            err.to_string().contains("object id mismatch"),
            "unexpected error: {err}"
        );
    }

    /// Members of very different sizes (smaller and larger than one symbol) are split back
    /// at the right boundaries.
    #[test]
    fn test_coding_group_demultiplexing() {
        let objects = vec![
            make_object(31, 3),
            make_object(32, 129),
            make_object(33, 7),
            make_object(34, 48),
        ];
        let encoded = encode_coding_group_with_repair(&objects, 64, Some(6)).expect("encode");
        let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
        let decoded_lens: Vec<usize> = decoded
            .iter()
            .map(|object| object.canonical_bytes.len())
            .collect();
        assert_eq!(decoded_lens, vec![3, 129, 7, 48]);
        assert_eq!(decoded, objects);
    }

    /// Encoding the same input twice yields identical descriptors and symbols.
    #[test]
    fn test_coding_group_deterministic() {
        let objects = vec![
            make_object(41, 77),
            make_object(42, 88),
            make_object(43, 99),
        ];
        let encoded_a = encode_coding_group_with_repair(&objects, 32, Some(5)).expect("encode a");
        let encoded_b = encode_coding_group_with_repair(&objects, 32, Some(5)).expect("encode b");
        assert_eq!(encoded_a.group, encoded_b.group);
        assert_eq!(encoded_a.symbols, encoded_b.symbols);
    }

    /// A group with a single member round-trips.
    #[test]
    fn test_coding_group_single_object() {
        let objects = vec![make_object(51, 200)];
        let encoded = encode_coding_group_with_repair(&objects, 64, Some(4)).expect("encode");
        let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
        assert_eq!(decoded, objects);
        assert_eq!(encoded.group.member_ids.len(), 1);
    }

    /// Round trip for groups of 1 to 8 objects with lengths 11, 48, 85, ...
    #[test]
    fn prop_coding_group_roundtrip() {
        for object_count in 1_u8..=8 {
            let mut objects = Vec::new();
            for idx in 0..object_count {
                let len = usize::from(idx).saturating_mul(37).saturating_add(11);
                objects.push(make_object(idx.wrapping_add(70), len));
            }
            let encoded =
                encode_coding_group_with_repair(&objects, 48, Some(8)).expect("encode property");
            let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
            assert_eq!(decoded, objects);
        }
    }

    /// Shared scenario: build a catch-up batch with the default repair count, lose two
    /// symbols, and recover every object.
    fn run_e2e_replication_catchup() {
        let missing_objects = vec![
            make_object(81, 64),
            make_object(82, 32),
            make_object(83, 96),
            make_object(84, 55),
            make_object(85, 78),
        ];
        let batch = build_replication_catchup_batch(&missing_objects, 40).expect("encode catchup");

        // 325 bytes at 40 bytes/symbol give 9 source symbols plus 2 default repair symbols.
        // Dropping source symbols 1 and 7 leaves exactly 9, so both repair symbols are needed.
        let received = drop_indices(&batch.symbols, &[1, 7]);
        let recovered = decode_coding_group(&batch.group, &received).expect("decode catchup");
        assert_eq!(recovered, missing_objects);
    }

    /// Runs the shared catch-up scenario as its own test.
    #[test]
    fn test_e2e_replication_catchup_with_coding_group() {
        run_e2e_replication_catchup();
    }

    /// Three replicas each lose a different subset of the same batch and all decode it.
    #[test]
    fn test_e2e_multicast_coding_group() {
        let objects = vec![
            make_object(91, 64),
            make_object(92, 128),
            make_object(93, 36),
            make_object(94, 72),
            make_object(95, 28),
            make_object(96, 40),
        ];
        let batch = encode_coding_group_with_repair(&objects, 48, Some(9)).expect("encode");

        let replica_a = drop_indices(&batch.symbols, &[1, 5, 9]);
        let replica_b = drop_indices(&batch.symbols, &[0, 3, 8, 11]);
        let replica_c = drop_indices(&batch.symbols, &[2, 4, 6, 10]);

        let decoded_a = decode_coding_group(&batch.group, &replica_a).expect("decode a");
        let decoded_b = decode_coding_group(&batch.group, &replica_b).expect("decode b");
        let decoded_c = decode_coding_group(&batch.group, &replica_c).expect("decode c");

        assert_eq!(decoded_a, objects);
        assert_eq!(decoded_b, objects);
        assert_eq!(decoded_c, objects);
    }

    /// Pins the logging identifiers and checks that default encoding produces symbols.
    #[test]
    fn test_bd_1hi_26_unit_compliance_gate() {
        assert_eq!(INTER_OBJECT_BEAD_ID, "bd-1hi.26");
        assert_eq!(INTER_OBJECT_LOGGING_STANDARD, "bd-1fpm");
        let objects = vec![make_object(101, 48), make_object(102, 72)];
        let batch = encode_coding_group(&objects, 32).expect("encode");
        assert!(batch.group.k_source >= 1);
        assert!(!batch.symbols.is_empty());
    }

    /// Structural checks and a round trip across several symbol sizes.
    #[test]
    fn prop_bd_1hi_26_structure_compliance() {
        for symbol_size in [16_u32, 32, 48, 64, 96, 128] {
            let objects = vec![
                make_object(111, 25),
                make_object(112, 63),
                make_object(113, 91),
            ];
            let batch =
                encode_coding_group_with_repair(&objects, symbol_size, Some(6)).expect("encode");
            assert_eq!(batch.group.member_ids.len(), batch.group.member_lens.len());
            assert_eq!(batch.group.group_id, batch.symbols[0].group_id);
            let recovered = decode_coding_group(&batch.group, &batch.symbols).expect("decode");
            assert_eq!(recovered, objects);
        }
    }

    /// Runs both end-to-end scenarios back to back.
    #[test]
    fn test_e2e_bd_1hi_26_compliance() {
        run_e2e_replication_catchup();
        test_e2e_multicast_coding_group();
    }
}