Skip to main content

asupersync/
decoding.rs

1//! RaptorQ decoding pipeline (Phase 0).
2//!
3//! This module provides a deterministic, block-oriented decoding pipeline that
4//! reconstructs original data from a set of received symbols. The current
5//! implementation mirrors the systematic RaptorQ encoder: it solves for
6//! intermediate symbols using the precode constraints and LT repair rows, then
7//! reconstitutes source symbols deterministically for testing.
8
9use crate::error::{Error, ErrorKind};
10use crate::raptorq::decoder::{
11    DecodeError as RaptorDecodeError, InactivationDecoder, ReceivedSymbol,
12};
13use crate::raptorq::gf256::{gf256_addmul_slice, Gf256};
14use crate::raptorq::systematic::{ConstraintMatrix, SystematicParams};
15use crate::security::{AuthenticatedSymbol, SecurityContext};
16use crate::types::symbol_set::{InsertResult, SymbolSet, ThresholdConfig};
17use crate::types::{ObjectId, ObjectParams, Symbol, SymbolId, SymbolKind};
18use std::collections::{BTreeSet, HashMap};
19use std::time::Duration;
20
/// Errors produced by the decoding pipeline.
///
/// Every variant can be converted into the crate-level [`Error`] through the
/// `From<DecodingError>` implementation in this module; the `Display` text
/// generated by `thiserror` becomes the error message.
#[derive(Debug, thiserror::Error)]
pub enum DecodingError {
    /// Authentication failed for a symbol.
    ///
    /// `DecodingPipeline::feed` returns this when `verify_auth` is enabled
    /// but the pipeline has no security context to verify against.
    #[error("authentication failed for symbol {symbol_id}")]
    AuthenticationFailed {
        /// The symbol that failed authentication.
        symbol_id: SymbolId,
    },
    /// Not enough symbols to decode.
    #[error("insufficient symbols: have {received}, need {needed}")]
    InsufficientSymbols {
        /// Received symbol count.
        received: usize,
        /// Needed symbol count.
        needed: usize,
    },
    /// Matrix inversion failed during decoding.
    ///
    /// `decode_block` produces this when the underlying RaptorQ decoder
    /// reports a singular matrix.
    #[error("matrix inversion failed: {reason}")]
    MatrixInversionFailed {
        /// Reason for failure.
        reason: String,
    },
    /// Block timed out before decoding completed.
    ///
    /// Note that the `Display` text only includes `elapsed`, not `sbn`.
    #[error("block timeout after {elapsed:?}")]
    BlockTimeout {
        /// Block number.
        sbn: u8,
        /// Elapsed time.
        elapsed: Duration,
    },
    /// Inconsistent metadata for a block or object.
    #[error("inconsistent block metadata: {sbn} {details}")]
    InconsistentMetadata {
        /// Block number. The code in this module passes `0` when the problem
        /// concerns the object as a whole rather than one specific block.
        sbn: u8,
        /// Details of the inconsistency.
        details: String,
    },
    /// Symbol size mismatch.
    #[error("symbol size mismatch: expected {expected}, got {actual}")]
    SymbolSizeMismatch {
        /// Expected size in bytes.
        expected: u16,
        /// Actual size in bytes.
        actual: usize,
    },
}
69
70impl From<DecodingError> for Error {
71    fn from(err: DecodingError) -> Self {
72        match &err {
73            DecodingError::AuthenticationFailed { .. } => Self::new(ErrorKind::CorruptedSymbol),
74            DecodingError::InsufficientSymbols { .. } => Self::new(ErrorKind::InsufficientSymbols),
75            DecodingError::MatrixInversionFailed { .. }
76            | DecodingError::InconsistentMetadata { .. }
77            | DecodingError::SymbolSizeMismatch { .. } => Self::new(ErrorKind::DecodingFailed),
78            DecodingError::BlockTimeout { .. } => Self::new(ErrorKind::ThresholdTimeout),
79        }
80        .with_message(err.to_string())
81    }
82}
83
/// Reasons a symbol may be rejected by the decoder.
///
/// Carried inside [`SymbolAcceptResult::Rejected`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RejectReason {
    /// Symbol belongs to a different object.
    WrongObjectId,
    /// Authentication failed.
    AuthenticationFailed,
    /// Symbol size mismatch.
    SymbolSizeMismatch,
    /// Block already decoded.
    BlockAlreadyDecoded,
    /// Decode failed due to insufficient rank.
    ///
    /// Reported when a decode attempt fails with
    /// [`DecodingError::InsufficientSymbols`].
    InsufficientRank,
    /// Decode failed due to inconsistent equations.
    ///
    /// Reported when a decode attempt fails with
    /// [`DecodingError::MatrixInversionFailed`].
    InconsistentEquations,
    /// Invalid or inconsistent metadata.
    InvalidMetadata,
    /// Memory or buffer limit reached.
    ///
    /// Also used by `try_decode_block` as the fallback reason for decode
    /// errors that have no dedicated mapping.
    MemoryLimitReached,
}
104
/// Result of feeding a symbol into the decoder.
///
/// Returned (wrapped in `Ok`) by `DecodingPipeline::feed`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SymbolAcceptResult {
    /// Symbol accepted and stored.
    Accepted {
        /// Symbols received for the block.
        received: usize,
        /// Estimated symbols needed for decode. `feed` reports `0` when the
        /// block's `k` was not known at insertion time.
        needed: usize,
    },
    /// Decoding started for the block.
    // NOTE(review): no code in the visible part of this module constructs
    // this variant — confirm whether it is still used elsewhere.
    DecodingStarted {
        /// Block number being decoded.
        block_sbn: u8,
    },
    /// Block fully decoded.
    BlockComplete {
        /// Block number.
        block_sbn: u8,
        /// Decoded block data, truncated to the planned block length.
        data: Vec<u8>,
    },
    /// Duplicate symbol ignored.
    Duplicate,
    /// Symbol rejected.
    Rejected(RejectReason),
}
132
/// Tunable parameters for a [`DecodingPipeline`].
#[derive(Debug, Clone)]
pub struct DecodingConfig {
    /// Size of every symbol in bytes; has to match what the encoder used.
    pub symbol_size: u16,
    /// Upper bound, in bytes, on the size of one source block.
    pub max_block_size: usize,
    /// Multiplicative repair overhead (e.g., 1.05 = 5% extra symbols).
    pub repair_overhead: f64,
    /// Additional symbols required on top of the scaled `K`.
    pub min_overhead: usize,
    /// Per-block cap on buffered symbols (0 = unlimited).
    pub max_buffered_symbols: usize,
    /// Per-block timeout (not enforced in Phase 0).
    pub block_timeout: Duration,
    /// When `true`, symbols are authenticated before being accepted.
    pub verify_auth: bool,
}

impl Default for DecodingConfig {
    /// 256-byte symbols, 1 MiB blocks, 5% repair overhead, no extra
    /// overhead, unlimited buffering, a 30 second block timeout and
    /// authentication checks switched off.
    fn default() -> Self {
        Self {
            // Authentication is opt-in.
            verify_auth: false,
            block_timeout: Duration::from_secs(30),
            // Zero means "no limit" / "no extra symbols".
            max_buffered_symbols: 0,
            min_overhead: 0,
            repair_overhead: 1.05,
            // 1 MiB.
            max_block_size: 1 << 20,
            symbol_size: 256,
        }
    }
}
165
/// Progress summary for decoding.
///
/// Snapshot returned by `DecodingPipeline::progress`.
#[derive(Debug, Clone, Copy)]
pub struct DecodingProgress {
    /// Blocks fully decoded.
    pub blocks_complete: usize,
    /// Total blocks expected (if known). `None` until a block plan exists,
    /// i.e. before object parameters have been set.
    pub blocks_total: Option<usize>,
    /// Total symbols received, as reported by the pipeline's symbol set.
    pub symbols_received: usize,
    /// Estimated symbols needed to complete decode: the sum of the per-block
    /// thresholds over all planned blocks, or `0` when no plan exists yet.
    pub symbols_needed_estimate: usize,
}
178
/// Per-block status.
///
/// Snapshot returned by `DecodingPipeline::block_status`.
#[derive(Debug, Clone, Copy)]
pub struct BlockStatus {
    /// Block number.
    pub sbn: u8,
    /// Symbols received for this block.
    pub symbols_received: usize,
    /// Estimated symbols needed for this block; `0` while the block's `k`
    /// is unknown.
    pub symbols_needed: usize,
    /// Block state.
    pub state: BlockStateKind,
}
191
/// High-level block state.
///
/// Public mirror of the pipeline's internal per-block state, exposed through
/// [`BlockStatus::state`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BlockStateKind {
    /// Collecting symbols. Also reported for blocks the pipeline has no
    /// decoder entry for.
    Collecting,
    /// Decoding in progress.
    Decoding,
    /// Decoded successfully.
    Decoded,
    /// Decoding failed.
    Failed,
}
204
/// Bookkeeping the pipeline keeps for one source block.
#[derive(Debug)]
struct BlockDecoder {
    /// Block number this entry belongs to.
    sbn: u8,
    /// Current lifecycle state of the block.
    state: BlockDecodingState,
    /// Decoded block bytes; set once the block has been decoded successfully.
    decoded: Option<Vec<u8>>,
}
211
/// Internal lifecycle state of a block; surfaced to callers as
/// [`BlockStateKind`] with a one-to-one variant mapping.
#[derive(Debug)]
enum BlockDecodingState {
    /// Still gathering symbols.
    Collecting,
    /// The symbol threshold was reached and a decode attempt is under way.
    Decoding,
    /// The block was decoded and its data stored.
    Decoded,
    /// A decode attempt failed in a way the pipeline treats as fatal for
    /// the block (e.g. invalid metadata).
    Failed,
}
219
/// Main decoding pipeline.
///
/// Symbols are pushed in through `feed`; once every planned block has been
/// decoded the reassembled object can be taken out with `into_data`.
#[derive(Debug)]
pub struct DecodingPipeline {
    /// Configuration supplied at construction time.
    config: DecodingConfig,
    /// Received symbols awaiting decoding, grouped per block by the set.
    symbols: SymbolSet,
    /// Per-block bookkeeping, keyed by block number.
    blocks: HashMap<u8, BlockDecoder>,
    /// Block numbers that have been decoded successfully.
    completed_blocks: BTreeSet<u8>,
    /// Object being decoded; taken from the object parameters or, failing
    /// that, from the first symbol fed in.
    object_id: Option<ObjectId>,
    /// Object size in bytes, known once object parameters are set.
    object_size: Option<u64>,
    /// Block layout derived from the object parameters.
    block_plans: Option<Vec<BlockPlan>>,
    /// Security context used to verify symbols when `verify_auth` is set.
    auth_context: Option<SecurityContext>,
}
232
233impl DecodingPipeline {
234    /// Creates a new decoding pipeline.
235    #[must_use]
236    pub fn new(config: DecodingConfig) -> Self {
237        let threshold = ThresholdConfig::new(
238            config.repair_overhead,
239            config.min_overhead,
240            config.max_buffered_symbols,
241        );
242        Self {
243            config,
244            symbols: SymbolSet::with_config(threshold),
245            blocks: HashMap::new(),
246            completed_blocks: BTreeSet::new(),
247            object_id: None,
248            object_size: None,
249            block_plans: None,
250            auth_context: None,
251        }
252    }
253
254    /// Creates a new decoding pipeline with authentication enabled.
255    #[must_use]
256    pub fn with_auth(config: DecodingConfig, ctx: SecurityContext) -> Self {
257        let mut pipeline = Self::new(config);
258        pipeline.auth_context = Some(ctx);
259        pipeline
260    }
261
262    /// Sets object parameters (object size, symbol size, and block layout).
263    pub fn set_object_params(&mut self, params: ObjectParams) -> Result<(), DecodingError> {
264        if params.symbol_size != self.config.symbol_size {
265            return Err(DecodingError::SymbolSizeMismatch {
266                expected: self.config.symbol_size,
267                actual: params.symbol_size as usize,
268            });
269        }
270        if let Some(existing) = self.object_id {
271            if existing != params.object_id {
272                return Err(DecodingError::InconsistentMetadata {
273                    sbn: 0,
274                    details: format!(
275                        "object id mismatch: expected {existing:?}, got {:?}",
276                        params.object_id
277                    ),
278                });
279            }
280        }
281        self.object_id = Some(params.object_id);
282        self.object_size = Some(params.object_size);
283        self.block_plans = Some(plan_blocks(
284            params.object_size as usize,
285            usize::from(params.symbol_size),
286            self.config.max_block_size,
287        )?);
288        self.configure_block_k();
289        Ok(())
290    }
291
292    /// Feeds a received authenticated symbol into the pipeline.
293    pub fn feed(
294        &mut self,
295        mut auth_symbol: AuthenticatedSymbol,
296    ) -> Result<SymbolAcceptResult, DecodingError> {
297        let symbol_id = auth_symbol.symbol().id();
298
299        if self.config.verify_auth {
300            let Some(ctx) = &self.auth_context else {
301                return Err(DecodingError::AuthenticationFailed { symbol_id });
302            };
303            if !auth_symbol.is_verified()
304                && ctx.verify_authenticated_symbol(&mut auth_symbol).is_err()
305            {
306                return Ok(SymbolAcceptResult::Rejected(
307                    RejectReason::AuthenticationFailed,
308                ));
309            }
310        }
311
312        let symbol = auth_symbol.into_symbol();
313
314        if symbol.len() != usize::from(self.config.symbol_size) {
315            return Ok(SymbolAcceptResult::Rejected(
316                RejectReason::SymbolSizeMismatch,
317            ));
318        }
319
320        if let Some(object_id) = self.object_id {
321            if object_id != symbol.object_id() {
322                return Ok(SymbolAcceptResult::Rejected(RejectReason::WrongObjectId));
323            }
324        } else {
325            self.object_id = Some(symbol.object_id());
326        }
327
328        let sbn = symbol.sbn();
329        if self.completed_blocks.contains(&sbn) {
330            return Ok(SymbolAcceptResult::Rejected(
331                RejectReason::BlockAlreadyDecoded,
332            ));
333        }
334
335        // Ensure block entry exists
336        self.blocks.entry(sbn).or_insert_with(|| BlockDecoder {
337            sbn,
338            state: BlockDecodingState::Collecting,
339            decoded: None,
340        });
341
342        let insert_result = self.symbols.insert(symbol);
343        match insert_result {
344            InsertResult::Duplicate => Ok(SymbolAcceptResult::Duplicate),
345            InsertResult::MemoryLimitReached | InsertResult::BlockLimitReached { .. } => Ok(
346                SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached),
347            ),
348            InsertResult::Inserted {
349                block_progress,
350                threshold_reached,
351            } => {
352                if block_progress.k.is_none() {
353                    self.configure_block_k();
354                }
355                let needed = block_progress.k.map_or(0, |k| {
356                    required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
357                });
358                let received = block_progress.total();
359
360                if threshold_reached {
361                    // Update state to Decoding
362                    if let Some(block) = self.blocks.get_mut(&sbn) {
363                        block.state = BlockDecodingState::Decoding;
364                    }
365                    if let Some(result) = self.try_decode_block(sbn) {
366                        return Ok(result);
367                    }
368                }
369
370                // Reset state to Collecting (if not decoded)
371                if let Some(block) = self.blocks.get_mut(&sbn) {
372                    if !matches!(
373                        block.state,
374                        BlockDecodingState::Decoded | BlockDecodingState::Failed
375                    ) {
376                        block.state = BlockDecodingState::Collecting;
377                    }
378                }
379                Ok(SymbolAcceptResult::Accepted { received, needed })
380            }
381        }
382    }
383
384    /// Feeds a batch of symbols.
385    pub fn feed_batch(
386        &mut self,
387        symbols: impl Iterator<Item = AuthenticatedSymbol>,
388    ) -> Vec<Result<SymbolAcceptResult, DecodingError>> {
389        symbols.map(|symbol| self.feed(symbol)).collect()
390    }
391
392    /// Returns true if all expected blocks are decoded.
393    #[must_use]
394    pub fn is_complete(&self) -> bool {
395        let Some(plans) = &self.block_plans else {
396            return false;
397        };
398        self.completed_blocks.len() == plans.len()
399    }
400
401    /// Returns decoding progress.
402    #[must_use]
403    pub fn progress(&self) -> DecodingProgress {
404        let blocks_total = self.block_plans.as_ref().map(Vec::len);
405        let symbols_received = self.symbols.len();
406        let symbols_needed_estimate = self.block_plans.as_ref().map_or(0, |plans| {
407            plans
408                .iter()
409                .map(|plan| {
410                    required_symbols(
411                        plan.k as u16,
412                        self.config.repair_overhead,
413                        self.config.min_overhead,
414                    )
415                })
416                .sum()
417        });
418
419        DecodingProgress {
420            blocks_complete: self.completed_blocks.len(),
421            blocks_total,
422            symbols_received,
423            symbols_needed_estimate,
424        }
425    }
426
427    /// Returns per-block status if known.
428    #[must_use]
429    pub fn block_status(&self, sbn: u8) -> Option<BlockStatus> {
430        let progress = self.symbols.block_progress(sbn)?;
431        let state = self
432            .blocks
433            .get(&sbn)
434            .map_or(BlockStateKind::Collecting, |block| match block.state {
435                BlockDecodingState::Collecting => BlockStateKind::Collecting,
436                BlockDecodingState::Decoding => BlockStateKind::Decoding,
437                BlockDecodingState::Decoded => BlockStateKind::Decoded,
438                BlockDecodingState::Failed => BlockStateKind::Failed,
439            });
440
441        let symbols_needed = progress.k.map_or(0, |k| {
442            required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
443        });
444
445        Some(BlockStatus {
446            sbn,
447            symbols_received: progress.total(),
448            symbols_needed,
449            state,
450        })
451    }
452
453    /// Consumes the pipeline and returns decoded data if complete.
454    pub fn into_data(self) -> Result<Vec<u8>, DecodingError> {
455        let Some(plans) = &self.block_plans else {
456            return Err(DecodingError::InconsistentMetadata {
457                sbn: 0,
458                details: "object parameters not set".to_string(),
459            });
460        };
461        if !self.is_complete() {
462            let received = self.symbols.len();
463            let needed = plans
464                .iter()
465                .map(|plan| {
466                    required_symbols(
467                        plan.k as u16,
468                        self.config.repair_overhead,
469                        self.config.min_overhead,
470                    )
471                })
472                .sum();
473            return Err(DecodingError::InsufficientSymbols { received, needed });
474        }
475
476        let mut output = Vec::with_capacity(self.object_size.unwrap_or(0) as usize);
477        for plan in plans {
478            let block = self
479                .blocks
480                .get(&plan.sbn)
481                .and_then(|b| b.decoded.as_ref())
482                .ok_or_else(|| DecodingError::InconsistentMetadata {
483                    sbn: plan.sbn,
484                    details: "missing decoded block".to_string(),
485                })?;
486            output.extend_from_slice(block);
487        }
488
489        if let Some(size) = self.object_size {
490            output.truncate(size as usize);
491        }
492
493        Ok(output)
494    }
495
496    fn configure_block_k(&mut self) {
497        let Some(plans) = &self.block_plans else {
498            return;
499        };
500        for plan in plans {
501            let _ = self.symbols.set_block_k(plan.sbn, plan.k as u16);
502        }
503    }
504
505    fn try_decode_block(&mut self, sbn: u8) -> Option<SymbolAcceptResult> {
506        let block_plan = self.block_plan(sbn)?;
507        let k = block_plan.k;
508        if k == 0 {
509            return None;
510        }
511
512        let symbols: Vec<Symbol> = self.symbols.symbols_for_block(sbn).cloned().collect();
513        if symbols.len() < k {
514            return None;
515        }
516
517        let decoded_symbols =
518            match decode_block(block_plan, &symbols, usize::from(self.config.symbol_size)) {
519                Ok(symbols) => symbols,
520                Err(DecodingError::InsufficientSymbols { .. }) => {
521                    return Some(SymbolAcceptResult::Rejected(RejectReason::InsufficientRank));
522                }
523                Err(DecodingError::MatrixInversionFailed { .. }) => {
524                    return Some(SymbolAcceptResult::Rejected(
525                        RejectReason::InconsistentEquations,
526                    ));
527                }
528                Err(DecodingError::InconsistentMetadata { .. }) => {
529                    let block = self.blocks.get_mut(&sbn);
530                    if let Some(block) = block {
531                        block.state = BlockDecodingState::Failed;
532                    }
533                    return Some(SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata));
534                }
535                Err(DecodingError::SymbolSizeMismatch { .. }) => {
536                    let block = self.blocks.get_mut(&sbn);
537                    if let Some(block) = block {
538                        block.state = BlockDecodingState::Failed;
539                    }
540                    return Some(SymbolAcceptResult::Rejected(
541                        RejectReason::SymbolSizeMismatch,
542                    ));
543                }
544                Err(_err) => {
545                    let block = self.blocks.get_mut(&sbn);
546                    if let Some(block) = block {
547                        block.state = BlockDecodingState::Failed;
548                    }
549                    return Some(SymbolAcceptResult::Rejected(
550                        RejectReason::MemoryLimitReached,
551                    ));
552                }
553            };
554
555        let mut block_data = Vec::with_capacity(block_plan.len);
556        for symbol in &decoded_symbols {
557            block_data.extend_from_slice(symbol.data());
558        }
559        block_data.truncate(block_plan.len);
560
561        if let Some(block) = self.blocks.get_mut(&sbn) {
562            block.state = BlockDecodingState::Decoded;
563            block.decoded = Some(block_data.clone());
564        }
565
566        self.completed_blocks.insert(sbn);
567        self.symbols.clear_block(sbn);
568
569        Some(SymbolAcceptResult::BlockComplete {
570            block_sbn: sbn,
571            data: block_data,
572        })
573    }
574
575    fn block_plan(&self, sbn: u8) -> Option<&BlockPlan> {
576        self.block_plans
577            .as_ref()
578            .and_then(|plans| plans.iter().find(|plan| plan.sbn == sbn))
579    }
580}
581
/// Layout of one source block within the object.
#[derive(Debug, Clone)]
struct BlockPlan {
    /// Block number.
    sbn: u8,
    /// Byte offset of the block within the object.
    start: usize,
    /// Length of the block in bytes.
    len: usize,
    /// Number of source symbols covering the block.
    k: usize,
}

impl BlockPlan {
    /// Byte offset one past the last byte of the block (`start + len`).
    fn end(&self) -> usize {
        let Self { start, len, .. } = self;
        start + len
    }
}
595
596fn plan_blocks(
597    object_size: usize,
598    symbol_size: usize,
599    max_block_size: usize,
600) -> Result<Vec<BlockPlan>, DecodingError> {
601    if object_size == 0 {
602        return Ok(Vec::new());
603    }
604
605    if symbol_size == 0 {
606        return Err(DecodingError::InconsistentMetadata {
607            sbn: 0,
608            details: "symbol_size must be > 0".to_string(),
609        });
610    }
611
612    let max_blocks = u8::MAX as usize + 1;
613    let max_total = max_block_size.saturating_mul(max_blocks);
614    if object_size > max_total {
615        return Err(DecodingError::InconsistentMetadata {
616            sbn: 0,
617            details: format!("object size {object_size} exceeds limit {max_total}"),
618        });
619    }
620
621    let mut blocks = Vec::new();
622    let mut offset = 0;
623    let mut sbn: u8 = 0;
624
625    while offset < object_size {
626        let len = usize::min(max_block_size, object_size - offset);
627        let k = len.div_ceil(symbol_size);
628        blocks.push(BlockPlan {
629            sbn,
630            start: offset,
631            len,
632            k,
633        });
634        offset += len;
635        sbn = sbn.wrapping_add(1);
636    }
637
638    Ok(blocks)
639}
640
/// Computes how many symbols a block with `k` source symbols needs before a
/// decode is attempted: `ceil(k * overhead) + min_overhead`.
///
/// A negative scaled value (negative `overhead`) yields `0`. The final
/// addition saturates at `usize::MAX` instead of overflowing.
fn required_symbols(k: u16, overhead: f64, min_overhead: usize) -> usize {
    let raw = (f64::from(k) * overhead).ceil();
    if raw.is_sign_negative() {
        return 0;
    }
    // `raw` is non-negative here; float-to-int `as` casts saturate, so
    // oversized or non-finite values cannot wrap around.
    #[allow(clippy::cast_sign_loss, clippy::cast_possible_truncation)]
    let scaled = raw as usize;
    scaled.saturating_add(min_overhead)
}
650
651fn decode_block(
652    plan: &BlockPlan,
653    symbols: &[Symbol],
654    symbol_size: usize,
655) -> Result<Vec<Symbol>, DecodingError> {
656    let k = plan.k;
657    if symbols.len() < k {
658        return Err(DecodingError::InsufficientSymbols {
659            received: symbols.len(),
660            needed: k,
661        });
662    }
663
664    let object_id = symbols.first().map_or(ObjectId::NIL, Symbol::object_id);
665    let params = SystematicParams::for_source_block(k, symbol_size);
666    let block_seed = seed_for_block(object_id, plan.sbn);
667    let constraints = ConstraintMatrix::build(&params, block_seed);
668    let base_rows = params.s + params.h;
669
670    let decoder = InactivationDecoder::new(k, symbol_size, block_seed);
671    let mut received: Vec<ReceivedSymbol> = Vec::with_capacity(base_rows + symbols.len());
672
673    for row in 0..base_rows {
674        let (columns, coefficients) = constraint_row_equation(&constraints, row);
675        received.push(ReceivedSymbol {
676            esi: row as u32,
677            is_source: false,
678            columns,
679            coefficients,
680            data: vec![0u8; symbol_size],
681        });
682    }
683
684    for symbol in symbols {
685        match symbol.kind() {
686            SymbolKind::Source => {
687                let esi = symbol.esi() as usize;
688                if esi >= k {
689                    return Err(DecodingError::InconsistentMetadata {
690                        sbn: plan.sbn,
691                        details: format!("source esi {esi} >= k {k}"),
692                    });
693                }
694                let row = base_rows + esi;
695                let (columns, coefficients) = constraint_row_equation(&constraints, row);
696                received.push(ReceivedSymbol {
697                    esi: symbol.esi(),
698                    is_source: true,
699                    columns,
700                    coefficients,
701                    data: symbol.data().to_vec(),
702                });
703            }
704            SymbolKind::Repair => {
705                let (columns, coefficients) = decoder.repair_equation(symbol.esi());
706                received.push(ReceivedSymbol {
707                    esi: symbol.esi(),
708                    is_source: false,
709                    columns,
710                    coefficients,
711                    data: symbol.data().to_vec(),
712                });
713            }
714        }
715    }
716
717    let intermediate = match decoder.decode(&received) {
718        Ok(result) => result.intermediate,
719        Err(err) => {
720            let mapped = match err {
721                RaptorDecodeError::InsufficientSymbols { received, required } => {
722                    DecodingError::InsufficientSymbols {
723                        received,
724                        needed: required,
725                    }
726                }
727                RaptorDecodeError::SingularMatrix { row } => DecodingError::MatrixInversionFailed {
728                    reason: format!("singular matrix at row {row}"),
729                },
730                RaptorDecodeError::SymbolSizeMismatch { expected, actual } => {
731                    DecodingError::SymbolSizeMismatch {
732                        expected: expected as u16,
733                        actual,
734                    }
735                }
736            };
737            return Err(mapped);
738        }
739    };
740
741    let mut decoded_symbols = Vec::with_capacity(k);
742    for esi in 0..k {
743        let row = base_rows + esi;
744        let mut data = vec![0u8; symbol_size];
745        for (col, symbol) in intermediate.iter().enumerate().take(params.l) {
746            let coeff = constraints.get(row, col);
747            if !coeff.is_zero() {
748                gf256_addmul_slice(&mut data, symbol, coeff);
749            }
750        }
751        decoded_symbols.push(Symbol::new(
752            SymbolId::new(object_id, plan.sbn, esi as u32),
753            data,
754            SymbolKind::Source,
755        ));
756    }
757
758    Ok(decoded_symbols)
759}
760
761fn constraint_row_equation(constraints: &ConstraintMatrix, row: usize) -> (Vec<usize>, Vec<Gf256>) {
762    let mut columns = Vec::new();
763    let mut coefficients = Vec::new();
764    for col in 0..constraints.cols {
765        let coeff = constraints.get(row, col);
766        if !coeff.is_zero() {
767            columns.push(col);
768            coefficients.push(coeff);
769        }
770    }
771    (columns, coefficients)
772}
773
/// Derives the seed for block `sbn` of `object_id`.
///
/// This is [`seed_for`] with the ESI fixed to `0`, so the result depends
/// only on the object id and the block number.
fn seed_for_block(object_id: ObjectId, sbn: u8) -> u64 {
    seed_for(object_id, sbn, 0)
}
777
778fn seed_for(object_id: ObjectId, sbn: u8, esi: u32) -> u64 {
779    let obj = object_id.as_u128();
780    let hi = (obj >> 64) as u64;
781    let lo = obj as u64;
782    let mut seed = hi ^ lo.rotate_left(13);
783    seed ^= u64::from(sbn) << 56;
784    seed ^= u64::from(esi);
785    if seed == 0 {
786        1
787    } else {
788        seed
789    }
790}
791
792#[cfg(test)]
793mod tests {
794    use super::*;
795    use crate::encoding::EncodingPipeline;
796    use crate::types::resource::{PoolConfig, SymbolPool};
797
    /// Common test prologue: initialises test logging, then announces the
    /// test phase `name` via the crate's `test_phase!` macro.
    fn init_test(name: &str) {
        crate::test_utils::init_test_logging();
        crate::test_phase!(name);
    }
802
803    fn pool() -> SymbolPool {
804        SymbolPool::new(PoolConfig {
805            symbol_size: 256,
806            initial_size: 64,
807            max_size: 64,
808            allow_growth: false,
809            growth_increment: 0,
810        })
811    }
812
813    fn encoding_config() -> crate::config::EncodingConfig {
814        crate::config::EncodingConfig {
815            symbol_size: 256,
816            max_block_size: 1024,
817            repair_overhead: 1.05,
818            encoding_parallelism: 1,
819            decoding_parallelism: 1,
820        }
821    }
822
823    fn decoder_with_params(
824        config: &crate::config::EncodingConfig,
825        object_id: ObjectId,
826        data_len: usize,
827        repair_overhead: f64,
828        min_overhead: usize,
829    ) -> DecodingPipeline {
830        let mut decoder = DecodingPipeline::new(DecodingConfig {
831            symbol_size: config.symbol_size,
832            max_block_size: config.max_block_size,
833            repair_overhead,
834            min_overhead,
835            max_buffered_symbols: 0,
836            block_timeout: Duration::from_secs(30),
837            verify_auth: false,
838        });
839        let symbols_per_block = (data_len.div_ceil(usize::from(config.symbol_size))) as u16;
840        decoder
841            .set_object_params(ObjectParams::new(
842                object_id,
843                data_len as u64,
844                config.symbol_size,
845                1,
846                symbols_per_block,
847            ))
848            .expect("params");
849        decoder
850    }
851
852    #[test]
853    fn decode_roundtrip_sources_only() {
854        init_test("decode_roundtrip_sources_only");
855        let config = encoding_config();
856        let mut encoder = EncodingPipeline::new(config.clone(), pool());
857        let object_id = ObjectId::new_for_test(1);
858        let data = vec![42u8; 512];
859        let symbols: Vec<Symbol> = encoder
860            .encode_with_repair(object_id, &data, 0)
861            .map(|res| res.unwrap().into_symbol())
862            .collect();
863
864        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
865
866        for symbol in symbols {
867            let auth = AuthenticatedSymbol::from_parts(
868                symbol,
869                crate::security::tag::AuthenticationTag::zero(),
870            );
871            let _ = decoder.feed(auth).unwrap();
872        }
873
874        let decoded_data = decoder.into_data().expect("decoded");
875        let ok = decoded_data == data;
876        crate::assert_with_log!(ok, "decoded data", data, decoded_data);
877        crate::test_complete!("decode_roundtrip_sources_only");
878    }
879
/// Round-trips a 768-byte payload while delivering the encoder output to the
/// decoder in reverse order; the decoded bytes must still equal the input.
#[test]
fn decode_roundtrip_out_of_order() {
    use crate::security::tag::AuthenticationTag;

    init_test("decode_roundtrip_out_of_order");
    let config = encoding_config();
    let mut encoder = EncodingPipeline::new(config.clone(), pool());
    let object_id = ObjectId::new_for_test(2);
    let data = vec![7u8; 768];

    // Materialize the complete encoder output first; it is replayed
    // back-to-front below.
    let encoded = encoder
        .encode_with_repair(object_id, &data, 2)
        .map(|res| res.expect("encode").into_symbol())
        .collect::<Vec<Symbol>>();

    let mut decoder =
        decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);

    // Last-produced symbol goes in first.
    encoded
        .into_iter()
        .rev()
        .map(|symbol| AuthenticatedSymbol::from_parts(symbol, AuthenticationTag::zero()))
        .for_each(|auth| {
            let _ = decoder.feed(auth).expect("feed");
        });

    let decoded_data = decoder.into_data().expect("decoded");
    let ok = decoded_data == data;
    crate::assert_with_log!(ok, "decoded data", data, decoded_data);
    crate::test_complete!("decode_roundtrip_out_of_order");
}
910
/// A decoder configured for one object must reject a symbol that was encoded
/// for a different object with `RejectReason::WrongObjectId`.
#[test]
fn reject_wrong_object_id() {
    use crate::security::tag::AuthenticationTag;

    init_test("reject_wrong_object_id");
    let config = encoding_config();
    let mut encoder = EncodingPipeline::new(config.clone(), pool());
    let (object_id_a, object_id_b) = (ObjectId::new_for_test(10), ObjectId::new_for_test(11));
    let data = vec![1u8; 128];

    // The decoder is set up for object A ...
    let mut decoder =
        decoder_with_params(&config, object_id_a, data.len(), config.repair_overhead, 0);

    // ... while the only symbol it ever sees is the first one encoded for object B.
    let foreign_symbol = encoder
        .encode_with_repair(object_id_b, &data, 0)
        .next()
        .expect("symbol")
        .expect("encode")
        .into_symbol();

    let result = decoder
        .feed(AuthenticatedSymbol::from_parts(foreign_symbol, AuthenticationTag::zero()))
        .expect("feed");
    let expected = SymbolAcceptResult::Rejected(RejectReason::WrongObjectId);
    let ok = result == expected;
    crate::assert_with_log!(ok, "wrong object id", expected, result);
    crate::test_complete!("reject_wrong_object_id");
}
940
/// Feeding a symbol with an 8-byte payload must be rejected with
/// `RejectReason::SymbolSizeMismatch`.
///
/// NOTE(review): this relies on `encoding_config().symbol_size` not being 8 —
/// confirm against the helper's definition.
#[test]
fn reject_symbol_size_mismatch() {
    use crate::security::tag::AuthenticationTag;

    init_test("reject_symbol_size_mismatch");
    let config = encoding_config();

    // Mirror the encoder's sizing; authentication checking is switched off.
    let decoding_config = DecodingConfig {
        symbol_size: config.symbol_size,
        max_block_size: config.max_block_size,
        repair_overhead: config.repair_overhead,
        min_overhead: 0,
        max_buffered_symbols: 0,
        block_timeout: Duration::from_secs(30),
        verify_auth: false,
    };
    let mut decoder = DecodingPipeline::new(decoding_config);

    let symbol_id = SymbolId::new(ObjectId::new_for_test(20), 0, 0);
    let symbol = Symbol::new(symbol_id, vec![0u8; 8], SymbolKind::Source);

    let result = decoder
        .feed(AuthenticatedSymbol::from_parts(symbol, AuthenticationTag::zero()))
        .expect("feed");
    let expected = SymbolAcceptResult::Rejected(RejectReason::SymbolSizeMismatch);
    let ok = result == expected;
    crate::assert_with_log!(ok, "symbol size mismatch", expected, result);
    crate::test_complete!("reject_symbol_size_mismatch");
}
970
/// A source symbol whose identifier falls outside the registered object
/// parameters must be rejected with `RejectReason::InvalidMetadata`.
#[test]
fn reject_invalid_metadata_esi_out_of_range() {
    use crate::security::tag::AuthenticationTag;

    init_test("reject_invalid_metadata_esi_out_of_range");

    let decoding_config = DecodingConfig {
        symbol_size: 8,
        max_block_size: 8,
        repair_overhead: 1.0,
        min_overhead: 0,
        max_buffered_symbols: 0,
        block_timeout: Duration::from_secs(30),
        verify_auth: false,
    };
    let mut decoder = DecodingPipeline::new(decoding_config);

    let object_id = ObjectId::new_for_test(21);
    // NOTE(review): the trailing `1, 1` presumably describe a single block
    // holding a single source symbol — confirm against `ObjectParams::new`.
    let params = ObjectParams::new(object_id, 8, 8, 1, 1);
    decoder.set_object_params(params).expect("params");

    // Last id component is 1; per the test name this is an ESI beyond the
    // valid range for the parameters registered above (assumed — TODO confirm).
    let symbol_id = SymbolId::new(object_id, 0, 1);
    let symbol = Symbol::new(symbol_id, vec![0u8; 8], SymbolKind::Source);

    let result = decoder
        .feed(AuthenticatedSymbol::from_parts(symbol, AuthenticationTag::zero()))
        .expect("feed");
    let expected = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
    let ok = result == expected;
    crate::assert_with_log!(ok, "invalid metadata", expected, result);
    crate::test_complete!("reject_invalid_metadata_esi_out_of_range");
}
1004
/// Feeding the very same symbol twice before the block is decoded must yield
/// an accepting result the first time and `SymbolAcceptResult::Duplicate` the
/// second time.
#[test]
fn duplicate_symbol_before_decode() {
    use crate::security::tag::AuthenticationTag;

    init_test("duplicate_symbol_before_decode");
    let config = encoding_config();
    let mut encoder = EncodingPipeline::new(config.clone(), pool());
    let object_id = ObjectId::new_for_test(30);
    // Ensure K > 1 so the first symbol cannot complete the block decode.
    let data = vec![9u8; 512];

    let symbol = encoder
        .encode_with_repair(object_id, &data, 0)
        .next()
        .expect("symbol")
        .expect("encode")
        .into_symbol();

    let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);

    // Both deliveries carry the same zero tag; only the symbol payload is reused.
    let tagged =
        |symbol: Symbol| AuthenticatedSymbol::from_parts(symbol, AuthenticationTag::zero());

    let first_outcome = decoder.feed(tagged(symbol.clone())).expect("feed");
    let accepted = matches!(
        first_outcome,
        SymbolAcceptResult::Accepted { .. } | SymbolAcceptResult::DecodingStarted { .. }
    );
    crate::assert_with_log!(accepted, "first accepted", true, accepted);

    let second = decoder.feed(tagged(symbol)).expect("feed");
    let expected = SymbolAcceptResult::Duplicate;
    let ok = second == expected;
    crate::assert_with_log!(ok, "second duplicate", expected, second);
    crate::test_complete!("duplicate_symbol_before_decode");
}
1046
/// After feeding only the first encoded symbol of a 512-byte payload,
/// `into_data` must fail with `DecodingError::InsufficientSymbols`.
#[test]
fn into_data_reports_insufficient_symbols() {
    use crate::security::tag::AuthenticationTag;

    init_test("into_data_reports_insufficient_symbols");
    let config = encoding_config();
    let mut encoder = EncodingPipeline::new(config.clone(), pool());
    let object_id = ObjectId::new_for_test(40);
    let data = vec![5u8; 512];

    let mut decoder =
        decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);

    // Take just the first symbol from the encoder; the rest are never delivered.
    let lone_symbol = encoder
        .encode_with_repair(object_id, &data, 0)
        .next()
        .expect("symbol")
        .expect("encode")
        .into_symbol();
    let _ = decoder
        .feed(AuthenticatedSymbol::from_parts(lone_symbol, AuthenticationTag::zero()))
        .expect("feed");

    let failure = decoder
        .into_data()
        .expect_err("expected insufficient symbols");
    let insufficient = matches!(failure, DecodingError::InsufficientSymbols { .. });
    crate::assert_with_log!(insufficient, "insufficient symbols", true, insufficient);
    crate::test_complete!("into_data_reports_insufficient_symbols");
}
1077}