Skip to main content

fsqlite_core/
lib.rs

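//! Core bounded-parallelism primitives (§1.5, bd-22n.4).
//!
//! This module provides a small bulkhead framework for internal background work.
//! It is intentionally non-blocking: overflow is rejected with `SQLITE_BUSY`
//! (`FrankenError::Busy`) instead of queue-and-wait semantics.
//!
//! It also hosts SIMD-friendly XOR and GF(256) symbol primitives
//! (`symbol_add_assign`, `symbol_mul_into`, `symbol_addmul_assign`) and a
//! combined xxhash3 + blake3 checksum helper.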
6
7pub mod attach;
8pub mod commit_marker;
9#[cfg(not(target_arch = "wasm32"))]
10pub mod commit_repair;
11pub mod compat_persist;
12pub mod connection;
13#[cfg(not(target_arch = "wasm32"))]
14pub mod db_fec;
15pub mod decode_proofs;
16pub mod ecs_replication;
17pub mod epoch;
18pub mod explain;
19pub mod inter_object_coding;
20pub mod lrc;
21pub mod native_index;
22pub mod permeation_map;
23pub mod por;
24pub mod quiescence;
25#[cfg(not(target_arch = "wasm32"))]
26pub mod raptorq_codec;
27pub mod raptorq_integration;
28pub mod region;
29pub mod remote_effects;
30#[cfg(not(target_arch = "wasm32"))]
31pub mod repair_engine;
32pub mod repair_symbols;
33pub mod replication_receiver;
34pub mod replication_sender;
35pub mod snapshot_shipping;
36pub mod source_block_partition;
37pub mod symbol_log;
38pub mod symbol_size_policy;
39pub mod tiered_storage;
40pub mod transaction;
41pub mod vacuum;
42pub mod wal_adapter;
43#[cfg(not(target_arch = "wasm32"))]
44pub mod wal_fec_adapter;
45
46use std::num::NonZeroUsize;
47use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
48
49use fsqlite_error::{FrankenError, Result};
50use fsqlite_types::{ObjectId, Oti, PayloadHash, Region, SymbolRecord, SymbolRecordFlags};
51use tracing::{debug, error};
52
/// Upper cap applied by `conservative_bg_cpu_max` (the `16` in `clamp(P / 8, 1, 16)`).
const MAX_BALANCED_BG_CPU: usize = 16;
54
/// Policy used when the bulkhead admission budget is exhausted.
///
/// `DropBusy` is currently the only variant; `Bulkhead::try_acquire` maps it to
/// `FrankenError::Busy`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum OverflowPolicy {
    /// Reject overflow immediately with `SQLITE_BUSY`.
    DropBusy,
}
61
/// Runtime profile for conservative parallelism defaults.
///
/// Consumed by `BulkheadConfig::for_profile` to pick the default limits.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelismProfile {
    /// Conservative profile used by default (`BulkheadConfig::default()` selects it).
    Balanced,
}
68
/// Bounded parallelism configuration for a work class.
///
/// The total number of work items that may be admitted at once is
/// `max_concurrent + queue_depth` (saturating), see `admission_limit`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct BulkheadConfig {
    /// Number of tasks allowed to execute concurrently.
    pub max_concurrent: usize,
    /// Additional bounded admission slots (not unbounded queueing).
    pub queue_depth: usize,
    /// Overflow behavior when capacity is exhausted.
    pub overflow_policy: OverflowPolicy,
}
79
80impl BulkheadConfig {
81    /// Create an explicit configuration.
82    ///
83    /// Returns `None` when `max_concurrent` is zero.
84    #[must_use]
85    pub const fn new(
86        max_concurrent: usize,
87        queue_depth: usize,
88        overflow_policy: OverflowPolicy,
89    ) -> Option<Self> {
90        if max_concurrent == 0 {
91            None
92        } else {
93            Some(Self {
94                max_concurrent,
95                queue_depth,
96                overflow_policy,
97            })
98        }
99    }
100
101    /// Conservative default derived from available CPU parallelism.
102    ///
103    /// Uses the "balanced profile" formula from bd-22n.4:
104    /// `clamp(P / 8, 1, 16)` where `P = available_parallelism`.
105    #[must_use]
106    pub fn for_profile(profile: ParallelismProfile) -> Self {
107        let p = available_parallelism_or_one();
108        match profile {
109            ParallelismProfile::Balanced => Self {
110                max_concurrent: conservative_bg_cpu_max(p),
111                queue_depth: 0,
112                overflow_policy: OverflowPolicy::DropBusy,
113            },
114        }
115    }
116
117    /// Maximum admitted work units at once.
118    #[must_use]
119    pub const fn admission_limit(self) -> usize {
120        self.max_concurrent.saturating_add(self.queue_depth)
121    }
122}
123
impl Default for BulkheadConfig {
    /// Same as `BulkheadConfig::for_profile(ParallelismProfile::Balanced)`.
    fn default() -> Self {
        Self::for_profile(ParallelismProfile::Balanced)
    }
}
129
130/// Compute conservative default background CPU parallelism from `P`.
131#[must_use]
132pub const fn conservative_bg_cpu_max(p: usize) -> usize {
133    let base = p / 8;
134    if base == 0 {
135        1
136    } else if base > MAX_BALANCED_BG_CPU {
137        MAX_BALANCED_BG_CPU
138    } else {
139        base
140    }
141}
142
/// Query `std::thread::available_parallelism()`, falling back to 1 when the
/// platform cannot report a value.
#[must_use]
pub fn available_parallelism_or_one() -> usize {
    match std::thread::available_parallelism() {
        Ok(cores) => cores.get(),
        Err(_) => 1,
    }
}
148
/// Describes how a byte range is split for the wide-word XOR loops:
/// 16-byte lanes first, then 8-byte lanes, then single trailing bytes.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct WideChunkLayout {
    /// Number of `u128` chunks processed.
    pub u128_chunks: usize,
    /// Number of `u64` chunks processed after `u128` chunks.
    pub u64_chunks: usize,
    /// Remaining tail bytes processed scalar.
    pub tail_bytes: usize,
}

impl WideChunkLayout {
    /// Split `len` bytes into as many 16-byte chunks as fit, then 8-byte
    /// chunks from what is left, and report the remaining bytes as the tail.
    #[must_use]
    pub const fn for_len(len: usize) -> Self {
        // Bytes left over once every full 16-byte lane is accounted for.
        let after_wide = len % 16;
        Self {
            u128_chunks: len / 16,
            u64_chunks: after_wide / 8,
            tail_bytes: after_wide % 8,
        }
    }
}
175
176/// XOR patch application using `u128` + `u64` + tail loops.
177///
178/// This is the SIMD-friendly primitive for hot patch paths. LLVM can
179/// auto-vectorize the wide integer loops.
180#[allow(clippy::incompatible_msrv)]
181pub fn xor_patch_wide_chunks(dst: &mut [u8], patch: &[u8]) -> Result<WideChunkLayout> {
182    if dst.len() != patch.len() {
183        return Err(FrankenError::TypeMismatch {
184            expected: format!("equal lengths (dst == patch), got {}", dst.len()),
185            actual: patch.len().to_string(),
186        });
187    }
188
189    let layout = WideChunkLayout::for_len(dst.len());
190
191    let (dst_128, dst_rem) = dst.as_chunks_mut::<16>();
192    let (patch_128, patch_rem) = patch.as_chunks::<16>();
193    for (d, p) in dst_128.iter_mut().zip(patch_128.iter()) {
194        let d_word = u128::from_ne_bytes(*d);
195        let p_word = u128::from_ne_bytes(*p);
196        *d = (d_word ^ p_word).to_ne_bytes();
197    }
198
199    let (dst_64, dst_tail) = dst_rem.as_chunks_mut::<8>();
200    let (patch_64, patch_tail) = patch_rem.as_chunks::<8>();
201    for (d, p) in dst_64.iter_mut().zip(patch_64.iter()) {
202        let d_word = u64::from_ne_bytes(*d);
203        let p_word = u64::from_ne_bytes(*p);
204        *d = (d_word ^ p_word).to_ne_bytes();
205    }
206
207    for (d, p) in dst_tail.iter_mut().zip(patch_tail.iter()) {
208        *d ^= *p;
209    }
210
211    Ok(layout)
212}
213
/// GF(256) addition (`+`) using wide XOR chunk loops.
///
/// In GF(256), addition is XOR, so this uses the same SIMD-friendly chunking
/// strategy as [`xor_patch_wide_chunks`] and simply forwards to it.
///
/// # Errors
///
/// Propagates the `FrankenError::TypeMismatch` raised by
/// [`xor_patch_wide_chunks`] when `dst` and `src` differ in length.
pub fn gf256_add_assign_chunked(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
    xor_patch_wide_chunks(dst, src)
}
221
/// Number of byte values (256); used as both dimensions of the GF(256)
/// multiplication lookup tables.
const GF256_FIELD_SIZE: usize = 256;
223
/// Multiply two bytes with shift-and-XOR, folding in `0x1D` whenever the high
/// bit of the running multiplicand is shifted out.
///
/// Usable in `const` context; it seeds the compile-time multiplication tables.
const fn gf256_mul_byte_for_table(mut a: u8, mut b: u8) -> u8 {
    let mut product = 0_u8;
    while b != 0 {
        // Low bit of `b` set: the current multiple of `a` contributes.
        if b & 1 == 1 {
            product ^= a;
        }
        // Double `a`, reducing when bit 7 would overflow out of the byte.
        a = if a & 0x80 == 0 {
            a << 1
        } else {
            (a << 1) ^ 0x1D
        };
        b >>= 1;
    }
    product
}
239
240const fn gf256_mul_row(coeff: u8) -> [u8; GF256_FIELD_SIZE] {
241    let mut row = [0_u8; GF256_FIELD_SIZE];
242    let mut byte = 0_u8;
243
244    loop {
245        row[byte as usize] = gf256_mul_byte_for_table(coeff, byte);
246        if byte == u8::MAX {
247            break;
248        }
249        byte = byte.wrapping_add(1);
250    }
251
252    row
253}
254
255// This builds `GF256_MUL_TABLES` at compile time; it is not a runtime stack allocation.
256#[allow(clippy::large_stack_arrays)]
257const fn gf256_mul_tables() -> [[u8; GF256_FIELD_SIZE]; GF256_FIELD_SIZE] {
258    let mut tables = [[0_u8; GF256_FIELD_SIZE]; GF256_FIELD_SIZE];
259    let mut coeff = 0_u8;
260
261    loop {
262        tables[coeff as usize] = gf256_mul_row(coeff);
263        if coeff == u8::MAX {
264            break;
265        }
266        coeff = coeff.wrapping_add(1);
267    }
268
269    tables
270}
271
/// Precomputed multiplication tables, indexed as `[coefficient][operand]`.
static GF256_MUL_TABLES: [[u8; GF256_FIELD_SIZE]; GF256_FIELD_SIZE] = gf256_mul_tables();
273
/// Borrow the precomputed multiplication row for `coeff`.
///
/// Indexing cannot go out of bounds: a `u8` is always below `GF256_FIELD_SIZE`.
#[inline]
fn gf256_mul_table(coeff: u8) -> &'static [u8; GF256_FIELD_SIZE] {
    &GF256_MUL_TABLES[usize::from(coeff)]
}
278
/// RaptorQ symbol add (`dst ^= src`) using chunked XOR.
///
/// This is the core symbol-add primitive from §3.2.2. It emits a debug trace
/// event and then delegates to [`gf256_add_assign_chunked`].
///
/// # Errors
///
/// Returns `FrankenError::TypeMismatch` (from the underlying XOR routine) when
/// `dst` and `src` differ in length.
pub fn symbol_add_assign(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
    debug!(
        bead_id = "bd-1hi.2",
        op = "symbol_add_assign",
        symbol_len = dst.len(),
        "applying in-place XOR over symbol bytes"
    );
    gf256_add_assign_chunked(dst, src)
}
291
292/// RaptorQ symbol scalar multiply (`out = c * src`) in GF(256).
293///
294/// Special cases:
295/// - `c == 0`: zero output
296/// - `c == 1`: copy input
297pub fn symbol_mul_into(coeff: u8, src: &[u8], out: &mut [u8]) -> Result<()> {
298    if src.len() != out.len() {
299        error!(
300            bead_id = "bd-1hi.2",
301            op = "symbol_mul_into",
302            coeff,
303            src_len = src.len(),
304            out_len = out.len(),
305            "symbol length mismatch"
306        );
307        return Err(FrankenError::TypeMismatch {
308            expected: format!("equal lengths (src == out), got {}", src.len()),
309            actual: out.len().to_string(),
310        });
311    }
312
313    debug!(
314        bead_id = "bd-1hi.2",
315        op = "symbol_mul_into",
316        coeff,
317        symbol_len = src.len(),
318        "applying GF(256) scalar multiplication"
319    );
320
321    match coeff {
322        0 => {
323            out.fill(0);
324            Ok(())
325        }
326        1 => {
327            out.copy_from_slice(src);
328            Ok(())
329        }
330        _ => {
331            let table = gf256_mul_table(coeff);
332            for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
333                *dst_byte = table[usize::from(*src_byte)];
334            }
335            Ok(())
336        }
337    }
338}
339
340/// RaptorQ fused multiply-add (`dst ^= c * src`) in GF(256).
341///
342/// Special cases:
343/// - `c == 0`: no-op
344/// - `c == 1`: pure XOR path
345pub fn symbol_addmul_assign(dst: &mut [u8], coeff: u8, src: &[u8]) -> Result<WideChunkLayout> {
346    if dst.len() != src.len() {
347        error!(
348            bead_id = "bd-1hi.2",
349            op = "symbol_addmul_assign",
350            coeff,
351            dst_len = dst.len(),
352            src_len = src.len(),
353            "symbol length mismatch"
354        );
355        return Err(FrankenError::TypeMismatch {
356            expected: format!("equal lengths (dst == src), got {}", dst.len()),
357            actual: src.len().to_string(),
358        });
359    }
360
361    debug!(
362        bead_id = "bd-1hi.2",
363        op = "symbol_addmul_assign",
364        coeff,
365        symbol_len = dst.len(),
366        "applying fused multiply-and-add over symbol bytes"
367    );
368
369    match coeff {
370        0 => Ok(WideChunkLayout::for_len(dst.len())),
371        1 => symbol_add_assign(dst, src),
372        _ => {
373            let table = gf256_mul_table(coeff);
374            for (dst_byte, src_byte) in dst.iter_mut().zip(src.iter()) {
375                *dst_byte ^= table[usize::from(*src_byte)];
376            }
377            Ok(WideChunkLayout::for_len(dst.len()))
378        }
379    }
380}
381
382/// Compute xxhash3 + blake3 on a contiguous input buffer.
383///
384/// - `xxhash3` path comes from `SymbolRecord::new` (`frame_xxh3`).
385/// - `blake3` path comes from `PayloadHash::blake3`.
386pub fn simd_friendly_checksum_pair(buffer: &[u8]) -> Result<(u64, [u8; 32])> {
387    let symbol_size = u32::try_from(buffer.len()).map_err(|_| FrankenError::OutOfRange {
388        what: "symbol_size".to_owned(),
389        value: buffer.len().to_string(),
390    })?;
391
392    let symbol_record = SymbolRecord::new(
393        ObjectId::from_bytes([0_u8; ObjectId::LEN]),
394        Oti {
395            f: u64::from(symbol_size),
396            al: 1,
397            t: symbol_size,
398            z: 1,
399            n: 1,
400        },
401        0,
402        buffer.to_vec(),
403        SymbolRecordFlags::empty(),
404    );
405    let blake = PayloadHash::blake3(buffer);
406
407    Ok((symbol_record.frame_xxh3, *blake.as_bytes()))
408}
409
/// Non-blocking bulkhead admission gate.
///
/// Admission is decided in `try_acquire` and undone when the returned permit
/// is released or dropped; all bookkeeping lives in atomic counters.
#[derive(Debug)]
pub struct Bulkhead {
    /// Limits and overflow policy enforced by this gate.
    config: BulkheadConfig,
    /// Number of permits currently outstanding.
    in_flight: AtomicUsize,
    /// Highest `in_flight` value recorded at admission time.
    peak_in_flight: AtomicUsize,
    /// Number of admissions rejected because the limit was reached.
    busy_rejections: AtomicUsize,
}
418
419impl Bulkhead {
420    #[must_use]
421    pub fn new(config: BulkheadConfig) -> Self {
422        Self {
423            config,
424            in_flight: AtomicUsize::new(0),
425            peak_in_flight: AtomicUsize::new(0),
426            busy_rejections: AtomicUsize::new(0),
427        }
428    }
429
430    #[must_use]
431    pub const fn config(&self) -> BulkheadConfig {
432        self.config
433    }
434
435    #[must_use]
436    pub fn in_flight(&self) -> usize {
437        self.in_flight.load(Ordering::Acquire)
438    }
439
440    #[must_use]
441    pub fn peak_in_flight(&self) -> usize {
442        self.peak_in_flight.load(Ordering::Acquire)
443    }
444
445    #[must_use]
446    pub fn busy_rejections(&self) -> usize {
447        self.busy_rejections.load(Ordering::Acquire)
448    }
449
450    /// Try to admit one work item.
451    ///
452    /// Never blocks. If the admission budget is exhausted, this returns
453    /// `FrankenError::Busy`.
454    pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
455        let limit = self.config.admission_limit();
456        loop {
457            let current = self.in_flight.load(Ordering::Acquire);
458            if current >= limit {
459                self.busy_rejections.fetch_add(1, Ordering::AcqRel);
460                return Err(match self.config.overflow_policy {
461                    OverflowPolicy::DropBusy => FrankenError::Busy,
462                });
463            }
464
465            let next = current.saturating_add(1);
466            if self
467                .in_flight
468                .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
469                .is_ok()
470            {
471                self.peak_in_flight.fetch_max(next, Ordering::AcqRel);
472                return Ok(BulkheadPermit {
473                    bulkhead: self,
474                    released: false,
475                });
476            }
477        }
478    }
479
480    /// Run work within a bulkhead permit.
481    pub fn run<T>(&self, work: impl FnOnce() -> T) -> Result<T> {
482        let _permit = self.try_acquire()?;
483        Ok(work())
484    }
485}
486
487/// RAII permit for a single admitted work item.
488#[derive(Debug)]
489pub struct BulkheadPermit<'a> {
490    bulkhead: &'a Bulkhead,
491    released: bool,
492}
493
494impl BulkheadPermit<'_> {
495    /// Explicitly release the permit.
496    pub fn release(mut self) {
497        if !self.released {
498            self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
499            self.released = true;
500        }
501    }
502}
503
504impl Drop for BulkheadPermit<'_> {
505    fn drop(&mut self) {
506        if !self.released {
507            self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
508            self.released = true;
509        }
510    }
511}
512
/// Region-owned wrapper used for structured-concurrency integration.
///
/// Pairs a `Bulkhead` with the `Region` it belongs to and a `closing` flag that
/// stops new admissions once region close has begun.
#[derive(Debug)]
pub struct RegionBulkhead {
    /// Region this bulkhead is associated with.
    region: Region,
    /// Underlying admission gate.
    bulkhead: Bulkhead,
    /// Set by `begin_close`; checked by `try_acquire`.
    closing: AtomicBool,
}
520
521impl RegionBulkhead {
522    #[must_use]
523    pub fn new(region: Region, config: BulkheadConfig) -> Self {
524        Self {
525            region,
526            bulkhead: Bulkhead::new(config),
527            closing: AtomicBool::new(false),
528        }
529    }
530
531    #[must_use]
532    pub const fn region(&self) -> Region {
533        self.region
534    }
535
536    #[must_use]
537    pub fn bulkhead(&self) -> &Bulkhead {
538        &self.bulkhead
539    }
540
541    pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
542        if self.closing.load(Ordering::Acquire) {
543            return Err(FrankenError::Busy);
544        }
545        self.bulkhead.try_acquire()
546    }
547
548    /// Begin region close: no new admissions are allowed after this point.
549    pub fn begin_close(&self) {
550        self.closing.store(true, Ordering::Release);
551    }
552
553    /// Whether all region-owned work has quiesced.
554    #[must_use]
555    pub fn is_quiescent(&self) -> bool {
556        self.bulkhead.in_flight() == 0
557    }
558}
559
560#[cfg(test)]
561mod tests {
562    use std::collections::VecDeque;
563    use std::pin::Pin;
564    use std::sync::Arc;
565    use std::task::{Context, Poll};
566    use std::thread;
567    use std::time::{Duration, Instant};
568
569    use asupersync::raptorq::decoder::{InactivationDecoder, ReceivedSymbol};
570    use asupersync::raptorq::gf256::{Gf256, gf256_add_slice, gf256_addmul_slice, gf256_mul_slice};
571    use asupersync::raptorq::systematic::{ConstraintMatrix, SystematicEncoder};
572    use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
573    use asupersync::security::AuthenticationTag;
574    use asupersync::security::authenticated::AuthenticatedSymbol;
575    use asupersync::transport::error::{SinkError, StreamError};
576    use asupersync::transport::sink::SymbolSink;
577    use asupersync::transport::stream::SymbolStream;
578    use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol};
579    use asupersync::{Cx, RaptorQConfig};
580    use fsqlite_btree::compare_key_bytes_contiguous;
581    use fsqlite_types::gf256_mul_byte;
582
583    use super::*;
584
    // Tracking identifiers embedded in assertion messages and trace events.
    const BEAD_ID: &str = "bd-22n.4";
    const SIMD_BEAD_ID: &str = "bd-22n.6";
    const RAPTORQ_BEAD_ID: &str = "bd-1hi.2";
588
    /// Test sink that stores every symbol it is sent in a `Vec`.
    #[derive(Debug)]
    struct VecSink {
        /// Symbols received so far, in send order.
        symbols: Vec<Symbol>,
    }

    impl VecSink {
        /// Create an empty sink.
        fn new() -> Self {
            Self {
                symbols: Vec::new(),
            }
        }
    }
601
    // In-memory sink: every poll completes immediately with `Ok(())`.
    impl SymbolSink for VecSink {
        fn poll_send(
            mut self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
            symbol: AuthenticatedSymbol,
        ) -> Poll<std::result::Result<(), SinkError>> {
            // Store only what `into_symbol()` yields from the authenticated wrapper.
            self.symbols.push(symbol.into_symbol());
            Poll::Ready(Ok(()))
        }

        // Nothing is buffered, so flushing is a no-op.
        fn poll_flush(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }

        // Closing is a no-op as well; the collected symbols stay available.
        fn poll_close(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }

        // The sink is always ready to accept another symbol.
        fn poll_ready(
            self: Pin<&mut Self>,
            _cx: &mut Context<'_>,
        ) -> Poll<std::result::Result<(), SinkError>> {
            Poll::Ready(Ok(()))
        }
    }
633
634    #[derive(Debug)]
635    struct VecStream {
636        q: VecDeque<AuthenticatedSymbol>,
637    }
638
639    impl VecStream {
640        fn new(symbols: Vec<Symbol>) -> Self {
641            let q = symbols
642                .into_iter()
643                .map(|symbol| AuthenticatedSymbol::from_parts(symbol, AuthenticationTag::zero()))
644                .collect();
645            Self { q }
646        }
647    }
648
649    impl SymbolStream for VecStream {
650        fn poll_next(
651            mut self: Pin<&mut Self>,
652            _cx: &mut Context<'_>,
653        ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
654            match self.q.pop_front() {
655                Some(symbol) => Poll::Ready(Some(Ok(symbol))),
656                None => Poll::Ready(None),
657            }
658        }
659
660        fn size_hint(&self) -> (usize, Option<usize>) {
661            (self.q.len(), Some(self.q.len()))
662        }
663
664        fn is_exhausted(&self) -> bool {
665            self.q.is_empty()
666        }
667    }
668
    /// Start from `RaptorQConfig::default()` and override the encoding symbol
    /// size, repair overhead, and a fixed 64 KiB maximum block size.
    fn raptorq_config(symbol_size: u16, repair_overhead: f64) -> RaptorQConfig {
        let mut config = RaptorQConfig::default();
        config.encoding.symbol_size = symbol_size;
        config.encoding.max_block_size = 64 * 1024;
        config.encoding.repair_overhead = repair_overhead;
        config
    }
676
677    fn deterministic_payload(len: usize, seed: u64) -> Vec<u8> {
678        let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
679        let mut out = Vec::with_capacity(len);
680        for idx in 0..len {
681            state ^= state << 7;
682            state ^= state >> 9;
683            state = state.wrapping_mul(0xA24B_AED4_963E_E407);
684            let idx_byte = u8::try_from(idx % 251).expect("modulo fits in u8");
685            out.push(u8::try_from(state & 0xFF).expect("masked to u8") ^ idx_byte);
686        }
687        out
688    }
689
690    fn xor_patch_bytewise(dst: &mut [u8], patch: &[u8]) {
691        for (dst_byte, patch_byte) in dst.iter_mut().zip(patch.iter()) {
692            *dst_byte ^= *patch_byte;
693        }
694    }
695
696    fn gf256_mul_bytewise(coeff: u8, src: &[u8], out: &mut [u8]) {
697        for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
698            *dst_byte = gf256_mul_byte(coeff, *src_byte);
699        }
700    }
701
702    fn collect_rs_files(root: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
703        let entries = std::fs::read_dir(root).expect("read_dir should succeed");
704        for entry in entries {
705            let path = entry.expect("read_dir entry should be readable").path();
706            if path.is_dir() {
707                collect_rs_files(&path, out);
708            } else if path.extension().is_some_and(|ext| ext == "rs") {
709                out.push(path);
710            }
711        }
712    }
713
    /// Encode `data` through a RaptorQ sender backed by a `VecSink`.
    ///
    /// Returns the symbols captured by the sink together with the source-symbol
    /// count reported by `send_object`. Panics if the sender cannot be built or
    /// the send fails.
    fn encode_symbols(
        config: RaptorQConfig,
        object_id: AsObjectId,
        data: &[u8],
    ) -> (Vec<Symbol>, usize) {
        let cx = Cx::for_testing();
        let mut sender = RaptorQSenderBuilder::new()
            .config(config)
            .transport(VecSink::new())
            .build()
            .expect("sender build");
        let outcome = sender
            .send_object(&cx, object_id, data)
            .expect("send_object must succeed");
        // Move the captured symbols out of the sink, leaving it empty.
        let symbols = std::mem::take(&mut sender.transport_mut().symbols);
        tracing::debug!(
            bead_id = RAPTORQ_BEAD_ID,
            case = "encode_symbols",
            source_symbols = outcome.source_symbols,
            emitted_symbols = symbols.len(),
            object_size = data.len(),
            "encoded object into source+repair symbol stream"
        );
        (symbols, outcome.source_symbols)
    }
739
    /// Feed `symbols` to a RaptorQ receiver through a `VecStream` and return the
    /// decoded bytes, or the receiver's error.
    ///
    /// Panics if `object_size` does not fit `u64`, `source_symbols` does not fit
    /// `u16`, or the receiver cannot be built.
    // The error type comes from `asupersync`, so its size is outside this crate's control.
    #[allow(clippy::result_large_err)]
    fn decode_symbols(
        config: RaptorQConfig,
        object_id: AsObjectId,
        object_size: usize,
        source_symbols: usize,
        symbols: Vec<Symbol>,
    ) -> std::result::Result<Vec<u8>, asupersync::Error> {
        let cx = Cx::for_testing();
        // NOTE(review): the literal `1` is passed positionally to
        // `ObjectParams::new`; its meaning is not visible here — confirm
        // against the `asupersync` API.
        let params = ObjectParams::new(
            object_id,
            u64::try_from(object_size).expect("object size fits u64"),
            config.encoding.symbol_size,
            1,
            u16::try_from(source_symbols).expect("source symbol count fits u16"),
        );
        let mut receiver = RaptorQReceiverBuilder::new()
            .config(config)
            .source(VecStream::new(symbols))
            .build()
            .expect("receiver build");

        receiver
            .receive_object(&cx, &params)
            .map(|outcome| outcome.data)
    }
766
767    fn split_source_and_repair(
768        symbols: &[Symbol],
769        source_symbols: usize,
770    ) -> (Vec<Symbol>, Vec<Symbol>) {
771        let source_symbols_u32 =
772            u32::try_from(source_symbols).expect("source symbol count fits u32");
773        let mut sources = Vec::new();
774        let mut repairs = Vec::new();
775        for symbol in symbols {
776            if symbol.esi() < source_symbols_u32 {
777                sources.push(symbol.clone());
778            } else {
779                repairs.push(symbol.clone());
780            }
781        }
782        (sources, repairs)
783    }
784
785    fn low_level_source_block(k: usize, symbol_size: usize, seed: u64) -> Vec<Vec<u8>> {
786        (0..k)
787            .map(|source_index| {
788                deterministic_payload(
789                    symbol_size,
790                    seed + u64::try_from(source_index).expect("source index fits u64"),
791                )
792            })
793            .collect()
794    }
795
796    fn append_source_received_symbols(
797        received: &mut Vec<ReceivedSymbol>,
798        constraints: &ConstraintMatrix,
799        base_rows: usize,
800        k_prime: usize,
801        symbol_size: usize,
802        source: &[Vec<u8>],
803        source_indexes: &[usize],
804    ) {
805        for &source_index in source_indexes {
806            received.push(ReceivedSymbol::source(
807                u32::try_from(source_index).expect("source index fits u32"),
808                source[source_index].clone(),
809            ));
810        }
811
812        // RFC 6330 decode domain uses K' source-domain rows, not just K.
813        // The K' - K PI rows are internal zero-padding equations, not public
814        // source symbols. Public source ESIs remain restricted to [0, K).
815        for source_index in source.len()..k_prime {
816            let row = base_rows + source_index;
817            let mut columns = Vec::new();
818            let mut coefficients = Vec::new();
819            for col in 0..constraints.cols {
820                let coeff = constraints.get(row, col);
821                if !coeff.is_zero() {
822                    columns.push(col);
823                    coefficients.push(coeff);
824                }
825            }
826
827            received.push(ReceivedSymbol {
828                esi: u32::try_from(source_index).expect("source index fits u32"),
829                is_source: false,
830                columns,
831                coefficients,
832                data: vec![0_u8; symbol_size],
833            });
834        }
835    }
836
    // Pins the balanced-profile formula at an interior point, its floor, and its cap.
    #[test]
    fn test_parallelism_defaults_conservative() {
        // 16 / 8 == 2 lies inside the clamp range.
        assert_eq!(
            conservative_bg_cpu_max(16),
            2,
            "bead_id={BEAD_ID} case=balanced_profile_formula_p16"
        );
        // 1 / 8 == 0 is raised to the floor of 1.
        assert_eq!(
            conservative_bg_cpu_max(1),
            1,
            "bead_id={BEAD_ID} case=balanced_profile_min_floor"
        );
        // 512 / 8 == 64 is capped at 16.
        assert_eq!(
            conservative_bg_cpu_max(512),
            16,
            "bead_id={BEAD_ID} case=balanced_profile_max_cap"
        );
    }
855
    // The default config must not exceed the machine's parallelism, must admit
    // exactly `admission_limit()` permits, and must return to zero once they drop.
    #[test]
    fn test_parallelism_bounded_by_available() {
        let cfg = BulkheadConfig::default();
        let p = available_parallelism_or_one();
        assert!(
            cfg.max_concurrent <= p,
            "bead_id={BEAD_ID} case=default_exceeds_available_parallelism cfg={cfg:?} p={p}"
        );

        // Fill the gate up to its limit, keeping every permit alive.
        let bulkhead = Bulkhead::new(cfg);
        let mut permits = Vec::new();
        for _ in 0..cfg.admission_limit() {
            permits.push(
                bulkhead
                    .try_acquire()
                    .expect("admission under configured limit should succeed"),
            );
        }

        // One more request must be rejected rather than queued.
        let overflow = bulkhead.try_acquire();
        assert!(
            matches!(overflow, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=bounded_admission_overflow_must_be_busy overflow={overflow:?}"
        );

        drop(permits);
        assert_eq!(
            bulkhead.in_flight(),
            0,
            "bead_id={BEAD_ID} case=permits_drop_to_zero"
        );
    }
888
    // With `max_concurrent = 3` and no queue, the fourth acquisition must fail.
    #[test]
    fn test_bulkhead_config_max_concurrent() {
        let cfg = BulkheadConfig::new(3, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let bulkhead = Bulkhead::new(cfg);

        let p1 = bulkhead.try_acquire().expect("slot 1");
        let p2 = bulkhead.try_acquire().expect("slot 2");
        let p3 = bulkhead.try_acquire().expect("slot 3");
        let overflow = bulkhead.try_acquire();

        assert!(
            matches!(overflow, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=max_concurrent_enforced overflow={overflow:?}"
        );
        // Permits are held until here so the overflow attempt saw a full gate.
        drop((p1, p2, p3));
    }
906
    // `DropBusy` must surface as `FrankenError::Busy` once the single slot is taken.
    #[test]
    fn test_overflow_policy_drop_with_busy() {
        let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let bulkhead = Bulkhead::new(cfg);
        // Bound to a named variable so the slot stays occupied for the whole test.
        let _permit = bulkhead.try_acquire().expect("first permit must succeed");

        let overflow = bulkhead.try_acquire();
        assert!(
            matches!(overflow, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=overflow_policy_drop_busy overflow={overflow:?}"
        );
    }
920
    // Repeated overflow attempts must all be rejected and each one counted.
    #[test]
    fn test_background_work_degrades_gracefully() {
        let cfg = BulkheadConfig::new(2, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let bulkhead = Bulkhead::new(cfg);

        // Occupy both slots for the duration of the test.
        let _a = bulkhead.try_acquire().expect("permit a");
        let _b = bulkhead.try_acquire().expect("permit b");

        for _ in 0..8 {
            let result = bulkhead.try_acquire();
            assert!(
                matches!(result, Err(FrankenError::Busy)),
                "bead_id={BEAD_ID} case=overflow_must_reject_not_wait result={result:?}"
            );
        }

        // Exactly the eight rejected attempts above are recorded.
        assert_eq!(
            bulkhead.busy_rejections(),
            8,
            "bead_id={BEAD_ID} case=busy_rejection_counter"
        );
    }
944
    // Walks a region bulkhead through admit -> begin_close -> reject -> quiesce.
    #[test]
    fn test_region_integration() {
        let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
            .expect("non-zero max_concurrent must be valid");
        let region_bulkhead = RegionBulkhead::new(Region::new(7), cfg);
        assert_eq!(
            region_bulkhead.region().get(),
            7,
            "bead_id={BEAD_ID} case=region_id_plumbed"
        );

        let permit = region_bulkhead.try_acquire().expect("first permit");
        assert!(
            !region_bulkhead.is_quiescent(),
            "bead_id={BEAD_ID} case=region_non_quiescent_with_active_work"
        );

        // After close begins, new admissions are refused even though the
        // earlier permit is still outstanding.
        region_bulkhead.begin_close();
        let after_close = region_bulkhead.try_acquire();
        assert!(
            matches!(after_close, Err(FrankenError::Busy)),
            "bead_id={BEAD_ID} case=region_close_blocks_new_work result={after_close:?}"
        );

        // Dropping the last permit brings the region to quiescence.
        drop(permit);
        assert!(
            region_bulkhead.is_quiescent(),
            "bead_id={BEAD_ID} case=region_quiescent_after_permit_drop"
        );
    }
975
976    #[test]
977    fn test_gf256_ops_chunked() {
978        let mut dst = vec![0xAA_u8; 40];
979        let src = vec![0x55_u8; 40];
980        let expected: Vec<u8> = dst.iter().zip(src.iter()).map(|(d, s)| *d ^ *s).collect();
981
982        let layout = gf256_add_assign_chunked(&mut dst, &src)
983            .expect("equal-length buffers should be accepted");
984        assert!(
985            layout.u128_chunks > 0 || layout.u64_chunks > 0,
986            "bead_id={SIMD_BEAD_ID} case=wide_chunks_expected layout={layout:?}"
987        );
988        assert_eq!(
989            dst, expected,
990            "bead_id={SIMD_BEAD_ID} case=gf256_addition_xor_equivalence"
991        );
992    }
993
994    #[test]
995    fn test_xor_patch_wide_chunks() {
996        let mut dst = vec![0xF0_u8; 37];
997        let patch = vec![0x0F_u8; 37];
998        let expected: Vec<u8> = dst.iter().zip(patch.iter()).map(|(d, p)| *d ^ *p).collect();
999
1000        let layout =
1001            xor_patch_wide_chunks(&mut dst, &patch).expect("equal-length buffers should be valid");
1002        assert_eq!(
1003            layout,
1004            WideChunkLayout {
1005                u128_chunks: 2,
1006                u64_chunks: 0,
1007                tail_bytes: 5,
1008            },
1009            "bead_id={SIMD_BEAD_ID} case=chunk_layout_expected"
1010        );
1011        assert_eq!(
1012            dst, expected,
1013            "bead_id={SIMD_BEAD_ID} case=xor_patch_matches_scalar_reference"
1014        );
1015    }
1016
1017    #[test]
1018    fn test_xor_symbols_u64_chunks() {
1019        // Length 24 exercises both the u128 and u64 lanes.
1020        let mut dst = vec![0xAB_u8; 24];
1021        let src = vec![0xCD_u8; 24];
1022        let mut expected = vec![0xAB_u8; 24];
1023        xor_patch_bytewise(&mut expected, &src);
1024
1025        let layout = xor_patch_wide_chunks(&mut dst, &src).expect("equal-length buffers");
1026        assert_eq!(
1027            layout,
1028            WideChunkLayout {
1029                u128_chunks: 1,
1030                u64_chunks: 1,
1031                tail_bytes: 0,
1032            },
1033            "bead_id=bd-2ddc case=u64_chunk_lane_exercised"
1034        );
1035        assert_eq!(
1036            dst, expected,
1037            "bead_id=bd-2ddc case=chunked_xor_matches_bytewise_reference"
1038        );
1039    }
1040
1041    #[test]
1042    fn test_gf256_multiply_chunks() {
1043        let coeff = 0xA7_u8;
1044        let src = deterministic_payload(4096, 0xDDCC_BBAA_1122_3344);
1045        let mut chunked = vec![0_u8; src.len()];
1046        let mut scalar = vec![0_u8; src.len()];
1047
1048        symbol_mul_into(coeff, &src, &mut chunked).expect("chunked symbol_mul_into");
1049        gf256_mul_bytewise(coeff, &src, &mut scalar);
1050
1051        assert_eq!(
1052            chunked, scalar,
1053            "bead_id=bd-2ddc case=chunked_mul_matches_scalar_reference"
1054        );
1055    }
1056
1057    #[test]
1058    fn test_u128_chunk_alignment() {
1059        // Non-multiple of 16 exercises the u128 path + tail handling.
1060        let mut via_wide = deterministic_payload(4099, 0x1234_5678_9ABC_DEF0);
1061        let mut via_u64_only = via_wide.clone();
1062        let patch = deterministic_payload(4099, 0x0F0E_0D0C_0B0A_0908);
1063
1064        xor_patch_wide_chunks(&mut via_wide, &patch).expect("wide chunk xor");
1065
1066        let mut dst_u64_chunks = via_u64_only.chunks_exact_mut(8);
1067        let mut patch_u64_chunks = patch.chunks_exact(8);
1068        for (dst_chunk, patch_chunk) in dst_u64_chunks.by_ref().zip(patch_u64_chunks.by_ref()) {
1069            let dst_word = u64::from_ne_bytes(
1070                dst_chunk
1071                    .try_into()
1072                    .expect("chunks_exact(8) must yield 8-byte chunk"),
1073            );
1074            let patch_word = u64::from_ne_bytes(
1075                patch_chunk
1076                    .try_into()
1077                    .expect("chunks_exact(8) must yield 8-byte chunk"),
1078            );
1079            dst_chunk.copy_from_slice(&(dst_word ^ patch_word).to_ne_bytes());
1080        }
1081        for (dst_byte, patch_byte) in dst_u64_chunks
1082            .into_remainder()
1083            .iter_mut()
1084            .zip(patch_u64_chunks.remainder().iter())
1085        {
1086            *dst_byte ^= *patch_byte;
1087        }
1088
1089        assert_eq!(
1090            via_wide, via_u64_only,
1091            "bead_id=bd-2ddc case=u128_lane_matches_u64_plus_tail"
1092        );
1093    }
1094
1095    #[test]
1096    fn test_benchmark_chunk_vs_byte() {
1097        // Meaningful performance checks require optimized codegen.
1098        if cfg!(debug_assertions) {
1099            return;
1100        }
1101
1102        let iterations = 32_000_usize;
1103        let src = deterministic_payload(4096, 0xDEAD_BEEF_F00D_CAFE);
1104        let base = deterministic_payload(4096, 0x0123_4567_89AB_CDEF);
1105
1106        let mut chunked = base.clone();
1107        let chunked_start = Instant::now();
1108        for _ in 0..iterations {
1109            xor_patch_wide_chunks(&mut chunked, &src).expect("chunked xor");
1110            std::hint::black_box(&chunked);
1111        }
1112        let chunked_elapsed = chunked_start.elapsed();
1113
1114        let mut bytewise = base;
1115        let bytewise_start = Instant::now();
1116        for _ in 0..iterations {
1117            xor_patch_bytewise(&mut bytewise, &src);
1118            std::hint::black_box(&bytewise);
1119        }
1120        let bytewise_elapsed = bytewise_start.elapsed();
1121
1122        let speedup = bytewise_elapsed.as_secs_f64() / chunked_elapsed.as_secs_f64();
1123        assert!(
1124            speedup >= 4.0,
1125            "bead_id=bd-2ddc case=chunk_vs_byte_speedup speedup={speedup:.2}x \
1126             chunked_ns={} bytewise_ns={} iterations={iterations}",
1127            chunked_elapsed.as_nanos(),
1128            bytewise_elapsed.as_nanos()
1129        );
1130    }
1131
1132    #[test]
1133    fn test_no_unsafe_simd() {
1134        let manifest = include_str!("../../../Cargo.toml");
1135        assert!(
1136            manifest.contains(r#"unsafe_code = "forbid""#),
1137            "bead_id=bd-2ddc case=workspace_forbids_unsafe"
1138        );
1139
1140        let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
1141            .parent()
1142            .expect("crate dir has parent")
1143            .parent()
1144            .expect("workspace root exists")
1145            .to_path_buf();
1146        let crates_dir = workspace_root.join("crates");
1147
1148        let mut rs_files = Vec::new();
1149        collect_rs_files(&crates_dir, &mut rs_files);
1150
1151        let simd_needles = [
1152            "_mm_",
1153            "std::arch::",
1154            "core::arch::",
1155            "__m128",
1156            "__m256",
1157            "__m512",
1158            "simd_shuffle",
1159            "vpxor",
1160            "vxorq",
1161        ];
1162
1163        let mut offenders = Vec::new();
1164        for file in rs_files {
1165            let Ok(content) = std::fs::read_to_string(&file) else {
1166                continue;
1167            };
1168
1169            let lines = content.lines().collect::<Vec<_>>();
1170            for (idx, line) in lines.iter().enumerate() {
1171                let has_intrinsic = simd_needles.iter().any(|needle| line.contains(needle));
1172                if !has_intrinsic {
1173                    continue;
1174                }
1175
1176                let window_start = idx.saturating_sub(3);
1177                let window_end = (idx + 3).min(lines.len().saturating_sub(1));
1178                let mut found_unsafe_nearby = false;
1179                for nearby in &lines[window_start..=window_end] {
1180                    let trimmed = nearby.trim_start();
1181                    let is_comment = trimmed.starts_with("//");
1182                    if !is_comment && trimmed.contains("unsafe") {
1183                        found_unsafe_nearby = true;
1184                        break;
1185                    }
1186                }
1187
1188                if found_unsafe_nearby {
1189                    offenders.push(format!("{}:{}", file.display(), idx + 1));
1190                }
1191            }
1192        }
1193
1194        assert!(
1195            offenders.is_empty(),
1196            "bead_id=bd-2ddc case=no_unsafe_simd_intrinsics offenders={offenders:?}"
1197        );
1198    }
1199
1200    #[test]
1201    fn test_checksum_simd_friendly() {
1202        let buffer = vec![0x11_u8; 256];
1203        let (xx_a, blake_a) =
1204            simd_friendly_checksum_pair(&buffer).expect("checksum pair must succeed");
1205
1206        let mut modified = buffer;
1207        modified[255] ^= 0x01;
1208        let (xx_b, blake_b) =
1209            simd_friendly_checksum_pair(&modified).expect("checksum pair must succeed");
1210
1211        assert_ne!(
1212            xx_a, xx_b,
1213            "bead_id={SIMD_BEAD_ID} case=xxhash3_changes_on_byte_flip"
1214        );
1215        assert_ne!(
1216            blake_a, blake_b,
1217            "bead_id={SIMD_BEAD_ID} case=blake3_changes_on_byte_flip"
1218        );
1219    }
1220
1221    #[test]
1222    fn test_e2e_bounded_parallelism_under_background_load() {
1223        let cfg = BulkheadConfig::new(4, 0, OverflowPolicy::DropBusy)
1224            .expect("non-zero max_concurrent must be valid");
1225        let bulkhead = Arc::new(Bulkhead::new(cfg));
1226
1227        let handles: Vec<_> = (0..48)
1228            .map(|_| {
1229                let bulkhead = Arc::clone(&bulkhead);
1230                thread::spawn(move || {
1231                    bulkhead.run(|| {
1232                        thread::sleep(Duration::from_millis(10));
1233                    })
1234                })
1235            })
1236            .collect();
1237
1238        let mut busy = 0_usize;
1239        for handle in handles {
1240            match handle.join().expect("worker thread should not panic") {
1241                Ok(()) => {}
1242                Err(FrankenError::Busy) => busy = busy.saturating_add(1),
1243                Err(err) => {
1244                    assert_eq!(
1245                        err.error_code(),
1246                        fsqlite_error::ErrorCode::Busy,
1247                        "bead_id={BEAD_ID} case=e2e_unexpected_bulkhead_error err={err}"
1248                    );
1249                    busy = busy.saturating_add(1);
1250                }
1251            }
1252        }
1253
1254        assert!(
1255            busy > 0,
1256            "bead_id={BEAD_ID} case=e2e_should_observe_overflow_rejections"
1257        );
1258        assert!(
1259            bulkhead.peak_in_flight() <= cfg.admission_limit(),
1260            "bead_id={BEAD_ID} case=e2e_peak_parallelism_exceeded peak={} limit={}",
1261            bulkhead.peak_in_flight(),
1262            cfg.admission_limit()
1263        );
1264    }
1265
1266    #[test]
1267    fn test_e2e_simd_hot_path_correctness() {
1268        // 1) B-tree hot comparison over contiguous slices.
1269        let contiguous = b"key-0001key-0002".to_vec();
1270        let left = &contiguous[0..8];
1271        let right = &contiguous[8..16];
1272        let compare_start = Instant::now();
1273        assert_eq!(
1274            compare_key_bytes_contiguous(left, right),
1275            left.cmp(right),
1276            "bead_id={SIMD_BEAD_ID} case=btree_contiguous_compare_correct"
1277        );
1278        let compare_elapsed = compare_start.elapsed();
1279
1280        // 2) GF(256) add (XOR) and XOR patch helpers.
1281        let mut symbol_a = (0_u8..64).collect::<Vec<u8>>();
1282        let symbol_b = (64_u8..128).collect::<Vec<u8>>();
1283        let expected_add: Vec<u8> = symbol_a
1284            .iter()
1285            .zip(symbol_b.iter())
1286            .map(|(a, b)| *a ^ *b)
1287            .collect();
1288        let gf256_start = Instant::now();
1289        gf256_add_assign_chunked(&mut symbol_a, &symbol_b).expect("gf256 add should succeed");
1290        let gf256_elapsed = gf256_start.elapsed();
1291        assert_eq!(
1292            symbol_a, expected_add,
1293            "bead_id={SIMD_BEAD_ID} case=gf256_chunked_add_correct"
1294        );
1295
1296        let mut patch_target = vec![0x33_u8; 64];
1297        let patch = vec![0xCC_u8; 64];
1298        let xor_start = Instant::now();
1299        xor_patch_wide_chunks(&mut patch_target, &patch).expect("xor patch should succeed");
1300        let xor_elapsed = xor_start.elapsed();
1301        assert!(
1302            patch_target.iter().all(|&byte| byte == (0x33_u8 ^ 0xCC_u8)),
1303            "bead_id={SIMD_BEAD_ID} case=xor_patch_chunked_correct"
1304        );
1305
1306        // 3) SIMD-friendly checksum feed.
1307        let checksum_start = Instant::now();
1308        let (xx, blake) =
1309            simd_friendly_checksum_pair(&patch_target).expect("checksum pair should succeed");
1310        let checksum_elapsed = checksum_start.elapsed();
1311        assert_ne!(
1312            xx, 0,
1313            "bead_id={SIMD_BEAD_ID} case=xxhash3_nonzero_for_nonempty_payload"
1314        );
1315        assert!(
1316            blake.iter().any(|&b| b != 0),
1317            "bead_id={SIMD_BEAD_ID} case=blake3_digest_nonzero"
1318        );
1319
1320        eprintln!(
1321            "bead_id={SIMD_BEAD_ID} metric=simd_hot_path_ns compare={} gf256_add={} xor_patch={} checksum={}",
1322            compare_elapsed.as_nanos(),
1323            gf256_elapsed.as_nanos(),
1324            xor_elapsed.as_nanos(),
1325            checksum_elapsed.as_nanos()
1326        );
1327    }
1328
1329    #[test]
1330    fn test_symbol_add_self_inverse() {
1331        let src = (0_u16..512)
1332            .map(|idx| u8::try_from(idx % 251).expect("modulo fits in u8"))
1333            .collect::<Vec<_>>();
1334        let mut dst = src.clone();
1335        symbol_add_assign(&mut dst, &src).expect("symbol_add should succeed");
1336        assert!(
1337            dst.iter().all(|byte| *byte == 0),
1338            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_self_inverse"
1339        );
1340    }
1341
1342    #[test]
1343    fn test_symbol_add_commutative_and_associative() {
1344        let a = (0_u16..128)
1345            .map(|idx| u8::try_from((idx * 3) % 251).expect("modulo fits"))
1346            .collect::<Vec<_>>();
1347        let b = (0_u16..128)
1348            .map(|idx| u8::try_from((idx * 5 + 7) % 251).expect("modulo fits"))
1349            .collect::<Vec<_>>();
1350        let c = (0_u16..128)
1351            .map(|idx| u8::try_from((idx * 11 + 13) % 251).expect("modulo fits"))
1352            .collect::<Vec<_>>();
1353
1354        let mut ab = a.clone();
1355        symbol_add_assign(&mut ab, &b).expect("a+b");
1356        let mut ba = b.clone();
1357        symbol_add_assign(&mut ba, &a).expect("b+a");
1358        assert_eq!(
1359            ab, ba,
1360            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_commutative"
1361        );
1362
1363        let mut lhs = a.clone();
1364        symbol_add_assign(&mut lhs, &b).expect("(a+b)");
1365        symbol_add_assign(&mut lhs, &c).expect("(a+b)+c");
1366
1367        let mut rhs = b;
1368        symbol_add_assign(&mut rhs, &c).expect("(b+c)");
1369        let mut rhs2 = a;
1370        symbol_add_assign(&mut rhs2, &rhs).expect("a+(b+c)");
1371
1372        assert_eq!(
1373            lhs, rhs2,
1374            "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_associative"
1375        );
1376    }
1377
1378    #[test]
1379    fn test_symbol_mul_special_cases() {
1380        let src = (0_u16..256)
1381            .map(|idx| u8::try_from(idx).expect("idx fits"))
1382            .collect::<Vec<_>>();
1383
1384        let mut out_zero = vec![0_u8; src.len()];
1385        symbol_mul_into(0, &src, &mut out_zero).expect("mul by zero");
1386        assert!(
1387            out_zero.iter().all(|byte| *byte == 0),
1388            "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_zero"
1389        );
1390
1391        let mut out_one = vec![0_u8; src.len()];
1392        symbol_mul_into(1, &src, &mut out_one).expect("mul by one");
1393        assert_eq!(
1394            out_one, src,
1395            "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_identity"
1396        );
1397    }
1398
1399    #[test]
1400    fn test_symbol_mul_matches_scalar_reference() {
1401        let src = (0_u16..512)
1402            .map(|idx| u8::try_from((idx * 7 + 17) % 251).expect("modulo fits"))
1403            .collect::<Vec<_>>();
1404        let coeff = 0xA7_u8;
1405
1406        let mut out = vec![0_u8; src.len()];
1407        symbol_mul_into(coeff, &src, &mut out).expect("symbol mul");
1408        for (actual, input) in out.iter().zip(src.iter()) {
1409            let expected = gf256_mul_byte(coeff, *input);
1410            assert_eq!(
1411                *actual, expected,
1412                "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_scalar_match"
1413            );
1414        }
1415    }
1416
1417    #[test]
1418    fn test_symbol_addmul_special_cases_and_equivalence() {
1419        let src = (0_u16..512)
1420            .map(|idx| u8::try_from((idx * 13 + 19) % 251).expect("modulo fits"))
1421            .collect::<Vec<_>>();
1422        let original = (0_u16..512)
1423            .map(|idx| u8::try_from((idx * 9 + 3) % 251).expect("modulo fits"))
1424            .collect::<Vec<_>>();
1425
1426        let mut no_op = original.clone();
1427        symbol_addmul_assign(&mut no_op, 0, &src).expect("c=0");
1428        assert_eq!(
1429            no_op, original,
1430            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c0_noop"
1431        );
1432
1433        let mut xor_path = original.clone();
1434        symbol_addmul_assign(&mut xor_path, 1, &src).expect("c=1");
1435        let mut expected_xor = original.clone();
1436        symbol_add_assign(&mut expected_xor, &src).expect("xor reference");
1437        assert_eq!(
1438            xor_path, expected_xor,
1439            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c1_equals_xor"
1440        );
1441
1442        let coeff = 0x53_u8;
1443        let mut fused = original.clone();
1444        symbol_addmul_assign(&mut fused, coeff, &src).expect("fused");
1445        let mut mul = vec![0_u8; src.len()];
1446        symbol_mul_into(coeff, &src, &mut mul).expect("mul");
1447        let mut separate = original;
1448        symbol_add_assign(&mut separate, &mul).expect("add");
1449        assert_eq!(
1450            fused, separate,
1451            "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_fused_equals_mul_plus_add"
1452        );
1453    }
1454
1455    #[test]
1456    fn test_symbol_operations_4096_and_512() {
1457        for symbol_len in [4096_usize, 1024_usize, 512_usize] {
1458            let a = vec![0xAA_u8; symbol_len];
1459            let b = vec![0x55_u8; symbol_len];
1460            let mut sum = a.clone();
1461            let layout = symbol_add_assign(&mut sum, &b).expect("symbol add");
1462            assert_eq!(
1463                layout,
1464                WideChunkLayout::for_len(symbol_len),
1465                "bead_id={RAPTORQ_BEAD_ID} case=symbol_len_layout_consistency len={symbol_len}"
1466            );
1467            assert!(
1468                sum.iter().all(|byte| *byte == (0xAA_u8 ^ 0x55_u8)),
1469                "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_expected_xor len={symbol_len}"
1470            );
1471        }
1472    }
1473
1474    #[test]
1475    fn test_gf256_arithmetic_matches_asupersync() {
1476        for a in 0_u8..=u8::MAX {
1477            for b in 0_u8..=u8::MAX {
1478                assert_eq!(
1479                    gf256_mul_byte(a, b),
1480                    (Gf256(a) * Gf256(b)).raw(),
1481                    "bead_id={RAPTORQ_BEAD_ID} case=gf256_mul_parity a=0x{a:02X} b=0x{b:02X}"
1482                );
1483            }
1484        }
1485    }
1486
1487    #[test]
1488    fn test_symbol_ops_match_asupersync_gf256_slices() {
1489        for symbol_len in [512_usize, 1024_usize, 4096_usize] {
1490            let src = deterministic_payload(symbol_len, 0xA5A5_0101);
1491            let dst = deterministic_payload(symbol_len, 0x5A5A_0202);
1492
1493            let mut ours_add = dst.clone();
1494            symbol_add_assign(&mut ours_add, &src).expect("symbol add");
1495            let mut as_add = dst.clone();
1496            gf256_add_slice(&mut as_add, &src);
1497            assert_eq!(
1498                ours_add, as_add,
1499                "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_add len={symbol_len}"
1500            );
1501
1502            for coeff in [0_u8, 1_u8, 0x53_u8, 0xA7_u8] {
1503                let mut ours_mul = vec![0_u8; symbol_len];
1504                symbol_mul_into(coeff, &src, &mut ours_mul).expect("symbol mul");
1505                let mut as_mul = src.clone();
1506                gf256_mul_slice(&mut as_mul, Gf256(coeff));
1507                assert_eq!(
1508                    ours_mul, as_mul,
1509                    "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_mul len={symbol_len} coeff=0x{coeff:02X}"
1510                );
1511
1512                let mut ours_addmul = dst.clone();
1513                symbol_addmul_assign(&mut ours_addmul, coeff, &src).expect("symbol addmul");
1514                let mut as_addmul = dst.clone();
1515                gf256_addmul_slice(&mut as_addmul, &src, Gf256(coeff));
1516                assert_eq!(
1517                    ours_addmul, as_addmul,
1518                    "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_addmul len={symbol_len} coeff=0x{coeff:02X}"
1519                );
1520            }
1521        }
1522    }
1523
1524    #[test]
1525    fn test_encode_single_source_block() {
1526        let config = raptorq_config(512, 1.25);
1527        let symbol_size = usize::from(config.encoding.symbol_size);
1528        let k = 8_usize;
1529        let data = deterministic_payload(k * symbol_size, 0x0102_0304);
1530        let object_id = AsObjectId::new_for_test(1201);
1531        tracing::info!(
1532            bead_id = RAPTORQ_BEAD_ID,
1533            case = "test_encode_single_source_block",
1534            symbol_size,
1535            requested_k = k,
1536            "encoding source block"
1537        );
1538        let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1539        let (sources, repairs) = split_source_and_repair(&symbols, source_symbols);
1540
1541        assert_eq!(
1542            source_symbols, k,
1543            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_count"
1544        );
1545        assert_eq!(
1546            sources.len(),
1547            source_symbols,
1548            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_partition"
1549        );
1550        assert!(
1551            !repairs.is_empty(),
1552            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_repair_present"
1553        );
1554        assert!(
1555            symbols.iter().all(|symbol| symbol.len() == symbol_size),
1556            "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_symbol_size_consistent"
1557        );
1558    }
1559
1560    #[test]
1561    fn test_decode_exact_k_symbols() {
1562        let symbol_size = 512_usize;
1563        let k = 16_usize;
1564        let seed = 0x0BAD_CAFE_u64;
1565        let source = low_level_source_block(k, symbol_size, seed);
1566        let encoder =
1567            SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1568        let decoder = InactivationDecoder::new(k, symbol_size, seed);
1569        let params = decoder.params();
1570        let base_rows = params.s + params.h;
1571        let constraints = ConstraintMatrix::build(params, seed);
1572        let mut received = decoder.constraint_symbols();
1573        let source_indexes = (0..k).collect::<Vec<_>>();
1574        append_source_received_symbols(
1575            &mut received,
1576            &constraints,
1577            base_rows,
1578            params.k_prime,
1579            symbol_size,
1580            &source,
1581            &source_indexes,
1582        );
1583
1584        tracing::warn!(
1585            bead_id = RAPTORQ_BEAD_ID,
1586            case = "test_decode_exact_k_symbols",
1587            source_symbols = k,
1588            "decoding with minimum symbol count (fragile recovery threshold)"
1589        );
1590        let decode_outcome = decoder
1591            .decode(&received)
1592            .expect("decode exact-k must succeed");
1593        assert_eq!(
1594            decode_outcome.source, source,
1595            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbols_roundtrip"
1596        );
1597        assert_eq!(
1598            decode_outcome.intermediate[0].len(),
1599            symbol_size,
1600            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbol_size"
1601        );
1602        assert_eq!(
1603            encoder.intermediate_symbol(0),
1604            decode_outcome.intermediate[0],
1605            "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_intermediate_consistency"
1606        );
1607    }
1608
1609    #[test]
1610    fn test_decode_with_repair_symbols() {
1611        let symbol_size = 512_usize;
1612        let k = 16_usize;
1613        let seed = 0xABC0_FED1_u64;
1614        let source = low_level_source_block(k, symbol_size, seed);
1615        let encoder =
1616            SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1617        let decoder = InactivationDecoder::new(k, symbol_size, seed);
1618        let params = decoder.params();
1619        let base_rows = params.s + params.h;
1620        let constraints = ConstraintMatrix::build(params, seed);
1621
1622        let mut received = decoder.constraint_symbols();
1623        let source_indexes = (1..k).collect::<Vec<_>>();
1624        append_source_received_symbols(
1625            &mut received,
1626            &constraints,
1627            base_rows,
1628            params.k_prime,
1629            symbol_size,
1630            &source,
1631            &source_indexes,
1632        );
1633
1634        let repair_esi = u32::try_from(k).expect("k fits u32");
1635        let (columns, coefficients) = decoder.repair_equation_rfc6330(repair_esi);
1636        let repair_data = encoder.repair_symbol(repair_esi);
1637        received.push(ReceivedSymbol::repair(
1638            repair_esi,
1639            columns,
1640            coefficients,
1641            repair_data,
1642        ));
1643
1644        let decode_outcome = decoder
1645            .decode(&received)
1646            .expect("decode with one repair must succeed");
1647        assert_eq!(
1648            decode_outcome.source, source,
1649            "bead_id={RAPTORQ_BEAD_ID} case=decode_with_repair_roundtrip"
1650        );
1651    }
1652
1653    #[test]
1654    fn test_decode_insufficient_symbols() {
1655        let symbol_size = 512_usize;
1656        let k = 8_usize;
1657        let seed = 0xDEAD_BEEF_u64;
1658        let source = low_level_source_block(k, symbol_size, seed);
1659        let decoder = InactivationDecoder::new(k, symbol_size, seed);
1660        let params = decoder.params();
1661        let base_rows = params.s + params.h;
1662        let constraints = ConstraintMatrix::build(params, seed);
1663        let mut received = decoder.constraint_symbols();
1664        let source_indexes = (0..k.saturating_sub(1)).collect::<Vec<_>>();
1665        append_source_received_symbols(
1666            &mut received,
1667            &constraints,
1668            base_rows,
1669            params.k_prime,
1670            symbol_size,
1671            &source,
1672            &source_indexes,
1673        );
1674
1675        let decode = decoder.decode(&received);
1676        assert!(
1677            decode.is_err(),
1678            "bead_id={RAPTORQ_BEAD_ID} case=decode_insufficient_symbols unexpectedly succeeded"
1679        );
1680        if let Err(err) = decode {
1681            tracing::error!(
1682                bead_id = RAPTORQ_BEAD_ID,
1683                case = "test_decode_insufficient_symbols",
1684                error = ?err,
1685                "decode failed as expected due to insufficient symbols"
1686            );
1687        }
1688    }
1689
1690    #[test]
1691    fn test_symbol_size_alignment() {
1692        let config = raptorq_config(4096, 1.20);
1693        let symbol_size = usize::from(config.encoding.symbol_size);
1694        let data = deterministic_payload(symbol_size * 3, 0x600D_1111);
1695        let object_id = AsObjectId::new_for_test(1205);
1696        let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1697
1698        assert_eq!(
1699            source_symbols, 3,
1700            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_source_count"
1701        );
1702        assert!(
1703            symbol_size.is_power_of_two(),
1704            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_power_of_two"
1705        );
1706        assert_eq!(
1707            symbol_size % 512,
1708            0,
1709            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_sector_multiple"
1710        );
1711        assert!(
1712            symbols.iter().all(|symbol| symbol.len() == symbol_size),
1713            "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_symbol_lengths"
1714        );
1715    }
1716
1717    #[test]
1718    fn prop_encode_decode_roundtrip() {
1719        for seed in [11_u64, 29_u64, 43_u64, 71_u64] {
1720            for k in [8_usize, 16_usize] {
1721                let config = raptorq_config(512, 1.30);
1722                let symbol_size = usize::from(config.encoding.symbol_size);
1723                let data = deterministic_payload(k * symbol_size - 17, seed);
1724                let object_id = AsObjectId::new_for_test(2000 + seed);
1725                let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1726                let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1727                let subset = sources
1728                    .iter()
1729                    .take(source_symbols)
1730                    .cloned()
1731                    .collect::<Vec<_>>();
1732                let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1733                    .expect("property roundtrip decode");
1734                assert_eq!(
1735                    decoded, data,
1736                    "bead_id={RAPTORQ_BEAD_ID} case=prop_encode_decode_roundtrip seed={seed} k={k}"
1737                );
1738            }
1739        }
1740    }
1741
1742    #[test]
1743    fn prop_any_k_of_n_suffices() {
1744        let symbol_size = 512_usize;
1745        let k = 16_usize;
1746        let sources = (0..k)
1747            .map(|symbol_idx| {
1748                deterministic_payload(
1749                    symbol_size,
1750                    0x5000_0000 + u64::try_from(symbol_idx).expect("index fits u64"),
1751                )
1752            })
1753            .collect::<Vec<_>>();
1754
1755        let mut parity = vec![0_u8; symbol_size];
1756        for source in &sources {
1757            symbol_add_assign(&mut parity, source).expect("parity construction");
1758        }
1759
1760        for omitted in 0..=k {
1761            let rebuilt = if omitted == k {
1762                sources.clone()
1763            } else {
1764                let mut recovered = parity.clone();
1765                for (index, source) in sources.iter().enumerate() {
1766                    if index != omitted {
1767                        symbol_add_assign(&mut recovered, source).expect("single-erasure recovery");
1768                    }
1769                }
1770                let mut rebuilt = sources.clone();
1771                rebuilt[omitted] = recovered;
1772                rebuilt
1773            };
1774
1775            assert_eq!(
1776                rebuilt.len(),
1777                k,
1778                "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_subset_size omitted_index={omitted}"
1779            );
1780            assert_eq!(
1781                rebuilt, sources,
1782                "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_suffices omitted_index={omitted}"
1783            );
1784        }
1785    }
1786
1787    #[test]
1788    fn prop_symbol_size_consistent() {
1789        for symbol_size in [512_u16, 1024_u16, 4096_u16] {
1790            for k in [4_usize, 8_usize] {
1791                let config = raptorq_config(symbol_size, 1.25);
1792                let size = usize::from(symbol_size);
1793                let object_id = AsObjectId::new_for_test(
1794                    u64::from(symbol_size) * 100 + u64::try_from(k).expect("k fits u64"),
1795                );
1796                let data = deterministic_payload(k * size - 3, u64::from(symbol_size));
1797                let (symbols, _) = encode_symbols(config, object_id, &data);
1798                assert!(
1799                    symbols.iter().all(|symbol| symbol.len() == size),
1800                    "bead_id={RAPTORQ_BEAD_ID} case=prop_symbol_size_consistent symbol_size={symbol_size} k={k}"
1801                );
1802            }
1803        }
1804    }
1805
1806    #[test]
1807    fn test_e2e_symbol_ops_in_encode_decode_roundtrip() {
1808        for (run, k) in [8_usize, 16_usize, 64_usize].iter().copied().enumerate() {
1809            let config = raptorq_config(512, 1.30);
1810            let symbol_size = usize::from(config.encoding.symbol_size);
1811            let data = deterministic_payload(
1812                k * symbol_size,
1813                0x4455_6677 + u64::try_from(run).expect("run fits u64"),
1814            );
1815            let object_id = AsObjectId::new_for_test(3000 + u64::try_from(k).expect("k fits u64"));
1816
1817            tracing::info!(
1818                bead_id = RAPTORQ_BEAD_ID,
1819                case = "test_e2e_symbol_ops_in_encode_decode_roundtrip",
1820                k,
1821                symbol_size,
1822                "starting encode/decode roundtrip"
1823            );
1824            let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1825            let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1826            let subset = sources
1827                .iter()
1828                .take(source_symbols)
1829                .cloned()
1830                .collect::<Vec<_>>();
1831            let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1832                .expect("e2e decode must succeed");
1833
1834            assert_eq!(
1835                decoded, data,
1836                "bead_id={RAPTORQ_BEAD_ID} case=e2e_roundtrip_bytes k={k}"
1837            );
1838
1839            let mut source_parity = vec![0_u8; symbol_size];
1840            for chunk in data.chunks_exact(symbol_size) {
1841                symbol_add_assign(&mut source_parity, chunk).expect("source parity xor");
1842            }
1843
1844            let mut decoded_parity = vec![0_u8; symbol_size];
1845            for chunk in decoded.chunks_exact(symbol_size) {
1846                symbol_add_assign(&mut decoded_parity, chunk).expect("decoded parity xor");
1847            }
1848
1849            assert_eq!(
1850                decoded_parity, source_parity,
1851                "bead_id={RAPTORQ_BEAD_ID} case=e2e_symbol_ops_parity k={k}"
1852            );
1853        }
1854    }
1855}