1use crate::error::{Error, ErrorKind};
10use crate::raptorq::decoder::{
11 DecodeError as RaptorDecodeError, InactivationDecoder, ReceivedSymbol,
12};
13use crate::raptorq::gf256::{gf256_addmul_slice, Gf256};
14use crate::raptorq::systematic::{ConstraintMatrix, SystematicParams};
15use crate::security::{AuthenticatedSymbol, SecurityContext};
16use crate::types::symbol_set::{InsertResult, SymbolSet, ThresholdConfig};
17use crate::types::{ObjectId, ObjectParams, Symbol, SymbolId, SymbolKind};
18use std::collections::{BTreeSet, HashMap};
19use std::time::Duration;
20
21#[derive(Debug, thiserror::Error)]
23pub enum DecodingError {
24 #[error("authentication failed for symbol {symbol_id}")]
26 AuthenticationFailed {
27 symbol_id: SymbolId,
29 },
30 #[error("insufficient symbols: have {received}, need {needed}")]
32 InsufficientSymbols {
33 received: usize,
35 needed: usize,
37 },
38 #[error("matrix inversion failed: {reason}")]
40 MatrixInversionFailed {
41 reason: String,
43 },
44 #[error("block timeout after {elapsed:?}")]
46 BlockTimeout {
47 sbn: u8,
49 elapsed: Duration,
51 },
52 #[error("inconsistent block metadata: {sbn} {details}")]
54 InconsistentMetadata {
55 sbn: u8,
57 details: String,
59 },
60 #[error("symbol size mismatch: expected {expected}, got {actual}")]
62 SymbolSizeMismatch {
63 expected: u16,
65 actual: usize,
67 },
68}
69
70impl From<DecodingError> for Error {
71 fn from(err: DecodingError) -> Self {
72 match &err {
73 DecodingError::AuthenticationFailed { .. } => Self::new(ErrorKind::CorruptedSymbol),
74 DecodingError::InsufficientSymbols { .. } => Self::new(ErrorKind::InsufficientSymbols),
75 DecodingError::MatrixInversionFailed { .. }
76 | DecodingError::InconsistentMetadata { .. }
77 | DecodingError::SymbolSizeMismatch { .. } => Self::new(ErrorKind::DecodingFailed),
78 DecodingError::BlockTimeout { .. } => Self::new(ErrorKind::ThresholdTimeout),
79 }
80 .with_message(err.to_string())
81 }
82}
83
84#[derive(Debug, Clone, Copy, PartialEq, Eq)]
86pub enum RejectReason {
87 WrongObjectId,
89 AuthenticationFailed,
91 SymbolSizeMismatch,
93 BlockAlreadyDecoded,
95 InsufficientRank,
97 InconsistentEquations,
99 InvalidMetadata,
101 MemoryLimitReached,
103}
104
105#[derive(Debug, Clone, PartialEq, Eq)]
107pub enum SymbolAcceptResult {
108 Accepted {
110 received: usize,
112 needed: usize,
114 },
115 DecodingStarted {
117 block_sbn: u8,
119 },
120 BlockComplete {
122 block_sbn: u8,
124 data: Vec<u8>,
126 },
127 Duplicate,
129 Rejected(RejectReason),
131}
132
133#[derive(Debug, Clone)]
135pub struct DecodingConfig {
136 pub symbol_size: u16,
138 pub max_block_size: usize,
140 pub repair_overhead: f64,
142 pub min_overhead: usize,
144 pub max_buffered_symbols: usize,
146 pub block_timeout: Duration,
148 pub verify_auth: bool,
150}
151
152impl Default for DecodingConfig {
153 fn default() -> Self {
154 Self {
155 symbol_size: 256,
156 max_block_size: 1024 * 1024,
157 repair_overhead: 1.05,
158 min_overhead: 0,
159 max_buffered_symbols: 0,
160 block_timeout: Duration::from_secs(30),
161 verify_auth: false,
162 }
163 }
164}
165
166#[derive(Debug, Clone, Copy)]
168pub struct DecodingProgress {
169 pub blocks_complete: usize,
171 pub blocks_total: Option<usize>,
173 pub symbols_received: usize,
175 pub symbols_needed_estimate: usize,
177}
178
179#[derive(Debug, Clone, Copy)]
181pub struct BlockStatus {
182 pub sbn: u8,
184 pub symbols_received: usize,
186 pub symbols_needed: usize,
188 pub state: BlockStateKind,
190}
191
192#[derive(Debug, Clone, Copy, PartialEq, Eq)]
194pub enum BlockStateKind {
195 Collecting,
197 Decoding,
199 Decoded,
201 Failed,
203}
204
205#[derive(Debug)]
206struct BlockDecoder {
207 sbn: u8,
208 state: BlockDecodingState,
209 decoded: Option<Vec<u8>>,
210}
211
212#[derive(Debug)]
213enum BlockDecodingState {
214 Collecting,
215 Decoding,
216 Decoded,
217 Failed,
218}
219
220#[derive(Debug)]
222pub struct DecodingPipeline {
223 config: DecodingConfig,
224 symbols: SymbolSet,
225 blocks: HashMap<u8, BlockDecoder>,
226 completed_blocks: BTreeSet<u8>,
227 object_id: Option<ObjectId>,
228 object_size: Option<u64>,
229 block_plans: Option<Vec<BlockPlan>>,
230 auth_context: Option<SecurityContext>,
231}
232
233impl DecodingPipeline {
234 #[must_use]
236 pub fn new(config: DecodingConfig) -> Self {
237 let threshold = ThresholdConfig::new(
238 config.repair_overhead,
239 config.min_overhead,
240 config.max_buffered_symbols,
241 );
242 Self {
243 config,
244 symbols: SymbolSet::with_config(threshold),
245 blocks: HashMap::new(),
246 completed_blocks: BTreeSet::new(),
247 object_id: None,
248 object_size: None,
249 block_plans: None,
250 auth_context: None,
251 }
252 }
253
254 #[must_use]
256 pub fn with_auth(config: DecodingConfig, ctx: SecurityContext) -> Self {
257 let mut pipeline = Self::new(config);
258 pipeline.auth_context = Some(ctx);
259 pipeline
260 }
261
262 pub fn set_object_params(&mut self, params: ObjectParams) -> Result<(), DecodingError> {
264 if params.symbol_size != self.config.symbol_size {
265 return Err(DecodingError::SymbolSizeMismatch {
266 expected: self.config.symbol_size,
267 actual: params.symbol_size as usize,
268 });
269 }
270 if let Some(existing) = self.object_id {
271 if existing != params.object_id {
272 return Err(DecodingError::InconsistentMetadata {
273 sbn: 0,
274 details: format!(
275 "object id mismatch: expected {existing:?}, got {:?}",
276 params.object_id
277 ),
278 });
279 }
280 }
281 self.object_id = Some(params.object_id);
282 self.object_size = Some(params.object_size);
283 self.block_plans = Some(plan_blocks(
284 params.object_size as usize,
285 usize::from(params.symbol_size),
286 self.config.max_block_size,
287 )?);
288 self.configure_block_k();
289 Ok(())
290 }
291
292 pub fn feed(
294 &mut self,
295 mut auth_symbol: AuthenticatedSymbol,
296 ) -> Result<SymbolAcceptResult, DecodingError> {
297 let symbol_id = auth_symbol.symbol().id();
298
299 if self.config.verify_auth {
300 let Some(ctx) = &self.auth_context else {
301 return Err(DecodingError::AuthenticationFailed { symbol_id });
302 };
303 if !auth_symbol.is_verified()
304 && ctx.verify_authenticated_symbol(&mut auth_symbol).is_err()
305 {
306 return Ok(SymbolAcceptResult::Rejected(
307 RejectReason::AuthenticationFailed,
308 ));
309 }
310 }
311
312 let symbol = auth_symbol.into_symbol();
313
314 if symbol.len() != usize::from(self.config.symbol_size) {
315 return Ok(SymbolAcceptResult::Rejected(
316 RejectReason::SymbolSizeMismatch,
317 ));
318 }
319
320 if let Some(object_id) = self.object_id {
321 if object_id != symbol.object_id() {
322 return Ok(SymbolAcceptResult::Rejected(RejectReason::WrongObjectId));
323 }
324 } else {
325 self.object_id = Some(symbol.object_id());
326 }
327
328 let sbn = symbol.sbn();
329 if self.completed_blocks.contains(&sbn) {
330 return Ok(SymbolAcceptResult::Rejected(
331 RejectReason::BlockAlreadyDecoded,
332 ));
333 }
334
335 self.blocks.entry(sbn).or_insert_with(|| BlockDecoder {
337 sbn,
338 state: BlockDecodingState::Collecting,
339 decoded: None,
340 });
341
342 let insert_result = self.symbols.insert(symbol);
343 match insert_result {
344 InsertResult::Duplicate => Ok(SymbolAcceptResult::Duplicate),
345 InsertResult::MemoryLimitReached | InsertResult::BlockLimitReached { .. } => Ok(
346 SymbolAcceptResult::Rejected(RejectReason::MemoryLimitReached),
347 ),
348 InsertResult::Inserted {
349 block_progress,
350 threshold_reached,
351 } => {
352 if block_progress.k.is_none() {
353 self.configure_block_k();
354 }
355 let needed = block_progress.k.map_or(0, |k| {
356 required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
357 });
358 let received = block_progress.total();
359
360 if threshold_reached {
361 if let Some(block) = self.blocks.get_mut(&sbn) {
363 block.state = BlockDecodingState::Decoding;
364 }
365 if let Some(result) = self.try_decode_block(sbn) {
366 return Ok(result);
367 }
368 }
369
370 if let Some(block) = self.blocks.get_mut(&sbn) {
372 if !matches!(
373 block.state,
374 BlockDecodingState::Decoded | BlockDecodingState::Failed
375 ) {
376 block.state = BlockDecodingState::Collecting;
377 }
378 }
379 Ok(SymbolAcceptResult::Accepted { received, needed })
380 }
381 }
382 }
383
384 pub fn feed_batch(
386 &mut self,
387 symbols: impl Iterator<Item = AuthenticatedSymbol>,
388 ) -> Vec<Result<SymbolAcceptResult, DecodingError>> {
389 symbols.map(|symbol| self.feed(symbol)).collect()
390 }
391
392 #[must_use]
394 pub fn is_complete(&self) -> bool {
395 let Some(plans) = &self.block_plans else {
396 return false;
397 };
398 self.completed_blocks.len() == plans.len()
399 }
400
401 #[must_use]
403 pub fn progress(&self) -> DecodingProgress {
404 let blocks_total = self.block_plans.as_ref().map(Vec::len);
405 let symbols_received = self.symbols.len();
406 let symbols_needed_estimate = self.block_plans.as_ref().map_or(0, |plans| {
407 plans
408 .iter()
409 .map(|plan| {
410 required_symbols(
411 plan.k as u16,
412 self.config.repair_overhead,
413 self.config.min_overhead,
414 )
415 })
416 .sum()
417 });
418
419 DecodingProgress {
420 blocks_complete: self.completed_blocks.len(),
421 blocks_total,
422 symbols_received,
423 symbols_needed_estimate,
424 }
425 }
426
427 #[must_use]
429 pub fn block_status(&self, sbn: u8) -> Option<BlockStatus> {
430 let progress = self.symbols.block_progress(sbn)?;
431 let state = self
432 .blocks
433 .get(&sbn)
434 .map_or(BlockStateKind::Collecting, |block| match block.state {
435 BlockDecodingState::Collecting => BlockStateKind::Collecting,
436 BlockDecodingState::Decoding => BlockStateKind::Decoding,
437 BlockDecodingState::Decoded => BlockStateKind::Decoded,
438 BlockDecodingState::Failed => BlockStateKind::Failed,
439 });
440
441 let symbols_needed = progress.k.map_or(0, |k| {
442 required_symbols(k, self.config.repair_overhead, self.config.min_overhead)
443 });
444
445 Some(BlockStatus {
446 sbn,
447 symbols_received: progress.total(),
448 symbols_needed,
449 state,
450 })
451 }
452
453 pub fn into_data(self) -> Result<Vec<u8>, DecodingError> {
455 let Some(plans) = &self.block_plans else {
456 return Err(DecodingError::InconsistentMetadata {
457 sbn: 0,
458 details: "object parameters not set".to_string(),
459 });
460 };
461 if !self.is_complete() {
462 let received = self.symbols.len();
463 let needed = plans
464 .iter()
465 .map(|plan| {
466 required_symbols(
467 plan.k as u16,
468 self.config.repair_overhead,
469 self.config.min_overhead,
470 )
471 })
472 .sum();
473 return Err(DecodingError::InsufficientSymbols { received, needed });
474 }
475
476 let mut output = Vec::with_capacity(self.object_size.unwrap_or(0) as usize);
477 for plan in plans {
478 let block = self
479 .blocks
480 .get(&plan.sbn)
481 .and_then(|b| b.decoded.as_ref())
482 .ok_or_else(|| DecodingError::InconsistentMetadata {
483 sbn: plan.sbn,
484 details: "missing decoded block".to_string(),
485 })?;
486 output.extend_from_slice(block);
487 }
488
489 if let Some(size) = self.object_size {
490 output.truncate(size as usize);
491 }
492
493 Ok(output)
494 }
495
496 fn configure_block_k(&mut self) {
497 let Some(plans) = &self.block_plans else {
498 return;
499 };
500 for plan in plans {
501 let _ = self.symbols.set_block_k(plan.sbn, plan.k as u16);
502 }
503 }
504
505 fn try_decode_block(&mut self, sbn: u8) -> Option<SymbolAcceptResult> {
506 let block_plan = self.block_plan(sbn)?;
507 let k = block_plan.k;
508 if k == 0 {
509 return None;
510 }
511
512 let symbols: Vec<Symbol> = self.symbols.symbols_for_block(sbn).cloned().collect();
513 if symbols.len() < k {
514 return None;
515 }
516
517 let decoded_symbols =
518 match decode_block(block_plan, &symbols, usize::from(self.config.symbol_size)) {
519 Ok(symbols) => symbols,
520 Err(DecodingError::InsufficientSymbols { .. }) => {
521 return Some(SymbolAcceptResult::Rejected(RejectReason::InsufficientRank));
522 }
523 Err(DecodingError::MatrixInversionFailed { .. }) => {
524 return Some(SymbolAcceptResult::Rejected(
525 RejectReason::InconsistentEquations,
526 ));
527 }
528 Err(DecodingError::InconsistentMetadata { .. }) => {
529 let block = self.blocks.get_mut(&sbn);
530 if let Some(block) = block {
531 block.state = BlockDecodingState::Failed;
532 }
533 return Some(SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata));
534 }
535 Err(DecodingError::SymbolSizeMismatch { .. }) => {
536 let block = self.blocks.get_mut(&sbn);
537 if let Some(block) = block {
538 block.state = BlockDecodingState::Failed;
539 }
540 return Some(SymbolAcceptResult::Rejected(
541 RejectReason::SymbolSizeMismatch,
542 ));
543 }
544 Err(_err) => {
545 let block = self.blocks.get_mut(&sbn);
546 if let Some(block) = block {
547 block.state = BlockDecodingState::Failed;
548 }
549 return Some(SymbolAcceptResult::Rejected(
550 RejectReason::MemoryLimitReached,
551 ));
552 }
553 };
554
555 let mut block_data = Vec::with_capacity(block_plan.len);
556 for symbol in &decoded_symbols {
557 block_data.extend_from_slice(symbol.data());
558 }
559 block_data.truncate(block_plan.len);
560
561 if let Some(block) = self.blocks.get_mut(&sbn) {
562 block.state = BlockDecodingState::Decoded;
563 block.decoded = Some(block_data.clone());
564 }
565
566 self.completed_blocks.insert(sbn);
567 self.symbols.clear_block(sbn);
568
569 Some(SymbolAcceptResult::BlockComplete {
570 block_sbn: sbn,
571 data: block_data,
572 })
573 }
574
575 fn block_plan(&self, sbn: u8) -> Option<&BlockPlan> {
576 self.block_plans
577 .as_ref()
578 .and_then(|plans| plans.iter().find(|plan| plan.sbn == sbn))
579 }
580}
581
582#[derive(Debug, Clone)]
583struct BlockPlan {
584 sbn: u8,
585 start: usize,
586 len: usize,
587 k: usize,
588}
589
590impl BlockPlan {
591 fn end(&self) -> usize {
592 self.start + self.len
593 }
594}
595
596fn plan_blocks(
597 object_size: usize,
598 symbol_size: usize,
599 max_block_size: usize,
600) -> Result<Vec<BlockPlan>, DecodingError> {
601 if object_size == 0 {
602 return Ok(Vec::new());
603 }
604
605 if symbol_size == 0 {
606 return Err(DecodingError::InconsistentMetadata {
607 sbn: 0,
608 details: "symbol_size must be > 0".to_string(),
609 });
610 }
611
612 let max_blocks = u8::MAX as usize + 1;
613 let max_total = max_block_size.saturating_mul(max_blocks);
614 if object_size > max_total {
615 return Err(DecodingError::InconsistentMetadata {
616 sbn: 0,
617 details: format!("object size {object_size} exceeds limit {max_total}"),
618 });
619 }
620
621 let mut blocks = Vec::new();
622 let mut offset = 0;
623 let mut sbn: u8 = 0;
624
625 while offset < object_size {
626 let len = usize::min(max_block_size, object_size - offset);
627 let k = len.div_ceil(symbol_size);
628 blocks.push(BlockPlan {
629 sbn,
630 start: offset,
631 len,
632 k,
633 });
634 offset += len;
635 sbn = sbn.wrapping_add(1);
636 }
637
638 Ok(blocks)
639}
640
641fn required_symbols(k: u16, overhead: f64, min_overhead: usize) -> usize {
642 let raw = (f64::from(k) * overhead).ceil();
643 if raw.is_sign_negative() {
644 return 0;
645 }
646 #[allow(clippy::cast_sign_loss)]
647 let threshold = raw as usize + min_overhead;
648 threshold
649}
650
651fn decode_block(
652 plan: &BlockPlan,
653 symbols: &[Symbol],
654 symbol_size: usize,
655) -> Result<Vec<Symbol>, DecodingError> {
656 let k = plan.k;
657 if symbols.len() < k {
658 return Err(DecodingError::InsufficientSymbols {
659 received: symbols.len(),
660 needed: k,
661 });
662 }
663
664 let object_id = symbols.first().map_or(ObjectId::NIL, Symbol::object_id);
665 let params = SystematicParams::for_source_block(k, symbol_size);
666 let block_seed = seed_for_block(object_id, plan.sbn);
667 let constraints = ConstraintMatrix::build(¶ms, block_seed);
668 let base_rows = params.s + params.h;
669
670 let decoder = InactivationDecoder::new(k, symbol_size, block_seed);
671 let mut received: Vec<ReceivedSymbol> = Vec::with_capacity(base_rows + symbols.len());
672
673 for row in 0..base_rows {
674 let (columns, coefficients) = constraint_row_equation(&constraints, row);
675 received.push(ReceivedSymbol {
676 esi: row as u32,
677 is_source: false,
678 columns,
679 coefficients,
680 data: vec![0u8; symbol_size],
681 });
682 }
683
684 for symbol in symbols {
685 match symbol.kind() {
686 SymbolKind::Source => {
687 let esi = symbol.esi() as usize;
688 if esi >= k {
689 return Err(DecodingError::InconsistentMetadata {
690 sbn: plan.sbn,
691 details: format!("source esi {esi} >= k {k}"),
692 });
693 }
694 let row = base_rows + esi;
695 let (columns, coefficients) = constraint_row_equation(&constraints, row);
696 received.push(ReceivedSymbol {
697 esi: symbol.esi(),
698 is_source: true,
699 columns,
700 coefficients,
701 data: symbol.data().to_vec(),
702 });
703 }
704 SymbolKind::Repair => {
705 let (columns, coefficients) = decoder.repair_equation(symbol.esi());
706 received.push(ReceivedSymbol {
707 esi: symbol.esi(),
708 is_source: false,
709 columns,
710 coefficients,
711 data: symbol.data().to_vec(),
712 });
713 }
714 }
715 }
716
717 let intermediate = match decoder.decode(&received) {
718 Ok(result) => result.intermediate,
719 Err(err) => {
720 let mapped = match err {
721 RaptorDecodeError::InsufficientSymbols { received, required } => {
722 DecodingError::InsufficientSymbols {
723 received,
724 needed: required,
725 }
726 }
727 RaptorDecodeError::SingularMatrix { row } => DecodingError::MatrixInversionFailed {
728 reason: format!("singular matrix at row {row}"),
729 },
730 RaptorDecodeError::SymbolSizeMismatch { expected, actual } => {
731 DecodingError::SymbolSizeMismatch {
732 expected: expected as u16,
733 actual,
734 }
735 }
736 };
737 return Err(mapped);
738 }
739 };
740
741 let mut decoded_symbols = Vec::with_capacity(k);
742 for esi in 0..k {
743 let row = base_rows + esi;
744 let mut data = vec![0u8; symbol_size];
745 for (col, symbol) in intermediate.iter().enumerate().take(params.l) {
746 let coeff = constraints.get(row, col);
747 if !coeff.is_zero() {
748 gf256_addmul_slice(&mut data, symbol, coeff);
749 }
750 }
751 decoded_symbols.push(Symbol::new(
752 SymbolId::new(object_id, plan.sbn, esi as u32),
753 data,
754 SymbolKind::Source,
755 ));
756 }
757
758 Ok(decoded_symbols)
759}
760
761fn constraint_row_equation(constraints: &ConstraintMatrix, row: usize) -> (Vec<usize>, Vec<Gf256>) {
762 let mut columns = Vec::new();
763 let mut coefficients = Vec::new();
764 for col in 0..constraints.cols {
765 let coeff = constraints.get(row, col);
766 if !coeff.is_zero() {
767 columns.push(col);
768 coefficients.push(coeff);
769 }
770 }
771 (columns, coefficients)
772}
773
774fn seed_for_block(object_id: ObjectId, sbn: u8) -> u64 {
775 seed_for(object_id, sbn, 0)
776}
777
778fn seed_for(object_id: ObjectId, sbn: u8, esi: u32) -> u64 {
779 let obj = object_id.as_u128();
780 let hi = (obj >> 64) as u64;
781 let lo = obj as u64;
782 let mut seed = hi ^ lo.rotate_left(13);
783 seed ^= u64::from(sbn) << 56;
784 seed ^= u64::from(esi);
785 if seed == 0 {
786 1
787 } else {
788 seed
789 }
790}
791
792#[cfg(test)]
793mod tests {
794 use super::*;
795 use crate::encoding::EncodingPipeline;
796 use crate::types::resource::{PoolConfig, SymbolPool};
797
798 fn init_test(name: &str) {
799 crate::test_utils::init_test_logging();
800 crate::test_phase!(name);
801 }
802
803 fn pool() -> SymbolPool {
804 SymbolPool::new(PoolConfig {
805 symbol_size: 256,
806 initial_size: 64,
807 max_size: 64,
808 allow_growth: false,
809 growth_increment: 0,
810 })
811 }
812
813 fn encoding_config() -> crate::config::EncodingConfig {
814 crate::config::EncodingConfig {
815 symbol_size: 256,
816 max_block_size: 1024,
817 repair_overhead: 1.05,
818 encoding_parallelism: 1,
819 decoding_parallelism: 1,
820 }
821 }
822
823 fn decoder_with_params(
824 config: &crate::config::EncodingConfig,
825 object_id: ObjectId,
826 data_len: usize,
827 repair_overhead: f64,
828 min_overhead: usize,
829 ) -> DecodingPipeline {
830 let mut decoder = DecodingPipeline::new(DecodingConfig {
831 symbol_size: config.symbol_size,
832 max_block_size: config.max_block_size,
833 repair_overhead,
834 min_overhead,
835 max_buffered_symbols: 0,
836 block_timeout: Duration::from_secs(30),
837 verify_auth: false,
838 });
839 let symbols_per_block = (data_len.div_ceil(usize::from(config.symbol_size))) as u16;
840 decoder
841 .set_object_params(ObjectParams::new(
842 object_id,
843 data_len as u64,
844 config.symbol_size,
845 1,
846 symbols_per_block,
847 ))
848 .expect("params");
849 decoder
850 }
851
852 #[test]
853 fn decode_roundtrip_sources_only() {
854 init_test("decode_roundtrip_sources_only");
855 let config = encoding_config();
856 let mut encoder = EncodingPipeline::new(config.clone(), pool());
857 let object_id = ObjectId::new_for_test(1);
858 let data = vec![42u8; 512];
859 let symbols: Vec<Symbol> = encoder
860 .encode_with_repair(object_id, &data, 0)
861 .map(|res| res.unwrap().into_symbol())
862 .collect();
863
864 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.0, 0);
865
866 for symbol in symbols {
867 let auth = AuthenticatedSymbol::from_parts(
868 symbol,
869 crate::security::tag::AuthenticationTag::zero(),
870 );
871 let _ = decoder.feed(auth).unwrap();
872 }
873
874 let decoded_data = decoder.into_data().expect("decoded");
875 let ok = decoded_data == data;
876 crate::assert_with_log!(ok, "decoded data", data, decoded_data);
877 crate::test_complete!("decode_roundtrip_sources_only");
878 }
879
880 #[test]
881 fn decode_roundtrip_out_of_order() {
882 init_test("decode_roundtrip_out_of_order");
883 let config = encoding_config();
884 let mut encoder = EncodingPipeline::new(config.clone(), pool());
885 let object_id = ObjectId::new_for_test(2);
886 let data = vec![7u8; 768];
887 let mut symbols: Vec<Symbol> = encoder
888 .encode_with_repair(object_id, &data, 2)
889 .map(|res| res.expect("encode").into_symbol())
890 .collect();
891
892 symbols.reverse();
893
894 let mut decoder =
895 decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
896
897 for symbol in symbols {
898 let auth = AuthenticatedSymbol::from_parts(
899 symbol,
900 crate::security::tag::AuthenticationTag::zero(),
901 );
902 let _ = decoder.feed(auth).expect("feed");
903 }
904
905 let decoded_data = decoder.into_data().expect("decoded");
906 let ok = decoded_data == data;
907 crate::assert_with_log!(ok, "decoded data", data, decoded_data);
908 crate::test_complete!("decode_roundtrip_out_of_order");
909 }
910
911 #[test]
912 fn reject_wrong_object_id() {
913 init_test("reject_wrong_object_id");
914 let config = encoding_config();
915 let mut encoder = EncodingPipeline::new(config.clone(), pool());
916 let object_id_a = ObjectId::new_for_test(10);
917 let object_id_b = ObjectId::new_for_test(11);
918 let data = vec![1u8; 128];
919
920 let mut decoder =
921 decoder_with_params(&config, object_id_a, data.len(), config.repair_overhead, 0);
922
923 let symbol_b = encoder
924 .encode_with_repair(object_id_b, &data, 0)
925 .next()
926 .expect("symbol")
927 .expect("encode")
928 .into_symbol();
929 let auth = AuthenticatedSymbol::from_parts(
930 symbol_b,
931 crate::security::tag::AuthenticationTag::zero(),
932 );
933
934 let result = decoder.feed(auth).expect("feed");
935 let expected = SymbolAcceptResult::Rejected(RejectReason::WrongObjectId);
936 let ok = result == expected;
937 crate::assert_with_log!(ok, "wrong object id", expected, result);
938 crate::test_complete!("reject_wrong_object_id");
939 }
940
941 #[test]
942 fn reject_symbol_size_mismatch() {
943 init_test("reject_symbol_size_mismatch");
944 let config = encoding_config();
945 let mut decoder = DecodingPipeline::new(DecodingConfig {
946 symbol_size: config.symbol_size,
947 max_block_size: config.max_block_size,
948 repair_overhead: config.repair_overhead,
949 min_overhead: 0,
950 max_buffered_symbols: 0,
951 block_timeout: Duration::from_secs(30),
952 verify_auth: false,
953 });
954
955 let symbol = Symbol::new(
956 SymbolId::new(ObjectId::new_for_test(20), 0, 0),
957 vec![0u8; 8],
958 SymbolKind::Source,
959 );
960 let auth = AuthenticatedSymbol::from_parts(
961 symbol,
962 crate::security::tag::AuthenticationTag::zero(),
963 );
964 let result = decoder.feed(auth).expect("feed");
965 let expected = SymbolAcceptResult::Rejected(RejectReason::SymbolSizeMismatch);
966 let ok = result == expected;
967 crate::assert_with_log!(ok, "symbol size mismatch", expected, result);
968 crate::test_complete!("reject_symbol_size_mismatch");
969 }
970
971 #[test]
972 fn reject_invalid_metadata_esi_out_of_range() {
973 init_test("reject_invalid_metadata_esi_out_of_range");
974 let mut decoder = DecodingPipeline::new(DecodingConfig {
975 symbol_size: 8,
976 max_block_size: 8,
977 repair_overhead: 1.0,
978 min_overhead: 0,
979 max_buffered_symbols: 0,
980 block_timeout: Duration::from_secs(30),
981 verify_auth: false,
982 });
983 let object_id = ObjectId::new_for_test(21);
984 decoder
985 .set_object_params(ObjectParams::new(object_id, 8, 8, 1, 1))
986 .expect("params");
987
988 let symbol = Symbol::new(
989 SymbolId::new(object_id, 0, 1),
990 vec![0u8; 8],
991 SymbolKind::Source,
992 );
993 let auth = AuthenticatedSymbol::from_parts(
994 symbol,
995 crate::security::tag::AuthenticationTag::zero(),
996 );
997
998 let result = decoder.feed(auth).expect("feed");
999 let expected = SymbolAcceptResult::Rejected(RejectReason::InvalidMetadata);
1000 let ok = result == expected;
1001 crate::assert_with_log!(ok, "invalid metadata", expected, result);
1002 crate::test_complete!("reject_invalid_metadata_esi_out_of_range");
1003 }
1004
1005 #[test]
1006 fn duplicate_symbol_before_decode() {
1007 init_test("duplicate_symbol_before_decode");
1008 let config = encoding_config();
1009 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1010 let object_id = ObjectId::new_for_test(30);
1011 let data = vec![9u8; 512];
1013
1014 let symbol = encoder
1015 .encode_with_repair(object_id, &data, 0)
1016 .next()
1017 .expect("symbol")
1018 .expect("encode")
1019 .into_symbol();
1020
1021 let mut decoder = decoder_with_params(&config, object_id, data.len(), 1.5, 1);
1022
1023 let first = decoder
1024 .feed(AuthenticatedSymbol::from_parts(
1025 symbol.clone(),
1026 crate::security::tag::AuthenticationTag::zero(),
1027 ))
1028 .expect("feed");
1029 let accepted = matches!(
1030 first,
1031 SymbolAcceptResult::Accepted { .. } | SymbolAcceptResult::DecodingStarted { .. }
1032 );
1033 crate::assert_with_log!(accepted, "first accepted", true, accepted);
1034
1035 let second = decoder
1036 .feed(AuthenticatedSymbol::from_parts(
1037 symbol,
1038 crate::security::tag::AuthenticationTag::zero(),
1039 ))
1040 .expect("feed");
1041 let expected = SymbolAcceptResult::Duplicate;
1042 let ok = second == expected;
1043 crate::assert_with_log!(ok, "second duplicate", expected, second);
1044 crate::test_complete!("duplicate_symbol_before_decode");
1045 }
1046
1047 #[test]
1048 fn into_data_reports_insufficient_symbols() {
1049 init_test("into_data_reports_insufficient_symbols");
1050 let config = encoding_config();
1051 let mut encoder = EncodingPipeline::new(config.clone(), pool());
1052 let object_id = ObjectId::new_for_test(40);
1053 let data = vec![5u8; 512];
1054
1055 let mut decoder =
1056 decoder_with_params(&config, object_id, data.len(), config.repair_overhead, 0);
1057
1058 let symbol = encoder
1059 .encode_with_repair(object_id, &data, 0)
1060 .next()
1061 .expect("symbol")
1062 .expect("encode")
1063 .into_symbol();
1064 let auth = AuthenticatedSymbol::from_parts(
1065 symbol,
1066 crate::security::tag::AuthenticationTag::zero(),
1067 );
1068 let _ = decoder.feed(auth).expect("feed");
1069
1070 let err = decoder
1071 .into_data()
1072 .expect_err("expected insufficient symbols");
1073 let insufficient = matches!(err, DecodingError::InsufficientSymbols { .. });
1074 crate::assert_with_log!(insufficient, "insufficient symbols", true, insufficient);
1075 crate::test_complete!("into_data_reports_insufficient_symbols");
1076 }
1077}