Skip to main content

asupersync/
decoding.rs

1//! RaptorQ decoding pipeline (Phase 0).
2//!
3//! This module provides a deterministic, block-oriented decoding pipeline that
4//! reconstructs original data from a set of received symbols. The current
5//! implementation mirrors the systematic RaptorQ encoder: it solves for
6//! intermediate symbols using the precode constraints and LT repair rows, then
7//! reconstitutes source symbols deterministically for testing.
8
9use crate::error::{Error, ErrorKind};
10use crate::raptorq::decoder::{
11    DecodeError as RaptorDecodeError, InactivationDecoder, ReceivedSymbol,
12};
13use crate::raptorq::gf256::Gf256;
14use crate::raptorq::systematic::SystematicParams;
15use crate::security::{AuthenticatedSymbol, SecurityContext};
16use crate::types::symbol_set::{InsertResult, SymbolSet, ThresholdConfig};
17use crate::types::{ObjectId, ObjectParams, Symbol, SymbolId, SymbolKind};
18use std::collections::{HashMap, HashSet};
19use std::time::Duration;
20
21/// Errors produced by the decoding pipeline.
22#[derive(Debug, thiserror::Error)]
23pub enum DecodingError {
24    /// Authentication failed for a symbol.
25    #[error("authentication failed for symbol {symbol_id}")]
26    AuthenticationFailed {
27        /// The symbol that failed authentication.
28        symbol_id: SymbolId,
29    },
30    /// Not enough symbols to decode.
31    #[error("insufficient symbols: have {received}, need {needed}")]
32    InsufficientSymbols {
33        /// Received symbol count.
34        received: usize,
35        /// Needed symbol count.
36        needed: usize,
37    },
38    /// Matrix inversion failed during decoding.
39    #[error("matrix inversion failed: {reason}")]
40    MatrixInversionFailed {
41        /// Reason for failure.
42        reason: String,
43    },
44    /// Block timed out before decoding completed.
45    #[error("block timeout after {elapsed:?}")]
46    BlockTimeout {
47        /// Block number.
48        sbn: u8,
49        /// Elapsed time.
50        elapsed: Duration,
51    },
52    /// Inconsistent metadata for a block or object.
53    #[error("inconsistent block metadata: {sbn} {details}")]
54    InconsistentMetadata {
55        /// Block number.
56        sbn: u8,
57        /// Details of the inconsistency.
58        details: String,
59    },
60    /// Symbol size mismatch.
61    #[error("symbol size mismatch: expected {expected}, got {actual}")]
62    SymbolSizeMismatch {
63        /// Expected size in bytes.
64        expected: u16,
65        /// Actual size in bytes.
66        actual: usize,
67    },
68}
69
70impl From<DecodingError> for Error {
71    fn from(err: DecodingError) -> Self {
72        match &err {
73            DecodingError::AuthenticationFailed { .. } => Self::new(ErrorKind::CorruptedSymbol),
74            DecodingError::InsufficientSymbols { .. } => Self::new(ErrorKind::InsufficientSymbols),
75            DecodingError::MatrixInversionFailed { .. }
76            | DecodingError::InconsistentMetadata { .. }
77            | DecodingError::SymbolSizeMismatch { .. } => Self::new(ErrorKind::DecodingFailed),
78            DecodingError::BlockTimeout { .. } => Self::new(ErrorKind::ThresholdTimeout),
79        }
80        .with_message(err.to_string())
81    }
82}
83
84/// Reasons a symbol may be rejected by the decoder.
85#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum RejectReason {
87    /// Symbol belongs to a different object.
88    WrongObjectId,
89    /// Authentication failed.
90    AuthenticationFailed,
91    /// Symbol size mismatch.
92    SymbolSizeMismatch,
93    /// Block already decoded.
94    BlockAlreadyDecoded,
95    /// Decode failed due to insufficient rank.
96    InsufficientRank,
97    /// Decode failed due to inconsistent equations.
98    InconsistentEquations,
99    /// Invalid or inconsistent metadata.
100    InvalidMetadata,
101    /// Memory or buffer limit reached.
102    MemoryLimitReached,
103}
104
105/// Result of feeding a symbol into the decoder.
106#[derive(Debug, Clone, PartialEq, Eq)]
107pub enum SymbolAcceptResult {
108    /// Symbol accepted and stored.
109    Accepted {
110        /// Symbols received for the block.
111        received: usize,
112        /// Estimated symbols needed for decode.
113        needed: usize,
114    },
115    /// Decoding started for the block.
116    DecodingStarted {
117        /// Block number being decoded.
118        block_sbn: u8,
119    },
120    /// Block fully decoded.
121    BlockComplete {
122        /// Block number.
123        block_sbn: u8,
124        /// Decoded block data.
125        data: Vec<u8>,
126    },
127    /// Duplicate symbol ignored.
128    Duplicate,
129    /// Symbol rejected.
130    Rejected(RejectReason),
131}
132
133/// Configuration for decoding operations.
134#[derive(Debug, Clone)]
135pub struct DecodingConfig {
136    /// Symbol size in bytes (must match encoding).
137    pub symbol_size: u16,
138    /// Maximum source block size in bytes.
139    pub max_block_size: usize,
140    /// Repair overhead factor (e.g., 1.05 = 5% extra symbols).
141    pub repair_overhead: f64,
142    /// Minimum extra symbols beyond K.
143    pub min_overhead: usize,
144    /// Maximum symbols to buffer per block (0 = unlimited).
145    pub max_buffered_symbols: usize,
146    /// Block timeout (not enforced in Phase 0).
147    pub block_timeout: Duration,
148    /// Whether to verify authentication tags.
149    pub verify_auth: bool,
150}
151
152impl Default for DecodingConfig {
153    fn default() -> Self {
154        Self {
155            symbol_size: 256,
156            max_block_size: 1024 * 1024,
157            repair_overhead: 1.05,
158            min_overhead: 0,
159            max_buffered_symbols: 0,
160            block_timeout: Duration::from_secs(30),
161            verify_auth: false,
162        }
163    }
164}
165
166/// Progress summary for decoding.
167#[derive(Debug, Clone, Copy)]
168pub struct DecodingProgress {
169    /// Blocks fully decoded.
170    pub blocks_complete: usize,
171    /// Total blocks expected (if known).
172    pub blocks_total: Option<usize>,
173    /// Total symbols received.
174    pub symbols_received: usize,
175    /// Estimated symbols needed to complete decode.
176    pub symbols_needed_estimate: usize,
177}
178
179/// Per-block status.
180#[derive(Debug, Clone, Copy)]
181pub struct BlockStatus {
182    /// Block number.
183    pub sbn: u8,
184    /// Symbols received for this block.
185    pub symbols_received: usize,
186    /// Estimated symbols needed for this block.
187    pub symbols_needed: usize,
188    /// Block state.
189    pub state: BlockStateKind,
190}
191
192/// High-level block state.
193#[derive(Debug, Clone, Copy, PartialEq, Eq)]
194pub enum BlockStateKind {
195    /// Collecting symbols.
196    Collecting,
197    /// Decoding in progress.
198    Decoding,
199    /// Decoded successfully.
200    Decoded,
201    /// Decoding failed.
202    Failed,
203}
204
205#[derive(Debug)]
206struct BlockDecoder {
207    sbn: u8,
208    state: BlockDecodingState,
209    decoded: Option<Vec<u8>>,
210}
211
212#[derive(Debug)]
213enum BlockDecodingState {
214    Collecting,
215    Decoding,
216    Decoded,
217    Failed,
218}
219
220/// Main decoding pipeline.
221#[derive(Debug)]
222pub struct DecodingPipeline {
223    config: DecodingConfig,
224    symbols: SymbolSet,
225    accepted_symbols_total: usize,
226    blocks: HashMap<u8, BlockDecoder>,
227    completed_blocks: HashSet<u8>,
228    object_id: Option<ObjectId>,
229    object_size: Option<u64>,
230    block_plans: Option<Vec<BlockPlan>>,
231    auth_context: Option<SecurityContext>,
232}
233
234impl DecodingPipeline {
235    /// Creates a new decoding pipeline.
236    #[must_use]
237    pub fn new(config: DecodingConfig) -> Self {
238        let threshold = ThresholdConfig::new(
239            config.repair_overhead,
240            config.min_overhead,
241            config.max_buffered_symbols,
242        );
243        Self {
244            config,
245            symbols: SymbolSet::with_config(threshold),
246            accepted_symbols_total: 0,
247            blocks: HashMap::new(),
248            completed_blocks: HashSet::new(),
249            object_id: None,
250            object_size: None,
251            block_plans: None,
252            auth_context: None,
253        }
254    }
255
256    /// Creates a new decoding pipeline with authentication enabled.
257    #[must_use]
258    pub fn with_auth(config: DecodingConfig, ctx: SecurityContext) -> Self {
259        let mut pipeline = Self::new(config);
260        pipeline.auth_context = Some(ctx);
261        pipeline
262    }
263
264    /// Sets object parameters (object size, symbol size, and block layout).
265    pub fn set_object_params(&mut self, params: ObjectParams) -> Result<(), DecodingError> {
266        if params.symbol_size != self.config.symbol_size {
267            return Err(DecodingError::SymbolSizeMismatch {
268                expected: self.config.symbol_size,
269                actual: params.symbol_size as usize,
270            });
271        }
272        if let Some(existing) = self.object_id {
273            if existing != params.object_id {
274                return Err(DecodingError::InconsistentMetadata {
275                    sbn: 0,
276                    details: format!(
277                        "object id mismatch: expected {existing:?}, got {:?}",
278                        params.object_id
279                    ),
280                });
281            }
282        }
283        let plans = plan_blocks(
284            params.object_size as usize,
285            usize::from(params.symbol_size),
286            self.config.max_block_size,
287        )?;
288        validate_object_params_layout(params, &plans)?;
289        self.object_id = Some(params.object_id);
290        self.object_size = Some(params.object_size);
291        self.block_plans = Some(plans);
292        self.configure_block_k();
293        Ok(())
294    }
295
296    /// Feeds a received authenticated symbol into the pipeline.
297    pub fn feed(
298        &mut self,
299        mut auth_symbol: AuthenticatedSymbol,
300    ) -> Result<SymbolAcceptResult, DecodingError> {
301        if self.config.verify_auth {
302            match &self.auth_context {
303                Some(ctx) => {
304                    if !auth_symbol.is_verified()
305                        && ctx.verify_authenticated_symbol(&mut auth_symbol).is_err()
306                    {
307                        return Ok(SymbolAcceptResult::Rejected(
308                            RejectReason::AuthenticationFailed,
309                        ));
310                    }
311                }
312                None => {
313                    // If callers already verified the symbol (e.g. at a trusted boundary), allow
314                    // the verified flag to stand. If not verified and we have no auth context,
315                    // we cannot validate the tag and must reject the symbol deterministically.
316                    if !auth_symbol.is_verified() {
317                        return Ok(SymbolAcceptResult::Rejected(
318                            RejectReason::AuthenticationFailed,
319                        ));
320                    }
321                }
322            }
323        }
324
325        let symbol = auth_symbol.into_symbol();
326
327        if symbol.len() != usize::from(self.config.symbol_size) {
328            return Ok(SymbolAcceptResult::Rejected(
329                RejectReason::SymbolSizeMismatch,
330            ));
331        }
332
333        if let Some(object_id) = self.object_id {
334            if object_id != symbol.object_id() {
335                return Ok(SymbolAcceptResult::Rejected(RejectReason::WrongObjectId));
336            }
337        } else {
338            self.object_id = Some(symbol.object_id());
339        }
340
341        let sbn = symbol.sbn();
342        if self.completed_blocks.contains(&sbn) {
343            return Ok(SymbolAcceptResult::Rejected(
344                RejectReason::BlockAlreadyDecoded,
345            ));
346        }
347
348        // Ensure block entry exists
349        self.blocks.entry(sbn).or_insert_with(|| BlockDecoder {
350            sbn,
351            state: BlockDecodingState::Collecting,
352            decoded: None,
353        });
354
355        let insert_result = self.symbols.insert(symbol);
356        match insert_result {
357            InsertResult::Duplicate => Ok(SymbolAcceptResult::Duplicate),
358            InsertResult::MemoryLimitReached | InsertResult::BlockLimitReached { .. } => Ok(
359                SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached),
360            ),
361            InsertResult::Inserted {
362                block_progress,
363                threshold_reached,
364            } => {
365                self.accepted_symbols_total = self.accepted_symbols_total.saturating_add(1);
366                if block_progress.k.is_none() {
367                    self.configure_block_k();
368                }
369                let needed = block_progress.k.map_or(0, |k| {
370                    required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
371                });
372                let received = block_progress.total();
373
374                if threshold_reached {
375                    // Update state to Decoding
376                    if let Some(block) = self.blocks.get_mut(&sbn) {
377                        block.state = BlockDecodingState::Decoding;
378                    }
379                    if let Some(result) = self.try_decode_block(sbn) {
380                        return Ok(result);
381                    }
382                }
383
384                // Reset state to Collecting (if not decoded)
385                if let Some(block) = self.blocks.get_mut(&sbn) {
386                    if !matches!(
387                        block.state,
388                        BlockDecodingState::Decoded | BlockDecodingState::Failed
389                    ) {
390                        block.state = BlockDecodingState::Collecting;
391                    }
392                }
393                Ok(SymbolAcceptResult::Accepted { received, needed })
394            }
395        }
396    }
397
398    /// Feeds a batch of symbols.
399    pub fn feed_batch(
400        &mut self,
401        symbols: impl Iterator<Item = AuthenticatedSymbol>,
402    ) -> Vec<Result<SymbolAcceptResult, DecodingError>> {
403        symbols.map(|symbol| self.feed(symbol)).collect()
404    }
405
406    /// Returns true if all expected blocks are decoded.
407    #[must_use]
408    pub fn is_complete(&self) -> bool {
409        let Some(plans) = &self.block_plans else {
410            return false;
411        };
412        self.completed_blocks.len() == plans.len()
413    }
414
415    /// Returns decoding progress.
416    #[must_use]
417    pub fn progress(&self) -> DecodingProgress {
418        let blocks_total = self.block_plans.as_ref().map(Vec::len);
419        let symbols_received = self.accepted_symbols_total;
420        let symbols_needed_estimate = self.block_plans.as_ref().map_or(0, |plans| {
421            sum_required_symbols(plans, self.config.repair_overhead, self.config.min_overhead)
422        });
423
424        DecodingProgress {
425            blocks_complete: self.completed_blocks.len(),
426            blocks_total,
427            symbols_received,
428            symbols_needed_estimate,
429        }
430    }
431
432    /// Returns per-block status if known.
433    #[must_use]
434    pub fn block_status(&self, sbn: u8) -> Option<BlockStatus> {
435        let progress = self.symbols.block_progress(sbn)?;
436        let state = self
437            .blocks
438            .get(&sbn)
439            .map_or(BlockStateKind::Collecting, |block| match block.state {
440                BlockDecodingState::Collecting => BlockStateKind::Collecting,
441                BlockDecodingState::Decoding => BlockStateKind::Decoding,
442                BlockDecodingState::Decoded => BlockStateKind::Decoded,
443                BlockDecodingState::Failed => BlockStateKind::Failed,
444            });
445
446        let symbols_needed = progress.k.map_or(0, |k| {
447            required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
448        });
449
450        Some(BlockStatus {
451            sbn,
452            symbols_received: progress.total(),
453            symbols_needed,
454            state,
455        })
456    }
457
458    /// Consumes the pipeline and returns decoded data if complete.
459    pub fn into_data(self) -> Result<Vec<u8>, DecodingError> {
460        let Some(plans) = &self.block_plans else {
461            return Err(DecodingError::InconsistentMetadata {
462                sbn: 0,
463                details: "object parameters not set".to_string(),
464            });
465        };
466        if !self.is_complete() {
467            let received = self.accepted_symbols_total;
468            let needed =
469                sum_required_symbols(plans, self.config.repair_overhead, self.config.min_overhead);
470            return Err(DecodingError::InsufficientSymbols { received, needed });
471        }
472
473        let mut output = Vec::with_capacity(self.object_size.unwrap_or(0) as usize);
474        for plan in plans {
475            let block = self
476                .blocks
477                .get(&plan.sbn)
478                .and_then(|b| b.decoded.as_ref())
479                .ok_or_else(|| DecodingError::InconsistentMetadata {
480                    sbn: plan.sbn,
481                    details: "missing decoded block".to_string(),
482                })?;
483            output.extend_from_slice(block);
484        }
485
486        if let Some(size) = self.object_size {
487            output.truncate(size as usize);
488        }
489
490        Ok(output)
491    }
492
493    fn configure_block_k(&mut self) {
494        let Some(plans) = &self.block_plans else {
495            return;
496        };
497        for plan in plans {
498            let k = u16::try_from(plan.k).unwrap_or(u16::MAX);
499            let _ = self.symbols.set_block_k(plan.sbn, k);
500        }
501    }
502
503    fn try_decode_block(&mut self, sbn: u8) -> Option<SymbolAcceptResult> {
504        let block_plan = self.block_plan(sbn)?;
505        let k = block_plan.k;
506        if k == 0 {
507            return None;
508        }
509
510        let symbols: Vec<Symbol> = self.symbols.symbols_for_block(sbn).cloned().collect();
511        if symbols.len() < k {
512            return None;
513        }
514
515        let decoded_symbols =
516            match decode_block(block_plan, &symbols, usize::from(self.config.symbol_size)) {
517                Ok(symbols) => symbols,
518                Err(DecodingError::InsufficientSymbols { .. }) => {
519                    return Some(SymbolAcceptResult::Rejected(RejectReason::InsufficientRank));
520                }
521                Err(DecodingError::MatrixInversionFailed { .. }) => {
522                    return Some(SymbolAcceptResult::Rejected(
523                        RejectReason::InconsistentEquations,
524                    ));
525                }
526                Err(DecodingError::InconsistentMetadata { .. }) => {
527                    let block = self.blocks.get_mut(&sbn);
528                    if let Some(block) = block {
529                        block.state = BlockDecodingState::Failed;
530                    }
531                    return Some(SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata));
532                }
533                Err(DecodingError::SymbolSizeMismatch { .. }) => {
534                    let block = self.blocks.get_mut(&sbn);
535                    if let Some(block) = block {
536                        block.state = BlockDecodingState::Failed;
537                    }
538                    return Some(SymbolAcceptResult::Rejected(
539                        RejectReason::SymbolSizeMismatch,
540                    ));
541                }
542                Err(_err) => {
543                    let block = self.blocks.get_mut(&sbn);
544                    if let Some(block) = block {
545                        block.state = BlockDecodingState::Failed;
546                    }
547                    return Some(SymbolAcceptResult::Rejected(
548                        RejectReason::MemoryLimitReached,
549                    ));
550                }
551            };
552
553        let mut block_data = Vec::with_capacity(block_plan.len);
554        for symbol in &decoded_symbols {
555            block_data.extend_from_slice(symbol.data());
556        }
557        block_data.truncate(block_plan.len);
558
559        if let Some(block) = self.blocks.get_mut(&sbn) {
560            block.state = BlockDecodingState::Decoded;
561            block.decoded = Some(block_data.clone());
562        }
563
564        self.completed_blocks.insert(sbn);
565        self.symbols.clear_block(sbn);
566
567        Some(SymbolAcceptResult::BlockComplete {
568            block_sbn: sbn,
569            data: block_data,
570        })
571    }
572
573    fn block_plan(&self, sbn: u8) -> Option<&BlockPlan> {
574        self.block_plans
575            .as_ref()
576            .and_then(|plans| plans.iter().find(|plan| plan.sbn == sbn))
577    }
578}
579
580#[derive(Debug, Clone)]
581struct BlockPlan {
582    sbn: u8,
583    start: usize,
584    len: usize,
585    k: usize,
586}
587
588impl BlockPlan {
589    fn end(&self) -> usize {
590        self.start + self.len
591    }
592}
593
594fn plan_blocks(
595    object_size: usize,
596    symbol_size: usize,
597    max_block_size: usize,
598) -> Result<Vec<BlockPlan>, DecodingError> {
599    if object_size == 0 {
600        return Ok(Vec::new());
601    }
602
603    if symbol_size == 0 {
604        return Err(DecodingError::InconsistentMetadata {
605            sbn: 0,
606            details: "symbol_size must be > 0".to_string(),
607        });
608    }
609
610    let max_blocks = u8::MAX as usize + 1;
611    let max_total = max_block_size.saturating_mul(max_blocks);
612    if object_size > max_total {
613        return Err(DecodingError::InconsistentMetadata {
614            sbn: 0,
615            details: format!("object size {object_size} exceeds limit {max_total}"),
616        });
617    }
618
619    let mut blocks = Vec::new();
620    let mut offset = 0;
621    let mut sbn: u8 = 0;
622
623    while offset < object_size {
624        let len = usize::min(max_block_size, object_size - offset);
625        let k = len.div_ceil(symbol_size);
626        blocks.push(BlockPlan {
627            sbn,
628            start: offset,
629            len,
630            k,
631        });
632        offset += len;
633        sbn = sbn.wrapping_add(1);
634    }
635
636    Ok(blocks)
637}
638
639fn validate_object_params_layout(
640    params: ObjectParams,
641    plans: &[BlockPlan],
642) -> Result<(), DecodingError> {
643    let declared_blocks = usize::from(params.source_blocks);
644    let declared_k = usize::from(params.symbols_per_block);
645
646    if plans.is_empty() {
647        if declared_blocks == 0 && declared_k == 0 {
648            return Ok(());
649        }
650        if declared_blocks == 1 {
651            return Ok(());
652        }
653        return Err(DecodingError::InconsistentMetadata {
654            sbn: 0,
655            details: format!(
656                "object params layout mismatch: empty object expects either 0 blocks / 0 symbols-per-block or a single empty sentinel block, got {declared_blocks} block(s) with {declared_k} symbols/block"
657            ),
658        });
659    }
660
661    let expected_blocks = plans.len();
662    if declared_blocks != expected_blocks {
663        return Err(DecodingError::InconsistentMetadata {
664            sbn: 0,
665            details: format!(
666                "object params block count mismatch: expected {expected_blocks}, got {declared_blocks}"
667            ),
668        });
669    }
670
671    let expected_k = plans.iter().map(|plan| plan.k).max().unwrap_or(0);
672    if declared_k != expected_k {
673        return Err(DecodingError::InconsistentMetadata {
674            sbn: 0,
675            details: format!(
676                "object params symbols_per_block mismatch: expected {expected_k}, got {declared_k}"
677            ),
678        });
679    }
680
681    Ok(())
682}
683
684fn required_symbols(k: u16, overhead: f64, min_overhead: usize) -> usize {
685    if k == 0 {
686        return 0;
687    }
688    let raw = (f64::from(k) * overhead).ceil();
689    let minimum_threshold = usize::from(k).saturating_add(min_overhead);
690    if raw.is_nan() {
691        return minimum_threshold;
692    }
693    if raw.is_sign_positive() && !raw.is_finite() {
694        return usize::MAX;
695    }
696    if raw.is_sign_negative() {
697        return minimum_threshold;
698    }
699    #[allow(clippy::cast_sign_loss)]
700    let factor_threshold = raw as usize;
701    // `overhead` already encodes the total-symbol target; `min_overhead` is a
702    // floor on extra symbols beyond K, not an additional increment on top.
703    factor_threshold.max(minimum_threshold)
704}
705
706fn sum_required_symbols(plans: &[BlockPlan], overhead: f64, min_overhead: usize) -> usize {
707    plans.iter().fold(0usize, |acc, plan| {
708        acc.saturating_add(required_symbols(
709            u16::try_from(plan.k).unwrap_or(u16::MAX),
710            overhead,
711            min_overhead,
712        ))
713    })
714}
715
716#[allow(clippy::too_many_lines)]
717fn decode_block(
718    plan: &BlockPlan,
719    symbols: &[Symbol],
720    symbol_size: usize,
721) -> Result<Vec<Symbol>, DecodingError> {
722    let k = plan.k;
723    if symbols.len() < k {
724        return Err(DecodingError::InsufficientSymbols {
725            received: symbols.len(),
726            needed: k,
727        });
728    }
729
730    let object_id = symbols.first().map_or(ObjectId::NIL, Symbol::object_id);
731    let params = SystematicParams::for_source_block(k, symbol_size);
732    let block_seed = seed_for_block(object_id, plan.sbn);
733    let decoder = InactivationDecoder::new(k, symbol_size, block_seed);
734    let padding_rows = params.k_prime.saturating_sub(k);
735
736    // 1. Start with constraint symbols (LDPC + HDPC)
737    let mut received = decoder.constraint_symbols();
738    received.reserve(symbols.len() + padding_rows);
739
740    // 2. Add received symbols (Source + Repair)
741    for symbol in symbols {
742        match symbol.kind() {
743            SymbolKind::Source => {
744                let esi = symbol.esi() as usize;
745                if esi >= k {
746                    return Err(DecodingError::InconsistentMetadata {
747                        sbn: plan.sbn,
748                        details: format!("source esi {esi} >= k {k}"),
749                    });
750                }
751                // Systematic: source symbol i maps to intermediate symbol i (identity).
752                received.push(ReceivedSymbol {
753                    esi: symbol.esi(),
754                    is_source: true,
755                    columns: vec![esi],
756                    coefficients: vec![Gf256::ONE],
757                    data: symbol.data().to_vec(),
758                });
759            }
760            SymbolKind::Repair => {
761                let (columns, coefficients) = decoder.repair_equation(symbol.esi());
762                received.push(ReceivedSymbol {
763                    esi: symbol.esi(),
764                    is_source: false,
765                    columns,
766                    coefficients,
767                    data: symbol.data().to_vec(),
768                });
769            }
770        }
771    }
772
773    // 3. Add zero-padded rows for K..K'.
774    // These act as implicit source symbols with value 0.
775    for esi in k..params.k_prime {
776        received.push(ReceivedSymbol {
777            esi: esi as u32,
778            is_source: false,
779            columns: vec![esi],
780            coefficients: vec![Gf256::ONE],
781            data: vec![0u8; symbol_size],
782        });
783    }
784
785    let result = match decoder.decode(&received) {
786        Ok(result) => result,
787        Err(err) => {
788            let mapped = match err {
789                RaptorDecodeError::InsufficientSymbols { received, required } => {
790                    DecodingError::InsufficientSymbols {
791                        received,
792                        needed: required,
793                    }
794                }
795                RaptorDecodeError::SingularMatrix { row } => DecodingError::MatrixInversionFailed {
796                    reason: format!("singular matrix at row {row}"),
797                },
798                RaptorDecodeError::SymbolSizeMismatch { expected, actual } => {
799                    DecodingError::SymbolSizeMismatch {
800                        expected: u16::try_from(expected).unwrap_or(u16::MAX),
801                        actual,
802                    }
803                }
804                RaptorDecodeError::SymbolEquationArityMismatch {
805                    esi,
806                    columns,
807                    coefficients,
808                } => DecodingError::InconsistentMetadata {
809                    sbn: plan.sbn,
810                    details: format!(
811                        "symbol {esi} has mismatched equation vectors: columns={columns}, coefficients={coefficients}"
812                    ),
813                },
814                RaptorDecodeError::ColumnIndexOutOfRange {
815                    esi,
816                    column,
817                    max_valid,
818                } => DecodingError::InconsistentMetadata {
819                    sbn: plan.sbn,
820                    details: format!(
821                        "symbol {esi} references out-of-range column {column} (valid < {max_valid})"
822                    ),
823                },
824                RaptorDecodeError::CorruptDecodedOutput {
825                    esi,
826                    byte_index,
827                    expected,
828                    actual,
829                } => DecodingError::MatrixInversionFailed {
830                    reason: format!(
831                        "decoded output verification failed at symbol {esi}, byte {byte_index}: expected 0x{expected:02x}, actual 0x{actual:02x}"
832                    ),
833                },
834            };
835            return Err(mapped);
836        }
837    };
838
839    // 4. Construct decoded symbols from the source data returned by the decoder.
840    // InactivationDecoder::decode already extracts the first K intermediate symbols
841    // into `result.source`, which corresponds exactly to the systematic source data.
842    let mut decoded_symbols = Vec::with_capacity(k);
843    for (esi, data) in result.source.into_iter().enumerate() {
844        decoded_symbols.push(Symbol::new(
845            SymbolId::new(object_id, plan.sbn, esi as u32),
846            data,
847            SymbolKind::Source,
848        ));
849    }
850
851    Ok(decoded_symbols)
852}
853
854fn seed_for_block(object_id: ObjectId, sbn: u8) -> u64 {
855    seed_for(object_id, sbn, 0)
856}
857
858fn seed_for(object_id: ObjectId, sbn: u8, esi: u32) -> u64 {
859    let obj = object_id.as_u128();
860    let hi = (obj >> 64) as u64;
861    let lo = obj as u64;
862    let mut seed = hi ^ lo.rotate_left(13);
863    seed ^= u64::from(sbn) << 56;
864    seed ^= u64::from(esi);
865    if seed == 0 { 1 } else { seed }
866}
867
868#[cfg(test)]
869mod tests {
870    use super::*;
871    use crate::encoding::EncodingPipeline;
872    use crate::types::resource::{PoolConfig, SymbolPool};
873
874    fn init_test(name: &str) {
875        crate::test_utils::init_test_logging();
876        crate::test_phase!(name);
877    }
878
879    fn pool() -> SymbolPool {
880        SymbolPool::new(PoolConfig {
881            symbol_size: 256,
882            initial_size: 64,
883            max_size: 64,
884            allow_growth: false,
885            growth_increment: 0,
886        })
887    }
888
889    fn encoding_config() -> crate::config::EncodingConfig {
890        crate::config::EncodingConfig {
891            symbol_size: 256,
892            max_block_size: 1024,
893            repair_overhead: 1.05,
894            encoding_parallelism: 1,
895            decoding_parallelism: 1,
896        }
897    }
898
899    fn decoder_with_params(
900        config: &crate::config::EncodingConfig,
901        object_id: ObjectId,
902        data_len: usize,
903        repair_overhead: f64,
904        min_overhead: usize,
905    ) -> DecodingPipeline {
906        let mut decoder = DecodingPipeline::new(DecodingConfig {
907            symbol_size: config.symbol_size,
908            max_block_size: config.max_block_size,
909            repair_overhead,
910            min_overhead,
911            max_buffered_symbols: 0,
912            block_timeout: Duration::from_secs(30),
913            verify_auth: false,
914        });
915        let symbols_per_block = (data_len.div_ceil(usize::from(config.symbol_size))) as u16;
916        decoder
917            .set_object_params(ObjectParams::new(
918                object_id,
919                data_len as u64,
920                config.symbol_size,
921                1,
922                symbols_per_block,
923            ))
924            .expect("params");
925        decoder
926    }
927
928    #[test]
929    fn decode_roundtrip_sources_only() {
930        init_test("decode_roundtrip_sources_only");
931        let config = encoding_config();
932        let mut encoder = EncodingPipeline::new(config.clone(), pool());
933        let object_id = ObjectId::new_for_test(1);
934        let data = vec![42u8; 512];
935        let symbols: Vec<Symbol> = encoder
936            .encode_with_repair(object_id, &data, 0)
937            .map(|res| res.unwrap().into_symbol())
938            .collect();
939
940        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
941
942        for symbol in symbols {
943            let auth = AuthenticatedSymbol::from_parts(
944                symbol,
945                crate::security::tag::AuthenticationTag::zero(),
946            );
947            let _ = decoder.feed(auth).unwrap();
948        }
949
950        let decoded_data = decoder.into_data().expect("decoded");
951        let ok = decoded_data == data;
952        crate::assert_with_log!(ok, "decoded data", data, decoded_data);
953        crate::test_complete!("decode_roundtrip_sources_only");
954    }
955
956    #[test]
957    fn decode_roundtrip_out_of_order() {
958        init_test("decode_roundtrip_out_of_order");
959        let config = encoding_config();
960        let mut encoder = EncodingPipeline::new(config.clone(), pool());
961        let object_id = ObjectId::new_for_test(2);
962        let data = vec![7u8; 768];
963        let mut symbols: Vec<Symbol> = encoder
964            .encode_with_repair(object_id, &data, 2)
965            .map(|res| res.expect("encode").into_symbol())
966            .collect();
967
968        symbols.reverse();
969
970        let mut decoder =
971            decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
972
973        for symbol in symbols {
974            let auth = AuthenticatedSymbol::from_parts(
975                symbol,
976                crate::security::tag::AuthenticationTag::zero(),
977            );
978            let _ = decoder.feed(auth).expect("feed");
979        }
980
981        let decoded_data = decoder.into_data().expect("decoded");
982        let ok = decoded_data == data;
983        crate::assert_with_log!(ok, "decoded data", data, decoded_data);
984        crate::test_complete!("decode_roundtrip_out_of_order");
985    }
986
987    #[test]
988    fn reject_wrong_object_id() {
989        init_test("reject_wrong_object_id");
990        let config = encoding_config();
991        let mut encoder = EncodingPipeline::new(config.clone(), pool());
992        let object_id_a = ObjectId::new_for_test(10);
993        let object_id_b = ObjectId::new_for_test(11);
994        let data = vec![1u8; 128];
995
996        let mut decoder =
997            decoder_with_params(&config, object_id_a, data.len(), config.repair_overhead, 0);
998
999        let symbol_b = encoder
1000            .encode_with_repair(object_id_b, &data, 0)
1001            .next()
1002            .expect("symbol")
1003            .expect("encode")
1004            .into_symbol();
1005        let auth = AuthenticatedSymbol::from_parts(
1006            symbol_b,
1007            crate::security::tag::AuthenticationTag::zero(),
1008        );
1009
1010        let result = decoder.feed(auth).expect("feed");
1011        let expected = SymbolAcceptResult::Rejected(RejectReason::WrongObjectId);
1012        let ok = result == expected;
1013        crate::assert_with_log!(ok, "wrong object id", expected, result);
1014        crate::test_complete!("reject_wrong_object_id");
1015    }
1016
1017    #[test]
1018    fn reject_symbol_size_mismatch() {
1019        init_test("reject_symbol_size_mismatch");
1020        let config = encoding_config();
1021        let mut decoder = DecodingPipeline::new(DecodingConfig {
1022            symbol_size: config.symbol_size,
1023            max_block_size: config.max_block_size,
1024            repair_overhead: config.repair_overhead,
1025            min_overhead: 0,
1026            max_buffered_symbols: 0,
1027            block_timeout: Duration::from_secs(30),
1028            verify_auth: false,
1029        });
1030
1031        let symbol = Symbol::new(
1032            SymbolId::new(ObjectId::new_for_test(20), 0, 0),
1033            vec![0u8; 8],
1034            SymbolKind::Source,
1035        );
1036        let auth = AuthenticatedSymbol::from_parts(
1037            symbol,
1038            crate::security::tag::AuthenticationTag::zero(),
1039        );
1040        let result = decoder.feed(auth).expect("feed");
1041        let expected = SymbolAcceptResult::Rejected(RejectReason::SymbolSizeMismatch);
1042        let ok = result == expected;
1043        crate::assert_with_log!(ok, "symbol size mismatch", expected, result);
1044        crate::test_complete!("reject_symbol_size_mismatch");
1045    }
1046
1047    #[test]
1048    fn reject_invalid_metadata_esi_out_of_range() {
1049        init_test("reject_invalid_metadata_esi_out_of_range");
1050        let mut decoder = DecodingPipeline::new(DecodingConfig {
1051            symbol_size: 8,
1052            max_block_size: 8,
1053            repair_overhead: 1.0,
1054            min_overhead: 0,
1055            max_buffered_symbols: 0,
1056            block_timeout: Duration::from_secs(30),
1057            verify_auth: false,
1058        });
1059        let object_id = ObjectId::new_for_test(21);
1060        decoder
1061            .set_object_params(ObjectParams::new(object_id, 8, 8, 1, 1))
1062            .expect("params");
1063
1064        let symbol = Symbol::new(
1065            SymbolId::new(object_id, 0, 1),
1066            vec![0u8; 8],
1067            SymbolKind::Source,
1068        );
1069        let auth = AuthenticatedSymbol::from_parts(
1070            symbol,
1071            crate::security::tag::AuthenticationTag::zero(),
1072        );
1073
1074        let result = decoder.feed(auth).expect("feed");
1075        let expected = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
1076        let ok = result == expected;
1077        crate::assert_with_log!(ok, "invalid metadata", expected, result);
1078        crate::test_complete!("reject_invalid_metadata_esi_out_of_range");
1079    }
1080
1081    #[test]
1082    fn duplicate_symbol_before_decode() {
1083        init_test("duplicate_symbol_before_decode");
1084        let config = encoding_config();
1085        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1086        let object_id = ObjectId::new_for_test(30);
1087        // Ensure K > 1 so the first symbol cannot complete the block decode.
1088        let data = vec![9u8; 512];
1089
1090        let symbol = encoder
1091            .encode_with_repair(object_id, &data, 0)
1092            .next()
1093            .expect("symbol")
1094            .expect("encode")
1095            .into_symbol();
1096
1097        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1098
1099        let first = decoder
1100            .feed(AuthenticatedSymbol::from_parts(
1101                symbol.clone(),
1102                crate::security::tag::AuthenticationTag::zero(),
1103            ))
1104            .expect("feed");
1105        let accepted = matches!(
1106            first,
1107            SymbolAcceptResult::Accepted { .. } | SymbolAcceptResult::DecodingStarted { .. }
1108        );
1109        crate::assert_with_log!(accepted, "first accepted", true, accepted);
1110
1111        let second = decoder
1112            .feed(AuthenticatedSymbol::from_parts(
1113                symbol,
1114                crate::security::tag::AuthenticationTag::zero(),
1115            ))
1116            .expect("feed");
1117        let expected = SymbolAcceptResult::Duplicate;
1118        let ok = second == expected;
1119        crate::assert_with_log!(ok, "second duplicate", expected, second);
1120        crate::test_complete!("duplicate_symbol_before_decode");
1121    }
1122
1123    #[test]
1124    fn into_data_reports_insufficient_symbols() {
1125        init_test("into_data_reports_insufficient_symbols");
1126        let config = encoding_config();
1127        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1128        let object_id = ObjectId::new_for_test(40);
1129        let data = vec![5u8; 512];
1130
1131        let mut decoder =
1132            decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
1133
1134        let symbol = encoder
1135            .encode_with_repair(object_id, &data, 0)
1136            .next()
1137            .expect("symbol")
1138            .expect("encode")
1139            .into_symbol();
1140        let auth = AuthenticatedSymbol::from_parts(
1141            symbol,
1142            crate::security::tag::AuthenticationTag::zero(),
1143        );
1144        let _ = decoder.feed(auth).expect("feed");
1145
1146        let err = decoder
1147            .into_data()
1148            .expect_err("expected insufficient symbols");
1149        let insufficient = matches!(err, DecodingError::InsufficientSymbols { .. });
1150        crate::assert_with_log!(insufficient, "insufficient symbols", true, insufficient);
1151        crate::test_complete!("into_data_reports_insufficient_symbols");
1152    }
1153
1154    // ---- DecodingError Display ----
1155
1156    #[test]
1157    fn decoding_error_display_authentication_failed() {
1158        let err = DecodingError::AuthenticationFailed {
1159            symbol_id: SymbolId::new(ObjectId::new_for_test(1), 0, 0),
1160        };
1161        let msg = err.to_string();
1162        assert!(msg.contains("authentication failed"), "{msg}");
1163    }
1164
1165    #[test]
1166    fn decoding_error_display_insufficient_symbols() {
1167        let err = DecodingError::InsufficientSymbols {
1168            received: 3,
1169            needed: 10,
1170        };
1171        assert_eq!(err.to_string(), "insufficient symbols: have 3, need 10");
1172    }
1173
1174    #[test]
1175    fn decoding_error_display_matrix_inversion() {
1176        let err = DecodingError::MatrixInversionFailed {
1177            reason: "rank deficient".into(),
1178        };
1179        assert_eq!(err.to_string(), "matrix inversion failed: rank deficient");
1180    }
1181
1182    #[test]
1183    fn decoding_error_display_block_timeout() {
1184        let err = DecodingError::BlockTimeout {
1185            sbn: 2,
1186            elapsed: Duration::from_millis(1500),
1187        };
1188        let msg = err.to_string();
1189        assert!(msg.contains("block timeout"), "{msg}");
1190        assert!(msg.contains("1.5"), "{msg}");
1191    }
1192
1193    #[test]
1194    fn decoding_error_display_inconsistent_metadata() {
1195        let err = DecodingError::InconsistentMetadata {
1196            sbn: 0,
1197            details: "mismatch".into(),
1198        };
1199        let msg = err.to_string();
1200        assert!(msg.contains("inconsistent block metadata"), "{msg}");
1201        assert!(msg.contains("mismatch"), "{msg}");
1202    }
1203
1204    #[test]
1205    fn decoding_error_display_symbol_size_mismatch() {
1206        let err = DecodingError::SymbolSizeMismatch {
1207            expected: 256,
1208            actual: 128,
1209        };
1210        assert_eq!(
1211            err.to_string(),
1212            "symbol size mismatch: expected 256, got 128"
1213        );
1214    }
1215
1216    // ---- DecodingError -> Error conversion ----
1217
1218    #[test]
1219    fn decoding_error_into_error_auth() {
1220        let err = DecodingError::AuthenticationFailed {
1221            symbol_id: SymbolId::new(ObjectId::new_for_test(1), 0, 0),
1222        };
1223        let error: crate::error::Error = err.into();
1224        assert_eq!(error.kind(), crate::error::ErrorKind::CorruptedSymbol);
1225    }
1226
1227    #[test]
1228    fn decoding_error_into_error_insufficient() {
1229        let err = DecodingError::InsufficientSymbols {
1230            received: 1,
1231            needed: 5,
1232        };
1233        let error: crate::error::Error = err.into();
1234        assert_eq!(error.kind(), crate::error::ErrorKind::InsufficientSymbols);
1235    }
1236
1237    #[test]
1238    fn decoding_error_into_error_matrix() {
1239        let err = DecodingError::MatrixInversionFailed {
1240            reason: "singular".into(),
1241        };
1242        let error: crate::error::Error = err.into();
1243        assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1244    }
1245
1246    #[test]
1247    fn decoding_error_into_error_timeout() {
1248        let err = DecodingError::BlockTimeout {
1249            sbn: 0,
1250            elapsed: Duration::from_secs(30),
1251        };
1252        let error: crate::error::Error = err.into();
1253        assert_eq!(error.kind(), crate::error::ErrorKind::ThresholdTimeout);
1254    }
1255
1256    #[test]
1257    fn decoding_error_into_error_inconsistent() {
1258        let err = DecodingError::InconsistentMetadata {
1259            sbn: 1,
1260            details: "x".into(),
1261        };
1262        let error: crate::error::Error = err.into();
1263        assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1264    }
1265
1266    #[test]
1267    fn decoding_error_into_error_size_mismatch() {
1268        let err = DecodingError::SymbolSizeMismatch {
1269            expected: 256,
1270            actual: 64,
1271        };
1272        let error: crate::error::Error = err.into();
1273        assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1274    }
1275
1276    // ---- RejectReason ----
1277
1278    #[test]
1279    fn reject_reason_variants_are_eq() {
1280        assert_eq!(RejectReason::WrongObjectId, RejectReason::WrongObjectId);
1281        assert_ne!(
1282            RejectReason::AuthenticationFailed,
1283            RejectReason::SymbolSizeMismatch
1284        );
1285    }
1286
1287    #[test]
1288    fn reject_reason_debug() {
1289        let dbg = format!("{:?}", RejectReason::BlockAlreadyDecoded);
1290        assert_eq!(dbg, "BlockAlreadyDecoded");
1291    }
1292
1293    // ---- SymbolAcceptResult ----
1294
1295    #[test]
1296    fn symbol_accept_result_accepted_eq() {
1297        let a = SymbolAcceptResult::Accepted {
1298            received: 3,
1299            needed: 5,
1300        };
1301        let b = SymbolAcceptResult::Accepted {
1302            received: 3,
1303            needed: 5,
1304        };
1305        assert_eq!(a, b);
1306    }
1307
1308    #[test]
1309    fn symbol_accept_result_duplicate_eq() {
1310        assert_eq!(SymbolAcceptResult::Duplicate, SymbolAcceptResult::Duplicate);
1311    }
1312
1313    #[test]
1314    fn symbol_accept_result_rejected_eq() {
1315        let a = SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached);
1316        let b = SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached);
1317        assert_eq!(a, b);
1318    }
1319
1320    #[test]
1321    fn symbol_accept_result_variants_ne() {
1322        assert_ne!(
1323            SymbolAcceptResult::Duplicate,
1324            SymbolAcceptResult::Rejected(RejectReason::WrongObjectId)
1325        );
1326    }
1327
1328    // ---- DecodingConfig default ----
1329
1330    #[test]
1331    fn decoding_config_default_values() {
1332        let cfg = DecodingConfig::default();
1333        assert_eq!(cfg.symbol_size, 256);
1334        assert_eq!(cfg.max_block_size, 1024 * 1024);
1335        assert!((cfg.repair_overhead - 1.05).abs() < f64::EPSILON);
1336        assert_eq!(cfg.min_overhead, 0);
1337        assert_eq!(cfg.max_buffered_symbols, 0);
1338        assert_eq!(cfg.block_timeout, Duration::from_secs(30));
1339        assert!(!cfg.verify_auth);
1340    }
1341
1342    #[test]
1343    fn required_symbols_uses_total_factor_and_minimum_extra_floor() {
1344        assert_eq!(required_symbols(0, 1.05, 3), 0);
1345        assert_eq!(required_symbols(10, 1.05, 3), 13);
1346        assert_eq!(required_symbols(10, 1.5, 1), 15);
1347        assert_eq!(required_symbols(10, 0.5, 0), 10);
1348        assert_eq!(required_symbols(10, f64::NAN, 3), 13);
1349        assert_eq!(required_symbols(10, f64::INFINITY, 3), usize::MAX);
1350    }
1351
1352    // ---- BlockStateKind ----
1353
1354    #[test]
1355    fn block_state_kind_eq_and_debug() {
1356        assert_eq!(BlockStateKind::Collecting, BlockStateKind::Collecting);
1357        assert_ne!(BlockStateKind::Collecting, BlockStateKind::Decoded);
1358        assert_eq!(format!("{:?}", BlockStateKind::Failed), "Failed");
1359        assert_eq!(format!("{:?}", BlockStateKind::Decoding), "Decoding");
1360    }
1361
1362    // ---- DecodingPipeline construction ----
1363
1364    #[test]
1365    fn pipeline_new_starts_empty() {
1366        let pipeline = DecodingPipeline::new(DecodingConfig::default());
1367        let progress = pipeline.progress();
1368        assert_eq!(progress.blocks_complete, 0);
1369        assert_eq!(progress.symbols_received, 0);
1370    }
1371
1372    #[test]
1373    fn pipeline_set_object_params_rejects_mismatched_symbol_size() {
1374        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1375            symbol_size: 256,
1376            ..DecodingConfig::default()
1377        });
1378        let params = ObjectParams::new(ObjectId::new_for_test(1), 1024, 128, 1, 8);
1379        let err = pipeline.set_object_params(params).unwrap_err();
1380        assert!(matches!(err, DecodingError::SymbolSizeMismatch { .. }));
1381    }
1382
1383    #[test]
1384    fn pipeline_set_object_params_rejects_inconsistent_object_id() {
1385        let config = encoding_config();
1386        let oid1 = ObjectId::new_for_test(1);
1387        let oid2 = ObjectId::new_for_test(2);
1388
1389        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1390            symbol_size: config.symbol_size,
1391            ..DecodingConfig::default()
1392        });
1393        pipeline
1394            .set_object_params(ObjectParams::new(oid1, 512, config.symbol_size, 1, 2))
1395            .expect("first set_object_params");
1396        let err = pipeline
1397            .set_object_params(ObjectParams::new(oid2, 512, config.symbol_size, 1, 2))
1398            .unwrap_err();
1399        assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1400    }
1401
1402    #[test]
1403    fn pipeline_set_object_params_same_id_is_ok() {
1404        let config = encoding_config();
1405        let oid = ObjectId::new_for_test(1);
1406
1407        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1408            symbol_size: config.symbol_size,
1409            ..DecodingConfig::default()
1410        });
1411        pipeline
1412            .set_object_params(ObjectParams::new(oid, 512, config.symbol_size, 1, 2))
1413            .expect("first");
1414        pipeline
1415            .set_object_params(ObjectParams::new(oid, 512, config.symbol_size, 1, 2))
1416            .expect("second with same id should succeed");
1417    }
1418
1419    #[test]
1420    fn pipeline_set_object_params_rejects_declared_block_count_drift() {
1421        let config = encoding_config();
1422        let object_id = ObjectId::new_for_test(104);
1423
1424        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1425            symbol_size: config.symbol_size,
1426            max_block_size: config.max_block_size,
1427            ..DecodingConfig::default()
1428        });
1429        let err = pipeline
1430            .set_object_params(ObjectParams::new(object_id, 1536, config.symbol_size, 1, 4))
1431            .unwrap_err();
1432        assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1433        assert!(
1434            err.to_string().contains("block count mismatch"),
1435            "unexpected error: {err}"
1436        );
1437    }
1438
1439    #[test]
1440    fn pipeline_set_object_params_rejects_total_k_metadata_for_multi_block_object() {
1441        let config = encoding_config();
1442        let object_id = ObjectId::new_for_test(105);
1443
1444        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1445            symbol_size: config.symbol_size,
1446            max_block_size: config.max_block_size,
1447            ..DecodingConfig::default()
1448        });
1449        let err = pipeline
1450            .set_object_params(ObjectParams::new(object_id, 2048, config.symbol_size, 2, 8))
1451            .unwrap_err();
1452        assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1453        assert!(
1454            err.to_string().contains("symbols_per_block mismatch"),
1455            "unexpected error: {err}"
1456        );
1457    }
1458
1459    #[test]
1460    fn pipeline_set_object_params_failure_does_not_latch_object_identity() {
1461        let config = encoding_config();
1462        let invalid_object_id = ObjectId::new_for_test(106);
1463        let valid_object_id = ObjectId::new_for_test(107);
1464
1465        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1466            symbol_size: config.symbol_size,
1467            max_block_size: config.max_block_size,
1468            ..DecodingConfig::default()
1469        });
1470        let err = pipeline
1471            .set_object_params(ObjectParams::new(
1472                invalid_object_id,
1473                2048,
1474                config.symbol_size,
1475                2,
1476                8,
1477            ))
1478            .unwrap_err();
1479        assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1480
1481        pipeline
1482            .set_object_params(ObjectParams::new(
1483                valid_object_id,
1484                512,
1485                config.symbol_size,
1486                1,
1487                2,
1488            ))
1489            .expect("failed set_object_params must not poison object identity");
1490    }
1491
1492    #[test]
1493    fn pipeline_set_object_params_accepts_empty_object_single_block_sentinel_metadata() {
1494        let config = encoding_config();
1495        let object_id = ObjectId::new_for_test(108);
1496
1497        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1498            symbol_size: config.symbol_size,
1499            max_block_size: config.max_block_size,
1500            ..DecodingConfig::default()
1501        });
1502        pipeline
1503            .set_object_params(ObjectParams::new(
1504                object_id,
1505                0,
1506                config.symbol_size,
1507                1,
1508                config
1509                    .max_block_size
1510                    .div_ceil(usize::from(config.symbol_size))
1511                    .try_into()
1512                    .expect("sentinel block K should fit in u16"),
1513            ))
1514            .expect("empty object sentinel metadata should be accepted");
1515
1516        assert!(pipeline.is_complete());
1517        assert_eq!(pipeline.progress().blocks_total, Some(0));
1518        assert_eq!(
1519            pipeline.into_data().expect("empty object should decode"),
1520            Vec::<u8>::new()
1521        );
1522    }
1523
1524    #[test]
1525    fn pipeline_set_object_params_accepts_full_256_block_boundary() {
1526        let config = crate::config::EncodingConfig {
1527            symbol_size: 8,
1528            max_block_size: 8,
1529            ..encoding_config()
1530        };
1531        let object_id = ObjectId::new_for_test(109);
1532
1533        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1534            symbol_size: config.symbol_size,
1535            max_block_size: config.max_block_size,
1536            ..DecodingConfig::default()
1537        });
1538        pipeline
1539            .set_object_params(ObjectParams::new(
1540                object_id,
1541                u64::try_from(config.max_block_size * 256).expect("boundary object size fits u64"),
1542                config.symbol_size,
1543                256,
1544                1,
1545            ))
1546            .expect("256-block metadata boundary should be representable");
1547
1548        assert_eq!(pipeline.progress().blocks_total, Some(256));
1549    }
1550
1551    // ---- Gap tests ----
1552
1553    #[test]
1554    fn feed_batch_returns_results_per_symbol() {
1555        init_test("feed_batch_returns_results_per_symbol");
1556        let config = encoding_config();
1557        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1558        let object_id = ObjectId::new_for_test(100);
1559        let data = vec![0xAAu8; 768]; // 3 source symbols at 256 bytes each
1560
1561        let symbols: Vec<AuthenticatedSymbol> = encoder
1562            .encode_with_repair(object_id, &data, 0)
1563            .map(|res| {
1564                AuthenticatedSymbol::from_parts(
1565                    res.unwrap().into_symbol(),
1566                    crate::security::tag::AuthenticationTag::zero(),
1567                )
1568            })
1569            .take(3)
1570            .collect();
1571
1572        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1573
1574        let results = decoder.feed_batch(symbols.into_iter());
1575        let len = results.len();
1576        let expected_len = 3usize;
1577        crate::assert_with_log!(len == expected_len, "batch length", expected_len, len);
1578        for (i, r) in results.iter().enumerate() {
1579            let is_ok = r.is_ok();
1580            crate::assert_with_log!(is_ok, &format!("result[{i}] is Ok"), true, is_ok);
1581        }
1582        crate::test_complete!("feed_batch_returns_results_per_symbol");
1583    }
1584
1585    #[test]
1586    fn is_complete_false_without_params() {
1587        init_test("is_complete_false_without_params");
1588        let pipeline = DecodingPipeline::new(DecodingConfig::default());
1589        let complete = pipeline.is_complete();
1590        crate::assert_with_log!(!complete, "is_complete without params", false, complete);
1591        crate::test_complete!("is_complete_false_without_params");
1592    }
1593
1594    #[test]
1595    fn is_complete_true_after_all_blocks_decoded() {
1596        init_test("is_complete_true_after_all_blocks_decoded");
1597        let config = encoding_config();
1598        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1599        let object_id = ObjectId::new_for_test(101);
1600        let data = vec![42u8; 512];
1601        let symbols: Vec<Symbol> = encoder
1602            .encode_with_repair(object_id, &data, 0)
1603            .map(|res| res.unwrap().into_symbol())
1604            .collect();
1605
1606        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1607
1608        for symbol in symbols {
1609            let auth = AuthenticatedSymbol::from_parts(
1610                symbol,
1611                crate::security::tag::AuthenticationTag::zero(),
1612            );
1613            let _ = decoder.feed(auth).unwrap();
1614        }
1615
1616        let complete = decoder.is_complete();
1617        crate::assert_with_log!(complete, "is_complete after all blocks", true, complete);
1618        crate::test_complete!("is_complete_true_after_all_blocks_decoded");
1619    }
1620
1621    #[test]
1622    fn progress_reports_blocks_total_after_params() {
1623        init_test("progress_reports_blocks_total_after_params");
1624        let config = encoding_config();
1625        let object_id = ObjectId::new_for_test(102);
1626
1627        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1628            symbol_size: config.symbol_size,
1629            max_block_size: 1024,
1630            ..DecodingConfig::default()
1631        });
1632        // data_len=512 < max_block_size=1024 => 1 block
1633        let k = (512usize).div_ceil(usize::from(config.symbol_size)) as u16;
1634        pipeline
1635            .set_object_params(ObjectParams::new(object_id, 512, config.symbol_size, 1, k))
1636            .expect("set params");
1637
1638        let progress = pipeline.progress();
1639        let blocks_total = progress.blocks_total;
1640        let expected_blocks = Some(1usize);
1641        crate::assert_with_log!(
1642            blocks_total == expected_blocks,
1643            "blocks_total",
1644            expected_blocks,
1645            blocks_total
1646        );
1647        let estimate = progress.symbols_needed_estimate;
1648        let positive = estimate > 0;
1649        crate::assert_with_log!(positive, "symbols_needed_estimate > 0", true, positive);
1650        crate::test_complete!("progress_reports_blocks_total_after_params");
1651    }
1652
1653    #[test]
1654    fn progress_symbols_needed_estimate_does_not_double_count_min_overhead() {
1655        init_test("progress_symbols_needed_estimate_does_not_double_count_min_overhead");
1656        let object_id = ObjectId::new_for_test(1020);
1657        let symbol_size = 256u16;
1658        let k = 10u16;
1659        let data_len = usize::from(symbol_size) * usize::from(k);
1660
1661        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1662            symbol_size,
1663            max_block_size: 4096,
1664            repair_overhead: 1.05,
1665            min_overhead: 3,
1666            max_buffered_symbols: 0,
1667            block_timeout: Duration::from_secs(30),
1668            verify_auth: false,
1669        });
1670        pipeline
1671            .set_object_params(ObjectParams::new(
1672                object_id,
1673                data_len as u64,
1674                symbol_size,
1675                1,
1676                k,
1677            ))
1678            .expect("set params");
1679
1680        let progress = pipeline.progress();
1681        assert_eq!(progress.blocks_total, Some(1));
1682        assert_eq!(progress.symbols_needed_estimate, 13);
1683        crate::test_complete!(
1684            "progress_symbols_needed_estimate_does_not_double_count_min_overhead"
1685        );
1686    }
1687
1688    #[test]
1689    fn progress_symbols_needed_estimate_saturates_for_infinite_overhead() {
1690        init_test("progress_symbols_needed_estimate_saturates_for_infinite_overhead");
1691        let object_id = ObjectId::new_for_test(1021);
1692        let symbol_size = 256u16;
1693        let data_len = 2048usize;
1694
1695        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1696            symbol_size,
1697            max_block_size: 1024,
1698            repair_overhead: f64::INFINITY,
1699            min_overhead: 0,
1700            max_buffered_symbols: 0,
1701            block_timeout: Duration::from_secs(30),
1702            verify_auth: false,
1703        });
1704        pipeline
1705            .set_object_params(ObjectParams::new(
1706                object_id,
1707                data_len as u64,
1708                symbol_size,
1709                2,
1710                4,
1711            ))
1712            .expect("set params");
1713
1714        let progress = pipeline.progress();
1715        assert_eq!(progress.blocks_total, Some(2));
1716        assert_eq!(progress.symbols_needed_estimate, usize::MAX);
1717        crate::test_complete!("progress_symbols_needed_estimate_saturates_for_infinite_overhead");
1718    }
1719
1720    #[test]
1721    fn block_status_none_for_unknown_block() {
1722        init_test("block_status_none_for_unknown_block");
1723        let config = encoding_config();
1724        let object_id = ObjectId::new_for_test(103);
1725
1726        let mut pipeline = DecodingPipeline::new(DecodingConfig {
1727            symbol_size: config.symbol_size,
1728            max_block_size: config.max_block_size,
1729            ..DecodingConfig::default()
1730        });
1731        let k = (512usize).div_ceil(usize::from(config.symbol_size)) as u16;
1732        pipeline
1733            .set_object_params(ObjectParams::new(object_id, 512, config.symbol_size, 1, k))
1734            .expect("set params");
1735
1736        let status = pipeline.block_status(99);
1737        let is_none = status.is_none();
1738        crate::assert_with_log!(is_none, "block_status(99) is None", true, is_none);
1739        crate::test_complete!("block_status_none_for_unknown_block");
1740    }
1741
1742    #[test]
1743    fn block_status_collecting_after_partial_feed() {
1744        init_test("block_status_collecting_after_partial_feed");
1745        let config = encoding_config();
1746        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1747        let object_id = ObjectId::new_for_test(104);
1748        let data = vec![0xBBu8; 512];
1749
1750        let first_symbol = encoder
1751            .encode_with_repair(object_id, &data, 0)
1752            .next()
1753            .expect("symbol")
1754            .expect("encode")
1755            .into_symbol();
1756
1757        // Use high overhead so 1 symbol doesn't trigger decode
1758        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1759
1760        let auth = AuthenticatedSymbol::from_parts(
1761            first_symbol,
1762            crate::security::tag::AuthenticationTag::zero(),
1763        );
1764        let _ = decoder.feed(auth).expect("feed");
1765
1766        let status = decoder.block_status(0);
1767        let is_some = status.is_some();
1768        crate::assert_with_log!(is_some, "block_status(0) is Some", true, is_some);
1769
1770        let status = status.unwrap();
1771        let state = status.state;
1772        let expected_state = BlockStateKind::Collecting;
1773        crate::assert_with_log!(
1774            state == expected_state,
1775            "state is Collecting",
1776            expected_state,
1777            state
1778        );
1779        let received = status.symbols_received;
1780        let expected_received = 1usize;
1781        crate::assert_with_log!(
1782            received == expected_received,
1783            "symbols_received",
1784            expected_received,
1785            received
1786        );
1787        crate::test_complete!("block_status_collecting_after_partial_feed");
1788    }
1789
1790    #[test]
1791    fn block_status_decoded_after_complete() {
1792        init_test("block_status_decoded_after_complete");
1793        let config = encoding_config();
1794        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1795        let object_id = ObjectId::new_for_test(105);
1796        let data = vec![42u8; 512];
1797        let symbols: Vec<Symbol> = encoder
1798            .encode_with_repair(object_id, &data, 0)
1799            .map(|res| res.unwrap().into_symbol())
1800            .collect();
1801
1802        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1803
1804        for symbol in symbols {
1805            let auth = AuthenticatedSymbol::from_parts(
1806                symbol,
1807                crate::security::tag::AuthenticationTag::zero(),
1808            );
1809            let _ = decoder.feed(auth).unwrap();
1810        }
1811
1812        // Block 0 should now be decoded; symbols are cleared but block state persists.
1813        // After decode, symbols are cleared so block_progress returns None.
1814        // The completed_blocks set tracks completion separately.
1815        let _status = decoder.block_status(0);
1816        let complete = decoder.is_complete();
1817        crate::assert_with_log!(complete, "is_complete", true, complete);
1818
1819        // Verify via completed_blocks indirectly: feeding another sbn=0 symbol
1820        // should give BlockAlreadyDecoded
1821        let extra = Symbol::new(
1822            SymbolId::new(object_id, 0, 99),
1823            vec![0u8; usize::from(config.symbol_size)],
1824            SymbolKind::Source,
1825        );
1826        let auth =
1827            AuthenticatedSymbol::from_parts(extra, crate::security::tag::AuthenticationTag::zero());
1828        let result = decoder.feed(auth).expect("feed");
1829        let expected = SymbolAcceptResult::Rejected(RejectReason::BlockAlreadyDecoded);
1830        let ok = result == expected;
1831        crate::assert_with_log!(ok, "block already decoded", expected, result);
1832        crate::test_complete!("block_status_decoded_after_complete");
1833    }
1834
1835    #[test]
1836    fn block_already_decoded_reject() {
1837        init_test("block_already_decoded_reject");
1838        let config = encoding_config();
1839        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1840        let object_id = ObjectId::new_for_test(106);
1841        let data = vec![42u8; 512];
1842        let symbols: Vec<Symbol> = encoder
1843            .encode_with_repair(object_id, &data, 0)
1844            .map(|res| res.unwrap().into_symbol())
1845            .collect();
1846
1847        let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1848
1849        for symbol in symbols {
1850            let auth = AuthenticatedSymbol::from_parts(
1851                symbol,
1852                crate::security::tag::AuthenticationTag::zero(),
1853            );
1854            let _ = decoder.feed(auth).unwrap();
1855        }
1856
1857        // Feed one more symbol for sbn=0
1858        let extra = Symbol::new(
1859            SymbolId::new(object_id, 0, 0),
1860            vec![0u8; usize::from(config.symbol_size)],
1861            SymbolKind::Source,
1862        );
1863        let auth =
1864            AuthenticatedSymbol::from_parts(extra, crate::security::tag::AuthenticationTag::zero());
1865        let result = decoder.feed(auth).expect("feed");
1866        let expected = SymbolAcceptResult::Rejected(RejectReason::BlockAlreadyDecoded);
1867        let ok = result == expected;
1868        crate::assert_with_log!(ok, "block already decoded reject", expected, result);
1869        crate::test_complete!("block_already_decoded_reject");
1870    }
1871
1872    #[test]
1873    fn verify_auth_no_context_unverified_symbol_errors() {
1874        init_test("verify_auth_no_context_unverified_symbol_errors");
1875        let config = encoding_config();
1876        let mut decoder = DecodingPipeline::new(DecodingConfig {
1877            symbol_size: config.symbol_size,
1878            max_block_size: config.max_block_size,
1879            verify_auth: true,
1880            ..DecodingConfig::default()
1881        });
1882
1883        let symbol = Symbol::new(
1884            SymbolId::new(ObjectId::new_for_test(107), 0, 0),
1885            vec![0u8; usize::from(config.symbol_size)],
1886            SymbolKind::Source,
1887        );
1888        // from_parts creates an unverified symbol
1889        let auth = AuthenticatedSymbol::from_parts(
1890            symbol,
1891            crate::security::tag::AuthenticationTag::zero(),
1892        );
1893
1894        let result = decoder.feed(auth);
1895        let is_ok = result.is_ok();
1896        crate::assert_with_log!(
1897            is_ok,
1898            "unverified with no context is rejected safely",
1899            true,
1900            is_ok
1901        );
1902
1903        let accept = result.unwrap();
1904        let expected = SymbolAcceptResult::Rejected(RejectReason::AuthenticationFailed);
1905        crate::assert_with_log!(
1906            accept == expected,
1907            "rejected as auth failed",
1908            expected,
1909            accept
1910        );
1911        crate::test_complete!("verify_auth_no_context_unverified_symbol_errors");
1912    }
1913
1914    #[test]
1915    fn verify_auth_no_context_preverified_symbol_ok() {
1916        init_test("verify_auth_no_context_preverified_symbol_ok");
1917        let config = encoding_config();
1918        let mut decoder = DecodingPipeline::new(DecodingConfig {
1919            symbol_size: config.symbol_size,
1920            max_block_size: config.max_block_size,
1921            verify_auth: true,
1922            ..DecodingConfig::default()
1923        });
1924
1925        let symbol = Symbol::new(
1926            SymbolId::new(ObjectId::new_for_test(108), 0, 0),
1927            vec![0u8; usize::from(config.symbol_size)],
1928            SymbolKind::Source,
1929        );
1930        // new_verified creates a pre-verified symbol
1931        let auth = AuthenticatedSymbol::new_verified(
1932            symbol,
1933            crate::security::tag::AuthenticationTag::zero(),
1934        );
1935
1936        let result = decoder.feed(auth);
1937        let is_ok = result.is_ok();
1938        crate::assert_with_log!(is_ok, "preverified symbol accepted", true, is_ok);
1939        let accept = result.unwrap();
1940        let is_accepted = matches!(accept, SymbolAcceptResult::Accepted { .. });
1941        crate::assert_with_log!(is_accepted, "result is Accepted variant", true, is_accepted);
1942        crate::test_complete!("verify_auth_no_context_preverified_symbol_ok");
1943    }
1944
1945    #[test]
1946    fn with_auth_rejects_bad_tag() {
1947        init_test("with_auth_rejects_bad_tag");
1948        let config = encoding_config();
1949        let mut decoder = DecodingPipeline::with_auth(
1950            DecodingConfig {
1951                symbol_size: config.symbol_size,
1952                max_block_size: config.max_block_size,
1953                verify_auth: true,
1954                ..DecodingConfig::default()
1955            },
1956            crate::security::SecurityContext::for_testing(42),
1957        );
1958
1959        let symbol = Symbol::new(
1960            SymbolId::new(ObjectId::new_for_test(109), 0, 0),
1961            vec![0u8; usize::from(config.symbol_size)],
1962            SymbolKind::Source,
1963        );
1964        // zero tag is wrong for any real key
1965        let auth = AuthenticatedSymbol::from_parts(
1966            symbol,
1967            crate::security::tag::AuthenticationTag::zero(),
1968        );
1969
1970        let result = decoder.feed(auth).expect("feed should not return Err");
1971        let expected = SymbolAcceptResult::Rejected(RejectReason::AuthenticationFailed);
1972        let ok = result == expected;
1973        crate::assert_with_log!(ok, "bad tag rejected", expected, result);
1974        crate::test_complete!("with_auth_rejects_bad_tag");
1975    }
1976
1977    #[test]
1978    fn multi_block_roundtrip() {
1979        init_test("multi_block_roundtrip");
1980        let config = crate::config::EncodingConfig {
1981            symbol_size: 256,
1982            max_block_size: 1024,
1983            repair_overhead: 1.05,
1984            encoding_parallelism: 1,
1985            decoding_parallelism: 1,
1986        };
1987        let mut encoder = EncodingPipeline::new(config.clone(), pool());
1988        let object_id = ObjectId::new_for_test(110);
1989        let data: Vec<u8> = (0u32..2048).map(|i| (i % 251) as u8).collect();
1990
1991        let symbols: Vec<Symbol> = encoder
1992            .encode_with_repair(object_id, &data, 0)
1993            .map(|res| res.unwrap().into_symbol())
1994            .collect();
1995
1996        let mut decoder = DecodingPipeline::new(DecodingConfig {
1997            symbol_size: config.symbol_size,
1998            max_block_size: config.max_block_size,
1999            repair_overhead: 1.0,
2000            min_overhead: 0,
2001            max_buffered_symbols: 0,
2002            block_timeout: Duration::from_secs(30),
2003            verify_auth: false,
2004        });
2005
2006        // Compute block plan matching what the encoder does
2007        let symbol_size = usize::from(config.symbol_size);
2008        let num_blocks = data.len().div_ceil(config.max_block_size);
2009        let mut full_block_k: u16 = 0;
2010        for b in 0..num_blocks {
2011            let block_start = b * config.max_block_size;
2012            let block_len = usize::min(config.max_block_size, data.len() - block_start);
2013            let k = block_len.div_ceil(symbol_size) as u16;
2014            full_block_k = full_block_k.max(k);
2015        }
2016        decoder
2017            .set_object_params(ObjectParams::new(
2018                object_id,
2019                data.len() as u64,
2020                config.symbol_size,
2021                num_blocks as u16,
2022                full_block_k,
2023            ))
2024            .expect("set params");
2025
2026        for symbol in symbols {
2027            let auth = AuthenticatedSymbol::from_parts(
2028                symbol,
2029                crate::security::tag::AuthenticationTag::zero(),
2030            );
2031            let _ = decoder.feed(auth).unwrap();
2032        }
2033
2034        let complete = decoder.is_complete();
2035        crate::assert_with_log!(complete, "multi-block is_complete", true, complete);
2036
2037        let decoded_data = decoder.into_data().expect("decoded");
2038        let ok = decoded_data == data;
2039        crate::assert_with_log!(
2040            ok,
2041            "multi-block roundtrip data",
2042            data.len(),
2043            decoded_data.len()
2044        );
2045        crate::test_complete!("multi_block_roundtrip");
2046    }
2047
2048    #[test]
2049    fn multi_block_progress_retains_cumulative_symbols_after_block_completion() {
2050        init_test("multi_block_progress_retains_cumulative_symbols_after_block_completion");
2051        let config = crate::config::EncodingConfig {
2052            symbol_size: 256,
2053            max_block_size: 1024,
2054            repair_overhead: 1.05,
2055            encoding_parallelism: 1,
2056            decoding_parallelism: 1,
2057        };
2058        let mut encoder = EncodingPipeline::new(config.clone(), pool());
2059        let object_id = ObjectId::new_for_test(111);
2060        let data: Vec<u8> = (0u32..2048).map(|i| (i % 251) as u8).collect();
2061
2062        let mut block_zero_symbols: Vec<Symbol> = encoder
2063            .encode_with_repair(object_id, &data, 0)
2064            .map(|res| res.expect("encode").into_symbol())
2065            .filter(|symbol| symbol.sbn() == 0)
2066            .collect();
2067        block_zero_symbols.sort_by_key(Symbol::esi);
2068        assert_eq!(block_zero_symbols.len(), 4);
2069
2070        let mut decoder = DecodingPipeline::new(DecodingConfig {
2071            symbol_size: config.symbol_size,
2072            max_block_size: config.max_block_size,
2073            repair_overhead: 1.0,
2074            min_overhead: 0,
2075            max_buffered_symbols: 0,
2076            block_timeout: Duration::from_secs(30),
2077            verify_auth: false,
2078        });
2079        decoder
2080            .set_object_params(ObjectParams::new(
2081                object_id,
2082                data.len() as u64,
2083                config.symbol_size,
2084                2,
2085                4,
2086            ))
2087            .expect("set params");
2088
2089        for symbol in block_zero_symbols {
2090            let auth = AuthenticatedSymbol::from_parts(
2091                symbol,
2092                crate::security::tag::AuthenticationTag::zero(),
2093            );
2094            let _ = decoder.feed(auth).expect("feed");
2095        }
2096
2097        assert_eq!(decoder.progress().blocks_complete, 1);
2098        assert_eq!(decoder.progress().blocks_total, Some(2));
2099        assert_eq!(decoder.progress().symbols_received, 4);
2100        assert_eq!(decoder.progress().symbols_needed_estimate, 8);
2101
2102        let err = decoder.into_data().expect_err("block one is still missing");
2103        assert!(matches!(
2104            err,
2105            DecodingError::InsufficientSymbols {
2106                received: 4,
2107                needed: 8
2108            }
2109        ));
2110        crate::test_complete!(
2111            "multi_block_progress_retains_cumulative_symbols_after_block_completion"
2112        );
2113    }
2114
2115    #[test]
2116    fn into_data_no_params_errors() {
2117        init_test("into_data_no_params_errors");
2118        let pipeline = DecodingPipeline::new(DecodingConfig::default());
2119        let result = pipeline.into_data();
2120        let is_err = result.is_err();
2121        crate::assert_with_log!(is_err, "into_data without params errors", true, is_err);
2122        let err = result.unwrap_err();
2123        let msg = err.to_string();
2124        let contains = msg.contains("object parameters not set");
2125        crate::assert_with_log!(
2126            contains,
2127            "error message contains expected text",
2128            true,
2129            contains
2130        );
2131        crate::test_complete!("into_data_no_params_errors");
2132    }
2133
2134    // --- wave 76 trait coverage ---
2135
2136    #[test]
2137    fn reject_reason_debug_clone_copy_eq() {
2138        let r = RejectReason::WrongObjectId;
2139        let r2 = r; // Copy
2140        let r3 = r;
2141        assert_eq!(r, r2);
2142        assert_eq!(r, r3);
2143        assert_ne!(r, RejectReason::AuthenticationFailed);
2144        assert_ne!(r, RejectReason::SymbolSizeMismatch);
2145        assert_ne!(r, RejectReason::BlockAlreadyDecoded);
2146        assert_ne!(r, RejectReason::InsufficientRank);
2147        assert_ne!(r, RejectReason::InconsistentEquations);
2148        assert_ne!(r, RejectReason::InvalidMetadata);
2149        assert_ne!(r, RejectReason::MemoryLimitReached);
2150        let dbg = format!("{r:?}");
2151        assert!(dbg.contains("WrongObjectId"));
2152    }
2153
2154    #[test]
2155    fn symbol_accept_result_debug_clone_eq() {
2156        let a = SymbolAcceptResult::Accepted {
2157            received: 3,
2158            needed: 5,
2159        };
2160        let a2 = a.clone();
2161        assert_eq!(a, a2);
2162        assert_ne!(a, SymbolAcceptResult::Duplicate);
2163        let r = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
2164        let r2 = r.clone();
2165        assert_eq!(r, r2);
2166        let dbg = format!("{a:?}");
2167        assert!(dbg.contains("Accepted"));
2168    }
2169
2170    #[test]
2171    fn block_state_kind_debug_clone_copy_eq() {
2172        let s = BlockStateKind::Collecting;
2173        let s2 = s; // Copy
2174        let s3 = s;
2175        assert_eq!(s, s2);
2176        assert_eq!(s, s3);
2177        assert_ne!(s, BlockStateKind::Decoding);
2178        assert_ne!(s, BlockStateKind::Decoded);
2179        assert_ne!(s, BlockStateKind::Failed);
2180        let dbg = format!("{s:?}");
2181        assert!(dbg.contains("Collecting"));
2182    }
2183}