1use crate::error::{Error, ErrorKind};
10use crate::raptorq::decoder::{
11 DecodeError as RaptorDecodeError, InactivationDecoder, ReceivedSymbol,
12};
13use crate::raptorq::gf256::Gf256;
14use crate::raptorq::systematic::SystematicParams;
15use crate::security::{AuthenticatedSymbol, SecurityContext};
16use crate::types::symbol_set::{InsertResult, SymbolSet, ThresholdConfig};
17use crate::types::{ObjectId, ObjectParams, Symbol, SymbolId, SymbolKind};
18use std::collections::{HashMap, HashSet};
19use std::time::Duration;
20
21#[derive(Debug, thiserror::Error)]
23pub enum DecodingError {
24 #[error("authentication failed for symbol {symbol_id}")]
26 AuthenticationFailed {
27 symbol_id: SymbolId,
29 },
30 #[error("insufficient symbols: have {received}, need {needed}")]
32 InsufficientSymbols {
33 received: usize,
35 needed: usize,
37 },
38 #[error("matrix inversion failed: {reason}")]
40 MatrixInversionFailed {
41 reason: String,
43 },
44 #[error("block timeout after {elapsed:?}")]
46 BlockTimeout {
47 sbn: u8,
49 elapsed: Duration,
51 },
52 #[error("inconsistent block metadata: {sbn} {details}")]
54 InconsistentMetadata {
55 sbn: u8,
57 details: String,
59 },
60 #[error("symbol size mismatch: expected {expected}, got {actual}")]
62 SymbolSizeMismatch {
63 expected: u16,
65 actual: usize,
67 },
68}
69
70impl From<DecodingError> for Error {
71 fn from(err: DecodingError) -> Self {
72 match &err {
73 DecodingError::AuthenticationFailed { .. } => Self::new(ErrorKind::CorruptedSymbol),
74 DecodingError::InsufficientSymbols { .. } => Self::new(ErrorKind::InsufficientSymbols),
75 DecodingError::MatrixInversionFailed { .. }
76 | DecodingError::InconsistentMetadata { .. }
77 | DecodingError::SymbolSizeMismatch { .. } => Self::new(ErrorKind::DecodingFailed),
78 DecodingError::BlockTimeout { .. } => Self::new(ErrorKind::ThresholdTimeout),
79 }
80 .with_message(err.to_string())
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum RejectReason {
87 WrongObjectId,
89 AuthenticationFailed,
91 SymbolSizeMismatch,
93 BlockAlreadyDecoded,
95 InsufficientRank,
97 InconsistentEquations,
99 InvalidMetadata,
101 MemoryLimitReached,
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
107pub enum SymbolAcceptResult {
108 Accepted {
110 received: usize,
112 needed: usize,
114 },
115 DecodingStarted {
117 block_sbn: u8,
119 },
120 BlockComplete {
122 block_sbn: u8,
124 data: Vec<u8>,
126 },
127 Duplicate,
129 Rejected(RejectReason),
131}
132
133#[derive(Debug, Clone)]
135pub struct DecodingConfig {
136 pub symbol_size: u16,
138 pub max_block_size: usize,
140 pub repair_overhead: f64,
142 pub min_overhead: usize,
144 pub max_buffered_symbols: usize,
146 pub block_timeout: Duration,
148 pub verify_auth: bool,
150}
151
152impl Default for DecodingConfig {
153 fn default() -> Self {
154 Self {
155 symbol_size: 256,
156 max_block_size: 1024 * 1024,
157 repair_overhead: 1.05,
158 min_overhead: 0,
159 max_buffered_symbols: 0,
160 block_timeout: Duration::from_secs(30),
161 verify_auth: false,
162 }
163 }
164}
165
166#[derive(Debug, Clone, Copy)]
168pub struct DecodingProgress {
169 pub blocks_complete: usize,
171 pub blocks_total: Option<usize>,
173 pub symbols_received: usize,
175 pub symbols_needed_estimate: usize,
177}
178
179#[derive(Debug, Clone, Copy)]
181pub struct BlockStatus {
182 pub sbn: u8,
184 pub symbols_received: usize,
186 pub symbols_needed: usize,
188 pub state: BlockStateKind,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq)]
194pub enum BlockStateKind {
195 Collecting,
197 Decoding,
199 Decoded,
201 Failed,
203}
204
205#[derive(Debug)]
206struct BlockDecoder {
207 sbn: u8,
208 state: BlockDecodingState,
209 decoded: Option<Vec<u8>>,
210}
211
212#[derive(Debug)]
213enum BlockDecodingState {
214 Collecting,
215 Decoding,
216 Decoded,
217 Failed,
218}
219
220#[derive(Debug)]
222pub struct DecodingPipeline {
223 config: DecodingConfig,
224 symbols: SymbolSet,
225 accepted_symbols_total: usize,
226 blocks: HashMap<u8, BlockDecoder>,
227 completed_blocks: HashSet<u8>,
228 object_id: Option<ObjectId>,
229 object_size: Option<u64>,
230 block_plans: Option<Vec<BlockPlan>>,
231 auth_context: Option<SecurityContext>,
232}
233
234impl DecodingPipeline {
235 #[must_use]
237 pub fn new(config: DecodingConfig) -> Self {
238 let threshold = ThresholdConfig::new(
239 config.repair_overhead,
240 config.min_overhead,
241 config.max_buffered_symbols,
242 );
243 Self {
244 config,
245 symbols: SymbolSet::with_config(threshold),
246 accepted_symbols_total: 0,
247 blocks: HashMap::new(),
248 completed_blocks: HashSet::new(),
249 object_id: None,
250 object_size: None,
251 block_plans: None,
252 auth_context: None,
253 }
254 }
255
256 #[must_use]
258 pub fn with_auth(config: DecodingConfig, ctx: SecurityContext) -> Self {
259 let mut pipeline = Self::new(config);
260 pipeline.auth_context = Some(ctx);
261 pipeline
262 }
263
264 pub fn set_object_params(&mut self, params: ObjectParams) -> Result<(), DecodingError> {
266 if params.symbol_size != self.config.symbol_size {
267 return Err(DecodingError::SymbolSizeMismatch {
268 expected: self.config.symbol_size,
269 actual: params.symbol_size as usize,
270 });
271 }
272 if let Some(existing) = self.object_id {
273 if existing != params.object_id {
274 return Err(DecodingError::InconsistentMetadata {
275 sbn: 0,
276 details: format!(
277 "object id mismatch: expected {existing:?}, got {:?}",
278 params.object_id
279 ),
280 });
281 }
282 }
283 let plans = plan_blocks(
284 params.object_size as usize,
285 usize::from(params.symbol_size),
286 self.config.max_block_size,
287 )?;
288 validate_object_params_layout(params, &plans)?;
289 self.object_id = Some(params.object_id);
290 self.object_size = Some(params.object_size);
291 self.block_plans = Some(plans);
292 self.configure_block_k();
293 Ok(())
294 }
295
296 pub fn feed(
298 &mut self,
299 mut auth_symbol: AuthenticatedSymbol,
300 ) -> Result<SymbolAcceptResult, DecodingError> {
301 if self.config.verify_auth {
302 match &self.auth_context {
303 Some(ctx) => {
304 if !auth_symbol.is_verified()
305 && ctx.verify_authenticated_symbol(&mut auth_symbol).is_err()
306 {
307 return Ok(SymbolAcceptResult::Rejected(
308 RejectReason::AuthenticationFailed,
309 ));
310 }
311 }
312 None => {
313 if !auth_symbol.is_verified() {
317 return Ok(SymbolAcceptResult::Rejected(
318 RejectReason::AuthenticationFailed,
319 ));
320 }
321 }
322 }
323 }
324
325 let symbol = auth_symbol.into_symbol();
326
327 if symbol.len() != usize::from(self.config.symbol_size) {
328 return Ok(SymbolAcceptResult::Rejected(
329 RejectReason::SymbolSizeMismatch,
330 ));
331 }
332
333 if let Some(object_id) = self.object_id {
334 if object_id != symbol.object_id() {
335 return Ok(SymbolAcceptResult::Rejected(RejectReason::WrongObjectId));
336 }
337 } else {
338 self.object_id = Some(symbol.object_id());
339 }
340
341 let sbn = symbol.sbn();
342 if self.completed_blocks.contains(&sbn) {
343 return Ok(SymbolAcceptResult::Rejected(
344 RejectReason::BlockAlreadyDecoded,
345 ));
346 }
347
348 self.blocks.entry(sbn).or_insert_with(|| BlockDecoder {
350 sbn,
351 state: BlockDecodingState::Collecting,
352 decoded: None,
353 });
354
355 let insert_result = self.symbols.insert(symbol);
356 match insert_result {
357 InsertResult::Duplicate => Ok(SymbolAcceptResult::Duplicate),
358 InsertResult::MemoryLimitReached | InsertResult::BlockLimitReached { .. } => Ok(
359 SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached),
360 ),
361 InsertResult::Inserted {
362 block_progress,
363 threshold_reached,
364 } => {
365 self.accepted_symbols_total = self.accepted_symbols_total.saturating_add(1);
366 if block_progress.k.is_none() {
367 self.configure_block_k();
368 }
369 let needed = block_progress.k.map_or(0, |k| {
370 required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
371 });
372 let received = block_progress.total();
373
374 if threshold_reached {
375 if let Some(block) = self.blocks.get_mut(&sbn) {
377 block.state = BlockDecodingState::Decoding;
378 }
379 if let Some(result) = self.try_decode_block(sbn) {
380 return Ok(result);
381 }
382 }
383
384 if let Some(block) = self.blocks.get_mut(&sbn) {
386 if !matches!(
387 block.state,
388 BlockDecodingState::Decoded | BlockDecodingState::Failed
389 ) {
390 block.state = BlockDecodingState::Collecting;
391 }
392 }
393 Ok(SymbolAcceptResult::Accepted { received, needed })
394 }
395 }
396 }
397
398 pub fn feed_batch(
400 &mut self,
401 symbols: impl Iterator<Item = AuthenticatedSymbol>,
402 ) -> Vec<Result<SymbolAcceptResult, DecodingError>> {
403 symbols.map(|symbol| self.feed(symbol)).collect()
404 }
405
406 #[must_use]
408 pub fn is_complete(&self) -> bool {
409 let Some(plans) = &self.block_plans else {
410 return false;
411 };
412 self.completed_blocks.len() == plans.len()
413 }
414
415 #[must_use]
417 pub fn progress(&self) -> DecodingProgress {
418 let blocks_total = self.block_plans.as_ref().map(Vec::len);
419 let symbols_received = self.accepted_symbols_total;
420 let symbols_needed_estimate = self.block_plans.as_ref().map_or(0, |plans| {
421 sum_required_symbols(plans, self.config.repair_overhead, self.config.min_overhead)
422 });
423
424 DecodingProgress {
425 blocks_complete: self.completed_blocks.len(),
426 blocks_total,
427 symbols_received,
428 symbols_needed_estimate,
429 }
430 }
431
432 #[must_use]
434 pub fn block_status(&self, sbn: u8) -> Option<BlockStatus> {
435 let progress = self.symbols.block_progress(sbn)?;
436 let state = self
437 .blocks
438 .get(&sbn)
439 .map_or(BlockStateKind::Collecting, |block| match block.state {
440 BlockDecodingState::Collecting => BlockStateKind::Collecting,
441 BlockDecodingState::Decoding => BlockStateKind::Decoding,
442 BlockDecodingState::Decoded => BlockStateKind::Decoded,
443 BlockDecodingState::Failed => BlockStateKind::Failed,
444 });
445
446 let symbols_needed = progress.k.map_or(0, |k| {
447 required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
448 });
449
450 Some(BlockStatus {
451 sbn,
452 symbols_received: progress.total(),
453 symbols_needed,
454 state,
455 })
456 }
457
458 pub fn into_data(self) -> Result<Vec<u8>, DecodingError> {
460 let Some(plans) = &self.block_plans else {
461 return Err(DecodingError::InconsistentMetadata {
462 sbn: 0,
463 details: "object parameters not set".to_string(),
464 });
465 };
466 if !self.is_complete() {
467 let received = self.accepted_symbols_total;
468 let needed =
469 sum_required_symbols(plans, self.config.repair_overhead, self.config.min_overhead);
470 return Err(DecodingError::InsufficientSymbols { received, needed });
471 }
472
473 let mut output = Vec::with_capacity(self.object_size.unwrap_or(0) as usize);
474 for plan in plans {
475 let block = self
476 .blocks
477 .get(&plan.sbn)
478 .and_then(|b| b.decoded.as_ref())
479 .ok_or_else(|| DecodingError::InconsistentMetadata {
480 sbn: plan.sbn,
481 details: "missing decoded block".to_string(),
482 })?;
483 output.extend_from_slice(block);
484 }
485
486 if let Some(size) = self.object_size {
487 output.truncate(size as usize);
488 }
489
490 Ok(output)
491 }
492
493 fn configure_block_k(&mut self) {
494 let Some(plans) = &self.block_plans else {
495 return;
496 };
497 for plan in plans {
498 let k = u16::try_from(plan.k).unwrap_or(u16::MAX);
499 let _ = self.symbols.set_block_k(plan.sbn, k);
500 }
501 }
502
503 fn try_decode_block(&mut self, sbn: u8) -> Option<SymbolAcceptResult> {
504 let block_plan = self.block_plan(sbn)?;
505 let k = block_plan.k;
506 if k == 0 {
507 return None;
508 }
509
510 let symbols: Vec<Symbol> = self.symbols.symbols_for_block(sbn).cloned().collect();
511 if symbols.len() < k {
512 return None;
513 }
514
515 let decoded_symbols =
516 match decode_block(block_plan, &symbols, usize::from(self.config.symbol_size)) {
517 Ok(symbols) => symbols,
518 Err(DecodingError::InsufficientSymbols { .. }) => {
519 return Some(SymbolAcceptResult::Rejected(RejectReason::InsufficientRank));
520 }
521 Err(DecodingError::MatrixInversionFailed { .. }) => {
522 return Some(SymbolAcceptResult::Rejected(
523 RejectReason::InconsistentEquations,
524 ));
525 }
526 Err(DecodingError::InconsistentMetadata { .. }) => {
527 let block = self.blocks.get_mut(&sbn);
528 if let Some(block) = block {
529 block.state = BlockDecodingState::Failed;
530 }
531 return Some(SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata));
532 }
533 Err(DecodingError::SymbolSizeMismatch { .. }) => {
534 let block = self.blocks.get_mut(&sbn);
535 if let Some(block) = block {
536 block.state = BlockDecodingState::Failed;
537 }
538 return Some(SymbolAcceptResult::Rejected(
539 RejectReason::SymbolSizeMismatch,
540 ));
541 }
542 Err(_err) => {
543 let block = self.blocks.get_mut(&sbn);
544 if let Some(block) = block {
545 block.state = BlockDecodingState::Failed;
546 }
547 return Some(SymbolAcceptResult::Rejected(
548 RejectReason::MemoryLimitReached,
549 ));
550 }
551 };
552
553 let mut block_data = Vec::with_capacity(block_plan.len);
554 for symbol in &decoded_symbols {
555 block_data.extend_from_slice(symbol.data());
556 }
557 block_data.truncate(block_plan.len);
558
559 if let Some(block) = self.blocks.get_mut(&sbn) {
560 block.state = BlockDecodingState::Decoded;
561 block.decoded = Some(block_data.clone());
562 }
563
564 self.completed_blocks.insert(sbn);
565 self.symbols.clear_block(sbn);
566
567 Some(SymbolAcceptResult::BlockComplete {
568 block_sbn: sbn,
569 data: block_data,
570 })
571 }
572
573 fn block_plan(&self, sbn: u8) -> Option<&BlockPlan> {
574 self.block_plans
575 .as_ref()
576 .and_then(|plans| plans.iter().find(|plan| plan.sbn == sbn))
577 }
578}
579
580#[derive(Debug, Clone)]
581struct BlockPlan {
582 sbn: u8,
583 start: usize,
584 len: usize,
585 k: usize,
586}
587
588impl BlockPlan {
589 fn end(&self) -> usize {
590 self.start + self.len
591 }
592}
593
594fn plan_blocks(
595 object_size: usize,
596 symbol_size: usize,
597 max_block_size: usize,
598) -> Result<Vec<BlockPlan>, DecodingError> {
599 if object_size == 0 {
600 return Ok(Vec::new());
601 }
602
603 if symbol_size == 0 {
604 return Err(DecodingError::InconsistentMetadata {
605 sbn: 0,
606 details: "symbol_size must be > 0".to_string(),
607 });
608 }
609
610 let max_blocks = u8::MAX as usize + 1;
611 let max_total = max_block_size.saturating_mul(max_blocks);
612 if object_size > max_total {
613 return Err(DecodingError::InconsistentMetadata {
614 sbn: 0,
615 details: format!("object size {object_size} exceeds limit {max_total}"),
616 });
617 }
618
619 let mut blocks = Vec::new();
620 let mut offset = 0;
621 let mut sbn: u8 = 0;
622
623 while offset < object_size {
624 let len = usize::min(max_block_size, object_size - offset);
625 let k = len.div_ceil(symbol_size);
626 blocks.push(BlockPlan {
627 sbn,
628 start: offset,
629 len,
630 k,
631 });
632 offset += len;
633 sbn = sbn.wrapping_add(1);
634 }
635
636 Ok(blocks)
637}
638
639fn validate_object_params_layout(
640 params: ObjectParams,
641 plans: &[BlockPlan],
642) -> Result<(), DecodingError> {
643 let declared_blocks = usize::from(params.source_blocks);
644 let declared_k = usize::from(params.symbols_per_block);
645
646 if plans.is_empty() {
647 if declared_blocks == 0 && declared_k == 0 {
648 return Ok(());
649 }
650 if declared_blocks == 1 {
651 return Ok(());
652 }
653 return Err(DecodingError::InconsistentMetadata {
654 sbn: 0,
655 details: format!(
656 "object params layout mismatch: empty object expects either 0 blocks / 0 symbols-per-block or a single empty sentinel block, got {declared_blocks} block(s) with {declared_k} symbols/block"
657 ),
658 });
659 }
660
661 let expected_blocks = plans.len();
662 if declared_blocks != expected_blocks {
663 return Err(DecodingError::InconsistentMetadata {
664 sbn: 0,
665 details: format!(
666 "object params block count mismatch: expected {expected_blocks}, got {declared_blocks}"
667 ),
668 });
669 }
670
671 let expected_k = plans.iter().map(|plan| plan.k).max().unwrap_or(0);
672 if declared_k != expected_k {
673 return Err(DecodingError::InconsistentMetadata {
674 sbn: 0,
675 details: format!(
676 "object params symbols_per_block mismatch: expected {expected_k}, got {declared_k}"
677 ),
678 });
679 }
680
681 Ok(())
682}
683
684fn required_symbols(k: u16, overhead: f64, min_overhead: usize) -> usize {
685 if k == 0 {
686 return 0;
687 }
688 let raw = (f64::from(k) * overhead).ceil();
689 let minimum_threshold = usize::from(k).saturating_add(min_overhead);
690 if raw.is_nan() {
691 return minimum_threshold;
692 }
693 if raw.is_sign_positive() && !raw.is_finite() {
694 return usize::MAX;
695 }
696 if raw.is_sign_negative() {
697 return minimum_threshold;
698 }
699 #[allow(clippy::cast_sign_loss)]
700 let factor_threshold = raw as usize;
701 factor_threshold.max(minimum_threshold)
704}
705
706fn sum_required_symbols(plans: &[BlockPlan], overhead: f64, min_overhead: usize) -> usize {
707 plans.iter().fold(0usize, |acc, plan| {
708 acc.saturating_add(required_symbols(
709 u16::try_from(plan.k).unwrap_or(u16::MAX),
710 overhead,
711 min_overhead,
712 ))
713 })
714}
715
716#[allow(clippy::too_many_lines)]
717fn decode_block(
718 plan: &BlockPlan,
719 symbols: &[Symbol],
720 symbol_size: usize,
721) -> Result<Vec<Symbol>, DecodingError> {
722 let k = plan.k;
723 if symbols.len() < k {
724 return Err(DecodingError::InsufficientSymbols {
725 received: symbols.len(),
726 needed: k,
727 });
728 }
729
730 let object_id = symbols.first().map_or(ObjectId::NIL, Symbol::object_id);
731 let params = SystematicParams::for_source_block(k, symbol_size);
732 let block_seed = seed_for_block(object_id, plan.sbn);
733 let decoder = InactivationDecoder::new(k, symbol_size, block_seed);
734 let padding_rows = params.k_prime.saturating_sub(k);
735
736 let mut received = decoder.constraint_symbols();
738 received.reserve(symbols.len() + padding_rows);
739
740 for symbol in symbols {
742 match symbol.kind() {
743 SymbolKind::Source => {
744 let esi = symbol.esi() as usize;
745 if esi >= k {
746 return Err(DecodingError::InconsistentMetadata {
747 sbn: plan.sbn,
748 details: format!("source esi {esi} >= k {k}"),
749 });
750 }
751 received.push(ReceivedSymbol {
753 esi: symbol.esi(),
754 is_source: true,
755 columns: vec![esi],
756 coefficients: vec![Gf256::ONE],
757 data: symbol.data().to_vec(),
758 });
759 }
760 SymbolKind::Repair => {
761 let (columns, coefficients) = decoder.repair_equation(symbol.esi());
762 received.push(ReceivedSymbol {
763 esi: symbol.esi(),
764 is_source: false,
765 columns,
766 coefficients,
767 data: symbol.data().to_vec(),
768 });
769 }
770 }
771 }
772
773 for esi in k..params.k_prime {
776 received.push(ReceivedSymbol {
777 esi: esi as u32,
778 is_source: false,
779 columns: vec![esi],
780 coefficients: vec![Gf256::ONE],
781 data: vec![0u8; symbol_size],
782 });
783 }
784
785 let result = match decoder.decode(&received) {
786 Ok(result) => result,
787 Err(err) => {
788 let mapped = match err {
789 RaptorDecodeError::InsufficientSymbols { received, required } => {
790 DecodingError::InsufficientSymbols {
791 received,
792 needed: required,
793 }
794 }
795 RaptorDecodeError::SingularMatrix { row } => DecodingError::MatrixInversionFailed {
796 reason: format!("singular matrix at row {row}"),
797 },
798 RaptorDecodeError::SymbolSizeMismatch { expected, actual } => {
799 DecodingError::SymbolSizeMismatch {
800 expected: u16::try_from(expected).unwrap_or(u16::MAX),
801 actual,
802 }
803 }
804 RaptorDecodeError::SymbolEquationArityMismatch {
805 esi,
806 columns,
807 coefficients,
808 } => DecodingError::InconsistentMetadata {
809 sbn: plan.sbn,
810 details: format!(
811 "symbol {esi} has mismatched equation vectors: columns={columns}, coefficients={coefficients}"
812 ),
813 },
814 RaptorDecodeError::ColumnIndexOutOfRange {
815 esi,
816 column,
817 max_valid,
818 } => DecodingError::InconsistentMetadata {
819 sbn: plan.sbn,
820 details: format!(
821 "symbol {esi} references out-of-range column {column} (valid < {max_valid})"
822 ),
823 },
824 RaptorDecodeError::CorruptDecodedOutput {
825 esi,
826 byte_index,
827 expected,
828 actual,
829 } => DecodingError::MatrixInversionFailed {
830 reason: format!(
831 "decoded output verification failed at symbol {esi}, byte {byte_index}: expected 0x{expected:02x}, actual 0x{actual:02x}"
832 ),
833 },
834 };
835 return Err(mapped);
836 }
837 };
838
839 let mut decoded_symbols = Vec::with_capacity(k);
843 for (esi, data) in result.source.into_iter().enumerate() {
844 decoded_symbols.push(Symbol::new(
845 SymbolId::new(object_id, plan.sbn, esi as u32),
846 data,
847 SymbolKind::Source,
848 ));
849 }
850
851 Ok(decoded_symbols)
852}
853
854fn seed_for_block(object_id: ObjectId, sbn: u8) -> u64 {
855 seed_for(object_id, sbn, 0)
856}
857
858fn seed_for(object_id: ObjectId, sbn: u8, esi: u32) -> u64 {
859 let obj = object_id.as_u128();
860 let hi = (obj >> 64) as u64;
861 let lo = obj as u64;
862 let mut seed = hi ^ lo.rotate_left(13);
863 seed ^= u64::from(sbn) << 56;
864 seed ^= u64::from(esi);
865 if seed == 0 { 1 } else { seed }
866}
867
868#[cfg(test)]
869mod tests {
870 use super::*;
871 use crate::encoding::EncodingPipeline;
872 use crate::types::resource::{PoolConfig, SymbolPool};
873
874 fn init_test(name: &str) {
875 crate::test_utils::init_test_logging();
876 crate::test_phase!(name);
877 }
878
879 fn pool() -> SymbolPool {
880 SymbolPool::new(PoolConfig {
881 symbol_size: 256,
882 initial_size: 64,
883 max_size: 64,
884 allow_growth: false,
885 growth_increment: 0,
886 })
887 }
888
889 fn encoding_config() -> crate::config::EncodingConfig {
890 crate::config::EncodingConfig {
891 symbol_size: 256,
892 max_block_size: 1024,
893 repair_overhead: 1.05,
894 encoding_parallelism: 1,
895 decoding_parallelism: 1,
896 }
897 }
898
899 fn decoder_with_params(
900 config: &crate::config::EncodingConfig,
901 object_id: ObjectId,
902 data_len: usize,
903 repair_overhead: f64,
904 min_overhead: usize,
905 ) -> DecodingPipeline {
906 let mut decoder = DecodingPipeline::new(DecodingConfig {
907 symbol_size: config.symbol_size,
908 max_block_size: config.max_block_size,
909 repair_overhead,
910 min_overhead,
911 max_buffered_symbols: 0,
912 block_timeout: Duration::from_secs(30),
913 verify_auth: false,
914 });
915 let symbols_per_block = (data_len.div_ceil(usize::from(config.symbol_size))) as u16;
916 decoder
917 .set_object_params(ObjectParams::new(
918 object_id,
919 data_len as u64,
920 config.symbol_size,
921 1,
922 symbols_per_block,
923 ))
924 .expect("params");
925 decoder
926 }
927
928 #[test]
929 fn decode_roundtrip_sources_only() {
930 init_test("decode_roundtrip_sources_only");
931 let config = encoding_config();
932 let mut encoder = EncodingPipeline::new(config.clone(), pool());
933 let object_id = ObjectId::new_for_test(1);
934 let data = vec![42u8; 512];
935 let symbols: Vec<Symbol> = encoder
936 .encode_with_repair(object_id, &data, 0)
937 .map(|res| res.unwrap().into_symbol())
938 .collect();
939
940 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
941
942 for symbol in symbols {
943 let auth = AuthenticatedSymbol::from_parts(
944 symbol,
945 crate::security::tag::AuthenticationTag::zero(),
946 );
947 let _ = decoder.feed(auth).unwrap();
948 }
949
950 let decoded_data = decoder.into_data().expect("decoded");
951 let ok = decoded_data == data;
952 crate::assert_with_log!(ok, "decoded data", data, decoded_data);
953 crate::test_complete!("decode_roundtrip_sources_only");
954 }
955
956 #[test]
957 fn decode_roundtrip_out_of_order() {
958 init_test("decode_roundtrip_out_of_order");
959 let config = encoding_config();
960 let mut encoder = EncodingPipeline::new(config.clone(), pool());
961 let object_id = ObjectId::new_for_test(2);
962 let data = vec![7u8; 768];
963 let mut symbols: Vec<Symbol> = encoder
964 .encode_with_repair(object_id, &data, 2)
965 .map(|res| res.expect("encode").into_symbol())
966 .collect();
967
968 symbols.reverse();
969
970 let mut decoder =
971 decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
972
973 for symbol in symbols {
974 let auth = AuthenticatedSymbol::from_parts(
975 symbol,
976 crate::security::tag::AuthenticationTag::zero(),
977 );
978 let _ = decoder.feed(auth).expect("feed");
979 }
980
981 let decoded_data = decoder.into_data().expect("decoded");
982 let ok = decoded_data == data;
983 crate::assert_with_log!(ok, "decoded data", data, decoded_data);
984 crate::test_complete!("decode_roundtrip_out_of_order");
985 }
986
987 #[test]
988 fn reject_wrong_object_id() {
989 init_test("reject_wrong_object_id");
990 let config = encoding_config();
991 let mut encoder = EncodingPipeline::new(config.clone(), pool());
992 let object_id_a = ObjectId::new_for_test(10);
993 let object_id_b = ObjectId::new_for_test(11);
994 let data = vec![1u8; 128];
995
996 let mut decoder =
997 decoder_with_params(&config, object_id_a, data.len(), config.repair_overhead, 0);
998
999 let symbol_b = encoder
1000 .encode_with_repair(object_id_b, &data, 0)
1001 .next()
1002 .expect("symbol")
1003 .expect("encode")
1004 .into_symbol();
1005 let auth = AuthenticatedSymbol::from_parts(
1006 symbol_b,
1007 crate::security::tag::AuthenticationTag::zero(),
1008 );
1009
1010 let result = decoder.feed(auth).expect("feed");
1011 let expected = SymbolAcceptResult::Rejected(RejectReason::WrongObjectId);
1012 let ok = result == expected;
1013 crate::assert_with_log!(ok, "wrong object id", expected, result);
1014 crate::test_complete!("reject_wrong_object_id");
1015 }
1016
1017 #[test]
1018 fn reject_symbol_size_mismatch() {
1019 init_test("reject_symbol_size_mismatch");
1020 let config = encoding_config();
1021 let mut decoder = DecodingPipeline::new(DecodingConfig {
1022 symbol_size: config.symbol_size,
1023 max_block_size: config.max_block_size,
1024 repair_overhead: config.repair_overhead,
1025 min_overhead: 0,
1026 max_buffered_symbols: 0,
1027 block_timeout: Duration::from_secs(30),
1028 verify_auth: false,
1029 });
1030
1031 let symbol = Symbol::new(
1032 SymbolId::new(ObjectId::new_for_test(20), 0, 0),
1033 vec![0u8; 8],
1034 SymbolKind::Source,
1035 );
1036 let auth = AuthenticatedSymbol::from_parts(
1037 symbol,
1038 crate::security::tag::AuthenticationTag::zero(),
1039 );
1040 let result = decoder.feed(auth).expect("feed");
1041 let expected = SymbolAcceptResult::Rejected(RejectReason::SymbolSizeMismatch);
1042 let ok = result == expected;
1043 crate::assert_with_log!(ok, "symbol size mismatch", expected, result);
1044 crate::test_complete!("reject_symbol_size_mismatch");
1045 }
1046
1047 #[test]
1048 fn reject_invalid_metadata_esi_out_of_range() {
1049 init_test("reject_invalid_metadata_esi_out_of_range");
1050 let mut decoder = DecodingPipeline::new(DecodingConfig {
1051 symbol_size: 8,
1052 max_block_size: 8,
1053 repair_overhead: 1.0,
1054 min_overhead: 0,
1055 max_buffered_symbols: 0,
1056 block_timeout: Duration::from_secs(30),
1057 verify_auth: false,
1058 });
1059 let object_id = ObjectId::new_for_test(21);
1060 decoder
1061 .set_object_params(ObjectParams::new(object_id, 8, 8, 1, 1))
1062 .expect("params");
1063
1064 let symbol = Symbol::new(
1065 SymbolId::new(object_id, 0, 1),
1066 vec![0u8; 8],
1067 SymbolKind::Source,
1068 );
1069 let auth = AuthenticatedSymbol::from_parts(
1070 symbol,
1071 crate::security::tag::AuthenticationTag::zero(),
1072 );
1073
1074 let result = decoder.feed(auth).expect("feed");
1075 let expected = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
1076 let ok = result == expected;
1077 crate::assert_with_log!(ok, "invalid metadata", expected, result);
1078 crate::test_complete!("reject_invalid_metadata_esi_out_of_range");
1079 }
1080
1081 #[test]
1082 fn duplicate_symbol_before_decode() {
1083 init_test("duplicate_symbol_before_decode");
1084 let config = encoding_config();
1085 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1086 let object_id = ObjectId::new_for_test(30);
1087 let data = vec![9u8; 512];
1089
1090 let symbol = encoder
1091 .encode_with_repair(object_id, &data, 0)
1092 .next()
1093 .expect("symbol")
1094 .expect("encode")
1095 .into_symbol();
1096
1097 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1098
1099 let first = decoder
1100 .feed(AuthenticatedSymbol::from_parts(
1101 symbol.clone(),
1102 crate::security::tag::AuthenticationTag::zero(),
1103 ))
1104 .expect("feed");
1105 let accepted = matches!(
1106 first,
1107 SymbolAcceptResult::Accepted { .. } | SymbolAcceptResult::DecodingStarted { .. }
1108 );
1109 crate::assert_with_log!(accepted, "first accepted", true, accepted);
1110
1111 let second = decoder
1112 .feed(AuthenticatedSymbol::from_parts(
1113 symbol,
1114 crate::security::tag::AuthenticationTag::zero(),
1115 ))
1116 .expect("feed");
1117 let expected = SymbolAcceptResult::Duplicate;
1118 let ok = second == expected;
1119 crate::assert_with_log!(ok, "second duplicate", expected, second);
1120 crate::test_complete!("duplicate_symbol_before_decode");
1121 }
1122
1123 #[test]
1124 fn into_data_reports_insufficient_symbols() {
1125 init_test("into_data_reports_insufficient_symbols");
1126 let config = encoding_config();
1127 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1128 let object_id = ObjectId::new_for_test(40);
1129 let data = vec![5u8; 512];
1130
1131 let mut decoder =
1132 decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
1133
1134 let symbol = encoder
1135 .encode_with_repair(object_id, &data, 0)
1136 .next()
1137 .expect("symbol")
1138 .expect("encode")
1139 .into_symbol();
1140 let auth = AuthenticatedSymbol::from_parts(
1141 symbol,
1142 crate::security::tag::AuthenticationTag::zero(),
1143 );
1144 let _ = decoder.feed(auth).expect("feed");
1145
1146 let err = decoder
1147 .into_data()
1148 .expect_err("expected insufficient symbols");
1149 let insufficient = matches!(err, DecodingError::InsufficientSymbols { .. });
1150 crate::assert_with_log!(insufficient, "insufficient symbols", true, insufficient);
1151 crate::test_complete!("into_data_reports_insufficient_symbols");
1152 }
1153
1154 #[test]
1157 fn decoding_error_display_authentication_failed() {
1158 let err = DecodingError::AuthenticationFailed {
1159 symbol_id: SymbolId::new(ObjectId::new_for_test(1), 0, 0),
1160 };
1161 let msg = err.to_string();
1162 assert!(msg.contains("authentication failed"), "{msg}");
1163 }
1164
1165 #[test]
1166 fn decoding_error_display_insufficient_symbols() {
1167 let err = DecodingError::InsufficientSymbols {
1168 received: 3,
1169 needed: 10,
1170 };
1171 assert_eq!(err.to_string(), "insufficient symbols: have 3, need 10");
1172 }
1173
1174 #[test]
1175 fn decoding_error_display_matrix_inversion() {
1176 let err = DecodingError::MatrixInversionFailed {
1177 reason: "rank deficient".into(),
1178 };
1179 assert_eq!(err.to_string(), "matrix inversion failed: rank deficient");
1180 }
1181
1182 #[test]
1183 fn decoding_error_display_block_timeout() {
1184 let err = DecodingError::BlockTimeout {
1185 sbn: 2,
1186 elapsed: Duration::from_millis(1500),
1187 };
1188 let msg = err.to_string();
1189 assert!(msg.contains("block timeout"), "{msg}");
1190 assert!(msg.contains("1.5"), "{msg}");
1191 }
1192
1193 #[test]
1194 fn decoding_error_display_inconsistent_metadata() {
1195 let err = DecodingError::InconsistentMetadata {
1196 sbn: 0,
1197 details: "mismatch".into(),
1198 };
1199 let msg = err.to_string();
1200 assert!(msg.contains("inconsistent block metadata"), "{msg}");
1201 assert!(msg.contains("mismatch"), "{msg}");
1202 }
1203
1204 #[test]
1205 fn decoding_error_display_symbol_size_mismatch() {
1206 let err = DecodingError::SymbolSizeMismatch {
1207 expected: 256,
1208 actual: 128,
1209 };
1210 assert_eq!(
1211 err.to_string(),
1212 "symbol size mismatch: expected 256, got 128"
1213 );
1214 }
1215
1216 #[test]
1219 fn decoding_error_into_error_auth() {
1220 let err = DecodingError::AuthenticationFailed {
1221 symbol_id: SymbolId::new(ObjectId::new_for_test(1), 0, 0),
1222 };
1223 let error: crate::error::Error = err.into();
1224 assert_eq!(error.kind(), crate::error::ErrorKind::CorruptedSymbol);
1225 }
1226
1227 #[test]
1228 fn decoding_error_into_error_insufficient() {
1229 let err = DecodingError::InsufficientSymbols {
1230 received: 1,
1231 needed: 5,
1232 };
1233 let error: crate::error::Error = err.into();
1234 assert_eq!(error.kind(), crate::error::ErrorKind::InsufficientSymbols);
1235 }
1236
1237 #[test]
1238 fn decoding_error_into_error_matrix() {
1239 let err = DecodingError::MatrixInversionFailed {
1240 reason: "singular".into(),
1241 };
1242 let error: crate::error::Error = err.into();
1243 assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1244 }
1245
1246 #[test]
1247 fn decoding_error_into_error_timeout() {
1248 let err = DecodingError::BlockTimeout {
1249 sbn: 0,
1250 elapsed: Duration::from_secs(30),
1251 };
1252 let error: crate::error::Error = err.into();
1253 assert_eq!(error.kind(), crate::error::ErrorKind::ThresholdTimeout);
1254 }
1255
1256 #[test]
1257 fn decoding_error_into_error_inconsistent() {
1258 let err = DecodingError::InconsistentMetadata {
1259 sbn: 1,
1260 details: "x".into(),
1261 };
1262 let error: crate::error::Error = err.into();
1263 assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1264 }
1265
1266 #[test]
1267 fn decoding_error_into_error_size_mismatch() {
1268 let err = DecodingError::SymbolSizeMismatch {
1269 expected: 256,
1270 actual: 64,
1271 };
1272 let error: crate::error::Error = err.into();
1273 assert_eq!(error.kind(), crate::error::ErrorKind::DecodingFailed);
1274 }
1275
1276 #[test]
1279 fn reject_reason_variants_are_eq() {
1280 assert_eq!(RejectReason::WrongObjectId, RejectReason::WrongObjectId);
1281 assert_ne!(
1282 RejectReason::AuthenticationFailed,
1283 RejectReason::SymbolSizeMismatch
1284 );
1285 }
1286
1287 #[test]
1288 fn reject_reason_debug() {
1289 let dbg = format!("{:?}", RejectReason::BlockAlreadyDecoded);
1290 assert_eq!(dbg, "BlockAlreadyDecoded");
1291 }
1292
1293 #[test]
1296 fn symbol_accept_result_accepted_eq() {
1297 let a = SymbolAcceptResult::Accepted {
1298 received: 3,
1299 needed: 5,
1300 };
1301 let b = SymbolAcceptResult::Accepted {
1302 received: 3,
1303 needed: 5,
1304 };
1305 assert_eq!(a, b);
1306 }
1307
1308 #[test]
1309 fn symbol_accept_result_duplicate_eq() {
1310 assert_eq!(SymbolAcceptResult::Duplicate, SymbolAcceptResult::Duplicate);
1311 }
1312
1313 #[test]
1314 fn symbol_accept_result_rejected_eq() {
1315 let a = SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached);
1316 let b = SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached);
1317 assert_eq!(a, b);
1318 }
1319
1320 #[test]
1321 fn symbol_accept_result_variants_ne() {
1322 assert_ne!(
1323 SymbolAcceptResult::Duplicate,
1324 SymbolAcceptResult::Rejected(RejectReason::WrongObjectId)
1325 );
1326 }
1327
1328 #[test]
1331 fn decoding_config_default_values() {
1332 let cfg = DecodingConfig::default();
1333 assert_eq!(cfg.symbol_size, 256);
1334 assert_eq!(cfg.max_block_size, 1024 * 1024);
1335 assert!((cfg.repair_overhead - 1.05).abs() < f64::EPSILON);
1336 assert_eq!(cfg.min_overhead, 0);
1337 assert_eq!(cfg.max_buffered_symbols, 0);
1338 assert_eq!(cfg.block_timeout, Duration::from_secs(30));
1339 assert!(!cfg.verify_auth);
1340 }
1341
1342 #[test]
1343 fn required_symbols_uses_total_factor_and_minimum_extra_floor() {
1344 assert_eq!(required_symbols(0, 1.05, 3), 0);
1345 assert_eq!(required_symbols(10, 1.05, 3), 13);
1346 assert_eq!(required_symbols(10, 1.5, 1), 15);
1347 assert_eq!(required_symbols(10, 0.5, 0), 10);
1348 assert_eq!(required_symbols(10, f64::NAN, 3), 13);
1349 assert_eq!(required_symbols(10, f64::INFINITY, 3), usize::MAX);
1350 }
1351
1352 #[test]
1355 fn block_state_kind_eq_and_debug() {
1356 assert_eq!(BlockStateKind::Collecting, BlockStateKind::Collecting);
1357 assert_ne!(BlockStateKind::Collecting, BlockStateKind::Decoded);
1358 assert_eq!(format!("{:?}", BlockStateKind::Failed), "Failed");
1359 assert_eq!(format!("{:?}", BlockStateKind::Decoding), "Decoding");
1360 }
1361
1362 #[test]
1365 fn pipeline_new_starts_empty() {
1366 let pipeline = DecodingPipeline::new(DecodingConfig::default());
1367 let progress = pipeline.progress();
1368 assert_eq!(progress.blocks_complete, 0);
1369 assert_eq!(progress.symbols_received, 0);
1370 }
1371
1372 #[test]
1373 fn pipeline_set_object_params_rejects_mismatched_symbol_size() {
1374 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1375 symbol_size: 256,
1376 ..DecodingConfig::default()
1377 });
1378 let params = ObjectParams::new(ObjectId::new_for_test(1), 1024, 128, 1, 8);
1379 let err = pipeline.set_object_params(params).unwrap_err();
1380 assert!(matches!(err, DecodingError::SymbolSizeMismatch { .. }));
1381 }
1382
1383 #[test]
1384 fn pipeline_set_object_params_rejects_inconsistent_object_id() {
1385 let config = encoding_config();
1386 let oid1 = ObjectId::new_for_test(1);
1387 let oid2 = ObjectId::new_for_test(2);
1388
1389 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1390 symbol_size: config.symbol_size,
1391 ..DecodingConfig::default()
1392 });
1393 pipeline
1394 .set_object_params(ObjectParams::new(oid1, 512, config.symbol_size, 1, 2))
1395 .expect("first set_object_params");
1396 let err = pipeline
1397 .set_object_params(ObjectParams::new(oid2, 512, config.symbol_size, 1, 2))
1398 .unwrap_err();
1399 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1400 }
1401
1402 #[test]
1403 fn pipeline_set_object_params_same_id_is_ok() {
1404 let config = encoding_config();
1405 let oid = ObjectId::new_for_test(1);
1406
1407 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1408 symbol_size: config.symbol_size,
1409 ..DecodingConfig::default()
1410 });
1411 pipeline
1412 .set_object_params(ObjectParams::new(oid, 512, config.symbol_size, 1, 2))
1413 .expect("first");
1414 pipeline
1415 .set_object_params(ObjectParams::new(oid, 512, config.symbol_size, 1, 2))
1416 .expect("second with same id should succeed");
1417 }
1418
1419 #[test]
1420 fn pipeline_set_object_params_rejects_declared_block_count_drift() {
1421 let config = encoding_config();
1422 let object_id = ObjectId::new_for_test(104);
1423
1424 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1425 symbol_size: config.symbol_size,
1426 max_block_size: config.max_block_size,
1427 ..DecodingConfig::default()
1428 });
1429 let err = pipeline
1430 .set_object_params(ObjectParams::new(object_id, 1536, config.symbol_size, 1, 4))
1431 .unwrap_err();
1432 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1433 assert!(
1434 err.to_string().contains("block count mismatch"),
1435 "unexpected error: {err}"
1436 );
1437 }
1438
1439 #[test]
1440 fn pipeline_set_object_params_rejects_total_k_metadata_for_multi_block_object() {
1441 let config = encoding_config();
1442 let object_id = ObjectId::new_for_test(105);
1443
1444 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1445 symbol_size: config.symbol_size,
1446 max_block_size: config.max_block_size,
1447 ..DecodingConfig::default()
1448 });
1449 let err = pipeline
1450 .set_object_params(ObjectParams::new(object_id, 2048, config.symbol_size, 2, 8))
1451 .unwrap_err();
1452 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1453 assert!(
1454 err.to_string().contains("symbols_per_block mismatch"),
1455 "unexpected error: {err}"
1456 );
1457 }
1458
1459 #[test]
1460 fn pipeline_set_object_params_failure_does_not_latch_object_identity() {
1461 let config = encoding_config();
1462 let invalid_object_id = ObjectId::new_for_test(106);
1463 let valid_object_id = ObjectId::new_for_test(107);
1464
1465 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1466 symbol_size: config.symbol_size,
1467 max_block_size: config.max_block_size,
1468 ..DecodingConfig::default()
1469 });
1470 let err = pipeline
1471 .set_object_params(ObjectParams::new(
1472 invalid_object_id,
1473 2048,
1474 config.symbol_size,
1475 2,
1476 8,
1477 ))
1478 .unwrap_err();
1479 assert!(matches!(err, DecodingError::InconsistentMetadata { .. }));
1480
1481 pipeline
1482 .set_object_params(ObjectParams::new(
1483 valid_object_id,
1484 512,
1485 config.symbol_size,
1486 1,
1487 2,
1488 ))
1489 .expect("failed set_object_params must not poison object identity");
1490 }
1491
1492 #[test]
1493 fn pipeline_set_object_params_accepts_empty_object_single_block_sentinel_metadata() {
1494 let config = encoding_config();
1495 let object_id = ObjectId::new_for_test(108);
1496
1497 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1498 symbol_size: config.symbol_size,
1499 max_block_size: config.max_block_size,
1500 ..DecodingConfig::default()
1501 });
1502 pipeline
1503 .set_object_params(ObjectParams::new(
1504 object_id,
1505 0,
1506 config.symbol_size,
1507 1,
1508 config
1509 .max_block_size
1510 .div_ceil(usize::from(config.symbol_size))
1511 .try_into()
1512 .expect("sentinel block K should fit in u16"),
1513 ))
1514 .expect("empty object sentinel metadata should be accepted");
1515
1516 assert!(pipeline.is_complete());
1517 assert_eq!(pipeline.progress().blocks_total, Some(0));
1518 assert_eq!(
1519 pipeline.into_data().expect("empty object should decode"),
1520 Vec::<u8>::new()
1521 );
1522 }
1523
1524 #[test]
1525 fn pipeline_set_object_params_accepts_full_256_block_boundary() {
1526 let config = crate::config::EncodingConfig {
1527 symbol_size: 8,
1528 max_block_size: 8,
1529 ..encoding_config()
1530 };
1531 let object_id = ObjectId::new_for_test(109);
1532
1533 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1534 symbol_size: config.symbol_size,
1535 max_block_size: config.max_block_size,
1536 ..DecodingConfig::default()
1537 });
1538 pipeline
1539 .set_object_params(ObjectParams::new(
1540 object_id,
1541 u64::try_from(config.max_block_size * 256).expect("boundary object size fits u64"),
1542 config.symbol_size,
1543 256,
1544 1,
1545 ))
1546 .expect("256-block metadata boundary should be representable");
1547
1548 assert_eq!(pipeline.progress().blocks_total, Some(256));
1549 }
1550
1551 #[test]
1554 fn feed_batch_returns_results_per_symbol() {
1555 init_test("feed_batch_returns_results_per_symbol");
1556 let config = encoding_config();
1557 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1558 let object_id = ObjectId::new_for_test(100);
1559 let data = vec![0xAAu8; 768]; let symbols: Vec<AuthenticatedSymbol> = encoder
1562 .encode_with_repair(object_id, &data, 0)
1563 .map(|res| {
1564 AuthenticatedSymbol::from_parts(
1565 res.unwrap().into_symbol(),
1566 crate::security::tag::AuthenticationTag::zero(),
1567 )
1568 })
1569 .take(3)
1570 .collect();
1571
1572 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1573
1574 let results = decoder.feed_batch(symbols.into_iter());
1575 let len = results.len();
1576 let expected_len = 3usize;
1577 crate::assert_with_log!(len == expected_len, "batch length", expected_len, len);
1578 for (i, r) in results.iter().enumerate() {
1579 let is_ok = r.is_ok();
1580 crate::assert_with_log!(is_ok, &format!("result[{i}] is Ok"), true, is_ok);
1581 }
1582 crate::test_complete!("feed_batch_returns_results_per_symbol");
1583 }
1584
1585 #[test]
1586 fn is_complete_false_without_params() {
1587 init_test("is_complete_false_without_params");
1588 let pipeline = DecodingPipeline::new(DecodingConfig::default());
1589 let complete = pipeline.is_complete();
1590 crate::assert_with_log!(!complete, "is_complete without params", false, complete);
1591 crate::test_complete!("is_complete_false_without_params");
1592 }
1593
1594 #[test]
1595 fn is_complete_true_after_all_blocks_decoded() {
1596 init_test("is_complete_true_after_all_blocks_decoded");
1597 let config = encoding_config();
1598 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1599 let object_id = ObjectId::new_for_test(101);
1600 let data = vec![42u8; 512];
1601 let symbols: Vec<Symbol> = encoder
1602 .encode_with_repair(object_id, &data, 0)
1603 .map(|res| res.unwrap().into_symbol())
1604 .collect();
1605
1606 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1607
1608 for symbol in symbols {
1609 let auth = AuthenticatedSymbol::from_parts(
1610 symbol,
1611 crate::security::tag::AuthenticationTag::zero(),
1612 );
1613 let _ = decoder.feed(auth).unwrap();
1614 }
1615
1616 let complete = decoder.is_complete();
1617 crate::assert_with_log!(complete, "is_complete after all blocks", true, complete);
1618 crate::test_complete!("is_complete_true_after_all_blocks_decoded");
1619 }
1620
1621 #[test]
1622 fn progress_reports_blocks_total_after_params() {
1623 init_test("progress_reports_blocks_total_after_params");
1624 let config = encoding_config();
1625 let object_id = ObjectId::new_for_test(102);
1626
1627 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1628 symbol_size: config.symbol_size,
1629 max_block_size: 1024,
1630 ..DecodingConfig::default()
1631 });
1632 let k = (512usize).div_ceil(usize::from(config.symbol_size)) as u16;
1634 pipeline
1635 .set_object_params(ObjectParams::new(object_id, 512, config.symbol_size, 1, k))
1636 .expect("set params");
1637
1638 let progress = pipeline.progress();
1639 let blocks_total = progress.blocks_total;
1640 let expected_blocks = Some(1usize);
1641 crate::assert_with_log!(
1642 blocks_total == expected_blocks,
1643 "blocks_total",
1644 expected_blocks,
1645 blocks_total
1646 );
1647 let estimate = progress.symbols_needed_estimate;
1648 let positive = estimate > 0;
1649 crate::assert_with_log!(positive, "symbols_needed_estimate > 0", true, positive);
1650 crate::test_complete!("progress_reports_blocks_total_after_params");
1651 }
1652
1653 #[test]
1654 fn progress_symbols_needed_estimate_does_not_double_count_min_overhead() {
1655 init_test("progress_symbols_needed_estimate_does_not_double_count_min_overhead");
1656 let object_id = ObjectId::new_for_test(1020);
1657 let symbol_size = 256u16;
1658 let k = 10u16;
1659 let data_len = usize::from(symbol_size) * usize::from(k);
1660
1661 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1662 symbol_size,
1663 max_block_size: 4096,
1664 repair_overhead: 1.05,
1665 min_overhead: 3,
1666 max_buffered_symbols: 0,
1667 block_timeout: Duration::from_secs(30),
1668 verify_auth: false,
1669 });
1670 pipeline
1671 .set_object_params(ObjectParams::new(
1672 object_id,
1673 data_len as u64,
1674 symbol_size,
1675 1,
1676 k,
1677 ))
1678 .expect("set params");
1679
1680 let progress = pipeline.progress();
1681 assert_eq!(progress.blocks_total, Some(1));
1682 assert_eq!(progress.symbols_needed_estimate, 13);
1683 crate::test_complete!(
1684 "progress_symbols_needed_estimate_does_not_double_count_min_overhead"
1685 );
1686 }
1687
1688 #[test]
1689 fn progress_symbols_needed_estimate_saturates_for_infinite_overhead() {
1690 init_test("progress_symbols_needed_estimate_saturates_for_infinite_overhead");
1691 let object_id = ObjectId::new_for_test(1021);
1692 let symbol_size = 256u16;
1693 let data_len = 2048usize;
1694
1695 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1696 symbol_size,
1697 max_block_size: 1024,
1698 repair_overhead: f64::INFINITY,
1699 min_overhead: 0,
1700 max_buffered_symbols: 0,
1701 block_timeout: Duration::from_secs(30),
1702 verify_auth: false,
1703 });
1704 pipeline
1705 .set_object_params(ObjectParams::new(
1706 object_id,
1707 data_len as u64,
1708 symbol_size,
1709 2,
1710 4,
1711 ))
1712 .expect("set params");
1713
1714 let progress = pipeline.progress();
1715 assert_eq!(progress.blocks_total, Some(2));
1716 assert_eq!(progress.symbols_needed_estimate, usize::MAX);
1717 crate::test_complete!("progress_symbols_needed_estimate_saturates_for_infinite_overhead");
1718 }
1719
1720 #[test]
1721 fn block_status_none_for_unknown_block() {
1722 init_test("block_status_none_for_unknown_block");
1723 let config = encoding_config();
1724 let object_id = ObjectId::new_for_test(103);
1725
1726 let mut pipeline = DecodingPipeline::new(DecodingConfig {
1727 symbol_size: config.symbol_size,
1728 max_block_size: config.max_block_size,
1729 ..DecodingConfig::default()
1730 });
1731 let k = (512usize).div_ceil(usize::from(config.symbol_size)) as u16;
1732 pipeline
1733 .set_object_params(ObjectParams::new(object_id, 512, config.symbol_size, 1, k))
1734 .expect("set params");
1735
1736 let status = pipeline.block_status(99);
1737 let is_none = status.is_none();
1738 crate::assert_with_log!(is_none, "block_status(99) is None", true, is_none);
1739 crate::test_complete!("block_status_none_for_unknown_block");
1740 }
1741
1742 #[test]
1743 fn block_status_collecting_after_partial_feed() {
1744 init_test("block_status_collecting_after_partial_feed");
1745 let config = encoding_config();
1746 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1747 let object_id = ObjectId::new_for_test(104);
1748 let data = vec![0xBBu8; 512];
1749
1750 let first_symbol = encoder
1751 .encode_with_repair(object_id, &data, 0)
1752 .next()
1753 .expect("symbol")
1754 .expect("encode")
1755 .into_symbol();
1756
1757 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1759
1760 let auth = AuthenticatedSymbol::from_parts(
1761 first_symbol,
1762 crate::security::tag::AuthenticationTag::zero(),
1763 );
1764 let _ = decoder.feed(auth).expect("feed");
1765
1766 let status = decoder.block_status(0);
1767 let is_some = status.is_some();
1768 crate::assert_with_log!(is_some, "block_status(0) is Some", true, is_some);
1769
1770 let status = status.unwrap();
1771 let state = status.state;
1772 let expected_state = BlockStateKind::Collecting;
1773 crate::assert_with_log!(
1774 state == expected_state,
1775 "state is Collecting",
1776 expected_state,
1777 state
1778 );
1779 let received = status.symbols_received;
1780 let expected_received = 1usize;
1781 crate::assert_with_log!(
1782 received == expected_received,
1783 "symbols_received",
1784 expected_received,
1785 received
1786 );
1787 crate::test_complete!("block_status_collecting_after_partial_feed");
1788 }
1789
1790 #[test]
1791 fn block_status_decoded_after_complete() {
1792 init_test("block_status_decoded_after_complete");
1793 let config = encoding_config();
1794 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1795 let object_id = ObjectId::new_for_test(105);
1796 let data = vec![42u8; 512];
1797 let symbols: Vec<Symbol> = encoder
1798 .encode_with_repair(object_id, &data, 0)
1799 .map(|res| res.unwrap().into_symbol())
1800 .collect();
1801
1802 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1803
1804 for symbol in symbols {
1805 let auth = AuthenticatedSymbol::from_parts(
1806 symbol,
1807 crate::security::tag::AuthenticationTag::zero(),
1808 );
1809 let _ = decoder.feed(auth).unwrap();
1810 }
1811
1812 let _status = decoder.block_status(0);
1816 let complete = decoder.is_complete();
1817 crate::assert_with_log!(complete, "is_complete", true, complete);
1818
1819 let extra = Symbol::new(
1822 SymbolId::new(object_id, 0, 99),
1823 vec![0u8; usize::from(config.symbol_size)],
1824 SymbolKind::Source,
1825 );
1826 let auth =
1827 AuthenticatedSymbol::from_parts(extra, crate::security::tag::AuthenticationTag::zero());
1828 let result = decoder.feed(auth).expect("feed");
1829 let expected = SymbolAcceptResult::Rejected(RejectReason::BlockAlreadyDecoded);
1830 let ok = result == expected;
1831 crate::assert_with_log!(ok, "block already decoded", expected, result);
1832 crate::test_complete!("block_status_decoded_after_complete");
1833 }
1834
1835 #[test]
1836 fn block_already_decoded_reject() {
1837 init_test("block_already_decoded_reject");
1838 let config = encoding_config();
1839 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1840 let object_id = ObjectId::new_for_test(106);
1841 let data = vec![42u8; 512];
1842 let symbols: Vec<Symbol> = encoder
1843 .encode_with_repair(object_id, &data, 0)
1844 .map(|res| res.unwrap().into_symbol())
1845 .collect();
1846
1847 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
1848
1849 for symbol in symbols {
1850 let auth = AuthenticatedSymbol::from_parts(
1851 symbol,
1852 crate::security::tag::AuthenticationTag::zero(),
1853 );
1854 let _ = decoder.feed(auth).unwrap();
1855 }
1856
1857 let extra = Symbol::new(
1859 SymbolId::new(object_id, 0, 0),
1860 vec![0u8; usize::from(config.symbol_size)],
1861 SymbolKind::Source,
1862 );
1863 let auth =
1864 AuthenticatedSymbol::from_parts(extra, crate::security::tag::AuthenticationTag::zero());
1865 let result = decoder.feed(auth).expect("feed");
1866 let expected = SymbolAcceptResult::Rejected(RejectReason::BlockAlreadyDecoded);
1867 let ok = result == expected;
1868 crate::assert_with_log!(ok, "block already decoded reject", expected, result);
1869 crate::test_complete!("block_already_decoded_reject");
1870 }
1871
1872 #[test]
1873 fn verify_auth_no_context_unverified_symbol_errors() {
1874 init_test("verify_auth_no_context_unverified_symbol_errors");
1875 let config = encoding_config();
1876 let mut decoder = DecodingPipeline::new(DecodingConfig {
1877 symbol_size: config.symbol_size,
1878 max_block_size: config.max_block_size,
1879 verify_auth: true,
1880 ..DecodingConfig::default()
1881 });
1882
1883 let symbol = Symbol::new(
1884 SymbolId::new(ObjectId::new_for_test(107), 0, 0),
1885 vec![0u8; usize::from(config.symbol_size)],
1886 SymbolKind::Source,
1887 );
1888 let auth = AuthenticatedSymbol::from_parts(
1890 symbol,
1891 crate::security::tag::AuthenticationTag::zero(),
1892 );
1893
1894 let result = decoder.feed(auth);
1895 let is_ok = result.is_ok();
1896 crate::assert_with_log!(
1897 is_ok,
1898 "unverified with no context is rejected safely",
1899 true,
1900 is_ok
1901 );
1902
1903 let accept = result.unwrap();
1904 let expected = SymbolAcceptResult::Rejected(RejectReason::AuthenticationFailed);
1905 crate::assert_with_log!(
1906 accept == expected,
1907 "rejected as auth failed",
1908 expected,
1909 accept
1910 );
1911 crate::test_complete!("verify_auth_no_context_unverified_symbol_errors");
1912 }
1913
1914 #[test]
1915 fn verify_auth_no_context_preverified_symbol_ok() {
1916 init_test("verify_auth_no_context_preverified_symbol_ok");
1917 let config = encoding_config();
1918 let mut decoder = DecodingPipeline::new(DecodingConfig {
1919 symbol_size: config.symbol_size,
1920 max_block_size: config.max_block_size,
1921 verify_auth: true,
1922 ..DecodingConfig::default()
1923 });
1924
1925 let symbol = Symbol::new(
1926 SymbolId::new(ObjectId::new_for_test(108), 0, 0),
1927 vec![0u8; usize::from(config.symbol_size)],
1928 SymbolKind::Source,
1929 );
1930 let auth = AuthenticatedSymbol::new_verified(
1932 symbol,
1933 crate::security::tag::AuthenticationTag::zero(),
1934 );
1935
1936 let result = decoder.feed(auth);
1937 let is_ok = result.is_ok();
1938 crate::assert_with_log!(is_ok, "preverified symbol accepted", true, is_ok);
1939 let accept = result.unwrap();
1940 let is_accepted = matches!(accept, SymbolAcceptResult::Accepted { .. });
1941 crate::assert_with_log!(is_accepted, "result is Accepted variant", true, is_accepted);
1942 crate::test_complete!("verify_auth_no_context_preverified_symbol_ok");
1943 }
1944
1945 #[test]
1946 fn with_auth_rejects_bad_tag() {
1947 init_test("with_auth_rejects_bad_tag");
1948 let config = encoding_config();
1949 let mut decoder = DecodingPipeline::with_auth(
1950 DecodingConfig {
1951 symbol_size: config.symbol_size,
1952 max_block_size: config.max_block_size,
1953 verify_auth: true,
1954 ..DecodingConfig::default()
1955 },
1956 crate::security::SecurityContext::for_testing(42),
1957 );
1958
1959 let symbol = Symbol::new(
1960 SymbolId::new(ObjectId::new_for_test(109), 0, 0),
1961 vec![0u8; usize::from(config.symbol_size)],
1962 SymbolKind::Source,
1963 );
1964 let auth = AuthenticatedSymbol::from_parts(
1966 symbol,
1967 crate::security::tag::AuthenticationTag::zero(),
1968 );
1969
1970 let result = decoder.feed(auth).expect("feed should not return Err");
1971 let expected = SymbolAcceptResult::Rejected(RejectReason::AuthenticationFailed);
1972 let ok = result == expected;
1973 crate::assert_with_log!(ok, "bad tag rejected", expected, result);
1974 crate::test_complete!("with_auth_rejects_bad_tag");
1975 }
1976
1977 #[test]
1978 fn multi_block_roundtrip() {
1979 init_test("multi_block_roundtrip");
1980 let config = crate::config::EncodingConfig {
1981 symbol_size: 256,
1982 max_block_size: 1024,
1983 repair_overhead: 1.05,
1984 encoding_parallelism: 1,
1985 decoding_parallelism: 1,
1986 };
1987 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1988 let object_id = ObjectId::new_for_test(110);
1989 let data: Vec<u8> = (0u32..2048).map(|i| (i % 251) as u8).collect();
1990
1991 let symbols: Vec<Symbol> = encoder
1992 .encode_with_repair(object_id, &data, 0)
1993 .map(|res| res.unwrap().into_symbol())
1994 .collect();
1995
1996 let mut decoder = DecodingPipeline::new(DecodingConfig {
1997 symbol_size: config.symbol_size,
1998 max_block_size: config.max_block_size,
1999 repair_overhead: 1.0,
2000 min_overhead: 0,
2001 max_buffered_symbols: 0,
2002 block_timeout: Duration::from_secs(30),
2003 verify_auth: false,
2004 });
2005
2006 let symbol_size = usize::from(config.symbol_size);
2008 let num_blocks = data.len().div_ceil(config.max_block_size);
2009 let mut full_block_k: u16 = 0;
2010 for b in 0..num_blocks {
2011 let block_start = b * config.max_block_size;
2012 let block_len = usize::min(config.max_block_size, data.len() - block_start);
2013 let k = block_len.div_ceil(symbol_size) as u16;
2014 full_block_k = full_block_k.max(k);
2015 }
2016 decoder
2017 .set_object_params(ObjectParams::new(
2018 object_id,
2019 data.len() as u64,
2020 config.symbol_size,
2021 num_blocks as u16,
2022 full_block_k,
2023 ))
2024 .expect("set params");
2025
2026 for symbol in symbols {
2027 let auth = AuthenticatedSymbol::from_parts(
2028 symbol,
2029 crate::security::tag::AuthenticationTag::zero(),
2030 );
2031 let _ = decoder.feed(auth).unwrap();
2032 }
2033
2034 let complete = decoder.is_complete();
2035 crate::assert_with_log!(complete, "multi-block is_complete", true, complete);
2036
2037 let decoded_data = decoder.into_data().expect("decoded");
2038 let ok = decoded_data == data;
2039 crate::assert_with_log!(
2040 ok,
2041 "multi-block roundtrip data",
2042 data.len(),
2043 decoded_data.len()
2044 );
2045 crate::test_complete!("multi_block_roundtrip");
2046 }
2047
2048 #[test]
2049 fn multi_block_progress_retains_cumulative_symbols_after_block_completion() {
2050 init_test("multi_block_progress_retains_cumulative_symbols_after_block_completion");
2051 let config = crate::config::EncodingConfig {
2052 symbol_size: 256,
2053 max_block_size: 1024,
2054 repair_overhead: 1.05,
2055 encoding_parallelism: 1,
2056 decoding_parallelism: 1,
2057 };
2058 let mut encoder = EncodingPipeline::new(config.clone(), pool());
2059 let object_id = ObjectId::new_for_test(111);
2060 let data: Vec<u8> = (0u32..2048).map(|i| (i % 251) as u8).collect();
2061
2062 let mut block_zero_symbols: Vec<Symbol> = encoder
2063 .encode_with_repair(object_id, &data, 0)
2064 .map(|res| res.expect("encode").into_symbol())
2065 .filter(|symbol| symbol.sbn() == 0)
2066 .collect();
2067 block_zero_symbols.sort_by_key(Symbol::esi);
2068 assert_eq!(block_zero_symbols.len(), 4);
2069
2070 let mut decoder = DecodingPipeline::new(DecodingConfig {
2071 symbol_size: config.symbol_size,
2072 max_block_size: config.max_block_size,
2073 repair_overhead: 1.0,
2074 min_overhead: 0,
2075 max_buffered_symbols: 0,
2076 block_timeout: Duration::from_secs(30),
2077 verify_auth: false,
2078 });
2079 decoder
2080 .set_object_params(ObjectParams::new(
2081 object_id,
2082 data.len() as u64,
2083 config.symbol_size,
2084 2,
2085 4,
2086 ))
2087 .expect("set params");
2088
2089 for symbol in block_zero_symbols {
2090 let auth = AuthenticatedSymbol::from_parts(
2091 symbol,
2092 crate::security::tag::AuthenticationTag::zero(),
2093 );
2094 let _ = decoder.feed(auth).expect("feed");
2095 }
2096
2097 assert_eq!(decoder.progress().blocks_complete, 1);
2098 assert_eq!(decoder.progress().blocks_total, Some(2));
2099 assert_eq!(decoder.progress().symbols_received, 4);
2100 assert_eq!(decoder.progress().symbols_needed_estimate, 8);
2101
2102 let err = decoder.into_data().expect_err("block one is still missing");
2103 assert!(matches!(
2104 err,
2105 DecodingError::InsufficientSymbols {
2106 received: 4,
2107 needed: 8
2108 }
2109 ));
2110 crate::test_complete!(
2111 "multi_block_progress_retains_cumulative_symbols_after_block_completion"
2112 );
2113 }
2114
2115 #[test]
2116 fn into_data_no_params_errors() {
2117 init_test("into_data_no_params_errors");
2118 let pipeline = DecodingPipeline::new(DecodingConfig::default());
2119 let result = pipeline.into_data();
2120 let is_err = result.is_err();
2121 crate::assert_with_log!(is_err, "into_data without params errors", true, is_err);
2122 let err = result.unwrap_err();
2123 let msg = err.to_string();
2124 let contains = msg.contains("object parameters not set");
2125 crate::assert_with_log!(
2126 contains,
2127 "error message contains expected text",
2128 true,
2129 contains
2130 );
2131 crate::test_complete!("into_data_no_params_errors");
2132 }
2133
2134 #[test]
2137 fn reject_reason_debug_clone_copy_eq() {
2138 let r = RejectReason::WrongObjectId;
2139 let r2 = r; let r3 = r;
2141 assert_eq!(r, r2);
2142 assert_eq!(r, r3);
2143 assert_ne!(r, RejectReason::AuthenticationFailed);
2144 assert_ne!(r, RejectReason::SymbolSizeMismatch);
2145 assert_ne!(r, RejectReason::BlockAlreadyDecoded);
2146 assert_ne!(r, RejectReason::InsufficientRank);
2147 assert_ne!(r, RejectReason::InconsistentEquations);
2148 assert_ne!(r, RejectReason::InvalidMetadata);
2149 assert_ne!(r, RejectReason::MemoryLimitReached);
2150 let dbg = format!("{r:?}");
2151 assert!(dbg.contains("WrongObjectId"));
2152 }
2153
2154 #[test]
2155 fn symbol_accept_result_debug_clone_eq() {
2156 let a = SymbolAcceptResult::Accepted {
2157 received: 3,
2158 needed: 5,
2159 };
2160 let a2 = a.clone();
2161 assert_eq!(a, a2);
2162 assert_ne!(a, SymbolAcceptResult::Duplicate);
2163 let r = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
2164 let r2 = r.clone();
2165 assert_eq!(r, r2);
2166 let dbg = format!("{a:?}");
2167 assert!(dbg.contains("Accepted"));
2168 }
2169
2170 #[test]
2171 fn block_state_kind_debug_clone_copy_eq() {
2172 let s = BlockStateKind::Collecting;
2173 let s2 = s; let s3 = s;
2175 assert_eq!(s, s2);
2176 assert_eq!(s, s3);
2177 assert_ne!(s, BlockStateKind::Decoding);
2178 assert_ne!(s, BlockStateKind::Decoded);
2179 assert_ne!(s, BlockStateKind::Failed);
2180 let dbg = format!("{s:?}");
2181 assert!(dbg.contains("Collecting"));
2182 }
2183}