// fsqlite_core/raptorq_integration.rs

1//! §3.3 Asupersync RaptorQ Pipeline Integration (bd-1hi.5).
2//!
3//! This module provides the FrankenSQLite-side wrapper types for the
4//! asupersync RaptorQ pipeline.  Production code uses abstract traits
5//! (`PageSymbolSink`, `PageSymbolSource`, `SymbolCodec`) so that the
6//! actual asupersync dependency remains dev-only.
7//!
8//! # Cx Cancellation
9//!
10//! All long-running encode/decode loops call `cx.checkpoint()` every
11//! `checkpoint_interval` symbols (§4.12.1).  If the context is cancelled
12//! the operation returns `FrankenError::Abort`.
13
14use fsqlite_types::sync_primitives::Instant;
15use std::fmt;
16use std::sync::atomic::{AtomicU64, Ordering};
17
18use fsqlite_error::{FrankenError, Result};
19use fsqlite_types::{ObjectId, cx::Cx};
20use tracing::{debug, error, info, warn};
21use xxhash_rust::xxh3::xxh3_64;
22
23use crate::decode_proofs::EcsDecodeProof;
24
/// Traceability bead identifier stamped on this module's structured log events.
const BEAD_ID: &str = "bd-1hi.5";
26
27// ---------------------------------------------------------------------------
28// RaptorQ Metrics (bd-3bw.1)
29// ---------------------------------------------------------------------------
30
/// Global atomic counters for RaptorQ encode/decode operations.
///
/// These metrics track cumulative byte and symbol counts for observability
/// and capacity planning.  All counters are monotonically increasing
/// (except through `reset`, which is test-oriented) and use `Relaxed`
/// ordering (sufficient for diagnostic counters; no cross-counter
/// consistency is implied).
pub struct RaptorQMetrics {
    /// Total bytes encoded via `encode_pages()`.
    pub encoded_bytes_total: AtomicU64,
    /// Total repair symbols generated across all encode calls.
    pub repair_symbols_generated_total: AtomicU64,
    /// Total bytes successfully decoded via `decode_pages()`.
    pub decoded_bytes_total: AtomicU64,
    /// Total encode operations.
    pub encode_ops: AtomicU64,
    /// Total decode operations (success + failure).
    pub decode_ops: AtomicU64,
    /// Total decode failures.  Always `<= decode_ops`, since a failure
    /// increments both counters.
    pub decode_failures: AtomicU64,
}
50
51impl RaptorQMetrics {
52    /// Create a new zeroed metrics instance.  `const` so it can back a
53    /// `static`.
54    #[must_use]
55    pub const fn new() -> Self {
56        Self {
57            encoded_bytes_total: AtomicU64::new(0),
58            repair_symbols_generated_total: AtomicU64::new(0),
59            decoded_bytes_total: AtomicU64::new(0),
60            encode_ops: AtomicU64::new(0),
61            decode_ops: AtomicU64::new(0),
62            decode_failures: AtomicU64::new(0),
63        }
64    }
65
66    /// Record a successful encode operation.
67    pub fn record_encode(&self, encoded_bytes: u64, repair_symbols: u64) {
68        self.encoded_bytes_total
69            .fetch_add(encoded_bytes, Ordering::Relaxed);
70        self.repair_symbols_generated_total
71            .fetch_add(repair_symbols, Ordering::Relaxed);
72        self.encode_ops.fetch_add(1, Ordering::Relaxed);
73    }
74
75    /// Record a successful decode operation.
76    pub fn record_decode_success(&self, decoded_bytes: u64) {
77        self.decoded_bytes_total
78            .fetch_add(decoded_bytes, Ordering::Relaxed);
79        self.decode_ops.fetch_add(1, Ordering::Relaxed);
80    }
81
82    /// Record a failed decode operation.
83    pub fn record_decode_failure(&self) {
84        self.decode_ops.fetch_add(1, Ordering::Relaxed);
85        self.decode_failures.fetch_add(1, Ordering::Relaxed);
86    }
87
88    /// Take a point-in-time snapshot of all counters.
89    #[must_use]
90    pub fn snapshot(&self) -> RaptorQMetricsSnapshot {
91        RaptorQMetricsSnapshot {
92            encoded_bytes_total: self.encoded_bytes_total.load(Ordering::Relaxed),
93            repair_symbols_generated_total: self
94                .repair_symbols_generated_total
95                .load(Ordering::Relaxed),
96            decoded_bytes_total: self.decoded_bytes_total.load(Ordering::Relaxed),
97            encode_ops: self.encode_ops.load(Ordering::Relaxed),
98            decode_ops: self.decode_ops.load(Ordering::Relaxed),
99            decode_failures: self.decode_failures.load(Ordering::Relaxed),
100        }
101    }
102
103    /// Reset all counters to zero (useful for tests).
104    pub fn reset(&self) {
105        self.encoded_bytes_total.store(0, Ordering::Relaxed);
106        self.repair_symbols_generated_total
107            .store(0, Ordering::Relaxed);
108        self.decoded_bytes_total.store(0, Ordering::Relaxed);
109        self.encode_ops.store(0, Ordering::Relaxed);
110        self.decode_ops.store(0, Ordering::Relaxed);
111        self.decode_failures.store(0, Ordering::Relaxed);
112    }
113}
114
impl Default for RaptorQMetrics {
    /// Equivalent to [`RaptorQMetrics::new`]: all counters start at zero.
    fn default() -> Self {
        Self::new()
    }
}
120
/// Global RaptorQ metrics singleton.
///
/// Process-wide; updated by `encode_pages`/`decode_pages` and readable via
/// [`RaptorQMetrics::snapshot`].
pub static GLOBAL_RAPTORQ_METRICS: RaptorQMetrics = RaptorQMetrics::new();
123
/// Point-in-time snapshot of [`RaptorQMetrics`].
///
/// Note: counters are loaded independently with `Relaxed` ordering, so a
/// snapshot taken concurrently with updates is not guaranteed to be a
/// single consistent cut across all counters.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RaptorQMetricsSnapshot {
    /// Total bytes encoded.
    pub encoded_bytes_total: u64,
    /// Total repair symbols generated.
    pub repair_symbols_generated_total: u64,
    /// Total bytes successfully decoded.
    pub decoded_bytes_total: u64,
    /// Total encode operations.
    pub encode_ops: u64,
    /// Total decode operations (success + failure).
    pub decode_ops: u64,
    /// Total decode failures.
    pub decode_failures: u64,
}
134
135impl fmt::Display for RaptorQMetricsSnapshot {
136    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
137        write!(
138            f,
139            "raptorq: encoded={} bytes ({} ops, {} repair syms), decoded={} bytes ({} ops, {} failures)",
140            self.encoded_bytes_total,
141            self.encode_ops,
142            self.repair_symbols_generated_total,
143            self.decoded_bytes_total,
144            self.decode_ops,
145            self.decode_failures,
146        )
147    }
148}
149
/// Convert a `Duration` to whole microseconds, saturating at `u64::MAX`
/// when the 128-bit microsecond count does not fit in 64 bits.
fn duration_us_saturating(d: std::time::Duration) -> u64 {
    match u64::try_from(d.as_micros()) {
        Ok(us) => us,
        Err(_) => u64::MAX,
    }
}
154
155// ---------------------------------------------------------------------------
156// Pipeline Configuration (§3.3)
157// ---------------------------------------------------------------------------
158
/// Minimum allowed symbol size (bytes).
pub const MIN_PIPELINE_SYMBOL_SIZE: u32 = 512;

/// Maximum allowed symbol size (bytes): 64 KiB.
pub const MAX_PIPELINE_SYMBOL_SIZE: u32 = 65_536;

/// Default Cx checkpoint interval (symbols between cancellation checks,
/// §4.12.1).
pub const DEFAULT_CHECKPOINT_INTERVAL: u32 = 64;
167
/// Policy surface for decode-proof emission hooks.
///
/// This keeps proof generation optional in production while allowing
/// durability paths and tests to request deterministic proof artifacts.
/// Consumed by `RaptorQPageDecoder::decode_pages` when deciding whether to
/// attach an `EcsDecodeProof` to the outcome.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct DecodeProofEmissionPolicy {
    /// Emit proof records for decode failures.
    pub emit_on_decode_failure: bool,
    /// Emit proof records for successful decodes that required repair symbols.
    pub emit_on_repair_success: bool,
}
179
180impl DecodeProofEmissionPolicy {
181    /// Default production posture: proof emission disabled.
182    #[must_use]
183    pub const fn disabled() -> Self {
184        Self {
185            emit_on_decode_failure: false,
186            emit_on_repair_success: false,
187        }
188    }
189
190    /// Durability-focused posture for replication/WAL-style decode paths.
191    #[must_use]
192    pub const fn durability_critical() -> Self {
193        Self {
194            emit_on_decode_failure: true,
195            emit_on_repair_success: true,
196        }
197    }
198}
199
impl Default for DecodeProofEmissionPolicy {
    /// Defaults to [`DecodeProofEmissionPolicy::disabled`] (no proof emission).
    fn default() -> Self {
        Self::disabled()
    }
}
205
/// FrankenSQLite-side RaptorQ pipeline configuration (§3.3).
///
/// Mirrors the needed subset of asupersync's `RaptorQConfig` so that
/// production code does not depend on asupersync directly.
///
/// Construct via [`PipelineConfig::for_page_size`] or [`Default`]; the
/// encoder/decoder constructors call [`PipelineConfig::validate`] eagerly.
#[derive(Debug, Clone, PartialEq)]
pub struct PipelineConfig {
    /// Symbol size T in bytes.  Must be a power of two in
    /// `[MIN_PIPELINE_SYMBOL_SIZE, MAX_PIPELINE_SYMBOL_SIZE]`.
    pub symbol_size: u32,
    /// Maximum source block size (max K per source block) in bytes.
    pub max_block_size: u32,
    /// Repair overhead factor.  E.g. `1.25` means 25 % extra repair symbols.
    pub repair_overhead: f64,
    /// Symbols between `Cx::checkpoint()` calls (§4.12.1).
    pub checkpoint_interval: u32,
    /// Decode-proof emission policy hooks (§3.5.8 / bd-faz4).
    pub decode_proof_policy: DecodeProofEmissionPolicy,
}
224
225impl PipelineConfig {
226    /// Create a configuration for page-sized symbols (T = page_size).
227    #[must_use]
228    pub fn for_page_size(page_size: u32) -> Self {
229        Self {
230            symbol_size: page_size,
231            max_block_size: 64 * 1024,
232            repair_overhead: 1.25,
233            checkpoint_interval: DEFAULT_CHECKPOINT_INTERVAL,
234            decode_proof_policy: DecodeProofEmissionPolicy::default(),
235        }
236    }
237
238    /// Validate this configuration.
239    ///
240    /// Rejects:
241    /// - `symbol_size == 0`
242    /// - `symbol_size` not a power of two
243    /// - `symbol_size` outside `[MIN, MAX]`
244    /// - `max_block_size == 0`
245    /// - `repair_overhead < 1.0`
246    /// - `checkpoint_interval == 0`
247    pub fn validate(&self) -> Result<()> {
248        if self.symbol_size == 0 {
249            return Err(FrankenError::OutOfRange {
250                what: "pipeline symbol_size".to_owned(),
251                value: "0".to_owned(),
252            });
253        }
254        if !self.symbol_size.is_power_of_two() {
255            return Err(FrankenError::OutOfRange {
256                what: "pipeline symbol_size (must be power of 2)".to_owned(),
257                value: self.symbol_size.to_string(),
258            });
259        }
260        if self.symbol_size < MIN_PIPELINE_SYMBOL_SIZE
261            || self.symbol_size > MAX_PIPELINE_SYMBOL_SIZE
262        {
263            return Err(FrankenError::OutOfRange {
264                what: format!(
265                    "pipeline symbol_size (must be in [{MIN_PIPELINE_SYMBOL_SIZE}, {MAX_PIPELINE_SYMBOL_SIZE}])"
266                ),
267                value: self.symbol_size.to_string(),
268            });
269        }
270        if self.max_block_size == 0 {
271            return Err(FrankenError::OutOfRange {
272                what: "pipeline max_block_size".to_owned(),
273                value: "0".to_owned(),
274            });
275        }
276        if self.repair_overhead < 1.0 {
277            return Err(FrankenError::OutOfRange {
278                what: "pipeline repair_overhead (must be >= 1.0)".to_owned(),
279                value: self.repair_overhead.to_string(),
280            });
281        }
282        if self.checkpoint_interval == 0 {
283            return Err(FrankenError::OutOfRange {
284                what: "pipeline checkpoint_interval".to_owned(),
285                value: "0".to_owned(),
286            });
287        }
288        Ok(())
289    }
290}
291
impl Default for PipelineConfig {
    /// Default config targets SQLite's default 4096-byte page size.
    fn default() -> Self {
        Self::for_page_size(4096)
    }
}
297
298// ---------------------------------------------------------------------------
299// Page Symbol Sink / Source Traits (§3.3)
300// ---------------------------------------------------------------------------
301
/// Writes encoded page symbols to WAL/ECS storage.
pub trait PageSymbolSink {
    /// Write a single encoded symbol, keyed by its encoding symbol
    /// identifier (ESI).
    ///
    /// # Errors
    ///
    /// Implementation-defined; typically storage I/O failures.
    fn write_symbol(&mut self, esi: u32, data: &[u8]) -> Result<()>;

    /// Flush all buffered symbols to durable storage.
    ///
    /// # Errors
    ///
    /// Implementation-defined; typically storage I/O failures.
    fn flush(&mut self) -> Result<()>;

    /// Number of symbols written so far.
    fn written_count(&self) -> u32;
}
313
/// Reads symbols from WAL/ECS storage for decoding.
pub trait PageSymbolSource {
    /// Read a symbol by its ESI.  Returns `None` if unavailable (erased).
    ///
    /// # Errors
    ///
    /// Implementation-defined; typically storage I/O failures.
    fn read_symbol(&mut self, esi: u32) -> Result<Option<Vec<u8>>>;

    /// All available ESIs in this source.
    fn available_esis(&self) -> Vec<u32>;

    /// Number of available symbols.
    fn available_count(&self) -> u32;
}
325
326// ---------------------------------------------------------------------------
327// Symbol Codec Trait (§3.3)
328// ---------------------------------------------------------------------------
329
/// Abstraction over the actual RaptorQ encode/decode engine.
///
/// In production, this wraps asupersync's `RaptorQSenderBuilder` /
/// `RaptorQReceiverBuilder`.  In tests, it may be a mock.
pub trait SymbolCodec: Send + Sync {
    /// Encode source data into source + repair symbols.
    ///
    /// `repair_overhead` follows [`PipelineConfig::repair_overhead`]
    /// semantics (e.g. `1.25` ≈ 25 % extra repair symbols).
    ///
    /// # Errors
    ///
    /// Implementation-defined; implementations are expected to honour `cx`
    /// cancellation (§4.12.1).
    fn encode(
        &self,
        cx: &Cx,
        source_data: &[u8],
        symbol_size: u32,
        repair_overhead: f64,
    ) -> Result<CodecEncodeResult>;

    /// Decode from received `(esi, data)` symbols.
    ///
    /// A decode that merely fails to recover the data should be reported
    /// via [`CodecDecodeResult::Failure`], not `Err`.
    ///
    /// # Errors
    ///
    /// Implementation-defined; reserved for operational faults rather than
    /// ordinary decode failures.
    fn decode(
        &self,
        cx: &Cx,
        symbols: &[(u32, Vec<u8>)],
        k_source: u32,
        symbol_size: u32,
    ) -> Result<CodecDecodeResult>;
}
353
/// Raw encode result from the codec.
///
/// By this module's convention (see `contains_repair_esi`), ESIs below
/// `k_source` identify source symbols and ESIs at or above it identify
/// repair symbols.
#[derive(Debug, Clone)]
pub struct CodecEncodeResult {
    /// Source symbols: `(esi, data)`.
    pub source_symbols: Vec<(u32, Vec<u8>)>,
    /// Repair symbols: `(esi, data)`.
    pub repair_symbols: Vec<(u32, Vec<u8>)>,
    /// Number of source symbols K.
    pub k_source: u32,
}
364
/// Raw decode result from the codec.
///
/// Both variants are "successful" codec invocations; `Failure` describes a
/// decode that completed without recovering the data.
#[derive(Debug, Clone)]
pub enum CodecDecodeResult {
    /// Decode succeeded.
    Success {
        /// Recovered source data.
        data: Vec<u8>,
        /// Number of symbols consumed.
        symbols_used: u32,
        /// Symbols resolved by peeling.
        peeled_count: u32,
        /// Symbols resolved by Gaussian elimination (inactive subsystem).
        inactivated_count: u32,
    },
    /// Decode failed.
    Failure {
        /// Reason for failure.
        reason: DecodeFailureReason,
        /// Number of symbols that were received.
        symbols_received: u32,
        /// Source symbols required.
        k_required: u32,
    },
}
389
390// ---------------------------------------------------------------------------
391// Outcome Types (§3.3)
392// ---------------------------------------------------------------------------
393
/// Result of a pipeline encode operation.
///
/// Summarises what `RaptorQPageEncoder::encode_pages` wrote through the
/// sink; the symbol payloads themselves live in the sink.
#[derive(Debug, Clone)]
pub struct EncodeOutcome {
    /// Number of source symbols produced (K).
    pub source_count: u32,
    /// Number of repair symbols produced.
    pub repair_count: u32,
    /// Symbol size in bytes.
    pub symbol_size: u32,
}
404
/// Result of a pipeline decode operation.
///
/// Note that a failed decode is an `Ok(DecodeOutcome::Failure(..))` from
/// `decode_pages`; `Err` is reserved for operational faults (cancellation,
/// storage errors).
#[derive(Debug, Clone)]
pub enum DecodeOutcome {
    /// Successful decode with recovered pages.
    Success(DecodeSuccess),
    /// Failed decode with diagnostic information.
    Failure(DecodeFailure),
}
413
/// Successful decode metadata.
#[derive(Debug, Clone)]
pub struct DecodeSuccess {
    /// Recovered page data, concatenated.
    pub data: Vec<u8>,
    /// Number of symbols used for decoding.
    pub symbols_used: u32,
    /// Symbols resolved during the peeling phase.
    pub peeled_count: u32,
    /// Symbols resolved during the Gaussian elimination phase.
    pub inactivated_count: u32,
    /// Optional decode proof; present only when
    /// [`DecodeProofEmissionPolicy::emit_on_repair_success`] is set and the
    /// decode used at least one repair symbol.
    pub decode_proof: Option<EcsDecodeProof>,
}
428
/// Failed decode metadata.
#[derive(Debug, Clone)]
pub struct DecodeFailure {
    /// Why the decode failed.
    pub reason: DecodeFailureReason,
    /// Number of symbols that were available.
    pub symbols_received: u32,
    /// Source symbols required (K).
    pub k_required: u32,
    /// Optional decode proof; present only when
    /// [`DecodeProofEmissionPolicy::emit_on_decode_failure`] is set.
    pub decode_proof: Option<EcsDecodeProof>,
}
441
/// Reasons a decode can fail.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum DecodeFailureReason {
    /// Fewer symbols than K available.
    InsufficientSymbols,
    /// The decoding matrix is singular (rank deficient).
    SingularMatrix,
    /// Symbol sizes do not match the expected T.
    SymbolSizeMismatch,
    /// Cancelled via `Cx::checkpoint()`.  NOTE(review): `decode_pages`
    /// itself maps checkpoint cancellation to `FrankenError::Abort`; this
    /// variant presumably covers codecs that report cancellation as a
    /// decode failure — confirm against codec implementations.
    Cancelled,
}
454
455// ---------------------------------------------------------------------------
456// Pipeline Encoder (§3.3)
457// ---------------------------------------------------------------------------
458
/// RaptorQ page encoder that wraps a [`SymbolCodec`] and writes through
/// a [`PageSymbolSink`] with Cx cancellation checkpoints.
pub struct RaptorQPageEncoder<C: SymbolCodec> {
    /// Pipeline configuration; validated in `new`.
    config: PipelineConfig,
    /// Underlying RaptorQ encode engine.
    codec: C,
}
465
466impl<C: SymbolCodec> RaptorQPageEncoder<C> {
467    /// Create a new encoder.  Validates the config eagerly.
468    pub fn new(config: PipelineConfig, codec: C) -> Result<Self> {
469        config.validate()?;
470        info!(
471            bead_id = BEAD_ID,
472            symbol_size = config.symbol_size,
473            max_block_size = config.max_block_size,
474            repair_overhead = config.repair_overhead,
475            "RaptorQ page encoder created"
476        );
477        Ok(Self { config, codec })
478    }
479
480    /// Encode page data and write symbols through the sink.
481    ///
482    /// `cx.checkpoint()` is called every `checkpoint_interval` symbols.
483    /// Emits a `raptorq_encode` tracing span (bd-3bw.1) and updates
484    /// [`GLOBAL_RAPTORQ_METRICS`].
485    #[allow(clippy::cast_possible_truncation)]
486    pub fn encode_pages(
487        &self,
488        cx: &Cx,
489        page_data: &[u8],
490        sink: &mut dyn PageSymbolSink,
491    ) -> Result<EncodeOutcome> {
492        cx.checkpoint().map_err(|_| FrankenError::Abort)?;
493
494        let symbol_size = self.config.symbol_size;
495        let t0 = Instant::now();
496        debug!(
497            bead_id = BEAD_ID,
498            data_len = page_data.len(),
499            symbol_size,
500            "starting page encode"
501        );
502
503        let result = self
504            .codec
505            .encode(cx, page_data, symbol_size, self.config.repair_overhead)?;
506
507        // Write source symbols with checkpoints.
508        let interval = self.config.checkpoint_interval as usize;
509        for (idx, (esi, data)) in result.source_symbols.iter().enumerate() {
510            if idx > 0 && idx % interval == 0 {
511                cx.checkpoint().map_err(|_| FrankenError::Abort)?;
512            }
513            sink.write_symbol(*esi, data)?;
514        }
515
516        // Write repair symbols with checkpoints.
517        for (idx, (esi, data)) in result.repair_symbols.iter().enumerate() {
518            if idx > 0 && idx % interval == 0 {
519                cx.checkpoint().map_err(|_| FrankenError::Abort)?;
520            }
521            sink.write_symbol(*esi, data)?;
522        }
523
524        sink.flush()?;
525
526        let outcome = EncodeOutcome {
527            source_count: result.k_source,
528            repair_count: result.repair_symbols.len() as u32,
529            symbol_size,
530        };
531
532        let encode_time_us = duration_us_saturating(t0.elapsed());
533
534        // bd-3bw.1: structured tracing span with required fields.
535        let span = tracing::span!(
536            tracing::Level::DEBUG,
537            "raptorq_encode",
538            source_symbols = outcome.source_count,
539            repair_symbols = outcome.repair_count,
540            encode_time_us,
541            encoded_bytes = page_data.len(),
542            symbol_size = outcome.symbol_size,
543        );
544        let _guard = span.enter();
545
546        info!(
547            bead_id = BEAD_ID,
548            source_count = outcome.source_count,
549            repair_count = outcome.repair_count,
550            symbol_size = outcome.symbol_size,
551            encode_time_us,
552            "page encode complete"
553        );
554
555        // bd-3bw.1: update global metric counters.
556        GLOBAL_RAPTORQ_METRICS
557            .record_encode(page_data.len() as u64, u64::from(outcome.repair_count));
558
559        Ok(outcome)
560    }
561
562    /// Reference to the pipeline config.
563    #[must_use]
564    pub const fn config(&self) -> &PipelineConfig {
565        &self.config
566    }
567}
568
569// ---------------------------------------------------------------------------
570// Pipeline Decoder (§3.3)
571// ---------------------------------------------------------------------------
572
/// RaptorQ page decoder that wraps a [`SymbolCodec`] and reads from
/// a [`PageSymbolSource`] with Cx cancellation checkpoints.
pub struct RaptorQPageDecoder<C: SymbolCodec> {
    /// Pipeline configuration; validated in `new`.
    config: PipelineConfig,
    /// Underlying RaptorQ decode engine.
    codec: C,
}
579
impl<C: SymbolCodec> RaptorQPageDecoder<C> {
    /// Create a new decoder.  Validates the config eagerly.
    ///
    /// # Errors
    ///
    /// Returns the error from [`PipelineConfig::validate`] if the
    /// configuration is rejected.
    pub fn new(config: PipelineConfig, codec: C) -> Result<Self> {
        config.validate()?;
        info!(
            bead_id = BEAD_ID,
            symbol_size = config.symbol_size,
            "RaptorQ page decoder created"
        );
        Ok(Self { config, codec })
    }

    /// Decode pages from the source.
    ///
    /// Reads available symbols, delegates to the codec, and returns the
    /// outcome.  Cx checkpoint is called at read boundaries.  Emits a
    /// `raptorq_decode` tracing span (bd-3bw.1) and updates
    /// [`GLOBAL_RAPTORQ_METRICS`].
    ///
    /// A decode that fails to recover the data is `Ok(DecodeOutcome::Failure)`;
    /// `Err` is reserved for cancellation (`FrankenError::Abort`) and
    /// source/codec faults.
    #[allow(clippy::cast_possible_truncation, clippy::too_many_lines)]
    pub fn decode_pages(
        &self,
        cx: &Cx,
        source: &mut dyn PageSymbolSource,
        k_source: u32,
    ) -> Result<DecodeOutcome> {
        // Fail fast if the context was already cancelled.
        cx.checkpoint().map_err(|_| FrankenError::Abort)?;
        let t0 = Instant::now();

        let available = source.available_count();
        debug!(
            bead_id = BEAD_ID,
            k_source, available, "starting page decode"
        );

        if available < k_source {
            // Not fatal here: the codec makes the final call and reports
            // `InsufficientSymbols` if it cannot proceed.
            warn!(
                bead_id = BEAD_ID,
                k_source, available, "fewer symbols than K_source — decode likely to fail"
            );
        }

        // Collect symbols from source with checkpoints.  Symbols that read
        // back as `None` (erased) are silently skipped.
        let esis = source.available_esis();
        let interval = self.config.checkpoint_interval as usize;
        let mut symbols = Vec::with_capacity(esis.len());
        for (idx, esi) in esis.iter().enumerate() {
            if idx > 0 && idx % interval == 0 {
                cx.checkpoint().map_err(|_| FrankenError::Abort)?;
            }
            if let Some(data) = source.read_symbol(*esi)? {
                symbols.push((*esi, data));
            }
        }

        // Delegate to codec.
        let codec_result = self
            .codec
            .decode(cx, &symbols, k_source, self.config.symbol_size)?;
        // Proof identity is derived from the sorted/deduped ESI set plus the
        // decode parameters, so identical inputs produce identical proofs.
        let all_esis = canonical_esis(&symbols);
        let proof_object_id =
            derive_decode_proof_object_id(k_source, self.config.symbol_size, &all_esis);
        let proof_seed = xxh3_64(proof_object_id.as_bytes());

        match codec_result {
            CodecDecodeResult::Success {
                data,
                symbols_used,
                peeled_count,
                inactivated_count,
            } => {
                info!(
                    bead_id = BEAD_ID,
                    k_source,
                    symbols_used,
                    peeled_count,
                    inactivated_count,
                    "page decode succeeded"
                );
                // bd-faz4: emit a proof only when policy asks for it AND at
                // least one repair symbol (ESI >= K) participated.
                let decode_proof = if self.config.decode_proof_policy.emit_on_repair_success
                    && contains_repair_esi(&all_esis, k_source)
                {
                    let proof = EcsDecodeProof::from_esis(
                        proof_object_id,
                        k_source,
                        &all_esis,
                        true,
                        Some(symbols_used),
                        deterministic_timing_ns(k_source, self.config.symbol_size, symbols_used),
                        proof_seed,
                    );
                    debug!(
                        bead_id = "bd-faz4",
                        symbols_used, k_source, "emitted repair-success decode proof"
                    );
                    Some(proof)
                } else {
                    None
                };
                // Succeeding with exactly K symbols means zero margin; flag
                // it so operators can see near-miss recoveries.
                if symbols_used == k_source {
                    warn!(
                        bead_id = BEAD_ID,
                        k_source,
                        symbols_used,
                        "fragile recovery: decoded with minimum symbol count"
                    );
                }
                let decoded_len = data.len() as u64;
                let decode_time_us = duration_us_saturating(t0.elapsed());

                // bd-3bw.1: structured tracing span for successful decode.
                // Created after the work so the fields carry final values;
                // it wraps only the metrics update below.
                let span = tracing::span!(
                    tracing::Level::DEBUG,
                    "raptorq_decode",
                    k_source,
                    symbols_used,
                    decoded_bytes = decoded_len,
                    decode_time_us,
                    ok = true,
                );
                let _guard = span.enter();

                GLOBAL_RAPTORQ_METRICS.record_decode_success(decoded_len);

                Ok(DecodeOutcome::Success(DecodeSuccess {
                    data,
                    symbols_used,
                    peeled_count,
                    inactivated_count,
                    decode_proof,
                }))
            }
            CodecDecodeResult::Failure {
                reason,
                symbols_received,
                k_required,
            } => {
                // bd-faz4: failure proofs are gated only by policy (no
                // repair-symbol precondition, unlike the success path).
                let decode_proof = if self.config.decode_proof_policy.emit_on_decode_failure {
                    // Rank can be at most the number of received symbols,
                    // capped at K.
                    let intermediate_rank = Some(symbols_received.min(k_required));
                    let proof = EcsDecodeProof::from_esis(
                        proof_object_id,
                        k_source,
                        &all_esis,
                        false,
                        intermediate_rank,
                        deterministic_timing_ns(
                            k_source,
                            self.config.symbol_size,
                            symbols_received,
                        ),
                        proof_seed,
                    );
                    debug!(
                        bead_id = "bd-faz4",
                        symbols_received, k_required, "emitted decode-failure proof"
                    );
                    Some(proof)
                } else {
                    None
                };
                let decode_time_us = duration_us_saturating(t0.elapsed());

                // bd-3bw.1: structured tracing span for failed decode.
                let span = tracing::span!(
                    tracing::Level::DEBUG,
                    "raptorq_decode",
                    k_source,
                    symbols_received,
                    k_required,
                    decode_time_us,
                    ok = false,
                );
                let _guard = span.enter();

                error!(
                    bead_id = BEAD_ID,
                    k_source,
                    symbols_received,
                    k_required,
                    reason = ?reason,
                    "page decode failed"
                );

                GLOBAL_RAPTORQ_METRICS.record_decode_failure();

                Ok(DecodeOutcome::Failure(DecodeFailure {
                    reason,
                    symbols_received,
                    k_required,
                    decode_proof,
                }))
            }
        }
    }

    /// Reference to the pipeline config.
    #[must_use]
    pub const fn config(&self) -> &PipelineConfig {
        &self.config
    }
}
780
/// Extract the ESIs from `(esi, data)` pairs as a sorted, deduplicated list.
fn canonical_esis(symbols: &[(u32, Vec<u8>)]) -> Vec<u32> {
    let mut out = Vec::with_capacity(symbols.len());
    for (esi, _) in symbols {
        out.push(*esi);
    }
    out.sort_unstable();
    out.dedup();
    out
}
787
/// True when at least one ESI is a repair symbol, i.e. `esi >= k_source`
/// (ESIs below K identify source symbols).
fn contains_repair_esi(esis: &[u32], k_source: u32) -> bool {
    !esis.iter().all(|&esi| esi < k_source)
}
791
792fn derive_decode_proof_object_id(k_source: u32, symbol_size: u32, esis: &[u32]) -> ObjectId {
793    let mut material = Vec::with_capacity(40 + esis.len() * 4);
794    material.extend_from_slice(b"fsqlite:raptorq:decode-proof:v1");
795    material.extend_from_slice(&k_source.to_le_bytes());
796    material.extend_from_slice(&symbol_size.to_le_bytes());
797    for esi in esis {
798        material.extend_from_slice(&esi.to_le_bytes());
799    }
800    ObjectId::derive_from_canonical_bytes(&material)
801}
802
803fn deterministic_timing_ns(k_source: u32, symbol_size: u32, symbols_used: u32) -> u64 {
804    let mut material = [0_u8; 12];
805    material[..4].copy_from_slice(&k_source.to_le_bytes());
806    material[4..8].copy_from_slice(&symbol_size.to_le_bytes());
807    material[8..12].copy_from_slice(&symbols_used.to_le_bytes());
808    xxh3_64(&material)
809}
810
811// ===========================================================================
812// Tests
813// ===========================================================================
814
815#[cfg(test)]
816#[allow(
817    clippy::cast_possible_truncation,
818    clippy::cast_lossless,
819    clippy::cast_precision_loss,
820    clippy::cast_sign_loss
821)]
822mod tests {
823    use std::collections::{BTreeMap, VecDeque};
824    use std::pin::Pin;
825    use std::task::{Context, Poll};
826
827    use asupersync::error::ErrorKind as AsErrorKind;
828    use asupersync::raptorq::RaptorQReceiverBuilder;
829    use asupersync::raptorq::RaptorQSenderBuilder;
830    use asupersync::security::AuthenticationTag;
831    use asupersync::security::authenticated::AuthenticatedSymbol;
832    use asupersync::transport::error::{SinkError, StreamError};
833    use asupersync::transport::sink::SymbolSink;
834    use asupersync::transport::stream::SymbolStream;
835    use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol, SymbolId, SymbolKind};
836    use asupersync::{Cx as AsCx, RaptorQConfig};
837
838    use super::*;
839
840    // -----------------------------------------------------------------------
841    // Mock PageSymbolSink / PageSymbolSource
842    // -----------------------------------------------------------------------
843
    /// In-memory `PageSymbolSink` backed by a BTreeMap keyed by ESI.
    struct VecPageSink {
        /// Written symbols; a later write to the same ESI overwrites.
        symbols: BTreeMap<u32, Vec<u8>>,
        /// Set once `flush()` has been called.
        flushed: bool,
    }
848
849    impl VecPageSink {
850        fn new() -> Self {
851            Self {
852                symbols: BTreeMap::new(),
853                flushed: false,
854            }
855        }
856    }
857
    impl PageSymbolSink for VecPageSink {
        // Store (and possibly overwrite) the symbol under its ESI.
        fn write_symbol(&mut self, esi: u32, data: &[u8]) -> Result<()> {
            self.symbols.insert(esi, data.to_vec());
            Ok(())
        }

        // No buffering here; just record that flush was requested.
        fn flush(&mut self) -> Result<()> {
            self.flushed = true;
            Ok(())
        }

        // Distinct ESIs written (overwrites do not double-count).
        fn written_count(&self) -> u32 {
            self.symbols.len() as u32
        }
    }
873
874    struct VecPageSource {
875        symbols: BTreeMap<u32, Vec<u8>>,
876    }
877
878    impl VecPageSource {
879        fn from_sink(sink: &VecPageSink) -> Self {
880            Self {
881                symbols: sink.symbols.clone(),
882            }
883        }
884
885        fn from_map(symbols: BTreeMap<u32, Vec<u8>>) -> Self {
886            Self { symbols }
887        }
888    }
889
890    impl PageSymbolSource for VecPageSource {
891        fn read_symbol(&mut self, esi: u32) -> Result<Option<Vec<u8>>> {
892            Ok(self.symbols.get(&esi).cloned())
893        }
894
895        fn available_esis(&self) -> Vec<u32> {
896            self.symbols.keys().copied().collect()
897        }
898
899        fn available_count(&self) -> u32 {
900            self.symbols.len() as u32
901        }
902    }
903
904    // -----------------------------------------------------------------------
905    // Asupersync-backed SymbolCodec implementation
906    // -----------------------------------------------------------------------
907
908    #[derive(Debug)]
909    struct VecTransportSink {
910        symbols: Vec<Symbol>,
911    }
912
913    impl VecTransportSink {
914        fn new() -> Self {
915            Self {
916                symbols: Vec::new(),
917            }
918        }
919    }
920
921    #[derive(Debug)]
922    struct VecTransportStream {
923        symbols: VecDeque<AuthenticatedSymbol>,
924    }
925
926    impl VecTransportStream {
927        fn new(symbols: Vec<Symbol>) -> Self {
928            let symbols = symbols
929                .into_iter()
930                .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
931                .collect();
932            Self { symbols }
933        }
934    }
935
936    impl SymbolStream for VecTransportStream {
937        fn poll_next(
938            mut self: Pin<&mut Self>,
939            _cx: &mut Context<'_>,
940        ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
941            match self.symbols.pop_front() {
942                Some(symbol) => Poll::Ready(Some(Ok(symbol))),
943                None => Poll::Ready(None),
944            }
945        }
946
947        fn size_hint(&self) -> (usize, Option<usize>) {
948            (self.symbols.len(), Some(self.symbols.len()))
949        }
950
951        fn is_exhausted(&self) -> bool {
952            self.symbols.is_empty()
953        }
954    }
955
    /// Object id shared by every test encode/decode pair.
    const TEST_OBJECT_ID: u64 = 0xBD_1A15;
    /// Block-size cap shared by the encode and decode configs.
    const TEST_MAX_BLOCK_SIZE: usize = 64 * 1024;
    // Packed u32 symbol-key layout (MSB first): [repair:1][sbn:8][esi:23].
    const PACKED_KIND_REPAIR_BIT: u32 = 1_u32 << 31;
    const PACKED_SBN_SHIFT: u32 = 23;
    const PACKED_SBN_MASK: u32 = 0xFF;
    const PACKED_ESI_MASK: u32 = 0x7F_FFFF;
962
963    fn pack_symbol_key(kind: SymbolKind, sbn: u8, esi: u32) -> Result<u32> {
964        if esi > PACKED_ESI_MASK {
965            return Err(FrankenError::OutOfRange {
966                what: "packed symbol esi (must fit 23 bits)".to_owned(),
967                value: esi.to_string(),
968            });
969        }
970
971        let kind_bit = if kind.is_repair() {
972            PACKED_KIND_REPAIR_BIT
973        } else {
974            0
975        };
976        Ok(kind_bit | (u32::from(sbn) << PACKED_SBN_SHIFT) | esi)
977    }
978
979    fn unpack_symbol_key(packed: u32) -> (SymbolKind, u8, u32) {
980        let kind = if packed & PACKED_KIND_REPAIR_BIT == 0 {
981            SymbolKind::Source
982        } else {
983            SymbolKind::Repair
984        };
985        let sbn = ((packed >> PACKED_SBN_SHIFT) & PACKED_SBN_MASK) as u8;
986        let esi = packed & PACKED_ESI_MASK;
987        (kind, sbn, esi)
988    }
989
    impl SymbolSink for VecTransportSink {
        /// Capture the symbol synchronously; this sink never backpressures.
        fn poll_send(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            symbol: AuthenticatedSymbol,
        ) -> Poll<std::result::Result<(), SinkError>> {
            self.symbols.push(symbol.into_symbol());
            Poll::Ready(Ok(()))
        }

        /// Nothing is buffered, so flush completes immediately.
        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }

        /// No underlying resource to release; close completes immediately.
        fn poll_close(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }

        /// The in-memory sink is always ready to accept another symbol.
        fn poll_ready(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }
    }
1021
1022    /// SymbolCodec backed by asupersync.
1023    struct AsupersyncCodec;
1024
1025    impl SymbolCodec for AsupersyncCodec {
1026        fn encode(
1027            &self,
1028            _cx: &Cx,
1029            source_data: &[u8],
1030            symbol_size: u32,
1031            repair_overhead: f64,
1032        ) -> Result<CodecEncodeResult> {
1033            let mut config = RaptorQConfig::default();
1034            config.encoding.symbol_size = symbol_size as u16;
1035            config.encoding.max_block_size = TEST_MAX_BLOCK_SIZE;
1036            config.encoding.repair_overhead = repair_overhead;
1037
1038            let cx = AsCx::for_testing();
1039            let object_id = AsObjectId::new_for_test(TEST_OBJECT_ID);
1040            let mut sender = RaptorQSenderBuilder::new()
1041                .config(config)
1042                .transport(VecTransportSink::new())
1043                .build()
1044                .map_err(|e| FrankenError::Internal(format!("sender build: {e}")))?;
1045
1046            let outcome = sender
1047                .send_object(&cx, object_id, source_data)
1048                .map_err(|e| FrankenError::Internal(format!("send_object: {e}")))?;
1049
1050            let symbols = std::mem::take(&mut sender.transport_mut().symbols);
1051            let k = outcome.source_symbols as u32;
1052
1053            let mut source_symbols = Vec::new();
1054            let mut repair_symbols = Vec::new();
1055            for s in &symbols {
1056                let packed_key = pack_symbol_key(s.kind(), s.sbn(), s.esi())?;
1057                if s.kind().is_source() {
1058                    source_symbols.push((packed_key, s.data().to_vec()));
1059                } else {
1060                    repair_symbols.push((packed_key, s.data().to_vec()));
1061                }
1062            }
1063
1064            Ok(CodecEncodeResult {
1065                source_symbols,
1066                repair_symbols,
1067                k_source: k,
1068            })
1069        }
1070
1071        fn decode(
1072            &self,
1073            _cx: &Cx,
1074            symbols: &[(u32, Vec<u8>)],
1075            k_source: u32,
1076            symbol_size: u32,
1077        ) -> Result<CodecDecodeResult> {
1078            if symbols.is_empty() {
1079                return Ok(CodecDecodeResult::Failure {
1080                    reason: DecodeFailureReason::InsufficientSymbols,
1081                    symbols_received: 0,
1082                    k_required: k_source,
1083                });
1084            }
1085
1086            let object_id = AsObjectId::new_for_test(TEST_OBJECT_ID);
1087            let mut config = RaptorQConfig::default();
1088            config.encoding.symbol_size = symbol_size as u16;
1089            config.encoding.max_block_size = TEST_MAX_BLOCK_SIZE;
1090
1091            // The test codec exposes the same metadata surface as the
1092            // production `SymbolCodec`, so it must rebuild decode parameters
1093            // using the same single-block geometry.
1094            let source_blocks = 1_u16;
1095            let symbols_per_block = k_source.max(1);
1096            let object_size = u64::from(k_source)
1097                .checked_mul(u64::from(symbol_size))
1098                .ok_or_else(|| FrankenError::OutOfRange {
1099                    what: "object_size for decode params".to_owned(),
1100                    value: format!("{k_source}*{symbol_size}"),
1101                })?;
1102            let params = ObjectParams::new(
1103                object_id,
1104                object_size,
1105                u16::try_from(symbol_size).map_err(|_| FrankenError::OutOfRange {
1106                    what: "symbol_size as u16".to_owned(),
1107                    value: symbol_size.to_string(),
1108                })?,
1109                source_blocks,
1110                u16::try_from(symbols_per_block).map_err(|_| FrankenError::OutOfRange {
1111                    what: "symbols_per_block as u16".to_owned(),
1112                    value: symbols_per_block.to_string(),
1113                })?,
1114            );
1115
1116            let mut rebuilt = Vec::with_capacity(symbols.len());
1117            for (packed, data) in symbols {
1118                let (kind, sbn, esi) = unpack_symbol_key(*packed);
1119                rebuilt.push(Symbol::new(
1120                    SymbolId::new(object_id, sbn, esi),
1121                    data.clone(),
1122                    kind,
1123                ));
1124            }
1125
1126            let cx = AsCx::for_testing();
1127            let mut receiver = RaptorQReceiverBuilder::new()
1128                .config(config)
1129                .source(VecTransportStream::new(rebuilt))
1130                .build()
1131                .map_err(|e| FrankenError::Internal(format!("receiver build: {e}")))?;
1132
1133            match receiver.receive_object(&cx, &params) {
1134                Ok(outcome) => Ok(CodecDecodeResult::Success {
1135                    data: outcome.data,
1136                    symbols_used: outcome.symbols_received as u32,
1137                    peeled_count: 0,
1138                    inactivated_count: 0,
1139                }),
1140                Err(err) => {
1141                    let reason = match err.kind() {
1142                        AsErrorKind::InsufficientSymbols => {
1143                            DecodeFailureReason::InsufficientSymbols
1144                        }
1145                        _ => DecodeFailureReason::SingularMatrix,
1146                    };
1147                    Ok(CodecDecodeResult::Failure {
1148                        reason,
1149                        symbols_received: symbols.len() as u32,
1150                        k_required: k_source,
1151                    })
1152                }
1153            }
1154        }
1155    }
1156
1157    // -----------------------------------------------------------------------
1158    // Helpers
1159    // -----------------------------------------------------------------------
1160
1161    fn deterministic_page_data(k: usize, symbol_size: usize, seed: u64) -> Vec<u8> {
1162        let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
1163        let total = k * symbol_size;
1164        let mut out = Vec::with_capacity(total);
1165        for idx in 0..total {
1166            state ^= state << 7;
1167            state ^= state >> 9;
1168            state = state.wrapping_mul(0xA24B_AED4_963E_E407);
1169            let idx_byte = (idx % 251) as u8;
1170            out.push((state & 0xFF) as u8 ^ idx_byte);
1171        }
1172        out
1173    }
1174
    /// Fresh, non-cancelled context for a single test.
    fn test_cx() -> fsqlite_types::cx::Cx {
        fsqlite_types::cx::Cx::new()
    }

    /// Codec under test; unit struct, so construction is trivial.
    fn default_codec() -> AsupersyncCodec {
        AsupersyncCodec
    }

    /// Baseline pipeline config using 512-byte symbols.
    fn default_config() -> PipelineConfig {
        PipelineConfig::for_page_size(512)
    }
1186
1187    // -----------------------------------------------------------------------
1188    // §3.3 Test 12: Pipeline encode (test_pipeline_encode_async)
1189    // -----------------------------------------------------------------------
1190
    /// §3.3 Test 12: encoding K pages must yield exactly K source symbols,
    /// at least one repair symbol, report the configured symbol size, and
    /// flush the sink; each source symbol must be a verbatim slice of the
    /// input data at its ESI offset.
    #[test]
    fn test_pipeline_encode_produces_source_and_repair() {
        let config = default_config();
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let cx = test_cx();
        let k = 10_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0x1234);

        let mut sink = VecPageSink::new();
        let outcome = encoder
            .encode_pages(&cx, &data, &mut sink)
            .expect("encode must succeed");

        assert_eq!(
            outcome.source_count as usize, k,
            "bead_id={BEAD_ID} case=encode_source_count"
        );
        assert!(
            outcome.repair_count > 0,
            "bead_id={BEAD_ID} case=encode_repair_present"
        );
        assert_eq!(
            outcome.symbol_size, config.symbol_size,
            "bead_id={BEAD_ID} case=encode_symbol_size"
        );
        assert!(sink.flushed, "bead_id={BEAD_ID} case=encode_sink_flushed");

        // Verify source symbols contain original page data.
        let sym_size = config.symbol_size as usize;
        for i in 0..k {
            let esi = i as u32;
            let expected = &data[i * sym_size..(i + 1) * sym_size];
            let actual = sink.symbols.get(&esi);
            assert!(actual.is_some(), "source symbol ESI {esi} missing");
            let actual = actual.expect("source symbol existence asserted");
            assert_eq!(
                actual, expected,
                "bead_id={BEAD_ID} case=encode_source_symbol_matches esi={esi}"
            );
        }

        info!(
            bead_id = BEAD_ID,
            source_count = outcome.source_count,
            repair_count = outcome.repair_count,
            total_written = sink.written_count(),
            "test_pipeline_encode complete"
        );
    }
1241
1242    // -----------------------------------------------------------------------
1243    // §3.3 Test 13: Pipeline decode (test_pipeline_decode_async)
1244    // -----------------------------------------------------------------------
1245
    /// §3.3 Test 13: decoding from every stored symbol (source + repair)
    /// must reproduce the original bytes and consume at least K symbols.
    #[test]
    fn test_pipeline_decode_with_extra_symbols() {
        let config = default_config();
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let decoder =
            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
        let cx = test_cx();
        let k = 10_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0x5678);

        // Encode.
        let mut sink = VecPageSink::new();
        let outcome = encoder
            .encode_pages(&cx, &data, &mut sink)
            .expect("encode must succeed");

        // Decode from all symbols (K + repair).
        let mut source = VecPageSource::from_sink(&sink);
        let decode_outcome = decoder
            .decode_pages(&cx, &mut source, outcome.source_count)
            .expect("decode must succeed");

        match decode_outcome {
            DecodeOutcome::Success(success) => {
                assert_eq!(
                    success.data, data,
                    "bead_id={BEAD_ID} case=decode_roundtrip_bytes"
                );
                assert!(
                    success.symbols_used >= outcome.source_count,
                    "bead_id={BEAD_ID} case=decode_symbols_used"
                );
                info!(
                    bead_id = BEAD_ID,
                    symbols_used = success.symbols_used,
                    peeled = success.peeled_count,
                    inactivated = success.inactivated_count,
                    "test_pipeline_decode complete"
                );
            }
            DecodeOutcome::Failure(failure) => unreachable!(
                "bead_id={BEAD_ID} case=decode_unexpected_failure reason={:?}",
                failure.reason
            ),
        }
    }
1293
1294    // -----------------------------------------------------------------------
1295    // §3.3 Test 14: Cancel-safety (test_pipeline_cancel_safe)
1296    // -----------------------------------------------------------------------
1297
    /// §3.3 Test 14 (encode side): an already-cancelled `Cx` must make
    /// `encode_pages` return `FrankenError::Abort` before the sink is
    /// ever flushed.
    #[test]
    fn test_pipeline_cancel_safe_encode() {
        let config = PipelineConfig {
            checkpoint_interval: 2, // checkpoint every 2 symbols
            ..default_config()
        };
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");

        // Create a Cx that is already cancelled.
        let cx = fsqlite_types::cx::Cx::new();
        cx.cancel_with_reason(fsqlite_types::cx::CancelReason::UserInterrupt);

        let k = 10_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0xABCD);
        let mut sink = VecPageSink::new();

        let result = encoder.encode_pages(&cx, &data, &mut sink);
        assert!(
            result.is_err(),
            "bead_id={BEAD_ID} case=cancel_safe_encode_aborts"
        );
        assert!(
            matches!(result.unwrap_err(), FrankenError::Abort),
            "bead_id={BEAD_ID} case=cancel_safe_encode_error_type"
        );
        // Sink should not have been flushed.
        assert!(!sink.flushed, "bead_id={BEAD_ID} case=cancel_safe_no_flush");
    }

    /// §3.3 Test 14 (decode side): an already-cancelled `Cx` must make
    /// `decode_pages` return `FrankenError::Abort` even when enough
    /// symbols are available.
    #[test]
    fn test_pipeline_cancel_safe_decode() {
        let config = PipelineConfig {
            checkpoint_interval: 2,
            ..default_config()
        };
        let decoder =
            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");

        // Create a Cx that is already cancelled.
        let cx = fsqlite_types::cx::Cx::new();
        cx.cancel_with_reason(fsqlite_types::cx::CancelReason::UserInterrupt);

        // Feed some symbols.
        let mut symbols = BTreeMap::new();
        for esi in 0..10_u32 {
            symbols.insert(esi, vec![0xAA; config.symbol_size as usize]);
        }
        let mut source = VecPageSource::from_map(symbols);

        let result = decoder.decode_pages(&cx, &mut source, 10);
        assert!(
            result.is_err(),
            "bead_id={BEAD_ID} case=cancel_safe_decode_aborts"
        );
        assert!(
            matches!(result.unwrap_err(), FrankenError::Abort),
            "bead_id={BEAD_ID} case=cancel_safe_decode_error_type"
        );
    }
1358
1359    // -----------------------------------------------------------------------
1360    // §3.3 Test 15: Backpressure (test_pipeline_backpressure)
1361    // -----------------------------------------------------------------------
1362
1363    /// Sink that fails after N writes, simulating a full output buffer.
1364    struct BackpressureSink {
1365        limit: u32,
1366        count: u32,
1367    }
1368
1369    impl BackpressureSink {
1370        fn new(limit: u32) -> Self {
1371            Self { limit, count: 0 }
1372        }
1373    }
1374
1375    impl PageSymbolSink for BackpressureSink {
1376        fn write_symbol(&mut self, _esi: u32, _data: &[u8]) -> Result<()> {
1377            if self.count >= self.limit {
1378                return Err(FrankenError::Busy);
1379            }
1380            self.count += 1;
1381            Ok(())
1382        }
1383
1384        fn flush(&mut self) -> Result<()> {
1385            Ok(())
1386        }
1387
1388        fn written_count(&self) -> u32 {
1389            self.count
1390        }
1391    }
1392
    /// §3.3 Test 15: when the sink rejects further writes, `encode_pages`
    /// must propagate `FrankenError::Busy` and leave exactly the accepted
    /// symbols behind.
    #[test]
    fn test_pipeline_backpressure_sink_full() {
        let config = default_config();
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let cx = test_cx();
        let k = 10_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0xEEFF);

        // Sink that only accepts 3 symbols then returns Busy.
        let mut sink = BackpressureSink::new(3);
        let result = encoder.encode_pages(&cx, &data, &mut sink);

        assert!(
            result.is_err(),
            "bead_id={BEAD_ID} case=backpressure_propagated"
        );
        assert!(
            matches!(result.unwrap_err(), FrankenError::Busy),
            "bead_id={BEAD_ID} case=backpressure_error_type"
        );
        assert_eq!(
            sink.written_count(),
            3,
            "bead_id={BEAD_ID} case=backpressure_partial_write"
        );
    }
1420
1421    // -----------------------------------------------------------------------
1422    // Config Validation Tests
1423    // -----------------------------------------------------------------------
1424
    /// A zero symbol size must fail validation.
    #[test]
    fn test_config_validation_zero_symbol_size() {
        let config = PipelineConfig {
            symbol_size: 0,
            ..default_config()
        };
        assert!(
            config.validate().is_err(),
            "bead_id={BEAD_ID} case=config_reject_zero_symbol_size"
        );
    }

    /// Symbol sizes that are not a power of two must fail validation.
    #[test]
    fn test_config_validation_non_power_of_two() {
        let config = PipelineConfig {
            symbol_size: 1000,
            ..default_config()
        };
        assert!(
            config.validate().is_err(),
            "bead_id={BEAD_ID} case=config_reject_non_power_of_two"
        );
    }

    /// Symbol sizes below the minimum (256 is too small) must fail.
    #[test]
    fn test_config_validation_below_min() {
        let config = PipelineConfig {
            symbol_size: 256,
            ..default_config()
        };
        assert!(
            config.validate().is_err(),
            "bead_id={BEAD_ID} case=config_reject_below_min"
        );
    }

    /// Symbol sizes above the maximum (128 KiB is too large) must fail.
    #[test]
    fn test_config_validation_above_max() {
        let config = PipelineConfig {
            symbol_size: 128 * 1024,
            ..default_config()
        };
        assert!(
            config.validate().is_err(),
            "bead_id={BEAD_ID} case=config_reject_above_max"
        );
    }

    /// A zero max block size must fail validation.
    #[test]
    fn test_config_validation_zero_max_block_size() {
        let config = PipelineConfig {
            max_block_size: 0,
            ..default_config()
        };
        assert!(
            config.validate().is_err(),
            "bead_id={BEAD_ID} case=config_reject_zero_max_block"
        );
    }

    /// Repair overhead below 1.0 must fail validation.
    #[test]
    fn test_config_validation_repair_overhead_below_one() {
        let config = PipelineConfig {
            repair_overhead: 0.5,
            ..default_config()
        };
        assert!(
            config.validate().is_err(),
            "bead_id={BEAD_ID} case=config_reject_repair_overhead_below_one"
        );
    }

    /// A zero checkpoint interval (would never checkpoint) must fail.
    #[test]
    fn test_config_validation_zero_checkpoint_interval() {
        let config = PipelineConfig {
            checkpoint_interval: 0,
            ..default_config()
        };
        assert!(
            config.validate().is_err(),
            "bead_id={BEAD_ID} case=config_reject_zero_checkpoint_interval"
        );
    }

    /// Every power-of-two size from 512 to 64 KiB must validate cleanly.
    #[test]
    fn test_config_validation_valid_configs() {
        for symbol_size in [512, 1024, 2048, 4096, 8192, 16384, 32768, 65536] {
            let config = PipelineConfig::for_page_size(symbol_size);
            assert!(
                config.validate().is_ok(),
                "bead_id={BEAD_ID} case=config_valid symbol_size={symbol_size}"
            );
        }
    }
1519
1520    // -----------------------------------------------------------------------
1521    // Decode Proof on Failure
1522    // -----------------------------------------------------------------------
1523
    /// With only K-3 source symbols available, decode must fail with
    /// `InsufficientSymbols`, report fewer than K received, and attach no
    /// decode proof under the default (disabled) emission policy.
    #[test]
    fn test_decode_failure_insufficient_symbols() {
        let config = default_config();
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let decoder =
            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
        let cx = test_cx();
        let k = 10_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0xDEAD);

        // Encode.
        let mut sink = VecPageSink::new();
        let outcome = encoder
            .encode_pages(&cx, &data, &mut sink)
            .expect("encode must succeed");

        // Keep only K-3 source symbols (insufficient).
        let mut partial = BTreeMap::new();
        for esi in 0..((k - 3) as u32) {
            if let Some(sym) = sink.symbols.get(&esi) {
                partial.insert(esi, sym.clone());
            }
        }
        let mut source = VecPageSource::from_map(partial);

        let decode_outcome = decoder
            .decode_pages(&cx, &mut source, outcome.source_count)
            .expect("decode call itself should not error");

        match decode_outcome {
            DecodeOutcome::Failure(failure) => {
                assert_eq!(
                    failure.reason,
                    DecodeFailureReason::InsufficientSymbols,
                    "bead_id={BEAD_ID} case=decode_failure_reason"
                );
                assert!(
                    failure.symbols_received < outcome.source_count,
                    "bead_id={BEAD_ID} case=decode_failure_symbol_count"
                );
                assert_eq!(
                    failure.k_required, outcome.source_count,
                    "bead_id={BEAD_ID} case=decode_failure_k_required"
                );
                assert!(
                    failure.decode_proof.is_none(),
                    "bead_id={BEAD_ID} case=decode_failure_proof_disabled_by_default"
                );
            }
            DecodeOutcome::Success(_) => {
                unreachable!("bead_id={BEAD_ID} case=decode_should_have_failed")
            }
        }
    }
1579
    /// With `emit_on_decode_failure` enabled, a failed decode (K-2 symbols)
    /// must carry a self-consistent proof whose `decode_success` is false.
    #[test]
    fn test_decode_failure_emits_proof_when_enabled() {
        let mut config = default_config();
        config.decode_proof_policy = DecodeProofEmissionPolicy {
            emit_on_decode_failure: true,
            emit_on_repair_success: false,
        };
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let decoder =
            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
        let cx = test_cx();
        let k = 10_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0xFA24);

        let mut sink = VecPageSink::new();
        let outcome = encoder
            .encode_pages(&cx, &data, &mut sink)
            .expect("encode must succeed");

        // Keep only K-2 source symbols so the decode fails.
        let mut partial = BTreeMap::new();
        for esi in 0..((k - 2) as u32) {
            if let Some(sym) = sink.symbols.get(&esi) {
                partial.insert(esi, sym.clone());
            }
        }
        let mut source = VecPageSource::from_map(partial);
        let decode_outcome = decoder
            .decode_pages(&cx, &mut source, outcome.source_count)
            .expect("decode call itself should not error");

        match decode_outcome {
            DecodeOutcome::Failure(failure) => {
                let proof = failure
                    .decode_proof
                    .expect("bead_id=bd-faz4 case=decode_failure_proof_emitted");
                assert!(
                    !proof.decode_success,
                    "bead_id=bd-faz4 case=decode_failure_proof_flag"
                );
                assert!(
                    proof.is_consistent(),
                    "bead_id=bd-faz4 case=decode_failure_proof_consistent"
                );
            }
            DecodeOutcome::Success(_) => {
                unreachable!("bead_id=bd-faz4 case=decode_failure_expected")
            }
        }
    }
1630
    /// With `emit_on_repair_success` enabled, a successful decode must
    /// carry a consistent proof flagged both successful and as a repair.
    #[test]
    fn test_decode_success_with_repair_emits_proof_when_enabled() {
        let mut config = default_config();
        config.decode_proof_policy = DecodeProofEmissionPolicy {
            emit_on_decode_failure: false,
            emit_on_repair_success: true,
        };
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let decoder =
            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
        let cx = test_cx();
        let k = 10_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0xF0AA);

        let mut sink = VecPageSink::new();
        let outcome = encoder
            .encode_pages(&cx, &data, &mut sink)
            .expect("encode must succeed");
        let mut source = VecPageSource::from_sink(&sink);
        let decode_outcome = decoder
            .decode_pages(&cx, &mut source, outcome.source_count)
            .expect("decode must succeed");

        match decode_outcome {
            DecodeOutcome::Success(success) => {
                let proof = success
                    .decode_proof
                    .expect("bead_id=bd-faz4 case=repair_success_proof_emitted");
                assert!(proof.decode_success);
                assert!(proof.is_repair());
                assert!(
                    proof.is_consistent(),
                    "bead_id=bd-faz4 case=repair_success_proof_consistent"
                );
            }
            DecodeOutcome::Failure(failure) => unreachable!(
                "bead_id=bd-faz4 case=repair_success_should_decode reason={:?}",
                failure.reason
            ),
        }
    }
1673
1674    // -----------------------------------------------------------------------
1675    // E2E Round-trip: encode → store → read → decode → verify
1676    // -----------------------------------------------------------------------
1677
    /// Full round-trip (encode → store → read → decode) across several
    /// page sizes must reproduce the input bytes exactly for each size.
    #[test]
    fn test_e2e_roundtrip_multiple_page_sizes() {
        for &symbol_size in &[512_u32, 1024, 4096] {
            let config = PipelineConfig::for_page_size(symbol_size);
            let encoder =
                RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
            let decoder =
                RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
            let cx = test_cx();

            let k = 8_usize;
            let data = deterministic_page_data(k, symbol_size as usize, u64::from(symbol_size));

            // Encode → store.
            let mut sink = VecPageSink::new();
            let outcome = encoder
                .encode_pages(&cx, &data, &mut sink)
                .expect("encode must succeed");

            // Read → decode.
            let mut source = VecPageSource::from_sink(&sink);
            let decode_result = decoder
                .decode_pages(&cx, &mut source, outcome.source_count)
                .expect("decode must succeed");

            match decode_result {
                DecodeOutcome::Success(success) => {
                    assert_eq!(
                        success.data, data,
                        "bead_id={BEAD_ID} case=e2e_roundtrip symbol_size={symbol_size}"
                    );
                }
                DecodeOutcome::Failure(f) => unreachable!(
                    "bead_id={BEAD_ID} case=e2e_roundtrip_failure symbol_size={symbol_size} reason={:?}",
                    f.reason
                ),
            }
        }
    }
1717
1718    #[test]
1719    fn test_e2e_roundtrip_64_pages() {
1720        let config = PipelineConfig::for_page_size(4096);
1721        let encoder =
1722            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
1723        let decoder =
1724            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
1725        let cx = test_cx();
1726
1727        let k = 64_usize;
1728        let data = deterministic_page_data(k, config.symbol_size as usize, 0xE2E6_4000);
1729
1730        let mut sink = VecPageSink::new();
1731        let outcome = encoder
1732            .encode_pages(&cx, &data, &mut sink)
1733            .expect("encode must succeed");
1734
1735        assert_eq!(
1736            outcome.source_count as usize, k,
1737            "bead_id={BEAD_ID} case=e2e_64_source_count"
1738        );
1739
1740        let mut source = VecPageSource::from_sink(&sink);
1741        let decode_result = decoder
1742            .decode_pages(&cx, &mut source, outcome.source_count)
1743            .expect("decode must succeed");
1744
1745        match decode_result {
1746            DecodeOutcome::Success(success) => {
1747                assert_eq!(
1748                    success.data, data,
1749                    "bead_id={BEAD_ID} case=e2e_64_roundtrip_bytes"
1750                );
1751                info!(
1752                    bead_id = BEAD_ID,
1753                    k,
1754                    peeled = success.peeled_count,
1755                    inactivated = success.inactivated_count,
1756                    "E2E 64-page roundtrip complete"
1757                );
1758            }
1759            DecodeOutcome::Failure(f) => unreachable!(
1760                "bead_id={BEAD_ID} case=e2e_64_failure reason={:?}",
1761                f.reason
1762            ),
1763        }
1764    }
1765
1766    #[test]
1767    fn test_e2e_bd_1hi_5() {
1768        let config = PipelineConfig::for_page_size(4096);
1769        let encoder =
1770            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
1771        let decoder =
1772            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
1773        let cx = test_cx();
1774
1775        // Realistic load for this lane: 64 pages (256 KiB) with symbol loss.
1776        let k = 64_usize;
1777        let data = deterministic_page_data(k, config.symbol_size as usize, 0xB1D1_5005);
1778        let mut sink = VecPageSink::new();
1779        let outcome = encoder
1780            .encode_pages(&cx, &data, &mut sink)
1781            .expect("encode must succeed");
1782
1783        // Drop one source symbol per source block; keep all repair symbols.
1784        let mut dropped = 0_u32;
1785        let mut degraded = BTreeMap::new();
1786        for (packed_key, symbol_bytes) in &sink.symbols {
1787            let (kind, _sbn, esi) = unpack_symbol_key(*packed_key);
1788            if kind.is_source() && esi == 0 {
1789                dropped += 1;
1790                continue;
1791            }
1792            degraded.insert(*packed_key, symbol_bytes.clone());
1793        }
1794        assert!(dropped > 0, "bead_id={BEAD_ID} case=e2e_named_dropped_some");
1795
1796        let mut source = VecPageSource::from_map(degraded);
1797        let decode_result = decoder
1798            .decode_pages(&cx, &mut source, outcome.source_count)
1799            .expect("decode must complete");
1800
1801        match decode_result {
1802            DecodeOutcome::Success(success) => {
1803                assert_eq!(
1804                    success.data, data,
1805                    "bead_id={BEAD_ID} case=e2e_named_byte_perfect_recovery"
1806                );
1807            }
1808            DecodeOutcome::Failure(f) => unreachable!(
1809                "bead_id={BEAD_ID} case=e2e_named_unexpected_failure reason={:?}",
1810                f.reason
1811            ),
1812        }
1813    }
1814
1815    // -----------------------------------------------------------------------
1816    // E2E: Retry after failure
1817    // -----------------------------------------------------------------------
1818
1819    #[test]
1820    fn test_e2e_retry_after_failure() {
1821        let config = default_config();
1822        let encoder =
1823            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
1824        let decoder =
1825            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
1826        let cx = test_cx();
1827        let k = 10_usize;
1828        let data = deterministic_page_data(k, config.symbol_size as usize, 0xAE_7121);
1829
1830        // Encode.
1831        let mut sink = VecPageSink::new();
1832        let outcome = encoder
1833            .encode_pages(&cx, &data, &mut sink)
1834            .expect("encode must succeed");
1835
1836        // First attempt: K-2 source symbols only → should fail.
1837        let mut partial = BTreeMap::new();
1838        for esi in 0..((k - 2) as u32) {
1839            if let Some(sym) = sink.symbols.get(&esi) {
1840                partial.insert(esi, sym.clone());
1841            }
1842        }
1843        let mut source_attempt1 = VecPageSource::from_map(partial.clone());
1844        let result1 = decoder
1845            .decode_pages(&cx, &mut source_attempt1, outcome.source_count)
1846            .expect("decode call should not error");
1847        assert!(
1848            matches!(result1, DecodeOutcome::Failure(_)),
1849            "bead_id={BEAD_ID} case=retry_first_attempt_fails"
1850        );
1851
1852        // Second attempt: add all remaining symbols → should succeed.
1853        let full = sink.symbols.clone();
1854        let mut source_attempt2 = VecPageSource::from_map(full);
1855        let result2 = decoder
1856            .decode_pages(&cx, &mut source_attempt2, outcome.source_count)
1857            .expect("decode call should not error");
1858        match result2 {
1859            DecodeOutcome::Success(success) => {
1860                assert_eq!(
1861                    success.data, data,
1862                    "bead_id={BEAD_ID} case=retry_second_attempt_succeeds"
1863                );
1864            }
1865            DecodeOutcome::Failure(f) => unreachable!(
1866                "bead_id={BEAD_ID} case=retry_second_should_succeed reason={:?}",
1867                f.reason
1868            ),
1869        }
1870    }
1871
1872    // -----------------------------------------------------------------------
1873    // Decode with exact K symbols (fragile recovery)
1874    // -----------------------------------------------------------------------
1875
1876    #[test]
1877    fn test_decode_source_only_exact_k() {
1878        let config = default_config();
1879        let encoder =
1880            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
1881        let decoder =
1882            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
1883        let cx = test_cx();
1884        let k = 8_usize;
1885        let data = deterministic_page_data(k, config.symbol_size as usize, 0xE4AC7);
1886
1887        let mut sink = VecPageSink::new();
1888        let outcome = encoder
1889            .encode_pages(&cx, &data, &mut sink)
1890            .expect("encode must succeed");
1891
1892        // Keep only K source symbols (no repair).
1893        let mut source_only = BTreeMap::new();
1894        for esi in 0..(k as u32) {
1895            if let Some(sym) = sink.symbols.get(&esi) {
1896                source_only.insert(esi, sym.clone());
1897            }
1898        }
1899
1900        let mut source = VecPageSource::from_map(source_only);
1901        let decode_result = decoder
1902            .decode_pages(&cx, &mut source, outcome.source_count)
1903            .expect("decode must not error");
1904
1905        match decode_result {
1906            DecodeOutcome::Success(success) => {
1907                assert_eq!(
1908                    success.data, data,
1909                    "bead_id={BEAD_ID} case=exact_k_roundtrip"
1910                );
1911                assert_eq!(
1912                    success.symbols_used, k as u32,
1913                    "bead_id={BEAD_ID} case=exact_k_symbols_used"
1914                );
1915            }
1916            DecodeOutcome::Failure(f) => unreachable!(
1917                "bead_id={BEAD_ID} case=exact_k_should_succeed reason={:?}",
1918                f.reason
1919            ),
1920        }
1921    }
1922
1923    // -------------------------------------------------------------------
1924    // bd-3bw.1: RaptorQ Metrics Tests
1925    //
1926    // Unit tests use a local RaptorQMetrics instance to avoid
1927    // interference from parallel tests sharing the global singleton.
1928    // Integration test verifies the global is wired up.
1929    // -------------------------------------------------------------------
1930
1931    #[test]
1932    fn metrics_struct_encode_counters() {
1933        let m = RaptorQMetrics::new();
1934        m.record_encode(2048, 3);
1935        m.record_encode(4096, 5);
1936
1937        let snap = m.snapshot();
1938        assert_eq!(snap.encode_ops, 2);
1939        assert_eq!(snap.encoded_bytes_total, 6144);
1940        assert_eq!(snap.repair_symbols_generated_total, 8);
1941        assert_eq!(snap.decode_ops, 0);
1942    }
1943
1944    #[test]
1945    fn metrics_struct_decode_counters() {
1946        let m = RaptorQMetrics::new();
1947        m.record_decode_success(4096);
1948        m.record_decode_success(2048);
1949        m.record_decode_failure();
1950
1951        let snap = m.snapshot();
1952        assert_eq!(snap.decode_ops, 3);
1953        assert_eq!(snap.decoded_bytes_total, 6144);
1954        assert_eq!(snap.decode_failures, 1);
1955        assert_eq!(snap.encode_ops, 0);
1956    }
1957
1958    #[test]
1959    fn metrics_snapshot_display() {
1960        let m = RaptorQMetrics::new();
1961        m.record_encode(4096, 2);
1962        m.record_decode_success(4096);
1963        let snap = m.snapshot();
1964        let display = format!("{snap}");
1965        assert!(display.contains("4096"), "encoded bytes in display");
1966        assert!(display.contains("2 repair"), "repair syms in display");
1967    }
1968
1969    #[test]
1970    fn metrics_reset() {
1971        let m = RaptorQMetrics::new();
1972        m.record_encode(1000, 5);
1973        m.record_decode_success(500);
1974        m.record_decode_failure();
1975        m.reset();
1976        let snap = m.snapshot();
1977        assert_eq!(snap.encoded_bytes_total, 0);
1978        assert_eq!(snap.repair_symbols_generated_total, 0);
1979        assert_eq!(snap.encode_ops, 0);
1980        assert_eq!(snap.decode_ops, 0);
1981        assert_eq!(snap.decode_failures, 0);
1982        assert_eq!(snap.decoded_bytes_total, 0);
1983    }
1984
    #[test]
    fn metrics_global_wired_to_encode_decode() {
        // Verify that encode_pages / decode_pages bump the global
        // GLOBAL_RAPTORQ_METRICS singleton.
        //
        // Other tests running in parallel may also touch the global, but
        // every counter is monotonic and this test performs at least one
        // encode and one decode of its own, so the strict `>` comparisons
        // below against the "before" snapshot always hold.
        // (An earlier comment here claimed ">= on deltas"; the assertions
        // actually compare raw counter values with `>`, which is correct.)
        let before = GLOBAL_RAPTORQ_METRICS.snapshot();

        let config = default_config();
        let encoder =
            RaptorQPageEncoder::new(config.clone(), default_codec()).expect("encoder build");
        let decoder =
            RaptorQPageDecoder::new(config.clone(), default_codec()).expect("decoder build");
        let cx = test_cx();
        let k = 4_usize;
        let data = deterministic_page_data(k, config.symbol_size as usize, 0xF00D);

        // Drive one full encode → decode cycle through the public API so the
        // metric hooks inside encode_pages / decode_pages fire.
        let mut sink = VecPageSink::new();
        let outcome = encoder.encode_pages(&cx, &data, &mut sink).expect("encode");
        let mut source = VecPageSource::from_sink(&sink);
        let _decode = decoder
            .decode_pages(&cx, &mut source, outcome.source_count)
            .expect("decode");

        let after = GLOBAL_RAPTORQ_METRICS.snapshot();
        assert!(
            after.encode_ops > before.encode_ops,
            "global encode_ops should have increased"
        );
        assert!(
            after.encoded_bytes_total > before.encoded_bytes_total,
            "global encoded_bytes should have increased"
        );
        assert!(
            after.decode_ops > before.decode_ops,
            "global decode_ops should have increased"
        );
        assert!(
            after.decoded_bytes_total > before.decoded_bytes_total,
            "global decoded_bytes should have increased"
        );
    }
2026}