1pub mod attach;
8pub mod commit_marker;
9#[cfg(not(target_arch = "wasm32"))]
10pub mod commit_repair;
11pub mod compat_persist;
12pub mod connection;
13#[cfg(not(target_arch = "wasm32"))]
14pub mod db_fec;
15pub mod decode_proofs;
16pub mod ecs_replication;
17pub mod epoch;
18pub mod explain;
19pub mod inter_object_coding;
20pub mod lrc;
21pub mod native_index;
22pub mod permeation_map;
23pub mod por;
24pub mod quiescence;
25#[cfg(not(target_arch = "wasm32"))]
26pub mod raptorq_codec;
27pub mod raptorq_integration;
28pub mod region;
29pub mod remote_effects;
30#[cfg(not(target_arch = "wasm32"))]
31pub mod repair_engine;
32pub mod repair_symbols;
33pub mod replication_receiver;
34pub mod replication_sender;
35pub mod snapshot_shipping;
36pub mod source_block_partition;
37pub mod symbol_log;
38pub mod symbol_size_policy;
39pub mod tiered_storage;
40pub mod transaction;
41pub mod vacuum;
42pub mod wal_adapter;
43#[cfg(not(target_arch = "wasm32"))]
44pub mod wal_fec_adapter;
45
46use std::num::NonZeroUsize;
47use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
48
49use fsqlite_error::{FrankenError, Result};
50use fsqlite_types::{ObjectId, Oti, PayloadHash, Region, SymbolRecord, SymbolRecordFlags};
51use tracing::{debug, error};
52
53const MAX_BALANCED_BG_CPU: usize = 16;
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum OverflowPolicy {
58 DropBusy,
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum ParallelismProfile {
65 Balanced,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
71pub struct BulkheadConfig {
72 pub max_concurrent: usize,
74 pub queue_depth: usize,
76 pub overflow_policy: OverflowPolicy,
78}
79
80impl BulkheadConfig {
81 #[must_use]
85 pub const fn new(
86 max_concurrent: usize,
87 queue_depth: usize,
88 overflow_policy: OverflowPolicy,
89 ) -> Option<Self> {
90 if max_concurrent == 0 {
91 None
92 } else {
93 Some(Self {
94 max_concurrent,
95 queue_depth,
96 overflow_policy,
97 })
98 }
99 }
100
101 #[must_use]
106 pub fn for_profile(profile: ParallelismProfile) -> Self {
107 let p = available_parallelism_or_one();
108 match profile {
109 ParallelismProfile::Balanced => Self {
110 max_concurrent: conservative_bg_cpu_max(p),
111 queue_depth: 0,
112 overflow_policy: OverflowPolicy::DropBusy,
113 },
114 }
115 }
116
117 #[must_use]
119 pub const fn admission_limit(self) -> usize {
120 self.max_concurrent.saturating_add(self.queue_depth)
121 }
122}
123
124impl Default for BulkheadConfig {
125 fn default() -> Self {
126 Self::for_profile(ParallelismProfile::Balanced)
127 }
128}
129
130#[must_use]
132pub const fn conservative_bg_cpu_max(p: usize) -> usize {
133 let base = p / 8;
134 if base == 0 {
135 1
136 } else if base > MAX_BALANCED_BG_CPU {
137 MAX_BALANCED_BG_CPU
138 } else {
139 base
140 }
141}
142
143#[must_use]
145pub fn available_parallelism_or_one() -> usize {
146 std::thread::available_parallelism().map_or(1, NonZeroUsize::get)
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub struct WideChunkLayout {
152 pub u128_chunks: usize,
154 pub u64_chunks: usize,
156 pub tail_bytes: usize,
158}
159
160impl WideChunkLayout {
161 #[must_use]
163 pub const fn for_len(len: usize) -> Self {
164 let u128_chunks = len / 16;
165 let rem_after_u128 = len % 16;
166 let u64_chunks = rem_after_u128 / 8;
167 let tail_bytes = rem_after_u128 % 8;
168 Self {
169 u128_chunks,
170 u64_chunks,
171 tail_bytes,
172 }
173 }
174}
175
176#[allow(clippy::incompatible_msrv)]
181pub fn xor_patch_wide_chunks(dst: &mut [u8], patch: &[u8]) -> Result<WideChunkLayout> {
182 if dst.len() != patch.len() {
183 return Err(FrankenError::TypeMismatch {
184 expected: format!("equal lengths (dst == patch), got {}", dst.len()),
185 actual: patch.len().to_string(),
186 });
187 }
188
189 let layout = WideChunkLayout::for_len(dst.len());
190
191 let (dst_128, dst_rem) = dst.as_chunks_mut::<16>();
192 let (patch_128, patch_rem) = patch.as_chunks::<16>();
193 for (d, p) in dst_128.iter_mut().zip(patch_128.iter()) {
194 let d_word = u128::from_ne_bytes(*d);
195 let p_word = u128::from_ne_bytes(*p);
196 *d = (d_word ^ p_word).to_ne_bytes();
197 }
198
199 let (dst_64, dst_tail) = dst_rem.as_chunks_mut::<8>();
200 let (patch_64, patch_tail) = patch_rem.as_chunks::<8>();
201 for (d, p) in dst_64.iter_mut().zip(patch_64.iter()) {
202 let d_word = u64::from_ne_bytes(*d);
203 let p_word = u64::from_ne_bytes(*p);
204 *d = (d_word ^ p_word).to_ne_bytes();
205 }
206
207 for (d, p) in dst_tail.iter_mut().zip(patch_tail.iter()) {
208 *d ^= *p;
209 }
210
211 Ok(layout)
212}
213
214pub fn gf256_add_assign_chunked(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
219 xor_patch_wide_chunks(dst, src)
220}
221
222const GF256_FIELD_SIZE: usize = 256;
223
224const fn gf256_mul_byte_for_table(mut a: u8, mut b: u8) -> u8 {
225 let mut out = 0_u8;
226 while b != 0 {
227 if (b & 1) != 0 {
228 out ^= a;
229 }
230 let carry = (a & 0x80) != 0;
231 a <<= 1;
232 if carry {
233 a ^= 0x1D;
234 }
235 b >>= 1;
236 }
237 out
238}
239
240const fn gf256_mul_row(coeff: u8) -> [u8; GF256_FIELD_SIZE] {
241 let mut row = [0_u8; GF256_FIELD_SIZE];
242 let mut byte = 0_u8;
243
244 loop {
245 row[byte as usize] = gf256_mul_byte_for_table(coeff, byte);
246 if byte == u8::MAX {
247 break;
248 }
249 byte = byte.wrapping_add(1);
250 }
251
252 row
253}
254
255#[allow(clippy::large_stack_arrays)]
257const fn gf256_mul_tables() -> [[u8; GF256_FIELD_SIZE]; GF256_FIELD_SIZE] {
258 let mut tables = [[0_u8; GF256_FIELD_SIZE]; GF256_FIELD_SIZE];
259 let mut coeff = 0_u8;
260
261 loop {
262 tables[coeff as usize] = gf256_mul_row(coeff);
263 if coeff == u8::MAX {
264 break;
265 }
266 coeff = coeff.wrapping_add(1);
267 }
268
269 tables
270}
271
272static GF256_MUL_TABLES: [[u8; GF256_FIELD_SIZE]; GF256_FIELD_SIZE] = gf256_mul_tables();
273
274#[inline]
275fn gf256_mul_table(coeff: u8) -> &'static [u8; GF256_FIELD_SIZE] {
276 &GF256_MUL_TABLES[usize::from(coeff)]
277}
278
279pub fn symbol_add_assign(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
283 debug!(
284 bead_id = "bd-1hi.2",
285 op = "symbol_add_assign",
286 symbol_len = dst.len(),
287 "applying in-place XOR over symbol bytes"
288 );
289 gf256_add_assign_chunked(dst, src)
290}
291
292pub fn symbol_mul_into(coeff: u8, src: &[u8], out: &mut [u8]) -> Result<()> {
298 if src.len() != out.len() {
299 error!(
300 bead_id = "bd-1hi.2",
301 op = "symbol_mul_into",
302 coeff,
303 src_len = src.len(),
304 out_len = out.len(),
305 "symbol length mismatch"
306 );
307 return Err(FrankenError::TypeMismatch {
308 expected: format!("equal lengths (src == out), got {}", src.len()),
309 actual: out.len().to_string(),
310 });
311 }
312
313 debug!(
314 bead_id = "bd-1hi.2",
315 op = "symbol_mul_into",
316 coeff,
317 symbol_len = src.len(),
318 "applying GF(256) scalar multiplication"
319 );
320
321 match coeff {
322 0 => {
323 out.fill(0);
324 Ok(())
325 }
326 1 => {
327 out.copy_from_slice(src);
328 Ok(())
329 }
330 _ => {
331 let table = gf256_mul_table(coeff);
332 for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
333 *dst_byte = table[usize::from(*src_byte)];
334 }
335 Ok(())
336 }
337 }
338}
339
340pub fn symbol_addmul_assign(dst: &mut [u8], coeff: u8, src: &[u8]) -> Result<WideChunkLayout> {
346 if dst.len() != src.len() {
347 error!(
348 bead_id = "bd-1hi.2",
349 op = "symbol_addmul_assign",
350 coeff,
351 dst_len = dst.len(),
352 src_len = src.len(),
353 "symbol length mismatch"
354 );
355 return Err(FrankenError::TypeMismatch {
356 expected: format!("equal lengths (dst == src), got {}", dst.len()),
357 actual: src.len().to_string(),
358 });
359 }
360
361 debug!(
362 bead_id = "bd-1hi.2",
363 op = "symbol_addmul_assign",
364 coeff,
365 symbol_len = dst.len(),
366 "applying fused multiply-and-add over symbol bytes"
367 );
368
369 match coeff {
370 0 => Ok(WideChunkLayout::for_len(dst.len())),
371 1 => symbol_add_assign(dst, src),
372 _ => {
373 let table = gf256_mul_table(coeff);
374 for (dst_byte, src_byte) in dst.iter_mut().zip(src.iter()) {
375 *dst_byte ^= table[usize::from(*src_byte)];
376 }
377 Ok(WideChunkLayout::for_len(dst.len()))
378 }
379 }
380}
381
382pub fn simd_friendly_checksum_pair(buffer: &[u8]) -> Result<(u64, [u8; 32])> {
387 let symbol_size = u32::try_from(buffer.len()).map_err(|_| FrankenError::OutOfRange {
388 what: "symbol_size".to_owned(),
389 value: buffer.len().to_string(),
390 })?;
391
392 let symbol_record = SymbolRecord::new(
393 ObjectId::from_bytes([0_u8; ObjectId::LEN]),
394 Oti {
395 f: u64::from(symbol_size),
396 al: 1,
397 t: symbol_size,
398 z: 1,
399 n: 1,
400 },
401 0,
402 buffer.to_vec(),
403 SymbolRecordFlags::empty(),
404 );
405 let blake = PayloadHash::blake3(buffer);
406
407 Ok((symbol_record.frame_xxh3, *blake.as_bytes()))
408}
409
410#[derive(Debug)]
412pub struct Bulkhead {
413 config: BulkheadConfig,
414 in_flight: AtomicUsize,
415 peak_in_flight: AtomicUsize,
416 busy_rejections: AtomicUsize,
417}
418
419impl Bulkhead {
420 #[must_use]
421 pub fn new(config: BulkheadConfig) -> Self {
422 Self {
423 config,
424 in_flight: AtomicUsize::new(0),
425 peak_in_flight: AtomicUsize::new(0),
426 busy_rejections: AtomicUsize::new(0),
427 }
428 }
429
430 #[must_use]
431 pub const fn config(&self) -> BulkheadConfig {
432 self.config
433 }
434
435 #[must_use]
436 pub fn in_flight(&self) -> usize {
437 self.in_flight.load(Ordering::Acquire)
438 }
439
440 #[must_use]
441 pub fn peak_in_flight(&self) -> usize {
442 self.peak_in_flight.load(Ordering::Acquire)
443 }
444
445 #[must_use]
446 pub fn busy_rejections(&self) -> usize {
447 self.busy_rejections.load(Ordering::Acquire)
448 }
449
450 pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
455 let limit = self.config.admission_limit();
456 loop {
457 let current = self.in_flight.load(Ordering::Acquire);
458 if current >= limit {
459 self.busy_rejections.fetch_add(1, Ordering::AcqRel);
460 return Err(match self.config.overflow_policy {
461 OverflowPolicy::DropBusy => FrankenError::Busy,
462 });
463 }
464
465 let next = current.saturating_add(1);
466 if self
467 .in_flight
468 .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
469 .is_ok()
470 {
471 self.peak_in_flight.fetch_max(next, Ordering::AcqRel);
472 return Ok(BulkheadPermit {
473 bulkhead: self,
474 released: false,
475 });
476 }
477 }
478 }
479
480 pub fn run<T>(&self, work: impl FnOnce() -> T) -> Result<T> {
482 let _permit = self.try_acquire()?;
483 Ok(work())
484 }
485}
486
487#[derive(Debug)]
489pub struct BulkheadPermit<'a> {
490 bulkhead: &'a Bulkhead,
491 released: bool,
492}
493
494impl BulkheadPermit<'_> {
495 pub fn release(mut self) {
497 if !self.released {
498 self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
499 self.released = true;
500 }
501 }
502}
503
504impl Drop for BulkheadPermit<'_> {
505 fn drop(&mut self) {
506 if !self.released {
507 self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
508 self.released = true;
509 }
510 }
511}
512
513#[derive(Debug)]
515pub struct RegionBulkhead {
516 region: Region,
517 bulkhead: Bulkhead,
518 closing: AtomicBool,
519}
520
521impl RegionBulkhead {
522 #[must_use]
523 pub fn new(region: Region, config: BulkheadConfig) -> Self {
524 Self {
525 region,
526 bulkhead: Bulkhead::new(config),
527 closing: AtomicBool::new(false),
528 }
529 }
530
531 #[must_use]
532 pub const fn region(&self) -> Region {
533 self.region
534 }
535
536 #[must_use]
537 pub fn bulkhead(&self) -> &Bulkhead {
538 &self.bulkhead
539 }
540
541 pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
542 if self.closing.load(Ordering::Acquire) {
543 return Err(FrankenError::Busy);
544 }
545 self.bulkhead.try_acquire()
546 }
547
548 pub fn begin_close(&self) {
550 self.closing.store(true, Ordering::Release);
551 }
552
553 #[must_use]
555 pub fn is_quiescent(&self) -> bool {
556 self.bulkhead.in_flight() == 0
557 }
558}
559
560#[cfg(test)]
561mod tests {
562 use std::collections::VecDeque;
563 use std::pin::Pin;
564 use std::sync::Arc;
565 use std::task::{Context, Poll};
566 use std::thread;
567 use std::time::{Duration, Instant};
568
569 use asupersync::raptorq::decoder::{InactivationDecoder, ReceivedSymbol};
570 use asupersync::raptorq::gf256::{Gf256, gf256_add_slice, gf256_addmul_slice, gf256_mul_slice};
571 use asupersync::raptorq::systematic::{ConstraintMatrix, SystematicEncoder};
572 use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
573 use asupersync::security::AuthenticationTag;
574 use asupersync::security::authenticated::AuthenticatedSymbol;
575 use asupersync::transport::error::{SinkError, StreamError};
576 use asupersync::transport::sink::SymbolSink;
577 use asupersync::transport::stream::SymbolStream;
578 use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol};
579 use asupersync::{Cx, RaptorQConfig};
580 use fsqlite_btree::compare_key_bytes_contiguous;
581 use fsqlite_types::gf256_mul_byte;
582
583 use super::*;
584
585 const BEAD_ID: &str = "bd-22n.4";
586 const SIMD_BEAD_ID: &str = "bd-22n.6";
587 const RAPTORQ_BEAD_ID: &str = "bd-1hi.2";
588
589 #[derive(Debug)]
590 struct VecSink {
591 symbols: Vec<Symbol>,
592 }
593
594 impl VecSink {
595 fn new() -> Self {
596 Self {
597 symbols: Vec::new(),
598 }
599 }
600 }
601
602 impl SymbolSink for VecSink {
603 fn poll_send(
604 mut self: Pin<&mut Self>,
605 _cx: &mut Context<'_>,
606 symbol: AuthenticatedSymbol,
607 ) -> Poll<std::result::Result<(), SinkError>> {
608 self.symbols.push(symbol.into_symbol());
609 Poll::Ready(Ok(()))
610 }
611
612 fn poll_flush(
613 self: Pin<&mut Self>,
614 _cx: &mut Context<'_>,
615 ) -> Poll<std::result::Result<(), SinkError>> {
616 Poll::Ready(Ok(()))
617 }
618
619 fn poll_close(
620 self: Pin<&mut Self>,
621 _cx: &mut Context<'_>,
622 ) -> Poll<std::result::Result<(), SinkError>> {
623 Poll::Ready(Ok(()))
624 }
625
626 fn poll_ready(
627 self: Pin<&mut Self>,
628 _cx: &mut Context<'_>,
629 ) -> Poll<std::result::Result<(), SinkError>> {
630 Poll::Ready(Ok(()))
631 }
632 }
633
634 #[derive(Debug)]
635 struct VecStream {
636 q: VecDeque<AuthenticatedSymbol>,
637 }
638
639 impl VecStream {
640 fn new(symbols: Vec<Symbol>) -> Self {
641 let q = symbols
642 .into_iter()
643 .map(|symbol| AuthenticatedSymbol::from_parts(symbol, AuthenticationTag::zero()))
644 .collect();
645 Self { q }
646 }
647 }
648
649 impl SymbolStream for VecStream {
650 fn poll_next(
651 mut self: Pin<&mut Self>,
652 _cx: &mut Context<'_>,
653 ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
654 match self.q.pop_front() {
655 Some(symbol) => Poll::Ready(Some(Ok(symbol))),
656 None => Poll::Ready(None),
657 }
658 }
659
660 fn size_hint(&self) -> (usize, Option<usize>) {
661 (self.q.len(), Some(self.q.len()))
662 }
663
664 fn is_exhausted(&self) -> bool {
665 self.q.is_empty()
666 }
667 }
668
669 fn raptorq_config(symbol_size: u16, repair_overhead: f64) -> RaptorQConfig {
670 let mut config = RaptorQConfig::default();
671 config.encoding.symbol_size = symbol_size;
672 config.encoding.max_block_size = 64 * 1024;
673 config.encoding.repair_overhead = repair_overhead;
674 config
675 }
676
677 fn deterministic_payload(len: usize, seed: u64) -> Vec<u8> {
678 let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
679 let mut out = Vec::with_capacity(len);
680 for idx in 0..len {
681 state ^= state << 7;
682 state ^= state >> 9;
683 state = state.wrapping_mul(0xA24B_AED4_963E_E407);
684 let idx_byte = u8::try_from(idx % 251).expect("modulo fits in u8");
685 out.push(u8::try_from(state & 0xFF).expect("masked to u8") ^ idx_byte);
686 }
687 out
688 }
689
690 fn xor_patch_bytewise(dst: &mut [u8], patch: &[u8]) {
691 for (dst_byte, patch_byte) in dst.iter_mut().zip(patch.iter()) {
692 *dst_byte ^= *patch_byte;
693 }
694 }
695
696 fn gf256_mul_bytewise(coeff: u8, src: &[u8], out: &mut [u8]) {
697 for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
698 *dst_byte = gf256_mul_byte(coeff, *src_byte);
699 }
700 }
701
702 fn collect_rs_files(root: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
703 let entries = std::fs::read_dir(root).expect("read_dir should succeed");
704 for entry in entries {
705 let path = entry.expect("read_dir entry should be readable").path();
706 if path.is_dir() {
707 collect_rs_files(&path, out);
708 } else if path.extension().is_some_and(|ext| ext == "rs") {
709 out.push(path);
710 }
711 }
712 }
713
714 fn encode_symbols(
715 config: RaptorQConfig,
716 object_id: AsObjectId,
717 data: &[u8],
718 ) -> (Vec<Symbol>, usize) {
719 let cx = Cx::for_testing();
720 let mut sender = RaptorQSenderBuilder::new()
721 .config(config)
722 .transport(VecSink::new())
723 .build()
724 .expect("sender build");
725 let outcome = sender
726 .send_object(&cx, object_id, data)
727 .expect("send_object must succeed");
728 let symbols = std::mem::take(&mut sender.transport_mut().symbols);
729 tracing::debug!(
730 bead_id = RAPTORQ_BEAD_ID,
731 case = "encode_symbols",
732 source_symbols = outcome.source_symbols,
733 emitted_symbols = symbols.len(),
734 object_size = data.len(),
735 "encoded object into source+repair symbol stream"
736 );
737 (symbols, outcome.source_symbols)
738 }
739
740 #[allow(clippy::result_large_err)]
741 fn decode_symbols(
742 config: RaptorQConfig,
743 object_id: AsObjectId,
744 object_size: usize,
745 source_symbols: usize,
746 symbols: Vec<Symbol>,
747 ) -> std::result::Result<Vec<u8>, asupersync::Error> {
748 let cx = Cx::for_testing();
749 let params = ObjectParams::new(
750 object_id,
751 u64::try_from(object_size).expect("object size fits u64"),
752 config.encoding.symbol_size,
753 1,
754 u16::try_from(source_symbols).expect("source symbol count fits u16"),
755 );
756 let mut receiver = RaptorQReceiverBuilder::new()
757 .config(config)
758 .source(VecStream::new(symbols))
759 .build()
760 .expect("receiver build");
761
762 receiver
763 .receive_object(&cx, ¶ms)
764 .map(|outcome| outcome.data)
765 }
766
767 fn split_source_and_repair(
768 symbols: &[Symbol],
769 source_symbols: usize,
770 ) -> (Vec<Symbol>, Vec<Symbol>) {
771 let source_symbols_u32 =
772 u32::try_from(source_symbols).expect("source symbol count fits u32");
773 let mut sources = Vec::new();
774 let mut repairs = Vec::new();
775 for symbol in symbols {
776 if symbol.esi() < source_symbols_u32 {
777 sources.push(symbol.clone());
778 } else {
779 repairs.push(symbol.clone());
780 }
781 }
782 (sources, repairs)
783 }
784
785 fn low_level_source_block(k: usize, symbol_size: usize, seed: u64) -> Vec<Vec<u8>> {
786 (0..k)
787 .map(|source_index| {
788 deterministic_payload(
789 symbol_size,
790 seed + u64::try_from(source_index).expect("source index fits u64"),
791 )
792 })
793 .collect()
794 }
795
796 fn append_source_received_symbols(
797 received: &mut Vec<ReceivedSymbol>,
798 constraints: &ConstraintMatrix,
799 base_rows: usize,
800 k_prime: usize,
801 symbol_size: usize,
802 source: &[Vec<u8>],
803 source_indexes: &[usize],
804 ) {
805 for &source_index in source_indexes {
806 received.push(ReceivedSymbol::source(
807 u32::try_from(source_index).expect("source index fits u32"),
808 source[source_index].clone(),
809 ));
810 }
811
812 for source_index in source.len()..k_prime {
816 let row = base_rows + source_index;
817 let mut columns = Vec::new();
818 let mut coefficients = Vec::new();
819 for col in 0..constraints.cols {
820 let coeff = constraints.get(row, col);
821 if !coeff.is_zero() {
822 columns.push(col);
823 coefficients.push(coeff);
824 }
825 }
826
827 received.push(ReceivedSymbol {
828 esi: u32::try_from(source_index).expect("source index fits u32"),
829 is_source: false,
830 columns,
831 coefficients,
832 data: vec![0_u8; symbol_size],
833 });
834 }
835 }
836
837 #[test]
838 fn test_parallelism_defaults_conservative() {
839 assert_eq!(
840 conservative_bg_cpu_max(16),
841 2,
842 "bead_id={BEAD_ID} case=balanced_profile_formula_p16"
843 );
844 assert_eq!(
845 conservative_bg_cpu_max(1),
846 1,
847 "bead_id={BEAD_ID} case=balanced_profile_min_floor"
848 );
849 assert_eq!(
850 conservative_bg_cpu_max(512),
851 16,
852 "bead_id={BEAD_ID} case=balanced_profile_max_cap"
853 );
854 }
855
856 #[test]
857 fn test_parallelism_bounded_by_available() {
858 let cfg = BulkheadConfig::default();
859 let p = available_parallelism_or_one();
860 assert!(
861 cfg.max_concurrent <= p,
862 "bead_id={BEAD_ID} case=default_exceeds_available_parallelism cfg={cfg:?} p={p}"
863 );
864
865 let bulkhead = Bulkhead::new(cfg);
866 let mut permits = Vec::new();
867 for _ in 0..cfg.admission_limit() {
868 permits.push(
869 bulkhead
870 .try_acquire()
871 .expect("admission under configured limit should succeed"),
872 );
873 }
874
875 let overflow = bulkhead.try_acquire();
876 assert!(
877 matches!(overflow, Err(FrankenError::Busy)),
878 "bead_id={BEAD_ID} case=bounded_admission_overflow_must_be_busy overflow={overflow:?}"
879 );
880
881 drop(permits);
882 assert_eq!(
883 bulkhead.in_flight(),
884 0,
885 "bead_id={BEAD_ID} case=permits_drop_to_zero"
886 );
887 }
888
889 #[test]
890 fn test_bulkhead_config_max_concurrent() {
891 let cfg = BulkheadConfig::new(3, 0, OverflowPolicy::DropBusy)
892 .expect("non-zero max_concurrent must be valid");
893 let bulkhead = Bulkhead::new(cfg);
894
895 let p1 = bulkhead.try_acquire().expect("slot 1");
896 let p2 = bulkhead.try_acquire().expect("slot 2");
897 let p3 = bulkhead.try_acquire().expect("slot 3");
898 let overflow = bulkhead.try_acquire();
899
900 assert!(
901 matches!(overflow, Err(FrankenError::Busy)),
902 "bead_id={BEAD_ID} case=max_concurrent_enforced overflow={overflow:?}"
903 );
904 drop((p1, p2, p3));
905 }
906
907 #[test]
908 fn test_overflow_policy_drop_with_busy() {
909 let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
910 .expect("non-zero max_concurrent must be valid");
911 let bulkhead = Bulkhead::new(cfg);
912 let _permit = bulkhead.try_acquire().expect("first permit must succeed");
913
914 let overflow = bulkhead.try_acquire();
915 assert!(
916 matches!(overflow, Err(FrankenError::Busy)),
917 "bead_id={BEAD_ID} case=overflow_policy_drop_busy overflow={overflow:?}"
918 );
919 }
920
921 #[test]
922 fn test_background_work_degrades_gracefully() {
923 let cfg = BulkheadConfig::new(2, 0, OverflowPolicy::DropBusy)
924 .expect("non-zero max_concurrent must be valid");
925 let bulkhead = Bulkhead::new(cfg);
926
927 let _a = bulkhead.try_acquire().expect("permit a");
928 let _b = bulkhead.try_acquire().expect("permit b");
929
930 for _ in 0..8 {
931 let result = bulkhead.try_acquire();
932 assert!(
933 matches!(result, Err(FrankenError::Busy)),
934 "bead_id={BEAD_ID} case=overflow_must_reject_not_wait result={result:?}"
935 );
936 }
937
938 assert_eq!(
939 bulkhead.busy_rejections(),
940 8,
941 "bead_id={BEAD_ID} case=busy_rejection_counter"
942 );
943 }
944
945 #[test]
946 fn test_region_integration() {
947 let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
948 .expect("non-zero max_concurrent must be valid");
949 let region_bulkhead = RegionBulkhead::new(Region::new(7), cfg);
950 assert_eq!(
951 region_bulkhead.region().get(),
952 7,
953 "bead_id={BEAD_ID} case=region_id_plumbed"
954 );
955
956 let permit = region_bulkhead.try_acquire().expect("first permit");
957 assert!(
958 !region_bulkhead.is_quiescent(),
959 "bead_id={BEAD_ID} case=region_non_quiescent_with_active_work"
960 );
961
962 region_bulkhead.begin_close();
963 let after_close = region_bulkhead.try_acquire();
964 assert!(
965 matches!(after_close, Err(FrankenError::Busy)),
966 "bead_id={BEAD_ID} case=region_close_blocks_new_work result={after_close:?}"
967 );
968
969 drop(permit);
970 assert!(
971 region_bulkhead.is_quiescent(),
972 "bead_id={BEAD_ID} case=region_quiescent_after_permit_drop"
973 );
974 }
975
976 #[test]
977 fn test_gf256_ops_chunked() {
978 let mut dst = vec![0xAA_u8; 40];
979 let src = vec![0x55_u8; 40];
980 let expected: Vec<u8> = dst.iter().zip(src.iter()).map(|(d, s)| *d ^ *s).collect();
981
982 let layout = gf256_add_assign_chunked(&mut dst, &src)
983 .expect("equal-length buffers should be accepted");
984 assert!(
985 layout.u128_chunks > 0 || layout.u64_chunks > 0,
986 "bead_id={SIMD_BEAD_ID} case=wide_chunks_expected layout={layout:?}"
987 );
988 assert_eq!(
989 dst, expected,
990 "bead_id={SIMD_BEAD_ID} case=gf256_addition_xor_equivalence"
991 );
992 }
993
994 #[test]
995 fn test_xor_patch_wide_chunks() {
996 let mut dst = vec![0xF0_u8; 37];
997 let patch = vec![0x0F_u8; 37];
998 let expected: Vec<u8> = dst.iter().zip(patch.iter()).map(|(d, p)| *d ^ *p).collect();
999
1000 let layout =
1001 xor_patch_wide_chunks(&mut dst, &patch).expect("equal-length buffers should be valid");
1002 assert_eq!(
1003 layout,
1004 WideChunkLayout {
1005 u128_chunks: 2,
1006 u64_chunks: 0,
1007 tail_bytes: 5,
1008 },
1009 "bead_id={SIMD_BEAD_ID} case=chunk_layout_expected"
1010 );
1011 assert_eq!(
1012 dst, expected,
1013 "bead_id={SIMD_BEAD_ID} case=xor_patch_matches_scalar_reference"
1014 );
1015 }
1016
1017 #[test]
1018 fn test_xor_symbols_u64_chunks() {
1019 let mut dst = vec![0xAB_u8; 24];
1021 let src = vec![0xCD_u8; 24];
1022 let mut expected = vec![0xAB_u8; 24];
1023 xor_patch_bytewise(&mut expected, &src);
1024
1025 let layout = xor_patch_wide_chunks(&mut dst, &src).expect("equal-length buffers");
1026 assert_eq!(
1027 layout,
1028 WideChunkLayout {
1029 u128_chunks: 1,
1030 u64_chunks: 1,
1031 tail_bytes: 0,
1032 },
1033 "bead_id=bd-2ddc case=u64_chunk_lane_exercised"
1034 );
1035 assert_eq!(
1036 dst, expected,
1037 "bead_id=bd-2ddc case=chunked_xor_matches_bytewise_reference"
1038 );
1039 }
1040
1041 #[test]
1042 fn test_gf256_multiply_chunks() {
1043 let coeff = 0xA7_u8;
1044 let src = deterministic_payload(4096, 0xDDCC_BBAA_1122_3344);
1045 let mut chunked = vec![0_u8; src.len()];
1046 let mut scalar = vec![0_u8; src.len()];
1047
1048 symbol_mul_into(coeff, &src, &mut chunked).expect("chunked symbol_mul_into");
1049 gf256_mul_bytewise(coeff, &src, &mut scalar);
1050
1051 assert_eq!(
1052 chunked, scalar,
1053 "bead_id=bd-2ddc case=chunked_mul_matches_scalar_reference"
1054 );
1055 }
1056
1057 #[test]
1058 fn test_u128_chunk_alignment() {
1059 let mut via_wide = deterministic_payload(4099, 0x1234_5678_9ABC_DEF0);
1061 let mut via_u64_only = via_wide.clone();
1062 let patch = deterministic_payload(4099, 0x0F0E_0D0C_0B0A_0908);
1063
1064 xor_patch_wide_chunks(&mut via_wide, &patch).expect("wide chunk xor");
1065
1066 let mut dst_u64_chunks = via_u64_only.chunks_exact_mut(8);
1067 let mut patch_u64_chunks = patch.chunks_exact(8);
1068 for (dst_chunk, patch_chunk) in dst_u64_chunks.by_ref().zip(patch_u64_chunks.by_ref()) {
1069 let dst_word = u64::from_ne_bytes(
1070 dst_chunk
1071 .try_into()
1072 .expect("chunks_exact(8) must yield 8-byte chunk"),
1073 );
1074 let patch_word = u64::from_ne_bytes(
1075 patch_chunk
1076 .try_into()
1077 .expect("chunks_exact(8) must yield 8-byte chunk"),
1078 );
1079 dst_chunk.copy_from_slice(&(dst_word ^ patch_word).to_ne_bytes());
1080 }
1081 for (dst_byte, patch_byte) in dst_u64_chunks
1082 .into_remainder()
1083 .iter_mut()
1084 .zip(patch_u64_chunks.remainder().iter())
1085 {
1086 *dst_byte ^= *patch_byte;
1087 }
1088
1089 assert_eq!(
1090 via_wide, via_u64_only,
1091 "bead_id=bd-2ddc case=u128_lane_matches_u64_plus_tail"
1092 );
1093 }
1094
1095 #[test]
1096 fn test_benchmark_chunk_vs_byte() {
1097 if cfg!(debug_assertions) {
1099 return;
1100 }
1101
1102 let iterations = 32_000_usize;
1103 let src = deterministic_payload(4096, 0xDEAD_BEEF_F00D_CAFE);
1104 let base = deterministic_payload(4096, 0x0123_4567_89AB_CDEF);
1105
1106 let mut chunked = base.clone();
1107 let chunked_start = Instant::now();
1108 for _ in 0..iterations {
1109 xor_patch_wide_chunks(&mut chunked, &src).expect("chunked xor");
1110 std::hint::black_box(&chunked);
1111 }
1112 let chunked_elapsed = chunked_start.elapsed();
1113
1114 let mut bytewise = base;
1115 let bytewise_start = Instant::now();
1116 for _ in 0..iterations {
1117 xor_patch_bytewise(&mut bytewise, &src);
1118 std::hint::black_box(&bytewise);
1119 }
1120 let bytewise_elapsed = bytewise_start.elapsed();
1121
1122 let speedup = bytewise_elapsed.as_secs_f64() / chunked_elapsed.as_secs_f64();
1123 assert!(
1124 speedup >= 4.0,
1125 "bead_id=bd-2ddc case=chunk_vs_byte_speedup speedup={speedup:.2}x \
1126 chunked_ns={} bytewise_ns={} iterations={iterations}",
1127 chunked_elapsed.as_nanos(),
1128 bytewise_elapsed.as_nanos()
1129 );
1130 }
1131
1132 #[test]
1133 fn test_no_unsafe_simd() {
1134 let manifest = include_str!("../../../Cargo.toml");
1135 assert!(
1136 manifest.contains(r#"unsafe_code = "forbid""#),
1137 "bead_id=bd-2ddc case=workspace_forbids_unsafe"
1138 );
1139
1140 let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
1141 .parent()
1142 .expect("crate dir has parent")
1143 .parent()
1144 .expect("workspace root exists")
1145 .to_path_buf();
1146 let crates_dir = workspace_root.join("crates");
1147
1148 let mut rs_files = Vec::new();
1149 collect_rs_files(&crates_dir, &mut rs_files);
1150
1151 let simd_needles = [
1152 "_mm_",
1153 "std::arch::",
1154 "core::arch::",
1155 "__m128",
1156 "__m256",
1157 "__m512",
1158 "simd_shuffle",
1159 "vpxor",
1160 "vxorq",
1161 ];
1162
1163 let mut offenders = Vec::new();
1164 for file in rs_files {
1165 let Ok(content) = std::fs::read_to_string(&file) else {
1166 continue;
1167 };
1168
1169 let lines = content.lines().collect::<Vec<_>>();
1170 for (idx, line) in lines.iter().enumerate() {
1171 let has_intrinsic = simd_needles.iter().any(|needle| line.contains(needle));
1172 if !has_intrinsic {
1173 continue;
1174 }
1175
1176 let window_start = idx.saturating_sub(3);
1177 let window_end = (idx + 3).min(lines.len().saturating_sub(1));
1178 let mut found_unsafe_nearby = false;
1179 for nearby in &lines[window_start..=window_end] {
1180 let trimmed = nearby.trim_start();
1181 let is_comment = trimmed.starts_with("//");
1182 if !is_comment && trimmed.contains("unsafe") {
1183 found_unsafe_nearby = true;
1184 break;
1185 }
1186 }
1187
1188 if found_unsafe_nearby {
1189 offenders.push(format!("{}:{}", file.display(), idx + 1));
1190 }
1191 }
1192 }
1193
1194 assert!(
1195 offenders.is_empty(),
1196 "bead_id=bd-2ddc case=no_unsafe_simd_intrinsics offenders={offenders:?}"
1197 );
1198 }
1199
1200 #[test]
1201 fn test_checksum_simd_friendly() {
1202 let buffer = vec![0x11_u8; 256];
1203 let (xx_a, blake_a) =
1204 simd_friendly_checksum_pair(&buffer).expect("checksum pair must succeed");
1205
1206 let mut modified = buffer;
1207 modified[255] ^= 0x01;
1208 let (xx_b, blake_b) =
1209 simd_friendly_checksum_pair(&modified).expect("checksum pair must succeed");
1210
1211 assert_ne!(
1212 xx_a, xx_b,
1213 "bead_id={SIMD_BEAD_ID} case=xxhash3_changes_on_byte_flip"
1214 );
1215 assert_ne!(
1216 blake_a, blake_b,
1217 "bead_id={SIMD_BEAD_ID} case=blake3_changes_on_byte_flip"
1218 );
1219 }
1220
1221 #[test]
1222 fn test_e2e_bounded_parallelism_under_background_load() {
1223 let cfg = BulkheadConfig::new(4, 0, OverflowPolicy::DropBusy)
1224 .expect("non-zero max_concurrent must be valid");
1225 let bulkhead = Arc::new(Bulkhead::new(cfg));
1226
1227 let handles: Vec<_> = (0..48)
1228 .map(|_| {
1229 let bulkhead = Arc::clone(&bulkhead);
1230 thread::spawn(move || {
1231 bulkhead.run(|| {
1232 thread::sleep(Duration::from_millis(10));
1233 })
1234 })
1235 })
1236 .collect();
1237
1238 let mut busy = 0_usize;
1239 for handle in handles {
1240 match handle.join().expect("worker thread should not panic") {
1241 Ok(()) => {}
1242 Err(FrankenError::Busy) => busy = busy.saturating_add(1),
1243 Err(err) => {
1244 assert_eq!(
1245 err.error_code(),
1246 fsqlite_error::ErrorCode::Busy,
1247 "bead_id={BEAD_ID} case=e2e_unexpected_bulkhead_error err={err}"
1248 );
1249 busy = busy.saturating_add(1);
1250 }
1251 }
1252 }
1253
1254 assert!(
1255 busy > 0,
1256 "bead_id={BEAD_ID} case=e2e_should_observe_overflow_rejections"
1257 );
1258 assert!(
1259 bulkhead.peak_in_flight() <= cfg.admission_limit(),
1260 "bead_id={BEAD_ID} case=e2e_peak_parallelism_exceeded peak={} limit={}",
1261 bulkhead.peak_in_flight(),
1262 cfg.admission_limit()
1263 );
1264 }
1265
1266 #[test]
1267 fn test_e2e_simd_hot_path_correctness() {
1268 let contiguous = b"key-0001key-0002".to_vec();
1270 let left = &contiguous[0..8];
1271 let right = &contiguous[8..16];
1272 let compare_start = Instant::now();
1273 assert_eq!(
1274 compare_key_bytes_contiguous(left, right),
1275 left.cmp(right),
1276 "bead_id={SIMD_BEAD_ID} case=btree_contiguous_compare_correct"
1277 );
1278 let compare_elapsed = compare_start.elapsed();
1279
1280 let mut symbol_a = (0_u8..64).collect::<Vec<u8>>();
1282 let symbol_b = (64_u8..128).collect::<Vec<u8>>();
1283 let expected_add: Vec<u8> = symbol_a
1284 .iter()
1285 .zip(symbol_b.iter())
1286 .map(|(a, b)| *a ^ *b)
1287 .collect();
1288 let gf256_start = Instant::now();
1289 gf256_add_assign_chunked(&mut symbol_a, &symbol_b).expect("gf256 add should succeed");
1290 let gf256_elapsed = gf256_start.elapsed();
1291 assert_eq!(
1292 symbol_a, expected_add,
1293 "bead_id={SIMD_BEAD_ID} case=gf256_chunked_add_correct"
1294 );
1295
1296 let mut patch_target = vec![0x33_u8; 64];
1297 let patch = vec![0xCC_u8; 64];
1298 let xor_start = Instant::now();
1299 xor_patch_wide_chunks(&mut patch_target, &patch).expect("xor patch should succeed");
1300 let xor_elapsed = xor_start.elapsed();
1301 assert!(
1302 patch_target.iter().all(|&byte| byte == (0x33_u8 ^ 0xCC_u8)),
1303 "bead_id={SIMD_BEAD_ID} case=xor_patch_chunked_correct"
1304 );
1305
1306 let checksum_start = Instant::now();
1308 let (xx, blake) =
1309 simd_friendly_checksum_pair(&patch_target).expect("checksum pair should succeed");
1310 let checksum_elapsed = checksum_start.elapsed();
1311 assert_ne!(
1312 xx, 0,
1313 "bead_id={SIMD_BEAD_ID} case=xxhash3_nonzero_for_nonempty_payload"
1314 );
1315 assert!(
1316 blake.iter().any(|&b| b != 0),
1317 "bead_id={SIMD_BEAD_ID} case=blake3_digest_nonzero"
1318 );
1319
1320 eprintln!(
1321 "bead_id={SIMD_BEAD_ID} metric=simd_hot_path_ns compare={} gf256_add={} xor_patch={} checksum={}",
1322 compare_elapsed.as_nanos(),
1323 gf256_elapsed.as_nanos(),
1324 xor_elapsed.as_nanos(),
1325 checksum_elapsed.as_nanos()
1326 );
1327 }
1328
1329 #[test]
1330 fn test_symbol_add_self_inverse() {
1331 let src = (0_u16..512)
1332 .map(|idx| u8::try_from(idx % 251).expect("modulo fits in u8"))
1333 .collect::<Vec<_>>();
1334 let mut dst = src.clone();
1335 symbol_add_assign(&mut dst, &src).expect("symbol_add should succeed");
1336 assert!(
1337 dst.iter().all(|byte| *byte == 0),
1338 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_self_inverse"
1339 );
1340 }
1341
1342 #[test]
1343 fn test_symbol_add_commutative_and_associative() {
1344 let a = (0_u16..128)
1345 .map(|idx| u8::try_from((idx * 3) % 251).expect("modulo fits"))
1346 .collect::<Vec<_>>();
1347 let b = (0_u16..128)
1348 .map(|idx| u8::try_from((idx * 5 + 7) % 251).expect("modulo fits"))
1349 .collect::<Vec<_>>();
1350 let c = (0_u16..128)
1351 .map(|idx| u8::try_from((idx * 11 + 13) % 251).expect("modulo fits"))
1352 .collect::<Vec<_>>();
1353
1354 let mut ab = a.clone();
1355 symbol_add_assign(&mut ab, &b).expect("a+b");
1356 let mut ba = b.clone();
1357 symbol_add_assign(&mut ba, &a).expect("b+a");
1358 assert_eq!(
1359 ab, ba,
1360 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_commutative"
1361 );
1362
1363 let mut lhs = a.clone();
1364 symbol_add_assign(&mut lhs, &b).expect("(a+b)");
1365 symbol_add_assign(&mut lhs, &c).expect("(a+b)+c");
1366
1367 let mut rhs = b;
1368 symbol_add_assign(&mut rhs, &c).expect("(b+c)");
1369 let mut rhs2 = a;
1370 symbol_add_assign(&mut rhs2, &rhs).expect("a+(b+c)");
1371
1372 assert_eq!(
1373 lhs, rhs2,
1374 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_associative"
1375 );
1376 }
1377
1378 #[test]
1379 fn test_symbol_mul_special_cases() {
1380 let src = (0_u16..256)
1381 .map(|idx| u8::try_from(idx).expect("idx fits"))
1382 .collect::<Vec<_>>();
1383
1384 let mut out_zero = vec![0_u8; src.len()];
1385 symbol_mul_into(0, &src, &mut out_zero).expect("mul by zero");
1386 assert!(
1387 out_zero.iter().all(|byte| *byte == 0),
1388 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_zero"
1389 );
1390
1391 let mut out_one = vec![0_u8; src.len()];
1392 symbol_mul_into(1, &src, &mut out_one).expect("mul by one");
1393 assert_eq!(
1394 out_one, src,
1395 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_identity"
1396 );
1397 }
1398
1399 #[test]
1400 fn test_symbol_mul_matches_scalar_reference() {
1401 let src = (0_u16..512)
1402 .map(|idx| u8::try_from((idx * 7 + 17) % 251).expect("modulo fits"))
1403 .collect::<Vec<_>>();
1404 let coeff = 0xA7_u8;
1405
1406 let mut out = vec![0_u8; src.len()];
1407 symbol_mul_into(coeff, &src, &mut out).expect("symbol mul");
1408 for (actual, input) in out.iter().zip(src.iter()) {
1409 let expected = gf256_mul_byte(coeff, *input);
1410 assert_eq!(
1411 *actual, expected,
1412 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_scalar_match"
1413 );
1414 }
1415 }
1416
1417 #[test]
1418 fn test_symbol_addmul_special_cases_and_equivalence() {
1419 let src = (0_u16..512)
1420 .map(|idx| u8::try_from((idx * 13 + 19) % 251).expect("modulo fits"))
1421 .collect::<Vec<_>>();
1422 let original = (0_u16..512)
1423 .map(|idx| u8::try_from((idx * 9 + 3) % 251).expect("modulo fits"))
1424 .collect::<Vec<_>>();
1425
1426 let mut no_op = original.clone();
1427 symbol_addmul_assign(&mut no_op, 0, &src).expect("c=0");
1428 assert_eq!(
1429 no_op, original,
1430 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c0_noop"
1431 );
1432
1433 let mut xor_path = original.clone();
1434 symbol_addmul_assign(&mut xor_path, 1, &src).expect("c=1");
1435 let mut expected_xor = original.clone();
1436 symbol_add_assign(&mut expected_xor, &src).expect("xor reference");
1437 assert_eq!(
1438 xor_path, expected_xor,
1439 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c1_equals_xor"
1440 );
1441
1442 let coeff = 0x53_u8;
1443 let mut fused = original.clone();
1444 symbol_addmul_assign(&mut fused, coeff, &src).expect("fused");
1445 let mut mul = vec![0_u8; src.len()];
1446 symbol_mul_into(coeff, &src, &mut mul).expect("mul");
1447 let mut separate = original;
1448 symbol_add_assign(&mut separate, &mul).expect("add");
1449 assert_eq!(
1450 fused, separate,
1451 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_fused_equals_mul_plus_add"
1452 );
1453 }
1454
1455 #[test]
1456 fn test_symbol_operations_4096_and_512() {
1457 for symbol_len in [4096_usize, 1024_usize, 512_usize] {
1458 let a = vec![0xAA_u8; symbol_len];
1459 let b = vec![0x55_u8; symbol_len];
1460 let mut sum = a.clone();
1461 let layout = symbol_add_assign(&mut sum, &b).expect("symbol add");
1462 assert_eq!(
1463 layout,
1464 WideChunkLayout::for_len(symbol_len),
1465 "bead_id={RAPTORQ_BEAD_ID} case=symbol_len_layout_consistency len={symbol_len}"
1466 );
1467 assert!(
1468 sum.iter().all(|byte| *byte == (0xAA_u8 ^ 0x55_u8)),
1469 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_expected_xor len={symbol_len}"
1470 );
1471 }
1472 }
1473
1474 #[test]
1475 fn test_gf256_arithmetic_matches_asupersync() {
1476 for a in 0_u8..=u8::MAX {
1477 for b in 0_u8..=u8::MAX {
1478 assert_eq!(
1479 gf256_mul_byte(a, b),
1480 (Gf256(a) * Gf256(b)).raw(),
1481 "bead_id={RAPTORQ_BEAD_ID} case=gf256_mul_parity a=0x{a:02X} b=0x{b:02X}"
1482 );
1483 }
1484 }
1485 }
1486
1487 #[test]
1488 fn test_symbol_ops_match_asupersync_gf256_slices() {
1489 for symbol_len in [512_usize, 1024_usize, 4096_usize] {
1490 let src = deterministic_payload(symbol_len, 0xA5A5_0101);
1491 let dst = deterministic_payload(symbol_len, 0x5A5A_0202);
1492
1493 let mut ours_add = dst.clone();
1494 symbol_add_assign(&mut ours_add, &src).expect("symbol add");
1495 let mut as_add = dst.clone();
1496 gf256_add_slice(&mut as_add, &src);
1497 assert_eq!(
1498 ours_add, as_add,
1499 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_add len={symbol_len}"
1500 );
1501
1502 for coeff in [0_u8, 1_u8, 0x53_u8, 0xA7_u8] {
1503 let mut ours_mul = vec![0_u8; symbol_len];
1504 symbol_mul_into(coeff, &src, &mut ours_mul).expect("symbol mul");
1505 let mut as_mul = src.clone();
1506 gf256_mul_slice(&mut as_mul, Gf256(coeff));
1507 assert_eq!(
1508 ours_mul, as_mul,
1509 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_mul len={symbol_len} coeff=0x{coeff:02X}"
1510 );
1511
1512 let mut ours_addmul = dst.clone();
1513 symbol_addmul_assign(&mut ours_addmul, coeff, &src).expect("symbol addmul");
1514 let mut as_addmul = dst.clone();
1515 gf256_addmul_slice(&mut as_addmul, &src, Gf256(coeff));
1516 assert_eq!(
1517 ours_addmul, as_addmul,
1518 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_addmul len={symbol_len} coeff=0x{coeff:02X}"
1519 );
1520 }
1521 }
1522 }
1523
1524 #[test]
1525 fn test_encode_single_source_block() {
1526 let config = raptorq_config(512, 1.25);
1527 let symbol_size = usize::from(config.encoding.symbol_size);
1528 let k = 8_usize;
1529 let data = deterministic_payload(k * symbol_size, 0x0102_0304);
1530 let object_id = AsObjectId::new_for_test(1201);
1531 tracing::info!(
1532 bead_id = RAPTORQ_BEAD_ID,
1533 case = "test_encode_single_source_block",
1534 symbol_size,
1535 requested_k = k,
1536 "encoding source block"
1537 );
1538 let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1539 let (sources, repairs) = split_source_and_repair(&symbols, source_symbols);
1540
1541 assert_eq!(
1542 source_symbols, k,
1543 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_count"
1544 );
1545 assert_eq!(
1546 sources.len(),
1547 source_symbols,
1548 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_partition"
1549 );
1550 assert!(
1551 !repairs.is_empty(),
1552 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_repair_present"
1553 );
1554 assert!(
1555 symbols.iter().all(|symbol| symbol.len() == symbol_size),
1556 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_symbol_size_consistent"
1557 );
1558 }
1559
1560 #[test]
1561 fn test_decode_exact_k_symbols() {
1562 let symbol_size = 512_usize;
1563 let k = 16_usize;
1564 let seed = 0x0BAD_CAFE_u64;
1565 let source = low_level_source_block(k, symbol_size, seed);
1566 let encoder =
1567 SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1568 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1569 let params = decoder.params();
1570 let base_rows = params.s + params.h;
1571 let constraints = ConstraintMatrix::build(params, seed);
1572 let mut received = decoder.constraint_symbols();
1573 let source_indexes = (0..k).collect::<Vec<_>>();
1574 append_source_received_symbols(
1575 &mut received,
1576 &constraints,
1577 base_rows,
1578 params.k_prime,
1579 symbol_size,
1580 &source,
1581 &source_indexes,
1582 );
1583
1584 tracing::warn!(
1585 bead_id = RAPTORQ_BEAD_ID,
1586 case = "test_decode_exact_k_symbols",
1587 source_symbols = k,
1588 "decoding with minimum symbol count (fragile recovery threshold)"
1589 );
1590 let decode_outcome = decoder
1591 .decode(&received)
1592 .expect("decode exact-k must succeed");
1593 assert_eq!(
1594 decode_outcome.source, source,
1595 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbols_roundtrip"
1596 );
1597 assert_eq!(
1598 decode_outcome.intermediate[0].len(),
1599 symbol_size,
1600 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbol_size"
1601 );
1602 assert_eq!(
1603 encoder.intermediate_symbol(0),
1604 decode_outcome.intermediate[0],
1605 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_intermediate_consistency"
1606 );
1607 }
1608
1609 #[test]
1610 fn test_decode_with_repair_symbols() {
1611 let symbol_size = 512_usize;
1612 let k = 16_usize;
1613 let seed = 0xABC0_FED1_u64;
1614 let source = low_level_source_block(k, symbol_size, seed);
1615 let encoder =
1616 SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1617 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1618 let params = decoder.params();
1619 let base_rows = params.s + params.h;
1620 let constraints = ConstraintMatrix::build(params, seed);
1621
1622 let mut received = decoder.constraint_symbols();
1623 let source_indexes = (1..k).collect::<Vec<_>>();
1624 append_source_received_symbols(
1625 &mut received,
1626 &constraints,
1627 base_rows,
1628 params.k_prime,
1629 symbol_size,
1630 &source,
1631 &source_indexes,
1632 );
1633
1634 let repair_esi = u32::try_from(k).expect("k fits u32");
1635 let (columns, coefficients) = decoder.repair_equation_rfc6330(repair_esi);
1636 let repair_data = encoder.repair_symbol(repair_esi);
1637 received.push(ReceivedSymbol::repair(
1638 repair_esi,
1639 columns,
1640 coefficients,
1641 repair_data,
1642 ));
1643
1644 let decode_outcome = decoder
1645 .decode(&received)
1646 .expect("decode with one repair must succeed");
1647 assert_eq!(
1648 decode_outcome.source, source,
1649 "bead_id={RAPTORQ_BEAD_ID} case=decode_with_repair_roundtrip"
1650 );
1651 }
1652
1653 #[test]
1654 fn test_decode_insufficient_symbols() {
1655 let symbol_size = 512_usize;
1656 let k = 8_usize;
1657 let seed = 0xDEAD_BEEF_u64;
1658 let source = low_level_source_block(k, symbol_size, seed);
1659 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1660 let params = decoder.params();
1661 let base_rows = params.s + params.h;
1662 let constraints = ConstraintMatrix::build(params, seed);
1663 let mut received = decoder.constraint_symbols();
1664 let source_indexes = (0..k.saturating_sub(1)).collect::<Vec<_>>();
1665 append_source_received_symbols(
1666 &mut received,
1667 &constraints,
1668 base_rows,
1669 params.k_prime,
1670 symbol_size,
1671 &source,
1672 &source_indexes,
1673 );
1674
1675 let decode = decoder.decode(&received);
1676 assert!(
1677 decode.is_err(),
1678 "bead_id={RAPTORQ_BEAD_ID} case=decode_insufficient_symbols unexpectedly succeeded"
1679 );
1680 if let Err(err) = decode {
1681 tracing::error!(
1682 bead_id = RAPTORQ_BEAD_ID,
1683 case = "test_decode_insufficient_symbols",
1684 error = ?err,
1685 "decode failed as expected due to insufficient symbols"
1686 );
1687 }
1688 }
1689
1690 #[test]
1691 fn test_symbol_size_alignment() {
1692 let config = raptorq_config(4096, 1.20);
1693 let symbol_size = usize::from(config.encoding.symbol_size);
1694 let data = deterministic_payload(symbol_size * 3, 0x600D_1111);
1695 let object_id = AsObjectId::new_for_test(1205);
1696 let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1697
1698 assert_eq!(
1699 source_symbols, 3,
1700 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_source_count"
1701 );
1702 assert!(
1703 symbol_size.is_power_of_two(),
1704 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_power_of_two"
1705 );
1706 assert_eq!(
1707 symbol_size % 512,
1708 0,
1709 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_sector_multiple"
1710 );
1711 assert!(
1712 symbols.iter().all(|symbol| symbol.len() == symbol_size),
1713 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_symbol_lengths"
1714 );
1715 }
1716
1717 #[test]
1718 fn prop_encode_decode_roundtrip() {
1719 for seed in [11_u64, 29_u64, 43_u64, 71_u64] {
1720 for k in [8_usize, 16_usize] {
1721 let config = raptorq_config(512, 1.30);
1722 let symbol_size = usize::from(config.encoding.symbol_size);
1723 let data = deterministic_payload(k * symbol_size - 17, seed);
1724 let object_id = AsObjectId::new_for_test(2000 + seed);
1725 let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1726 let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1727 let subset = sources
1728 .iter()
1729 .take(source_symbols)
1730 .cloned()
1731 .collect::<Vec<_>>();
1732 let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1733 .expect("property roundtrip decode");
1734 assert_eq!(
1735 decoded, data,
1736 "bead_id={RAPTORQ_BEAD_ID} case=prop_encode_decode_roundtrip seed={seed} k={k}"
1737 );
1738 }
1739 }
1740 }
1741
1742 #[test]
1743 fn prop_any_k_of_n_suffices() {
1744 let symbol_size = 512_usize;
1745 let k = 16_usize;
1746 let sources = (0..k)
1747 .map(|symbol_idx| {
1748 deterministic_payload(
1749 symbol_size,
1750 0x5000_0000 + u64::try_from(symbol_idx).expect("index fits u64"),
1751 )
1752 })
1753 .collect::<Vec<_>>();
1754
1755 let mut parity = vec![0_u8; symbol_size];
1756 for source in &sources {
1757 symbol_add_assign(&mut parity, source).expect("parity construction");
1758 }
1759
1760 for omitted in 0..=k {
1761 let rebuilt = if omitted == k {
1762 sources.clone()
1763 } else {
1764 let mut recovered = parity.clone();
1765 for (index, source) in sources.iter().enumerate() {
1766 if index != omitted {
1767 symbol_add_assign(&mut recovered, source).expect("single-erasure recovery");
1768 }
1769 }
1770 let mut rebuilt = sources.clone();
1771 rebuilt[omitted] = recovered;
1772 rebuilt
1773 };
1774
1775 assert_eq!(
1776 rebuilt.len(),
1777 k,
1778 "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_subset_size omitted_index={omitted}"
1779 );
1780 assert_eq!(
1781 rebuilt, sources,
1782 "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_suffices omitted_index={omitted}"
1783 );
1784 }
1785 }
1786
1787 #[test]
1788 fn prop_symbol_size_consistent() {
1789 for symbol_size in [512_u16, 1024_u16, 4096_u16] {
1790 for k in [4_usize, 8_usize] {
1791 let config = raptorq_config(symbol_size, 1.25);
1792 let size = usize::from(symbol_size);
1793 let object_id = AsObjectId::new_for_test(
1794 u64::from(symbol_size) * 100 + u64::try_from(k).expect("k fits u64"),
1795 );
1796 let data = deterministic_payload(k * size - 3, u64::from(symbol_size));
1797 let (symbols, _) = encode_symbols(config, object_id, &data);
1798 assert!(
1799 symbols.iter().all(|symbol| symbol.len() == size),
1800 "bead_id={RAPTORQ_BEAD_ID} case=prop_symbol_size_consistent symbol_size={symbol_size} k={k}"
1801 );
1802 }
1803 }
1804 }
1805
1806 #[test]
1807 fn test_e2e_symbol_ops_in_encode_decode_roundtrip() {
1808 for (run, k) in [8_usize, 16_usize, 64_usize].iter().copied().enumerate() {
1809 let config = raptorq_config(512, 1.30);
1810 let symbol_size = usize::from(config.encoding.symbol_size);
1811 let data = deterministic_payload(
1812 k * symbol_size,
1813 0x4455_6677 + u64::try_from(run).expect("run fits u64"),
1814 );
1815 let object_id = AsObjectId::new_for_test(3000 + u64::try_from(k).expect("k fits u64"));
1816
1817 tracing::info!(
1818 bead_id = RAPTORQ_BEAD_ID,
1819 case = "test_e2e_symbol_ops_in_encode_decode_roundtrip",
1820 k,
1821 symbol_size,
1822 "starting encode/decode roundtrip"
1823 );
1824 let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1825 let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1826 let subset = sources
1827 .iter()
1828 .take(source_symbols)
1829 .cloned()
1830 .collect::<Vec<_>>();
1831 let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1832 .expect("e2e decode must succeed");
1833
1834 assert_eq!(
1835 decoded, data,
1836 "bead_id={RAPTORQ_BEAD_ID} case=e2e_roundtrip_bytes k={k}"
1837 );
1838
1839 let mut source_parity = vec![0_u8; symbol_size];
1840 for chunk in data.chunks_exact(symbol_size) {
1841 symbol_add_assign(&mut source_parity, chunk).expect("source parity xor");
1842 }
1843
1844 let mut decoded_parity = vec![0_u8; symbol_size];
1845 for chunk in decoded.chunks_exact(symbol_size) {
1846 symbol_add_assign(&mut decoded_parity, chunk).expect("decoded parity xor");
1847 }
1848
1849 assert_eq!(
1850 decoded_parity, source_parity,
1851 "bead_id={RAPTORQ_BEAD_ID} case=e2e_symbol_ops_parity k={k}"
1852 );
1853 }
1854 }
1855}