// fsqlite_core/lib.rs
1//! Core bounded-parallelism primitives (§1.5, bd-22n.4).
2//!
3//! This module provides a small bulkhead framework for internal background work.
4//! It is intentionally non-blocking: overflow is rejected with `SQLITE_BUSY`
5//! (`FrankenError::Busy`) instead of queue-and-wait semantics.
6
7pub mod attach;
8pub mod commit_marker;
9pub mod commit_repair;
10pub mod compat_persist;
11pub mod connection;
12pub mod db_fec;
13pub mod decode_proofs;
14pub mod ecs_replication;
15pub mod epoch;
16pub mod explain;
17pub mod inter_object_coding;
18pub mod lrc;
19pub mod native_index;
20pub mod permeation_map;
21pub mod por;
22pub mod raptorq_codec;
23pub mod raptorq_integration;
24pub mod region;
25pub mod remote_effects;
26pub mod repair_engine;
27pub mod repair_symbols;
28pub mod replication_receiver;
29pub mod replication_sender;
30pub mod snapshot_shipping;
31pub mod source_block_partition;
32pub mod symbol_log;
33pub mod symbol_size_policy;
34pub mod tiered_storage;
35pub mod transaction;
36pub mod wal_adapter;
37pub mod wal_fec_adapter;
38
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41
42use fsqlite_error::{FrankenError, Result};
43use fsqlite_types::{
44    ObjectId, Oti, PayloadHash, Region, SymbolRecord, SymbolRecordFlags, gf256_mul_byte,
45};
46use tracing::{debug, error};
47
/// Hard upper bound on background CPU parallelism under the balanced profile.
const MAX_BALANCED_BG_CPU: usize = 16;
49
/// Policy used when the bulkhead admission budget is exhausted.
///
/// Only non-blocking rejection exists today; queue-and-wait semantics are
/// deliberately excluded (see module docs).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Reject overflow immediately with `SQLITE_BUSY` (`FrankenError::Busy`).
    DropBusy,
}
56
/// Runtime profile for conservative parallelism defaults.
///
/// Selects the formula used by [`BulkheadConfig::for_profile`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelismProfile {
    /// Conservative profile used by default (`clamp(P / 8, 1, 16)`).
    Balanced,
}
63
/// Bounded parallelism configuration for a work class.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BulkheadConfig {
    /// Number of tasks allowed to execute concurrently.
    /// Must be non-zero (enforced by [`BulkheadConfig::new`]).
    pub max_concurrent: usize,
    /// Additional bounded admission slots (not unbounded queueing).
    /// Added on top of `max_concurrent` in [`BulkheadConfig::admission_limit`].
    pub queue_depth: usize,
    /// Overflow behavior when capacity is exhausted.
    pub overflow_policy: OverflowPolicy,
}
74
75impl BulkheadConfig {
76    /// Create an explicit configuration.
77    ///
78    /// Returns `None` when `max_concurrent` is zero.
79    #[must_use]
80    pub const fn new(
81        max_concurrent: usize,
82        queue_depth: usize,
83        overflow_policy: OverflowPolicy,
84    ) -> Option<Self> {
85        if max_concurrent == 0 {
86            None
87        } else {
88            Some(Self {
89                max_concurrent,
90                queue_depth,
91                overflow_policy,
92            })
93        }
94    }
95
96    /// Conservative default derived from available CPU parallelism.
97    ///
98    /// Uses the "balanced profile" formula from bd-22n.4:
99    /// `clamp(P / 8, 1, 16)` where `P = available_parallelism`.
100    #[must_use]
101    pub fn for_profile(profile: ParallelismProfile) -> Self {
102        let p = available_parallelism_or_one();
103        match profile {
104            ParallelismProfile::Balanced => Self {
105                max_concurrent: conservative_bg_cpu_max(p),
106                queue_depth: 0,
107                overflow_policy: OverflowPolicy::DropBusy,
108            },
109        }
110    }
111
112    /// Maximum admitted work units at once.
113    #[must_use]
114    pub const fn admission_limit(self) -> usize {
115        self.max_concurrent.saturating_add(self.queue_depth)
116    }
117}
118
impl Default for BulkheadConfig {
    /// Balanced-profile configuration; see [`BulkheadConfig::for_profile`].
    fn default() -> Self {
        Self::for_profile(ParallelismProfile::Balanced)
    }
}
124
125/// Compute conservative default background CPU parallelism from `P`.
126#[must_use]
127pub const fn conservative_bg_cpu_max(p: usize) -> usize {
128    let base = p / 8;
129    if base == 0 {
130        1
131    } else if base > MAX_BALANCED_BG_CPU {
132        MAX_BALANCED_BG_CPU
133    } else {
134        base
135    }
136}
137
/// Return `std::thread::available_parallelism()` with a safe floor of 1.
///
/// The OS query can fail (e.g. unsupported platform); treat failure as a
/// single-core machine rather than propagating an error.
#[must_use]
pub fn available_parallelism_or_one() -> usize {
    match std::thread::available_parallelism() {
        Ok(count) => count.get(),
        Err(_) => 1,
    }
}
143
/// Chunking plan for SIMD-friendly wide-word loops.
///
/// As produced by [`WideChunkLayout::for_len`], `u64_chunks <= 1` and
/// `tail_bytes < 8`, since both describe the remainder after the 16-byte lane.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WideChunkLayout {
    /// Number of `u128` chunks processed.
    pub u128_chunks: usize,
    /// Number of `u64` chunks processed after `u128` chunks.
    pub u64_chunks: usize,
    /// Remaining tail bytes processed scalar.
    pub tail_bytes: usize,
}
154
155impl WideChunkLayout {
156    /// Compute the wide-chunk layout for a byte length.
157    #[must_use]
158    pub const fn for_len(len: usize) -> Self {
159        let u128_chunks = len / 16;
160        let rem_after_u128 = len % 16;
161        let u64_chunks = rem_after_u128 / 8;
162        let tail_bytes = rem_after_u128 % 8;
163        Self {
164            u128_chunks,
165            u64_chunks,
166            tail_bytes,
167        }
168    }
169}
170
171/// XOR patch application using `u128` + `u64` + tail loops.
172///
173/// This is the SIMD-friendly primitive for hot patch paths. LLVM can
174/// auto-vectorize the wide integer loops.
175#[allow(clippy::incompatible_msrv)]
176pub fn xor_patch_wide_chunks(dst: &mut [u8], patch: &[u8]) -> Result<WideChunkLayout> {
177    if dst.len() != patch.len() {
178        return Err(FrankenError::TypeMismatch {
179            expected: format!("equal lengths (dst == patch), got {}", dst.len()),
180            actual: patch.len().to_string(),
181        });
182    }
183
184    let layout = WideChunkLayout::for_len(dst.len());
185
186    let (dst_128, dst_rem) = dst.as_chunks_mut::<16>();
187    let (patch_128, patch_rem) = patch.as_chunks::<16>();
188    for (d, p) in dst_128.iter_mut().zip(patch_128.iter()) {
189        let d_word = u128::from_ne_bytes(*d);
190        let p_word = u128::from_ne_bytes(*p);
191        *d = (d_word ^ p_word).to_ne_bytes();
192    }
193
194    let (dst_64, dst_tail) = dst_rem.as_chunks_mut::<8>();
195    let (patch_64, patch_tail) = patch_rem.as_chunks::<8>();
196    for (d, p) in dst_64.iter_mut().zip(patch_64.iter()) {
197        let d_word = u64::from_ne_bytes(*d);
198        let p_word = u64::from_ne_bytes(*p);
199        *d = (d_word ^ p_word).to_ne_bytes();
200    }
201
202    for (d, p) in dst_tail.iter_mut().zip(patch_tail.iter()) {
203        *d ^= *p;
204    }
205
206    Ok(layout)
207}
208
/// GF(256) addition (`+`) using wide XOR chunk loops.
///
/// In GF(256), addition is XOR, so this uses the same SIMD-friendly chunking
/// strategy as [`xor_patch_wide_chunks`].
///
/// # Errors
///
/// Returns `FrankenError::TypeMismatch` when `dst` and `src` differ in
/// length (propagated from [`xor_patch_wide_chunks`]).
pub fn gf256_add_assign_chunked(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
    xor_patch_wide_chunks(dst, src)
}
216
/// RaptorQ symbol add (`dst ^= src`) using chunked XOR.
///
/// This is the core symbol-add primitive from §3.2.2.
///
/// # Errors
///
/// Returns `FrankenError::TypeMismatch` when `dst` and `src` differ in
/// length (propagated from [`gf256_add_assign_chunked`]).
pub fn symbol_add_assign(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
    // Trace before the work so the log is emitted even on the error path.
    debug!(
        bead_id = "bd-1hi.2",
        op = "symbol_add_assign",
        symbol_len = dst.len(),
        "applying in-place XOR over symbol bytes"
    );
    gf256_add_assign_chunked(dst, src)
}
229
230/// RaptorQ symbol scalar multiply (`out = c * src`) in GF(256).
231///
232/// Special cases:
233/// - `c == 0`: zero output
234/// - `c == 1`: copy input
235pub fn symbol_mul_into(coeff: u8, src: &[u8], out: &mut [u8]) -> Result<()> {
236    if src.len() != out.len() {
237        error!(
238            bead_id = "bd-1hi.2",
239            op = "symbol_mul_into",
240            coeff,
241            src_len = src.len(),
242            out_len = out.len(),
243            "symbol length mismatch"
244        );
245        return Err(FrankenError::TypeMismatch {
246            expected: format!("equal lengths (src == out), got {}", src.len()),
247            actual: out.len().to_string(),
248        });
249    }
250
251    debug!(
252        bead_id = "bd-1hi.2",
253        op = "symbol_mul_into",
254        coeff,
255        symbol_len = src.len(),
256        "applying GF(256) scalar multiplication"
257    );
258
259    match coeff {
260        0 => {
261            out.fill(0);
262            Ok(())
263        }
264        1 => {
265            out.copy_from_slice(src);
266            Ok(())
267        }
268        _ => {
269            let mut out_chunks = out.chunks_exact_mut(16);
270            let mut src_chunks = src.chunks_exact(16);
271
272            for (dst_chunk, src_chunk) in out_chunks.by_ref().zip(src_chunks.by_ref()) {
273                for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
274                    *dst_byte = gf256_mul_byte(coeff, *src_byte);
275                }
276            }
277            for (dst_byte, src_byte) in out_chunks
278                .into_remainder()
279                .iter_mut()
280                .zip(src_chunks.remainder().iter())
281            {
282                *dst_byte = gf256_mul_byte(coeff, *src_byte);
283            }
284            Ok(())
285        }
286    }
287}
288
289/// RaptorQ fused multiply-add (`dst ^= c * src`) in GF(256).
290///
291/// Special cases:
292/// - `c == 0`: no-op
293/// - `c == 1`: pure XOR path
294pub fn symbol_addmul_assign(dst: &mut [u8], coeff: u8, src: &[u8]) -> Result<WideChunkLayout> {
295    if dst.len() != src.len() {
296        error!(
297            bead_id = "bd-1hi.2",
298            op = "symbol_addmul_assign",
299            coeff,
300            dst_len = dst.len(),
301            src_len = src.len(),
302            "symbol length mismatch"
303        );
304        return Err(FrankenError::TypeMismatch {
305            expected: format!("equal lengths (dst == src), got {}", dst.len()),
306            actual: src.len().to_string(),
307        });
308    }
309
310    debug!(
311        bead_id = "bd-1hi.2",
312        op = "symbol_addmul_assign",
313        coeff,
314        symbol_len = dst.len(),
315        "applying fused multiply-and-add over symbol bytes"
316    );
317
318    match coeff {
319        0 => Ok(WideChunkLayout::for_len(dst.len())),
320        1 => symbol_add_assign(dst, src),
321        _ => {
322            let mut dst_chunks = dst.chunks_exact_mut(16);
323            let mut src_chunks = src.chunks_exact(16);
324
325            for (dst_chunk, src_chunk) in dst_chunks.by_ref().zip(src_chunks.by_ref()) {
326                for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
327                    *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
328                }
329            }
330            for (dst_byte, src_byte) in dst_chunks
331                .into_remainder()
332                .iter_mut()
333                .zip(src_chunks.remainder().iter())
334            {
335                *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
336            }
337            Ok(WideChunkLayout::for_len(dst.len()))
338        }
339    }
340}
341
/// Compute xxhash3 + blake3 on a contiguous input buffer.
///
/// - `xxhash3` path comes from `SymbolRecord::new` (`frame_xxh3`).
/// - `blake3` path comes from `PayloadHash::blake3`.
///
/// # Errors
///
/// Returns `FrankenError::OutOfRange` when the buffer length does not fit in
/// a `u32` symbol size.
pub fn simd_friendly_checksum_pair(buffer: &[u8]) -> Result<(u64, [u8; 32])> {
    let symbol_size = u32::try_from(buffer.len()).map_err(|_| FrankenError::OutOfRange {
        what: "symbol_size".to_owned(),
        value: buffer.len().to_string(),
    })?;

    // Build a throwaway single-symbol record purely to reuse its `frame_xxh3`
    // computation. NOTE(review): `buffer.to_vec()` copies the entire input;
    // a direct xxh3 helper would avoid the allocation — confirm whether
    // fsqlite_types exposes one.
    let symbol_record = SymbolRecord::new(
        ObjectId::from_bytes([0_u8; ObjectId::LEN]),
        // Presumably RFC 6330-style OTI fields describing one source block
        // containing one symbol that covers the whole buffer — TODO confirm
        // against the `Oti` definition.
        Oti {
            f: u64::from(symbol_size),
            al: 1,
            t: symbol_size,
            z: 1,
            n: 1,
        },
        0,
        buffer.to_vec(),
        SymbolRecordFlags::empty(),
    );
    let blake = PayloadHash::blake3(buffer);

    Ok((symbol_record.frame_xxh3, *blake.as_bytes()))
}
369
/// Non-blocking bulkhead admission gate.
///
/// All state is kept in atomic counters; admission never blocks.
#[derive(Debug)]
pub struct Bulkhead {
    // Admission limits and overflow policy (immutable after construction).
    config: BulkheadConfig,
    // Work items currently admitted and not yet released.
    in_flight: AtomicUsize,
    // High-water mark observed for `in_flight`.
    peak_in_flight: AtomicUsize,
    // Total admissions rejected with `Busy` (monotonically increasing).
    busy_rejections: AtomicUsize,
}
378
impl Bulkhead {
    /// Create a bulkhead with the given admission configuration.
    #[must_use]
    pub fn new(config: BulkheadConfig) -> Self {
        Self {
            config,
            in_flight: AtomicUsize::new(0),
            peak_in_flight: AtomicUsize::new(0),
            busy_rejections: AtomicUsize::new(0),
        }
    }

    /// The configuration this bulkhead was built with.
    #[must_use]
    pub const fn config(&self) -> BulkheadConfig {
        self.config
    }

    /// Number of currently admitted (not yet released) work items.
    #[must_use]
    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Acquire)
    }

    /// High-water mark of concurrently admitted work items.
    #[must_use]
    pub fn peak_in_flight(&self) -> usize {
        self.peak_in_flight.load(Ordering::Acquire)
    }

    /// Total number of admissions rejected with `Busy`.
    #[must_use]
    pub fn busy_rejections(&self) -> usize {
        self.busy_rejections.load(Ordering::Acquire)
    }

    /// Try to admit one work item.
    ///
    /// Never blocks. If the admission budget is exhausted, this returns
    /// `FrankenError::Busy`.
    pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
        let limit = self.config.admission_limit();
        // CAS loop: increment `in_flight` only if the observed value is
        // still below the admission limit.
        loop {
            let current = self.in_flight.load(Ordering::Acquire);
            if current >= limit {
                // Count the rejection before reporting Busy; this counter is
                // never decremented.
                self.busy_rejections.fetch_add(1, Ordering::AcqRel);
                return Err(match self.config.overflow_policy {
                    OverflowPolicy::DropBusy => FrankenError::Busy,
                });
            }

            let next = current.saturating_add(1);
            // `compare_exchange_weak` may fail spuriously or because another
            // thread moved `in_flight`; either way the loop re-reads and
            // retries.
            if self
                .in_flight
                .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                // Record the high-water mark only after winning the slot.
                self.peak_in_flight.fetch_max(next, Ordering::AcqRel);
                return Ok(BulkheadPermit {
                    bulkhead: self,
                    released: false,
                });
            }
        }
    }

    /// Run work within a bulkhead permit.
    ///
    /// Returns `FrankenError::Busy` without running `work` when admission
    /// fails; otherwise the permit is held for the duration of `work` and
    /// released when it returns.
    pub fn run<T>(&self, work: impl FnOnce() -> T) -> Result<T> {
        let _permit = self.try_acquire()?;
        Ok(work())
    }
}
446
/// RAII permit for a single admitted work item.
///
/// The admission slot is returned when the permit is dropped (or released
/// explicitly via [`BulkheadPermit::release`]).
#[derive(Debug)]
pub struct BulkheadPermit<'a> {
    // Bulkhead whose `in_flight` count this permit holds one slot of.
    bulkhead: &'a Bulkhead,
    // True once the slot has been returned; guards against double-release.
    released: bool,
}
453
454impl BulkheadPermit<'_> {
455    /// Explicitly release the permit.
456    pub fn release(mut self) {
457        if !self.released {
458            self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
459            self.released = true;
460        }
461    }
462}
463
impl Drop for BulkheadPermit<'_> {
    fn drop(&mut self) {
        // Return the admission slot, guarded by `released` so it is given
        // back at most once per permit.
        if !self.released {
            self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
            self.released = true;
        }
    }
}
472
/// Region-owned wrapper used for structured-concurrency integration.
#[derive(Debug)]
pub struct RegionBulkhead {
    // Region this bulkhead belongs to.
    region: Region,
    // Underlying non-blocking admission gate.
    bulkhead: Bulkhead,
    // Set by `begin_close`; once observed, new admissions are rejected.
    closing: AtomicBool,
}
480
impl RegionBulkhead {
    /// Create a region-scoped bulkhead for `region` with `config`.
    #[must_use]
    pub fn new(region: Region, config: BulkheadConfig) -> Self {
        Self {
            region,
            bulkhead: Bulkhead::new(config),
            closing: AtomicBool::new(false),
        }
    }

    /// The region this bulkhead belongs to.
    #[must_use]
    pub const fn region(&self) -> Region {
        self.region
    }

    /// The underlying admission gate.
    #[must_use]
    pub fn bulkhead(&self) -> &Bulkhead {
        &self.bulkhead
    }

    /// Try to admit one work item, unless the region is closing.
    ///
    /// Returns `FrankenError::Busy` both for overflow and once
    /// [`RegionBulkhead::begin_close`] has been observed.
    pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
        if self.closing.load(Ordering::Acquire) {
            return Err(FrankenError::Busy);
        }
        // NOTE(review): an acquire racing with `begin_close` may load
        // `closing` as false and still be admitted; closers apparently must
        // poll `is_quiescent` afterwards — confirm this is the intent.
        self.bulkhead.try_acquire()
    }

    /// Begin region close: no new admissions are allowed after this point.
    pub fn begin_close(&self) {
        self.closing.store(true, Ordering::Release);
    }

    /// Whether all region-owned work has quiesced.
    ///
    /// True once every outstanding permit has been released.
    #[must_use]
    pub fn is_quiescent(&self) -> bool {
        self.bulkhead.in_flight() == 0
    }
}
519
520#[cfg(test)]
521mod tests {
522    use std::collections::VecDeque;
523    use std::pin::Pin;
524    use std::sync::Arc;
525    use std::task::{Context, Poll};
526    use std::thread;
527    use std::time::{Duration, Instant};
528
529    use asupersync::raptorq::decoder::{InactivationDecoder, ReceivedSymbol};
530    use asupersync::raptorq::gf256::{Gf256, gf256_add_slice, gf256_addmul_slice, gf256_mul_slice};
531    use asupersync::raptorq::systematic::{ConstraintMatrix, SystematicEncoder};
532    use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
533    use asupersync::security::AuthenticationTag;
534    use asupersync::security::authenticated::AuthenticatedSymbol;
535    use asupersync::transport::error::{SinkError, StreamError};
536    use asupersync::transport::sink::SymbolSink;
537    use asupersync::transport::stream::SymbolStream;
538    use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol};
539    use asupersync::{Cx, RaptorQConfig};
540    use fsqlite_btree::compare_key_bytes_contiguous;
541
542    use super::*;
543
544    const BEAD_ID: &str = "bd-22n.4";
545    const SIMD_BEAD_ID: &str = "bd-22n.6";
546    const RAPTORQ_BEAD_ID: &str = "bd-1hi.2";
547
548    #[derive(Debug)]
549    struct VecSink {
550        symbols: Vec<Symbol>,
551    }
552
553    impl VecSink {
554        fn new() -> Self {
555            Self {
556                symbols: Vec::new(),
557            }
558        }
559    }
560
561    impl SymbolSink for VecSink {
562        fn poll_send(
563            mut self: Pin<&mut Self>,
564            _cx: &mut Context<'_>,
565            symbol: AuthenticatedSymbol,
566        ) -> Poll<std::result::Result<(), SinkError>> {
567            self.symbols.push(symbol.into_symbol());
568            Poll::Ready(Ok(()))
569        }
570
571        fn poll_flush(
572            self: Pin<&mut Self>,
573            _cx: &mut Context<'_>,
574        ) -> Poll<std::result::Result<(), SinkError>> {
575            Poll::Ready(Ok(()))
576        }
577
578        fn poll_close(
579            self: Pin<&mut Self>,
580            _cx: &mut Context<'_>,
581        ) -> Poll<std::result::Result<(), SinkError>> {
582            Poll::Ready(Ok(()))
583        }
584
585        fn poll_ready(
586            self: Pin<&mut Self>,
587            _cx: &mut Context<'_>,
588        ) -> Poll<std::result::Result<(), SinkError>> {
589            Poll::Ready(Ok(()))
590        }
591    }
592
593    #[derive(Debug)]
594    struct VecStream {
595        q: VecDeque<AuthenticatedSymbol>,
596    }
597
598    impl VecStream {
599        fn new(symbols: Vec<Symbol>) -> Self {
600            let q = symbols
601                .into_iter()
602                .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
603                .collect();
604            Self { q }
605        }
606    }
607
608    impl SymbolStream for VecStream {
609        fn poll_next(
610            mut self: Pin<&mut Self>,
611            _cx: &mut Context<'_>,
612        ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
613            match self.q.pop_front() {
614                Some(symbol) => Poll::Ready(Some(Ok(symbol))),
615                None => Poll::Ready(None),
616            }
617        }
618
619        fn size_hint(&self) -> (usize, Option<usize>) {
620            (self.q.len(), Some(self.q.len()))
621        }
622
623        fn is_exhausted(&self) -> bool {
624            self.q.is_empty()
625        }
626    }
627
628    fn raptorq_config(symbol_size: u16, repair_overhead: f64) -> RaptorQConfig {
629        let mut config = RaptorQConfig::default();
630        config.encoding.symbol_size = symbol_size;
631        config.encoding.max_block_size = 64 * 1024;
632        config.encoding.repair_overhead = repair_overhead;
633        config
634    }
635
636    fn deterministic_payload(len: usize, seed: u64) -> Vec<u8> {
637        let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
638        let mut out = Vec::with_capacity(len);
639        for idx in 0..len {
640            state ^= state << 7;
641            state ^= state >> 9;
642            state = state.wrapping_mul(0xA24B_AED4_963E_E407);
643            let idx_byte = u8::try_from(idx % 251).expect("modulo fits in u8");
644            out.push(u8::try_from(state & 0xFF).expect("masked to u8") ^ idx_byte);
645        }
646        out
647    }
648
649    fn xor_patch_bytewise(dst: &mut [u8], patch: &[u8]) {
650        for (dst_byte, patch_byte) in dst.iter_mut().zip(patch.iter()) {
651            *dst_byte ^= *patch_byte;
652        }
653    }
654
655    fn gf256_mul_bytewise(coeff: u8, src: &[u8], out: &mut [u8]) {
656        for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
657            *dst_byte = gf256_mul_byte(coeff, *src_byte);
658        }
659    }
660
661    fn collect_rs_files(root: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
662        let entries = std::fs::read_dir(root).expect("read_dir should succeed");
663        for entry in entries {
664            let path = entry.expect("read_dir entry should be readable").path();
665            if path.is_dir() {
666                collect_rs_files(&path, out);
667            } else if path.extension().is_some_and(|ext| ext == "rs") {
668                out.push(path);
669            }
670        }
671    }
672
673    fn encode_symbols(
674        config: RaptorQConfig,
675        object_id: AsObjectId,
676        data: &[u8],
677    ) -> (Vec<Symbol>, usize) {
678        let cx = Cx::for_testing();
679        let mut sender = RaptorQSenderBuilder::new()
680            .config(config)
681            .transport(VecSink::new())
682            .build()
683            .expect("sender build");
684        let outcome = sender
685            .send_object(&cx, object_id, data)
686            .expect("send_object must succeed");
687        let symbols = std::mem::take(&mut sender.transport_mut().symbols);
688        tracing::debug!(
689            bead_id = RAPTORQ_BEAD_ID,
690            case = "encode_symbols",
691            source_symbols = outcome.source_symbols,
692            emitted_symbols = symbols.len(),
693            object_size = data.len(),
694            "encoded object into source+repair symbol stream"
695        );
696        (symbols, outcome.source_symbols)
697    }
698
699    #[allow(clippy::result_large_err)]
700    fn decode_symbols(
701        config: RaptorQConfig,
702        object_id: AsObjectId,
703        object_size: usize,
704        source_symbols: usize,
705        symbols: Vec<Symbol>,
706    ) -> std::result::Result<Vec<u8>, asupersync::Error> {
707        let cx = Cx::for_testing();
708        let params = ObjectParams::new(
709            object_id,
710            u64::try_from(object_size).expect("object size fits u64"),
711            config.encoding.symbol_size,
712            1,
713            u16::try_from(source_symbols).expect("source symbol count fits u16"),
714        );
715        let mut receiver = RaptorQReceiverBuilder::new()
716            .config(config)
717            .source(VecStream::new(symbols))
718            .build()
719            .expect("receiver build");
720
721        receiver
722            .receive_object(&cx, &params)
723            .map(|outcome| outcome.data)
724    }
725
726    fn split_source_and_repair(
727        symbols: &[Symbol],
728        source_symbols: usize,
729    ) -> (Vec<Symbol>, Vec<Symbol>) {
730        let source_symbols_u32 =
731            u32::try_from(source_symbols).expect("source symbol count fits u32");
732        let mut sources = Vec::new();
733        let mut repairs = Vec::new();
734        for symbol in symbols {
735            if symbol.esi() < source_symbols_u32 {
736                sources.push(symbol.clone());
737            } else {
738                repairs.push(symbol.clone());
739            }
740        }
741        (sources, repairs)
742    }
743
744    fn low_level_source_block(k: usize, symbol_size: usize, seed: u64) -> Vec<Vec<u8>> {
745        (0..k)
746            .map(|source_index| {
747                deterministic_payload(
748                    symbol_size,
749                    seed + u64::try_from(source_index).expect("source index fits u64"),
750                )
751            })
752            .collect()
753    }
754
755    fn append_source_received_symbols(
756        received: &mut Vec<ReceivedSymbol>,
757        constraints: &ConstraintMatrix,
758        base_rows: usize,
759        k_prime: usize,
760        symbol_size: usize,
761        source: &[Vec<u8>],
762        source_indexes: &[usize],
763    ) {
764        for &source_index in source_indexes {
765            let row = base_rows + source_index;
766            let mut columns = Vec::new();
767            let mut coefficients = Vec::new();
768            for col in 0..constraints.cols {
769                let coeff = constraints.get(row, col);
770                if !coeff.is_zero() {
771                    columns.push(col);
772                    coefficients.push(coeff);
773                }
774            }
775
776            received.push(ReceivedSymbol {
777                esi: u32::try_from(source_index).expect("source index fits u32"),
778                is_source: true,
779                columns,
780                coefficients,
781                data: source[source_index].clone(),
782            });
783        }
784
785        // RFC 6330 decode domain uses K' source-domain rows, not just K.
786        // The K'−K PI rows correspond to zero-padded source symbols.
787        for source_index in source.len()..k_prime {
788            let row = base_rows + source_index;
789            let mut columns = Vec::new();
790            let mut coefficients = Vec::new();
791            for col in 0..constraints.cols {
792                let coeff = constraints.get(row, col);
793                if !coeff.is_zero() {
794                    columns.push(col);
795                    coefficients.push(coeff);
796                }
797            }
798
799            received.push(ReceivedSymbol {
800                esi: u32::try_from(source_index).expect("source index fits u32"),
801                is_source: true,
802                columns,
803                coefficients,
804                data: vec![0_u8; symbol_size],
805            });
806        }
807    }
808
809    #[test]
810    fn test_parallelism_defaults_conservative() {
811        assert_eq!(
812            conservative_bg_cpu_max(16),
813            2,
814            "bead_id={BEAD_ID} case=balanced_profile_formula_p16"
815        );
816        assert_eq!(
817            conservative_bg_cpu_max(1),
818            1,
819            "bead_id={BEAD_ID} case=balanced_profile_min_floor"
820        );
821        assert_eq!(
822            conservative_bg_cpu_max(512),
823            16,
824            "bead_id={BEAD_ID} case=balanced_profile_max_cap"
825        );
826    }
827
828    #[test]
829    fn test_parallelism_bounded_by_available() {
830        let cfg = BulkheadConfig::default();
831        let p = available_parallelism_or_one();
832        assert!(
833            cfg.max_concurrent <= p,
834            "bead_id={BEAD_ID} case=default_exceeds_available_parallelism cfg={cfg:?} p={p}"
835        );
836
837        let bulkhead = Bulkhead::new(cfg);
838        let mut permits = Vec::new();
839        for _ in 0..cfg.admission_limit() {
840            permits.push(
841                bulkhead
842                    .try_acquire()
843                    .expect("admission under configured limit should succeed"),
844            );
845        }
846
847        let overflow = bulkhead.try_acquire();
848        assert!(
849            matches!(overflow, Err(FrankenError::Busy)),
850            "bead_id={BEAD_ID} case=bounded_admission_overflow_must_be_busy overflow={overflow:?}"
851        );
852
853        drop(permits);
854        assert_eq!(
855            bulkhead.in_flight(),
856            0,
857            "bead_id={BEAD_ID} case=permits_drop_to_zero"
858        );
859    }
860
861    #[test]
862    fn test_bulkhead_config_max_concurrent() {
863        let cfg = BulkheadConfig::new(3, 0, OverflowPolicy::DropBusy)
864            .expect("non-zero max_concurrent must be valid");
865        let bulkhead = Bulkhead::new(cfg);
866
867        let p1 = bulkhead.try_acquire().expect("slot 1");
868        let p2 = bulkhead.try_acquire().expect("slot 2");
869        let p3 = bulkhead.try_acquire().expect("slot 3");
870        let overflow = bulkhead.try_acquire();
871
872        assert!(
873            matches!(overflow, Err(FrankenError::Busy)),
874            "bead_id={BEAD_ID} case=max_concurrent_enforced overflow={overflow:?}"
875        );
876        drop((p1, p2, p3));
877    }
878
879    #[test]
880    fn test_overflow_policy_drop_with_busy() {
881        let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
882            .expect("non-zero max_concurrent must be valid");
883        let bulkhead = Bulkhead::new(cfg);
884        let _permit = bulkhead.try_acquire().expect("first permit must succeed");
885
886        let overflow = bulkhead.try_acquire();
887        assert!(
888            matches!(overflow, Err(FrankenError::Busy)),
889            "bead_id={BEAD_ID} case=overflow_policy_drop_busy overflow={overflow:?}"
890        );
891    }
892
893    #[test]
894    fn test_background_work_degrades_gracefully() {
895        let cfg = BulkheadConfig::new(2, 0, OverflowPolicy::DropBusy)
896            .expect("non-zero max_concurrent must be valid");
897        let bulkhead = Bulkhead::new(cfg);
898
899        let _a = bulkhead.try_acquire().expect("permit a");
900        let _b = bulkhead.try_acquire().expect("permit b");
901
902        for _ in 0..8 {
903            let result = bulkhead.try_acquire();
904            assert!(
905                matches!(result, Err(FrankenError::Busy)),
906                "bead_id={BEAD_ID} case=overflow_must_reject_not_wait result={result:?}"
907            );
908        }
909
910        assert_eq!(
911            bulkhead.busy_rejections(),
912            8,
913            "bead_id={BEAD_ID} case=busy_rejection_counter"
914        );
915    }
916
917    #[test]
918    fn test_region_integration() {
919        let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
920            .expect("non-zero max_concurrent must be valid");
921        let region_bulkhead = RegionBulkhead::new(Region::new(7), cfg);
922        assert_eq!(
923            region_bulkhead.region().get(),
924            7,
925            "bead_id={BEAD_ID} case=region_id_plumbed"
926        );
927
928        let permit = region_bulkhead.try_acquire().expect("first permit");
929        assert!(
930            !region_bulkhead.is_quiescent(),
931            "bead_id={BEAD_ID} case=region_non_quiescent_with_active_work"
932        );
933
934        region_bulkhead.begin_close();
935        let after_close = region_bulkhead.try_acquire();
936        assert!(
937            matches!(after_close, Err(FrankenError::Busy)),
938            "bead_id={BEAD_ID} case=region_close_blocks_new_work result={after_close:?}"
939        );
940
941        drop(permit);
942        assert!(
943            region_bulkhead.is_quiescent(),
944            "bead_id={BEAD_ID} case=region_quiescent_after_permit_drop"
945        );
946    }
947
948    #[test]
949    fn test_gf256_ops_chunked() {
950        let mut dst = vec![0xAA_u8; 40];
951        let src = vec![0x55_u8; 40];
952        let expected: Vec<u8> = dst.iter().zip(src.iter()).map(|(d, s)| *d ^ *s).collect();
953
954        let layout = gf256_add_assign_chunked(&mut dst, &src)
955            .expect("equal-length buffers should be accepted");
956        assert!(
957            layout.u128_chunks > 0 || layout.u64_chunks > 0,
958            "bead_id={SIMD_BEAD_ID} case=wide_chunks_expected layout={layout:?}"
959        );
960        assert_eq!(
961            dst, expected,
962            "bead_id={SIMD_BEAD_ID} case=gf256_addition_xor_equivalence"
963        );
964    }
965
966    #[test]
967    fn test_xor_patch_wide_chunks() {
968        let mut dst = vec![0xF0_u8; 37];
969        let patch = vec![0x0F_u8; 37];
970        let expected: Vec<u8> = dst.iter().zip(patch.iter()).map(|(d, p)| *d ^ *p).collect();
971
972        let layout =
973            xor_patch_wide_chunks(&mut dst, &patch).expect("equal-length buffers should be valid");
974        assert_eq!(
975            layout,
976            WideChunkLayout {
977                u128_chunks: 2,
978                u64_chunks: 0,
979                tail_bytes: 5,
980            },
981            "bead_id={SIMD_BEAD_ID} case=chunk_layout_expected"
982        );
983        assert_eq!(
984            dst, expected,
985            "bead_id={SIMD_BEAD_ID} case=xor_patch_matches_scalar_reference"
986        );
987    }
988
989    #[test]
990    fn test_xor_symbols_u64_chunks() {
991        // Length 24 exercises both the u128 and u64 lanes.
992        let mut dst = vec![0xAB_u8; 24];
993        let src = vec![0xCD_u8; 24];
994        let mut expected = vec![0xAB_u8; 24];
995        xor_patch_bytewise(&mut expected, &src);
996
997        let layout = xor_patch_wide_chunks(&mut dst, &src).expect("equal-length buffers");
998        assert_eq!(
999            layout,
1000            WideChunkLayout {
1001                u128_chunks: 1,
1002                u64_chunks: 1,
1003                tail_bytes: 0,
1004            },
1005            "bead_id=bd-2ddc case=u64_chunk_lane_exercised"
1006        );
1007        assert_eq!(
1008            dst, expected,
1009            "bead_id=bd-2ddc case=chunked_xor_matches_bytewise_reference"
1010        );
1011    }
1012
1013    #[test]
1014    fn test_gf256_multiply_chunks() {
1015        let coeff = 0xA7_u8;
1016        let src = deterministic_payload(4096, 0xDDCC_BBAA_1122_3344);
1017        let mut chunked = vec![0_u8; src.len()];
1018        let mut scalar = vec![0_u8; src.len()];
1019
1020        symbol_mul_into(coeff, &src, &mut chunked).expect("chunked symbol_mul_into");
1021        gf256_mul_bytewise(coeff, &src, &mut scalar);
1022
1023        assert_eq!(
1024            chunked, scalar,
1025            "bead_id=bd-2ddc case=chunked_mul_matches_scalar_reference"
1026        );
1027    }
1028
    #[test]
    fn test_u128_chunk_alignment() {
        // Cross-checks the wide (u128-lane) XOR path against an inline
        // reference that XORs 8 bytes at a time via u64 words plus a
        // byte-by-byte tail, over identical 4099-byte buffers.
        // Non-multiple of 16 exercises the u128 path + tail handling.
        let mut via_wide = deterministic_payload(4099, 0x1234_5678_9ABC_DEF0);
        let mut via_u64_only = via_wide.clone();
        let patch = deterministic_payload(4099, 0x0F0E_0D0C_0B0A_0908);

        // Path under test: the chunked wide-lane implementation.
        xor_patch_wide_chunks(&mut via_wide, &patch).expect("wide chunk xor");

        // Reference path: full 8-byte words XORed as u64s...
        let mut dst_u64_chunks = via_u64_only.chunks_exact_mut(8);
        let mut patch_u64_chunks = patch.chunks_exact(8);
        for (dst_chunk, patch_chunk) in dst_u64_chunks.by_ref().zip(patch_u64_chunks.by_ref()) {
            let dst_word = u64::from_ne_bytes(
                dst_chunk
                    .try_into()
                    .expect("chunks_exact(8) must yield 8-byte chunk"),
            );
            let patch_word = u64::from_ne_bytes(
                patch_chunk
                    .try_into()
                    .expect("chunks_exact(8) must yield 8-byte chunk"),
            );
            dst_chunk.copy_from_slice(&(dst_word ^ patch_word).to_ne_bytes());
        }
        // ...then the <8-byte remainder (4099 % 8 = 3 bytes) byte-by-byte.
        for (dst_byte, patch_byte) in dst_u64_chunks
            .into_remainder()
            .iter_mut()
            .zip(patch_u64_chunks.remainder().iter())
        {
            *dst_byte ^= *patch_byte;
        }

        // Both strategies must produce identical bytes.
        assert_eq!(
            via_wide, via_u64_only,
            "bead_id=bd-2ddc case=u128_lane_matches_u64_plus_tail"
        );
    }
1066
    #[test]
    fn test_benchmark_chunk_vs_byte() {
        // Micro-benchmark: the chunked XOR must be at least 4x faster than
        // the byte-wise loop over a 4 KiB payload.
        // NOTE(review): this is a wall-clock timing assertion; it may be
        // flaky on heavily loaded or frequency-throttled CI hosts. The
        // debug-build guard below mitigates but does not eliminate that risk.
        // Meaningful performance checks require optimized codegen.
        if cfg!(debug_assertions) {
            return;
        }

        let iterations = 32_000_usize;
        let src = deterministic_payload(4096, 0xDEAD_BEEF_F00D_CAFE);
        let base = deterministic_payload(4096, 0x0123_4567_89AB_CDEF);

        // Timed lap 1: chunked implementation. black_box prevents the
        // optimizer from eliding the repeated XOR work.
        let mut chunked = base.clone();
        let chunked_start = Instant::now();
        for _ in 0..iterations {
            xor_patch_wide_chunks(&mut chunked, &src).expect("chunked xor");
            std::hint::black_box(&chunked);
        }
        let chunked_elapsed = chunked_start.elapsed();

        // Timed lap 2: scalar byte-wise implementation on an identical input.
        let mut bytewise = base;
        let bytewise_start = Instant::now();
        for _ in 0..iterations {
            xor_patch_bytewise(&mut bytewise, &src);
            std::hint::black_box(&bytewise);
        }
        let bytewise_elapsed = bytewise_start.elapsed();

        // speedup > 1 means chunked is faster; require at least 4x.
        let speedup = bytewise_elapsed.as_secs_f64() / chunked_elapsed.as_secs_f64();
        assert!(
            speedup >= 4.0,
            "bead_id=bd-2ddc case=chunk_vs_byte_speedup speedup={speedup:.2}x \
             chunked_ns={} bytewise_ns={} iterations={iterations}",
            chunked_elapsed.as_nanos(),
            bytewise_elapsed.as_nanos()
        );
    }
1103
    #[test]
    fn test_no_unsafe_simd() {
        // Guard rail: the workspace must forbid `unsafe`, and no crate source
        // may pair SIMD intrinsic markers with a nearby `unsafe` token.
        // This is a textual heuristic scan, not a semantic analysis.
        let manifest = include_str!("../../../Cargo.toml");
        assert!(
            manifest.contains(r#"unsafe_code = "forbid""#),
            "bead_id=bd-2ddc case=workspace_forbids_unsafe"
        );

        // Walk up from this crate's dir to the workspace root, then scan
        // every .rs file under crates/.
        let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .expect("crate dir has parent")
            .parent()
            .expect("workspace root exists")
            .to_path_buf();
        let crates_dir = workspace_root.join("crates");

        let mut rs_files = Vec::new();
        collect_rs_files(&crates_dir, &mut rs_files);

        // Substrings that indicate explicit SIMD intrinsics usage.
        // NOTE(review): these needles appear literally in this very file, so
        // the scan matches itself; it stays green only because no `unsafe`
        // token occurs within the +/-3 line window below.
        let simd_needles = [
            "_mm_",
            "std::arch::",
            "core::arch::",
            "__m128",
            "__m256",
            "__m512",
            "simd_shuffle",
            "vpxor",
            "vxorq",
        ];

        let mut offenders = Vec::new();
        for file in rs_files {
            // Skip unreadable / non-UTF-8 files rather than failing the scan.
            let Ok(content) = std::fs::read_to_string(&file) else {
                continue;
            };

            let lines = content.lines().collect::<Vec<_>>();
            for (idx, line) in lines.iter().enumerate() {
                let has_intrinsic = simd_needles.iter().any(|needle| line.contains(needle));
                if !has_intrinsic {
                    continue;
                }

                // Inspect a +/-3 line window (clamped to file bounds) around
                // the intrinsic mention for a non-comment `unsafe` token.
                let window_start = idx.saturating_sub(3);
                let window_end = (idx + 3).min(lines.len().saturating_sub(1));
                let mut found_unsafe_nearby = false;
                for nearby in &lines[window_start..=window_end] {
                    let trimmed = nearby.trim_start();
                    // Only full-line comments are excluded; a trailing
                    // comment containing "unsafe" would still match.
                    let is_comment = trimmed.starts_with("//");
                    if !is_comment && trimmed.contains("unsafe") {
                        found_unsafe_nearby = true;
                        break;
                    }
                }

                if found_unsafe_nearby {
                    // Record file:line (1-indexed) for the failure message.
                    offenders.push(format!("{}:{}", file.display(), idx + 1));
                }
            }
        }

        assert!(
            offenders.is_empty(),
            "bead_id=bd-2ddc case=no_unsafe_simd_intrinsics offenders={offenders:?}"
        );
    }
1171
1172    #[test]
1173    fn test_checksum_simd_friendly() {
1174        let buffer = vec![0x11_u8; 256];
1175        let (xx_a, blake_a) =
1176            simd_friendly_checksum_pair(&buffer).expect("checksum pair must succeed");
1177
1178        let mut modified = buffer;
1179        modified[255] ^= 0x01;
1180        let (xx_b, blake_b) =
1181            simd_friendly_checksum_pair(&modified).expect("checksum pair must succeed");
1182
1183        assert_ne!(
1184            xx_a, xx_b,
1185            "bead_id={SIMD_BEAD_ID} case=xxhash3_changes_on_byte_flip"
1186        );
1187        assert_ne!(
1188            blake_a, blake_b,
1189            "bead_id={SIMD_BEAD_ID} case=blake3_changes_on_byte_flip"
1190        );
1191    }
1192
1193    #[test]
1194    fn test_e2e_bounded_parallelism_under_background_load() {
1195        let cfg = BulkheadConfig::new(4, 0, OverflowPolicy::DropBusy)
1196            .expect("non-zero max_concurrent must be valid");
1197        let bulkhead = Arc::new(Bulkhead::new(cfg));
1198
1199        let handles: Vec<_> = (0..48)
1200            .map(|_| {
1201                let bulkhead = Arc::clone(&bulkhead);
1202                thread::spawn(move || {
1203                    bulkhead.run(|| {
1204                        thread::sleep(Duration::from_millis(10));
1205                    })
1206                })
1207            })
1208            .collect();
1209
1210        let mut busy = 0_usize;
1211        for handle in handles {
1212            match handle.join().expect("worker thread should not panic") {
1213                Ok(()) => {}
1214                Err(FrankenError::Busy) => busy = busy.saturating_add(1),
1215                Err(err) => {
1216                    assert_eq!(
1217                        err.error_code(),
1218                        fsqlite_error::ErrorCode::Busy,
1219                        "bead_id={BEAD_ID} case=e2e_unexpected_bulkhead_error err={err}"
1220                    );
1221                    busy = busy.saturating_add(1);
1222                }
1223            }
1224        }
1225
1226        assert!(
1227            busy > 0,
1228            "bead_id={BEAD_ID} case=e2e_should_observe_overflow_rejections"
1229        );
1230        assert!(
1231            bulkhead.peak_in_flight() <= cfg.admission_limit(),
1232            "bead_id={BEAD_ID} case=e2e_peak_parallelism_exceeded peak={} limit={}",
1233            bulkhead.peak_in_flight(),
1234            cfg.admission_limit()
1235        );
1236    }
1237
    #[test]
    fn test_e2e_simd_hot_path_correctness() {
        // End-to-end sweep of the SIMD-friendly hot paths: key comparison,
        // GF(256) add, XOR patch, and the checksum pair. Each section is
        // timed and the nanosecond figures are emitted as a metric line at
        // the end; the timings are informational only (no assertion on them).
        // 1) B-tree hot comparison over contiguous slices.
        let contiguous = b"key-0001key-0002".to_vec();
        let left = &contiguous[0..8];
        let right = &contiguous[8..16];
        let compare_start = Instant::now();
        // Must agree with the standard lexicographic slice ordering.
        assert_eq!(
            compare_key_bytes_contiguous(left, right),
            left.cmp(right),
            "bead_id={SIMD_BEAD_ID} case=btree_contiguous_compare_correct"
        );
        let compare_elapsed = compare_start.elapsed();

        // 2) GF(256) add (XOR) and XOR patch helpers.
        let mut symbol_a = (0_u8..64).collect::<Vec<u8>>();
        let symbol_b = (64_u8..128).collect::<Vec<u8>>();
        // Scalar XOR reference computed before symbol_a is mutated.
        let expected_add: Vec<u8> = symbol_a
            .iter()
            .zip(symbol_b.iter())
            .map(|(a, b)| *a ^ *b)
            .collect();
        let gf256_start = Instant::now();
        gf256_add_assign_chunked(&mut symbol_a, &symbol_b).expect("gf256 add should succeed");
        let gf256_elapsed = gf256_start.elapsed();
        assert_eq!(
            symbol_a, expected_add,
            "bead_id={SIMD_BEAD_ID} case=gf256_chunked_add_correct"
        );

        // Uniform fill makes the expected result a single known byte value.
        let mut patch_target = vec![0x33_u8; 64];
        let patch = vec![0xCC_u8; 64];
        let xor_start = Instant::now();
        xor_patch_wide_chunks(&mut patch_target, &patch).expect("xor patch should succeed");
        let xor_elapsed = xor_start.elapsed();
        assert!(
            patch_target.iter().all(|&byte| byte == (0x33_u8 ^ 0xCC_u8)),
            "bead_id={SIMD_BEAD_ID} case=xor_patch_chunked_correct"
        );

        // 3) SIMD-friendly checksum feed.
        let checksum_start = Instant::now();
        let (xx, blake) =
            simd_friendly_checksum_pair(&patch_target).expect("checksum pair should succeed");
        let checksum_elapsed = checksum_start.elapsed();
        // Sanity checks only: a non-empty payload should not hash to all
        // zeros under either algorithm.
        assert_ne!(
            xx, 0,
            "bead_id={SIMD_BEAD_ID} case=xxhash3_nonzero_for_nonempty_payload"
        );
        assert!(
            blake.iter().any(|&b| b != 0),
            "bead_id={SIMD_BEAD_ID} case=blake3_digest_nonzero"
        );

        // Informational metric line consumed by log scraping.
        eprintln!(
            "bead_id={SIMD_BEAD_ID} metric=simd_hot_path_ns compare={} gf256_add={} xor_patch={} checksum={}",
            compare_elapsed.as_nanos(),
            gf256_elapsed.as_nanos(),
            xor_elapsed.as_nanos(),
            checksum_elapsed.as_nanos()
        );
    }
1300
1301    #[test]
1302    fn test_symbol_add_self_inverse() {
1303        let src = (0_u16..512)
1304            .map(|idx| u8::try_from(idx % 251).expect("modulo fits in u8"))
1305            .collect::<Vec<_>>();
1306        let mut dst = src.clone();
1307        symbol_add_assign(&mut dst, &src).expect("symbol_add should succeed");
1308        assert!(
1309            dst.iter().all(|byte| *byte == 0),
1310            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_self_inverse"
1311        );
1312    }
1313
1314    #[test]
1315    fn test_symbol_add_commutative_and_associative() {
1316        let a = (0_u16..128)
1317            .map(|idx| u8::try_from((idx * 3) % 251).expect("modulo fits"))
1318            .collect::<Vec<_>>();
1319        let b = (0_u16..128)
1320            .map(|idx| u8::try_from((idx * 5 + 7) % 251).expect("modulo fits"))
1321            .collect::<Vec<_>>();
1322        let c = (0_u16..128)
1323            .map(|idx| u8::try_from((idx * 11 + 13) % 251).expect("modulo fits"))
1324            .collect::<Vec<_>>();
1325
1326        let mut ab = a.clone();
1327        symbol_add_assign(&mut ab, &b).expect("a+b");
1328        let mut ba = b.clone();
1329        symbol_add_assign(&mut ba, &a).expect("b+a");
1330        assert_eq!(
1331            ab, ba,
1332            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_commutative"
1333        );
1334
1335        let mut lhs = a.clone();
1336        symbol_add_assign(&mut lhs, &b).expect("(a+b)");
1337        symbol_add_assign(&mut lhs, &c).expect("(a+b)+c");
1338
1339        let mut rhs = b;
1340        symbol_add_assign(&mut rhs, &c).expect("(b+c)");
1341        let mut rhs2 = a;
1342        symbol_add_assign(&mut rhs2, &rhs).expect("a+(b+c)");
1343
1344        assert_eq!(
1345            lhs, rhs2,
1346            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_associative"
1347        );
1348    }
1349
1350    #[test]
1351    fn test_symbol_mul_special_cases() {
1352        let src = (0_u16..256)
1353            .map(|idx| u8::try_from(idx).expect("idx fits"))
1354            .collect::<Vec<_>>();
1355
1356        let mut out_zero = vec![0_u8; src.len()];
1357        symbol_mul_into(0, &src, &mut out_zero).expect("mul by zero");
1358        assert!(
1359            out_zero.iter().all(|byte| *byte == 0),
1360            "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_zero"
1361        );
1362
1363        let mut out_one = vec![0_u8; src.len()];
1364        symbol_mul_into(1, &src, &mut out_one).expect("mul by one");
1365        assert_eq!(
1366            out_one, src,
1367            "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_identity"
1368        );
1369    }
1370
1371    #[test]
1372    fn test_symbol_mul_matches_scalar_reference() {
1373        let src = (0_u16..512)
1374            .map(|idx| u8::try_from((idx * 7 + 17) % 251).expect("modulo fits"))
1375            .collect::<Vec<_>>();
1376        let coeff = 0xA7_u8;
1377
1378        let mut out = vec![0_u8; src.len()];
1379        symbol_mul_into(coeff, &src, &mut out).expect("symbol mul");
1380        for (actual, input) in out.iter().zip(src.iter()) {
1381            let expected = gf256_mul_byte(coeff, *input);
1382            assert_eq!(
1383                *actual, expected,
1384                "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_scalar_match"
1385            );
1386        }
1387    }
1388
1389    #[test]
1390    fn test_symbol_addmul_special_cases_and_equivalence() {
1391        let src = (0_u16..512)
1392            .map(|idx| u8::try_from((idx * 13 + 19) % 251).expect("modulo fits"))
1393            .collect::<Vec<_>>();
1394        let original = (0_u16..512)
1395            .map(|idx| u8::try_from((idx * 9 + 3) % 251).expect("modulo fits"))
1396            .collect::<Vec<_>>();
1397
1398        let mut no_op = original.clone();
1399        symbol_addmul_assign(&mut no_op, 0, &src).expect("c=0");
1400        assert_eq!(
1401            no_op, original,
1402            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c0_noop"
1403        );
1404
1405        let mut xor_path = original.clone();
1406        symbol_addmul_assign(&mut xor_path, 1, &src).expect("c=1");
1407        let mut expected_xor = original.clone();
1408        symbol_add_assign(&mut expected_xor, &src).expect("xor reference");
1409        assert_eq!(
1410            xor_path, expected_xor,
1411            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c1_equals_xor"
1412        );
1413
1414        let coeff = 0x53_u8;
1415        let mut fused = original.clone();
1416        symbol_addmul_assign(&mut fused, coeff, &src).expect("fused");
1417        let mut mul = vec![0_u8; src.len()];
1418        symbol_mul_into(coeff, &src, &mut mul).expect("mul");
1419        let mut separate = original;
1420        symbol_add_assign(&mut separate, &mul).expect("add");
1421        assert_eq!(
1422            fused, separate,
1423            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_fused_equals_mul_plus_add"
1424        );
1425    }
1426
1427    #[test]
1428    fn test_symbol_operations_4096_and_512() {
1429        for symbol_len in [4096_usize, 1024_usize, 512_usize] {
1430            let a = vec![0xAA_u8; symbol_len];
1431            let b = vec![0x55_u8; symbol_len];
1432            let mut sum = a.clone();
1433            let layout = symbol_add_assign(&mut sum, &b).expect("symbol add");
1434            assert_eq!(
1435                layout,
1436                WideChunkLayout::for_len(symbol_len),
1437                "bead_id={RAPTORQ_BEAD_ID} case=symbol_len_layout_consistency len={symbol_len}"
1438            );
1439            assert!(
1440                sum.iter().all(|byte| *byte == (0xAA_u8 ^ 0x55_u8)),
1441                "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_expected_xor len={symbol_len}"
1442            );
1443        }
1444    }
1445
1446    #[test]
1447    fn test_gf256_arithmetic_matches_asupersync() {
1448        for a in 0_u8..=u8::MAX {
1449            for b in 0_u8..=u8::MAX {
1450                assert_eq!(
1451                    gf256_mul_byte(a, b),
1452                    (Gf256(a) * Gf256(b)).raw(),
1453                    "bead_id={RAPTORQ_BEAD_ID} case=gf256_mul_parity a=0x{a:02X} b=0x{b:02X}"
1454                );
1455            }
1456        }
1457    }
1458
    #[test]
    fn test_symbol_ops_match_asupersync_gf256_slices() {
        // Parity matrix: our bulk symbol ops (add / mul / addmul) must
        // produce byte-identical results to the asupersync gf256 slice
        // helpers across three symbol sizes and four coefficients,
        // including the degenerate coefficients 0 and 1.
        for symbol_len in [512_usize, 1024_usize, 4096_usize] {
            let src = deterministic_payload(symbol_len, 0xA5A5_0101);
            let dst = deterministic_payload(symbol_len, 0x5A5A_0202);

            // add: ours vs gf256_add_slice on identical clones of dst.
            let mut ours_add = dst.clone();
            symbol_add_assign(&mut ours_add, &src).expect("symbol add");
            let mut as_add = dst.clone();
            gf256_add_slice(&mut as_add, &src);
            assert_eq!(
                ours_add, as_add,
                "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_add len={symbol_len}"
            );

            for coeff in [0_u8, 1_u8, 0x53_u8, 0xA7_u8] {
                // mul: note ours writes into a zeroed output buffer while the
                // reference multiplies a copy of src in place.
                let mut ours_mul = vec![0_u8; symbol_len];
                symbol_mul_into(coeff, &src, &mut ours_mul).expect("symbol mul");
                let mut as_mul = src.clone();
                gf256_mul_slice(&mut as_mul, Gf256(coeff));
                assert_eq!(
                    ours_mul, as_mul,
                    "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_mul len={symbol_len} coeff=0x{coeff:02X}"
                );

                // addmul: fused dst += coeff * src on both sides.
                let mut ours_addmul = dst.clone();
                symbol_addmul_assign(&mut ours_addmul, coeff, &src).expect("symbol addmul");
                let mut as_addmul = dst.clone();
                gf256_addmul_slice(&mut as_addmul, &src, Gf256(coeff));
                assert_eq!(
                    ours_addmul, as_addmul,
                    "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_addmul len={symbol_len} coeff=0x{coeff:02X}"
                );
            }
        }
    }
1495
1496    #[test]
1497    fn test_encode_single_source_block() {
1498        let config = raptorq_config(512, 1.25);
1499        let symbol_size = usize::from(config.encoding.symbol_size);
1500        let k = 8_usize;
1501        let data = deterministic_payload(k * symbol_size, 0x0102_0304);
1502        let object_id = AsObjectId::new_for_test(1201);
1503        tracing::info!(
1504            bead_id = RAPTORQ_BEAD_ID,
1505            case = "test_encode_single_source_block",
1506            symbol_size,
1507            requested_k = k,
1508            "encoding source block"
1509        );
1510        let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1511        let (sources, repairs) = split_source_and_repair(&symbols, source_symbols);
1512
1513        assert_eq!(
1514            source_symbols, k,
1515            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_count"
1516        );
1517        assert_eq!(
1518            sources.len(),
1519            source_symbols,
1520            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_partition"
1521        );
1522        assert!(
1523            !repairs.is_empty(),
1524            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_repair_present"
1525        );
1526        assert!(
1527            symbols.iter().all(|symbol| symbol.len() == symbol_size),
1528            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_symbol_size_consistent"
1529        );
1530    }
1531
    #[test]
    fn test_decode_exact_k_symbols() {
        // Decode with exactly k received source symbols (the minimum), and
        // check the round-trip plus intermediate-symbol agreement between
        // encoder and decoder.
        let symbol_size = 512_usize;
        let k = 16_usize;
        let seed = 0x0BAD_CAFE_u64;
        let source = low_level_source_block(k, symbol_size, seed);
        let encoder =
            SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
        let decoder = InactivationDecoder::new(k, symbol_size, seed);
        let params = decoder.params();
        // s + h constraint rows precede the received-symbol rows.
        // NOTE(review): presumably s/h are the LDPC/HDPC row counts per
        // RFC 6330 — confirm against the decoder's param docs.
        let base_rows = params.s + params.h;
        let constraints = ConstraintMatrix::build(params, seed);
        // Start from the constraint symbols, then append all k source
        // symbols (ESIs 0..k) as received rows.
        let mut received = decoder.constraint_symbols();
        let source_indexes = (0..k).collect::<Vec<_>>();
        append_source_received_symbols(
            &mut received,
            &constraints,
            base_rows,
            params.k_prime,
            symbol_size,
            &source,
            &source_indexes,
        );

        tracing::warn!(
            bead_id = RAPTORQ_BEAD_ID,
            case = "test_decode_exact_k_symbols",
            source_symbols = k,
            "decoding with minimum symbol count (fragile recovery threshold)"
        );
        let decode_outcome = decoder
            .decode(&received)
            .expect("decode exact-k must succeed");
        // Recovered source block must be byte-identical to the input.
        assert_eq!(
            decode_outcome.source, source,
            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbols_roundtrip"
        );
        assert_eq!(
            decode_outcome.intermediate[0].len(),
            symbol_size,
            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbol_size"
        );
        // Encoder and decoder must agree on the first intermediate symbol.
        assert_eq!(
            encoder.intermediate_symbol(0),
            decode_outcome.intermediate[0],
            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_intermediate_consistency"
        );
    }
1580
    #[test]
    fn test_decode_with_repair_symbols() {
        // Drop one source symbol (ESI 0) and substitute a single repair
        // symbol; decode must still reproduce the full source block.
        let symbol_size = 512_usize;
        let k = 16_usize;
        let seed = 0xABC0_FED1_u64;
        let source = low_level_source_block(k, symbol_size, seed);
        let encoder =
            SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
        let decoder = InactivationDecoder::new(k, symbol_size, seed);
        let params = decoder.params();
        // Constraint rows (s + h) come before the received-symbol rows.
        let base_rows = params.s + params.h;
        let constraints = ConstraintMatrix::build(params, seed);

        // Receive source symbols 1..k — symbol 0 is deliberately missing.
        let mut received = decoder.constraint_symbols();
        let source_indexes = (1..k).collect::<Vec<_>>();
        append_source_received_symbols(
            &mut received,
            &constraints,
            base_rows,
            params.k_prime,
            symbol_size,
            &source,
            &source_indexes,
        );

        // First repair symbol carries ESI k (repair ESIs start after the
        // source range).
        let repair_esi = u32::try_from(k).expect("k fits u32");
        let (columns, coefficients) = decoder.repair_equation(repair_esi);
        let repair_data = encoder.repair_symbol(repair_esi);
        received.push(ReceivedSymbol::repair(
            repair_esi,
            columns,
            coefficients,
            repair_data,
        ));

        let decode_outcome = decoder
            .decode(&received)
            .expect("decode with one repair must succeed");
        assert_eq!(
            decode_outcome.source, source,
            "bead_id={RAPTORQ_BEAD_ID} case=decode_with_repair_roundtrip"
        );
    }
1624
    #[test]
    fn test_decode_insufficient_symbols() {
        // With only k-1 source symbols and no repair symbols, decoding must
        // fail — the system is under-determined.
        let symbol_size = 512_usize;
        let k = 8_usize;
        let seed = 0xDEAD_BEEF_u64;
        let source = low_level_source_block(k, symbol_size, seed);
        let decoder = InactivationDecoder::new(k, symbol_size, seed);
        let params = decoder.params();
        // Constraint rows (s + h) precede the received-symbol rows.
        let base_rows = params.s + params.h;
        let constraints = ConstraintMatrix::build(params, seed);
        let mut received = decoder.constraint_symbols();
        // Only ESIs 0..k-1 are received: one short of the recovery threshold.
        let source_indexes = (0..k.saturating_sub(1)).collect::<Vec<_>>();
        append_source_received_symbols(
            &mut received,
            &constraints,
            base_rows,
            params.k_prime,
            symbol_size,
            &source,
            &source_indexes,
        );

        let decode = decoder.decode(&received);
        assert!(
            decode.is_err(),
            "bead_id={RAPTORQ_BEAD_ID} case=decode_insufficient_symbols unexpectedly succeeded"
        );
        // Log the concrete error for diagnostics; the assertion above has
        // already established that decode failed.
        if let Err(err) = decode {
            tracing::error!(
                bead_id = RAPTORQ_BEAD_ID,
                case = "test_decode_insufficient_symbols",
                error = ?err,
                "decode failed as expected due to insufficient symbols"
            );
        }
    }
1661
1662    #[test]
1663    fn test_symbol_size_alignment() {
1664        let config = raptorq_config(4096, 1.20);
1665        let symbol_size = usize::from(config.encoding.symbol_size);
1666        let data = deterministic_payload(symbol_size * 3, 0x600D_1111);
1667        let object_id = AsObjectId::new_for_test(1205);
1668        let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1669
1670        assert_eq!(
1671            source_symbols, 3,
1672            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_source_count"
1673        );
1674        assert!(
1675            symbol_size.is_power_of_two(),
1676            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_power_of_two"
1677        );
1678        assert_eq!(
1679            symbol_size % 512,
1680            0,
1681            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_sector_multiple"
1682        );
1683        assert!(
1684            symbols.iter().all(|symbol| symbol.len() == symbol_size),
1685            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_symbol_lengths"
1686        );
1687    }
1688
1689    #[test]
1690    fn prop_encode_decode_roundtrip() {
1691        for seed in [11_u64, 29_u64, 43_u64, 71_u64] {
1692            for k in [8_usize, 16_usize] {
1693                let config = raptorq_config(512, 1.30);
1694                let symbol_size = usize::from(config.encoding.symbol_size);
1695                let data = deterministic_payload(k * symbol_size - 17, seed);
1696                let object_id = AsObjectId::new_for_test(2000 + seed);
1697                let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1698                let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1699                let subset = sources
1700                    .iter()
1701                    .take(source_symbols)
1702                    .cloned()
1703                    .collect::<Vec<_>>();
1704                let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1705                    .expect("property roundtrip decode");
1706                assert_eq!(
1707                    decoded, data,
1708                    "bead_id={RAPTORQ_BEAD_ID} case=prop_encode_decode_roundtrip seed={seed} k={k}"
1709                );
1710            }
1711        }
1712    }
1713
    #[test]
    fn prop_any_k_of_n_suffices() {
        // Single-parity erasure model: with one XOR parity symbol over k
        // source symbols, any single missing source symbol can be rebuilt
        // by XORing the parity with the k-1 survivors. The omitted == k
        // iteration is the no-loss control case.
        let symbol_size = 512_usize;
        let k = 16_usize;
        let sources = (0..k)
            .map(|symbol_idx| {
                deterministic_payload(
                    symbol_size,
                    0x5000_0000 + u64::try_from(symbol_idx).expect("index fits u64"),
                )
            })
            .collect::<Vec<_>>();

        // parity = XOR of all k source symbols.
        let mut parity = vec![0_u8; symbol_size];
        for source in &sources {
            symbol_add_assign(&mut parity, source).expect("parity construction");
        }

        for omitted in 0..=k {
            let rebuilt = if omitted == k {
                // Control case: nothing lost, nothing to recover.
                sources.clone()
            } else {
                // Recover the omitted symbol: parity XOR (all others).
                let mut recovered = parity.clone();
                for (index, source) in sources.iter().enumerate() {
                    if index != omitted {
                        symbol_add_assign(&mut recovered, source).expect("single-erasure recovery");
                    }
                }
                let mut rebuilt = sources.clone();
                rebuilt[omitted] = recovered;
                rebuilt
            };

            assert_eq!(
                rebuilt.len(),
                k,
                "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_subset_size omitted_index={omitted}"
            );
            // The rebuilt set must be byte-identical to the originals.
            assert_eq!(
                rebuilt, sources,
                "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_suffices omitted_index={omitted}"
            );
        }
    }
1758
1759    #[test]
1760    fn prop_symbol_size_consistent() {
1761        for symbol_size in [512_u16, 1024_u16, 4096_u16] {
1762            for k in [4_usize, 8_usize] {
1763                let config = raptorq_config(symbol_size, 1.25);
1764                let size = usize::from(symbol_size);
1765                let object_id = AsObjectId::new_for_test(
1766                    u64::from(symbol_size) * 100 + u64::try_from(k).expect("k fits u64"),
1767                );
1768                let data = deterministic_payload(k * size - 3, u64::from(symbol_size));
1769                let (symbols, _) = encode_symbols(config, object_id, &data);
1770                assert!(
1771                    symbols.iter().all(|symbol| symbol.len() == size),
1772                    "bead_id={RAPTORQ_BEAD_ID} case=prop_symbol_size_consistent symbol_size={symbol_size} k={k}"
1773                );
1774            }
1775        }
1776    }
1777
1778    #[test]
1779    fn test_e2e_symbol_ops_in_encode_decode_roundtrip() {
1780        for (run, k) in [8_usize, 16_usize, 64_usize].iter().copied().enumerate() {
1781            let config = raptorq_config(512, 1.30);
1782            let symbol_size = usize::from(config.encoding.symbol_size);
1783            let data = deterministic_payload(
1784                k * symbol_size,
1785                0x4455_6677 + u64::try_from(run).expect("run fits u64"),
1786            );
1787            let object_id = AsObjectId::new_for_test(3000 + u64::try_from(k).expect("k fits u64"));
1788
1789            tracing::info!(
1790                bead_id = RAPTORQ_BEAD_ID,
1791                case = "test_e2e_symbol_ops_in_encode_decode_roundtrip",
1792                k,
1793                symbol_size,
1794                "starting encode/decode roundtrip"
1795            );
1796            let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1797            let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1798            let subset = sources
1799                .iter()
1800                .take(source_symbols)
1801                .cloned()
1802                .collect::<Vec<_>>();
1803            let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1804                .expect("e2e decode must succeed");
1805
1806            assert_eq!(
1807                decoded, data,
1808                "bead_id={RAPTORQ_BEAD_ID} case=e2e_roundtrip_bytes k={k}"
1809            );
1810
1811            let mut source_parity = vec![0_u8; symbol_size];
1812            for chunk in data.chunks_exact(symbol_size) {
1813                symbol_add_assign(&mut source_parity, chunk).expect("source parity xor");
1814            }
1815
1816            let mut decoded_parity = vec![0_u8; symbol_size];
1817            for chunk in decoded.chunks_exact(symbol_size) {
1818                symbol_add_assign(&mut decoded_parity, chunk).expect("decoded parity xor");
1819            }
1820
1821            assert_eq!(
1822                decoded_parity, source_parity,
1823                "bead_id={RAPTORQ_BEAD_ID} case=e2e_symbol_ops_parity k={k}"
1824            );
1825        }
1826    }
1827}