Skip to main content

fsqlite_core/
inter_object_coding.rs

//! Inter-object coding groups for replication catch-up (§3.5.6, `bd-1hi.26`).
2//!
3//! This module provides deterministic group encoding across multiple ECS
4//! objects and reconstruction from any sufficiently informative symbol subset.
5
6use fsqlite_error::{FrankenError, Result};
7use fsqlite_types::{ObjectId, gf256_inverse_byte, gf256_mul_byte};
8use tracing::{debug, error, info, warn};
9
// Bead and logging-standard identifiers embedded in structured log events so
// traces can be correlated back to the spec item this module implements.
const INTER_OBJECT_BEAD_ID: &str = "bd-1hi.26";
const INTER_OBJECT_LOGGING_STANDARD: &str = "bd-1fpm";
// Domain-separation prefixes: object ids and group ids hash disjoint domains,
// so identical byte content can never produce colliding identifiers.
const ECS_OBJECT_ID_DOMAIN: &[u8] = b"fsqlite:ecs:v1";
const CODING_GROUP_ID_DOMAIN: &[u8] = b"fsqlite:coding-group:v1";
// Default repair overhead in basis points (2_000 bps = 20% extra symbols).
const DEFAULT_REPAIR_OVERHEAD_BPS: u16 = 2_000; // 20%
15
/// Canonical ECS object payload for coding-group construction.
///
/// `object_id` is expected to be the content address of `canonical_bytes`
/// (see [`EcsObject::from_canonical`]); decode re-derives and verifies this
/// binding for every recovered member.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct EcsObject {
    /// Content-addressed identity of `canonical_bytes`.
    pub object_id: ObjectId,
    /// Canonical bytes (wire format) for the object.
    pub canonical_bytes: Vec<u8>,
}
24
25impl EcsObject {
26    /// Construct an object from canonical bytes and derive its object id.
27    #[must_use]
28    pub fn from_canonical(canonical_bytes: Vec<u8>) -> Self {
29        let object_id = derive_ecs_object_id(&canonical_bytes);
30        Self {
31            object_id,
32            canonical_bytes,
33        }
34    }
35
36    /// Construct with explicit object id.
37    #[must_use]
38    pub const fn with_object_id(object_id: ObjectId, canonical_bytes: Vec<u8>) -> Self {
39        Self {
40            object_id,
41            canonical_bytes,
42        }
43    }
44}
45
/// Coding-group metadata used by sender and receiver.
///
/// Invariants enforced by `validate_group`: `member_ids` and `member_lens`
/// are parallel arrays in concatenation order, and `member_lens` sums to
/// `total_len`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CodingGroup {
    /// Deterministic content-addressed group id.
    pub group_id: ObjectId,
    /// Member object ids in canonical concatenation order.
    pub member_ids: Vec<ObjectId>,
    /// Total concatenated canonical bytes before padding.
    pub total_len: u64,
    /// Individual member byte lengths (for demultiplexing).
    pub member_lens: Vec<u64>,
    /// Number of source symbols in the grouped stream.
    pub k_source: u32,
    /// Symbol size used for the group.
    pub symbol_size: u32,
    /// Number of repair symbols generated for this group.
    pub repair_symbol_count: u32,
}
64
/// One symbol in a coded group stream.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GroupSymbol {
    /// Coding group this symbol belongs to.
    pub group_id: ObjectId,
    /// Encoding symbol identifier. ESIs below `k_source` are systematic
    /// source symbols; ESIs at or above `k_source` are repair symbols.
    pub esi: u32,
    /// Symbol payload bytes (exactly `symbol_size` long).
    pub data: Vec<u8>,
    /// Coefficient row over source symbols. For source symbols this is a unit
    /// vector and may be omitted on the wire — decode reconstructs it from
    /// `esi`; repair symbols must carry a full row of length `k_source`.
    pub coefficients: Vec<u8>,
}
77
/// Encoded catch-up batch for replication transfer.
///
/// Pairs the group metadata (needed by the receiver to decode) with the full
/// symbol stream (source symbols first, then repair symbols).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CodedCatchupBatch {
    /// Group metadata.
    pub group: CodingGroup,
    /// Streamable symbols.
    pub symbols: Vec<GroupSymbol>,
}
86
/// One row of the GF(256) linear system solved during decode: a coefficient
/// vector over the source symbols plus the corresponding combined payload.
#[derive(Debug, Clone, PartialEq, Eq)]
struct LinearRow {
    /// Coefficient per source symbol (length == `k_source`).
    coefficients: Vec<u8>,
    /// Symbol payload bytes (length == `symbol_size`).
    payload: Vec<u8>,
}
92
93/// Encode multiple ECS objects into one inter-object coding group with default
94/// repair overhead.
95///
96/// # Errors
97///
98/// Returns an error when object inputs or symbol size are invalid.
99pub fn encode_coding_group(objects: &[EcsObject], symbol_size: u32) -> Result<CodedCatchupBatch> {
100    encode_coding_group_with_repair(objects, symbol_size, None)
101}
102
103/// Encode multiple ECS objects into one inter-object coding group with an
104/// optional explicit repair-symbol count.
105///
106/// # Errors
107///
108/// Returns an error when inputs are invalid.
109#[allow(clippy::too_many_lines)]
110pub fn encode_coding_group_with_repair(
111    objects: &[EcsObject],
112    symbol_size: u32,
113    repair_symbol_count: Option<u32>,
114) -> Result<CodedCatchupBatch> {
115    #[allow(clippy::too_many_lines)]
116    fn inner(
117        objects: &[EcsObject],
118        symbol_size: u32,
119        repair_symbol_count: Option<u32>,
120    ) -> Result<CodedCatchupBatch> {
121        if objects.is_empty() {
122            return Err(FrankenError::OutOfRange {
123                what: "coding_group.member_count".to_owned(),
124                value: "0".to_owned(),
125            });
126        }
127        if symbol_size == 0 {
128            return Err(FrankenError::OutOfRange {
129                what: "coding_group.symbol_size".to_owned(),
130                value: "0".to_owned(),
131            });
132        }
133
134        debug!(
135            bead_id = INTER_OBJECT_BEAD_ID,
136            logging_standard = INTER_OBJECT_LOGGING_STANDARD,
137            member_count = objects.len(),
138            symbol_size = symbol_size,
139            "encoding inter-object coding group"
140        );
141
142        let member_ids: Vec<ObjectId> = objects.iter().map(|object| object.object_id).collect();
143        let member_lens: Vec<u64> = objects
144            .iter()
145            .map(|object| u64::try_from(object.canonical_bytes.len()).unwrap_or(u64::MAX))
146            .collect();
147
148        let mut concatenated = Vec::new();
149        for object in objects {
150            concatenated.extend_from_slice(&object.canonical_bytes);
151        }
152        let total_len =
153            u64::try_from(concatenated.len()).map_err(|_| FrankenError::OutOfRange {
154                what: "coding_group.total_len".to_owned(),
155                value: concatenated.len().to_string(),
156            })?;
157
158        let symbol_size_usize =
159            usize::try_from(symbol_size).map_err(|_| FrankenError::OutOfRange {
160                what: "coding_group.symbol_size".to_owned(),
161                value: symbol_size.to_string(),
162            })?;
163        let k_source_usize = ceil_div_usize(concatenated.len(), symbol_size_usize);
164        let k_source = u32::try_from(k_source_usize).map_err(|_| FrankenError::OutOfRange {
165            what: "coding_group.k_source".to_owned(),
166            value: k_source_usize.to_string(),
167        })?;
168
169        if k_source == 0 {
170            return Err(FrankenError::OutOfRange {
171                what: "coding_group.k_source".to_owned(),
172                value: "0".to_owned(),
173            });
174        }
175
176        let group_id = derive_group_id(&member_ids, &member_lens, total_len, k_source, symbol_size);
177        let default_repairs = default_repair_count(k_source);
178        let repair_count = repair_symbol_count.unwrap_or(default_repairs);
179
180        let padded_len = k_source_usize
181            .checked_mul(symbol_size_usize)
182            .ok_or_else(|| FrankenError::OutOfRange {
183                what: "coding_group.padded_len".to_owned(),
184                value: format!("{k_source_usize}*{symbol_size_usize}"),
185            })?;
186        concatenated.resize(padded_len, 0);
187
188        let mut source_payloads = Vec::with_capacity(k_source_usize);
189        let mut symbols = Vec::with_capacity(
190            usize::try_from(k_source.saturating_add(repair_count)).unwrap_or(k_source_usize),
191        );
192
193        for source_idx in 0..k_source_usize {
194            let start = source_idx * symbol_size_usize;
195            let end = start + symbol_size_usize;
196            let data = concatenated[start..end].to_vec();
197            source_payloads.push(data.clone());
198            symbols.push(GroupSymbol {
199                group_id,
200                esi: u32::try_from(source_idx).unwrap_or(u32::MAX),
201                data,
202                coefficients: unit_vector(k_source_usize, source_idx),
203            });
204        }
205
206        for repair_idx in 0..repair_count {
207            let esi = k_source.saturating_add(repair_idx);
208            let coefficients = deterministic_coefficients(group_id, esi, k_source_usize);
209            let data = linear_combine(&source_payloads, &coefficients, symbol_size_usize);
210            symbols.push(GroupSymbol {
211                group_id,
212                esi,
213                data,
214                coefficients,
215            });
216        }
217
218        info!(
219            bead_id = INTER_OBJECT_BEAD_ID,
220            logging_standard = INTER_OBJECT_LOGGING_STANDARD,
221            group_id = %group_id,
222            member_count = member_ids.len(),
223            total_len = total_len,
224            k_source = k_source,
225            repair_symbol_count = repair_count,
226            symbol_count = symbols.len(),
227            "inter-object coding group encoded"
228        );
229
230        Ok(CodedCatchupBatch {
231            group: CodingGroup {
232                group_id,
233                member_ids,
234                total_len,
235                member_lens,
236                k_source,
237                symbol_size,
238                repair_symbol_count: repair_count,
239            },
240            symbols,
241        })
242    }
243    inner(objects, symbol_size, repair_symbol_count)
244}
245
246/// Decode a coding group from received symbols.
247///
248/// # Errors
249///
250/// Returns an error when symbols are insufficient or inconsistent.
251#[allow(clippy::too_many_lines)]
252pub fn decode_coding_group(group: &CodingGroup, symbols: &[GroupSymbol]) -> Result<Vec<EcsObject>> {
253    #[allow(clippy::too_many_lines)]
254    fn inner(group: &CodingGroup, symbols: &[GroupSymbol]) -> Result<Vec<EcsObject>> {
255        validate_group(group)?;
256        let symbol_size_usize =
257            usize::try_from(group.symbol_size).map_err(|_| FrankenError::DatabaseCorrupt {
258                detail: format!("invalid group symbol_size {}", group.symbol_size),
259            })?;
260        let k_source_usize =
261            usize::try_from(group.k_source).map_err(|_| FrankenError::DatabaseCorrupt {
262                detail: format!("invalid group k_source {}", group.k_source),
263            })?;
264
265        let mut relevant: Vec<&GroupSymbol> = symbols
266            .iter()
267            .filter(|symbol| symbol.group_id == group.group_id)
268            .collect();
269        relevant.sort_by_key(|symbol| symbol.esi);
270
271        if relevant.len() < k_source_usize {
272            warn!(
273                bead_id = INTER_OBJECT_BEAD_ID,
274                logging_standard = INTER_OBJECT_LOGGING_STANDARD,
275                group_id = %group.group_id,
276                required_symbols = k_source_usize,
277                received_symbols = relevant.len(),
278                "insufficient symbols for coding-group decode"
279            );
280            return Err(FrankenError::DatabaseCorrupt {
281                detail: format!(
282                    "reason_code=inter_object_decode_insufficient_symbols group_id={} required={} received={}",
283                    group.group_id,
284                    k_source_usize,
285                    relevant.len()
286                ),
287            });
288        }
289
290        debug!(
291            bead_id = INTER_OBJECT_BEAD_ID,
292            logging_standard = INTER_OBJECT_LOGGING_STANDARD,
293            group_id = %group.group_id,
294            required_symbols = k_source_usize,
295            candidate_symbols = relevant.len(),
296            "decoding inter-object coding group"
297        );
298
299        let mut rows = Vec::with_capacity(relevant.len());
300        for symbol in relevant {
301            if symbol.data.len() != symbol_size_usize {
302                return Err(FrankenError::DatabaseCorrupt {
303                    detail: format!(
304                        "symbol size mismatch for group {}: expected {}, got {}",
305                        group.group_id,
306                        symbol_size_usize,
307                        symbol.data.len()
308                    ),
309                });
310            }
311            let coefficients = if symbol.coefficients.len() == k_source_usize {
312                symbol.coefficients.clone()
313            } else if symbol.esi < group.k_source {
314                unit_vector(
315                    k_source_usize,
316                    usize::try_from(symbol.esi).unwrap_or(usize::MAX),
317                )
318            } else {
319                return Err(FrankenError::DatabaseCorrupt {
320                    detail: format!(
321                        "missing coefficient row for repair symbol esi={} group_id={}",
322                        symbol.esi, group.group_id
323                    ),
324                });
325            };
326            rows.push(LinearRow {
327                coefficients,
328                payload: symbol.data.clone(),
329            });
330        }
331
332        let source_payloads = solve_source_symbols(rows, k_source_usize, symbol_size_usize)
333            .map_err(|decode_error| {
334                error!(
335                    bead_id = INTER_OBJECT_BEAD_ID,
336                    logging_standard = INTER_OBJECT_LOGGING_STANDARD,
337                    group_id = %group.group_id,
338                    error = %decode_error,
339                    "coding-group decode failed"
340                );
341                decode_error
342            })?;
343
344        let mut concatenated =
345            Vec::with_capacity(k_source_usize.checked_mul(symbol_size_usize).unwrap_or(0));
346        for payload in &source_payloads {
347            concatenated.extend_from_slice(payload);
348        }
349        let total_len_usize =
350            usize::try_from(group.total_len).map_err(|_| FrankenError::DatabaseCorrupt {
351                detail: format!("invalid group total_len {}", group.total_len),
352            })?;
353        if total_len_usize > concatenated.len() {
354            return Err(FrankenError::DatabaseCorrupt {
355                detail: format!(
356                    "decoded payload shorter than total_len: decoded={} total_len={}",
357                    concatenated.len(),
358                    total_len_usize
359                ),
360            });
361        }
362        concatenated.truncate(total_len_usize);
363
364        let mut offset = 0_usize;
365        let mut recovered = Vec::with_capacity(group.member_lens.len());
366        for (member_idx, member_len) in group.member_lens.iter().enumerate() {
367            let member_len =
368                usize::try_from(*member_len).map_err(|_| FrankenError::DatabaseCorrupt {
369                    detail: format!("invalid member length {}", member_len),
370                })?;
371            let end = offset.saturating_add(member_len);
372            if end > concatenated.len() {
373                return Err(FrankenError::DatabaseCorrupt {
374                    detail: format!(
375                        "demultiplex overflow at member {}: offset={} len={} decoded_len={}",
376                        member_idx,
377                        offset,
378                        member_len,
379                        concatenated.len()
380                    ),
381                });
382            }
383            let object = EcsObject::from_canonical(concatenated[offset..end].to_vec());
384            let expected = group.member_ids[member_idx];
385            if object.object_id != expected {
386                return Err(FrankenError::DatabaseCorrupt {
387                    detail: format!(
388                        "object id mismatch at member {}: expected {}, got {}",
389                        member_idx, expected, object.object_id
390                    ),
391                });
392            }
393            recovered.push(object);
394            offset = end;
395        }
396
397        info!(
398            bead_id = INTER_OBJECT_BEAD_ID,
399            logging_standard = INTER_OBJECT_LOGGING_STANDARD,
400            group_id = %group.group_id,
401            recovered_objects = recovered.len(),
402            "inter-object coding group decoded"
403        );
404
405        Ok(recovered)
406    }
407    inner(group, symbols)
408}
409
410/// Build a catch-up batch for replication anti-entropy transfer.
411///
412/// # Errors
413///
414/// Returns an error when group encoding fails.
415pub fn build_replication_catchup_batch(
416    missing_objects: &[EcsObject],
417    symbol_size: u32,
418) -> Result<CodedCatchupBatch> {
419    encode_coding_group(missing_objects, symbol_size)
420}
421
422fn validate_group(group: &CodingGroup) -> Result<()> {
423    if group.member_ids.is_empty() {
424        return Err(FrankenError::DatabaseCorrupt {
425            detail: "coding group has no members".to_owned(),
426        });
427    }
428    if group.member_ids.len() != group.member_lens.len() {
429        return Err(FrankenError::DatabaseCorrupt {
430            detail: format!(
431                "member id/length mismatch: ids={} lens={}",
432                group.member_ids.len(),
433                group.member_lens.len()
434            ),
435        });
436    }
437    let total: u64 = group.member_lens.iter().copied().sum();
438    if total != group.total_len {
439        return Err(FrankenError::DatabaseCorrupt {
440            detail: format!(
441                "total_len mismatch: declared={} computed={}",
442                group.total_len, total
443            ),
444        });
445    }
446    if group.k_source == 0 || group.symbol_size == 0 {
447        return Err(FrankenError::DatabaseCorrupt {
448            detail: format!(
449                "invalid group dimensions: k_source={} symbol_size={}",
450                group.k_source, group.symbol_size
451            ),
452        });
453    }
454    Ok(())
455}
456
457fn solve_source_symbols(
458    mut rows: Vec<LinearRow>,
459    k_source: usize,
460    symbol_size: usize,
461) -> Result<Vec<Vec<u8>>> {
462    let mut pivot_row = 0_usize;
463    for col in 0..k_source {
464        let Some(found) = rows
465            .iter()
466            .enumerate()
467            .skip(pivot_row)
468            .find(|(_, row)| row.coefficients[col] != 0)
469            .map(|(idx, _)| idx)
470        else {
471            return Err(FrankenError::DatabaseCorrupt {
472                detail: format!(
473                    "reason_code=inter_object_decode_rank_deficient missing_pivot_col={col}"
474                ),
475            });
476        };
477
478        if found != pivot_row {
479            rows.swap(found, pivot_row);
480        }
481
482        let pivot = rows[pivot_row].coefficients[col];
483        let Some(inv_pivot) = gf256_inverse_byte(pivot) else {
484            return Err(FrankenError::DatabaseCorrupt {
485                detail: format!("non-invertible pivot in column {col}: value={pivot}"),
486            });
487        };
488
489        scale_row(&mut rows[pivot_row], inv_pivot);
490        let pivot_snapshot = rows[pivot_row].clone();
491
492        let mut row_idx = 0_usize;
493        while row_idx < rows.len() {
494            if row_idx == pivot_row {
495                row_idx += 1;
496                continue;
497            }
498            let factor = rows[row_idx].coefficients[col];
499            if factor == 0 {
500                row_idx += 1;
501                continue;
502            }
503            eliminate_row(
504                &mut rows[row_idx],
505                &pivot_snapshot,
506                factor,
507                col,
508                symbol_size,
509            );
510            row_idx += 1;
511        }
512
513        pivot_row += 1;
514        if pivot_row == k_source {
515            break;
516        }
517    }
518
519    if pivot_row < k_source {
520        return Err(FrankenError::DatabaseCorrupt {
521            detail: format!(
522                "reason_code=inter_object_decode_not_enough_independent_symbols rank={pivot_row} required={k_source}"
523            ),
524        });
525    }
526
527    Ok(rows
528        .into_iter()
529        .take(k_source)
530        .map(|row| row.payload)
531        .collect())
532}
533
534fn scale_row(row: &mut LinearRow, scalar: u8) {
535    for coeff in &mut row.coefficients {
536        *coeff = gf256_mul_byte(*coeff, scalar);
537    }
538    for byte in &mut row.payload {
539        *byte = gf256_mul_byte(*byte, scalar);
540    }
541}
542
543fn eliminate_row(
544    target: &mut LinearRow,
545    pivot: &LinearRow,
546    factor: u8,
547    col_start: usize,
548    symbol_size: usize,
549) {
550    for idx in col_start..target.coefficients.len() {
551        let scaled = gf256_mul_byte(pivot.coefficients[idx], factor);
552        target.coefficients[idx] ^= scaled;
553    }
554    for idx in 0..symbol_size {
555        let scaled = gf256_mul_byte(pivot.payload[idx], factor);
556        target.payload[idx] ^= scaled;
557    }
558}
559
560fn linear_combine(source_payloads: &[Vec<u8>], coefficients: &[u8], symbol_size: usize) -> Vec<u8> {
561    let mut out = vec![0_u8; symbol_size];
562    for (source, coefficient) in source_payloads.iter().zip(coefficients.iter()) {
563        if *coefficient == 0 {
564            continue;
565        }
566        for (dst, src) in out.iter_mut().zip(source.iter()) {
567            *dst ^= gf256_mul_byte(*coefficient, *src);
568        }
569    }
570    out
571}
572
/// Indicator vector of `len` bytes: 1 at `hot_index` (when in range), else 0.
fn unit_vector(len: usize, hot_index: usize) -> Vec<u8> {
    (0..len).map(|idx| u8::from(idx == hot_index)).collect()
}
580
/// Ceiling division for `usize` operands.
///
/// Uses the standard library's `div_ceil` (already used elsewhere in this
/// module) instead of the manual quotient/remainder formulation.
///
/// # Panics
///
/// Panics when `denominator` is zero, exactly like the manual version.
fn ceil_div_usize(numerator: usize, denominator: usize) -> usize {
    numerator.div_ceil(denominator)
}
586
587fn default_repair_count(k_source: u32) -> u32 {
588    let k_source_u64 = u64::from(k_source);
589    let numerator = k_source_u64.saturating_mul(u64::from(DEFAULT_REPAIR_OVERHEAD_BPS));
590    let repair = numerator.div_ceil(10_000);
591    u32::try_from(repair.max(1)).unwrap_or(u32::MAX)
592}
593
594fn derive_ecs_object_id(canonical_bytes: &[u8]) -> ObjectId {
595    let mut hasher = blake3::Hasher::new();
596    hasher.update(ECS_OBJECT_ID_DOMAIN);
597    hasher.update(canonical_bytes);
598    let digest = hasher.finalize();
599    let mut bytes = [0_u8; 16];
600    bytes.copy_from_slice(&digest.as_bytes()[..16]);
601    ObjectId::from_bytes(bytes)
602}
603
604fn derive_group_id(
605    member_ids: &[ObjectId],
606    member_lens: &[u64],
607    total_len: u64,
608    k_source: u32,
609    symbol_size: u32,
610) -> ObjectId {
611    let mut hasher = blake3::Hasher::new();
612    hasher.update(CODING_GROUP_ID_DOMAIN);
613    hasher.update(&total_len.to_le_bytes());
614    hasher.update(&k_source.to_le_bytes());
615    hasher.update(&symbol_size.to_le_bytes());
616    for (member_id, member_len) in member_ids.iter().zip(member_lens.iter()) {
617        hasher.update(member_id.as_bytes());
618        hasher.update(&member_len.to_le_bytes());
619    }
620    let digest = hasher.finalize();
621    let mut bytes = [0_u8; 16];
622    bytes.copy_from_slice(&digest.as_bytes()[..16]);
623    ObjectId::from_bytes(bytes)
624}
625
626fn deterministic_coefficients(group_id: ObjectId, esi: u32, k_source: usize) -> Vec<u8> {
627    let mut seed_buf = [0_u8; 20];
628    seed_buf[..16].copy_from_slice(group_id.as_bytes());
629    seed_buf[16..].copy_from_slice(&esi.to_le_bytes());
630    let mut seed = xxhash_rust::xxh3::xxh3_64(&seed_buf);
631
632    let mut coefficients = Vec::with_capacity(k_source);
633    for _ in 0..k_source {
634        seed = xorshift64(seed);
635        let mut coefficient = seed.to_le_bytes()[0];
636        if coefficient == 0 {
637            coefficient = 1;
638        }
639        coefficients.push(coefficient);
640    }
641    coefficients
642}
643
/// One step of a 13/7/17 xorshift64 generator. Note zero is a fixed point;
/// the seed here comes from xxh3 so it is effectively never zero.
fn xorshift64(state: u64) -> u64 {
    let a = state ^ (state << 13);
    let b = a ^ (a >> 7);
    b ^ (b << 17)
}
649
#[cfg(test)]
#[allow(clippy::too_many_lines)]
mod tests {
    use super::*;

    /// Build a deterministic pseudo-random object of `len` bytes from `seed`,
    /// so every test run sees identical fixtures.
    fn make_object(seed: u8, len: usize) -> EcsObject {
        let mut bytes = Vec::with_capacity(len);
        for idx in 0..len {
            let idx_u8 = u8::try_from(idx % 251).unwrap_or(0);
            bytes.push(seed.wrapping_mul(31).wrapping_add(idx_u8));
        }
        EcsObject::from_canonical(bytes)
    }

    /// Return a copy of `symbols` with the positions in `drop` removed,
    /// simulating symbol loss in transit.
    fn drop_indices(symbols: &[GroupSymbol], drop: &[usize]) -> Vec<GroupSymbol> {
        symbols
            .iter()
            .enumerate()
            .filter(|(idx, _)| !drop.contains(idx))
            .map(|(_, symbol)| symbol.clone())
            .collect()
    }

    // Lossless round-trip across differently sized members.
    #[test]
    fn test_coding_group_encode_decode() {
        let objects = vec![
            make_object(1, 96),
            make_object(2, 64),
            make_object(3, 48),
            make_object(4, 80),
            make_object(5, 120),
        ];
        let encoded =
            encode_coding_group_with_repair(&objects, 64, Some(6)).expect("encode coding group");
        let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
        assert_eq!(decoded, objects);
    }

    // Round-trip survives ~20% symbol loss thanks to the repair symbols.
    #[test]
    fn test_coding_group_with_loss() {
        let objects = vec![
            make_object(11, 72),
            make_object(12, 53),
            make_object(13, 88),
            make_object(14, 41),
            make_object(15, 97),
        ];
        let encoded =
            encode_coding_group_with_repair(&objects, 48, Some(8)).expect("encode coding group");
        let total = encoded.symbols.len();
        let drop_count = total / 5; // 20%
        let mut drop = Vec::new();
        for idx in 0..drop_count {
            drop.push((idx * 3) % total);
        }
        let received = drop_indices(&encoded.symbols, &drop);
        let decoded = decode_coding_group(&encoded.group, &received).expect("decode with loss");
        assert_eq!(decoded, objects);
    }

    // Tampering with a member id must be caught by decode's content-address
    // verification.
    #[test]
    fn test_coding_group_member_verification() {
        let objects = vec![
            make_object(21, 50),
            make_object(22, 70),
            make_object(23, 90),
        ];
        let encoded = encode_coding_group_with_repair(&objects, 32, Some(4)).expect("encode");
        let mut tampered_group = encoded.group.clone();
        tampered_group.member_ids[1] = ObjectId::from_bytes([0xFF; 16]);
        let err = decode_coding_group(&tampered_group, &encoded.symbols).expect_err("must fail");
        assert!(
            err.to_string().contains("object id mismatch"),
            "unexpected error: {err}"
        );
    }

    // Members much smaller / larger than the symbol size must demultiplex
    // back to their exact original lengths (padding is trimmed).
    #[test]
    fn test_coding_group_demultiplexing() {
        let objects = vec![
            make_object(31, 3),
            make_object(32, 129),
            make_object(33, 7),
            make_object(34, 48),
        ];
        let encoded = encode_coding_group_with_repair(&objects, 64, Some(6)).expect("encode");
        let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
        let decoded_lens: Vec<usize> = decoded
            .iter()
            .map(|object| object.canonical_bytes.len())
            .collect();
        assert_eq!(decoded_lens, vec![3, 129, 7, 48]);
        assert_eq!(decoded, objects);
    }

    // Encoding the same inputs twice must produce identical groups and
    // symbols (no hidden randomness).
    #[test]
    fn test_coding_group_deterministic() {
        let objects = vec![
            make_object(41, 77),
            make_object(42, 88),
            make_object(43, 99),
        ];
        let encoded_a = encode_coding_group_with_repair(&objects, 32, Some(5)).expect("encode a");
        let encoded_b = encode_coding_group_with_repair(&objects, 32, Some(5)).expect("encode b");
        assert_eq!(encoded_a.group, encoded_b.group);
        assert_eq!(encoded_a.symbols, encoded_b.symbols);
    }

    // Degenerate group with a single member still round-trips.
    #[test]
    fn test_coding_group_single_object() {
        let objects = vec![make_object(51, 200)];
        let encoded = encode_coding_group_with_repair(&objects, 64, Some(4)).expect("encode");
        let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
        assert_eq!(decoded, objects);
        assert_eq!(encoded.group.member_ids.len(), 1);
    }

    // Property-style sweep over member counts 1..=8 with varying lengths.
    #[test]
    fn prop_coding_group_roundtrip() {
        for object_count in 1_u8..=8 {
            let mut objects = Vec::new();
            for idx in 0..object_count {
                let len = usize::from(idx).saturating_mul(37).saturating_add(11);
                objects.push(make_object(idx.wrapping_add(70), len));
            }
            let encoded =
                encode_coding_group_with_repair(&objects, 48, Some(8)).expect("encode property");
            let decoded = decode_coding_group(&encoded.group, &encoded.symbols).expect("decode");
            assert_eq!(decoded, objects);
        }
    }

    // Shared scenario body (not itself a #[test]) reused by the e2e and
    // compliance tests below.
    fn run_e2e_replication_catchup() {
        let missing_objects = vec![
            make_object(81, 64),
            make_object(82, 32),
            make_object(83, 96),
            make_object(84, 55),
            make_object(85, 78),
        ];
        let batch = build_replication_catchup_batch(&missing_objects, 40).expect("encode catchup");

        // Simulate lossy replication transport for lagging replica.
        let received = drop_indices(&batch.symbols, &[1, 7]);
        let recovered = decode_coding_group(&batch.group, &received).expect("decode catchup");
        assert_eq!(recovered, missing_objects);
    }

    #[test]
    fn test_e2e_replication_catchup_with_coding_group() {
        run_e2e_replication_catchup();
    }

    // One encoded batch serves multiple replicas, each with a different loss
    // pattern — the multicast use case.
    #[test]
    fn test_e2e_multicast_coding_group() {
        let objects = vec![
            make_object(91, 64),
            make_object(92, 128),
            make_object(93, 36),
            make_object(94, 72),
            make_object(95, 28),
            make_object(96, 40),
        ];
        let batch = encode_coding_group_with_repair(&objects, 48, Some(9)).expect("encode");

        let replica_a = drop_indices(&batch.symbols, &[1, 5, 9]);
        let replica_b = drop_indices(&batch.symbols, &[0, 3, 8, 11]);
        let replica_c = drop_indices(&batch.symbols, &[2, 4, 6, 10]);

        let decoded_a = decode_coding_group(&batch.group, &replica_a).expect("decode a");
        let decoded_b = decode_coding_group(&batch.group, &replica_b).expect("decode b");
        let decoded_c = decode_coding_group(&batch.group, &replica_c).expect("decode c");

        assert_eq!(decoded_a, objects);
        assert_eq!(decoded_b, objects);
        assert_eq!(decoded_c, objects);
    }

    // Compliance gate: pins the bead/logging identifiers and checks a minimal
    // encode produces a well-formed batch.
    #[test]
    fn test_bd_1hi_26_unit_compliance_gate() {
        assert_eq!(INTER_OBJECT_BEAD_ID, "bd-1hi.26");
        assert_eq!(INTER_OBJECT_LOGGING_STANDARD, "bd-1fpm");
        let objects = vec![make_object(101, 48), make_object(102, 72)];
        let batch = encode_coding_group(&objects, 32).expect("encode");
        assert!(batch.group.k_source >= 1);
        assert!(!batch.symbols.is_empty());
    }

    // Structural invariants hold across a range of symbol sizes.
    #[test]
    fn prop_bd_1hi_26_structure_compliance() {
        for symbol_size in [16_u32, 32, 48, 64, 96, 128] {
            let objects = vec![
                make_object(111, 25),
                make_object(112, 63),
                make_object(113, 91),
            ];
            let batch =
                encode_coding_group_with_repair(&objects, symbol_size, Some(6)).expect("encode");
            assert_eq!(batch.group.member_ids.len(), batch.group.member_lens.len());
            assert_eq!(batch.group.group_id, batch.symbols[0].group_id);
            let recovered = decode_coding_group(&batch.group, &batch.symbols).expect("decode");
            assert_eq!(recovered, objects);
        }
    }

    #[test]
    fn test_e2e_bd_1hi_26_compliance() {
        run_e2e_replication_catchup();
        test_e2e_multicast_coding_group();
    }
}