1use crate::error::{Error, ErrorKind};
10use crate::raptorq::decoder::{
11 DecodeError as RaptorDecodeError, InactivationDecoder, ReceivedSymbol,
12};
13use crate::raptorq::gf256::Gf256;
14use crate::security::{AuthenticatedSymbol, SecurityContext};
15use crate::types::symbol_set::{InsertResult, SymbolSet, ThresholdConfig};
16use crate::types::{ObjectId, ObjectParams, Symbol, SymbolId, SymbolKind};
17use std::collections::{HashMap, HashSet};
18use std::time::Duration;
19
20#[derive(Debug, thiserror::Error)]
22pub enum DecodingError {
23 #[error("authentication failed for symbol {symbol_id}")]
25 AuthenticationFailed {
26 symbol_id: SymbolId,
28 },
29 #[error("insufficient symbols: have {received}, need {needed}")]
31 InsufficientSymbols {
32 received: usize,
34 needed: usize,
36 },
37 #[error("matrix inversion failed: {reason}")]
39 MatrixInversionFailed {
40 reason: String,
42 },
43 #[error("block timeout after {elapsed:?}")]
45 BlockTimeout {
46 sbn: u8,
48 elapsed: Duration,
50 },
51 #[error("inconsistent block metadata: {sbn} {details}")]
53 InconsistentMetadata {
54 sbn: u8,
56 details: String,
58 },
59 #[error("symbol size mismatch: expected {expected}, got {actual}")]
61 SymbolSizeMismatch {
62 expected: u16,
64 actual: usize,
66 },
67}
68
69impl From<DecodingError> for Error {
70 fn from(err: DecodingError) -> Self {
71 match &err {
72 DecodingError::AuthenticationFailed { .. } => Self::new(ErrorKind::CorruptedSymbol),
73 DecodingError::InsufficientSymbols { .. } => Self::new(ErrorKind::InsufficientSymbols),
74 DecodingError::MatrixInversionFailed { .. }
75 | DecodingError::InconsistentMetadata { .. }
76 | DecodingError::SymbolSizeMismatch { .. } => Self::new(ErrorKind::DecodingFailed),
77 DecodingError::BlockTimeout { .. } => Self::new(ErrorKind::ThresholdTimeout),
78 }
79 .with_message(err.to_string())
80 }
81}
82
83#[derive(Debug, Clone, Copy, PartialEq, Eq)]
85pub enum RejectReason {
86 WrongObjectId,
88 AuthenticationFailed,
90 SymbolSizeMismatch,
92 BlockAlreadyDecoded,
94 InsufficientRank,
96 InconsistentEquations,
98 InvalidMetadata,
100 MemoryLimitReached,
102}
103
104#[derive(Debug, Clone, PartialEq, Eq)]
106pub enum SymbolAcceptResult {
107 Accepted {
109 received: usize,
111 needed: usize,
113 },
114 DecodingStarted {
116 block_sbn: u8,
118 },
119 BlockComplete {
121 block_sbn: u8,
123 data: Vec<u8>,
125 },
126 Duplicate,
128 Rejected(RejectReason),
130}
131
132#[derive(Debug, Clone)]
134pub struct DecodingConfig {
135 pub symbol_size: u16,
137 pub max_block_size: usize,
139 pub repair_overhead: f64,
141 pub min_overhead: usize,
143 pub max_buffered_symbols: usize,
145 pub block_timeout: Duration,
147 pub verify_auth: bool,
149}
150
151impl Default for DecodingConfig {
152 fn default() -> Self {
153 Self {
154 symbol_size: 256,
155 max_block_size: 1024 * 1024,
156 repair_overhead: 1.05,
157 min_overhead: 0,
158 max_buffered_symbols: 0,
159 block_timeout: Duration::from_secs(30),
160 verify_auth: false,
161 }
162 }
163}
164
165#[derive(Debug, Clone, Copy)]
167pub struct DecodingProgress {
168 pub blocks_complete: usize,
170 pub blocks_total: Option<usize>,
172 pub symbols_received: usize,
174 pub symbols_needed_estimate: usize,
176}
177
178#[derive(Debug, Clone, Copy)]
180pub struct BlockStatus {
181 pub sbn: u8,
183 pub symbols_received: usize,
185 pub symbols_needed: usize,
187 pub state: BlockStateKind,
189}
190
191#[derive(Debug, Clone, Copy, PartialEq, Eq)]
193pub enum BlockStateKind {
194 Collecting,
196 Decoding,
198 Decoded,
200 Failed,
202}
203
204#[derive(Debug)]
205struct BlockDecoder {
206 state: BlockDecodingState,
207 decoded: Option<Vec<u8>>,
208}
209
210#[derive(Debug)]
211enum BlockDecodingState {
212 Collecting,
213 Decoding,
214 Decoded,
215 Failed,
216}
217
218#[derive(Debug)]
220pub struct DecodingPipeline {
221 config: DecodingConfig,
222 symbols: SymbolSet,
223 accepted_symbols_total: usize,
224 blocks: HashMap<u8, BlockDecoder>,
225 completed_blocks: HashSet<u8>,
226 object_id: Option<ObjectId>,
227 object_size: Option<u64>,
228 block_plans: Option<Vec<BlockPlan>>,
229 auth_context: Option<SecurityContext>,
230}
231
232impl DecodingPipeline {
233 #[must_use]
235 pub fn new(config: DecodingConfig) -> Self {
236 let threshold = ThresholdConfig::new(
237 config.repair_overhead,
238 config.min_overhead,
239 config.max_buffered_symbols,
240 );
241 Self {
242 config,
243 symbols: SymbolSet::with_config(threshold),
244 accepted_symbols_total: 0,
245 blocks: HashMap::new(),
246 completed_blocks: HashSet::new(),
247 object_id: None,
248 object_size: None,
249 block_plans: None,
250 auth_context: None,
251 }
252 }
253
254 #[must_use]
256 pub fn with_auth(config: DecodingConfig, ctx: SecurityContext) -> Self {
257 let mut pipeline = Self::new(config);
258 pipeline.auth_context = Some(ctx);
259 pipeline
260 }
261
262 pub fn set_object_params(&mut self, params: ObjectParams) -> Result<(), DecodingError> {
264 if params.symbol_size != self.config.symbol_size {
265 return Err(DecodingError::SymbolSizeMismatch {
266 expected: self.config.symbol_size,
267 actual: params.symbol_size as usize,
268 });
269 }
270 if let Some(existing) = self.object_id {
271 if existing != params.object_id {
272 return Err(DecodingError::InconsistentMetadata {
273 sbn: 0,
274 details: format!(
275 "object id mismatch: expected {existing:?}, got {:?}",
276 params.object_id
277 ),
278 });
279 }
280 }
281 let plans = plan_blocks(
282 params.object_size as usize,
283 usize::from(params.symbol_size),
284 self.config.max_block_size,
285 )?;
286 validate_object_params_layout(params, &plans)?;
287 self.object_id = Some(params.object_id);
288 self.object_size = Some(params.object_size);
289 self.block_plans = Some(plans);
290 self.configure_block_k();
291 Ok(())
292 }
293
294 pub fn feed(
296 &mut self,
297 mut auth_symbol: AuthenticatedSymbol,
298 ) -> Result<SymbolAcceptResult, DecodingError> {
299 if self.config.verify_auth {
300 match &self.auth_context {
301 Some(ctx) => {
302 if !auth_symbol.is_verified()
303 && ctx.verify_authenticated_symbol(&mut auth_symbol).is_err()
304 {
305 return Ok(SymbolAcceptResult::Rejected(
306 RejectReason::AuthenticationFailed,
307 ));
308 }
309 }
310 None => {
311 if !auth_symbol.is_verified() {
315 return Ok(SymbolAcceptResult::Rejected(
316 RejectReason::AuthenticationFailed,
317 ));
318 }
319 }
320 }
321 }
322
323 let symbol = auth_symbol.into_symbol();
324
325 if symbol.len() != usize::from(self.config.symbol_size) {
326 return Ok(SymbolAcceptResult::Rejected(
327 RejectReason::SymbolSizeMismatch,
328 ));
329 }
330
331 if let Some(object_id) = self.object_id {
332 if object_id != symbol.object_id() {
333 return Ok(SymbolAcceptResult::Rejected(RejectReason::WrongObjectId));
334 }
335 } else {
336 self.object_id = Some(symbol.object_id());
337 }
338
339 let sbn = symbol.sbn();
340 if self.completed_blocks.contains(&sbn) {
341 return Ok(SymbolAcceptResult::Rejected(
342 RejectReason::BlockAlreadyDecoded,
343 ));
344 }
345
346 self.blocks.entry(sbn).or_insert_with(|| BlockDecoder {
348 state: BlockDecodingState::Collecting,
349 decoded: None,
350 });
351
352 let insert_result = self.symbols.insert(symbol);
353 match insert_result {
354 InsertResult::Duplicate => Ok(SymbolAcceptResult::Duplicate),
355 InsertResult::MemoryLimitReached | InsertResult::BlockLimitReached { .. } => Ok(
356 SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached),
357 ),
358 InsertResult::Inserted {
359 block_progress,
360 threshold_reached,
361 } => {
362 self.accepted_symbols_total = self.accepted_symbols_total.saturating_add(1);
363 if block_progress.k.is_none() {
364 self.configure_block_k();
365 }
366 let needed = block_progress.k.map_or(0, |k| {
367 required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
368 });
369 let received = block_progress.total();
370
371 if threshold_reached {
372 if let Some(block) = self.blocks.get_mut(&sbn) {
374 block.state = BlockDecodingState::Decoding;
375 }
376 if let Some(result) = self.try_decode_block(sbn) {
377 return Ok(result);
378 }
379 }
380
381 if let Some(block) = self.blocks.get_mut(&sbn) {
383 if !matches!(
384 block.state,
385 BlockDecodingState::Decoded | BlockDecodingState::Failed
386 ) {
387 block.state = BlockDecodingState::Collecting;
388 }
389 }
390 Ok(SymbolAcceptResult::Accepted { received, needed })
391 }
392 }
393 }
394
395 pub fn feed_batch(
397 &mut self,
398 symbols: impl Iterator<Item = AuthenticatedSymbol>,
399 ) -> Vec<Result<SymbolAcceptResult, DecodingError>> {
400 symbols.map(|symbol| self.feed(symbol)).collect()
401 }
402
403 #[must_use]
405 pub fn is_complete(&self) -> bool {
406 let Some(plans) = &self.block_plans else {
407 return false;
408 };
409 self.completed_blocks.len() == plans.len()
410 }
411
412 #[must_use]
414 pub fn progress(&self) -> DecodingProgress {
415 let blocks_total = self.block_plans.as_ref().map(Vec::len);
416 let symbols_received = self.accepted_symbols_total;
417 let symbols_needed_estimate = self.block_plans.as_ref().map_or(0, |plans| {
418 sum_required_symbols(plans, self.config.repair_overhead, self.config.min_overhead)
419 });
420
421 DecodingProgress {
422 blocks_complete: self.completed_blocks.len(),
423 blocks_total,
424 symbols_received,
425 symbols_needed_estimate,
426 }
427 }
428
429 #[must_use]
431 pub fn block_status(&self, sbn: u8) -> Option<BlockStatus> {
432 let progress = self.symbols.block_progress(sbn)?;
433 let state = self
434 .blocks
435 .get(&sbn)
436 .map_or(BlockStateKind::Collecting, |block| match block.state {
437 BlockDecodingState::Collecting => BlockStateKind::Collecting,
438 BlockDecodingState::Decoding => BlockStateKind::Decoding,
439 BlockDecodingState::Decoded => BlockStateKind::Decoded,
440 BlockDecodingState::Failed => BlockStateKind::Failed,
441 });
442
443 let symbols_needed = progress.k.map_or(0, |k| {
444 required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
445 });
446
447 Some(BlockStatus {
448 sbn,
449 symbols_received: progress.total(),
450 symbols_needed,
451 state,
452 })
453 }
454
455 pub fn into_data(self) -> Result<Vec<u8>, DecodingError> {
457 let Some(plans) = &self.block_plans else {
458 return Err(DecodingError::InconsistentMetadata {
459 sbn: 0,
460 details: "object parameters not set".to_string(),
461 });
462 };
463 if !self.is_complete() {
464 let received = self.accepted_symbols_total;
465 let needed =
466 sum_required_symbols(plans, self.config.repair_overhead, self.config.min_overhead);
467 return Err(DecodingError::InsufficientSymbols { received, needed });
468 }
469
470 let mut output = Vec::with_capacity(self.object_size.unwrap_or(0) as usize);
471 for plan in plans {
472 let block = self
473 .blocks
474 .get(&plan.sbn)
475 .and_then(|b| b.decoded.as_ref())
476 .ok_or_else(|| DecodingError::InconsistentMetadata {
477 sbn: plan.sbn,
478 details: "missing decoded block".to_string(),
479 })?;
480 output.extend_from_slice(block);
481 }
482
483 if let Some(size) = self.object_size {
484 output.truncate(size as usize);
485 }
486
487 Ok(output)
488 }
489
490 fn configure_block_k(&mut self) {
491 let Some(plans) = &self.block_plans else {
492 return;
493 };
494 for plan in plans {
495 let k = u16::try_from(plan.k).unwrap_or(u16::MAX);
496 let _ = self.symbols.set_block_k(plan.sbn, k);
497 }
498 }
499
500 fn try_decode_block(&mut self, sbn: u8) -> Option<SymbolAcceptResult> {
501 let block_plan = self.block_plan(sbn)?;
502 let k = block_plan.k;
503 if k == 0 {
504 return None;
505 }
506
507 let symbols: Vec<Symbol> = self.symbols.symbols_for_block(sbn).cloned().collect();
508 if symbols.len() < k {
509 return None;
510 }
511
512 let decoded_symbols = match decode_block(
513 block_plan,
514 &symbols,
515 usize::from(self.config.symbol_size),
516 ) {
517 Ok(symbols) => symbols,
518 Err(DecodingError::InsufficientSymbols { .. }) => {
519 return Some(SymbolAcceptResult::Rejected(RejectReason::InsufficientRank));
520 }
521 Err(DecodingError::MatrixInversionFailed { .. }) => {
522 return Some(SymbolAcceptResult::Rejected(
523 RejectReason::InconsistentEquations,
524 ));
525 }
526 Err(DecodingError::InconsistentMetadata { .. }) => {
527 let block = self.blocks.get_mut(&sbn);
528 if let Some(block) = block {
529 block.state = BlockDecodingState::Failed;
530 }
531 return Some(SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata));
532 }
533 Err(DecodingError::SymbolSizeMismatch { .. }) => {
534 let block = self.blocks.get_mut(&sbn);
535 if let Some(block) = block {
536 block.state = BlockDecodingState::Failed;
537 }
538 return Some(SymbolAcceptResult::Rejected(
539 RejectReason::SymbolSizeMismatch,
540 ));
541 }
542 Err(err) => {
543 let block = self.blocks.get_mut(&sbn);
544 if let Some(block) = block {
545 block.state = BlockDecodingState::Failed;
546 }
547 #[cfg(feature = "tracing-integration")]
548 tracing::error!(sbn = sbn, error = %err, "unexpected error during block decode");
549 #[cfg(not(feature = "tracing-integration"))]
550 let _ = &err;
551 return Some(SymbolAcceptResult::Rejected(
552 RejectReason::InconsistentEquations,
553 ));
554 }
555 };
556
557 let mut block_data = Vec::with_capacity(block_plan.len);
558 for symbol in &decoded_symbols {
559 block_data.extend_from_slice(symbol.data());
560 }
561 block_data.truncate(block_plan.len);
562
563 if let Some(block) = self.blocks.get_mut(&sbn) {
564 block.state = BlockDecodingState::Decoded;
565 block.decoded = Some(block_data.clone());
566 }
567
568 self.completed_blocks.insert(sbn);
569 self.symbols.clear_block(sbn);
570
571 Some(SymbolAcceptResult::BlockComplete {
572 block_sbn: sbn,
573 data: block_data,
574 })
575 }
576
577 fn block_plan(&self, sbn: u8) -> Option<&BlockPlan> {
578 self.block_plans
579 .as_ref()
580 .and_then(|plans| plans.iter().find(|plan| plan.sbn == sbn))
581 }
582}
583
584#[derive(Debug, Clone)]
585struct BlockPlan {
586 sbn: u8,
587 len: usize,
588 k: usize,
589}
590
591fn plan_blocks(
592 object_size: usize,
593 symbol_size: usize,
594 max_block_size: usize,
595) -> Result<Vec<BlockPlan>, DecodingError> {
596 if object_size == 0 {
597 return Ok(Vec::new());
598 }
599
600 if symbol_size == 0 {
601 return Err(DecodingError::InconsistentMetadata {
602 sbn: 0,
603 details: "symbol_size must be > 0".to_string(),
604 });
605 }
606
607 let max_blocks = u8::MAX as usize + 1;
608 let max_total = max_block_size.saturating_mul(max_blocks);
609 if object_size > max_total {
610 return Err(DecodingError::InconsistentMetadata {
611 sbn: 0,
612 details: format!("object size {object_size} exceeds limit {max_total}"),
613 });
614 }
615
616 let mut blocks = Vec::new();
617 let mut offset = 0;
618 let mut sbn: u8 = 0;
619
620 while offset < object_size {
621 let len = usize::min(max_block_size, object_size - offset);
622 let k = len.div_ceil(symbol_size);
623 blocks.push(BlockPlan { sbn, len, k });
624 offset += len;
625 sbn = sbn.wrapping_add(1);
626 }
627
628 Ok(blocks)
629}
630
631fn validate_object_params_layout(
632 params: ObjectParams,
633 plans: &[BlockPlan],
634) -> Result<(), DecodingError> {
635 let declared_blocks = usize::from(params.source_blocks);
636 let declared_k = usize::from(params.symbols_per_block);
637
638 if plans.is_empty() {
639 if declared_blocks == 0 && declared_k == 0 {
640 return Ok(());
641 }
642 if declared_blocks == 1 {
643 return Ok(());
644 }
645 return Err(DecodingError::InconsistentMetadata {
646 sbn: 0,
647 details: format!(
648 "object params layout mismatch: empty object expects either 0 blocks / 0 symbols-per-block or a single empty sentinel block, got {declared_blocks} block(s) with {declared_k} symbols/block"
649 ),
650 });
651 }
652
653 let expected_blocks = plans.len();
654 if declared_blocks != expected_blocks {
655 return Err(DecodingError::InconsistentMetadata {
656 sbn: 0,
657 details: format!(
658 "object params block count mismatch: expected {expected_blocks}, got {declared_blocks}"
659 ),
660 });
661 }
662
663 let expected_k = plans.iter().map(|plan| plan.k).max().unwrap_or(0);
664 if declared_k != expected_k {
665 return Err(DecodingError::InconsistentMetadata {
666 sbn: 0,
667 details: format!(
668 "object params symbols_per_block mismatch: expected {expected_k}, got {declared_k}"
669 ),
670 });
671 }
672
673 Ok(())
674}
675
676fn required_symbols(k: u16, overhead: f64, min_overhead: usize) -> usize {
677 if k == 0 {
678 return 0;
679 }
680 let raw = (f64::from(k) * overhead).ceil();
681 let minimum_threshold = usize::from(k).saturating_add(min_overhead);
682 if raw.is_nan() {
683 return minimum_threshold;
684 }
685 if raw.is_sign_positive() && !raw.is_finite() {
686 return usize::MAX;
687 }
688 if raw.is_sign_negative() {
689 return minimum_threshold;
690 }
691 #[allow(clippy::cast_sign_loss)]
692 let factor_threshold = raw as usize;
693 factor_threshold.max(minimum_threshold)
696}
697
698fn sum_required_symbols(plans: &[BlockPlan], overhead: f64, min_overhead: usize) -> usize {
699 plans.iter().fold(0usize, |acc, plan| {
700 acc.saturating_add(required_symbols(
701 u16::try_from(plan.k).unwrap_or(u16::MAX),
702 overhead,
703 min_overhead,
704 ))
705 })
706}
707
708#[allow(clippy::too_many_lines)]
709fn decode_block(
710 plan: &BlockPlan,
711 symbols: &[Symbol],
712 symbol_size: usize,
713) -> Result<Vec<Symbol>, DecodingError> {
714 let k = plan.k;
715 if symbols.len() < k {
716 return Err(DecodingError::InsufficientSymbols {
717 received: symbols.len(),
718 needed: k,
719 });
720 }
721
722 let object_id = symbols.first().map_or(ObjectId::NIL, Symbol::object_id);
723 let block_seed = seed_for_block(object_id, plan.sbn);
724 let decoder = InactivationDecoder::new(k, symbol_size, block_seed);
725
726 let mut received = decoder.constraint_symbols();
728 received.reserve(symbols.len());
729
730 for symbol in symbols {
732 match symbol.kind() {
733 SymbolKind::Source => {
734 let esi = symbol.esi() as usize;
735 if esi >= k {
736 return Err(DecodingError::InconsistentMetadata {
737 sbn: plan.sbn,
738 details: format!("source esi {esi} >= k {k}"),
739 });
740 }
741 received.push(ReceivedSymbol {
743 esi: symbol.esi(),
744 is_source: true,
745 columns: vec![esi],
746 coefficients: vec![Gf256::ONE],
747 data: symbol.data().to_vec(),
748 });
749 }
750 SymbolKind::Repair => {
751 let (columns, coefficients) = decoder.repair_equation(symbol.esi());
752 received.push(ReceivedSymbol {
753 esi: symbol.esi(),
754 is_source: false,
755 columns,
756 coefficients,
757 data: symbol.data().to_vec(),
758 });
759 }
760 }
761 }
762
763 let result = match decoder.decode(&received) {
764 Ok(result) => result,
765 Err(err) => {
766 let mapped = match err {
767 RaptorDecodeError::InsufficientSymbols { received, required } => {
768 DecodingError::InsufficientSymbols {
769 received,
770 needed: required,
771 }
772 }
773 RaptorDecodeError::SingularMatrix { row } => DecodingError::MatrixInversionFailed {
774 reason: format!("singular matrix at row {row}"),
775 },
776 RaptorDecodeError::SymbolSizeMismatch { expected, actual } => {
777 DecodingError::SymbolSizeMismatch {
778 expected: u16::try_from(expected).unwrap_or(u16::MAX),
779 actual,
780 }
781 }
782 RaptorDecodeError::SymbolEquationArityMismatch {
783 esi,
784 columns,
785 coefficients,
786 } => DecodingError::InconsistentMetadata {
787 sbn: plan.sbn,
788 details: format!(
789 "symbol {esi} has mismatched equation vectors: columns={columns}, coefficients={coefficients}"
790 ),
791 },
792 RaptorDecodeError::ColumnIndexOutOfRange {
793 esi,
794 column,
795 max_valid,
796 } => DecodingError::InconsistentMetadata {
797 sbn: plan.sbn,
798 details: format!(
799 "symbol {esi} references out-of-range column {column} (valid < {max_valid})"
800 ),
801 },
802 RaptorDecodeError::SourceEsiOutOfRange { esi, max_valid } => {
803 DecodingError::InconsistentMetadata {
804 sbn: plan.sbn,
805 details: format!(
806 "source symbol {esi} falls outside the systematic domain (valid < {max_valid})"
807 ),
808 }
809 }
810 RaptorDecodeError::InvalidSourceSymbolEquation {
811 esi,
812 expected_column,
813 } => DecodingError::InconsistentMetadata {
814 sbn: plan.sbn,
815 details: format!(
816 "source symbol {esi} must use the identity equation for column {expected_column}"
817 ),
818 },
819 RaptorDecodeError::CorruptDecodedOutput {
820 esi,
821 byte_index,
822 expected,
823 actual,
824 } => DecodingError::MatrixInversionFailed {
825 reason: format!(
826 "decoded output verification failed at symbol {esi}, byte {byte_index}: expected 0x{expected:02x}, actual 0x{actual:02x}"
827 ),
828 },
829 };
830 return Err(mapped);
831 }
832 };
833
834 let mut decoded_symbols = Vec::with_capacity(k);
838 for (esi, data) in result.source.into_iter().enumerate() {
839 decoded_symbols.push(Symbol::new(
840 SymbolId::new(object_id, plan.sbn, esi as u32),
841 data,
842 SymbolKind::Source,
843 ));
844 }
845
846 Ok(decoded_symbols)
847}
848
849fn seed_for_block(object_id: ObjectId, sbn: u8) -> u64 {
850 seed_for(object_id, sbn, 0)
851}
852
853fn seed_for(object_id: ObjectId, sbn: u8, esi: u32) -> u64 {
854 let obj = object_id.as_u128();
855 let hi = (obj >> 64) as u64;
856 let lo = obj as u64;
857 let mut seed = hi ^ lo.rotate_left(13);
858 seed ^= u64::from(sbn) << 56;
859 seed ^= u64::from(esi);
860 if seed == 0 { 1 } else { seed }
861}
862
863#[cfg(test)]
864mod tests {
865 use super::*;
866 use crate::encoding::EncodingPipeline;
867 use crate::types::resource::{PoolConfig, SymbolPool};
868
869 fn init_test(name: &str) {
870 crate::test_utils::init_test_logging();
871 crate::test_phase!(name);
872 }
873
874 fn pool() -> SymbolPool {
875 SymbolPool::new(PoolConfig {
876 symbol_size: 256,
877 initial_size: 64,
878 max_size: 64,
879 allow_growth: false,
880 growth_increment: 0,
881 })
882 }
883
884 fn encoding_config() -> crate::config::EncodingConfig {
885 crate::config::EncodingConfig {
886 symbol_size: 256,
887 max_block_size: 1024,
888 repair_overhead: 1.05,
889 encoding_parallelism: 1,
890 decoding_parallelism: 1,
891 }
892 }
893
894 fn decoder_with_params(
895 config: &crate::config::EncodingConfig,
896 object_id: ObjectId,
897 data_len: usize,
898 repair_overhead: f64,
899 min_overhead: usize,
900 ) -> DecodingPipeline {
901 let mut decoder = DecodingPipeline::new(DecodingConfig {
902 symbol_size: config.symbol_size,
903 max_block_size: config.max_block_size,
904 repair_overhead,
905 min_overhead,
906 max_buffered_symbols: 0,
907 block_timeout: Duration::from_secs(30),
908 verify_auth: false,
909 });
910 let symbols_per_block = (data_len.div_ceil(usize::from(config.symbol_size))) as u16;
911 decoder
912 .set_object_params(ObjectParams::new(
913 object_id,
914 data_len as u64,
915 config.symbol_size,
916 1,
917 symbols_per_block,
918 ))
919 .expect("params");
920 decoder
921 }
922
923 #[test]
924 fn decode_roundtrip_sources_only() {
925 init_test("decode_roundtrip_sources_only");
926 let config = encoding_config();
927 let mut encoder = EncodingPipeline::new(config.clone(), pool());
928 let object_id = ObjectId::new_for_test(1);
929 let data = vec![42u8; 512];
930 let symbols: Vec<Symbol> = encoder
931 .encode_with_repair(object_id, &data, 0)
932 .map(|res| res.unwrap().into_symbol())
933 .collect();
934
935 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
936
937 for symbol in symbols {
938 let auth = AuthenticatedSymbol::from_parts(
939 symbol,
940 crate::security::tag::AuthenticationTag::zero(),
941 );
942 let _ = decoder.feed(auth).unwrap();
943 }
944
945 let decoded_data = decoder.into_data().expect("decoded");
946 let ok = decoded_data == data;
947 crate::assert_with_log!(ok, "decoded data", data, decoded_data);
948 crate::test_complete!("decode_roundtrip_sources_only");
949 }
950
951 #[test]
952 fn decode_roundtrip_out_of_order() {
953 init_test("decode_roundtrip_out_of_order");
954 let config = encoding_config();
955 let mut encoder = EncodingPipeline::new(config.clone(), pool());
956 let object_id = ObjectId::new_for_test(2);
957 let data = vec![7u8; 768];
958 let mut symbols: Vec<Symbol> = encoder
959 .encode_with_repair(object_id, &data, 2)
960 .map(|res| res.expect("encode").into_symbol())
961 .collect();
962
963 symbols.reverse();
964
965 let mut decoder =
966 decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
967
968 for symbol in symbols {
969 let auth = AuthenticatedSymbol::from_parts(
970 symbol,
971 crate::security::tag::AuthenticationTag::zero(),
972 );
973 let _ = decoder.feed(auth).expect("feed");
974 }
975
976 let decoded_data = decoder.into_data().expect("decoded");
977 let ok = decoded_data == data;
978 crate::assert_with_log!(ok, "decoded data", data, decoded_data);
979 crate::test_complete!("decode_roundtrip_out_of_order");
980 }
981
982 #[test]
983 fn reject_wrong_object_id() {
984 init_test("reject_wrong_object_id");
985 let config = encoding_config();
986 let mut encoder = EncodingPipeline::new(config.clone(), pool());
987 let object_id_a = ObjectId::new_for_test(10);
988 let object_id_b = ObjectId::new_for_test(11);
989 let data = vec![1u8; 128];
990
991 let mut decoder =
992 decoder_with_params(&config, object_id_a, data.len(), config.repair_overhead, 0);
993
994 let symbol_b = encoder
995 .encode_with_repair(object_id_b, &data, 0)
996 .next()
997 .expect("symbol")
998 .expect("encode")
999 .into_symbol();
1000 let auth = AuthenticatedSymbol::from_parts(
1001 symbol_b,
1002 crate::security::tag::AuthenticationTag::zero(),
1003 );
1004
1005 let result = decoder.feed(auth).expect("feed");
1006 let expected = SymbolAcceptResult::Rejected(RejectReason::WrongObjectId);
1007 let ok = result == expected;
1008 crate::assert_with_log!(ok, "wrong object id", expected, result);
1009 crate::test_complete!("reject_wrong_object_id");
1010 }
1011
1012 #[test]
1013 fn reject_symbol_size_mismatch() {
1014 init_test("reject_symbol_size_mismatch");
1015 let config = encoding_config();
1016 let mut decoder = DecodingPipeline::new(DecodingConfig {
1017 symbol_size: config.symbol_size,
1018 max_block_size: config.max_block_size,
1019 repair_overhead: config.repair_overhead,
1020 min_overhead: 0,
1021 max_buffered_symbols: 0,
1022 block_timeout: Duration::from_secs(30),
1023 verify_auth: false,
1024 });
1025
1026 let symbol = Symbol::new(
1027 SymbolId::new(ObjectId::new_for_test(20), 0, 0),
1028 vec![0u8; 8],
1029 SymbolKind::Source,
1030 );
1031 let auth = AuthenticatedSymbol::from_parts(
1032 symbol,
1033 crate::security::tag::AuthenticationTag::zero(),
1034 );
1035 let result = decoder.feed(auth).expect("feed");
1036 let expected = SymbolAcceptResult::Rejected(RejectReason::SymbolSizeMismatch);
1037 let ok = result == expected;
1038 crate::assert_with_log!(ok, "symbol size mismatch", expected, result);
1039 crate::test_complete!("reject_symbol_size_mismatch");
1040 }
1041
1042 #[test]
1043 fn reject_invalid_metadata_esi_out_of_range() {
1044 init_test("reject_invalid_metadata_esi_out_of_range");
1045 let mut decoder = DecodingPipeline::new(DecodingConfig {
1046 symbol_size: 8,
1047 max_block_size: 8,
1048 repair_overhead: 1.0,
1049 min_overhead: 0,
1050 max_buffered_symbols: 0,
1051 block_timeout: Duration::from_secs(30),
1052 verify_auth: false,
1053 });
1054 let object_id = ObjectId::new_for_test(21);
1055 decoder
1056 .set_object_params(ObjectParams::new(object_id, 8, 8, 1, 1))
1057 .expect("params");
1058
1059 let symbol = Symbol::new(
1060 SymbolId::new(object_id, 0, 1),
1061 vec![0u8; 8],
1062 SymbolKind::Source,
1063 );
1064 let auth = AuthenticatedSymbol::from_parts(
1065 symbol,
1066 crate::security::tag::AuthenticationTag::zero(),
1067 );
1068
1069 let result = decoder.feed(auth).expect("feed");
1070 let expected = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
1071 let ok = result == expected;
1072 crate::assert_with_log!(ok, "invalid metadata", expected, result);
1073 crate::test_complete!("reject_invalid_metadata_esi_out_of_range");
1074 }
1075
1076 #[test]
1077 fn duplicate_symbol_before_decode() {
1078 init_test("duplicate_symbol_before_decode");
1079 let config = encoding_config();
1080 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1081 let object_id = ObjectId::new_for_test(30);
1082 let data = vec![9u8; 512];
1084
1085 let symbol = encoder
1086 .encode_with_repair(object_id, &data, 0)
1087 .next()
1088 .expect("symbol")
1089 .expect("encode")
1090 .into_symbol();
1091
1092 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1093
1094 let first = decoder
1095 .feed(AuthenticatedSymbol::from_parts(
1096 symbol.clone(),
1097 crate::security::tag::AuthenticationTag::zero(),
1098 ))
1099 .expect("feed");
1100 let accepted = matches!(
1101 first,
1102 SymbolAcceptResult::Accepted { .. } | SymbolAcceptResult::DecodingStarted { .. }
1103 );
1104 crate::assert_with_log!(accepted, "first accepted", true, accepted);
1105
1106 let second = decoder
1107 .feed(AuthenticatedSymbol::from_parts(
1108 symbol,
1109 crate::security::tag::AuthenticationTag::zero(),
1110 ))
1111 .expect("feed");
1112 let expected = SymbolAcceptResult::Duplicate;
1113 let ok = second == expected;
1114 crate::assert_with_log!(ok, "second duplicate", expected, second);
1115 crate::test_complete!("duplicate_symbol_before_decode");
1116 }
1117
1118 #[test]
1119 fn into_data_reports_insufficient_symbols() {
1120 init_test("into_data_reports_insufficient_symbols");
1121 let config = encoding_config();
1122 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1123 let object_id = ObjectId::new_for_test(40);
1124 let data = vec![5u8; 512];
1125
1126 let mut decoder =
1127 decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
1128
1129 let symbol = encoder
1130 .encode_with_repair(object_id, &data, 0)
1131 .next()
1132 .expect("symbol")
1133 .expect("encode")
1134 .into_symbol();
1135 let auth = AuthenticatedSymbol::from_parts(
1136 symbol,
1137 crate::security::tag::AuthenticationTag::zero(),
1138 );
1139 let _ = decoder.feed(auth).expect("feed");
1140
1141 let err = decoder
1142 .into_data()
1143 .expect_err("expected insufficient symbols");
1144 let insufficient = matches!(err, DecodingError::InsufficientSymbols { .. });
1145 crate::assert_with_log!(insufficient, "insufficient symbols", true, insufficient);
1146 crate::test_complete!("into_data_reports_insufficient_symbols");
1147 }
1148
1149 #[test]
1152 fn decoding_error_display_authentication_failed() {
1153 let err = DecodingError::AuthenticationFailed {
1154 symbol_id: SymbolId::new(ObjectId::new_for_test(1), 0, 0),
1155 };
1156 let msg = err.to_string();
1157 assert!(msg.contains("authentication failed"), "{msg}");
1158 }
1159
1160 #[test]
1161 fn decoding_error_display_insufficient_symbols() {
1162 let err = DecodingError::InsufficientSymbols {
1163 received: 3,
1164 needed: 10,
1165 };
1166 assert_eq!(err.to_string(), "insufficient symbols: have 3, need 10");
1167 }
1168
1169 #[test]
1170 fn decoding_error_display_matrix_inversion() {
1171 let err = DecodingError::MatrixInversionFailed {
1172 reason: "rank deficient".into(),
1173 };
1174 assert_eq!(err.to_string(), "matrix inversion failed: rank deficient");
1175 }
1176
1177 #[test]
1178 fn decoding_error_display_block_timeout() {
1179 let err = DecodingError::BlockTimeout {
1180 sbn: 2,
1181 elapsed: Duration::from_millis(1500),
1182 };
1183 let msg = err.to_string();
1184 assert!(msg.contains("block timeout"), "{msg}");
1185 assert!(msg.contains("1.5"), "{msg}");
1186 }
1187
1188 #[test]
1189 fn decoding_error_display_inconsistent_metadata() {
1190 let err = DecodingError::InconsistentMetadata {
1191 sbn: 0,
1192 details: "mismatch".into(),
1193 };
1194 let msg = err.to_string();
1195 assert!(msg.contains("inconsistent block metadata"), "{msg}");
1196 assert!(msg.contains("mismatch"), "{msg}");
1197 }
1198
1199 #[test]
1200 fn decoding_error_display_symbol_size_mismatch() {
1201 let err = DecodingError::SymbolSizeMismatch {
1202 expected: 256,
1203 actual: 128,
1204 };
1205 assert_eq!(
1206 err.to_string(),
1207 "symbol size mismatch: expected 256, got 128"
1208 );
1209 }
1210
1211 #[test]
1214 fn decoding_error_into_error_auth() {
1215 let err = DecodingError::AuthenticationFailed {
1216 symbol_id: SymbolId::new(ObjectId::new_for_test(1), 0, 0),
1217 };
1218 let error: crate::error::Error = err.into();
1219 assert_eq!(error.kind(), crate::error::ErrorKind::CorruptedSymbol);
1220 }
1221
1222 #[test]
1223 fn decoding_error_into_error_insufficient() {
1224 let err = DecodingError::InsufficientSymbols {
1225 received: 1,
1226 needed: 5,
1227 };
1228 let error: crate::error::Error = err.into();
1229 assert_eq!(error.kind(), crate::error::ErrorKind::InsufficientSymbols);
1230 }
1231
1232 #[test]
1233 fn decoding_error_into_error_matrix() {
1234 let err = DecodingError::MatrixInversionFailed {
1235 reason: "singular".into(),
1236 };
1237 let error: crate::error::Error = err.into();
1238 assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1239 }
1240
1241 #[test]
1242 fn decoding_error_into_error_timeout() {
1243 let err = DecodingError::BlockTimeout {
1244 sbn: 0,
1245 elapsed: Duration::from_secs(30),
1246 };
1247 let error: crate::error::Error = err.into();
1248 assert_eq!(error.kind(), crate::error::ErrorKind::ThresholdTimeout);
1249 }
1250
1251 #[test]
1252 fn decoding_error_into_error_inconsistent() {
1253 let err = DecodingError::InconsistentMetadata {
1254 sbn: 1,
1255 details: "x".into(),
1256 };
1257 let error: crate::error::Error = err.into();
1258 assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1259 }
1260
1261 #[test]
1262 fn decoding_error_into_error_size_mismatch() {
1263 let err = DecodingError::SymbolSizeMismatch {
1264 expected: 256,
1265 actual: 64,
1266 };
1267 let error: crate::error::Error = err.into();
1268 assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1269 }
1270
1271 #[test]
1274 fn reject_reason_variants_are_eq() {
1275 assert_eq!(RejectReason::WrongObjectId, RejectReason::WrongObjectId);
1276 assert_ne!(
1277 RejectReason::AuthenticationFailed,
1278 RejectReason::SymbolSizeMismatch
1279 );
1280 }
1281
1282 #[test]
1283 fn reject_reason_debug() {
1284 let dbg = format!("{:?}", RejectReason::BlockAlreadyDecoded);
1285 assert_eq!(dbg, "BlockAlreadyDecoded");
1286 }
1287
1288 #[test]
1291 fn symbol_accept_result_accepted_eq() {
1292 let a = SymbolAcceptResult::Accepted {
1293 received: 3,
1294 needed: 5,
1295 };
1296 let b = SymbolAcceptResult::Accepted {
1297 received: 3,
1298 needed: 5,
1299 };
1300 assert_eq!(a, b);
1301 }
1302
1303 #[test]
1304 fn symbol_accept_result_duplicate_eq() {
1305 assert_eq!(SymbolAcceptResult::Duplicate, SymbolAcceptResult::Duplicate);
1306 }
1307
1308 #[test]
1309 fn symbol_accept_result_rejected_eq() {
1310 let a = SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached);
1311 let b = SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached);
1312 assert_eq!(a, b);
1313 }
1314
1315 #[test]
1316 fn symbol_accept_result_variants_ne() {
1317 assert_ne!(
1318 SymbolAcceptResult::Duplicate,
1319 SymbolAcceptResult::Rejected(RejectReason::WrongObjectId)
1320 );
1321 }
1322
1323 #[test]
1326 fn decoding_config_default_values() {
1327 let cfg = DecodingConfig::default();
1328 assert_eq!(cfg.symbol_size, 256);
1329 assert_eq!(cfg.max_block_size, 1024 * 1024);
1330 assert!((cfg.repair_overhead - 1.05).abs() < f64::EPSILON);
1331 assert_eq!(cfg.min_overhead, 0);
1332 assert_eq!(cfg.max_buffered_symbols, 0);
1333 assert_eq!(cfg.block_timeout, Duration::from_secs(30));
1334 assert!(!cfg.verify_auth);
1335 }
1336
1337 #[test]
1338 fn required_symbols_uses_total_factor_and_minimum_extra_floor() {
1339 assert_eq!(required_symbols(0, 1.05, 3), 0);
1340 assert_eq!(required_symbols(10, 1.05, 3), 13);
1341 assert_eq!(required_symbols(10, 1.5, 1), 15);
1342 assert_eq!(required_symbols(10, 0.5, 0), 10);
1343 assert_eq!(required_symbols(10, f64::NAN, 3), 13);
1344 assert_eq!(required_symbols(10, f64::INFINITY, 3), usize::MAX);
1345 }
1346
1347 #[test]
1350 fn block_state_kind_eq_and_debug() {
1351 assert_eq!(BlockStateKind::Collecting, BlockStateKind::Collecting);
1352 assert_ne!(BlockStateKind::Collecting, BlockStateKind::Decoded);
1353 assert_eq!(format!("{:?}", BlockStateKind::Failed), "Failed");
1354 assert_eq!(format!("{:?}", BlockStateKind::Decoding), "Decoding");
1355 }
1356
1357 #[test]
1360 fn pipeline_new_starts_empty() {
1361 let pipeline = DecodingPipeline::new(DecodingConfig::default());
1362 let progress = pipeline.progress();
1363 assert_eq!(progress.blocks_complete, 0);
1364 assert_eq!(progress.symbols_received, 0);
1365 }
1366
1367 #[test]
1368 fn pipeline_set_object_params_rejects_mismatched_symbol_size() {
1369 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1370 symbol_size: 256,
1371 ..DecodingConfig::default()
1372 });
1373 let params = ObjectParams::new(ObjectId::new_for_test(1), 1024, 128, 1, 8);
1374 let err = pipeline.set_object_params(params).unwrap_err();
1375 assert!(matches!(err, DecodingError::SymbolSizeMismatch { .. }));
1376 }
1377
1378 #[test]
1379 fn pipeline_set_object_params_rejects_inconsistent_object_id() {
1380 let config = encoding_config();
1381 let oid1 = ObjectId::new_for_test(1);
1382 let oid2 = ObjectId::new_for_test(2);
1383
1384 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1385 symbol_size: config.symbol_size,
1386 ..DecodingConfig::default()
1387 });
1388 pipeline
1389 .set_object_params(ObjectParams::new(oid1, 512, config.symbol_size, 1, 2))
1390 .expect("first set_object_params");
1391 let err = pipeline
1392 .set_object_params(ObjectParams::new(oid2, 512, config.symbol_size, 1, 2))
1393 .unwrap_err();
1394 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1395 }
1396
1397 #[test]
1398 fn pipeline_set_object_params_same_id_is_ok() {
1399 let config = encoding_config();
1400 let oid = ObjectId::new_for_test(1);
1401
1402 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1403 symbol_size: config.symbol_size,
1404 ..DecodingConfig::default()
1405 });
1406 pipeline
1407 .set_object_params(ObjectParams::new(oid, 512, config.symbol_size, 1, 2))
1408 .expect("first");
1409 pipeline
1410 .set_object_params(ObjectParams::new(oid, 512, config.symbol_size, 1, 2))
1411 .expect("second with same id should succeed");
1412 }
1413
1414 #[test]
1415 fn pipeline_set_object_params_rejects_declared_block_count_drift() {
1416 let config = encoding_config();
1417 let object_id = ObjectId::new_for_test(104);
1418
1419 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1420 symbol_size: config.symbol_size,
1421 max_block_size: config.max_block_size,
1422 ..DecodingConfig::default()
1423 });
1424 let err = pipeline
1425 .set_object_params(ObjectParams::new(object_id, 1536, config.symbol_size, 1, 4))
1426 .unwrap_err();
1427 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1428 assert!(
1429 err.to_string().contains("block count mismatch"),
1430 "unexpected error: {err}"
1431 );
1432 }
1433
1434 #[test]
1435 fn pipeline_set_object_params_rejects_total_k_metadata_for_multi_block_object() {
1436 let config = encoding_config();
1437 let object_id = ObjectId::new_for_test(105);
1438
1439 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1440 symbol_size: config.symbol_size,
1441 max_block_size: config.max_block_size,
1442 ..DecodingConfig::default()
1443 });
1444 let err = pipeline
1445 .set_object_params(ObjectParams::new(object_id, 2048, config.symbol_size, 2, 8))
1446 .unwrap_err();
1447 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1448 assert!(
1449 err.to_string().contains("symbols_per_block mismatch"),
1450 "unexpected error: {err}"
1451 );
1452 }
1453
1454 #[test]
1455 fn pipeline_set_object_params_failure_does_not_latch_object_identity() {
1456 let config = encoding_config();
1457 let invalid_object_id = ObjectId::new_for_test(106);
1458 let valid_object_id = ObjectId::new_for_test(107);
1459
1460 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1461 symbol_size: config.symbol_size,
1462 max_block_size: config.max_block_size,
1463 ..DecodingConfig::default()
1464 });
1465 let err = pipeline
1466 .set_object_params(ObjectParams::new(
1467 invalid_object_id,
1468 2048,
1469 config.symbol_size,
1470 2,
1471 8,
1472 ))
1473 .unwrap_err();
1474 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1475
1476 pipeline
1477 .set_object_params(ObjectParams::new(
1478 valid_object_id,
1479 512,
1480 config.symbol_size,
1481 1,
1482 2,
1483 ))
1484 .expect("failed set_object_params must not poison object identity");
1485 }
1486
1487 #[test]
1488 fn pipeline_set_object_params_accepts_empty_object_single_block_sentinel_metadata() {
1489 let config = encoding_config();
1490 let object_id = ObjectId::new_for_test(108);
1491
1492 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1493 symbol_size: config.symbol_size,
1494 max_block_size: config.max_block_size,
1495 ..DecodingConfig::default()
1496 });
1497 pipeline
1498 .set_object_params(ObjectParams::new(
1499 object_id,
1500 0,
1501 config.symbol_size,
1502 1,
1503 config
1504 .max_block_size
1505 .div_ceil(usize::from(config.symbol_size))
1506 .try_into()
1507 .expect("sentinel block K should fit in u16"),
1508 ))
1509 .expect("empty object sentinel metadata should be accepted");
1510
1511 assert!(pipeline.is_complete());
1512 assert_eq!(pipeline.progress().blocks_total, Some(0));
1513 assert_eq!(
1514 pipeline.into_data().expect("empty object should decode"),
1515 Vec::<u8>::new()
1516 );
1517 }
1518
1519 #[test]
1520 fn pipeline_set_object_params_accepts_full_256_block_boundary() {
1521 let config = crate::config::EncodingConfig {
1522 symbol_size: 8,
1523 max_block_size: 8,
1524 ..encoding_config()
1525 };
1526 let object_id = ObjectId::new_for_test(109);
1527
1528 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1529 symbol_size: config.symbol_size,
1530 max_block_size: config.max_block_size,
1531 ..DecodingConfig::default()
1532 });
1533 pipeline
1534 .set_object_params(ObjectParams::new(
1535 object_id,
1536 u64::try_from(config.max_block_size * 256).expect("boundary object size fits u64"),
1537 config.symbol_size,
1538 256,
1539 1,
1540 ))
1541 .expect("256-block metadata boundary should be representable");
1542
1543 assert_eq!(pipeline.progress().blocks_total, Some(256));
1544 }
1545
1546 #[test]
1549 fn feed_batch_returns_results_per_symbol() {
1550 init_test("feed_batch_returns_results_per_symbol");
1551 let config = encoding_config();
1552 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1553 let object_id = ObjectId::new_for_test(100);
1554 let data = vec![0xAAu8; 768]; let symbols: Vec<AuthenticatedSymbol> = encoder
1557 .encode_with_repair(object_id, &data, 0)
1558 .map(|res| {
1559 AuthenticatedSymbol::from_parts(
1560 res.unwrap().into_symbol(),
1561 crate::security::tag::AuthenticationTag::zero(),
1562 )
1563 })
1564 .take(3)
1565 .collect();
1566
1567 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1568
1569 let results = decoder.feed_batch(symbols.into_iter());
1570 let len = results.len();
1571 let expected_len = 3usize;
1572 crate::assert_with_log!(len == expected_len, "batch length", expected_len, len);
1573 for (i, r) in results.iter().enumerate() {
1574 let is_ok = r.is_ok();
1575 crate::assert_with_log!(is_ok, &format!("result[{i}] is Ok"), true, is_ok);
1576 }
1577 crate::test_complete!("feed_batch_returns_results_per_symbol");
1578 }
1579
1580 #[test]
1581 fn is_complete_false_without_params() {
1582 init_test("is_complete_false_without_params");
1583 let pipeline = DecodingPipeline::new(DecodingConfig::default());
1584 let complete = pipeline.is_complete();
1585 crate::assert_with_log!(!complete, "is_complete without params", false, complete);
1586 crate::test_complete!("is_complete_false_without_params");
1587 }
1588
1589 #[test]
1590 fn is_complete_true_after_all_blocks_decoded() {
1591 init_test("is_complete_true_after_all_blocks_decoded");
1592 let config = encoding_config();
1593 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1594 let object_id = ObjectId::new_for_test(101);
1595 let data = vec![42u8; 512];
1596 let symbols: Vec<Symbol> = encoder
1597 .encode_with_repair(object_id, &data, 0)
1598 .map(|res| res.unwrap().into_symbol())
1599 .collect();
1600
1601 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1602
1603 for symbol in symbols {
1604 let auth = AuthenticatedSymbol::from_parts(
1605 symbol,
1606 crate::security::tag::AuthenticationTag::zero(),
1607 );
1608 let _ = decoder.feed(auth).unwrap();
1609 }
1610
1611 let complete = decoder.is_complete();
1612 crate::assert_with_log!(complete, "is_complete after all blocks", true, complete);
1613 crate::test_complete!("is_complete_true_after_all_blocks_decoded");
1614 }
1615
1616 #[test]
1617 fn progress_reports_blocks_total_after_params() {
1618 init_test("progress_reports_blocks_total_after_params");
1619 let config = encoding_config();
1620 let object_id = ObjectId::new_for_test(102);
1621
1622 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1623 symbol_size: config.symbol_size,
1624 max_block_size: 1024,
1625 ..DecodingConfig::default()
1626 });
1627 let k = (512usize).div_ceil(usize::from(config.symbol_size)) as u16;
1629 pipeline
1630 .set_object_params(ObjectParams::new(object_id, 512, config.symbol_size, 1, k))
1631 .expect("set params");
1632
1633 let progress = pipeline.progress();
1634 let blocks_total = progress.blocks_total;
1635 let expected_blocks = Some(1usize);
1636 crate::assert_with_log!(
1637 blocks_total == expected_blocks,
1638 "blocks_total",
1639 expected_blocks,
1640 blocks_total
1641 );
1642 let estimate = progress.symbols_needed_estimate;
1643 let positive = estimate > 0;
1644 crate::assert_with_log!(positive, "symbols_needed_estimate > 0", true, positive);
1645 crate::test_complete!("progress_reports_blocks_total_after_params");
1646 }
1647
1648 #[test]
1649 fn progress_symbols_needed_estimate_does_not_double_count_min_overhead() {
1650 init_test("progress_symbols_needed_estimate_does_not_double_count_min_overhead");
1651 let object_id = ObjectId::new_for_test(1020);
1652 let symbol_size = 256u16;
1653 let k = 10u16;
1654 let data_len = usize::from(symbol_size) * usize::from(k);
1655
1656 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1657 symbol_size,
1658 max_block_size: 4096,
1659 repair_overhead: 1.05,
1660 min_overhead: 3,
1661 max_buffered_symbols: 0,
1662 block_timeout: Duration::from_secs(30),
1663 verify_auth: false,
1664 });
1665 pipeline
1666 .set_object_params(ObjectParams::new(
1667 object_id,
1668 data_len as u64,
1669 symbol_size,
1670 1,
1671 k,
1672 ))
1673 .expect("set params");
1674
1675 let progress = pipeline.progress();
1676 assert_eq!(progress.blocks_total, Some(1));
1677 assert_eq!(progress.symbols_needed_estimate, 13);
1678 crate::test_complete!(
1679 "progress_symbols_needed_estimate_does_not_double_count_min_overhead"
1680 );
1681 }
1682
1683 #[test]
1684 fn progress_symbols_needed_estimate_saturates_for_infinite_overhead() {
1685 init_test("progress_symbols_needed_estimate_saturates_for_infinite_overhead");
1686 let object_id = ObjectId::new_for_test(1021);
1687 let symbol_size = 256u16;
1688 let data_len = 2048usize;
1689
1690 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1691 symbol_size,
1692 max_block_size: 1024,
1693 repair_overhead: f64::INFINITY,
1694 min_overhead: 0,
1695 max_buffered_symbols: 0,
1696 block_timeout: Duration::from_secs(30),
1697 verify_auth: false,
1698 });
1699 pipeline
1700 .set_object_params(ObjectParams::new(
1701 object_id,
1702 data_len as u64,
1703 symbol_size,
1704 2,
1705 4,
1706 ))
1707 .expect("set params");
1708
1709 let progress = pipeline.progress();
1710 assert_eq!(progress.blocks_total, Some(2));
1711 assert_eq!(progress.symbols_needed_estimate, usize::MAX);
1712 crate::test_complete!("progress_symbols_needed_estimate_saturates_for_infinite_overhead");
1713 }
1714
1715 #[test]
1716 fn block_status_none_for_unknown_block() {
1717 init_test("block_status_none_for_unknown_block");
1718 let config = encoding_config();
1719 let object_id = ObjectId::new_for_test(103);
1720
1721 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1722 symbol_size: config.symbol_size,
1723 max_block_size: config.max_block_size,
1724 ..DecodingConfig::default()
1725 });
1726 let k = (512usize).div_ceil(usize::from(config.symbol_size)) as u16;
1727 pipeline
1728 .set_object_params(ObjectParams::new(object_id, 512, config.symbol_size, 1, k))
1729 .expect("set params");
1730
1731 let status = pipeline.block_status(99);
1732 let is_none = status.is_none();
1733 crate::assert_with_log!(is_none, "block_status(99) is None", true, is_none);
1734 crate::test_complete!("block_status_none_for_unknown_block");
1735 }
1736
1737 #[test]
1738 fn block_status_collecting_after_partial_feed() {
1739 init_test("block_status_collecting_after_partial_feed");
1740 let config = encoding_config();
1741 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1742 let object_id = ObjectId::new_for_test(104);
1743 let data = vec![0xBBu8; 512];
1744
1745 let first_symbol = encoder
1746 .encode_with_repair(object_id, &data, 0)
1747 .next()
1748 .expect("symbol")
1749 .expect("encode")
1750 .into_symbol();
1751
1752 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1754
1755 let auth = AuthenticatedSymbol::from_parts(
1756 first_symbol,
1757 crate::security::tag::AuthenticationTag::zero(),
1758 );
1759 let _ = decoder.feed(auth).expect("feed");
1760
1761 let status = decoder.block_status(0);
1762 let is_some = status.is_some();
1763 crate::assert_with_log!(is_some, "block_status(0) is Some", true, is_some);
1764
1765 let status = status.unwrap();
1766 let state = status.state;
1767 let expected_state = BlockStateKind::Collecting;
1768 crate::assert_with_log!(
1769 state == expected_state,
1770 "state is Collecting",
1771 expected_state,
1772 state
1773 );
1774 let received = status.symbols_received;
1775 let expected_received = 1usize;
1776 crate::assert_with_log!(
1777 received == expected_received,
1778 "symbols_received",
1779 expected_received,
1780 received
1781 );
1782 crate::test_complete!("block_status_collecting_after_partial_feed");
1783 }
1784
1785 #[test]
1786 fn block_status_decoded_after_complete() {
1787 init_test("block_status_decoded_after_complete");
1788 let config = encoding_config();
1789 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1790 let object_id = ObjectId::new_for_test(105);
1791 let data = vec![42u8; 512];
1792 let symbols: Vec<Symbol> = encoder
1793 .encode_with_repair(object_id, &data, 0)
1794 .map(|res| res.unwrap().into_symbol())
1795 .collect();
1796
1797 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1798
1799 for symbol in symbols {
1800 let auth = AuthenticatedSymbol::from_parts(
1801 symbol,
1802 crate::security::tag::AuthenticationTag::zero(),
1803 );
1804 let _ = decoder.feed(auth).unwrap();
1805 }
1806
1807 let _status = decoder.block_status(0);
1811 let complete = decoder.is_complete();
1812 crate::assert_with_log!(complete, "is_complete", true, complete);
1813
1814 let extra = Symbol::new(
1817 SymbolId::new(object_id, 0, 99),
1818 vec![0u8; usize::from(config.symbol_size)],
1819 SymbolKind::Source,
1820 );
1821 let auth =
1822 AuthenticatedSymbol::from_parts(extra, crate::security::tag::AuthenticationTag::zero());
1823 let result = decoder.feed(auth).expect("feed");
1824 let expected = SymbolAcceptResult::Rejected(RejectReason::BlockAlreadyDecoded);
1825 let ok = result == expected;
1826 crate::assert_with_log!(ok, "block already decoded", expected, result);
1827 crate::test_complete!("block_status_decoded_after_complete");
1828 }
1829
1830 #[test]
1831 fn block_already_decoded_reject() {
1832 init_test("block_already_decoded_reject");
1833 let config = encoding_config();
1834 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1835 let object_id = ObjectId::new_for_test(106);
1836 let data = vec![42u8; 512];
1837 let symbols: Vec<Symbol> = encoder
1838 .encode_with_repair(object_id, &data, 0)
1839 .map(|res| res.unwrap().into_symbol())
1840 .collect();
1841
1842 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1843
1844 for symbol in symbols {
1845 let auth = AuthenticatedSymbol::from_parts(
1846 symbol,
1847 crate::security::tag::AuthenticationTag::zero(),
1848 );
1849 let _ = decoder.feed(auth).unwrap();
1850 }
1851
1852 let extra = Symbol::new(
1854 SymbolId::new(object_id, 0, 0),
1855 vec![0u8; usize::from(config.symbol_size)],
1856 SymbolKind::Source,
1857 );
1858 let auth =
1859 AuthenticatedSymbol::from_parts(extra, crate::security::tag::AuthenticationTag::zero());
1860 let result = decoder.feed(auth).expect("feed");
1861 let expected = SymbolAcceptResult::Rejected(RejectReason::BlockAlreadyDecoded);
1862 let ok = result == expected;
1863 crate::assert_with_log!(ok, "block already decoded reject", expected, result);
1864 crate::test_complete!("block_already_decoded_reject");
1865 }
1866
1867 #[test]
1868 fn verify_auth_no_context_unverified_symbol_errors() {
1869 init_test("verify_auth_no_context_unverified_symbol_errors");
1870 let config = encoding_config();
1871 let mut decoder = DecodingPipeline::new(DecodingConfig {
1872 symbol_size: config.symbol_size,
1873 max_block_size: config.max_block_size,
1874 verify_auth: true,
1875 ..DecodingConfig::default()
1876 });
1877
1878 let symbol = Symbol::new(
1879 SymbolId::new(ObjectId::new_for_test(107), 0, 0),
1880 vec![0u8; usize::from(config.symbol_size)],
1881 SymbolKind::Source,
1882 );
1883 let auth = AuthenticatedSymbol::from_parts(
1885 symbol,
1886 crate::security::tag::AuthenticationTag::zero(),
1887 );
1888
1889 let result = decoder.feed(auth);
1890 let is_ok = result.is_ok();
1891 crate::assert_with_log!(
1892 is_ok,
1893 "unverified with no context is rejected safely",
1894 true,
1895 is_ok
1896 );
1897
1898 let accept = result.unwrap();
1899 let expected = SymbolAcceptResult::Rejected(RejectReason::AuthenticationFailed);
1900 crate::assert_with_log!(
1901 accept == expected,
1902 "rejected as auth failed",
1903 expected,
1904 accept
1905 );
1906 crate::test_complete!("verify_auth_no_context_unverified_symbol_errors");
1907 }
1908
1909 #[test]
1910 fn verify_auth_no_context_preverified_symbol_ok() {
1911 init_test("verify_auth_no_context_preverified_symbol_ok");
1912 let config = encoding_config();
1913 let mut decoder = DecodingPipeline::new(DecodingConfig {
1914 symbol_size: config.symbol_size,
1915 max_block_size: config.max_block_size,
1916 verify_auth: true,
1917 ..DecodingConfig::default()
1918 });
1919
1920 let symbol = Symbol::new(
1921 SymbolId::new(ObjectId::new_for_test(108), 0, 0),
1922 vec![0u8; usize::from(config.symbol_size)],
1923 SymbolKind::Source,
1924 );
1925 let auth = AuthenticatedSymbol::new_verified(
1927 symbol,
1928 crate::security::tag::AuthenticationTag::zero(),
1929 );
1930
1931 let result = decoder.feed(auth);
1932 let is_ok = result.is_ok();
1933 crate::assert_with_log!(is_ok, "preverified symbol accepted", true, is_ok);
1934 let accept = result.unwrap();
1935 let is_accepted = matches!(accept, SymbolAcceptResult::Accepted { .. });
1936 crate::assert_with_log!(is_accepted, "result is Accepted variant", true, is_accepted);
1937 crate::test_complete!("verify_auth_no_context_preverified_symbol_ok");
1938 }
1939
1940 #[test]
1941 fn with_auth_rejects_bad_tag() {
1942 init_test("with_auth_rejects_bad_tag");
1943 let config = encoding_config();
1944 let mut decoder = DecodingPipeline::with_auth(
1945 DecodingConfig {
1946 symbol_size: config.symbol_size,
1947 max_block_size: config.max_block_size,
1948 verify_auth: true,
1949 ..DecodingConfig::default()
1950 },
1951 crate::security::SecurityContext::for_testing(42),
1952 );
1953
1954 let symbol = Symbol::new(
1955 SymbolId::new(ObjectId::new_for_test(109), 0, 0),
1956 vec![0u8; usize::from(config.symbol_size)],
1957 SymbolKind::Source,
1958 );
1959 let auth = AuthenticatedSymbol::from_parts(
1961 symbol,
1962 crate::security::tag::AuthenticationTag::zero(),
1963 );
1964
1965 let result = decoder.feed(auth).expect("feed should not return Err");
1966 let expected = SymbolAcceptResult::Rejected(RejectReason::AuthenticationFailed);
1967 let ok = result == expected;
1968 crate::assert_with_log!(ok, "bad tag rejected", expected, result);
1969 crate::test_complete!("with_auth_rejects_bad_tag");
1970 }
1971
1972 #[test]
1973 fn multi_block_roundtrip() {
1974 init_test("multi_block_roundtrip");
1975 let config = crate::config::EncodingConfig {
1976 symbol_size: 256,
1977 max_block_size: 1024,
1978 repair_overhead: 1.05,
1979 encoding_parallelism: 1,
1980 decoding_parallelism: 1,
1981 };
1982 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1983 let object_id = ObjectId::new_for_test(110);
1984 let data: Vec<u8> = (0u32..2048).map(|i| (i % 251) as u8).collect();
1985
1986 let symbols: Vec<Symbol> = encoder
1987 .encode_with_repair(object_id, &data, 0)
1988 .map(|res| res.unwrap().into_symbol())
1989 .collect();
1990
1991 let mut decoder = DecodingPipeline::new(DecodingConfig {
1992 symbol_size: config.symbol_size,
1993 max_block_size: config.max_block_size,
1994 repair_overhead: 1.0,
1995 min_overhead: 0,
1996 max_buffered_symbols: 0,
1997 block_timeout: Duration::from_secs(30),
1998 verify_auth: false,
1999 });
2000
2001 let symbol_size = usize::from(config.symbol_size);
2003 let num_blocks = data.len().div_ceil(config.max_block_size);
2004 let mut full_block_k: u16 = 0;
2005 for b in 0..num_blocks {
2006 let block_start = b * config.max_block_size;
2007 let block_len = usize::min(config.max_block_size, data.len() - block_start);
2008 let k = block_len.div_ceil(symbol_size) as u16;
2009 full_block_k = full_block_k.max(k);
2010 }
2011 decoder
2012 .set_object_params(ObjectParams::new(
2013 object_id,
2014 data.len() as u64,
2015 config.symbol_size,
2016 num_blocks as u16,
2017 full_block_k,
2018 ))
2019 .expect("set params");
2020
2021 for symbol in symbols {
2022 let auth = AuthenticatedSymbol::from_parts(
2023 symbol,
2024 crate::security::tag::AuthenticationTag::zero(),
2025 );
2026 let _ = decoder.feed(auth).unwrap();
2027 }
2028
2029 let complete = decoder.is_complete();
2030 crate::assert_with_log!(complete, "multi-block is_complete", true, complete);
2031
2032 let decoded_data = decoder.into_data().expect("decoded");
2033 let ok = decoded_data == data;
2034 crate::assert_with_log!(
2035 ok,
2036 "multi-block roundtrip data",
2037 data.len(),
2038 decoded_data.len()
2039 );
2040 crate::test_complete!("multi_block_roundtrip");
2041 }
2042
2043 #[test]
2044 fn multi_block_progress_retains_cumulative_symbols_after_block_completion() {
2045 init_test("multi_block_progress_retains_cumulative_symbols_after_block_completion");
2046 let config = crate::config::EncodingConfig {
2047 symbol_size: 256,
2048 max_block_size: 1024,
2049 repair_overhead: 1.05,
2050 encoding_parallelism: 1,
2051 decoding_parallelism: 1,
2052 };
2053 let mut encoder = EncodingPipeline::new(config.clone(), pool());
2054 let object_id = ObjectId::new_for_test(111);
2055 let data: Vec<u8> = (0u32..2048).map(|i| (i % 251) as u8).collect();
2056
2057 let mut block_zero_symbols: Vec<Symbol> = encoder
2058 .encode_with_repair(object_id, &data, 0)
2059 .map(|res| res.expect("encode").into_symbol())
2060 .filter(|symbol| symbol.sbn() == 0)
2061 .collect();
2062 block_zero_symbols.sort_by_key(Symbol::esi);
2063 assert_eq!(block_zero_symbols.len(), 4);
2064
2065 let mut decoder = DecodingPipeline::new(DecodingConfig {
2066 symbol_size: config.symbol_size,
2067 max_block_size: config.max_block_size,
2068 repair_overhead: 1.0,
2069 min_overhead: 0,
2070 max_buffered_symbols: 0,
2071 block_timeout: Duration::from_secs(30),
2072 verify_auth: false,
2073 });
2074 decoder
2075 .set_object_params(ObjectParams::new(
2076 object_id,
2077 data.len() as u64,
2078 config.symbol_size,
2079 2,
2080 4,
2081 ))
2082 .expect("set params");
2083
2084 for symbol in block_zero_symbols {
2085 let auth = AuthenticatedSymbol::from_parts(
2086 symbol,
2087 crate::security::tag::AuthenticationTag::zero(),
2088 );
2089 let _ = decoder.feed(auth).expect("feed");
2090 }
2091
2092 assert_eq!(decoder.progress().blocks_complete, 1);
2093 assert_eq!(decoder.progress().blocks_total, Some(2));
2094 assert_eq!(decoder.progress().symbols_received, 4);
2095 assert_eq!(decoder.progress().symbols_needed_estimate, 8);
2096
2097 let err = decoder.into_data().expect_err("block one is still missing");
2098 assert!(matches!(
2099 err,
2100 DecodingError::InsufficientSymbols {
2101 received: 4,
2102 needed: 8
2103 }
2104 ));
2105 crate::test_complete!(
2106 "multi_block_progress_retains_cumulative_symbols_after_block_completion"
2107 );
2108 }
2109
2110 #[test]
2111 fn into_data_no_params_errors() {
2112 init_test("into_data_no_params_errors");
2113 let pipeline = DecodingPipeline::new(DecodingConfig::default());
2114 let result = pipeline.into_data();
2115 let is_err = result.is_err();
2116 crate::assert_with_log!(is_err, "into_data without params errors", true, is_err);
2117 let err = result.unwrap_err();
2118 let msg = err.to_string();
2119 let contains = msg.contains("object parameters not set");
2120 crate::assert_with_log!(
2121 contains,
2122 "error message contains expected text",
2123 true,
2124 contains
2125 );
2126 crate::test_complete!("into_data_no_params_errors");
2127 }
2128
2129 #[test]
2132 fn reject_reason_debug_clone_copy_eq() {
2133 let r = RejectReason::WrongObjectId;
2134 let r2 = r; let r3 = r;
2136 assert_eq!(r, r2);
2137 assert_eq!(r, r3);
2138 assert_ne!(r, RejectReason::AuthenticationFailed);
2139 assert_ne!(r, RejectReason::SymbolSizeMismatch);
2140 assert_ne!(r, RejectReason::BlockAlreadyDecoded);
2141 assert_ne!(r, RejectReason::InsufficientRank);
2142 assert_ne!(r, RejectReason::InconsistentEquations);
2143 assert_ne!(r, RejectReason::InvalidMetadata);
2144 assert_ne!(r, RejectReason::MemoryLimitReached);
2145 let dbg = format!("{r:?}");
2146 assert!(dbg.contains("WrongObjectId"));
2147 }
2148
2149 #[test]
2150 fn symbol_accept_result_debug_clone_eq() {
2151 let a = SymbolAcceptResult::Accepted {
2152 received: 3,
2153 needed: 5,
2154 };
2155 let a2 = a.clone();
2156 assert_eq!(a, a2);
2157 assert_ne!(a, SymbolAcceptResult::Duplicate);
2158 let r = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
2159 let r2 = r.clone();
2160 assert_eq!(r, r2);
2161 let dbg = format!("{a:?}");
2162 assert!(dbg.contains("Accepted"));
2163 }
2164
2165 #[test]
2166 fn block_state_kind_debug_clone_copy_eq() {
2167 let s = BlockStateKind::Collecting;
2168 let s2 = s; let s3 = s;
2170 assert_eq!(s, s2);
2171 assert_eq!(s, s3);
2172 assert_ne!(s, BlockStateKind::Decoding);
2173 assert_ne!(s, BlockStateKind::Decoded);
2174 assert_ne!(s, BlockStateKind::Failed);
2175 let dbg = format!("{s:?}");
2176 assert!(dbg.contains("Collecting"));
2177 }
2178}