Skip to main content

fsqlite_core/
lib.rs

1//! Core bounded-parallelism primitives (§1.5, bd-22n.4).
2//!
3//! This module provides a small bulkhead framework for internal background work.
4//! It is intentionally non-blocking: overflow is rejected with `SQLITE_BUSY`
5//! (`FrankenError::Busy`) instead of queue-and-wait semantics.
6
7pub mod attach;
8pub mod commit_marker;
9#[cfg(not(target_arch = "wasm32"))]
10pub mod commit_repair;
11pub mod compat_persist;
12pub mod connection;
13#[cfg(not(target_arch = "wasm32"))]
14pub mod db_fec;
15pub mod decode_proofs;
16pub mod ecs_replication;
17pub mod epoch;
18pub mod explain;
19pub mod inter_object_coding;
20pub mod lrc;
21pub mod native_index;
22pub mod permeation_map;
23pub mod por;
24#[cfg(not(target_arch = "wasm32"))]
25pub mod raptorq_codec;
26pub mod raptorq_integration;
27pub mod region;
28pub mod remote_effects;
29#[cfg(not(target_arch = "wasm32"))]
30pub mod repair_engine;
31pub mod repair_symbols;
32pub mod replication_receiver;
33pub mod replication_sender;
34pub mod snapshot_shipping;
35pub mod source_block_partition;
36pub mod symbol_log;
37pub mod symbol_size_policy;
38pub mod tiered_storage;
39pub mod transaction;
40pub mod wal_adapter;
41#[cfg(not(target_arch = "wasm32"))]
42pub mod wal_fec_adapter;
43
44use std::num::NonZeroUsize;
45use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
46
47use fsqlite_error::{FrankenError, Result};
48use fsqlite_types::{
49    ObjectId, Oti, PayloadHash, Region, SymbolRecord, SymbolRecordFlags, gf256_mul_byte,
50};
51use tracing::{debug, error};
52
/// Upper clamp for the balanced-profile background CPU budget (see `conservative_bg_cpu_max`).
const MAX_BALANCED_BG_CPU: usize = 16;
54
/// Policy used when the bulkhead admission budget is exhausted.
///
/// Only non-blocking policies are modeled; callers are never queued.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Reject overflow immediately with `SQLITE_BUSY`.
    DropBusy,
}
61
/// Runtime profile for conservative parallelism defaults.
///
/// Selects how [`BulkheadConfig::for_profile`] derives its limits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelismProfile {
    /// Conservative profile used by default.
    Balanced,
}
68
/// Bounded parallelism configuration for a work class.
///
/// The total admission budget is `max_concurrent + queue_depth`
/// (see [`BulkheadConfig::admission_limit`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BulkheadConfig {
    /// Number of tasks allowed to execute concurrently.
    pub max_concurrent: usize,
    /// Additional bounded admission slots (not unbounded queueing).
    pub queue_depth: usize,
    /// Overflow behavior when capacity is exhausted.
    pub overflow_policy: OverflowPolicy,
}
79
80impl BulkheadConfig {
81    /// Create an explicit configuration.
82    ///
83    /// Returns `None` when `max_concurrent` is zero.
84    #[must_use]
85    pub const fn new(
86        max_concurrent: usize,
87        queue_depth: usize,
88        overflow_policy: OverflowPolicy,
89    ) -> Option<Self> {
90        if max_concurrent == 0 {
91            None
92        } else {
93            Some(Self {
94                max_concurrent,
95                queue_depth,
96                overflow_policy,
97            })
98        }
99    }
100
101    /// Conservative default derived from available CPU parallelism.
102    ///
103    /// Uses the "balanced profile" formula from bd-22n.4:
104    /// `clamp(P / 8, 1, 16)` where `P = available_parallelism`.
105    #[must_use]
106    pub fn for_profile(profile: ParallelismProfile) -> Self {
107        let p = available_parallelism_or_one();
108        match profile {
109            ParallelismProfile::Balanced => Self {
110                max_concurrent: conservative_bg_cpu_max(p),
111                queue_depth: 0,
112                overflow_policy: OverflowPolicy::DropBusy,
113            },
114        }
115    }
116
117    /// Maximum admitted work units at once.
118    #[must_use]
119    pub const fn admission_limit(self) -> usize {
120        self.max_concurrent.saturating_add(self.queue_depth)
121    }
122}
123
impl Default for BulkheadConfig {
    /// Defaults to the conservative [`ParallelismProfile::Balanced`] profile.
    fn default() -> Self {
        Self::for_profile(ParallelismProfile::Balanced)
    }
}
129
130/// Compute conservative default background CPU parallelism from `P`.
131#[must_use]
132pub const fn conservative_bg_cpu_max(p: usize) -> usize {
133    let base = p / 8;
134    if base == 0 {
135        1
136    } else if base > MAX_BALANCED_BG_CPU {
137        MAX_BALANCED_BG_CPU
138    } else {
139        base
140    }
141}
142
/// Return `std::thread::available_parallelism()` with a safe floor of 1.
///
/// The fallback covers platforms where the parallelism query is unsupported
/// or fails at runtime.
#[must_use]
pub fn available_parallelism_or_one() -> usize {
    std::thread::available_parallelism()
        .map(NonZeroUsize::get)
        .unwrap_or(1)
}
148
/// Chunking plan for SIMD-friendly wide-word loops.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WideChunkLayout {
    /// Number of `u128` chunks processed.
    pub u128_chunks: usize,
    /// Number of `u64` chunks processed after `u128` chunks.
    pub u64_chunks: usize,
    /// Remaining tail bytes processed scalar.
    pub tail_bytes: usize,
}

impl WideChunkLayout {
    /// Compute the wide-chunk layout for a byte length.
    ///
    /// Greedily assigns 16-byte words first, then 8-byte words out of the
    /// remainder, leaving a scalar tail shorter than 8 bytes.
    #[must_use]
    pub const fn for_len(len: usize) -> Self {
        let after_u128 = len % 16;
        Self {
            u128_chunks: len / 16,
            u64_chunks: after_u128 / 8,
            tail_bytes: after_u128 % 8,
        }
    }
}
175
176/// XOR patch application using `u128` + `u64` + tail loops.
177///
178/// This is the SIMD-friendly primitive for hot patch paths. LLVM can
179/// auto-vectorize the wide integer loops.
180#[allow(clippy::incompatible_msrv)]
181pub fn xor_patch_wide_chunks(dst: &mut [u8], patch: &[u8]) -> Result<WideChunkLayout> {
182    if dst.len() != patch.len() {
183        return Err(FrankenError::TypeMismatch {
184            expected: format!("equal lengths (dst == patch), got {}", dst.len()),
185            actual: patch.len().to_string(),
186        });
187    }
188
189    let layout = WideChunkLayout::for_len(dst.len());
190
191    let (dst_128, dst_rem) = dst.as_chunks_mut::<16>();
192    let (patch_128, patch_rem) = patch.as_chunks::<16>();
193    for (d, p) in dst_128.iter_mut().zip(patch_128.iter()) {
194        let d_word = u128::from_ne_bytes(*d);
195        let p_word = u128::from_ne_bytes(*p);
196        *d = (d_word ^ p_word).to_ne_bytes();
197    }
198
199    let (dst_64, dst_tail) = dst_rem.as_chunks_mut::<8>();
200    let (patch_64, patch_tail) = patch_rem.as_chunks::<8>();
201    for (d, p) in dst_64.iter_mut().zip(patch_64.iter()) {
202        let d_word = u64::from_ne_bytes(*d);
203        let p_word = u64::from_ne_bytes(*p);
204        *d = (d_word ^ p_word).to_ne_bytes();
205    }
206
207    for (d, p) in dst_tail.iter_mut().zip(patch_tail.iter()) {
208        *d ^= *p;
209    }
210
211    Ok(layout)
212}
213
/// GF(256) addition (`+`) using wide XOR chunk loops.
///
/// In GF(256), addition is XOR, so this uses the same SIMD-friendly chunking
/// strategy as [`xor_patch_wide_chunks`].
///
/// # Errors
///
/// Returns `FrankenError::TypeMismatch` when `dst` and `src` differ in length.
pub fn gf256_add_assign_chunked(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
    xor_patch_wide_chunks(dst, src)
}
221
/// RaptorQ symbol add (`dst ^= src`) using chunked XOR.
///
/// This is the core symbol-add primitive from §3.2.2.
///
/// # Errors
///
/// Returns `FrankenError::TypeMismatch` when `dst` and `src` differ in length.
pub fn symbol_add_assign(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
    debug!(
        bead_id = "bd-1hi.2",
        op = "symbol_add_assign",
        symbol_len = dst.len(),
        "applying in-place XOR over symbol bytes"
    );
    gf256_add_assign_chunked(dst, src)
}
234
/// RaptorQ symbol scalar multiply (`out = c * src`) in GF(256).
///
/// Special cases:
/// - `c == 0`: zero output
/// - `c == 1`: copy input
///
/// # Errors
///
/// Returns `FrankenError::TypeMismatch` when `src` and `out` differ in length.
pub fn symbol_mul_into(coeff: u8, src: &[u8], out: &mut [u8]) -> Result<()> {
    if src.len() != out.len() {
        error!(
            bead_id = "bd-1hi.2",
            op = "symbol_mul_into",
            coeff,
            src_len = src.len(),
            out_len = out.len(),
            "symbol length mismatch"
        );
        return Err(FrankenError::TypeMismatch {
            expected: format!("equal lengths (src == out), got {}", src.len()),
            actual: out.len().to_string(),
        });
    }

    debug!(
        bead_id = "bd-1hi.2",
        op = "symbol_mul_into",
        coeff,
        symbol_len = src.len(),
        "applying GF(256) scalar multiplication"
    );

    match coeff {
        0 => {
            // 0 * x == 0 for every x in GF(256).
            out.fill(0);
            Ok(())
        }
        1 => {
            // 1 is the multiplicative identity: plain copy.
            out.copy_from_slice(src);
            Ok(())
        }
        _ => {
            // 16-byte chunking: `chunks_exact` guarantees uniform chunk
            // lengths so LLVM can drop per-byte bounds checks and vectorize.
            let mut out_chunks = out.chunks_exact_mut(16);
            let mut src_chunks = src.chunks_exact(16);

            for (dst_chunk, src_chunk) in out_chunks.by_ref().zip(src_chunks.by_ref()) {
                for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
                    *dst_byte = gf256_mul_byte(coeff, *src_byte);
                }
            }
            // Scalar pass over the < 16-byte tails.
            for (dst_byte, src_byte) in out_chunks
                .into_remainder()
                .iter_mut()
                .zip(src_chunks.remainder().iter())
            {
                *dst_byte = gf256_mul_byte(coeff, *src_byte);
            }
            Ok(())
        }
    }
}
293
/// RaptorQ fused multiply-add (`dst ^= c * src`) in GF(256).
///
/// Special cases:
/// - `c == 0`: no-op
/// - `c == 1`: pure XOR path
///
/// # Errors
///
/// Returns `FrankenError::TypeMismatch` when `dst` and `src` differ in length.
pub fn symbol_addmul_assign(dst: &mut [u8], coeff: u8, src: &[u8]) -> Result<WideChunkLayout> {
    if dst.len() != src.len() {
        error!(
            bead_id = "bd-1hi.2",
            op = "symbol_addmul_assign",
            coeff,
            dst_len = dst.len(),
            src_len = src.len(),
            "symbol length mismatch"
        );
        return Err(FrankenError::TypeMismatch {
            expected: format!("equal lengths (dst == src), got {}", dst.len()),
            actual: src.len().to_string(),
        });
    }

    debug!(
        bead_id = "bd-1hi.2",
        op = "symbol_addmul_assign",
        coeff,
        symbol_len = dst.len(),
        "applying fused multiply-and-add over symbol bytes"
    );

    match coeff {
        // 0 * src == 0, and dst ^= 0 is a no-op; only report the layout.
        0 => Ok(WideChunkLayout::for_len(dst.len())),
        // 1 * src == src: degrade to the pure XOR fast path.
        1 => symbol_add_assign(dst, src),
        _ => {
            // Same 16-byte chunking rationale as `symbol_mul_into`.
            let mut dst_chunks = dst.chunks_exact_mut(16);
            let mut src_chunks = src.chunks_exact(16);

            for (dst_chunk, src_chunk) in dst_chunks.by_ref().zip(src_chunks.by_ref()) {
                for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
                    *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
                }
            }
            // Scalar pass over the < 16-byte tails.
            for (dst_byte, src_byte) in dst_chunks
                .into_remainder()
                .iter_mut()
                .zip(src_chunks.remainder().iter())
            {
                *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
            }
            Ok(WideChunkLayout::for_len(dst.len()))
        }
    }
}
346
/// Compute xxhash3 + blake3 on a contiguous input buffer.
///
/// - `xxhash3` path comes from `SymbolRecord::new` (`frame_xxh3`).
/// - `blake3` path comes from `PayloadHash::blake3`.
///
/// # Errors
///
/// Returns `FrankenError::OutOfRange` when `buffer.len()` does not fit in
/// `u32` (the OTI symbol-size field width).
pub fn simd_friendly_checksum_pair(buffer: &[u8]) -> Result<(u64, [u8; 32])> {
    let symbol_size = u32::try_from(buffer.len()).map_err(|_| FrankenError::OutOfRange {
        what: "symbol_size".to_owned(),
        value: buffer.len().to_string(),
    })?;

    // Wrap the whole buffer as a single-symbol object (z = 1, n = 1) under a
    // zero object id.
    // NOTE(review): assumes `SymbolRecord::new` computes `frame_xxh3` over the
    // payload passed here — confirm against `SymbolRecord`'s definition.
    let symbol_record = SymbolRecord::new(
        ObjectId::from_bytes([0_u8; ObjectId::LEN]),
        Oti {
            f: u64::from(symbol_size),
            al: 1,
            t: symbol_size,
            z: 1,
            n: 1,
        },
        0,
        buffer.to_vec(),
        SymbolRecordFlags::empty(),
    );
    let blake = PayloadHash::blake3(buffer);

    Ok((symbol_record.frame_xxh3, *blake.as_bytes()))
}
374
/// Non-blocking bulkhead admission gate.
#[derive(Debug)]
pub struct Bulkhead {
    /// Immutable admission configuration.
    config: BulkheadConfig,
    /// Currently admitted work items.
    in_flight: AtomicUsize,
    /// High-water mark of `in_flight`.
    peak_in_flight: AtomicUsize,
    /// Count of admissions rejected with `Busy`.
    busy_rejections: AtomicUsize,
}
383
impl Bulkhead {
    /// Create a bulkhead with the given admission configuration.
    #[must_use]
    pub fn new(config: BulkheadConfig) -> Self {
        Self {
            config,
            in_flight: AtomicUsize::new(0),
            peak_in_flight: AtomicUsize::new(0),
            busy_rejections: AtomicUsize::new(0),
        }
    }

    /// Configuration this bulkhead was built with.
    #[must_use]
    pub const fn config(&self) -> BulkheadConfig {
        self.config
    }

    /// Number of currently admitted work items.
    #[must_use]
    pub fn in_flight(&self) -> usize {
        self.in_flight.load(Ordering::Acquire)
    }

    /// High-water mark of concurrently admitted work items.
    #[must_use]
    pub fn peak_in_flight(&self) -> usize {
        self.peak_in_flight.load(Ordering::Acquire)
    }

    /// Number of admissions rejected with `Busy` so far.
    #[must_use]
    pub fn busy_rejections(&self) -> usize {
        self.busy_rejections.load(Ordering::Acquire)
    }

    /// Try to admit one work item.
    ///
    /// Never blocks. If the admission budget is exhausted, this returns
    /// `FrankenError::Busy`.
    ///
    /// # Errors
    ///
    /// Returns `FrankenError::Busy` when `in_flight` has reached
    /// `config.admission_limit()`.
    pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
        let limit = self.config.admission_limit();
        // CAS loop: re-read the counter until we either observe it at the
        // limit (reject) or win the increment race and claim a slot.
        loop {
            let current = self.in_flight.load(Ordering::Acquire);
            if current >= limit {
                self.busy_rejections.fetch_add(1, Ordering::AcqRel);
                return Err(match self.config.overflow_policy {
                    OverflowPolicy::DropBusy => FrankenError::Busy,
                });
            }

            let next = current.saturating_add(1);
            // `compare_exchange_weak` may fail spuriously; the loop retries.
            if self
                .in_flight
                .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
                .is_ok()
            {
                // Record the high-water mark only after winning the slot.
                self.peak_in_flight.fetch_max(next, Ordering::AcqRel);
                return Ok(BulkheadPermit {
                    bulkhead: self,
                    released: false,
                });
            }
        }
    }

    /// Run work within a bulkhead permit.
    ///
    /// # Errors
    ///
    /// Returns `FrankenError::Busy` when no permit is available; `work` is
    /// not invoked in that case.
    pub fn run<T>(&self, work: impl FnOnce() -> T) -> Result<T> {
        let _permit = self.try_acquire()?;
        Ok(work())
    }
}
451
/// RAII permit for a single admitted work item.
///
/// Dropping the permit (or calling [`BulkheadPermit::release`]) returns the
/// slot to the owning [`Bulkhead`].
#[derive(Debug)]
pub struct BulkheadPermit<'a> {
    /// Bulkhead that admitted this permit.
    bulkhead: &'a Bulkhead,
    /// Guards against double-decrement between `release` and `Drop`.
    released: bool,
}
458
459impl BulkheadPermit<'_> {
460    /// Explicitly release the permit.
461    pub fn release(mut self) {
462        if !self.released {
463            self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
464            self.released = true;
465        }
466    }
467}
468
469impl Drop for BulkheadPermit<'_> {
470    fn drop(&mut self) {
471        if !self.released {
472            self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
473            self.released = true;
474        }
475    }
476}
477
/// Region-owned wrapper used for structured-concurrency integration.
#[derive(Debug)]
pub struct RegionBulkhead {
    /// Region that owns this bulkhead.
    region: Region,
    /// Underlying admission gate.
    bulkhead: Bulkhead,
    /// Set once `begin_close` is called; blocks further admissions.
    closing: AtomicBool,
}
485
486impl RegionBulkhead {
487    #[must_use]
488    pub fn new(region: Region, config: BulkheadConfig) -> Self {
489        Self {
490            region,
491            bulkhead: Bulkhead::new(config),
492            closing: AtomicBool::new(false),
493        }
494    }
495
496    #[must_use]
497    pub const fn region(&self) -> Region {
498        self.region
499    }
500
501    #[must_use]
502    pub fn bulkhead(&self) -> &Bulkhead {
503        &self.bulkhead
504    }
505
506    pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
507        if self.closing.load(Ordering::Acquire) {
508            return Err(FrankenError::Busy);
509        }
510        self.bulkhead.try_acquire()
511    }
512
513    /// Begin region close: no new admissions are allowed after this point.
514    pub fn begin_close(&self) {
515        self.closing.store(true, Ordering::Release);
516    }
517
518    /// Whether all region-owned work has quiesced.
519    #[must_use]
520    pub fn is_quiescent(&self) -> bool {
521        self.bulkhead.in_flight() == 0
522    }
523}
524
525#[cfg(test)]
526mod tests {
527    use std::collections::VecDeque;
528    use std::pin::Pin;
529    use std::sync::Arc;
530    use std::task::{Context, Poll};
531    use std::thread;
532    use std::time::{Duration, Instant};
533
534    use asupersync::raptorq::decoder::{InactivationDecoder, ReceivedSymbol};
535    use asupersync::raptorq::gf256::{Gf256, gf256_add_slice, gf256_addmul_slice, gf256_mul_slice};
536    use asupersync::raptorq::systematic::{ConstraintMatrix, SystematicEncoder};
537    use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
538    use asupersync::security::AuthenticationTag;
539    use asupersync::security::authenticated::AuthenticatedSymbol;
540    use asupersync::transport::error::{SinkError, StreamError};
541    use asupersync::transport::sink::SymbolSink;
542    use asupersync::transport::stream::SymbolStream;
543    use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol};
544    use asupersync::{Cx, RaptorQConfig};
545    use fsqlite_btree::compare_key_bytes_contiguous;
546
547    use super::*;
548
549    const BEAD_ID: &str = "bd-22n.4";
550    const SIMD_BEAD_ID: &str = "bd-22n.6";
551    const RAPTORQ_BEAD_ID: &str = "bd-1hi.2";
552
    /// In-memory `SymbolSink` that records every sent symbol for inspection.
    #[derive(Debug)]
    struct VecSink {
        symbols: Vec<Symbol>,
    }

    impl VecSink {
        fn new() -> Self {
            Self {
                symbols: Vec::new(),
            }
        }
    }
565
    impl SymbolSink for VecSink {
        // Capture the symbol; the test transport never applies backpressure.
        fn poll_send(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            symbol: AuthenticatedSymbol,
        ) -> Poll<std::result::Result<(), SinkError>> {
            self.symbols.push(symbol.into_symbol());
            Poll::Ready(Ok(()))
        }

        // Flushing is a no-op for the in-memory buffer.
        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }

        // Closing is a no-op; buffered symbols remain readable afterwards.
        fn poll_close(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }

        // Always ready: sends never block in tests.
        fn poll_ready(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }
    }
597
    /// In-memory `SymbolStream` that replays a fixed list of symbols.
    #[derive(Debug)]
    struct VecStream {
        q: VecDeque<AuthenticatedSymbol>,
    }

    impl VecStream {
        fn new(symbols: Vec<Symbol>) -> Self {
            // Mark every symbol pre-verified so the receiver skips
            // authentication in tests.
            let q = symbols
                .into_iter()
                .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
                .collect();
            Self { q }
        }
    }
612
    impl SymbolStream for VecStream {
        // Pop the next queued symbol; `None` signals end-of-stream.
        fn poll_next(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
            match self.q.pop_front() {
                Some(symbol) => Poll::Ready(Some(Ok(symbol))),
                None => Poll::Ready(None),
            }
        }

        // Exact size is known up front for a replayed queue.
        fn size_hint(&self) -> (usize, Option<usize>) {
            (self.q.len(), Some(self.q.len()))
        }

        fn is_exhausted(&self) -> bool {
            self.q.is_empty()
        }
    }
632
    /// Build a `RaptorQConfig` with the tests' symbol size and repair overhead.
    fn raptorq_config(symbol_size: u16, repair_overhead: f64) -> RaptorQConfig {
        let mut config = RaptorQConfig::default();
        config.encoding.symbol_size = symbol_size;
        config.encoding.max_block_size = 64 * 1024;
        config.encoding.repair_overhead = repair_overhead;
        config
    }
640
641    fn deterministic_payload(len: usize, seed: u64) -> Vec<u8> {
642        let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
643        let mut out = Vec::with_capacity(len);
644        for idx in 0..len {
645            state ^= state << 7;
646            state ^= state >> 9;
647            state = state.wrapping_mul(0xA24B_AED4_963E_E407);
648            let idx_byte = u8::try_from(idx % 251).expect("modulo fits in u8");
649            out.push(u8::try_from(state & 0xFF).expect("masked to u8") ^ idx_byte);
650        }
651        out
652    }
653
654    fn xor_patch_bytewise(dst: &mut [u8], patch: &[u8]) {
655        for (dst_byte, patch_byte) in dst.iter_mut().zip(patch.iter()) {
656            *dst_byte ^= *patch_byte;
657        }
658    }
659
    /// Scalar reference implementation of GF(256) scalar multiplication,
    /// used to cross-check the chunked fast paths.
    fn gf256_mul_bytewise(coeff: u8, src: &[u8], out: &mut [u8]) {
        for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
            *dst_byte = gf256_mul_byte(coeff, *src_byte);
        }
    }
665
    /// Recursively gather every `*.rs` file under `root` into `out`.
    ///
    /// Panics on unreadable directories/entries, which is acceptable in tests.
    fn collect_rs_files(root: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
        let entries = std::fs::read_dir(root).expect("read_dir should succeed");
        for entry in entries {
            let path = entry.expect("read_dir entry should be readable").path();
            if path.is_dir() {
                // Depth-first descent into subdirectories.
                collect_rs_files(&path, out);
            } else if path.extension().is_some_and(|ext| ext == "rs") {
                out.push(path);
            }
        }
    }
677
    /// Encode `data` through a `RaptorQSender` into a captured symbol stream.
    ///
    /// Returns the emitted symbols (source + repair) and the source symbol
    /// count reported by the sender.
    fn encode_symbols(
        config: RaptorQConfig,
        object_id: AsObjectId,
        data: &[u8],
    ) -> (Vec<Symbol>, usize) {
        let cx = Cx::for_testing();
        let mut sender = RaptorQSenderBuilder::new()
            .config(config)
            .transport(VecSink::new())
            .build()
            .expect("sender build");
        let outcome = sender
            .send_object(&cx, object_id, data)
            .expect("send_object must succeed");
        // Take ownership of the symbols buffered by the in-memory sink.
        let symbols = std::mem::take(&mut sender.transport_mut().symbols);
        tracing::debug!(
            bead_id = RAPTORQ_BEAD_ID,
            case = "encode_symbols",
            source_symbols = outcome.source_symbols,
            emitted_symbols = symbols.len(),
            object_size = data.len(),
            "encoded object into source+repair symbol stream"
        );
        (symbols, outcome.source_symbols)
    }
703
    /// Decode an object from a replayed symbol stream via `RaptorQReceiver`.
    ///
    /// Returns the reconstructed bytes, or the receiver's error when the
    /// stream is insufficient for decoding.
    #[allow(clippy::result_large_err)]
    fn decode_symbols(
        config: RaptorQConfig,
        object_id: AsObjectId,
        object_size: usize,
        source_symbols: usize,
        symbols: Vec<Symbol>,
    ) -> std::result::Result<Vec<u8>, asupersync::Error> {
        let cx = Cx::for_testing();
        let params = ObjectParams::new(
            object_id,
            u64::try_from(object_size).expect("object size fits u64"),
            config.encoding.symbol_size,
            1,
            u16::try_from(source_symbols).expect("source symbol count fits u16"),
        );
        let mut receiver = RaptorQReceiverBuilder::new()
            .config(config)
            .source(VecStream::new(symbols))
            .build()
            .expect("receiver build");

        receiver
            .receive_object(&cx, &params)
            .map(|outcome| outcome.data)
    }
730
731    fn split_source_and_repair(
732        symbols: &[Symbol],
733        source_symbols: usize,
734    ) -> (Vec<Symbol>, Vec<Symbol>) {
735        let source_symbols_u32 =
736            u32::try_from(source_symbols).expect("source symbol count fits u32");
737        let mut sources = Vec::new();
738        let mut repairs = Vec::new();
739        for symbol in symbols {
740            if symbol.esi() < source_symbols_u32 {
741                sources.push(symbol.clone());
742            } else {
743                repairs.push(symbol.clone());
744            }
745        }
746        (sources, repairs)
747    }
748
    /// Build `k` deterministic source symbols of `symbol_size` bytes each,
    /// seeding symbol `i` with `seed + i` so every symbol differs.
    fn low_level_source_block(k: usize, symbol_size: usize, seed: u64) -> Vec<Vec<u8>> {
        (0..k)
            .map(|source_index| {
                deterministic_payload(
                    symbol_size,
                    seed + u64::try_from(source_index).expect("source index fits u64"),
                )
            })
            .collect()
    }
759
760    fn append_source_received_symbols(
761        received: &mut Vec<ReceivedSymbol>,
762        constraints: &ConstraintMatrix,
763        base_rows: usize,
764        k_prime: usize,
765        symbol_size: usize,
766        source: &[Vec<u8>],
767        source_indexes: &[usize],
768    ) {
769        for &source_index in source_indexes {
770            let row = base_rows + source_index;
771            let mut columns = Vec::new();
772            let mut coefficients = Vec::new();
773            for col in 0..constraints.cols {
774                let coeff = constraints.get(row, col);
775                if !coeff.is_zero() {
776                    columns.push(col);
777                    coefficients.push(coeff);
778                }
779            }
780
781            received.push(ReceivedSymbol {
782                esi: u32::try_from(source_index).expect("source index fits u32"),
783                is_source: true,
784                columns,
785                coefficients,
786                data: source[source_index].clone(),
787            });
788        }
789
790        // RFC 6330 decode domain uses K' source-domain rows, not just K.
791        // The K'−K PI rows correspond to zero-padded source symbols.
792        for source_index in source.len()..k_prime {
793            let row = base_rows + source_index;
794            let mut columns = Vec::new();
795            let mut coefficients = Vec::new();
796            for col in 0..constraints.cols {
797                let coeff = constraints.get(row, col);
798                if !coeff.is_zero() {
799                    columns.push(col);
800                    coefficients.push(coeff);
801                }
802            }
803
804            received.push(ReceivedSymbol {
805                esi: u32::try_from(source_index).expect("source index fits u32"),
806                is_source: true,
807                columns,
808                coefficients,
809                data: vec![0_u8; symbol_size],
810            });
811        }
812    }
813
    /// The balanced-profile formula `clamp(P / 8, 1, 16)` holds at an
    /// interior point, the floor, and the cap.
    #[test]
    fn test_parallelism_defaults_conservative() {
        assert_eq!(
            conservative_bg_cpu_max(16),
            2,
            "bead_id={BEAD_ID} case=balanced_profile_formula_p16"
        );
        assert_eq!(
            conservative_bg_cpu_max(1),
            1,
            "bead_id={BEAD_ID} case=balanced_profile_min_floor"
        );
        assert_eq!(
            conservative_bg_cpu_max(512),
            16,
            "bead_id={BEAD_ID} case=balanced_profile_max_cap"
        );
    }
832
    /// The default config never exceeds host parallelism, admissions stop at
    /// the configured limit, and dropped permits return the counter to zero.
    #[test]
    fn test_parallelism_bounded_by_available() {
        let cfg = BulkheadConfig::default();
        let p = available_parallelism_or_one();
        assert!(
            cfg.max_concurrent <= p,
            "bead_id={BEAD_ID} case=default_exceeds_available_parallelism cfg={cfg:?} p={p}"
        );

        // Fill the admission budget completely.
        let bulkhead = Bulkhead::new(cfg);
        let mut permits = Vec::new();
        for _ in 0..cfg.admission_limit() {
            permits.push(
                bulkhead
                    .try_acquire()
                    .expect("admission under configured limit should succeed"),
            );
        }

        let overflow = bulkhead.try_acquire();
        assert!(
            matches!(overflow, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=bounded_admission_overflow_must_be_busy overflow={overflow:?}"
        );

        drop(permits);
        assert_eq!(
            bulkhead.in_flight(),
            0,
            "bead_id={BEAD_ID} case=permits_drop_to_zero"
        );
    }
865
    /// `max_concurrent = 3` admits exactly three permits before rejecting.
    #[test]
    fn test_bulkhead_config_max_concurrent() {
        let cfg = BulkheadConfig::new(3, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let bulkhead = Bulkhead::new(cfg);

        let p1 = bulkhead.try_acquire().expect("slot 1");
        let p2 = bulkhead.try_acquire().expect("slot 2");
        let p3 = bulkhead.try_acquire().expect("slot 3");
        let overflow = bulkhead.try_acquire();

        assert!(
            matches!(overflow, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=max_concurrent_enforced overflow={overflow:?}"
        );
        drop((p1, p2, p3));
    }
883
    /// `DropBusy` rejects the second admission of a single-slot bulkhead.
    #[test]
    fn test_overflow_policy_drop_with_busy() {
        let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let bulkhead = Bulkhead::new(cfg);
        let _permit = bulkhead.try_acquire().expect("first permit must succeed");

        let overflow = bulkhead.try_acquire();
        assert!(
            matches!(overflow, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=overflow_policy_drop_busy overflow={overflow:?}"
        );
    }
897
    /// Overflow is reject-not-wait: repeated attempts fail fast and every
    /// rejection is counted.
    #[test]
    fn test_background_work_degrades_gracefully() {
        let cfg = BulkheadConfig::new(2, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let bulkhead = Bulkhead::new(cfg);

        let _a = bulkhead.try_acquire().expect("permit a");
        let _b = bulkhead.try_acquire().expect("permit b");

        for _ in 0..8 {
            let result = bulkhead.try_acquire();
            assert!(
                matches!(result, Err(FrankenError::Busy)),
                "bead_id={BEAD_ID} case=overflow_must_reject_not_wait result={result:?}"
            );
        }

        assert_eq!(
            bulkhead.busy_rejections(),
            8,
            "bead_id={BEAD_ID} case=busy_rejection_counter"
        );
    }
921
    /// Region wiring: id is plumbed through, `begin_close` blocks new
    /// admissions, and quiescence is reached once outstanding permits drop.
    #[test]
    fn test_region_integration() {
        let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let region_bulkhead = RegionBulkhead::new(Region::new(7), cfg);
        assert_eq!(
            region_bulkhead.region().get(),
            7,
            "bead_id={BEAD_ID} case=region_id_plumbed"
        );

        let permit = region_bulkhead.try_acquire().expect("first permit");
        assert!(
            !region_bulkhead.is_quiescent(),
            "bead_id={BEAD_ID} case=region_non_quiescent_with_active_work"
        );

        region_bulkhead.begin_close();
        let after_close = region_bulkhead.try_acquire();
        assert!(
            matches!(after_close, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=region_close_blocks_new_work result={after_close:?}"
        );

        drop(permit);
        assert!(
            region_bulkhead.is_quiescent(),
            "bead_id={BEAD_ID} case=region_quiescent_after_permit_drop"
        );
    }
952
953    #[test]
954    fn test_gf256_ops_chunked() {
955        let mut dst = vec![0xAA_u8; 40];
956        let src = vec![0x55_u8; 40];
957        let expected: Vec<u8> = dst.iter().zip(src.iter()).map(|(d, s)| *d ^ *s).collect();
958
959        let layout = gf256_add_assign_chunked(&mut dst, &src)
960            .expect("equal-length buffers should be accepted");
961        assert!(
962            layout.u128_chunks > 0 || layout.u64_chunks > 0,
963            "bead_id={SIMD_BEAD_ID} case=wide_chunks_expected layout={layout:?}"
964        );
965        assert_eq!(
966            dst, expected,
967            "bead_id={SIMD_BEAD_ID} case=gf256_addition_xor_equivalence"
968        );
969    }
970
971    #[test]
972    fn test_xor_patch_wide_chunks() {
973        let mut dst = vec![0xF0_u8; 37];
974        let patch = vec![0x0F_u8; 37];
975        let expected: Vec<u8> = dst.iter().zip(patch.iter()).map(|(d, p)| *d ^ *p).collect();
976
977        let layout =
978            xor_patch_wide_chunks(&mut dst, &patch).expect("equal-length buffers should be valid");
979        assert_eq!(
980            layout,
981            WideChunkLayout {
982                u128_chunks: 2,
983                u64_chunks: 0,
984                tail_bytes: 5,
985            },
986            "bead_id={SIMD_BEAD_ID} case=chunk_layout_expected"
987        );
988        assert_eq!(
989            dst, expected,
990            "bead_id={SIMD_BEAD_ID} case=xor_patch_matches_scalar_reference"
991        );
992    }
993
994    #[test]
995    fn test_xor_symbols_u64_chunks() {
996        // Length 24 exercises both the u128 and u64 lanes.
997        let mut dst = vec![0xAB_u8; 24];
998        let src = vec![0xCD_u8; 24];
999        let mut expected = vec![0xAB_u8; 24];
1000        xor_patch_bytewise(&mut expected, &src);
1001
1002        let layout = xor_patch_wide_chunks(&mut dst, &src).expect("equal-length buffers");
1003        assert_eq!(
1004            layout,
1005            WideChunkLayout {
1006                u128_chunks: 1,
1007                u64_chunks: 1,
1008                tail_bytes: 0,
1009            },
1010            "bead_id=bd-2ddc case=u64_chunk_lane_exercised"
1011        );
1012        assert_eq!(
1013            dst, expected,
1014            "bead_id=bd-2ddc case=chunked_xor_matches_bytewise_reference"
1015        );
1016    }
1017
1018    #[test]
1019    fn test_gf256_multiply_chunks() {
1020        let coeff = 0xA7_u8;
1021        let src = deterministic_payload(4096, 0xDDCC_BBAA_1122_3344);
1022        let mut chunked = vec![0_u8; src.len()];
1023        let mut scalar = vec![0_u8; src.len()];
1024
1025        symbol_mul_into(coeff, &src, &mut chunked).expect("chunked symbol_mul_into");
1026        gf256_mul_bytewise(coeff, &src, &mut scalar);
1027
1028        assert_eq!(
1029            chunked, scalar,
1030            "bead_id=bd-2ddc case=chunked_mul_matches_scalar_reference"
1031        );
1032    }
1033
    #[test]
    fn test_u128_chunk_alignment() {
        // Non-multiple of 16 exercises the u128 path + tail handling.
        // Strategy: XOR the same 4099-byte payload through the wide-chunk
        // helper and through an independent u64-word reference, then
        // require bit-identical results.
        let mut via_wide = deterministic_payload(4099, 0x1234_5678_9ABC_DEF0);
        let mut via_u64_only = via_wide.clone();
        let patch = deterministic_payload(4099, 0x0F0E_0D0C_0B0A_0908);

        // Path under test: the wide (u128-lane) chunked XOR.
        xor_patch_wide_chunks(&mut via_wide, &patch).expect("wide chunk xor");

        // Reference path: XOR 8 bytes at a time via native-endian u64 words…
        let mut dst_u64_chunks = via_u64_only.chunks_exact_mut(8);
        let mut patch_u64_chunks = patch.chunks_exact(8);
        for (dst_chunk, patch_chunk) in dst_u64_chunks.by_ref().zip(patch_u64_chunks.by_ref()) {
            let dst_word = u64::from_ne_bytes(
                dst_chunk
                    .try_into()
                    .expect("chunks_exact(8) must yield 8-byte chunk"),
            );
            let patch_word = u64::from_ne_bytes(
                patch_chunk
                    .try_into()
                    .expect("chunks_exact(8) must yield 8-byte chunk"),
            );
            dst_chunk.copy_from_slice(&(dst_word ^ patch_word).to_ne_bytes());
        }
        // …then XOR the 0-7 leftover tail bytes one at a time.
        for (dst_byte, patch_byte) in dst_u64_chunks
            .into_remainder()
            .iter_mut()
            .zip(patch_u64_chunks.remainder().iter())
        {
            *dst_byte ^= *patch_byte;
        }

        assert_eq!(
            via_wide, via_u64_only,
            "bead_id=bd-2ddc case=u128_lane_matches_u64_plus_tail"
        );
    }
1071
    #[test]
    fn test_benchmark_chunk_vs_byte() {
        // Meaningful performance checks require optimized codegen.
        if cfg!(debug_assertions) {
            return;
        }

        // XOR-patch the same 4 KiB payload 32k times via both paths and
        // require the chunked path to be at least 4x faster.
        // NOTE(review): wall-clock ratio asserts can be noisy on loaded CI
        // hosts — confirm the 4x bar is reliable on the target runners.
        let iterations = 32_000_usize;
        let src = deterministic_payload(4096, 0xDEAD_BEEF_F00D_CAFE);
        let base = deterministic_payload(4096, 0x0123_4567_89AB_CDEF);

        // Chunked (wide-lane) path; black_box keeps the loop from being
        // const-folded away by the optimizer.
        let mut chunked = base.clone();
        let chunked_start = Instant::now();
        for _ in 0..iterations {
            xor_patch_wide_chunks(&mut chunked, &src).expect("chunked xor");
            std::hint::black_box(&chunked);
        }
        let chunked_elapsed = chunked_start.elapsed();

        // Scalar byte-at-a-time reference path.
        let mut bytewise = base;
        let bytewise_start = Instant::now();
        for _ in 0..iterations {
            xor_patch_bytewise(&mut bytewise, &src);
            std::hint::black_box(&bytewise);
        }
        let bytewise_elapsed = bytewise_start.elapsed();

        let speedup = bytewise_elapsed.as_secs_f64() / chunked_elapsed.as_secs_f64();
        assert!(
            speedup >= 4.0,
            "bead_id=bd-2ddc case=chunk_vs_byte_speedup speedup={speedup:.2}x \
             chunked_ns={} bytewise_ns={} iterations={iterations}",
            chunked_elapsed.as_nanos(),
            bytewise_elapsed.as_nanos()
        );
    }
1108
    #[test]
    fn test_no_unsafe_simd() {
        // Enforce the no-unsafe-SIMD policy in two layers:
        // 1) the workspace manifest must forbid unsafe code outright, and
        // 2) a source scan of crates/ must find no SIMD intrinsic token
        //    adjacent to an `unsafe` keyword.
        let manifest = include_str!("../../../Cargo.toml");
        assert!(
            manifest.contains(r#"unsafe_code = "forbid""#),
            "bead_id=bd-2ddc case=workspace_forbids_unsafe"
        );

        // Resolve the workspace root relative to this crate's manifest dir
        // (crates/<name> -> crates -> workspace root).
        let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
            .parent()
            .expect("crate dir has parent")
            .parent()
            .expect("workspace root exists")
            .to_path_buf();
        let crates_dir = workspace_root.join("crates");

        let mut rs_files = Vec::new();
        collect_rs_files(&crates_dir, &mut rs_files);

        // Token fragments indicating explicit SIMD intrinsics or
        // architecture-specific vector types/instructions.
        let simd_needles = [
            "_mm_",
            "std::arch::",
            "core::arch::",
            "__m128",
            "__m256",
            "__m512",
            "simd_shuffle",
            "vpxor",
            "vxorq",
        ];

        let mut offenders = Vec::new();
        for file in rs_files {
            // Unreadable (e.g. non-UTF-8) files are skipped, not failed.
            let Ok(content) = std::fs::read_to_string(&file) else {
                continue;
            };

            let lines = content.lines().collect::<Vec<_>>();
            for (idx, line) in lines.iter().enumerate() {
                let has_intrinsic = simd_needles.iter().any(|needle| line.contains(needle));
                if !has_intrinsic {
                    continue;
                }

                // Only flag intrinsics with `unsafe` inside a +/-3 line
                // window; `unsafe` on comment-only lines is ignored.
                let window_start = idx.saturating_sub(3);
                let window_end = (idx + 3).min(lines.len().saturating_sub(1));
                let mut found_unsafe_nearby = false;
                for nearby in &lines[window_start..=window_end] {
                    let trimmed = nearby.trim_start();
                    let is_comment = trimmed.starts_with("//");
                    if !is_comment && trimmed.contains("unsafe") {
                        found_unsafe_nearby = true;
                        break;
                    }
                }

                if found_unsafe_nearby {
                    // Record as path:line (1-based) for the failure message.
                    offenders.push(format!("{}:{}", file.display(), idx + 1));
                }
            }
        }

        assert!(
            offenders.is_empty(),
            "bead_id=bd-2ddc case=no_unsafe_simd_intrinsics offenders={offenders:?}"
        );
    }
1176
1177    #[test]
1178    fn test_checksum_simd_friendly() {
1179        let buffer = vec![0x11_u8; 256];
1180        let (xx_a, blake_a) =
1181            simd_friendly_checksum_pair(&buffer).expect("checksum pair must succeed");
1182
1183        let mut modified = buffer;
1184        modified[255] ^= 0x01;
1185        let (xx_b, blake_b) =
1186            simd_friendly_checksum_pair(&modified).expect("checksum pair must succeed");
1187
1188        assert_ne!(
1189            xx_a, xx_b,
1190            "bead_id={SIMD_BEAD_ID} case=xxhash3_changes_on_byte_flip"
1191        );
1192        assert_ne!(
1193            blake_a, blake_b,
1194            "bead_id={SIMD_BEAD_ID} case=blake3_changes_on_byte_flip"
1195        );
1196    }
1197
    #[test]
    fn test_e2e_bounded_parallelism_under_background_load() {
        // 48 threads race for 4 permits: peak concurrency must stay at or
        // below the admission limit, and overflow must be rejected as Busy.
        let cfg = BulkheadConfig::new(4, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let bulkhead = Arc::new(Bulkhead::new(cfg));

        let handles: Vec<_> = (0..48)
            .map(|_| {
                let bulkhead = Arc::clone(&bulkhead);
                thread::spawn(move || {
                    bulkhead.run(|| {
                        thread::sleep(Duration::from_millis(10));
                    })
                })
            })
            .collect();

        // Tally rejected workers; any non-Busy error variant must still
        // map to the Busy error code or the inner assert fails.
        let mut busy = 0_usize;
        for handle in handles {
            match handle.join().expect("worker thread should not panic") {
                Ok(()) => {}
                Err(FrankenError::Busy) => busy = busy.saturating_add(1),
                Err(err) => {
                    assert_eq!(
                        err.error_code(),
                        fsqlite_error::ErrorCode::Busy,
                        "bead_id={BEAD_ID} case=e2e_unexpected_bulkhead_error err={err}"
                    );
                    busy = busy.saturating_add(1);
                }
            }
        }

        // With 48 workers contending for 4 permits, some rejections are
        // expected while earlier workers sleep inside run().
        assert!(
            busy > 0,
            "bead_id={BEAD_ID} case=e2e_should_observe_overflow_rejections"
        );
        // Peak observed concurrency must never exceed the configured limit.
        assert!(
            bulkhead.peak_in_flight() <= cfg.admission_limit(),
            "bead_id={BEAD_ID} case=e2e_peak_parallelism_exceeded peak={} limit={}",
            bulkhead.peak_in_flight(),
            cfg.admission_limit()
        );
    }
1242
    #[test]
    fn test_e2e_simd_hot_path_correctness() {
        // End-to-end sweep of the SIMD-friendly hot paths: key comparison,
        // GF(256) add, XOR patching, and the checksum pair. Each stage is
        // individually timed and the timings are emitted as one metric line.
        // 1) B-tree hot comparison over contiguous slices.
        let contiguous = b"key-0001key-0002".to_vec();
        let left = &contiguous[0..8];
        let right = &contiguous[8..16];
        let compare_start = Instant::now();
        assert_eq!(
            compare_key_bytes_contiguous(left, right),
            left.cmp(right),
            "bead_id={SIMD_BEAD_ID} case=btree_contiguous_compare_correct"
        );
        let compare_elapsed = compare_start.elapsed();

        // 2) GF(256) add (XOR) and XOR patch helpers.
        let mut symbol_a = (0_u8..64).collect::<Vec<u8>>();
        let symbol_b = (64_u8..128).collect::<Vec<u8>>();
        let expected_add: Vec<u8> = symbol_a
            .iter()
            .zip(symbol_b.iter())
            .map(|(a, b)| *a ^ *b)
            .collect();
        let gf256_start = Instant::now();
        gf256_add_assign_chunked(&mut symbol_a, &symbol_b).expect("gf256 add should succeed");
        let gf256_elapsed = gf256_start.elapsed();
        assert_eq!(
            symbol_a, expected_add,
            "bead_id={SIMD_BEAD_ID} case=gf256_chunked_add_correct"
        );

        // XOR patch: every 0x33 byte patched with 0xCC must become 0xFF.
        let mut patch_target = vec![0x33_u8; 64];
        let patch = vec![0xCC_u8; 64];
        let xor_start = Instant::now();
        xor_patch_wide_chunks(&mut patch_target, &patch).expect("xor patch should succeed");
        let xor_elapsed = xor_start.elapsed();
        assert!(
            patch_target.iter().all(|&byte| byte == (0x33_u8 ^ 0xCC_u8)),
            "bead_id={SIMD_BEAD_ID} case=xor_patch_chunked_correct"
        );

        // 3) SIMD-friendly checksum feed.
        let checksum_start = Instant::now();
        let (xx, blake) =
            simd_friendly_checksum_pair(&patch_target).expect("checksum pair should succeed");
        let checksum_elapsed = checksum_start.elapsed();
        // NOTE(review): a zero xxHash3 output is legal in principle; this
        // assert relies on the fixed payload not hashing to zero.
        assert_ne!(
            xx, 0,
            "bead_id={SIMD_BEAD_ID} case=xxhash3_nonzero_for_nonempty_payload"
        );
        assert!(
            blake.iter().any(|&b| b != 0),
            "bead_id={SIMD_BEAD_ID} case=blake3_digest_nonzero"
        );

        eprintln!(
            "bead_id={SIMD_BEAD_ID} metric=simd_hot_path_ns compare={} gf256_add={} xor_patch={} checksum={}",
            compare_elapsed.as_nanos(),
            gf256_elapsed.as_nanos(),
            xor_elapsed.as_nanos(),
            checksum_elapsed.as_nanos()
        );
    }
1305
1306    #[test]
1307    fn test_symbol_add_self_inverse() {
1308        let src = (0_u16..512)
1309            .map(|idx| u8::try_from(idx % 251).expect("modulo fits in u8"))
1310            .collect::<Vec<_>>();
1311        let mut dst = src.clone();
1312        symbol_add_assign(&mut dst, &src).expect("symbol_add should succeed");
1313        assert!(
1314            dst.iter().all(|byte| *byte == 0),
1315            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_self_inverse"
1316        );
1317    }
1318
1319    #[test]
1320    fn test_symbol_add_commutative_and_associative() {
1321        let a = (0_u16..128)
1322            .map(|idx| u8::try_from((idx * 3) % 251).expect("modulo fits"))
1323            .collect::<Vec<_>>();
1324        let b = (0_u16..128)
1325            .map(|idx| u8::try_from((idx * 5 + 7) % 251).expect("modulo fits"))
1326            .collect::<Vec<_>>();
1327        let c = (0_u16..128)
1328            .map(|idx| u8::try_from((idx * 11 + 13) % 251).expect("modulo fits"))
1329            .collect::<Vec<_>>();
1330
1331        let mut ab = a.clone();
1332        symbol_add_assign(&mut ab, &b).expect("a+b");
1333        let mut ba = b.clone();
1334        symbol_add_assign(&mut ba, &a).expect("b+a");
1335        assert_eq!(
1336            ab, ba,
1337            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_commutative"
1338        );
1339
1340        let mut lhs = a.clone();
1341        symbol_add_assign(&mut lhs, &b).expect("(a+b)");
1342        symbol_add_assign(&mut lhs, &c).expect("(a+b)+c");
1343
1344        let mut rhs = b;
1345        symbol_add_assign(&mut rhs, &c).expect("(b+c)");
1346        let mut rhs2 = a;
1347        symbol_add_assign(&mut rhs2, &rhs).expect("a+(b+c)");
1348
1349        assert_eq!(
1350            lhs, rhs2,
1351            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_associative"
1352        );
1353    }
1354
1355    #[test]
1356    fn test_symbol_mul_special_cases() {
1357        let src = (0_u16..256)
1358            .map(|idx| u8::try_from(idx).expect("idx fits"))
1359            .collect::<Vec<_>>();
1360
1361        let mut out_zero = vec![0_u8; src.len()];
1362        symbol_mul_into(0, &src, &mut out_zero).expect("mul by zero");
1363        assert!(
1364            out_zero.iter().all(|byte| *byte == 0),
1365            "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_zero"
1366        );
1367
1368        let mut out_one = vec![0_u8; src.len()];
1369        symbol_mul_into(1, &src, &mut out_one).expect("mul by one");
1370        assert_eq!(
1371            out_one, src,
1372            "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_identity"
1373        );
1374    }
1375
1376    #[test]
1377    fn test_symbol_mul_matches_scalar_reference() {
1378        let src = (0_u16..512)
1379            .map(|idx| u8::try_from((idx * 7 + 17) % 251).expect("modulo fits"))
1380            .collect::<Vec<_>>();
1381        let coeff = 0xA7_u8;
1382
1383        let mut out = vec![0_u8; src.len()];
1384        symbol_mul_into(coeff, &src, &mut out).expect("symbol mul");
1385        for (actual, input) in out.iter().zip(src.iter()) {
1386            let expected = gf256_mul_byte(coeff, *input);
1387            assert_eq!(
1388                *actual, expected,
1389                "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_scalar_match"
1390            );
1391        }
1392    }
1393
1394    #[test]
1395    fn test_symbol_addmul_special_cases_and_equivalence() {
1396        let src = (0_u16..512)
1397            .map(|idx| u8::try_from((idx * 13 + 19) % 251).expect("modulo fits"))
1398            .collect::<Vec<_>>();
1399        let original = (0_u16..512)
1400            .map(|idx| u8::try_from((idx * 9 + 3) % 251).expect("modulo fits"))
1401            .collect::<Vec<_>>();
1402
1403        let mut no_op = original.clone();
1404        symbol_addmul_assign(&mut no_op, 0, &src).expect("c=0");
1405        assert_eq!(
1406            no_op, original,
1407            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c0_noop"
1408        );
1409
1410        let mut xor_path = original.clone();
1411        symbol_addmul_assign(&mut xor_path, 1, &src).expect("c=1");
1412        let mut expected_xor = original.clone();
1413        symbol_add_assign(&mut expected_xor, &src).expect("xor reference");
1414        assert_eq!(
1415            xor_path, expected_xor,
1416            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c1_equals_xor"
1417        );
1418
1419        let coeff = 0x53_u8;
1420        let mut fused = original.clone();
1421        symbol_addmul_assign(&mut fused, coeff, &src).expect("fused");
1422        let mut mul = vec![0_u8; src.len()];
1423        symbol_mul_into(coeff, &src, &mut mul).expect("mul");
1424        let mut separate = original;
1425        symbol_add_assign(&mut separate, &mul).expect("add");
1426        assert_eq!(
1427            fused, separate,
1428            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_fused_equals_mul_plus_add"
1429        );
1430    }
1431
1432    #[test]
1433    fn test_symbol_operations_4096_and_512() {
1434        for symbol_len in [4096_usize, 1024_usize, 512_usize] {
1435            let a = vec![0xAA_u8; symbol_len];
1436            let b = vec![0x55_u8; symbol_len];
1437            let mut sum = a.clone();
1438            let layout = symbol_add_assign(&mut sum, &b).expect("symbol add");
1439            assert_eq!(
1440                layout,
1441                WideChunkLayout::for_len(symbol_len),
1442                "bead_id={RAPTORQ_BEAD_ID} case=symbol_len_layout_consistency len={symbol_len}"
1443            );
1444            assert!(
1445                sum.iter().all(|byte| *byte == (0xAA_u8 ^ 0x55_u8)),
1446                "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_expected_xor len={symbol_len}"
1447            );
1448        }
1449    }
1450
1451    #[test]
1452    fn test_gf256_arithmetic_matches_asupersync() {
1453        for a in 0_u8..=u8::MAX {
1454            for b in 0_u8..=u8::MAX {
1455                assert_eq!(
1456                    gf256_mul_byte(a, b),
1457                    (Gf256(a) * Gf256(b)).raw(),
1458                    "bead_id={RAPTORQ_BEAD_ID} case=gf256_mul_parity a=0x{a:02X} b=0x{b:02X}"
1459                );
1460            }
1461        }
1462    }
1463
    #[test]
    fn test_symbol_ops_match_asupersync_gf256_slices() {
        // Slice-level parity with the asupersync gf256 routines: add, mul,
        // and fused addmul must agree byte-for-byte across symbol sizes and
        // a spread of coefficients (0, 1, and two arbitrary values).
        for symbol_len in [512_usize, 1024_usize, 4096_usize] {
            let src = deterministic_payload(symbol_len, 0xA5A5_0101);
            let dst = deterministic_payload(symbol_len, 0x5A5A_0202);

            // Addition (XOR) parity.
            let mut ours_add = dst.clone();
            symbol_add_assign(&mut ours_add, &src).expect("symbol add");
            let mut as_add = dst.clone();
            gf256_add_slice(&mut as_add, &src);
            assert_eq!(
                ours_add, as_add,
                "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_add len={symbol_len}"
            );

            for coeff in [0_u8, 1_u8, 0x53_u8, 0xA7_u8] {
                // Scalar multiplication parity.
                let mut ours_mul = vec![0_u8; symbol_len];
                symbol_mul_into(coeff, &src, &mut ours_mul).expect("symbol mul");
                let mut as_mul = src.clone();
                gf256_mul_slice(&mut as_mul, Gf256(coeff));
                assert_eq!(
                    ours_mul, as_mul,
                    "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_mul len={symbol_len} coeff=0x{coeff:02X}"
                );

                // Fused multiply-accumulate parity.
                let mut ours_addmul = dst.clone();
                symbol_addmul_assign(&mut ours_addmul, coeff, &src).expect("symbol addmul");
                let mut as_addmul = dst.clone();
                gf256_addmul_slice(&mut as_addmul, &src, Gf256(coeff));
                assert_eq!(
                    ours_addmul, as_addmul,
                    "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_addmul len={symbol_len} coeff=0x{coeff:02X}"
                );
            }
        }
    }
1500
    #[test]
    fn test_encode_single_source_block() {
        // Encoding k symbol-aligned source symbols must yield exactly k
        // source symbols, at least one repair symbol, and uniform sizes.
        let config = raptorq_config(512, 1.25);
        let symbol_size = usize::from(config.encoding.symbol_size);
        let k = 8_usize;
        let data = deterministic_payload(k * symbol_size, 0x0102_0304);
        let object_id = AsObjectId::new_for_test(1201);
        tracing::info!(
            bead_id = RAPTORQ_BEAD_ID,
            case = "test_encode_single_source_block",
            symbol_size,
            requested_k = k,
            "encoding source block"
        );
        let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
        let (sources, repairs) = split_source_and_repair(&symbols, source_symbols);

        assert_eq!(
            source_symbols, k,
            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_count"
        );
        assert_eq!(
            sources.len(),
            source_symbols,
            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_partition"
        );
        // The 1.25 overhead factor must have produced repair symbols.
        assert!(
            !repairs.is_empty(),
            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_repair_present"
        );
        assert!(
            symbols.iter().all(|symbol| symbol.len() == symbol_size),
            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_symbol_size_consistent"
        );
    }
1536
    #[test]
    fn test_decode_exact_k_symbols() {
        // Decode from exactly k received source symbols (the minimum), and
        // cross-check the recovered intermediates against the encoder's.
        let symbol_size = 512_usize;
        let k = 16_usize;
        let seed = 0x0BAD_CAFE_u64;
        let source = low_level_source_block(k, symbol_size, seed);
        let encoder =
            SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
        let decoder = InactivationDecoder::new(k, symbol_size, seed);
        let params = decoder.params();
        // Constraint rows (s + h) precede the received-symbol rows.
        // NOTE(review): s/h naming matches RFC 6330 LDPC/HDPC parameters —
        // confirm against the decoder implementation.
        let base_rows = params.s + params.h;
        let constraints = ConstraintMatrix::build(params, seed);
        let mut received = decoder.constraint_symbols();
        // All k source symbols are received; none is erased.
        let source_indexes = (0..k).collect::<Vec<_>>();
        append_source_received_symbols(
            &mut received,
            &constraints,
            base_rows,
            params.k_prime,
            symbol_size,
            &source,
            &source_indexes,
        );

        tracing::warn!(
            bead_id = RAPTORQ_BEAD_ID,
            case = "test_decode_exact_k_symbols",
            source_symbols = k,
            "decoding with minimum symbol count (fragile recovery threshold)"
        );
        let decode_outcome = decoder
            .decode(&received)
            .expect("decode exact-k must succeed");
        // Recovered source must round-trip exactly.
        assert_eq!(
            decode_outcome.source, source,
            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbols_roundtrip"
        );
        assert_eq!(
            decode_outcome.intermediate[0].len(),
            symbol_size,
            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbol_size"
        );
        // Encoder and decoder must agree on the intermediate block.
        assert_eq!(
            encoder.intermediate_symbol(0),
            decode_outcome.intermediate[0],
            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_intermediate_consistency"
        );
    }
1585
    #[test]
    fn test_decode_with_repair_symbols() {
        // Erase source symbol 0 and substitute a single repair symbol; the
        // decoder must still recover the full source block.
        let symbol_size = 512_usize;
        let k = 16_usize;
        let seed = 0xABC0_FED1_u64;
        let source = low_level_source_block(k, symbol_size, seed);
        let encoder =
            SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
        let decoder = InactivationDecoder::new(k, symbol_size, seed);
        let params = decoder.params();
        let base_rows = params.s + params.h;
        let constraints = ConstraintMatrix::build(params, seed);

        // Receive source symbols 1..k only — symbol 0 is the erasure.
        let mut received = decoder.constraint_symbols();
        let source_indexes = (1..k).collect::<Vec<_>>();
        append_source_received_symbols(
            &mut received,
            &constraints,
            base_rows,
            params.k_prime,
            symbol_size,
            &source,
            &source_indexes,
        );

        // First repair symbol: ESI k is the first non-source identifier in
        // a systematic code.
        let repair_esi = u32::try_from(k).expect("k fits u32");
        let (columns, coefficients) = decoder.repair_equation(repair_esi);
        let repair_data = encoder.repair_symbol(repair_esi);
        received.push(ReceivedSymbol::repair(
            repair_esi,
            columns,
            coefficients,
            repair_data,
        ));

        let decode_outcome = decoder
            .decode(&received)
            .expect("decode with one repair must succeed");
        assert_eq!(
            decode_outcome.source, source,
            "bead_id={RAPTORQ_BEAD_ID} case=decode_with_repair_roundtrip"
        );
    }
1629
    #[test]
    fn test_decode_insufficient_symbols() {
        // With only k-1 source symbols and no repair symbols, the equation
        // system is underdetermined, so decode must report an error.
        let symbol_size = 512_usize;
        let k = 8_usize;
        let seed = 0xDEAD_BEEF_u64;
        let source = low_level_source_block(k, symbol_size, seed);
        let decoder = InactivationDecoder::new(k, symbol_size, seed);
        let params = decoder.params();
        let base_rows = params.s + params.h;
        let constraints = ConstraintMatrix::build(params, seed);
        let mut received = decoder.constraint_symbols();
        // Deliberately one symbol short of the minimum k.
        let source_indexes = (0..k.saturating_sub(1)).collect::<Vec<_>>();
        append_source_received_symbols(
            &mut received,
            &constraints,
            base_rows,
            params.k_prime,
            symbol_size,
            &source,
            &source_indexes,
        );

        let decode = decoder.decode(&received);
        assert!(
            decode.is_err(),
            "bead_id={RAPTORQ_BEAD_ID} case=decode_insufficient_symbols unexpectedly succeeded"
        );
        // Log the concrete error for observability of the failure mode.
        if let Err(err) = decode {
            tracing::error!(
                bead_id = RAPTORQ_BEAD_ID,
                case = "test_decode_insufficient_symbols",
                error = ?err,
                "decode failed as expected due to insufficient symbols"
            );
        }
    }
1666
1667    #[test]
1668    fn test_symbol_size_alignment() {
1669        let config = raptorq_config(4096, 1.20);
1670        let symbol_size = usize::from(config.encoding.symbol_size);
1671        let data = deterministic_payload(symbol_size * 3, 0x600D_1111);
1672        let object_id = AsObjectId::new_for_test(1205);
1673        let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1674
1675        assert_eq!(
1676            source_symbols, 3,
1677            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_source_count"
1678        );
1679        assert!(
1680            symbol_size.is_power_of_two(),
1681            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_power_of_two"
1682        );
1683        assert_eq!(
1684            symbol_size % 512,
1685            0,
1686            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_sector_multiple"
1687        );
1688        assert!(
1689            symbols.iter().all(|symbol| symbol.len() == symbol_size),
1690            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_symbol_lengths"
1691        );
1692    }
1693
    #[test]
    fn prop_encode_decode_roundtrip() {
        // Property sweep: for several seeds and k values, a payload that is
        // deliberately not symbol-aligned (k*S - 17 bytes) must survive an
        // encode -> take-source-symbols -> decode round trip.
        for seed in [11_u64, 29_u64, 43_u64, 71_u64] {
            for k in [8_usize, 16_usize] {
                let config = raptorq_config(512, 1.30);
                let symbol_size = usize::from(config.encoding.symbol_size);
                let data = deterministic_payload(k * symbol_size - 17, seed);
                let object_id = AsObjectId::new_for_test(2000 + seed);
                let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
                let (sources, _) = split_source_and_repair(&symbols, source_symbols);
                // Decode from exactly the source symbols (systematic path).
                let subset = sources
                    .iter()
                    .take(source_symbols)
                    .cloned()
                    .collect::<Vec<_>>();
                let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
                    .expect("property roundtrip decode");
                assert_eq!(
                    decoded, data,
                    "bead_id={RAPTORQ_BEAD_ID} case=prop_encode_decode_roundtrip seed={seed} k={k}"
                );
            }
        }
    }
1718
    #[test]
    fn prop_any_k_of_n_suffices() {
        // Single-parity erasure model: one XOR parity symbol over k source
        // symbols lets any single omitted source be rebuilt. omitted == k
        // denotes the no-erasure case.
        let symbol_size = 512_usize;
        let k = 16_usize;
        let sources = (0..k)
            .map(|symbol_idx| {
                deterministic_payload(
                    symbol_size,
                    0x5000_0000 + u64::try_from(symbol_idx).expect("index fits u64"),
                )
            })
            .collect::<Vec<_>>();

        // parity = XOR of all k source symbols.
        let mut parity = vec![0_u8; symbol_size];
        for source in &sources {
            symbol_add_assign(&mut parity, source).expect("parity construction");
        }

        for omitted in 0..=k {
            let rebuilt = if omitted == k {
                // No erasure: the survivors already form the full set.
                sources.clone()
            } else {
                // XOR parity with every surviving source; what remains is
                // exactly the omitted symbol (self-inverse property).
                let mut recovered = parity.clone();
                for (index, source) in sources.iter().enumerate() {
                    if index != omitted {
                        symbol_add_assign(&mut recovered, source).expect("single-erasure recovery");
                    }
                }
                let mut rebuilt = sources.clone();
                rebuilt[omitted] = recovered;
                rebuilt
            };

            assert_eq!(
                rebuilt.len(),
                k,
                "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_subset_size omitted_index={omitted}"
            );
            assert_eq!(
                rebuilt, sources,
                "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_suffices omitted_index={omitted}"
            );
        }
    }
1763
1764    #[test]
1765    fn prop_symbol_size_consistent() {
1766        for symbol_size in [512_u16, 1024_u16, 4096_u16] {
1767            for k in [4_usize, 8_usize] {
1768                let config = raptorq_config(symbol_size, 1.25);
1769                let size = usize::from(symbol_size);
1770                let object_id = AsObjectId::new_for_test(
1771                    u64::from(symbol_size) * 100 + u64::try_from(k).expect("k fits u64"),
1772                );
1773                let data = deterministic_payload(k * size - 3, u64::from(symbol_size));
1774                let (symbols, _) = encode_symbols(config, object_id, &data);
1775                assert!(
1776                    symbols.iter().all(|symbol| symbol.len() == size),
1777                    "bead_id={RAPTORQ_BEAD_ID} case=prop_symbol_size_consistent symbol_size={symbol_size} k={k}"
1778                );
1779            }
1780        }
1781    }
1782
1783    #[test]
1784    fn test_e2e_symbol_ops_in_encode_decode_roundtrip() {
1785        for (run, k) in [8_usize, 16_usize, 64_usize].iter().copied().enumerate() {
1786            let config = raptorq_config(512, 1.30);
1787            let symbol_size = usize::from(config.encoding.symbol_size);
1788            let data = deterministic_payload(
1789                k * symbol_size,
1790                0x4455_6677 + u64::try_from(run).expect("run fits u64"),
1791            );
1792            let object_id = AsObjectId::new_for_test(3000 + u64::try_from(k).expect("k fits u64"));
1793
1794            tracing::info!(
1795                bead_id = RAPTORQ_BEAD_ID,
1796                case = "test_e2e_symbol_ops_in_encode_decode_roundtrip",
1797                k,
1798                symbol_size,
1799                "starting encode/decode roundtrip"
1800            );
1801            let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1802            let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1803            let subset = sources
1804                .iter()
1805                .take(source_symbols)
1806                .cloned()
1807                .collect::<Vec<_>>();
1808            let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1809                .expect("e2e decode must succeed");
1810
1811            assert_eq!(
1812                decoded, data,
1813                "bead_id={RAPTORQ_BEAD_ID} case=e2e_roundtrip_bytes k={k}"
1814            );
1815
1816            let mut source_parity = vec![0_u8; symbol_size];
1817            for chunk in data.chunks_exact(symbol_size) {
1818                symbol_add_assign(&mut source_parity, chunk).expect("source parity xor");
1819            }
1820
1821            let mut decoded_parity = vec![0_u8; symbol_size];
1822            for chunk in decoded.chunks_exact(symbol_size) {
1823                symbol_add_assign(&mut decoded_parity, chunk).expect("decoded parity xor");
1824            }
1825
1826            assert_eq!(
1827                decoded_parity, source_parity,
1828                "bead_id={RAPTORQ_BEAD_ID} case=e2e_symbol_ops_parity k={k}"
1829            );
1830        }
1831    }
1832}