1pub mod attach;
8pub mod commit_marker;
9#[cfg(not(target_arch = "wasm32"))]
10pub mod commit_repair;
11pub mod compat_persist;
12pub mod connection;
13#[cfg(not(target_arch = "wasm32"))]
14pub mod db_fec;
15pub mod decode_proofs;
16pub mod ecs_replication;
17pub mod epoch;
18pub mod explain;
19pub mod inter_object_coding;
20pub mod lrc;
21pub mod native_index;
22pub mod permeation_map;
23pub mod por;
24#[cfg(not(target_arch = "wasm32"))]
25pub mod raptorq_codec;
26pub mod raptorq_integration;
27pub mod region;
28pub mod remote_effects;
29#[cfg(not(target_arch = "wasm32"))]
30pub mod repair_engine;
31pub mod repair_symbols;
32pub mod replication_receiver;
33pub mod replication_sender;
34pub mod snapshot_shipping;
35pub mod source_block_partition;
36pub mod symbol_log;
37pub mod symbol_size_policy;
38pub mod tiered_storage;
39pub mod transaction;
40pub mod wal_adapter;
41#[cfg(not(target_arch = "wasm32"))]
42pub mod wal_fec_adapter;
43
44use std::num::NonZeroUsize;
45use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
46
47use fsqlite_error::{FrankenError, Result};
48use fsqlite_types::{
49 ObjectId, Oti, PayloadHash, Region, SymbolRecord, SymbolRecordFlags, gf256_mul_byte,
50};
51use tracing::{debug, error};
52
53const MAX_BALANCED_BG_CPU: usize = 16;
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub enum OverflowPolicy {
58 DropBusy,
60}
61
62#[derive(Debug, Clone, Copy, PartialEq, Eq)]
64pub enum ParallelismProfile {
65 Balanced,
67}
68
69#[derive(Debug, Clone, Copy, PartialEq, Eq)]
71pub struct BulkheadConfig {
72 pub max_concurrent: usize,
74 pub queue_depth: usize,
76 pub overflow_policy: OverflowPolicy,
78}
79
80impl BulkheadConfig {
81 #[must_use]
85 pub const fn new(
86 max_concurrent: usize,
87 queue_depth: usize,
88 overflow_policy: OverflowPolicy,
89 ) -> Option<Self> {
90 if max_concurrent == 0 {
91 None
92 } else {
93 Some(Self {
94 max_concurrent,
95 queue_depth,
96 overflow_policy,
97 })
98 }
99 }
100
101 #[must_use]
106 pub fn for_profile(profile: ParallelismProfile) -> Self {
107 let p = available_parallelism_or_one();
108 match profile {
109 ParallelismProfile::Balanced => Self {
110 max_concurrent: conservative_bg_cpu_max(p),
111 queue_depth: 0,
112 overflow_policy: OverflowPolicy::DropBusy,
113 },
114 }
115 }
116
117 #[must_use]
119 pub const fn admission_limit(self) -> usize {
120 self.max_concurrent.saturating_add(self.queue_depth)
121 }
122}
123
124impl Default for BulkheadConfig {
125 fn default() -> Self {
126 Self::for_profile(ParallelismProfile::Balanced)
127 }
128}
129
130#[must_use]
132pub const fn conservative_bg_cpu_max(p: usize) -> usize {
133 let base = p / 8;
134 if base == 0 {
135 1
136 } else if base > MAX_BALANCED_BG_CPU {
137 MAX_BALANCED_BG_CPU
138 } else {
139 base
140 }
141}
142
143#[must_use]
145pub fn available_parallelism_or_one() -> usize {
146 std::thread::available_parallelism().map_or(1, NonZeroUsize::get)
147}
148
149#[derive(Debug, Clone, Copy, PartialEq, Eq)]
151pub struct WideChunkLayout {
152 pub u128_chunks: usize,
154 pub u64_chunks: usize,
156 pub tail_bytes: usize,
158}
159
160impl WideChunkLayout {
161 #[must_use]
163 pub const fn for_len(len: usize) -> Self {
164 let u128_chunks = len / 16;
165 let rem_after_u128 = len % 16;
166 let u64_chunks = rem_after_u128 / 8;
167 let tail_bytes = rem_after_u128 % 8;
168 Self {
169 u128_chunks,
170 u64_chunks,
171 tail_bytes,
172 }
173 }
174}
175
176#[allow(clippy::incompatible_msrv)]
181pub fn xor_patch_wide_chunks(dst: &mut [u8], patch: &[u8]) -> Result<WideChunkLayout> {
182 if dst.len() != patch.len() {
183 return Err(FrankenError::TypeMismatch {
184 expected: format!("equal lengths (dst == patch), got {}", dst.len()),
185 actual: patch.len().to_string(),
186 });
187 }
188
189 let layout = WideChunkLayout::for_len(dst.len());
190
191 let (dst_128, dst_rem) = dst.as_chunks_mut::<16>();
192 let (patch_128, patch_rem) = patch.as_chunks::<16>();
193 for (d, p) in dst_128.iter_mut().zip(patch_128.iter()) {
194 let d_word = u128::from_ne_bytes(*d);
195 let p_word = u128::from_ne_bytes(*p);
196 *d = (d_word ^ p_word).to_ne_bytes();
197 }
198
199 let (dst_64, dst_tail) = dst_rem.as_chunks_mut::<8>();
200 let (patch_64, patch_tail) = patch_rem.as_chunks::<8>();
201 for (d, p) in dst_64.iter_mut().zip(patch_64.iter()) {
202 let d_word = u64::from_ne_bytes(*d);
203 let p_word = u64::from_ne_bytes(*p);
204 *d = (d_word ^ p_word).to_ne_bytes();
205 }
206
207 for (d, p) in dst_tail.iter_mut().zip(patch_tail.iter()) {
208 *d ^= *p;
209 }
210
211 Ok(layout)
212}
213
214pub fn gf256_add_assign_chunked(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
219 xor_patch_wide_chunks(dst, src)
220}
221
222pub fn symbol_add_assign(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
226 debug!(
227 bead_id = "bd-1hi.2",
228 op = "symbol_add_assign",
229 symbol_len = dst.len(),
230 "applying in-place XOR over symbol bytes"
231 );
232 gf256_add_assign_chunked(dst, src)
233}
234
235pub fn symbol_mul_into(coeff: u8, src: &[u8], out: &mut [u8]) -> Result<()> {
241 if src.len() != out.len() {
242 error!(
243 bead_id = "bd-1hi.2",
244 op = "symbol_mul_into",
245 coeff,
246 src_len = src.len(),
247 out_len = out.len(),
248 "symbol length mismatch"
249 );
250 return Err(FrankenError::TypeMismatch {
251 expected: format!("equal lengths (src == out), got {}", src.len()),
252 actual: out.len().to_string(),
253 });
254 }
255
256 debug!(
257 bead_id = "bd-1hi.2",
258 op = "symbol_mul_into",
259 coeff,
260 symbol_len = src.len(),
261 "applying GF(256) scalar multiplication"
262 );
263
264 match coeff {
265 0 => {
266 out.fill(0);
267 Ok(())
268 }
269 1 => {
270 out.copy_from_slice(src);
271 Ok(())
272 }
273 _ => {
274 let mut out_chunks = out.chunks_exact_mut(16);
275 let mut src_chunks = src.chunks_exact(16);
276
277 for (dst_chunk, src_chunk) in out_chunks.by_ref().zip(src_chunks.by_ref()) {
278 for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
279 *dst_byte = gf256_mul_byte(coeff, *src_byte);
280 }
281 }
282 for (dst_byte, src_byte) in out_chunks
283 .into_remainder()
284 .iter_mut()
285 .zip(src_chunks.remainder().iter())
286 {
287 *dst_byte = gf256_mul_byte(coeff, *src_byte);
288 }
289 Ok(())
290 }
291 }
292}
293
294pub fn symbol_addmul_assign(dst: &mut [u8], coeff: u8, src: &[u8]) -> Result<WideChunkLayout> {
300 if dst.len() != src.len() {
301 error!(
302 bead_id = "bd-1hi.2",
303 op = "symbol_addmul_assign",
304 coeff,
305 dst_len = dst.len(),
306 src_len = src.len(),
307 "symbol length mismatch"
308 );
309 return Err(FrankenError::TypeMismatch {
310 expected: format!("equal lengths (dst == src), got {}", dst.len()),
311 actual: src.len().to_string(),
312 });
313 }
314
315 debug!(
316 bead_id = "bd-1hi.2",
317 op = "symbol_addmul_assign",
318 coeff,
319 symbol_len = dst.len(),
320 "applying fused multiply-and-add over symbol bytes"
321 );
322
323 match coeff {
324 0 => Ok(WideChunkLayout::for_len(dst.len())),
325 1 => symbol_add_assign(dst, src),
326 _ => {
327 let mut dst_chunks = dst.chunks_exact_mut(16);
328 let mut src_chunks = src.chunks_exact(16);
329
330 for (dst_chunk, src_chunk) in dst_chunks.by_ref().zip(src_chunks.by_ref()) {
331 for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
332 *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
333 }
334 }
335 for (dst_byte, src_byte) in dst_chunks
336 .into_remainder()
337 .iter_mut()
338 .zip(src_chunks.remainder().iter())
339 {
340 *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
341 }
342 Ok(WideChunkLayout::for_len(dst.len()))
343 }
344 }
345}
346
347pub fn simd_friendly_checksum_pair(buffer: &[u8]) -> Result<(u64, [u8; 32])> {
352 let symbol_size = u32::try_from(buffer.len()).map_err(|_| FrankenError::OutOfRange {
353 what: "symbol_size".to_owned(),
354 value: buffer.len().to_string(),
355 })?;
356
357 let symbol_record = SymbolRecord::new(
358 ObjectId::from_bytes([0_u8; ObjectId::LEN]),
359 Oti {
360 f: u64::from(symbol_size),
361 al: 1,
362 t: symbol_size,
363 z: 1,
364 n: 1,
365 },
366 0,
367 buffer.to_vec(),
368 SymbolRecordFlags::empty(),
369 );
370 let blake = PayloadHash::blake3(buffer);
371
372 Ok((symbol_record.frame_xxh3, *blake.as_bytes()))
373}
374
375#[derive(Debug)]
377pub struct Bulkhead {
378 config: BulkheadConfig,
379 in_flight: AtomicUsize,
380 peak_in_flight: AtomicUsize,
381 busy_rejections: AtomicUsize,
382}
383
384impl Bulkhead {
385 #[must_use]
386 pub fn new(config: BulkheadConfig) -> Self {
387 Self {
388 config,
389 in_flight: AtomicUsize::new(0),
390 peak_in_flight: AtomicUsize::new(0),
391 busy_rejections: AtomicUsize::new(0),
392 }
393 }
394
395 #[must_use]
396 pub const fn config(&self) -> BulkheadConfig {
397 self.config
398 }
399
400 #[must_use]
401 pub fn in_flight(&self) -> usize {
402 self.in_flight.load(Ordering::Acquire)
403 }
404
405 #[must_use]
406 pub fn peak_in_flight(&self) -> usize {
407 self.peak_in_flight.load(Ordering::Acquire)
408 }
409
410 #[must_use]
411 pub fn busy_rejections(&self) -> usize {
412 self.busy_rejections.load(Ordering::Acquire)
413 }
414
415 pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
420 let limit = self.config.admission_limit();
421 loop {
422 let current = self.in_flight.load(Ordering::Acquire);
423 if current >= limit {
424 self.busy_rejections.fetch_add(1, Ordering::AcqRel);
425 return Err(match self.config.overflow_policy {
426 OverflowPolicy::DropBusy => FrankenError::Busy,
427 });
428 }
429
430 let next = current.saturating_add(1);
431 if self
432 .in_flight
433 .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
434 .is_ok()
435 {
436 self.peak_in_flight.fetch_max(next, Ordering::AcqRel);
437 return Ok(BulkheadPermit {
438 bulkhead: self,
439 released: false,
440 });
441 }
442 }
443 }
444
445 pub fn run<T>(&self, work: impl FnOnce() -> T) -> Result<T> {
447 let _permit = self.try_acquire()?;
448 Ok(work())
449 }
450}
451
452#[derive(Debug)]
454pub struct BulkheadPermit<'a> {
455 bulkhead: &'a Bulkhead,
456 released: bool,
457}
458
459impl BulkheadPermit<'_> {
460 pub fn release(mut self) {
462 if !self.released {
463 self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
464 self.released = true;
465 }
466 }
467}
468
469impl Drop for BulkheadPermit<'_> {
470 fn drop(&mut self) {
471 if !self.released {
472 self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
473 self.released = true;
474 }
475 }
476}
477
478#[derive(Debug)]
480pub struct RegionBulkhead {
481 region: Region,
482 bulkhead: Bulkhead,
483 closing: AtomicBool,
484}
485
486impl RegionBulkhead {
487 #[must_use]
488 pub fn new(region: Region, config: BulkheadConfig) -> Self {
489 Self {
490 region,
491 bulkhead: Bulkhead::new(config),
492 closing: AtomicBool::new(false),
493 }
494 }
495
496 #[must_use]
497 pub const fn region(&self) -> Region {
498 self.region
499 }
500
501 #[must_use]
502 pub fn bulkhead(&self) -> &Bulkhead {
503 &self.bulkhead
504 }
505
506 pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
507 if self.closing.load(Ordering::Acquire) {
508 return Err(FrankenError::Busy);
509 }
510 self.bulkhead.try_acquire()
511 }
512
513 pub fn begin_close(&self) {
515 self.closing.store(true, Ordering::Release);
516 }
517
518 #[must_use]
520 pub fn is_quiescent(&self) -> bool {
521 self.bulkhead.in_flight() == 0
522 }
523}
524
525#[cfg(test)]
526mod tests {
527 use std::collections::VecDeque;
528 use std::pin::Pin;
529 use std::sync::Arc;
530 use std::task::{Context, Poll};
531 use std::thread;
532 use std::time::{Duration, Instant};
533
534 use asupersync::raptorq::decoder::{InactivationDecoder, ReceivedSymbol};
535 use asupersync::raptorq::gf256::{Gf256, gf256_add_slice, gf256_addmul_slice, gf256_mul_slice};
536 use asupersync::raptorq::systematic::{ConstraintMatrix, SystematicEncoder};
537 use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
538 use asupersync::security::AuthenticationTag;
539 use asupersync::security::authenticated::AuthenticatedSymbol;
540 use asupersync::transport::error::{SinkError, StreamError};
541 use asupersync::transport::sink::SymbolSink;
542 use asupersync::transport::stream::SymbolStream;
543 use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol};
544 use asupersync::{Cx, RaptorQConfig};
545 use fsqlite_btree::compare_key_bytes_contiguous;
546
547 use super::*;
548
549 const BEAD_ID: &str = "bd-22n.4";
550 const SIMD_BEAD_ID: &str = "bd-22n.6";
551 const RAPTORQ_BEAD_ID: &str = "bd-1hi.2";
552
553 #[derive(Debug)]
554 struct VecSink {
555 symbols: Vec<Symbol>,
556 }
557
558 impl VecSink {
559 fn new() -> Self {
560 Self {
561 symbols: Vec::new(),
562 }
563 }
564 }
565
566 impl SymbolSink for VecSink {
567 fn poll_send(
568 mut self: Pin<&mut Self>,
569 _cx: &mut Context<'_>,
570 symbol: AuthenticatedSymbol,
571 ) -> Poll<std::result::Result<(), SinkError>> {
572 self.symbols.push(symbol.into_symbol());
573 Poll::Ready(Ok(()))
574 }
575
576 fn poll_flush(
577 self: Pin<&mut Self>,
578 _cx: &mut Context<'_>,
579 ) -> Poll<std::result::Result<(), SinkError>> {
580 Poll::Ready(Ok(()))
581 }
582
583 fn poll_close(
584 self: Pin<&mut Self>,
585 _cx: &mut Context<'_>,
586 ) -> Poll<std::result::Result<(), SinkError>> {
587 Poll::Ready(Ok(()))
588 }
589
590 fn poll_ready(
591 self: Pin<&mut Self>,
592 _cx: &mut Context<'_>,
593 ) -> Poll<std::result::Result<(), SinkError>> {
594 Poll::Ready(Ok(()))
595 }
596 }
597
598 #[derive(Debug)]
599 struct VecStream {
600 q: VecDeque<AuthenticatedSymbol>,
601 }
602
603 impl VecStream {
604 fn new(symbols: Vec<Symbol>) -> Self {
605 let q = symbols
606 .into_iter()
607 .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
608 .collect();
609 Self { q }
610 }
611 }
612
613 impl SymbolStream for VecStream {
614 fn poll_next(
615 mut self: Pin<&mut Self>,
616 _cx: &mut Context<'_>,
617 ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
618 match self.q.pop_front() {
619 Some(symbol) => Poll::Ready(Some(Ok(symbol))),
620 None => Poll::Ready(None),
621 }
622 }
623
624 fn size_hint(&self) -> (usize, Option<usize>) {
625 (self.q.len(), Some(self.q.len()))
626 }
627
628 fn is_exhausted(&self) -> bool {
629 self.q.is_empty()
630 }
631 }
632
633 fn raptorq_config(symbol_size: u16, repair_overhead: f64) -> RaptorQConfig {
634 let mut config = RaptorQConfig::default();
635 config.encoding.symbol_size = symbol_size;
636 config.encoding.max_block_size = 64 * 1024;
637 config.encoding.repair_overhead = repair_overhead;
638 config
639 }
640
641 fn deterministic_payload(len: usize, seed: u64) -> Vec<u8> {
642 let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
643 let mut out = Vec::with_capacity(len);
644 for idx in 0..len {
645 state ^= state << 7;
646 state ^= state >> 9;
647 state = state.wrapping_mul(0xA24B_AED4_963E_E407);
648 let idx_byte = u8::try_from(idx % 251).expect("modulo fits in u8");
649 out.push(u8::try_from(state & 0xFF).expect("masked to u8") ^ idx_byte);
650 }
651 out
652 }
653
654 fn xor_patch_bytewise(dst: &mut [u8], patch: &[u8]) {
655 for (dst_byte, patch_byte) in dst.iter_mut().zip(patch.iter()) {
656 *dst_byte ^= *patch_byte;
657 }
658 }
659
660 fn gf256_mul_bytewise(coeff: u8, src: &[u8], out: &mut [u8]) {
661 for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
662 *dst_byte = gf256_mul_byte(coeff, *src_byte);
663 }
664 }
665
666 fn collect_rs_files(root: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
667 let entries = std::fs::read_dir(root).expect("read_dir should succeed");
668 for entry in entries {
669 let path = entry.expect("read_dir entry should be readable").path();
670 if path.is_dir() {
671 collect_rs_files(&path, out);
672 } else if path.extension().is_some_and(|ext| ext == "rs") {
673 out.push(path);
674 }
675 }
676 }
677
678 fn encode_symbols(
679 config: RaptorQConfig,
680 object_id: AsObjectId,
681 data: &[u8],
682 ) -> (Vec<Symbol>, usize) {
683 let cx = Cx::for_testing();
684 let mut sender = RaptorQSenderBuilder::new()
685 .config(config)
686 .transport(VecSink::new())
687 .build()
688 .expect("sender build");
689 let outcome = sender
690 .send_object(&cx, object_id, data)
691 .expect("send_object must succeed");
692 let symbols = std::mem::take(&mut sender.transport_mut().symbols);
693 tracing::debug!(
694 bead_id = RAPTORQ_BEAD_ID,
695 case = "encode_symbols",
696 source_symbols = outcome.source_symbols,
697 emitted_symbols = symbols.len(),
698 object_size = data.len(),
699 "encoded object into source+repair symbol stream"
700 );
701 (symbols, outcome.source_symbols)
702 }
703
704 #[allow(clippy::result_large_err)]
705 fn decode_symbols(
706 config: RaptorQConfig,
707 object_id: AsObjectId,
708 object_size: usize,
709 source_symbols: usize,
710 symbols: Vec<Symbol>,
711 ) -> std::result::Result<Vec<u8>, asupersync::Error> {
712 let cx = Cx::for_testing();
713 let params = ObjectParams::new(
714 object_id,
715 u64::try_from(object_size).expect("object size fits u64"),
716 config.encoding.symbol_size,
717 1,
718 u16::try_from(source_symbols).expect("source symbol count fits u16"),
719 );
720 let mut receiver = RaptorQReceiverBuilder::new()
721 .config(config)
722 .source(VecStream::new(symbols))
723 .build()
724 .expect("receiver build");
725
726 receiver
727 .receive_object(&cx, ¶ms)
728 .map(|outcome| outcome.data)
729 }
730
731 fn split_source_and_repair(
732 symbols: &[Symbol],
733 source_symbols: usize,
734 ) -> (Vec<Symbol>, Vec<Symbol>) {
735 let source_symbols_u32 =
736 u32::try_from(source_symbols).expect("source symbol count fits u32");
737 let mut sources = Vec::new();
738 let mut repairs = Vec::new();
739 for symbol in symbols {
740 if symbol.esi() < source_symbols_u32 {
741 sources.push(symbol.clone());
742 } else {
743 repairs.push(symbol.clone());
744 }
745 }
746 (sources, repairs)
747 }
748
749 fn low_level_source_block(k: usize, symbol_size: usize, seed: u64) -> Vec<Vec<u8>> {
750 (0..k)
751 .map(|source_index| {
752 deterministic_payload(
753 symbol_size,
754 seed + u64::try_from(source_index).expect("source index fits u64"),
755 )
756 })
757 .collect()
758 }
759
760 fn append_source_received_symbols(
761 received: &mut Vec<ReceivedSymbol>,
762 constraints: &ConstraintMatrix,
763 base_rows: usize,
764 k_prime: usize,
765 symbol_size: usize,
766 source: &[Vec<u8>],
767 source_indexes: &[usize],
768 ) {
769 for &source_index in source_indexes {
770 let row = base_rows + source_index;
771 let mut columns = Vec::new();
772 let mut coefficients = Vec::new();
773 for col in 0..constraints.cols {
774 let coeff = constraints.get(row, col);
775 if !coeff.is_zero() {
776 columns.push(col);
777 coefficients.push(coeff);
778 }
779 }
780
781 received.push(ReceivedSymbol {
782 esi: u32::try_from(source_index).expect("source index fits u32"),
783 is_source: true,
784 columns,
785 coefficients,
786 data: source[source_index].clone(),
787 });
788 }
789
790 for source_index in source.len()..k_prime {
793 let row = base_rows + source_index;
794 let mut columns = Vec::new();
795 let mut coefficients = Vec::new();
796 for col in 0..constraints.cols {
797 let coeff = constraints.get(row, col);
798 if !coeff.is_zero() {
799 columns.push(col);
800 coefficients.push(coeff);
801 }
802 }
803
804 received.push(ReceivedSymbol {
805 esi: u32::try_from(source_index).expect("source index fits u32"),
806 is_source: true,
807 columns,
808 coefficients,
809 data: vec![0_u8; symbol_size],
810 });
811 }
812 }
813
814 #[test]
815 fn test_parallelism_defaults_conservative() {
816 assert_eq!(
817 conservative_bg_cpu_max(16),
818 2,
819 "bead_id={BEAD_ID} case=balanced_profile_formula_p16"
820 );
821 assert_eq!(
822 conservative_bg_cpu_max(1),
823 1,
824 "bead_id={BEAD_ID} case=balanced_profile_min_floor"
825 );
826 assert_eq!(
827 conservative_bg_cpu_max(512),
828 16,
829 "bead_id={BEAD_ID} case=balanced_profile_max_cap"
830 );
831 }
832
833 #[test]
834 fn test_parallelism_bounded_by_available() {
835 let cfg = BulkheadConfig::default();
836 let p = available_parallelism_or_one();
837 assert!(
838 cfg.max_concurrent <= p,
839 "bead_id={BEAD_ID} case=default_exceeds_available_parallelism cfg={cfg:?} p={p}"
840 );
841
842 let bulkhead = Bulkhead::new(cfg);
843 let mut permits = Vec::new();
844 for _ in 0..cfg.admission_limit() {
845 permits.push(
846 bulkhead
847 .try_acquire()
848 .expect("admission under configured limit should succeed"),
849 );
850 }
851
852 let overflow = bulkhead.try_acquire();
853 assert!(
854 matches!(overflow, Err(FrankenError::Busy)),
855 "bead_id={BEAD_ID} case=bounded_admission_overflow_must_be_busy overflow={overflow:?}"
856 );
857
858 drop(permits);
859 assert_eq!(
860 bulkhead.in_flight(),
861 0,
862 "bead_id={BEAD_ID} case=permits_drop_to_zero"
863 );
864 }
865
866 #[test]
867 fn test_bulkhead_config_max_concurrent() {
868 let cfg = BulkheadConfig::new(3, 0, OverflowPolicy::DropBusy)
869 .expect("non-zero max_concurrent must be valid");
870 let bulkhead = Bulkhead::new(cfg);
871
872 let p1 = bulkhead.try_acquire().expect("slot 1");
873 let p2 = bulkhead.try_acquire().expect("slot 2");
874 let p3 = bulkhead.try_acquire().expect("slot 3");
875 let overflow = bulkhead.try_acquire();
876
877 assert!(
878 matches!(overflow, Err(FrankenError::Busy)),
879 "bead_id={BEAD_ID} case=max_concurrent_enforced overflow={overflow:?}"
880 );
881 drop((p1, p2, p3));
882 }
883
884 #[test]
885 fn test_overflow_policy_drop_with_busy() {
886 let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
887 .expect("non-zero max_concurrent must be valid");
888 let bulkhead = Bulkhead::new(cfg);
889 let _permit = bulkhead.try_acquire().expect("first permit must succeed");
890
891 let overflow = bulkhead.try_acquire();
892 assert!(
893 matches!(overflow, Err(FrankenError::Busy)),
894 "bead_id={BEAD_ID} case=overflow_policy_drop_busy overflow={overflow:?}"
895 );
896 }
897
898 #[test]
899 fn test_background_work_degrades_gracefully() {
900 let cfg = BulkheadConfig::new(2, 0, OverflowPolicy::DropBusy)
901 .expect("non-zero max_concurrent must be valid");
902 let bulkhead = Bulkhead::new(cfg);
903
904 let _a = bulkhead.try_acquire().expect("permit a");
905 let _b = bulkhead.try_acquire().expect("permit b");
906
907 for _ in 0..8 {
908 let result = bulkhead.try_acquire();
909 assert!(
910 matches!(result, Err(FrankenError::Busy)),
911 "bead_id={BEAD_ID} case=overflow_must_reject_not_wait result={result:?}"
912 );
913 }
914
915 assert_eq!(
916 bulkhead.busy_rejections(),
917 8,
918 "bead_id={BEAD_ID} case=busy_rejection_counter"
919 );
920 }
921
922 #[test]
923 fn test_region_integration() {
924 let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
925 .expect("non-zero max_concurrent must be valid");
926 let region_bulkhead = RegionBulkhead::new(Region::new(7), cfg);
927 assert_eq!(
928 region_bulkhead.region().get(),
929 7,
930 "bead_id={BEAD_ID} case=region_id_plumbed"
931 );
932
933 let permit = region_bulkhead.try_acquire().expect("first permit");
934 assert!(
935 !region_bulkhead.is_quiescent(),
936 "bead_id={BEAD_ID} case=region_non_quiescent_with_active_work"
937 );
938
939 region_bulkhead.begin_close();
940 let after_close = region_bulkhead.try_acquire();
941 assert!(
942 matches!(after_close, Err(FrankenError::Busy)),
943 "bead_id={BEAD_ID} case=region_close_blocks_new_work result={after_close:?}"
944 );
945
946 drop(permit);
947 assert!(
948 region_bulkhead.is_quiescent(),
949 "bead_id={BEAD_ID} case=region_quiescent_after_permit_drop"
950 );
951 }
952
953 #[test]
954 fn test_gf256_ops_chunked() {
955 let mut dst = vec![0xAA_u8; 40];
956 let src = vec![0x55_u8; 40];
957 let expected: Vec<u8> = dst.iter().zip(src.iter()).map(|(d, s)| *d ^ *s).collect();
958
959 let layout = gf256_add_assign_chunked(&mut dst, &src)
960 .expect("equal-length buffers should be accepted");
961 assert!(
962 layout.u128_chunks > 0 || layout.u64_chunks > 0,
963 "bead_id={SIMD_BEAD_ID} case=wide_chunks_expected layout={layout:?}"
964 );
965 assert_eq!(
966 dst, expected,
967 "bead_id={SIMD_BEAD_ID} case=gf256_addition_xor_equivalence"
968 );
969 }
970
971 #[test]
972 fn test_xor_patch_wide_chunks() {
973 let mut dst = vec![0xF0_u8; 37];
974 let patch = vec![0x0F_u8; 37];
975 let expected: Vec<u8> = dst.iter().zip(patch.iter()).map(|(d, p)| *d ^ *p).collect();
976
977 let layout =
978 xor_patch_wide_chunks(&mut dst, &patch).expect("equal-length buffers should be valid");
979 assert_eq!(
980 layout,
981 WideChunkLayout {
982 u128_chunks: 2,
983 u64_chunks: 0,
984 tail_bytes: 5,
985 },
986 "bead_id={SIMD_BEAD_ID} case=chunk_layout_expected"
987 );
988 assert_eq!(
989 dst, expected,
990 "bead_id={SIMD_BEAD_ID} case=xor_patch_matches_scalar_reference"
991 );
992 }
993
994 #[test]
995 fn test_xor_symbols_u64_chunks() {
996 let mut dst = vec![0xAB_u8; 24];
998 let src = vec![0xCD_u8; 24];
999 let mut expected = vec![0xAB_u8; 24];
1000 xor_patch_bytewise(&mut expected, &src);
1001
1002 let layout = xor_patch_wide_chunks(&mut dst, &src).expect("equal-length buffers");
1003 assert_eq!(
1004 layout,
1005 WideChunkLayout {
1006 u128_chunks: 1,
1007 u64_chunks: 1,
1008 tail_bytes: 0,
1009 },
1010 "bead_id=bd-2ddc case=u64_chunk_lane_exercised"
1011 );
1012 assert_eq!(
1013 dst, expected,
1014 "bead_id=bd-2ddc case=chunked_xor_matches_bytewise_reference"
1015 );
1016 }
1017
1018 #[test]
1019 fn test_gf256_multiply_chunks() {
1020 let coeff = 0xA7_u8;
1021 let src = deterministic_payload(4096, 0xDDCC_BBAA_1122_3344);
1022 let mut chunked = vec![0_u8; src.len()];
1023 let mut scalar = vec![0_u8; src.len()];
1024
1025 symbol_mul_into(coeff, &src, &mut chunked).expect("chunked symbol_mul_into");
1026 gf256_mul_bytewise(coeff, &src, &mut scalar);
1027
1028 assert_eq!(
1029 chunked, scalar,
1030 "bead_id=bd-2ddc case=chunked_mul_matches_scalar_reference"
1031 );
1032 }
1033
1034 #[test]
1035 fn test_u128_chunk_alignment() {
1036 let mut via_wide = deterministic_payload(4099, 0x1234_5678_9ABC_DEF0);
1038 let mut via_u64_only = via_wide.clone();
1039 let patch = deterministic_payload(4099, 0x0F0E_0D0C_0B0A_0908);
1040
1041 xor_patch_wide_chunks(&mut via_wide, &patch).expect("wide chunk xor");
1042
1043 let mut dst_u64_chunks = via_u64_only.chunks_exact_mut(8);
1044 let mut patch_u64_chunks = patch.chunks_exact(8);
1045 for (dst_chunk, patch_chunk) in dst_u64_chunks.by_ref().zip(patch_u64_chunks.by_ref()) {
1046 let dst_word = u64::from_ne_bytes(
1047 dst_chunk
1048 .try_into()
1049 .expect("chunks_exact(8) must yield 8-byte chunk"),
1050 );
1051 let patch_word = u64::from_ne_bytes(
1052 patch_chunk
1053 .try_into()
1054 .expect("chunks_exact(8) must yield 8-byte chunk"),
1055 );
1056 dst_chunk.copy_from_slice(&(dst_word ^ patch_word).to_ne_bytes());
1057 }
1058 for (dst_byte, patch_byte) in dst_u64_chunks
1059 .into_remainder()
1060 .iter_mut()
1061 .zip(patch_u64_chunks.remainder().iter())
1062 {
1063 *dst_byte ^= *patch_byte;
1064 }
1065
1066 assert_eq!(
1067 via_wide, via_u64_only,
1068 "bead_id=bd-2ddc case=u128_lane_matches_u64_plus_tail"
1069 );
1070 }
1071
1072 #[test]
1073 fn test_benchmark_chunk_vs_byte() {
1074 if cfg!(debug_assertions) {
1076 return;
1077 }
1078
1079 let iterations = 32_000_usize;
1080 let src = deterministic_payload(4096, 0xDEAD_BEEF_F00D_CAFE);
1081 let base = deterministic_payload(4096, 0x0123_4567_89AB_CDEF);
1082
1083 let mut chunked = base.clone();
1084 let chunked_start = Instant::now();
1085 for _ in 0..iterations {
1086 xor_patch_wide_chunks(&mut chunked, &src).expect("chunked xor");
1087 std::hint::black_box(&chunked);
1088 }
1089 let chunked_elapsed = chunked_start.elapsed();
1090
1091 let mut bytewise = base;
1092 let bytewise_start = Instant::now();
1093 for _ in 0..iterations {
1094 xor_patch_bytewise(&mut bytewise, &src);
1095 std::hint::black_box(&bytewise);
1096 }
1097 let bytewise_elapsed = bytewise_start.elapsed();
1098
1099 let speedup = bytewise_elapsed.as_secs_f64() / chunked_elapsed.as_secs_f64();
1100 assert!(
1101 speedup >= 4.0,
1102 "bead_id=bd-2ddc case=chunk_vs_byte_speedup speedup={speedup:.2}x \
1103 chunked_ns={} bytewise_ns={} iterations={iterations}",
1104 chunked_elapsed.as_nanos(),
1105 bytewise_elapsed.as_nanos()
1106 );
1107 }
1108
1109 #[test]
1110 fn test_no_unsafe_simd() {
1111 let manifest = include_str!("../../../Cargo.toml");
1112 assert!(
1113 manifest.contains(r#"unsafe_code = "forbid""#),
1114 "bead_id=bd-2ddc case=workspace_forbids_unsafe"
1115 );
1116
1117 let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
1118 .parent()
1119 .expect("crate dir has parent")
1120 .parent()
1121 .expect("workspace root exists")
1122 .to_path_buf();
1123 let crates_dir = workspace_root.join("crates");
1124
1125 let mut rs_files = Vec::new();
1126 collect_rs_files(&crates_dir, &mut rs_files);
1127
1128 let simd_needles = [
1129 "_mm_",
1130 "std::arch::",
1131 "core::arch::",
1132 "__m128",
1133 "__m256",
1134 "__m512",
1135 "simd_shuffle",
1136 "vpxor",
1137 "vxorq",
1138 ];
1139
1140 let mut offenders = Vec::new();
1141 for file in rs_files {
1142 let Ok(content) = std::fs::read_to_string(&file) else {
1143 continue;
1144 };
1145
1146 let lines = content.lines().collect::<Vec<_>>();
1147 for (idx, line) in lines.iter().enumerate() {
1148 let has_intrinsic = simd_needles.iter().any(|needle| line.contains(needle));
1149 if !has_intrinsic {
1150 continue;
1151 }
1152
1153 let window_start = idx.saturating_sub(3);
1154 let window_end = (idx + 3).min(lines.len().saturating_sub(1));
1155 let mut found_unsafe_nearby = false;
1156 for nearby in &lines[window_start..=window_end] {
1157 let trimmed = nearby.trim_start();
1158 let is_comment = trimmed.starts_with("//");
1159 if !is_comment && trimmed.contains("unsafe") {
1160 found_unsafe_nearby = true;
1161 break;
1162 }
1163 }
1164
1165 if found_unsafe_nearby {
1166 offenders.push(format!("{}:{}", file.display(), idx + 1));
1167 }
1168 }
1169 }
1170
1171 assert!(
1172 offenders.is_empty(),
1173 "bead_id=bd-2ddc case=no_unsafe_simd_intrinsics offenders={offenders:?}"
1174 );
1175 }
1176
1177 #[test]
1178 fn test_checksum_simd_friendly() {
1179 let buffer = vec![0x11_u8; 256];
1180 let (xx_a, blake_a) =
1181 simd_friendly_checksum_pair(&buffer).expect("checksum pair must succeed");
1182
1183 let mut modified = buffer;
1184 modified[255] ^= 0x01;
1185 let (xx_b, blake_b) =
1186 simd_friendly_checksum_pair(&modified).expect("checksum pair must succeed");
1187
1188 assert_ne!(
1189 xx_a, xx_b,
1190 "bead_id={SIMD_BEAD_ID} case=xxhash3_changes_on_byte_flip"
1191 );
1192 assert_ne!(
1193 blake_a, blake_b,
1194 "bead_id={SIMD_BEAD_ID} case=blake3_changes_on_byte_flip"
1195 );
1196 }
1197
1198 #[test]
1199 fn test_e2e_bounded_parallelism_under_background_load() {
1200 let cfg = BulkheadConfig::new(4, 0, OverflowPolicy::DropBusy)
1201 .expect("non-zero max_concurrent must be valid");
1202 let bulkhead = Arc::new(Bulkhead::new(cfg));
1203
1204 let handles: Vec<_> = (0..48)
1205 .map(|_| {
1206 let bulkhead = Arc::clone(&bulkhead);
1207 thread::spawn(move || {
1208 bulkhead.run(|| {
1209 thread::sleep(Duration::from_millis(10));
1210 })
1211 })
1212 })
1213 .collect();
1214
1215 let mut busy = 0_usize;
1216 for handle in handles {
1217 match handle.join().expect("worker thread should not panic") {
1218 Ok(()) => {}
1219 Err(FrankenError::Busy) => busy = busy.saturating_add(1),
1220 Err(err) => {
1221 assert_eq!(
1222 err.error_code(),
1223 fsqlite_error::ErrorCode::Busy,
1224 "bead_id={BEAD_ID} case=e2e_unexpected_bulkhead_error err={err}"
1225 );
1226 busy = busy.saturating_add(1);
1227 }
1228 }
1229 }
1230
1231 assert!(
1232 busy > 0,
1233 "bead_id={BEAD_ID} case=e2e_should_observe_overflow_rejections"
1234 );
1235 assert!(
1236 bulkhead.peak_in_flight() <= cfg.admission_limit(),
1237 "bead_id={BEAD_ID} case=e2e_peak_parallelism_exceeded peak={} limit={}",
1238 bulkhead.peak_in_flight(),
1239 cfg.admission_limit()
1240 );
1241 }
1242
1243 #[test]
1244 fn test_e2e_simd_hot_path_correctness() {
1245 let contiguous = b"key-0001key-0002".to_vec();
1247 let left = &contiguous[0..8];
1248 let right = &contiguous[8..16];
1249 let compare_start = Instant::now();
1250 assert_eq!(
1251 compare_key_bytes_contiguous(left, right),
1252 left.cmp(right),
1253 "bead_id={SIMD_BEAD_ID} case=btree_contiguous_compare_correct"
1254 );
1255 let compare_elapsed = compare_start.elapsed();
1256
1257 let mut symbol_a = (0_u8..64).collect::<Vec<u8>>();
1259 let symbol_b = (64_u8..128).collect::<Vec<u8>>();
1260 let expected_add: Vec<u8> = symbol_a
1261 .iter()
1262 .zip(symbol_b.iter())
1263 .map(|(a, b)| *a ^ *b)
1264 .collect();
1265 let gf256_start = Instant::now();
1266 gf256_add_assign_chunked(&mut symbol_a, &symbol_b).expect("gf256 add should succeed");
1267 let gf256_elapsed = gf256_start.elapsed();
1268 assert_eq!(
1269 symbol_a, expected_add,
1270 "bead_id={SIMD_BEAD_ID} case=gf256_chunked_add_correct"
1271 );
1272
1273 let mut patch_target = vec![0x33_u8; 64];
1274 let patch = vec![0xCC_u8; 64];
1275 let xor_start = Instant::now();
1276 xor_patch_wide_chunks(&mut patch_target, &patch).expect("xor patch should succeed");
1277 let xor_elapsed = xor_start.elapsed();
1278 assert!(
1279 patch_target.iter().all(|&byte| byte == (0x33_u8 ^ 0xCC_u8)),
1280 "bead_id={SIMD_BEAD_ID} case=xor_patch_chunked_correct"
1281 );
1282
1283 let checksum_start = Instant::now();
1285 let (xx, blake) =
1286 simd_friendly_checksum_pair(&patch_target).expect("checksum pair should succeed");
1287 let checksum_elapsed = checksum_start.elapsed();
1288 assert_ne!(
1289 xx, 0,
1290 "bead_id={SIMD_BEAD_ID} case=xxhash3_nonzero_for_nonempty_payload"
1291 );
1292 assert!(
1293 blake.iter().any(|&b| b != 0),
1294 "bead_id={SIMD_BEAD_ID} case=blake3_digest_nonzero"
1295 );
1296
1297 eprintln!(
1298 "bead_id={SIMD_BEAD_ID} metric=simd_hot_path_ns compare={} gf256_add={} xor_patch={} checksum={}",
1299 compare_elapsed.as_nanos(),
1300 gf256_elapsed.as_nanos(),
1301 xor_elapsed.as_nanos(),
1302 checksum_elapsed.as_nanos()
1303 );
1304 }
1305
1306 #[test]
1307 fn test_symbol_add_self_inverse() {
1308 let src = (0_u16..512)
1309 .map(|idx| u8::try_from(idx % 251).expect("modulo fits in u8"))
1310 .collect::<Vec<_>>();
1311 let mut dst = src.clone();
1312 symbol_add_assign(&mut dst, &src).expect("symbol_add should succeed");
1313 assert!(
1314 dst.iter().all(|byte| *byte == 0),
1315 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_self_inverse"
1316 );
1317 }
1318
1319 #[test]
1320 fn test_symbol_add_commutative_and_associative() {
1321 let a = (0_u16..128)
1322 .map(|idx| u8::try_from((idx * 3) % 251).expect("modulo fits"))
1323 .collect::<Vec<_>>();
1324 let b = (0_u16..128)
1325 .map(|idx| u8::try_from((idx * 5 + 7) % 251).expect("modulo fits"))
1326 .collect::<Vec<_>>();
1327 let c = (0_u16..128)
1328 .map(|idx| u8::try_from((idx * 11 + 13) % 251).expect("modulo fits"))
1329 .collect::<Vec<_>>();
1330
1331 let mut ab = a.clone();
1332 symbol_add_assign(&mut ab, &b).expect("a+b");
1333 let mut ba = b.clone();
1334 symbol_add_assign(&mut ba, &a).expect("b+a");
1335 assert_eq!(
1336 ab, ba,
1337 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_commutative"
1338 );
1339
1340 let mut lhs = a.clone();
1341 symbol_add_assign(&mut lhs, &b).expect("(a+b)");
1342 symbol_add_assign(&mut lhs, &c).expect("(a+b)+c");
1343
1344 let mut rhs = b;
1345 symbol_add_assign(&mut rhs, &c).expect("(b+c)");
1346 let mut rhs2 = a;
1347 symbol_add_assign(&mut rhs2, &rhs).expect("a+(b+c)");
1348
1349 assert_eq!(
1350 lhs, rhs2,
1351 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_associative"
1352 );
1353 }
1354
1355 #[test]
1356 fn test_symbol_mul_special_cases() {
1357 let src = (0_u16..256)
1358 .map(|idx| u8::try_from(idx).expect("idx fits"))
1359 .collect::<Vec<_>>();
1360
1361 let mut out_zero = vec![0_u8; src.len()];
1362 symbol_mul_into(0, &src, &mut out_zero).expect("mul by zero");
1363 assert!(
1364 out_zero.iter().all(|byte| *byte == 0),
1365 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_zero"
1366 );
1367
1368 let mut out_one = vec![0_u8; src.len()];
1369 symbol_mul_into(1, &src, &mut out_one).expect("mul by one");
1370 assert_eq!(
1371 out_one, src,
1372 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_identity"
1373 );
1374 }
1375
1376 #[test]
1377 fn test_symbol_mul_matches_scalar_reference() {
1378 let src = (0_u16..512)
1379 .map(|idx| u8::try_from((idx * 7 + 17) % 251).expect("modulo fits"))
1380 .collect::<Vec<_>>();
1381 let coeff = 0xA7_u8;
1382
1383 let mut out = vec![0_u8; src.len()];
1384 symbol_mul_into(coeff, &src, &mut out).expect("symbol mul");
1385 for (actual, input) in out.iter().zip(src.iter()) {
1386 let expected = gf256_mul_byte(coeff, *input);
1387 assert_eq!(
1388 *actual, expected,
1389 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_scalar_match"
1390 );
1391 }
1392 }
1393
1394 #[test]
1395 fn test_symbol_addmul_special_cases_and_equivalence() {
1396 let src = (0_u16..512)
1397 .map(|idx| u8::try_from((idx * 13 + 19) % 251).expect("modulo fits"))
1398 .collect::<Vec<_>>();
1399 let original = (0_u16..512)
1400 .map(|idx| u8::try_from((idx * 9 + 3) % 251).expect("modulo fits"))
1401 .collect::<Vec<_>>();
1402
1403 let mut no_op = original.clone();
1404 symbol_addmul_assign(&mut no_op, 0, &src).expect("c=0");
1405 assert_eq!(
1406 no_op, original,
1407 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c0_noop"
1408 );
1409
1410 let mut xor_path = original.clone();
1411 symbol_addmul_assign(&mut xor_path, 1, &src).expect("c=1");
1412 let mut expected_xor = original.clone();
1413 symbol_add_assign(&mut expected_xor, &src).expect("xor reference");
1414 assert_eq!(
1415 xor_path, expected_xor,
1416 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c1_equals_xor"
1417 );
1418
1419 let coeff = 0x53_u8;
1420 let mut fused = original.clone();
1421 symbol_addmul_assign(&mut fused, coeff, &src).expect("fused");
1422 let mut mul = vec![0_u8; src.len()];
1423 symbol_mul_into(coeff, &src, &mut mul).expect("mul");
1424 let mut separate = original;
1425 symbol_add_assign(&mut separate, &mul).expect("add");
1426 assert_eq!(
1427 fused, separate,
1428 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_fused_equals_mul_plus_add"
1429 );
1430 }
1431
1432 #[test]
1433 fn test_symbol_operations_4096_and_512() {
1434 for symbol_len in [4096_usize, 1024_usize, 512_usize] {
1435 let a = vec![0xAA_u8; symbol_len];
1436 let b = vec![0x55_u8; symbol_len];
1437 let mut sum = a.clone();
1438 let layout = symbol_add_assign(&mut sum, &b).expect("symbol add");
1439 assert_eq!(
1440 layout,
1441 WideChunkLayout::for_len(symbol_len),
1442 "bead_id={RAPTORQ_BEAD_ID} case=symbol_len_layout_consistency len={symbol_len}"
1443 );
1444 assert!(
1445 sum.iter().all(|byte| *byte == (0xAA_u8 ^ 0x55_u8)),
1446 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_expected_xor len={symbol_len}"
1447 );
1448 }
1449 }
1450
1451 #[test]
1452 fn test_gf256_arithmetic_matches_asupersync() {
1453 for a in 0_u8..=u8::MAX {
1454 for b in 0_u8..=u8::MAX {
1455 assert_eq!(
1456 gf256_mul_byte(a, b),
1457 (Gf256(a) * Gf256(b)).raw(),
1458 "bead_id={RAPTORQ_BEAD_ID} case=gf256_mul_parity a=0x{a:02X} b=0x{b:02X}"
1459 );
1460 }
1461 }
1462 }
1463
1464 #[test]
1465 fn test_symbol_ops_match_asupersync_gf256_slices() {
1466 for symbol_len in [512_usize, 1024_usize, 4096_usize] {
1467 let src = deterministic_payload(symbol_len, 0xA5A5_0101);
1468 let dst = deterministic_payload(symbol_len, 0x5A5A_0202);
1469
1470 let mut ours_add = dst.clone();
1471 symbol_add_assign(&mut ours_add, &src).expect("symbol add");
1472 let mut as_add = dst.clone();
1473 gf256_add_slice(&mut as_add, &src);
1474 assert_eq!(
1475 ours_add, as_add,
1476 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_add len={symbol_len}"
1477 );
1478
1479 for coeff in [0_u8, 1_u8, 0x53_u8, 0xA7_u8] {
1480 let mut ours_mul = vec![0_u8; symbol_len];
1481 symbol_mul_into(coeff, &src, &mut ours_mul).expect("symbol mul");
1482 let mut as_mul = src.clone();
1483 gf256_mul_slice(&mut as_mul, Gf256(coeff));
1484 assert_eq!(
1485 ours_mul, as_mul,
1486 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_mul len={symbol_len} coeff=0x{coeff:02X}"
1487 );
1488
1489 let mut ours_addmul = dst.clone();
1490 symbol_addmul_assign(&mut ours_addmul, coeff, &src).expect("symbol addmul");
1491 let mut as_addmul = dst.clone();
1492 gf256_addmul_slice(&mut as_addmul, &src, Gf256(coeff));
1493 assert_eq!(
1494 ours_addmul, as_addmul,
1495 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_addmul len={symbol_len} coeff=0x{coeff:02X}"
1496 );
1497 }
1498 }
1499 }
1500
1501 #[test]
1502 fn test_encode_single_source_block() {
1503 let config = raptorq_config(512, 1.25);
1504 let symbol_size = usize::from(config.encoding.symbol_size);
1505 let k = 8_usize;
1506 let data = deterministic_payload(k * symbol_size, 0x0102_0304);
1507 let object_id = AsObjectId::new_for_test(1201);
1508 tracing::info!(
1509 bead_id = RAPTORQ_BEAD_ID,
1510 case = "test_encode_single_source_block",
1511 symbol_size,
1512 requested_k = k,
1513 "encoding source block"
1514 );
1515 let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1516 let (sources, repairs) = split_source_and_repair(&symbols, source_symbols);
1517
1518 assert_eq!(
1519 source_symbols, k,
1520 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_count"
1521 );
1522 assert_eq!(
1523 sources.len(),
1524 source_symbols,
1525 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_partition"
1526 );
1527 assert!(
1528 !repairs.is_empty(),
1529 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_repair_present"
1530 );
1531 assert!(
1532 symbols.iter().all(|symbol| symbol.len() == symbol_size),
1533 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_symbol_size_consistent"
1534 );
1535 }
1536
1537 #[test]
1538 fn test_decode_exact_k_symbols() {
1539 let symbol_size = 512_usize;
1540 let k = 16_usize;
1541 let seed = 0x0BAD_CAFE_u64;
1542 let source = low_level_source_block(k, symbol_size, seed);
1543 let encoder =
1544 SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1545 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1546 let params = decoder.params();
1547 let base_rows = params.s + params.h;
1548 let constraints = ConstraintMatrix::build(params, seed);
1549 let mut received = decoder.constraint_symbols();
1550 let source_indexes = (0..k).collect::<Vec<_>>();
1551 append_source_received_symbols(
1552 &mut received,
1553 &constraints,
1554 base_rows,
1555 params.k_prime,
1556 symbol_size,
1557 &source,
1558 &source_indexes,
1559 );
1560
1561 tracing::warn!(
1562 bead_id = RAPTORQ_BEAD_ID,
1563 case = "test_decode_exact_k_symbols",
1564 source_symbols = k,
1565 "decoding with minimum symbol count (fragile recovery threshold)"
1566 );
1567 let decode_outcome = decoder
1568 .decode(&received)
1569 .expect("decode exact-k must succeed");
1570 assert_eq!(
1571 decode_outcome.source, source,
1572 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbols_roundtrip"
1573 );
1574 assert_eq!(
1575 decode_outcome.intermediate[0].len(),
1576 symbol_size,
1577 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbol_size"
1578 );
1579 assert_eq!(
1580 encoder.intermediate_symbol(0),
1581 decode_outcome.intermediate[0],
1582 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_intermediate_consistency"
1583 );
1584 }
1585
1586 #[test]
1587 fn test_decode_with_repair_symbols() {
1588 let symbol_size = 512_usize;
1589 let k = 16_usize;
1590 let seed = 0xABC0_FED1_u64;
1591 let source = low_level_source_block(k, symbol_size, seed);
1592 let encoder =
1593 SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1594 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1595 let params = decoder.params();
1596 let base_rows = params.s + params.h;
1597 let constraints = ConstraintMatrix::build(params, seed);
1598
1599 let mut received = decoder.constraint_symbols();
1600 let source_indexes = (1..k).collect::<Vec<_>>();
1601 append_source_received_symbols(
1602 &mut received,
1603 &constraints,
1604 base_rows,
1605 params.k_prime,
1606 symbol_size,
1607 &source,
1608 &source_indexes,
1609 );
1610
1611 let repair_esi = u32::try_from(k).expect("k fits u32");
1612 let (columns, coefficients) = decoder.repair_equation(repair_esi);
1613 let repair_data = encoder.repair_symbol(repair_esi);
1614 received.push(ReceivedSymbol::repair(
1615 repair_esi,
1616 columns,
1617 coefficients,
1618 repair_data,
1619 ));
1620
1621 let decode_outcome = decoder
1622 .decode(&received)
1623 .expect("decode with one repair must succeed");
1624 assert_eq!(
1625 decode_outcome.source, source,
1626 "bead_id={RAPTORQ_BEAD_ID} case=decode_with_repair_roundtrip"
1627 );
1628 }
1629
1630 #[test]
1631 fn test_decode_insufficient_symbols() {
1632 let symbol_size = 512_usize;
1633 let k = 8_usize;
1634 let seed = 0xDEAD_BEEF_u64;
1635 let source = low_level_source_block(k, symbol_size, seed);
1636 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1637 let params = decoder.params();
1638 let base_rows = params.s + params.h;
1639 let constraints = ConstraintMatrix::build(params, seed);
1640 let mut received = decoder.constraint_symbols();
1641 let source_indexes = (0..k.saturating_sub(1)).collect::<Vec<_>>();
1642 append_source_received_symbols(
1643 &mut received,
1644 &constraints,
1645 base_rows,
1646 params.k_prime,
1647 symbol_size,
1648 &source,
1649 &source_indexes,
1650 );
1651
1652 let decode = decoder.decode(&received);
1653 assert!(
1654 decode.is_err(),
1655 "bead_id={RAPTORQ_BEAD_ID} case=decode_insufficient_symbols unexpectedly succeeded"
1656 );
1657 if let Err(err) = decode {
1658 tracing::error!(
1659 bead_id = RAPTORQ_BEAD_ID,
1660 case = "test_decode_insufficient_symbols",
1661 error = ?err,
1662 "decode failed as expected due to insufficient symbols"
1663 );
1664 }
1665 }
1666
1667 #[test]
1668 fn test_symbol_size_alignment() {
1669 let config = raptorq_config(4096, 1.20);
1670 let symbol_size = usize::from(config.encoding.symbol_size);
1671 let data = deterministic_payload(symbol_size * 3, 0x600D_1111);
1672 let object_id = AsObjectId::new_for_test(1205);
1673 let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1674
1675 assert_eq!(
1676 source_symbols, 3,
1677 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_source_count"
1678 );
1679 assert!(
1680 symbol_size.is_power_of_two(),
1681 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_power_of_two"
1682 );
1683 assert_eq!(
1684 symbol_size % 512,
1685 0,
1686 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_sector_multiple"
1687 );
1688 assert!(
1689 symbols.iter().all(|symbol| symbol.len() == symbol_size),
1690 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_symbol_lengths"
1691 );
1692 }
1693
1694 #[test]
1695 fn prop_encode_decode_roundtrip() {
1696 for seed in [11_u64, 29_u64, 43_u64, 71_u64] {
1697 for k in [8_usize, 16_usize] {
1698 let config = raptorq_config(512, 1.30);
1699 let symbol_size = usize::from(config.encoding.symbol_size);
1700 let data = deterministic_payload(k * symbol_size - 17, seed);
1701 let object_id = AsObjectId::new_for_test(2000 + seed);
1702 let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1703 let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1704 let subset = sources
1705 .iter()
1706 .take(source_symbols)
1707 .cloned()
1708 .collect::<Vec<_>>();
1709 let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1710 .expect("property roundtrip decode");
1711 assert_eq!(
1712 decoded, data,
1713 "bead_id={RAPTORQ_BEAD_ID} case=prop_encode_decode_roundtrip seed={seed} k={k}"
1714 );
1715 }
1716 }
1717 }
1718
1719 #[test]
1720 fn prop_any_k_of_n_suffices() {
1721 let symbol_size = 512_usize;
1722 let k = 16_usize;
1723 let sources = (0..k)
1724 .map(|symbol_idx| {
1725 deterministic_payload(
1726 symbol_size,
1727 0x5000_0000 + u64::try_from(symbol_idx).expect("index fits u64"),
1728 )
1729 })
1730 .collect::<Vec<_>>();
1731
1732 let mut parity = vec![0_u8; symbol_size];
1733 for source in &sources {
1734 symbol_add_assign(&mut parity, source).expect("parity construction");
1735 }
1736
1737 for omitted in 0..=k {
1738 let rebuilt = if omitted == k {
1739 sources.clone()
1740 } else {
1741 let mut recovered = parity.clone();
1742 for (index, source) in sources.iter().enumerate() {
1743 if index != omitted {
1744 symbol_add_assign(&mut recovered, source).expect("single-erasure recovery");
1745 }
1746 }
1747 let mut rebuilt = sources.clone();
1748 rebuilt[omitted] = recovered;
1749 rebuilt
1750 };
1751
1752 assert_eq!(
1753 rebuilt.len(),
1754 k,
1755 "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_subset_size omitted_index={omitted}"
1756 );
1757 assert_eq!(
1758 rebuilt, sources,
1759 "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_suffices omitted_index={omitted}"
1760 );
1761 }
1762 }
1763
1764 #[test]
1765 fn prop_symbol_size_consistent() {
1766 for symbol_size in [512_u16, 1024_u16, 4096_u16] {
1767 for k in [4_usize, 8_usize] {
1768 let config = raptorq_config(symbol_size, 1.25);
1769 let size = usize::from(symbol_size);
1770 let object_id = AsObjectId::new_for_test(
1771 u64::from(symbol_size) * 100 + u64::try_from(k).expect("k fits u64"),
1772 );
1773 let data = deterministic_payload(k * size - 3, u64::from(symbol_size));
1774 let (symbols, _) = encode_symbols(config, object_id, &data);
1775 assert!(
1776 symbols.iter().all(|symbol| symbol.len() == size),
1777 "bead_id={RAPTORQ_BEAD_ID} case=prop_symbol_size_consistent symbol_size={symbol_size} k={k}"
1778 );
1779 }
1780 }
1781 }
1782
1783 #[test]
1784 fn test_e2e_symbol_ops_in_encode_decode_roundtrip() {
1785 for (run, k) in [8_usize, 16_usize, 64_usize].iter().copied().enumerate() {
1786 let config = raptorq_config(512, 1.30);
1787 let symbol_size = usize::from(config.encoding.symbol_size);
1788 let data = deterministic_payload(
1789 k * symbol_size,
1790 0x4455_6677 + u64::try_from(run).expect("run fits u64"),
1791 );
1792 let object_id = AsObjectId::new_for_test(3000 + u64::try_from(k).expect("k fits u64"));
1793
1794 tracing::info!(
1795 bead_id = RAPTORQ_BEAD_ID,
1796 case = "test_e2e_symbol_ops_in_encode_decode_roundtrip",
1797 k,
1798 symbol_size,
1799 "starting encode/decode roundtrip"
1800 );
1801 let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1802 let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1803 let subset = sources
1804 .iter()
1805 .take(source_symbols)
1806 .cloned()
1807 .collect::<Vec<_>>();
1808 let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1809 .expect("e2e decode must succeed");
1810
1811 assert_eq!(
1812 decoded, data,
1813 "bead_id={RAPTORQ_BEAD_ID} case=e2e_roundtrip_bytes k={k}"
1814 );
1815
1816 let mut source_parity = vec![0_u8; symbol_size];
1817 for chunk in data.chunks_exact(symbol_size) {
1818 symbol_add_assign(&mut source_parity, chunk).expect("source parity xor");
1819 }
1820
1821 let mut decoded_parity = vec![0_u8; symbol_size];
1822 for chunk in decoded.chunks_exact(symbol_size) {
1823 symbol_add_assign(&mut decoded_parity, chunk).expect("decoded parity xor");
1824 }
1825
1826 assert_eq!(
1827 decoded_parity, source_parity,
1828 "bead_id={RAPTORQ_BEAD_ID} case=e2e_symbol_ops_parity k={k}"
1829 );
1830 }
1831 }
1832}