1use fsqlite_types::sync_primitives::Instant;
15use std::fmt;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use fsqlite_error::{FrankenError, Result};
19use fsqlite_types::{ObjectId, cx::Cx};
20use tracing::{debug, error, info, warn};
21use xxhash_rust::xxh3::xxh3_64;
22
23use crate::decode_proofs::EcsDecodeProof;
24
25const BEAD_ID: &str = "bd-1hi.5";
26
/// Lock-free counters describing RaptorQ encode/decode activity.
///
/// Every counter is an [`AtomicU64`] touched with `Ordering::Relaxed`, so a
/// [`snapshot`](Self::snapshot) taken while other threads are recording reads
/// each field independently rather than as one atomic view.
pub struct RaptorQMetrics {
    /// Sum of the `encoded_bytes` values passed to [`record_encode`](Self::record_encode).
    pub encoded_bytes_total: AtomicU64,
    /// Sum of the `repair_symbols` values passed to [`record_encode`](Self::record_encode).
    pub repair_symbols_generated_total: AtomicU64,
    /// Sum of the `decoded_bytes` values passed to
    /// [`record_decode_success`](Self::record_decode_success).
    pub decoded_bytes_total: AtomicU64,
    /// Number of recorded encode operations.
    pub encode_ops: AtomicU64,
    /// Number of recorded decode operations, successful or not.
    pub decode_ops: AtomicU64,
    /// Number of recorded decode operations that failed.
    pub decode_failures: AtomicU64,
}

impl RaptorQMetrics {
    #[must_use]
    /// Creates a metrics set with every counter at zero.
    ///
    /// `const` so it can initialise a `static`.
    pub const fn new() -> Self {
        Self {
            encoded_bytes_total: AtomicU64::new(0),
            repair_symbols_generated_total: AtomicU64::new(0),
            decoded_bytes_total: AtomicU64::new(0),
            encode_ops: AtomicU64::new(0),
            decode_ops: AtomicU64::new(0),
            decode_failures: AtomicU64::new(0),
        }
    }

    /// Adds `amount` to `counter` with relaxed ordering.
    fn bump(counter: &AtomicU64, amount: u64) {
        counter.fetch_add(amount, Ordering::Relaxed);
    }

    /// Records one encode operation covering `encoded_bytes` bytes that
    /// produced `repair_symbols` repair symbols.
    pub fn record_encode(&self, encoded_bytes: u64, repair_symbols: u64) {
        Self::bump(&self.encoded_bytes_total, encoded_bytes);
        Self::bump(&self.repair_symbols_generated_total, repair_symbols);
        Self::bump(&self.encode_ops, 1);
    }

    /// Records one successful decode operation that yielded `decoded_bytes` bytes.
    pub fn record_decode_success(&self, decoded_bytes: u64) {
        Self::bump(&self.decoded_bytes_total, decoded_bytes);
        Self::bump(&self.decode_ops, 1);
    }

    /// Records one failed decode operation; it counts as both an operation
    /// and a failure.
    pub fn record_decode_failure(&self) {
        Self::bump(&self.decode_ops, 1);
        Self::bump(&self.decode_failures, 1);
    }

    #[must_use]
    /// Copies the current counter values into a plain-data snapshot.
    pub fn snapshot(&self) -> RaptorQMetricsSnapshot {
        let read = |counter: &AtomicU64| counter.load(Ordering::Relaxed);
        RaptorQMetricsSnapshot {
            encoded_bytes_total: read(&self.encoded_bytes_total),
            repair_symbols_generated_total: read(&self.repair_symbols_generated_total),
            decoded_bytes_total: read(&self.decoded_bytes_total),
            encode_ops: read(&self.encode_ops),
            decode_ops: read(&self.decode_ops),
            decode_failures: read(&self.decode_failures),
        }
    }

    /// Sets every counter back to zero.
    pub fn reset(&self) {
        let counters = [
            &self.encoded_bytes_total,
            &self.repair_symbols_generated_total,
            &self.decoded_bytes_total,
            &self.encode_ops,
            &self.decode_ops,
            &self.decode_failures,
        ];
        for counter in counters {
            counter.store(0, Ordering::Relaxed);
        }
    }
}

impl Default for RaptorQMetrics {
    /// Equivalent to [`RaptorQMetrics::new`].
    fn default() -> Self {
        Self::new()
    }
}

/// Process-wide metrics instance updated by the page encoder and decoder.
pub static GLOBAL_RAPTORQ_METRICS: RaptorQMetrics = RaptorQMetrics::new();

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Plain-value copy of the [`RaptorQMetrics`] counters.
pub struct RaptorQMetricsSnapshot {
    /// Value of `encoded_bytes_total` when the snapshot was taken.
    pub encoded_bytes_total: u64,
    /// Value of `repair_symbols_generated_total` when the snapshot was taken.
    pub repair_symbols_generated_total: u64,
    /// Value of `decoded_bytes_total` when the snapshot was taken.
    pub decoded_bytes_total: u64,
    /// Value of `encode_ops` when the snapshot was taken.
    pub encode_ops: u64,
    /// Value of `decode_ops` when the snapshot was taken.
    pub decode_ops: u64,
    /// Value of `decode_failures` when the snapshot was taken.
    pub decode_failures: u64,
}

impl fmt::Display for RaptorQMetricsSnapshot {
    /// Renders the snapshot as a single human-readable summary line.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            encoded_bytes_total: enc_bytes,
            repair_symbols_generated_total: repair_syms,
            decoded_bytes_total: dec_bytes,
            encode_ops: enc_ops,
            decode_ops: dec_ops,
            decode_failures: failures,
        } = *self;
        f.write_fmt(format_args!(
            "raptorq: encoded={} bytes ({} ops, {} repair syms), decoded={} bytes ({} ops, {} failures)",
            enc_bytes, enc_ops, repair_syms, dec_bytes, dec_ops, failures,
        ))
    }
}
149
/// Converts `d` to whole microseconds, clamping to `u64::MAX` when the
/// 128-bit microsecond count does not fit in a `u64`.
fn duration_us_saturating(d: std::time::Duration) -> u64 {
    let micros: u128 = d.as_micros();
    match u64::try_from(micros) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    }
}
154
/// Smallest `symbol_size` accepted by `PipelineConfig::validate` (inclusive).
pub const MIN_PIPELINE_SYMBOL_SIZE: u32 = 512;

/// Largest `symbol_size` accepted by `PipelineConfig::validate` (inclusive).
pub const MAX_PIPELINE_SYMBOL_SIZE: u32 = 65_536;

/// `checkpoint_interval` used by `PipelineConfig::for_page_size`: the number
/// of symbols processed between cancellation checkpoints.
pub const DEFAULT_CHECKPOINT_INTERVAL: u32 = 64;
167
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Selects which decode outcomes should carry a decode proof.
///
/// The two flags are independent; [`disabled`](Self::disabled) clears both
/// and [`durability_critical`](Self::durability_critical) sets both.
pub struct DecodeProofEmissionPolicy {
    /// Attach a proof when a decode fails.
    pub emit_on_decode_failure: bool,
    /// Attach a proof when a decode succeeds using at least one repair symbol.
    pub emit_on_repair_success: bool,
}

impl DecodeProofEmissionPolicy {
    /// Builds a policy with both flags set to `enabled`.
    const fn uniform(enabled: bool) -> Self {
        Self {
            emit_on_decode_failure: enabled,
            emit_on_repair_success: enabled,
        }
    }

    #[must_use]
    /// Policy that never emits decode proofs.
    pub const fn disabled() -> Self {
        Self::uniform(false)
    }

    #[must_use]
    /// Policy that emits proofs both on decode failure and on repair-assisted success.
    pub const fn durability_critical() -> Self {
        Self::uniform(true)
    }
}

impl Default for DecodeProofEmissionPolicy {
    /// Defaults to [`DecodeProofEmissionPolicy::disabled`].
    fn default() -> Self {
        Self::disabled()
    }
}
205
206#[derive(Debug, Clone, PartialEq)]
211pub struct PipelineConfig {
212 pub symbol_size: u32,
215 pub max_block_size: u32,
217 pub repair_overhead: f64,
219 pub checkpoint_interval: u32,
221 pub decode_proof_policy: DecodeProofEmissionPolicy,
223}
224
225impl PipelineConfig {
226 #[must_use]
228 pub fn for_page_size(page_size: u32) -> Self {
229 Self {
230 symbol_size: page_size,
231 max_block_size: 64 * 1024,
232 repair_overhead: 1.25,
233 checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
234 decode_proof_policy: DecodeProofEmissionPolicy::default(),
235 }
236 }
237
238 pub fn validate(&self) -> Result<()> {
248 if self.symbol_size == 0 {
249 return Err(FrankenError::OutOfRange {
250 what: "pipeline symbol_size".to_owned(),
251 value: "0".to_owned(),
252 });
253 }
254 if !self.symbol_size.is_power_of_two() {
255 return Err(FrankenError::OutOfRange {
256 what: "pipeline symbol_size (must be power of 2)".to_owned(),
257 value: self.symbol_size.to_string(),
258 });
259 }
260 if self.symbol_size < MIN_PIPELINE_SYMBOL_SIZE
261 || self.symbol_size > MAX_PIPELINE_SYMBOL_SIZE
262 {
263 return Err(FrankenError::OutOfRange {
264 what: format!(
265 "pipeline symbol_size (must be in [{MIN_PIPELINE_SYMBOL_SIZE}, {MAX_PIPELINE_SYMBOL_SIZE}])"
266 ),
267 value: self.symbol_size.to_string(),
268 });
269 }
270 if self.max_block_size == 0 {
271 return Err(FrankenError::OutOfRange {
272 what: "pipeline max_block_size".to_owned(),
273 value: "0".to_owned(),
274 });
275 }
276 if self.repair_overhead < 1.0 {
277 return Err(FrankenError::OutOfRange {
278 what: "pipeline repair_overhead (must be >= 1.0)".to_owned(),
279 value: self.repair_overhead.to_string(),
280 });
281 }
282 if self.checkpoint_interval == 0 {
283 return Err(FrankenError::OutOfRange {
284 what: "pipeline checkpoint_interval".to_owned(),
285 value: "0".to_owned(),
286 });
287 }
288 Ok(())
289 }
290}
291
292impl Default for PipelineConfig {
293 fn default() -> Self {
294 Self::for_page_size(4096)
295 }
296}
297
/// Destination for symbols produced by the page encoder.
///
/// `RaptorQPageEncoder::encode_pages` calls [`write_symbol`](Self::write_symbol)
/// for every source symbol, then every repair symbol, and finally calls
/// [`flush`](Self::flush) once.
pub trait PageSymbolSink {
    /// Stores the symbol payload `data` under the identifier `esi`.
    ///
    /// An error aborts the encode and is returned to the encoder's caller.
    fn write_symbol(&mut self, esi: u32, data: &[u8]) -> Result<()>;

    /// Called once after all symbols of an encode have been written.
    fn flush(&mut self) -> Result<()>;

    /// Number of symbols the sink reports as written.
    fn written_count(&self) -> u32;
}
313
/// Supplier of stored symbols for the page decoder.
pub trait PageSymbolSource {
    /// Reads the symbol stored under `esi`.
    ///
    /// `Ok(None)` means the symbol is absent; the decoder skips it.
    fn read_symbol(&mut self, esi: u32) -> Result<Option<Vec<u8>>>;

    /// Identifiers of the symbols the source can provide; the decoder reads
    /// them in the returned order.
    fn available_esis(&self) -> Vec<u32>;

    /// Number of symbols available; the decoder compares this against
    /// `k_source` only to log a warning.
    fn available_count(&self) -> u32;
}
325
/// Erasure codec abstraction used by the page encoder and decoder.
///
/// Implementations must be `Send + Sync`.
pub trait SymbolCodec: Send + Sync {
    /// Splits `source_data` into symbols of `symbol_size` bytes and generates
    /// repair symbols according to `repair_overhead`.
    ///
    /// `cx` is the caller's context, passed through for the codec's own use.
    fn encode(
        &self,
        cx: &Cx,
        source_data: &[u8],
        symbol_size: u32,
        repair_overhead: f64,
    ) -> Result<CodecEncodeResult>;

    /// Attempts to reconstruct the original data from `(esi, payload)` pairs.
    ///
    /// `k_source` is the number of source symbols of the encoded object. An
    /// unsuccessful decode is reported as `Ok(CodecDecodeResult::Failure { .. })`;
    /// an `Err` is propagated unchanged by the page decoder.
    fn decode(
        &self,
        cx: &Cx,
        symbols: &[(u32, Vec<u8>)],
        k_source: u32,
        symbol_size: u32,
    ) -> Result<CodecDecodeResult>;
}
353
#[derive(Debug, Clone)]
/// Output of [`SymbolCodec::encode`].
pub struct CodecEncodeResult {
    /// Source symbols as `(esi, payload)` pairs.
    pub source_symbols: Vec<(u32, Vec<u8>)>,
    /// Repair symbols as `(esi, payload)` pairs.
    pub repair_symbols: Vec<(u32, Vec<u8>)>,
    /// Number of source symbols; the encoder reports it as `source_count`.
    pub k_source: u32,
}
364
#[derive(Debug, Clone)]
/// Output of [`SymbolCodec::decode`].
pub enum CodecDecodeResult {
    /// The codec reconstructed the data.
    Success {
        /// Reconstructed bytes.
        data: Vec<u8>,
        /// Number of symbols the codec reports having used.
        symbols_used: u32,
        /// Codec-reported peel count.
        // NOTE(review): presumably symbols resolved by the peeling phase — confirm.
        peeled_count: u32,
        /// Codec-reported inactivation count.
        // NOTE(review): presumably symbols resolved via inactivation — confirm.
        inactivated_count: u32,
    },
    /// The codec could not reconstruct the data.
    Failure {
        /// Why the decode failed.
        reason: DecodeFailureReason,
        /// Number of symbols the codec received.
        symbols_received: u32,
        /// Number of symbols the codec reports as required.
        k_required: u32,
    },
}
389
#[derive(Debug, Clone)]
/// Summary returned by `RaptorQPageEncoder::encode_pages`.
pub struct EncodeOutcome {
    /// Number of source symbols (the codec's `k_source`).
    pub source_count: u32,
    /// Number of repair symbols produced.
    pub repair_count: u32,
    /// Symbol size used for the encode.
    pub symbol_size: u32,
}
404
#[derive(Debug, Clone)]
/// Result of `RaptorQPageDecoder::decode_pages` when no hard error occurred.
pub enum DecodeOutcome {
    /// The pages were reconstructed.
    Success(DecodeSuccess),
    /// The codec could not reconstruct the pages.
    Failure(DecodeFailure),
}
413
#[derive(Debug, Clone)]
/// Details of a successful page decode.
pub struct DecodeSuccess {
    /// Reconstructed bytes.
    pub data: Vec<u8>,
    /// Number of symbols the codec reports having used.
    pub symbols_used: u32,
    /// Codec-reported peel count, passed through unchanged.
    pub peeled_count: u32,
    /// Codec-reported inactivation count, passed through unchanged.
    pub inactivated_count: u32,
    /// Decode proof; present only when the policy enables repair-success
    /// proofs and at least one symbol had an ESI `>= k_source`.
    pub decode_proof: Option<EcsDecodeProof>,
}
428
#[derive(Debug, Clone)]
/// Details of a failed page decode.
pub struct DecodeFailure {
    /// Why the codec failed.
    pub reason: DecodeFailureReason,
    /// Number of symbols the codec received.
    pub symbols_received: u32,
    /// Number of symbols the codec reports as required.
    pub k_required: u32,
    /// Decode proof; present only when the policy enables failure proofs.
    pub decode_proof: Option<EcsDecodeProof>,
}
441
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
/// Classification of a decode failure as reported by the codec.
pub enum DecodeFailureReason {
    /// Not enough symbols were supplied to attempt or complete the decode.
    InsufficientSymbols,
    /// The decode could not be solved with the supplied symbols.
    // NOTE(review): presumably a rank-deficient decoding matrix — confirm with the codec.
    SingularMatrix,
    /// NOTE(review): presumably a symbol payload whose length differs from
    /// the configured symbol size — confirm with the codec.
    SymbolSizeMismatch,
    /// NOTE(review): presumably the decode was cancelled through the context
    /// — confirm with the codec.
    Cancelled,
}
454
455pub struct RaptorQPageEncoder<C: SymbolCodec> {
462 config: PipelineConfig,
463 codec: C,
464}
465
466impl<C: SymbolCodec> RaptorQPageEncoder<C> {
467 pub fn new(config: PipelineConfig, codec: C) -> Result<Self> {
469 config.validate()?;
470 info!(
471 bead_id = BEAD_ID,
472 symbol_size = config.symbol_size,
473 max_block_size = config.max_block_size,
474 repair_overhead = config.repair_overhead,
475 "RaptorQ page encoder created"
476 );
477 Ok(Self { config, codec })
478 }
479
480 #[allow(clippy::cast_possible_truncation)]
486 pub fn encode_pages(
487 &self,
488 cx: &Cx,
489 page_data: &[u8],
490 sink: &mut dyn PageSymbolSink,
491 ) -> Result<EncodeOutcome> {
492 cx.checkpoint().map_err(|_| FrankenError::Abort)?;
493
494 let symbol_size = self.config.symbol_size;
495 let t0 = Instant::now();
496 debug!(
497 bead_id = BEAD_ID,
498 data_len = page_data.len(),
499 symbol_size,
500 "starting page encode"
501 );
502
503 let result = self
504 .codec
505 .encode(cx, page_data, symbol_size, self.config.repair_overhead)?;
506
507 let interval = self.config.checkpoint_interval as usize;
509 for (idx, (esi, data)) in result.source_symbols.iter().enumerate() {
510 if idx > 0 && idx % interval == 0 {
511 cx.checkpoint().map_err(|_| FrankenError::Abort)?;
512 }
513 sink.write_symbol(*esi, data)?;
514 }
515
516 for (idx, (esi, data)) in result.repair_symbols.iter().enumerate() {
518 if idx > 0 && idx % interval == 0 {
519 cx.checkpoint().map_err(|_| FrankenError::Abort)?;
520 }
521 sink.write_symbol(*esi, data)?;
522 }
523
524 sink.flush()?;
525
526 let outcome = EncodeOutcome {
527 source_count: result.k_source,
528 repair_count: result.repair_symbols.len() as u32,
529 symbol_size,
530 };
531
532 let encode_time_us = duration_us_saturating(t0.elapsed());
533
534 let span = tracing::span!(
536 tracing::Level::DEBUG,
537 "raptorq_encode",
538 source_symbols = outcome.source_count,
539 repair_symbols = outcome.repair_count,
540 encode_time_us,
541 encoded_bytes = page_data.len(),
542 symbol_size = outcome.symbol_size,
543 );
544 let _guard = span.enter();
545
546 info!(
547 bead_id = BEAD_ID,
548 source_count = outcome.source_count,
549 repair_count = outcome.repair_count,
550 symbol_size = outcome.symbol_size,
551 encode_time_us,
552 "page encode complete"
553 );
554
555 GLOBAL_RAPTORQ_METRICS
557 .record_encode(page_data.len() as u64, u64::from(outcome.repair_count));
558
559 Ok(outcome)
560 }
561
562 #[must_use]
564 pub const fn config(&self) -> &PipelineConfig {
565 &self.config
566 }
567}
568
/// Reconstructs page data from stored symbols through a [`SymbolCodec`].
pub struct RaptorQPageDecoder<C: SymbolCodec> {
    /// Pipeline settings, validated at construction.
    config: PipelineConfig,
    /// Codec that performs the reconstruction.
    codec: C,
}

impl<C: SymbolCodec> RaptorQPageDecoder<C> {
    /// Creates a decoder after validating `config`.
    ///
    /// # Errors
    ///
    /// Returns the error from [`PipelineConfig::validate`] when `config` is invalid.
    pub fn new(config: PipelineConfig, codec: C) -> Result<Self> {
        config.validate()?;
        info!(
            bead_id = BEAD_ID,
            symbol_size = config.symbol_size,
            "RaptorQ page decoder created"
        );
        Ok(Self { config, codec })
    }

    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    /// Reads every symbol `source` advertises and asks the codec to
    /// reconstruct an object of `k_source` source symbols.
    ///
    /// A codec-level decode failure is returned as
    /// `Ok(DecodeOutcome::Failure(..))`, not as an `Err`. Depending on the
    /// configured proof policy a decode proof is attached to the outcome.
    /// The global metrics are updated for both outcomes.
    ///
    /// # Errors
    ///
    /// - `FrankenError::Abort` when a cancellation checkpoint fails.
    /// - Any error returned by `source.read_symbol` or by the codec.
    pub fn decode_pages(
        &self,
        cx: &Cx,
        source: &mut dyn PageSymbolSource,
        k_source: u32,
    ) -> Result<DecodeOutcome> {
        cx.checkpoint().map_err(|_| FrankenError::Abort)?;
        let t0 = Instant::now();

        let available = source.available_count();
        debug!(
            bead_id = BEAD_ID,
            k_source, available, "starting page decode"
        );

        // Too few symbols is only logged; the codec still gets to decide.
        if available < k_source {
            warn!(
                bead_id = BEAD_ID,
                k_source, available, "fewer symbols than K_source — decode likely to fail"
            );
        }

        let esis = source.available_esis();
        // Collect the symbols, checking for cancellation every `interval` reads.
        let interval = self.config.checkpoint_interval as usize;
        let mut symbols = Vec::with_capacity(esis.len());
        for (idx, esi) in esis.iter().enumerate() {
            if idx > 0 && idx % interval == 0 {
                cx.checkpoint().map_err(|_| FrankenError::Abort)?;
            }
            // An advertised ESI that reads back as `None` is skipped.
            if let Some(data) = source.read_symbol(*esi)? {
                symbols.push((*esi, data));
            }
        }

        let codec_result = self
            .codec
            .decode(cx, &symbols, k_source, self.config.symbol_size)?;
        // Proof inputs are derived from the sorted, de-duplicated ESIs that
        // were actually read, so they do not depend on read order.
        let all_esis = canonical_esis(&symbols);
        let proof_object_id =
            derive_decode_proof_object_id(k_source, self.config.symbol_size, &all_esis);
        let proof_seed = xxh3_64(proof_object_id.as_bytes());

        match codec_result {
            CodecDecodeResult::Success {
                data,
                symbols_used,
                peeled_count,
                inactivated_count,
            } => {
                info!(
                    bead_id = BEAD_ID,
                    k_source,
                    symbols_used,
                    peeled_count,
                    inactivated_count,
                    "page decode succeeded"
                );
                // A success proof is emitted only when the policy asks for it
                // and at least one ESI is `>= k_source` (treated as repair).
                let decode_proof = if self.config.decode_proof_policy.emit_on_repair_success
                    && contains_repair_esi(&all_esis, k_source)
                {
                    let proof = EcsDecodeProof::from_esis(
                        proof_object_id,
                        k_source,
                        &all_esis,
                        true,
                        Some(symbols_used),
                        // Hash-derived value, not a measured duration.
                        deterministic_timing_ns(k_source, self.config.symbol_size, symbols_used),
                        proof_seed,
                    );
                    debug!(
                        bead_id = "bd-faz4",
                        symbols_used, k_source, "emitted repair-success decode proof"
                    );
                    Some(proof)
                } else {
                    None
                };
                // Decoding with exactly `k_source` symbols is flagged as fragile.
                if symbols_used == k_source {
                    warn!(
                        bead_id = BEAD_ID,
                        k_source,
                        symbols_used,
                        "fragile recovery: decoded with minimum symbol count"
                    );
                }
                let decoded_len = data.len() as u64;
                let decode_time_us = duration_us_saturating(t0.elapsed());

                // The span is opened after the work so it can carry final numbers.
                let span = tracing::span!(
                    tracing::Level::DEBUG,
                    "raptorq_decode",
                    k_source,
                    symbols_used,
                    decoded_bytes = decoded_len,
                    decode_time_us,
                    ok = true,
                );
                let _guard = span.enter();

                GLOBAL_RAPTORQ_METRICS.record_decode_success(decoded_len);

                Ok(DecodeOutcome::Success(DecodeSuccess {
                    data,
                    symbols_used,
                    peeled_count,
                    inactivated_count,
                    decode_proof,
                }))
            }
            CodecDecodeResult::Failure {
                reason,
                symbols_received,
                k_required,
            } => {
                let decode_proof = if self.config.decode_proof_policy.emit_on_decode_failure {
                    // The rank recorded in the proof is capped at `k_required`.
                    let intermediate_rank = Some(symbols_received.min(k_required));
                    let proof = EcsDecodeProof::from_esis(
                        proof_object_id,
                        k_source,
                        &all_esis,
                        false,
                        intermediate_rank,
                        // Hash-derived value, not a measured duration.
                        deterministic_timing_ns(
                            k_source,
                            self.config.symbol_size,
                            symbols_received,
                        ),
                        proof_seed,
                    );
                    debug!(
                        bead_id = "bd-faz4",
                        symbols_received, k_required, "emitted decode-failure proof"
                    );
                    Some(proof)
                } else {
                    None
                };
                let decode_time_us = duration_us_saturating(t0.elapsed());

                let span = tracing::span!(
                    tracing::Level::DEBUG,
                    "raptorq_decode",
                    k_source,
                    symbols_received,
                    k_required,
                    decode_time_us,
                    ok = false,
                );
                let _guard = span.enter();

                // Logged inside the span so the event carries the span's fields.
                error!(
                    bead_id = BEAD_ID,
                    k_source,
                    symbols_received,
                    k_required,
                    reason = ?reason,
                    "page decode failed"
                );

                GLOBAL_RAPTORQ_METRICS.record_decode_failure();

                Ok(DecodeOutcome::Failure(DecodeFailure {
                    reason,
                    symbols_received,
                    k_required,
                    decode_proof,
                }))
            }
        }
    }

    #[must_use]
    /// Returns the configuration this decoder was built with.
    pub const fn config(&self) -> &PipelineConfig {
        &self.config
    }
}
780
/// Returns the ESIs of `symbols` in ascending order with duplicates removed.
fn canonical_esis(symbols: &[(u32, Vec<u8>)]) -> Vec<u32> {
    let mut ordered = Vec::with_capacity(symbols.len());
    for (esi, _payload) in symbols {
        ordered.push(*esi);
    }
    ordered.sort_unstable();
    ordered.dedup();
    ordered
}
787
/// Reports whether any ESI in `esis` is at or above `k_source`, i.e. lies
/// outside the source-symbol range `0..k_source`.
fn contains_repair_esi(esis: &[u32], k_source: u32) -> bool {
    let all_source = esis.iter().all(|&esi| esi < k_source);
    !all_source
}
791
792fn derive_decode_proof_object_id(k_source: u32, symbol_size: u32, esis: &[u32]) -> ObjectId {
793 let mut material = Vec::with_capacity(40 + esis.len() * 4);
794 material.extend_from_slice(b"fsqlite:raptorq:decode-proof:v1");
795 material.extend_from_slice(&k_source.to_le_bytes());
796 material.extend_from_slice(&symbol_size.to_le_bytes());
797 for esi in esis {
798 material.extend_from_slice(&esi.to_le_bytes());
799 }
800 ObjectId::derive_from_canonical_bytes(&material)
801}
802
803fn deterministic_timing_ns(k_source: u32, symbol_size: u32, symbols_used: u32) -> u64 {
804 let mut material = [0_u8; 12];
805 material[..4].copy_from_slice(&k_source.to_le_bytes());
806 material[4..8].copy_from_slice(&symbol_size.to_le_bytes());
807 material[8..12].copy_from_slice(&symbols_used.to_le_bytes());
808 xxh3_64(&material)
809}
810
811#[cfg(test)]
816#[allow(
817 clippy::cast_possible_truncation,
818 clippy::cast_lossless,
819 clippy::cast_precision_loss,
820 clippy::cast_sign_loss
821)]
822mod tests {
823 use std::collections::{BTreeMap, VecDeque};
824 use std::pin::Pin;
825 use std::task::{Context, Poll};
826
827 use asupersync::error::ErrorKind as AsErrorKind;
828 use asupersync::raptorq::RaptorQReceiverBuilder;
829 use asupersync::raptorq::RaptorQSenderBuilder;
830 use asupersync::security::AuthenticationTag;
831 use asupersync::security::authenticated::AuthenticatedSymbol;
832 use asupersync::transport::error::{SinkError, StreamError};
833 use asupersync::transport::sink::SymbolSink;
834 use asupersync::transport::stream::SymbolStream;
835 use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol, SymbolId, SymbolKind};
836 use asupersync::{Cx as AsCx, RaptorQConfig};
837
838 use super::*;
839
840 struct VecPageSink {
845 symbols: BTreeMap<u32, Vec<u8>>,
846 flushed: bool,
847 }
848
849 impl VecPageSink {
850 fn new() -> Self {
851 Self {
852 symbols: BTreeMap::new(),
853 flushed: false,
854 }
855 }
856 }
857
858 impl PageSymbolSink for VecPageSink {
859 fn write_symbol(&mut self, esi: u32, data: &[u8]) -> Result<()> {
860 self.symbols.insert(esi, data.to_vec());
861 Ok(())
862 }
863
864 fn flush(&mut self) -> Result<()> {
865 self.flushed = true;
866 Ok(())
867 }
868
869 fn written_count(&self) -> u32 {
870 self.symbols.len() as u32
871 }
872 }
873
874 struct VecPageSource {
875 symbols: BTreeMap<u32, Vec<u8>>,
876 }
877
878 impl VecPageSource {
879 fn from_sink(sink: &VecPageSink) -> Self {
880 Self {
881 symbols: sink.symbols.clone(),
882 }
883 }
884
885 fn from_map(symbols: BTreeMap<u32, Vec<u8>>) -> Self {
886 Self { symbols }
887 }
888 }
889
890 impl PageSymbolSource for VecPageSource {
891 fn read_symbol(&mut self, esi: u32) -> Result<Option<Vec<u8>>> {
892 Ok(self.symbols.get(&esi).cloned())
893 }
894
895 fn available_esis(&self) -> Vec<u32> {
896 self.symbols.keys().copied().collect()
897 }
898
899 fn available_count(&self) -> u32 {
900 self.symbols.len() as u32
901 }
902 }
903
904 #[derive(Debug)]
909 struct VecTransportSink {
910 symbols: Vec<Symbol>,
911 }
912
913 impl VecTransportSink {
914 fn new() -> Self {
915 Self {
916 symbols: Vec::new(),
917 }
918 }
919 }
920
921 #[derive(Debug)]
922 struct VecTransportStream {
923 symbols: VecDeque<AuthenticatedSymbol>,
924 }
925
926 impl VecTransportStream {
927 fn new(symbols: Vec<Symbol>) -> Self {
928 let symbols = symbols
929 .into_iter()
930 .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
931 .collect();
932 Self { symbols }
933 }
934 }
935
936 impl SymbolStream for VecTransportStream {
937 fn poll_next(
938 mut self: Pin<&mut Self>,
939 _cx: &mut Context<'_>,
940 ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
941 match self.symbols.pop_front() {
942 Some(symbol) => Poll::Ready(Some(Ok(symbol))),
943 None => Poll::Ready(None),
944 }
945 }
946
947 fn size_hint(&self) -> (usize, Option<usize>) {
948 (self.symbols.len(), Some(self.symbols.len()))
949 }
950
951 fn is_exhausted(&self) -> bool {
952 self.symbols.is_empty()
953 }
954 }
955
    /// Object id seed used for every test encode and decode.
    const TEST_OBJECT_ID: u64 = 0xBD_1A15;
    /// `max_block_size` handed to the asupersync encoding config in tests.
    const TEST_MAX_BLOCK_SIZE: usize = 64 * 1024;
    /// Bit 31 of a packed key: set for repair symbols, clear for source symbols.
    const PACKED_KIND_REPAIR_BIT: u32 = 1_u32 << 31;
    /// Bit offset of the source block number inside a packed key.
    const PACKED_SBN_SHIFT: u32 = 23;
    /// Mask applied to the shifted key to extract the 8-bit source block number.
    const PACKED_SBN_MASK: u32 = 0xFF;
    /// Mask of the low 23 bits holding the ESI inside a packed key.
    const PACKED_ESI_MASK: u32 = 0x7F_FFFF;
962
963 fn pack_symbol_key(kind: SymbolKind, sbn: u8, esi: u32) -> Result<u32> {
964 if esi > PACKED_ESI_MASK {
965 return Err(FrankenError::OutOfRange {
966 what: "packed symbol esi (must fit 23 bits)".to_owned(),
967 value: esi.to_string(),
968 });
969 }
970
971 let kind_bit = if kind.is_repair() {
972 PACKED_KIND_REPAIR_BIT
973 } else {
974 0
975 };
976 Ok(kind_bit | (u32::from(sbn) << PACKED_SBN_SHIFT) | esi)
977 }
978
979 fn unpack_symbol_key(packed: u32) -> (SymbolKind, u8, u32) {
980 let kind = if packed & PACKED_KIND_REPAIR_BIT == 0 {
981 SymbolKind::Source
982 } else {
983 SymbolKind::Repair
984 };
985 let sbn = ((packed >> PACKED_SBN_SHIFT) & PACKED_SBN_MASK) as u8;
986 let esi = packed & PACKED_ESI_MASK;
987 (kind, sbn, esi)
988 }
989
990 impl SymbolSink for VecTransportSink {
991 fn poll_send(
992 mut self: Pin<&mut Self>,
993 _cx: &mut Context<'_>,
994 symbol: AuthenticatedSymbol,
995 ) -> Poll<std::result::Result<(), SinkError>> {
996 self.symbols.push(symbol.into_symbol());
997 Poll::Ready(Ok(()))
998 }
999
1000 fn poll_flush(
1001 self: Pin<&mut Self>,
1002 _cx: &mut Context<'_>,
1003 ) -> Poll<std::result::Result<(), SinkError>> {
1004 Poll::Ready(Ok(()))
1005 }
1006
1007 fn poll_close(
1008 self: Pin<&mut Self>,
1009 _cx: &mut Context<'_>,
1010 ) -> Poll<std::result::Result<(), SinkError>> {
1011 Poll::Ready(Ok(()))
1012 }
1013
1014 fn poll_ready(
1015 self: Pin<&mut Self>,
1016 _cx: &mut Context<'_>,
1017 ) -> Poll<std::result::Result<(), SinkError>> {
1018 Poll::Ready(Ok(()))
1019 }
1020 }
1021
1022 struct AsupersyncCodec;
1024
1025 impl SymbolCodec for AsupersyncCodec {
1026 fn encode(
1027 &self,
1028 _cx: &Cx,
1029 source_data: &[u8],
1030 symbol_size: u32,
1031 repair_overhead: f64,
1032 ) -> Result<CodecEncodeResult> {
1033 let mut config = RaptorQConfig::default();
1034 config.encoding.symbol_size = symbol_size as u16;
1035 config.encoding.max_block_size = TEST_MAX_BLOCK_SIZE;
1036 config.encoding.repair_overhead = repair_overhead;
1037
1038 let cx = AsCx::for_testing();
1039 let object_id = AsObjectId::new_for_test(TEST_OBJECT_ID);
1040 let mut sender = RaptorQSenderBuilder::new()
1041 .config(config)
1042 .transport(VecTransportSink::new())
1043 .build()
1044 .map_err(|e| FrankenError::Internal(format!("sender build: {e}")))?;
1045
1046 let outcome = sender
1047 .send_object(&cx, object_id, source_data)
1048 .map_err(|e| FrankenError::Internal(format!("send_object: {e}")))?;
1049
1050 let symbols = std::mem::take(&mut sender.transport_mut().symbols);
1051 let k = outcome.source_symbols as u32;
1052
1053 let mut source_symbols = Vec::new();
1054 let mut repair_symbols = Vec::new();
1055 for s in &symbols {
1056 let packed_key = pack_symbol_key(s.kind(), s.sbn(), s.esi())?;
1057 if s.kind().is_source() {
1058 source_symbols.push((packed_key, s.data().to_vec()));
1059 } else {
1060 repair_symbols.push((packed_key, s.data().to_vec()));
1061 }
1062 }
1063
1064 Ok(CodecEncodeResult {
1065 source_symbols,
1066 repair_symbols,
1067 k_source: k,
1068 })
1069 }
1070
1071 fn decode(
1072 &self,
1073 _cx: &Cx,
1074 symbols: &[(u32, Vec<u8>)],
1075 k_source: u32,
1076 symbol_size: u32,
1077 ) -> Result<CodecDecodeResult> {
1078 if symbols.is_empty() {
1079 return Ok(CodecDecodeResult::Failure {
1080 reason: DecodeFailureReason::InsufficientSymbols,
1081 symbols_received: 0,
1082 k_required: k_source,
1083 });
1084 }
1085
1086 let object_id = AsObjectId::new_for_test(TEST_OBJECT_ID);
1087 let mut config = RaptorQConfig::default();
1088 config.encoding.symbol_size = symbol_size as u16;
1089 config.encoding.max_block_size = TEST_MAX_BLOCK_SIZE;
1090
1091 let source_blocks = 1_u16;
1095 let symbols_per_block = k_source.max(1);
1096 let object_size = u64::from(k_source)
1097 .checked_mul(u64::from(symbol_size))
1098 .ok_or_else(|| FrankenError::OutOfRange {
1099 what: "object_size for decode params".to_owned(),
1100 value: format!("{k_source}*{symbol_size}"),
1101 })?;
1102 let params = ObjectParams::new(
1103 object_id,
1104 object_size,
1105 u16::try_from(symbol_size).map_err(|_| FrankenError::OutOfRange {
1106 what: "symbol_size as u16".to_owned(),
1107 value: symbol_size.to_string(),
1108 })?,
1109 source_blocks,
1110 u16::try_from(symbols_per_block).map_err(|_| FrankenError::OutOfRange {
1111 what: "symbols_per_block as u16".to_owned(),
1112 value: symbols_per_block.to_string(),
1113 })?,
1114 );
1115
1116 let mut rebuilt = Vec::with_capacity(symbols.len());
1117 for (packed, data) in symbols {
1118 let (kind, sbn, esi) = unpack_symbol_key(*packed);
1119 rebuilt.push(Symbol::new(
1120 SymbolId::new(object_id, sbn, esi),
1121 data.clone(),
1122 kind,
1123 ));
1124 }
1125
1126 let cx = AsCx::for_testing();
1127 let mut receiver = RaptorQReceiverBuilder::new()
1128 .config(config)
1129 .source(VecTransportStream::new(rebuilt))
1130 .build()
1131 .map_err(|e| FrankenError::Internal(format!("receiver build: {e}")))?;
1132
1133 match receiver.receive_object(&cx, ¶ms) {
1134 Ok(outcome) => Ok(CodecDecodeResult::Success {
1135 data: outcome.data,
1136 symbols_used: outcome.symbols_received as u32,
1137 peeled_count: 0,
1138 inactivated_count: 0,
1139 }),
1140 Err(err) => {
1141 let reason = match err.kind() {
1142 AsErrorKind::InsufficientSymbols => {
1143 DecodeFailureReason::InsufficientSymbols
1144 }
1145 _ => DecodeFailureReason::SingularMatrix,
1146 };
1147 Ok(CodecDecodeResult::Failure {
1148 reason,
1149 symbols_received: symbols.len() as u32,
1150 k_required: k_source,
1151 })
1152 }
1153 }
1154 }
1155 }
1156
1157 fn deterministic_page_data(k: usize, symbol_size: usize, seed: u64) -> Vec<u8> {
1162 let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
1163 let total = k * symbol_size;
1164 let mut out = Vec::with_capacity(total);
1165 for idx in 0..total {
1166 state ^= state << 7;
1167 state ^= state >> 9;
1168 state = state.wrapping_mul(0xA24B_AED4_963E_E407);
1169 let idx_byte = (idx % 251) as u8;
1170 out.push((state & 0xFF) as u8 ^ idx_byte);
1171 }
1172 out
1173 }
1174
1175 fn test_cx() -> fsqlite_types::cx::Cx {
1176 fsqlite_types::cx::Cx::new()
1177 }
1178
1179 fn default_codec() -> AsupersyncCodec {
1180 AsupersyncCodec
1181 }
1182
1183 fn default_config() -> PipelineConfig {
1184 PipelineConfig::for_page_size(512)
1185 }
1186
1187 #[test]
1192 fn test_pipeline_encode_produces_source_and_repair() {
1193 let config = default_config();
1194 let encoder =
1195 RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
1196 let cx = test_cx();
1197 let k = 10_usize;
1198 let data = deterministic_page_data(k, config.symbol_size as usize, 0x1234);
1199
1200 let mut sink = VecPageSink::new();
1201 let outcome = encoder
1202 .encode_pages(&cx, &data, &mut sink)
1203 .expect("encode must succeed");
1204
1205 assert_eq!(
1206 outcome.source_count as usize, k,
1207 "bead_id={BEAD_ID} case=encode_source_count"
1208 );
1209 assert!(
1210 outcome.repair_count > 0,
1211 "bead_id={BEAD_ID} case=encode_repair_present"
1212 );
1213 assert_eq!(
1214 outcome.symbol_size, config.symbol_size,
1215 "bead_id={BEAD_ID} case=encode_symbol_size"
1216 );
1217 assert!(sink.flushed, "bead_id={BEAD_ID} case=encode_sink_flushed");
1218
1219 let sym_size = config.symbol_size as usize;
1221 for i in 0..k {
1222 let esi = i as u32;
1223 let expected = &data[i * sym_size..(i + 1) * sym_size];
1224 let actual = sink.symbols.get(&esi);
1225 assert!(actual.is_some(), "source symbol ESI {esi} missing");
1226 let actual = actual.expect("source symbol existence asserted");
1227 assert_eq!(
1228 actual, expected,
1229 "bead_id={BEAD_ID} case=encode_source_symbol_matches esi={esi}"
1230 );
1231 }
1232
1233 info!(
1234 bead_id = BEAD_ID,
1235 source_count = outcome.source_count,
1236 repair_count = outcome.repair_count,
1237 total_written = sink.written_count(),
1238 "test_pipeline_encode complete"
1239 );
1240 }
1241
1242 #[test]
1247 fn test_pipeline_decode_with_extra_symbols() {
1248 let config = default_config();
1249 let encoder =
1250 RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
1251 let decoder =
1252 RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
1253 let cx = test_cx();
1254 let k = 10_usize;
1255 let data = deterministic_page_data(k, config.symbol_size as usize, 0x5678);
1256
1257 let mut sink = VecPageSink::new();
1259 let outcome = encoder
1260 .encode_pages(&cx, &data, &mut sink)
1261 .expect("encode must succeed");
1262
1263 let mut source = VecPageSource::from_sink(&sink);
1265 let decode_outcome = decoder
1266 .decode_pages(&cx, &mut source, outcome.source_count)
1267 .expect("decode must succeed");
1268
1269 match decode_outcome {
1270 DecodeOutcome::Success(success) => {
1271 assert_eq!(
1272 success.data, data,
1273 "bead_id={BEAD_ID} case=decode_roundtrip_bytes"
1274 );
1275 assert!(
1276 success.symbols_used >= outcome.source_count,
1277 "bead_id={BEAD_ID} case=decode_symbols_used"
1278 );
1279 info!(
1280 bead_id = BEAD_ID,
1281 symbols_used = success.symbols_used,
1282 peeled = success.peeled_count,
1283 inactivated = success.inactivated_count,
1284 "test_pipeline_decode complete"
1285 );
1286 }
1287 DecodeOutcome::Failure(failure) => unreachable!(
1288 "bead_id={BEAD_ID} case=decode_unexpected_failure reason={:?}",
1289 failure.reason
1290 ),
1291 }
1292 }
1293
1294 #[test]
1299 fn test_pipeline_cancel_safe_encode() {
1300 let config = PipelineConfig {
1301 checkpoint_interval: 2, ..default_config()
1303 };
1304 let encoder =
1305 RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
1306
1307 let cx = fsqlite_types::cx::Cx::new();
1309 cx.cancel_with_reason(fsqlite_types::cx::CancelReason::UserInterrupt);
1310
1311 let k = 10_usize;
1312 let data = deterministic_page_data(k, config.symbol_size as usize, 0xABCD);
1313 let mut sink = VecPageSink::new();
1314
1315 let result = encoder.encode_pages(&cx, &data, &mut sink);
1316 assert!(
1317 result.is_err(),
1318 "bead_id={BEAD_ID} case=cancel_safe_encode_aborts"
1319 );
1320 assert!(
1321 matches!(result.unwrap_err(), FrankenError::Abort),
1322 "bead_id={BEAD_ID} case=cancel_safe_encode_error_type"
1323 );
1324 assert!(!sink.flushed, "bead_id={BEAD_ID} case=cancel_safe_no_flush");
1326 }
1327
1328 #[test]
1329 fn test_pipeline_cancel_safe_decode() {
1330 let config = PipelineConfig {
1331 checkpoint_interval: 2,
1332 ..default_config()
1333 };
1334 let decoder =
1335 RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
1336
1337 let cx = fsqlite_types::cx::Cx::new();
1339 cx.cancel_with_reason(fsqlite_types::cx::CancelReason::UserInterrupt);
1340
1341 let mut symbols = BTreeMap::new();
1343 for esi in 0..10_u32 {
1344 symbols.insert(esi, vec![0xAA; config.symbol_size as usize]);
1345 }
1346 let mut source = VecPageSource::from_map(symbols);
1347
1348 let result = decoder.decode_pages(&cx, &mut source, 10);
1349 assert!(
1350 result.is_err(),
1351 "bead_id={BEAD_ID} case=cancel_safe_decode_aborts"
1352 );
1353 assert!(
1354 matches!(result.unwrap_err(), FrankenError::Abort),
1355 "bead_id={BEAD_ID} case=cancel_safe_decode_error_type"
1356 );
1357 }
1358
1359 struct BackpressureSink {
1365 limit: u32,
1366 count: u32,
1367 }
1368
1369 impl BackpressureSink {
1370 fn new(limit: u32) -> Self {
1371 Self { limit, count: 0 }
1372 }
1373 }
1374
1375 impl PageSymbolSink for BackpressureSink {
1376 fn write_symbol(&mut self, _esi: u32, _data: &[u8]) -> Result<()> {
1377 if self.count >= self.limit {
1378 return Err(FrankenError::Busy);
1379 }
1380 self.count += 1;
1381 Ok(())
1382 }
1383
1384 fn flush(&mut self) -> Result<()> {
1385 Ok(())
1386 }
1387
1388 fn written_count(&self) -> u32 {
1389 self.count
1390 }
1391 }
1392
/// A sink that rejects writes after three symbols must surface
/// `FrankenError::Busy` from `encode_pages`, with exactly three symbols
/// recorded as written.
#[test]
fn test_pipeline_backpressure_sink_full() {
    let config = default_config();
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let cx = test_cx();
    let k = 10_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xEEFF);

    let mut limited_sink = BackpressureSink::new(3);
    let encode_result = encoder.encode_pages(&cx, &data, &mut limited_sink);

    assert!(
        encode_result.is_err(),
        "bead_id={BEAD_ID} case=backpressure_propagated"
    );
    let err = encode_result.unwrap_err();
    assert!(
        matches!(err, FrankenError::Busy),
        "bead_id={BEAD_ID} case=backpressure_error_type"
    );
    let written = limited_sink.written_count();
    assert_eq!(
        written, 3,
        "bead_id={BEAD_ID} case=backpressure_partial_write"
    );
}
1420
/// `validate` must reject a configuration whose `symbol_size` is zero.
#[test]
fn test_config_validation_zero_symbol_size() {
    let mut config = default_config();
    config.symbol_size = 0;

    let verdict = config.validate();
    assert!(
        verdict.is_err(),
        "bead_id={BEAD_ID} case=config_reject_zero_symbol_size"
    );
}
1436
/// `validate` must reject a `symbol_size` of 1000, which is not a power of two.
#[test]
fn test_config_validation_non_power_of_two() {
    let mut config = default_config();
    config.symbol_size = 1000;

    let verdict = config.validate();
    assert!(
        verdict.is_err(),
        "bead_id={BEAD_ID} case=config_reject_non_power_of_two"
    );
}
1448
/// `validate` must reject a `symbol_size` of 256 (labelled "below min" by
/// this test's case name).
#[test]
fn test_config_validation_below_min() {
    let mut config = default_config();
    config.symbol_size = 256;

    let verdict = config.validate();
    assert!(
        verdict.is_err(),
        "bead_id={BEAD_ID} case=config_reject_below_min"
    );
}
1460
/// `validate` must reject a `symbol_size` of 128 KiB (labelled "above max" by
/// this test's case name).
#[test]
fn test_config_validation_above_max() {
    let mut config = default_config();
    config.symbol_size = 128 * 1024;

    let verdict = config.validate();
    assert!(
        verdict.is_err(),
        "bead_id={BEAD_ID} case=config_reject_above_max"
    );
}
1472
/// `validate` must reject a configuration whose `max_block_size` is zero.
#[test]
fn test_config_validation_zero_max_block_size() {
    let mut config = default_config();
    config.max_block_size = 0;

    let verdict = config.validate();
    assert!(
        verdict.is_err(),
        "bead_id={BEAD_ID} case=config_reject_zero_max_block"
    );
}
1484
/// `validate` must reject a `repair_overhead` of 0.5, i.e. a value below one.
#[test]
fn test_config_validation_repair_overhead_below_one() {
    let mut config = default_config();
    config.repair_overhead = 0.5;

    let verdict = config.validate();
    assert!(
        verdict.is_err(),
        "bead_id={BEAD_ID} case=config_reject_repair_overhead_below_one"
    );
}
1496
/// `validate` must reject a configuration whose `checkpoint_interval` is zero.
#[test]
fn test_config_validation_zero_checkpoint_interval() {
    let mut config = default_config();
    config.checkpoint_interval = 0;

    let verdict = config.validate();
    assert!(
        verdict.is_err(),
        "bead_id={BEAD_ID} case=config_reject_zero_checkpoint_interval"
    );
}
1508
/// Every power-of-two page size from 512 through 65536 must yield a
/// configuration that passes `validate`.
#[test]
fn test_config_validation_valid_configs() {
    for symbol_size in [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536] {
        let config = PipelineConfig::for_page_size(symbol_size);
        let verdict = config.validate();
        assert!(
            verdict.is_ok(),
            "bead_id={BEAD_ID} case=config_valid symbol_size={symbol_size}"
        );
    }
}
1519
/// Feeding the decoder only the symbols keyed `0..K-3` must produce a
/// `DecodeOutcome::Failure` with reason `InsufficientSymbols`, and no decode
/// proof under the default configuration.
#[test]
fn test_decode_failure_insufficient_symbols() {
    let config = default_config();
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();
    let k = 10_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xDEAD);

    let mut sink = VecPageSink::new();
    let outcome = encoder
        .encode_pages(&cx, &data, &mut sink)
        .expect("encode must succeed");

    // Copy across only the symbols stored under keys 0..K-3; absent keys are skipped.
    let partial: BTreeMap<_, _> = (0..((k - 3) as u32))
        .filter_map(|esi| sink.symbols.get(&esi).map(|sym| (esi, sym.clone())))
        .collect();
    let mut source = VecPageSource::from_map(partial);

    let decode_outcome = decoder
        .decode_pages(&cx, &mut source, outcome.source_count)
        .expect("decode call itself should not error");

    let failure = match decode_outcome {
        DecodeOutcome::Success(_) => {
            unreachable!("bead_id={BEAD_ID} case=decode_should_have_failed")
        }
        DecodeOutcome::Failure(failure) => failure,
    };

    assert_eq!(
        failure.reason,
        DecodeFailureReason::InsufficientSymbols,
        "bead_id={BEAD_ID} case=decode_failure_reason"
    );
    assert!(
        failure.symbols_received < outcome.source_count,
        "bead_id={BEAD_ID} case=decode_failure_symbol_count"
    );
    assert_eq!(
        failure.k_required, outcome.source_count,
        "bead_id={BEAD_ID} case=decode_failure_k_required"
    );
    assert!(
        failure.decode_proof.is_none(),
        "bead_id={BEAD_ID} case=decode_failure_proof_disabled_by_default"
    );
}
1579
/// With `emit_on_decode_failure` enabled, a failed decode must carry a proof
/// that is marked unsuccessful and is internally consistent.
#[test]
fn test_decode_failure_emits_proof_when_enabled() {
    let config = PipelineConfig {
        decode_proof_policy: DecodeProofEmissionPolicy {
            emit_on_decode_failure: true,
            emit_on_repair_success: false,
        },
        ..default_config()
    };
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();
    let k = 10_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xFA24);

    let mut sink = VecPageSink::new();
    let outcome = encoder
        .encode_pages(&cx, &data, &mut sink)
        .expect("encode must succeed");

    // Copy across only the symbols stored under keys 0..K-2; absent keys are skipped.
    let partial: BTreeMap<_, _> = (0..((k - 2) as u32))
        .filter_map(|esi| sink.symbols.get(&esi).map(|sym| (esi, sym.clone())))
        .collect();
    let mut source = VecPageSource::from_map(partial);
    let decode_outcome = decoder
        .decode_pages(&cx, &mut source, outcome.source_count)
        .expect("decode call itself should not error");

    let failure = match decode_outcome {
        DecodeOutcome::Success(_) => {
            unreachable!("bead_id=bd-faz4 case=decode_failure_expected")
        }
        DecodeOutcome::Failure(failure) => failure,
    };

    let proof = failure
        .decode_proof
        .expect("bead_id=bd-faz4 case=decode_failure_proof_emitted");
    assert!(
        !proof.decode_success,
        "bead_id=bd-faz4 case=decode_failure_proof_flag"
    );
    assert!(
        proof.is_consistent(),
        "bead_id=bd-faz4 case=decode_failure_proof_consistent"
    );
}
1630
/// With `emit_on_repair_success` enabled, a successful decode of the full
/// sink contents must carry a proof that reports success, reports repair,
/// and is internally consistent.
#[test]
fn test_decode_success_with_repair_emits_proof_when_enabled() {
    let config = PipelineConfig {
        decode_proof_policy: DecodeProofEmissionPolicy {
            emit_on_decode_failure: false,
            emit_on_repair_success: true,
        },
        ..default_config()
    };
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();
    let k = 10_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xF0AA);

    let mut sink = VecPageSink::new();
    let outcome = encoder
        .encode_pages(&cx, &data, &mut sink)
        .expect("encode must succeed");
    let mut source = VecPageSource::from_sink(&sink);
    let decode_outcome = decoder
        .decode_pages(&cx, &mut source, outcome.source_count)
        .expect("decode must succeed");

    let proof = match decode_outcome {
        DecodeOutcome::Failure(failure) => unreachable!(
            "bead_id=bd-faz4 case=repair_success_should_decode reason={:?}",
            failure.reason
        ),
        DecodeOutcome::Success(success) => success
            .decode_proof
            .expect("bead_id=bd-faz4 case=repair_success_proof_emitted"),
    };

    assert!(proof.decode_success);
    assert!(proof.is_repair());
    assert!(
        proof.is_consistent(),
        "bead_id=bd-faz4 case=repair_success_proof_consistent"
    );
}
1673
/// Encode-then-decode must reproduce the input bytes for page sizes of
/// 512, 1024 and 4096 bytes, using eight pages per run.
#[test]
fn test_e2e_roundtrip_multiple_page_sizes() {
    for symbol_size in [512_u32, 1024, 4096] {
        let config = PipelineConfig::for_page_size(symbol_size);
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let decoder =
            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
        let cx = test_cx();

        let k = 8_usize;
        // The page size doubles as the seed so each size gets distinct data.
        let data = deterministic_page_data(k, symbol_size as usize, u64::from(symbol_size));

        let mut sink = VecPageSink::new();
        let outcome = encoder
            .encode_pages(&cx, &data, &mut sink)
            .expect("encode must succeed");

        let mut source = VecPageSource::from_sink(&sink);
        let decode_result = decoder
            .decode_pages(&cx, &mut source, outcome.source_count)
            .expect("decode must succeed");

        let success = match decode_result {
            DecodeOutcome::Failure(failure) => unreachable!(
                "bead_id={BEAD_ID} case=e2e_roundtrip_failure symbol_size={symbol_size} reason={:?}",
                failure.reason
            ),
            DecodeOutcome::Success(success) => success,
        };
        assert_eq!(
            success.data, data,
            "bead_id={BEAD_ID} case=e2e_roundtrip symbol_size={symbol_size}"
        );
    }
}
1717
/// Round-trips 64 pages of 4096 bytes: the encoder must report 64 source
/// symbols and the decoder must return the original bytes.
#[test]
fn test_e2e_roundtrip_64_pages() {
    let config = PipelineConfig::for_page_size(4096);
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();

    let k = 64_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xE2E6_4000);

    let mut sink = VecPageSink::new();
    let outcome = encoder
        .encode_pages(&cx, &data, &mut sink)
        .expect("encode must succeed");

    assert_eq!(
        outcome.source_count as usize, k,
        "bead_id={BEAD_ID} case=e2e_64_source_count"
    );

    let mut source = VecPageSource::from_sink(&sink);
    let decode_result = decoder
        .decode_pages(&cx, &mut source, outcome.source_count)
        .expect("decode must succeed");

    let success = match decode_result {
        DecodeOutcome::Failure(failure) => unreachable!(
            "bead_id={BEAD_ID} case=e2e_64_failure reason={:?}",
            failure.reason
        ),
        DecodeOutcome::Success(success) => success,
    };

    assert_eq!(
        success.data, data,
        "bead_id={BEAD_ID} case=e2e_64_roundtrip_bytes"
    );
    info!(
        bead_id = BEAD_ID,
        k,
        peeled = success.peeled_count,
        inactivated = success.inactivated_count,
        "E2E 64-page roundtrip complete"
    );
}
1765
/// Encodes 64 pages, discards every stored source symbol whose ESI is zero,
/// and expects the decoder to still recover the original bytes exactly.
#[test]
fn test_e2e_bd_1hi_5() {
    let config = PipelineConfig::for_page_size(4096);
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();

    let k = 64_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xB1D1_5005);
    let mut sink = VecPageSink::new();
    let outcome = encoder
        .encode_pages(&cx, &data, &mut sink)
        .expect("encode must succeed");

    // Keep everything except source symbols with ESI 0.
    let degraded: BTreeMap<_, _> = sink
        .symbols
        .iter()
        .filter(|(packed_key, _)| {
            let (kind, _sbn, esi) = unpack_symbol_key(**packed_key);
            !kind.is_source() || esi != 0
        })
        .map(|(packed_key, symbol_bytes)| (*packed_key, symbol_bytes.clone()))
        .collect();
    // Keys are unique, so the size difference is the number of dropped symbols.
    let dropped = sink.symbols.len() - degraded.len();
    assert!(dropped > 0, "bead_id={BEAD_ID} case=e2e_named_dropped_some");

    let mut source = VecPageSource::from_map(degraded);
    let decode_result = decoder
        .decode_pages(&cx, &mut source, outcome.source_count)
        .expect("decode must complete");

    let success = match decode_result {
        DecodeOutcome::Failure(failure) => unreachable!(
            "bead_id={BEAD_ID} case=e2e_named_unexpected_failure reason={:?}",
            failure.reason
        ),
        DecodeOutcome::Success(success) => success,
    };
    assert_eq!(
        success.data, data,
        "bead_id={BEAD_ID} case=e2e_named_byte_perfect_recovery"
    );
}
1814
/// A first decode attempt with only the symbols keyed `0..K-2` must fail;
/// a second attempt on the same decoder with every stored symbol must
/// succeed and return the original bytes.
#[test]
fn test_e2e_retry_after_failure() {
    let config = default_config();
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();
    let k = 10_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xAE_7121);

    let mut sink = VecPageSink::new();
    let outcome = encoder
        .encode_pages(&cx, &data, &mut sink)
        .expect("encode must succeed");

    // Attempt 1: only the symbols stored under keys 0..K-2.
    let partial: BTreeMap<_, _> = (0..((k - 2) as u32))
        .filter_map(|esi| sink.symbols.get(&esi).map(|sym| (esi, sym.clone())))
        .collect();
    let mut source_attempt1 = VecPageSource::from_map(partial.clone());
    let result1 = decoder
        .decode_pages(&cx, &mut source_attempt1, outcome.source_count)
        .expect("decode call should not error");
    assert!(
        matches!(result1, DecodeOutcome::Failure(_)),
        "bead_id={BEAD_ID} case=retry_first_attempt_fails"
    );

    // Attempt 2: everything the encoder wrote.
    let mut source_attempt2 = VecPageSource::from_map(sink.symbols.clone());
    let result2 = decoder
        .decode_pages(&cx, &mut source_attempt2, outcome.source_count)
        .expect("decode call should not error");
    let success = match result2 {
        DecodeOutcome::Failure(failure) => unreachable!(
            "bead_id={BEAD_ID} case=retry_second_should_succeed reason={:?}",
            failure.reason
        ),
        DecodeOutcome::Success(success) => success,
    };
    assert_eq!(
        success.data, data,
        "bead_id={BEAD_ID} case=retry_second_attempt_succeeds"
    );
}
1871
/// Decoding from only the symbols keyed `0..K` must succeed, return the
/// original bytes, and report exactly K symbols used.
#[test]
fn test_decode_source_only_exact_k() {
    let config = default_config();
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();
    let k = 8_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xE4AC7);

    let mut sink = VecPageSink::new();
    let outcome = encoder
        .encode_pages(&cx, &data, &mut sink)
        .expect("encode must succeed");

    // Copy across only the symbols stored under keys 0..K; absent keys are skipped.
    let source_only: BTreeMap<_, _> = (0..(k as u32))
        .filter_map(|esi| sink.symbols.get(&esi).map(|sym| (esi, sym.clone())))
        .collect();

    let mut source = VecPageSource::from_map(source_only);
    let decode_result = decoder
        .decode_pages(&cx, &mut source, outcome.source_count)
        .expect("decode must not error");

    let success = match decode_result {
        DecodeOutcome::Failure(failure) => unreachable!(
            "bead_id={BEAD_ID} case=exact_k_should_succeed reason={:?}",
            failure.reason
        ),
        DecodeOutcome::Success(success) => success,
    };
    assert_eq!(
        success.data, data,
        "bead_id={BEAD_ID} case=exact_k_roundtrip"
    );
    assert_eq!(
        success.symbols_used, k as u32,
        "bead_id={BEAD_ID} case=exact_k_symbols_used"
    );
}
1922
/// Two `record_encode` calls must accumulate byte and repair-symbol totals,
/// count two encode operations, and leave the decode counter untouched.
#[test]
fn metrics_struct_encode_counters() {
    let metrics = RaptorQMetrics::new();
    for (encoded_bytes, repair_symbols) in [(2048, 3), (4096, 5)] {
        metrics.record_encode(encoded_bytes, repair_symbols);
    }

    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.encode_ops, 2);
    assert_eq!(snapshot.encoded_bytes_total, 6144);
    assert_eq!(snapshot.repair_symbols_generated_total, 8);
    assert_eq!(snapshot.decode_ops, 0);
}
1943
/// Two successful decodes plus one failure must count three decode
/// operations, one failure, the summed decoded bytes, and no encode operations.
#[test]
fn metrics_struct_decode_counters() {
    let metrics = RaptorQMetrics::new();
    for decoded_bytes in [4096, 2048] {
        metrics.record_decode_success(decoded_bytes);
    }
    metrics.record_decode_failure();

    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.decode_ops, 3);
    assert_eq!(snapshot.decoded_bytes_total, 6144);
    assert_eq!(snapshot.decode_failures, 1);
    assert_eq!(snapshot.encode_ops, 0);
}
1957
/// The `Display` rendering of a snapshot must contain the byte total
/// ("4096") and the repair-symbol count ("2 repair").
#[test]
fn metrics_snapshot_display() {
    let metrics = RaptorQMetrics::new();
    metrics.record_encode(4096, 2);
    metrics.record_decode_success(4096);

    let rendered = metrics.snapshot().to_string();
    assert!(rendered.contains("4096"), "encoded bytes in display");
    assert!(rendered.contains("2 repair"), "repair syms in display");
}
1968
/// After `reset`, every counter exposed by the snapshot must read zero.
#[test]
fn metrics_reset() {
    let metrics = RaptorQMetrics::new();
    metrics.record_encode(1000, 5);
    metrics.record_decode_success(500);
    metrics.record_decode_failure();

    metrics.reset();

    let snapshot = metrics.snapshot();
    assert_eq!(snapshot.encoded_bytes_total, 0);
    assert_eq!(snapshot.repair_symbols_generated_total, 0);
    assert_eq!(snapshot.encode_ops, 0);
    assert_eq!(snapshot.decode_ops, 0);
    assert_eq!(snapshot.decode_failures, 0);
    assert_eq!(snapshot.decoded_bytes_total, 0);
}
1984
/// One encode/decode round trip must strictly increase the operation and
/// byte counters of `GLOBAL_RAPTORQ_METRICS`.
///
/// Only before/after deltas are compared, not absolute values.
#[test]
fn metrics_global_wired_to_encode_decode() {
    let baseline = GLOBAL_RAPTORQ_METRICS.snapshot();

    let config = default_config();
    let encoder =
        RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
    let decoder =
        RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
    let cx = test_cx();
    let k = 4_usize;
    let data = deterministic_page_data(k, config.symbol_size as usize, 0xF00D);

    let mut sink = VecPageSink::new();
    let outcome = encoder.encode_pages(&cx, &data, &mut sink).expect("encode");
    let mut source = VecPageSource::from_sink(&sink);
    let _decode = decoder
        .decode_pages(&cx, &mut source, outcome.source_count)
        .expect("decode");

    let current = GLOBAL_RAPTORQ_METRICS.snapshot();
    assert!(
        current.encode_ops > baseline.encode_ops,
        "global encode_ops should have increased"
    );
    assert!(
        current.encoded_bytes_total > baseline.encoded_bytes_total,
        "global encoded_bytes should have increased"
    );
    assert!(
        current.decode_ops > baseline.decode_ops,
        "global decode_ops should have increased"
    );
    assert!(
        current.decoded_bytes_total > baseline.decoded_bytes_total,
        "global decoded_bytes should have increased"
    );
}
2026}