1pub mod attach;
8pub mod commit_marker;
9pub mod commit_repair;
10pub mod compat_persist;
11pub mod connection;
12pub mod db_fec;
13pub mod decode_proofs;
14pub mod ecs_replication;
15pub mod epoch;
16pub mod explain;
17pub mod inter_object_coding;
18pub mod lrc;
19pub mod native_index;
20pub mod permeation_map;
21pub mod por;
22pub mod raptorq_codec;
23pub mod raptorq_integration;
24pub mod region;
25pub mod remote_effects;
26pub mod repair_engine;
27pub mod repair_symbols;
28pub mod replication_receiver;
29pub mod replication_sender;
30pub mod snapshot_shipping;
31pub mod source_block_partition;
32pub mod symbol_log;
33pub mod symbol_size_policy;
34pub mod tiered_storage;
35pub mod transaction;
36pub mod wal_adapter;
37pub mod wal_fec_adapter;
38
39use std::num::NonZeroUsize;
40use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
41
42use fsqlite_error::{FrankenError, Result};
43use fsqlite_types::{
44 ObjectId, Oti, PayloadHash, Region, SymbolRecord, SymbolRecordFlags, gf256_mul_byte,
45};
46use tracing::{debug, error};
47
48const MAX_BALANCED_BG_CPU: usize = 16;
49
50#[derive(Debug, Clone, Copy, PartialEq, Eq)]
52pub enum OverflowPolicy {
53 DropBusy,
55}
56
57#[derive(Debug, Clone, Copy, PartialEq, Eq)]
59pub enum ParallelismProfile {
60 Balanced,
62}
63
64#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub struct BulkheadConfig {
67 pub max_concurrent: usize,
69 pub queue_depth: usize,
71 pub overflow_policy: OverflowPolicy,
73}
74
75impl BulkheadConfig {
76 #[must_use]
80 pub const fn new(
81 max_concurrent: usize,
82 queue_depth: usize,
83 overflow_policy: OverflowPolicy,
84 ) -> Option<Self> {
85 if max_concurrent == 0 {
86 None
87 } else {
88 Some(Self {
89 max_concurrent,
90 queue_depth,
91 overflow_policy,
92 })
93 }
94 }
95
96 #[must_use]
101 pub fn for_profile(profile: ParallelismProfile) -> Self {
102 let p = available_parallelism_or_one();
103 match profile {
104 ParallelismProfile::Balanced => Self {
105 max_concurrent: conservative_bg_cpu_max(p),
106 queue_depth: 0,
107 overflow_policy: OverflowPolicy::DropBusy,
108 },
109 }
110 }
111
112 #[must_use]
114 pub const fn admission_limit(self) -> usize {
115 self.max_concurrent.saturating_add(self.queue_depth)
116 }
117}
118
119impl Default for BulkheadConfig {
120 fn default() -> Self {
121 Self::for_profile(ParallelismProfile::Balanced)
122 }
123}
124
125#[must_use]
127pub const fn conservative_bg_cpu_max(p: usize) -> usize {
128 let base = p / 8;
129 if base == 0 {
130 1
131 } else if base > MAX_BALANCED_BG_CPU {
132 MAX_BALANCED_BG_CPU
133 } else {
134 base
135 }
136}
137
138#[must_use]
140pub fn available_parallelism_or_one() -> usize {
141 std::thread::available_parallelism().map_or(1, NonZeroUsize::get)
142}
143
144#[derive(Debug, Clone, Copy, PartialEq, Eq)]
146pub struct WideChunkLayout {
147 pub u128_chunks: usize,
149 pub u64_chunks: usize,
151 pub tail_bytes: usize,
153}
154
155impl WideChunkLayout {
156 #[must_use]
158 pub const fn for_len(len: usize) -> Self {
159 let u128_chunks = len / 16;
160 let rem_after_u128 = len % 16;
161 let u64_chunks = rem_after_u128 / 8;
162 let tail_bytes = rem_after_u128 % 8;
163 Self {
164 u128_chunks,
165 u64_chunks,
166 tail_bytes,
167 }
168 }
169}
170
171#[allow(clippy::incompatible_msrv)]
176pub fn xor_patch_wide_chunks(dst: &mut [u8], patch: &[u8]) -> Result<WideChunkLayout> {
177 if dst.len() != patch.len() {
178 return Err(FrankenError::TypeMismatch {
179 expected: format!("equal lengths (dst == patch), got {}", dst.len()),
180 actual: patch.len().to_string(),
181 });
182 }
183
184 let layout = WideChunkLayout::for_len(dst.len());
185
186 let (dst_128, dst_rem) = dst.as_chunks_mut::<16>();
187 let (patch_128, patch_rem) = patch.as_chunks::<16>();
188 for (d, p) in dst_128.iter_mut().zip(patch_128.iter()) {
189 let d_word = u128::from_ne_bytes(*d);
190 let p_word = u128::from_ne_bytes(*p);
191 *d = (d_word ^ p_word).to_ne_bytes();
192 }
193
194 let (dst_64, dst_tail) = dst_rem.as_chunks_mut::<8>();
195 let (patch_64, patch_tail) = patch_rem.as_chunks::<8>();
196 for (d, p) in dst_64.iter_mut().zip(patch_64.iter()) {
197 let d_word = u64::from_ne_bytes(*d);
198 let p_word = u64::from_ne_bytes(*p);
199 *d = (d_word ^ p_word).to_ne_bytes();
200 }
201
202 for (d, p) in dst_tail.iter_mut().zip(patch_tail.iter()) {
203 *d ^= *p;
204 }
205
206 Ok(layout)
207}
208
209pub fn gf256_add_assign_chunked(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
214 xor_patch_wide_chunks(dst, src)
215}
216
217pub fn symbol_add_assign(dst: &mut [u8], src: &[u8]) -> Result<WideChunkLayout> {
221 debug!(
222 bead_id = "bd-1hi.2",
223 op = "symbol_add_assign",
224 symbol_len = dst.len(),
225 "applying in-place XOR over symbol bytes"
226 );
227 gf256_add_assign_chunked(dst, src)
228}
229
230pub fn symbol_mul_into(coeff: u8, src: &[u8], out: &mut [u8]) -> Result<()> {
236 if src.len() != out.len() {
237 error!(
238 bead_id = "bd-1hi.2",
239 op = "symbol_mul_into",
240 coeff,
241 src_len = src.len(),
242 out_len = out.len(),
243 "symbol length mismatch"
244 );
245 return Err(FrankenError::TypeMismatch {
246 expected: format!("equal lengths (src == out), got {}", src.len()),
247 actual: out.len().to_string(),
248 });
249 }
250
251 debug!(
252 bead_id = "bd-1hi.2",
253 op = "symbol_mul_into",
254 coeff,
255 symbol_len = src.len(),
256 "applying GF(256) scalar multiplication"
257 );
258
259 match coeff {
260 0 => {
261 out.fill(0);
262 Ok(())
263 }
264 1 => {
265 out.copy_from_slice(src);
266 Ok(())
267 }
268 _ => {
269 let mut out_chunks = out.chunks_exact_mut(16);
270 let mut src_chunks = src.chunks_exact(16);
271
272 for (dst_chunk, src_chunk) in out_chunks.by_ref().zip(src_chunks.by_ref()) {
273 for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
274 *dst_byte = gf256_mul_byte(coeff, *src_byte);
275 }
276 }
277 for (dst_byte, src_byte) in out_chunks
278 .into_remainder()
279 .iter_mut()
280 .zip(src_chunks.remainder().iter())
281 {
282 *dst_byte = gf256_mul_byte(coeff, *src_byte);
283 }
284 Ok(())
285 }
286 }
287}
288
289pub fn symbol_addmul_assign(dst: &mut [u8], coeff: u8, src: &[u8]) -> Result<WideChunkLayout> {
295 if dst.len() != src.len() {
296 error!(
297 bead_id = "bd-1hi.2",
298 op = "symbol_addmul_assign",
299 coeff,
300 dst_len = dst.len(),
301 src_len = src.len(),
302 "symbol length mismatch"
303 );
304 return Err(FrankenError::TypeMismatch {
305 expected: format!("equal lengths (dst == src), got {}", dst.len()),
306 actual: src.len().to_string(),
307 });
308 }
309
310 debug!(
311 bead_id = "bd-1hi.2",
312 op = "symbol_addmul_assign",
313 coeff,
314 symbol_len = dst.len(),
315 "applying fused multiply-and-add over symbol bytes"
316 );
317
318 match coeff {
319 0 => Ok(WideChunkLayout::for_len(dst.len())),
320 1 => symbol_add_assign(dst, src),
321 _ => {
322 let mut dst_chunks = dst.chunks_exact_mut(16);
323 let mut src_chunks = src.chunks_exact(16);
324
325 for (dst_chunk, src_chunk) in dst_chunks.by_ref().zip(src_chunks.by_ref()) {
326 for (dst_byte, src_byte) in dst_chunk.iter_mut().zip(src_chunk.iter()) {
327 *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
328 }
329 }
330 for (dst_byte, src_byte) in dst_chunks
331 .into_remainder()
332 .iter_mut()
333 .zip(src_chunks.remainder().iter())
334 {
335 *dst_byte ^= gf256_mul_byte(coeff, *src_byte);
336 }
337 Ok(WideChunkLayout::for_len(dst.len()))
338 }
339 }
340}
341
342pub fn simd_friendly_checksum_pair(buffer: &[u8]) -> Result<(u64, [u8; 32])> {
347 let symbol_size = u32::try_from(buffer.len()).map_err(|_| FrankenError::OutOfRange {
348 what: "symbol_size".to_owned(),
349 value: buffer.len().to_string(),
350 })?;
351
352 let symbol_record = SymbolRecord::new(
353 ObjectId::from_bytes([0_u8; ObjectId::LEN]),
354 Oti {
355 f: u64::from(symbol_size),
356 al: 1,
357 t: symbol_size,
358 z: 1,
359 n: 1,
360 },
361 0,
362 buffer.to_vec(),
363 SymbolRecordFlags::empty(),
364 );
365 let blake = PayloadHash::blake3(buffer);
366
367 Ok((symbol_record.frame_xxh3, *blake.as_bytes()))
368}
369
370#[derive(Debug)]
372pub struct Bulkhead {
373 config: BulkheadConfig,
374 in_flight: AtomicUsize,
375 peak_in_flight: AtomicUsize,
376 busy_rejections: AtomicUsize,
377}
378
379impl Bulkhead {
380 #[must_use]
381 pub fn new(config: BulkheadConfig) -> Self {
382 Self {
383 config,
384 in_flight: AtomicUsize::new(0),
385 peak_in_flight: AtomicUsize::new(0),
386 busy_rejections: AtomicUsize::new(0),
387 }
388 }
389
390 #[must_use]
391 pub const fn config(&self) -> BulkheadConfig {
392 self.config
393 }
394
395 #[must_use]
396 pub fn in_flight(&self) -> usize {
397 self.in_flight.load(Ordering::Acquire)
398 }
399
400 #[must_use]
401 pub fn peak_in_flight(&self) -> usize {
402 self.peak_in_flight.load(Ordering::Acquire)
403 }
404
405 #[must_use]
406 pub fn busy_rejections(&self) -> usize {
407 self.busy_rejections.load(Ordering::Acquire)
408 }
409
410 pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
415 let limit = self.config.admission_limit();
416 loop {
417 let current = self.in_flight.load(Ordering::Acquire);
418 if current >= limit {
419 self.busy_rejections.fetch_add(1, Ordering::AcqRel);
420 return Err(match self.config.overflow_policy {
421 OverflowPolicy::DropBusy => FrankenError::Busy,
422 });
423 }
424
425 let next = current.saturating_add(1);
426 if self
427 .in_flight
428 .compare_exchange_weak(current, next, Ordering::AcqRel, Ordering::Acquire)
429 .is_ok()
430 {
431 self.peak_in_flight.fetch_max(next, Ordering::AcqRel);
432 return Ok(BulkheadPermit {
433 bulkhead: self,
434 released: false,
435 });
436 }
437 }
438 }
439
440 pub fn run<T>(&self, work: impl FnOnce() -> T) -> Result<T> {
442 let _permit = self.try_acquire()?;
443 Ok(work())
444 }
445}
446
447#[derive(Debug)]
449pub struct BulkheadPermit<'a> {
450 bulkhead: &'a Bulkhead,
451 released: bool,
452}
453
454impl BulkheadPermit<'_> {
455 pub fn release(mut self) {
457 if !self.released {
458 self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
459 self.released = true;
460 }
461 }
462}
463
464impl Drop for BulkheadPermit<'_> {
465 fn drop(&mut self) {
466 if !self.released {
467 self.bulkhead.in_flight.fetch_sub(1, Ordering::AcqRel);
468 self.released = true;
469 }
470 }
471}
472
473#[derive(Debug)]
475pub struct RegionBulkhead {
476 region: Region,
477 bulkhead: Bulkhead,
478 closing: AtomicBool,
479}
480
481impl RegionBulkhead {
482 #[must_use]
483 pub fn new(region: Region, config: BulkheadConfig) -> Self {
484 Self {
485 region,
486 bulkhead: Bulkhead::new(config),
487 closing: AtomicBool::new(false),
488 }
489 }
490
491 #[must_use]
492 pub const fn region(&self) -> Region {
493 self.region
494 }
495
496 #[must_use]
497 pub fn bulkhead(&self) -> &Bulkhead {
498 &self.bulkhead
499 }
500
501 pub fn try_acquire(&self) -> Result<BulkheadPermit<'_>> {
502 if self.closing.load(Ordering::Acquire) {
503 return Err(FrankenError::Busy);
504 }
505 self.bulkhead.try_acquire()
506 }
507
508 pub fn begin_close(&self) {
510 self.closing.store(true, Ordering::Release);
511 }
512
513 #[must_use]
515 pub fn is_quiescent(&self) -> bool {
516 self.bulkhead.in_flight() == 0
517 }
518}
519
520#[cfg(test)]
521mod tests {
522 use std::collections::VecDeque;
523 use std::pin::Pin;
524 use std::sync::Arc;
525 use std::task::{Context, Poll};
526 use std::thread;
527 use std::time::{Duration, Instant};
528
529 use asupersync::raptorq::decoder::{InactivationDecoder, ReceivedSymbol};
530 use asupersync::raptorq::gf256::{Gf256, gf256_add_slice, gf256_addmul_slice, gf256_mul_slice};
531 use asupersync::raptorq::systematic::{ConstraintMatrix, SystematicEncoder};
532 use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
533 use asupersync::security::AuthenticationTag;
534 use asupersync::security::authenticated::AuthenticatedSymbol;
535 use asupersync::transport::error::{SinkError, StreamError};
536 use asupersync::transport::sink::SymbolSink;
537 use asupersync::transport::stream::SymbolStream;
538 use asupersync::types::{ObjectId as AsObjectId, ObjectParams, Symbol};
539 use asupersync::{Cx, RaptorQConfig};
540 use fsqlite_btree::compare_key_bytes_contiguous;
541
542 use super::*;
543
544 const BEAD_ID: &str = "bd-22n.4";
545 const SIMD_BEAD_ID: &str = "bd-22n.6";
546 const RAPTORQ_BEAD_ID: &str = "bd-1hi.2";
547
548 #[derive(Debug)]
549 struct VecSink {
550 symbols: Vec<Symbol>,
551 }
552
553 impl VecSink {
554 fn new() -> Self {
555 Self {
556 symbols: Vec::new(),
557 }
558 }
559 }
560
561 impl SymbolSink for VecSink {
562 fn poll_send(
563 mut self: Pin<&mut Self>,
564 _cx: &mut Context<'_>,
565 symbol: AuthenticatedSymbol,
566 ) -> Poll<std::result::Result<(), SinkError>> {
567 self.symbols.push(symbol.into_symbol());
568 Poll::Ready(Ok(()))
569 }
570
571 fn poll_flush(
572 self: Pin<&mut Self>,
573 _cx: &mut Context<'_>,
574 ) -> Poll<std::result::Result<(), SinkError>> {
575 Poll::Ready(Ok(()))
576 }
577
578 fn poll_close(
579 self: Pin<&mut Self>,
580 _cx: &mut Context<'_>,
581 ) -> Poll<std::result::Result<(), SinkError>> {
582 Poll::Ready(Ok(()))
583 }
584
585 fn poll_ready(
586 self: Pin<&mut Self>,
587 _cx: &mut Context<'_>,
588 ) -> Poll<std::result::Result<(), SinkError>> {
589 Poll::Ready(Ok(()))
590 }
591 }
592
593 #[derive(Debug)]
594 struct VecStream {
595 q: VecDeque<AuthenticatedSymbol>,
596 }
597
598 impl VecStream {
599 fn new(symbols: Vec<Symbol>) -> Self {
600 let q = symbols
601 .into_iter()
602 .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
603 .collect();
604 Self { q }
605 }
606 }
607
608 impl SymbolStream for VecStream {
609 fn poll_next(
610 mut self: Pin<&mut Self>,
611 _cx: &mut Context<'_>,
612 ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
613 match self.q.pop_front() {
614 Some(symbol) => Poll::Ready(Some(Ok(symbol))),
615 None => Poll::Ready(None),
616 }
617 }
618
619 fn size_hint(&self) -> (usize, Option<usize>) {
620 (self.q.len(), Some(self.q.len()))
621 }
622
623 fn is_exhausted(&self) -> bool {
624 self.q.is_empty()
625 }
626 }
627
628 fn raptorq_config(symbol_size: u16, repair_overhead: f64) -> RaptorQConfig {
629 let mut config = RaptorQConfig::default();
630 config.encoding.symbol_size = symbol_size;
631 config.encoding.max_block_size = 64 * 1024;
632 config.encoding.repair_overhead = repair_overhead;
633 config
634 }
635
636 fn deterministic_payload(len: usize, seed: u64) -> Vec<u8> {
637 let mut state = seed ^ 0x9E37_79B9_7F4A_7C15;
638 let mut out = Vec::with_capacity(len);
639 for idx in 0..len {
640 state ^= state << 7;
641 state ^= state >> 9;
642 state = state.wrapping_mul(0xA24B_AED4_963E_E407);
643 let idx_byte = u8::try_from(idx % 251).expect("modulo fits in u8");
644 out.push(u8::try_from(state & 0xFF).expect("masked to u8") ^ idx_byte);
645 }
646 out
647 }
648
649 fn xor_patch_bytewise(dst: &mut [u8], patch: &[u8]) {
650 for (dst_byte, patch_byte) in dst.iter_mut().zip(patch.iter()) {
651 *dst_byte ^= *patch_byte;
652 }
653 }
654
655 fn gf256_mul_bytewise(coeff: u8, src: &[u8], out: &mut [u8]) {
656 for (dst_byte, src_byte) in out.iter_mut().zip(src.iter()) {
657 *dst_byte = gf256_mul_byte(coeff, *src_byte);
658 }
659 }
660
661 fn collect_rs_files(root: &std::path::Path, out: &mut Vec<std::path::PathBuf>) {
662 let entries = std::fs::read_dir(root).expect("read_dir should succeed");
663 for entry in entries {
664 let path = entry.expect("read_dir entry should be readable").path();
665 if path.is_dir() {
666 collect_rs_files(&path, out);
667 } else if path.extension().is_some_and(|ext| ext == "rs") {
668 out.push(path);
669 }
670 }
671 }
672
673 fn encode_symbols(
674 config: RaptorQConfig,
675 object_id: AsObjectId,
676 data: &[u8],
677 ) -> (Vec<Symbol>, usize) {
678 let cx = Cx::for_testing();
679 let mut sender = RaptorQSenderBuilder::new()
680 .config(config)
681 .transport(VecSink::new())
682 .build()
683 .expect("sender build");
684 let outcome = sender
685 .send_object(&cx, object_id, data)
686 .expect("send_object must succeed");
687 let symbols = std::mem::take(&mut sender.transport_mut().symbols);
688 tracing::debug!(
689 bead_id = RAPTORQ_BEAD_ID,
690 case = "encode_symbols",
691 source_symbols = outcome.source_symbols,
692 emitted_symbols = symbols.len(),
693 object_size = data.len(),
694 "encoded object into source+repair symbol stream"
695 );
696 (symbols, outcome.source_symbols)
697 }
698
699 #[allow(clippy::result_large_err)]
700 fn decode_symbols(
701 config: RaptorQConfig,
702 object_id: AsObjectId,
703 object_size: usize,
704 source_symbols: usize,
705 symbols: Vec<Symbol>,
706 ) -> std::result::Result<Vec<u8>, asupersync::Error> {
707 let cx = Cx::for_testing();
708 let params = ObjectParams::new(
709 object_id,
710 u64::try_from(object_size).expect("object size fits u64"),
711 config.encoding.symbol_size,
712 1,
713 u16::try_from(source_symbols).expect("source symbol count fits u16"),
714 );
715 let mut receiver = RaptorQReceiverBuilder::new()
716 .config(config)
717 .source(VecStream::new(symbols))
718 .build()
719 .expect("receiver build");
720
721 receiver
722 .receive_object(&cx, ¶ms)
723 .map(|outcome| outcome.data)
724 }
725
726 fn split_source_and_repair(
727 symbols: &[Symbol],
728 source_symbols: usize,
729 ) -> (Vec<Symbol>, Vec<Symbol>) {
730 let source_symbols_u32 =
731 u32::try_from(source_symbols).expect("source symbol count fits u32");
732 let mut sources = Vec::new();
733 let mut repairs = Vec::new();
734 for symbol in symbols {
735 if symbol.esi() < source_symbols_u32 {
736 sources.push(symbol.clone());
737 } else {
738 repairs.push(symbol.clone());
739 }
740 }
741 (sources, repairs)
742 }
743
744 fn low_level_source_block(k: usize, symbol_size: usize, seed: u64) -> Vec<Vec<u8>> {
745 (0..k)
746 .map(|source_index| {
747 deterministic_payload(
748 symbol_size,
749 seed + u64::try_from(source_index).expect("source index fits u64"),
750 )
751 })
752 .collect()
753 }
754
755 fn append_source_received_symbols(
756 received: &mut Vec<ReceivedSymbol>,
757 constraints: &ConstraintMatrix,
758 base_rows: usize,
759 k_prime: usize,
760 symbol_size: usize,
761 source: &[Vec<u8>],
762 source_indexes: &[usize],
763 ) {
764 for &source_index in source_indexes {
765 let row = base_rows + source_index;
766 let mut columns = Vec::new();
767 let mut coefficients = Vec::new();
768 for col in 0..constraints.cols {
769 let coeff = constraints.get(row, col);
770 if !coeff.is_zero() {
771 columns.push(col);
772 coefficients.push(coeff);
773 }
774 }
775
776 received.push(ReceivedSymbol {
777 esi: u32::try_from(source_index).expect("source index fits u32"),
778 is_source: true,
779 columns,
780 coefficients,
781 data: source[source_index].clone(),
782 });
783 }
784
785 for source_index in source.len()..k_prime {
788 let row = base_rows + source_index;
789 let mut columns = Vec::new();
790 let mut coefficients = Vec::new();
791 for col in 0..constraints.cols {
792 let coeff = constraints.get(row, col);
793 if !coeff.is_zero() {
794 columns.push(col);
795 coefficients.push(coeff);
796 }
797 }
798
799 received.push(ReceivedSymbol {
800 esi: u32::try_from(source_index).expect("source index fits u32"),
801 is_source: true,
802 columns,
803 coefficients,
804 data: vec![0_u8; symbol_size],
805 });
806 }
807 }
808
809 #[test]
810 fn test_parallelism_defaults_conservative() {
811 assert_eq!(
812 conservative_bg_cpu_max(16),
813 2,
814 "bead_id={BEAD_ID} case=balanced_profile_formula_p16"
815 );
816 assert_eq!(
817 conservative_bg_cpu_max(1),
818 1,
819 "bead_id={BEAD_ID} case=balanced_profile_min_floor"
820 );
821 assert_eq!(
822 conservative_bg_cpu_max(512),
823 16,
824 "bead_id={BEAD_ID} case=balanced_profile_max_cap"
825 );
826 }
827
828 #[test]
829 fn test_parallelism_bounded_by_available() {
830 let cfg = BulkheadConfig::default();
831 let p = available_parallelism_or_one();
832 assert!(
833 cfg.max_concurrent <= p,
834 "bead_id={BEAD_ID} case=default_exceeds_available_parallelism cfg={cfg:?} p={p}"
835 );
836
837 let bulkhead = Bulkhead::new(cfg);
838 let mut permits = Vec::new();
839 for _ in 0..cfg.admission_limit() {
840 permits.push(
841 bulkhead
842 .try_acquire()
843 .expect("admission under configured limit should succeed"),
844 );
845 }
846
847 let overflow = bulkhead.try_acquire();
848 assert!(
849 matches!(overflow, Err(FrankenError::Busy)),
850 "bead_id={BEAD_ID} case=bounded_admission_overflow_must_be_busy overflow={overflow:?}"
851 );
852
853 drop(permits);
854 assert_eq!(
855 bulkhead.in_flight(),
856 0,
857 "bead_id={BEAD_ID} case=permits_drop_to_zero"
858 );
859 }
860
861 #[test]
862 fn test_bulkhead_config_max_concurrent() {
863 let cfg = BulkheadConfig::new(3, 0, OverflowPolicy::DropBusy)
864 .expect("non-zero max_concurrent must be valid");
865 let bulkhead = Bulkhead::new(cfg);
866
867 let p1 = bulkhead.try_acquire().expect("slot 1");
868 let p2 = bulkhead.try_acquire().expect("slot 2");
869 let p3 = bulkhead.try_acquire().expect("slot 3");
870 let overflow = bulkhead.try_acquire();
871
872 assert!(
873 matches!(overflow, Err(FrankenError::Busy)),
874 "bead_id={BEAD_ID} case=max_concurrent_enforced overflow={overflow:?}"
875 );
876 drop((p1, p2, p3));
877 }
878
879 #[test]
880 fn test_overflow_policy_drop_with_busy() {
881 let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
882 .expect("non-zero max_concurrent must be valid");
883 let bulkhead = Bulkhead::new(cfg);
884 let _permit = bulkhead.try_acquire().expect("first permit must succeed");
885
886 let overflow = bulkhead.try_acquire();
887 assert!(
888 matches!(overflow, Err(FrankenError::Busy)),
889 "bead_id={BEAD_ID} case=overflow_policy_drop_busy overflow={overflow:?}"
890 );
891 }
892
893 #[test]
894 fn test_background_work_degrades_gracefully() {
895 let cfg = BulkheadConfig::new(2, 0, OverflowPolicy::DropBusy)
896 .expect("non-zero max_concurrent must be valid");
897 let bulkhead = Bulkhead::new(cfg);
898
899 let _a = bulkhead.try_acquire().expect("permit a");
900 let _b = bulkhead.try_acquire().expect("permit b");
901
902 for _ in 0..8 {
903 let result = bulkhead.try_acquire();
904 assert!(
905 matches!(result, Err(FrankenError::Busy)),
906 "bead_id={BEAD_ID} case=overflow_must_reject_not_wait result={result:?}"
907 );
908 }
909
910 assert_eq!(
911 bulkhead.busy_rejections(),
912 8,
913 "bead_id={BEAD_ID} case=busy_rejection_counter"
914 );
915 }
916
917 #[test]
918 fn test_region_integration() {
919 let cfg = BulkheadConfig::new(1, 0, OverflowPolicy::DropBusy)
920 .expect("non-zero max_concurrent must be valid");
921 let region_bulkhead = RegionBulkhead::new(Region::new(7), cfg);
922 assert_eq!(
923 region_bulkhead.region().get(),
924 7,
925 "bead_id={BEAD_ID} case=region_id_plumbed"
926 );
927
928 let permit = region_bulkhead.try_acquire().expect("first permit");
929 assert!(
930 !region_bulkhead.is_quiescent(),
931 "bead_id={BEAD_ID} case=region_non_quiescent_with_active_work"
932 );
933
934 region_bulkhead.begin_close();
935 let after_close = region_bulkhead.try_acquire();
936 assert!(
937 matches!(after_close, Err(FrankenError::Busy)),
938 "bead_id={BEAD_ID} case=region_close_blocks_new_work result={after_close:?}"
939 );
940
941 drop(permit);
942 assert!(
943 region_bulkhead.is_quiescent(),
944 "bead_id={BEAD_ID} case=region_quiescent_after_permit_drop"
945 );
946 }
947
948 #[test]
949 fn test_gf256_ops_chunked() {
950 let mut dst = vec![0xAA_u8; 40];
951 let src = vec![0x55_u8; 40];
952 let expected: Vec<u8> = dst.iter().zip(src.iter()).map(|(d, s)| *d ^ *s).collect();
953
954 let layout = gf256_add_assign_chunked(&mut dst, &src)
955 .expect("equal-length buffers should be accepted");
956 assert!(
957 layout.u128_chunks > 0 || layout.u64_chunks > 0,
958 "bead_id={SIMD_BEAD_ID} case=wide_chunks_expected layout={layout:?}"
959 );
960 assert_eq!(
961 dst, expected,
962 "bead_id={SIMD_BEAD_ID} case=gf256_addition_xor_equivalence"
963 );
964 }
965
966 #[test]
967 fn test_xor_patch_wide_chunks() {
968 let mut dst = vec![0xF0_u8; 37];
969 let patch = vec![0x0F_u8; 37];
970 let expected: Vec<u8> = dst.iter().zip(patch.iter()).map(|(d, p)| *d ^ *p).collect();
971
972 let layout =
973 xor_patch_wide_chunks(&mut dst, &patch).expect("equal-length buffers should be valid");
974 assert_eq!(
975 layout,
976 WideChunkLayout {
977 u128_chunks: 2,
978 u64_chunks: 0,
979 tail_bytes: 5,
980 },
981 "bead_id={SIMD_BEAD_ID} case=chunk_layout_expected"
982 );
983 assert_eq!(
984 dst, expected,
985 "bead_id={SIMD_BEAD_ID} case=xor_patch_matches_scalar_reference"
986 );
987 }
988
989 #[test]
990 fn test_xor_symbols_u64_chunks() {
991 let mut dst = vec![0xAB_u8; 24];
993 let src = vec![0xCD_u8; 24];
994 let mut expected = vec![0xAB_u8; 24];
995 xor_patch_bytewise(&mut expected, &src);
996
997 let layout = xor_patch_wide_chunks(&mut dst, &src).expect("equal-length buffers");
998 assert_eq!(
999 layout,
1000 WideChunkLayout {
1001 u128_chunks: 1,
1002 u64_chunks: 1,
1003 tail_bytes: 0,
1004 },
1005 "bead_id=bd-2ddc case=u64_chunk_lane_exercised"
1006 );
1007 assert_eq!(
1008 dst, expected,
1009 "bead_id=bd-2ddc case=chunked_xor_matches_bytewise_reference"
1010 );
1011 }
1012
1013 #[test]
1014 fn test_gf256_multiply_chunks() {
1015 let coeff = 0xA7_u8;
1016 let src = deterministic_payload(4096, 0xDDCC_BBAA_1122_3344);
1017 let mut chunked = vec![0_u8; src.len()];
1018 let mut scalar = vec![0_u8; src.len()];
1019
1020 symbol_mul_into(coeff, &src, &mut chunked).expect("chunked symbol_mul_into");
1021 gf256_mul_bytewise(coeff, &src, &mut scalar);
1022
1023 assert_eq!(
1024 chunked, scalar,
1025 "bead_id=bd-2ddc case=chunked_mul_matches_scalar_reference"
1026 );
1027 }
1028
1029 #[test]
1030 fn test_u128_chunk_alignment() {
1031 let mut via_wide = deterministic_payload(4099, 0x1234_5678_9ABC_DEF0);
1033 let mut via_u64_only = via_wide.clone();
1034 let patch = deterministic_payload(4099, 0x0F0E_0D0C_0B0A_0908);
1035
1036 xor_patch_wide_chunks(&mut via_wide, &patch).expect("wide chunk xor");
1037
1038 let mut dst_u64_chunks = via_u64_only.chunks_exact_mut(8);
1039 let mut patch_u64_chunks = patch.chunks_exact(8);
1040 for (dst_chunk, patch_chunk) in dst_u64_chunks.by_ref().zip(patch_u64_chunks.by_ref()) {
1041 let dst_word = u64::from_ne_bytes(
1042 dst_chunk
1043 .try_into()
1044 .expect("chunks_exact(8) must yield 8-byte chunk"),
1045 );
1046 let patch_word = u64::from_ne_bytes(
1047 patch_chunk
1048 .try_into()
1049 .expect("chunks_exact(8) must yield 8-byte chunk"),
1050 );
1051 dst_chunk.copy_from_slice(&(dst_word ^ patch_word).to_ne_bytes());
1052 }
1053 for (dst_byte, patch_byte) in dst_u64_chunks
1054 .into_remainder()
1055 .iter_mut()
1056 .zip(patch_u64_chunks.remainder().iter())
1057 {
1058 *dst_byte ^= *patch_byte;
1059 }
1060
1061 assert_eq!(
1062 via_wide, via_u64_only,
1063 "bead_id=bd-2ddc case=u128_lane_matches_u64_plus_tail"
1064 );
1065 }
1066
1067 #[test]
1068 fn test_benchmark_chunk_vs_byte() {
1069 if cfg!(debug_assertions) {
1071 return;
1072 }
1073
1074 let iterations = 32_000_usize;
1075 let src = deterministic_payload(4096, 0xDEAD_BEEF_F00D_CAFE);
1076 let base = deterministic_payload(4096, 0x0123_4567_89AB_CDEF);
1077
1078 let mut chunked = base.clone();
1079 let chunked_start = Instant::now();
1080 for _ in 0..iterations {
1081 xor_patch_wide_chunks(&mut chunked, &src).expect("chunked xor");
1082 std::hint::black_box(&chunked);
1083 }
1084 let chunked_elapsed = chunked_start.elapsed();
1085
1086 let mut bytewise = base;
1087 let bytewise_start = Instant::now();
1088 for _ in 0..iterations {
1089 xor_patch_bytewise(&mut bytewise, &src);
1090 std::hint::black_box(&bytewise);
1091 }
1092 let bytewise_elapsed = bytewise_start.elapsed();
1093
1094 let speedup = bytewise_elapsed.as_secs_f64() / chunked_elapsed.as_secs_f64();
1095 assert!(
1096 speedup >= 4.0,
1097 "bead_id=bd-2ddc case=chunk_vs_byte_speedup speedup={speedup:.2}x \
1098 chunked_ns={} bytewise_ns={} iterations={iterations}",
1099 chunked_elapsed.as_nanos(),
1100 bytewise_elapsed.as_nanos()
1101 );
1102 }
1103
1104 #[test]
1105 fn test_no_unsafe_simd() {
1106 let manifest = include_str!("../../../Cargo.toml");
1107 assert!(
1108 manifest.contains(r#"unsafe_code = "forbid""#),
1109 "bead_id=bd-2ddc case=workspace_forbids_unsafe"
1110 );
1111
1112 let workspace_root = std::path::Path::new(env!("CARGO_MANIFEST_DIR"))
1113 .parent()
1114 .expect("crate dir has parent")
1115 .parent()
1116 .expect("workspace root exists")
1117 .to_path_buf();
1118 let crates_dir = workspace_root.join("crates");
1119
1120 let mut rs_files = Vec::new();
1121 collect_rs_files(&crates_dir, &mut rs_files);
1122
1123 let simd_needles = [
1124 "_mm_",
1125 "std::arch::",
1126 "core::arch::",
1127 "__m128",
1128 "__m256",
1129 "__m512",
1130 "simd_shuffle",
1131 "vpxor",
1132 "vxorq",
1133 ];
1134
1135 let mut offenders = Vec::new();
1136 for file in rs_files {
1137 let Ok(content) = std::fs::read_to_string(&file) else {
1138 continue;
1139 };
1140
1141 let lines = content.lines().collect::<Vec<_>>();
1142 for (idx, line) in lines.iter().enumerate() {
1143 let has_intrinsic = simd_needles.iter().any(|needle| line.contains(needle));
1144 if !has_intrinsic {
1145 continue;
1146 }
1147
1148 let window_start = idx.saturating_sub(3);
1149 let window_end = (idx + 3).min(lines.len().saturating_sub(1));
1150 let mut found_unsafe_nearby = false;
1151 for nearby in &lines[window_start..=window_end] {
1152 let trimmed = nearby.trim_start();
1153 let is_comment = trimmed.starts_with("//");
1154 if !is_comment && trimmed.contains("unsafe") {
1155 found_unsafe_nearby = true;
1156 break;
1157 }
1158 }
1159
1160 if found_unsafe_nearby {
1161 offenders.push(format!("{}:{}", file.display(), idx + 1));
1162 }
1163 }
1164 }
1165
1166 assert!(
1167 offenders.is_empty(),
1168 "bead_id=bd-2ddc case=no_unsafe_simd_intrinsics offenders={offenders:?}"
1169 );
1170 }
1171
1172 #[test]
1173 fn test_checksum_simd_friendly() {
1174 let buffer = vec![0x11_u8; 256];
1175 let (xx_a, blake_a) =
1176 simd_friendly_checksum_pair(&buffer).expect("checksum pair must succeed");
1177
1178 let mut modified = buffer;
1179 modified[255] ^= 0x01;
1180 let (xx_b, blake_b) =
1181 simd_friendly_checksum_pair(&modified).expect("checksum pair must succeed");
1182
1183 assert_ne!(
1184 xx_a, xx_b,
1185 "bead_id={SIMD_BEAD_ID} case=xxhash3_changes_on_byte_flip"
1186 );
1187 assert_ne!(
1188 blake_a, blake_b,
1189 "bead_id={SIMD_BEAD_ID} case=blake3_changes_on_byte_flip"
1190 );
1191 }
1192
1193 #[test]
1194 fn test_e2e_bounded_parallelism_under_background_load() {
1195 let cfg = BulkheadConfig::new(4, 0, OverflowPolicy::DropBusy)
1196 .expect("non-zero max_concurrent must be valid");
1197 let bulkhead = Arc::new(Bulkhead::new(cfg));
1198
1199 let handles: Vec<_> = (0..48)
1200 .map(|_| {
1201 let bulkhead = Arc::clone(&bulkhead);
1202 thread::spawn(move || {
1203 bulkhead.run(|| {
1204 thread::sleep(Duration::from_millis(10));
1205 })
1206 })
1207 })
1208 .collect();
1209
1210 let mut busy = 0_usize;
1211 for handle in handles {
1212 match handle.join().expect("worker thread should not panic") {
1213 Ok(()) => {}
1214 Err(FrankenError::Busy) => busy = busy.saturating_add(1),
1215 Err(err) => {
1216 assert_eq!(
1217 err.error_code(),
1218 fsqlite_error::ErrorCode::Busy,
1219 "bead_id={BEAD_ID} case=e2e_unexpected_bulkhead_error err={err}"
1220 );
1221 busy = busy.saturating_add(1);
1222 }
1223 }
1224 }
1225
1226 assert!(
1227 busy > 0,
1228 "bead_id={BEAD_ID} case=e2e_should_observe_overflow_rejections"
1229 );
1230 assert!(
1231 bulkhead.peak_in_flight() <= cfg.admission_limit(),
1232 "bead_id={BEAD_ID} case=e2e_peak_parallelism_exceeded peak={} limit={}",
1233 bulkhead.peak_in_flight(),
1234 cfg.admission_limit()
1235 );
1236 }
1237
1238 #[test]
1239 fn test_e2e_simd_hot_path_correctness() {
1240 let contiguous = b"key-0001key-0002".to_vec();
1242 let left = &contiguous[0..8];
1243 let right = &contiguous[8..16];
1244 let compare_start = Instant::now();
1245 assert_eq!(
1246 compare_key_bytes_contiguous(left, right),
1247 left.cmp(right),
1248 "bead_id={SIMD_BEAD_ID} case=btree_contiguous_compare_correct"
1249 );
1250 let compare_elapsed = compare_start.elapsed();
1251
1252 let mut symbol_a = (0_u8..64).collect::<Vec<u8>>();
1254 let symbol_b = (64_u8..128).collect::<Vec<u8>>();
1255 let expected_add: Vec<u8> = symbol_a
1256 .iter()
1257 .zip(symbol_b.iter())
1258 .map(|(a, b)| *a ^ *b)
1259 .collect();
1260 let gf256_start = Instant::now();
1261 gf256_add_assign_chunked(&mut symbol_a, &symbol_b).expect("gf256 add should succeed");
1262 let gf256_elapsed = gf256_start.elapsed();
1263 assert_eq!(
1264 symbol_a, expected_add,
1265 "bead_id={SIMD_BEAD_ID} case=gf256_chunked_add_correct"
1266 );
1267
1268 let mut patch_target = vec![0x33_u8; 64];
1269 let patch = vec![0xCC_u8; 64];
1270 let xor_start = Instant::now();
1271 xor_patch_wide_chunks(&mut patch_target, &patch).expect("xor patch should succeed");
1272 let xor_elapsed = xor_start.elapsed();
1273 assert!(
1274 patch_target.iter().all(|&byte| byte == (0x33_u8 ^ 0xCC_u8)),
1275 "bead_id={SIMD_BEAD_ID} case=xor_patch_chunked_correct"
1276 );
1277
1278 let checksum_start = Instant::now();
1280 let (xx, blake) =
1281 simd_friendly_checksum_pair(&patch_target).expect("checksum pair should succeed");
1282 let checksum_elapsed = checksum_start.elapsed();
1283 assert_ne!(
1284 xx, 0,
1285 "bead_id={SIMD_BEAD_ID} case=xxhash3_nonzero_for_nonempty_payload"
1286 );
1287 assert!(
1288 blake.iter().any(|&b| b != 0),
1289 "bead_id={SIMD_BEAD_ID} case=blake3_digest_nonzero"
1290 );
1291
1292 eprintln!(
1293 "bead_id={SIMD_BEAD_ID} metric=simd_hot_path_ns compare={} gf256_add={} xor_patch={} checksum={}",
1294 compare_elapsed.as_nanos(),
1295 gf256_elapsed.as_nanos(),
1296 xor_elapsed.as_nanos(),
1297 checksum_elapsed.as_nanos()
1298 );
1299 }
1300
1301 #[test]
1302 fn test_symbol_add_self_inverse() {
1303 let src = (0_u16..512)
1304 .map(|idx| u8::try_from(idx % 251).expect("modulo fits in u8"))
1305 .collect::<Vec<_>>();
1306 let mut dst = src.clone();
1307 symbol_add_assign(&mut dst, &src).expect("symbol_add should succeed");
1308 assert!(
1309 dst.iter().all(|byte| *byte == 0),
1310 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_self_inverse"
1311 );
1312 }
1313
1314 #[test]
1315 fn test_symbol_add_commutative_and_associative() {
1316 let a = (0_u16..128)
1317 .map(|idx| u8::try_from((idx * 3) % 251).expect("modulo fits"))
1318 .collect::<Vec<_>>();
1319 let b = (0_u16..128)
1320 .map(|idx| u8::try_from((idx * 5 + 7) % 251).expect("modulo fits"))
1321 .collect::<Vec<_>>();
1322 let c = (0_u16..128)
1323 .map(|idx| u8::try_from((idx * 11 + 13) % 251).expect("modulo fits"))
1324 .collect::<Vec<_>>();
1325
1326 let mut ab = a.clone();
1327 symbol_add_assign(&mut ab, &b).expect("a+b");
1328 let mut ba = b.clone();
1329 symbol_add_assign(&mut ba, &a).expect("b+a");
1330 assert_eq!(
1331 ab, ba,
1332 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_commutative"
1333 );
1334
1335 let mut lhs = a.clone();
1336 symbol_add_assign(&mut lhs, &b).expect("(a+b)");
1337 symbol_add_assign(&mut lhs, &c).expect("(a+b)+c");
1338
1339 let mut rhs = b;
1340 symbol_add_assign(&mut rhs, &c).expect("(b+c)");
1341 let mut rhs2 = a;
1342 symbol_add_assign(&mut rhs2, &rhs).expect("a+(b+c)");
1343
1344 assert_eq!(
1345 lhs, rhs2,
1346 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_associative"
1347 );
1348 }
1349
1350 #[test]
1351 fn test_symbol_mul_special_cases() {
1352 let src = (0_u16..256)
1353 .map(|idx| u8::try_from(idx).expect("idx fits"))
1354 .collect::<Vec<_>>();
1355
1356 let mut out_zero = vec![0_u8; src.len()];
1357 symbol_mul_into(0, &src, &mut out_zero).expect("mul by zero");
1358 assert!(
1359 out_zero.iter().all(|byte| *byte == 0),
1360 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_zero"
1361 );
1362
1363 let mut out_one = vec![0_u8; src.len()];
1364 symbol_mul_into(1, &src, &mut out_one).expect("mul by one");
1365 assert_eq!(
1366 out_one, src,
1367 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_identity"
1368 );
1369 }
1370
1371 #[test]
1372 fn test_symbol_mul_matches_scalar_reference() {
1373 let src = (0_u16..512)
1374 .map(|idx| u8::try_from((idx * 7 + 17) % 251).expect("modulo fits"))
1375 .collect::<Vec<_>>();
1376 let coeff = 0xA7_u8;
1377
1378 let mut out = vec![0_u8; src.len()];
1379 symbol_mul_into(coeff, &src, &mut out).expect("symbol mul");
1380 for (actual, input) in out.iter().zip(src.iter()) {
1381 let expected = gf256_mul_byte(coeff, *input);
1382 assert_eq!(
1383 *actual, expected,
1384 "bead_id={RAPTORQ_BEAD_ID} case=symbol_mul_scalar_match"
1385 );
1386 }
1387 }
1388
1389 #[test]
1390 fn test_symbol_addmul_special_cases_and_equivalence() {
1391 let src = (0_u16..512)
1392 .map(|idx| u8::try_from((idx * 13 + 19) % 251).expect("modulo fits"))
1393 .collect::<Vec<_>>();
1394 let original = (0_u16..512)
1395 .map(|idx| u8::try_from((idx * 9 + 3) % 251).expect("modulo fits"))
1396 .collect::<Vec<_>>();
1397
1398 let mut no_op = original.clone();
1399 symbol_addmul_assign(&mut no_op, 0, &src).expect("c=0");
1400 assert_eq!(
1401 no_op, original,
1402 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c0_noop"
1403 );
1404
1405 let mut xor_path = original.clone();
1406 symbol_addmul_assign(&mut xor_path, 1, &src).expect("c=1");
1407 let mut expected_xor = original.clone();
1408 symbol_add_assign(&mut expected_xor, &src).expect("xor reference");
1409 assert_eq!(
1410 xor_path, expected_xor,
1411 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_c1_equals_xor"
1412 );
1413
1414 let coeff = 0x53_u8;
1415 let mut fused = original.clone();
1416 symbol_addmul_assign(&mut fused, coeff, &src).expect("fused");
1417 let mut mul = vec![0_u8; src.len()];
1418 symbol_mul_into(coeff, &src, &mut mul).expect("mul");
1419 let mut separate = original;
1420 symbol_add_assign(&mut separate, &mul).expect("add");
1421 assert_eq!(
1422 fused, separate,
1423 "bead_id={RAPTORQ_BEAD_ID} case=symbol_addmul_fused_equals_mul_plus_add"
1424 );
1425 }
1426
1427 #[test]
1428 fn test_symbol_operations_4096_and_512() {
1429 for symbol_len in [4096_usize, 1024_usize, 512_usize] {
1430 let a = vec![0xAA_u8; symbol_len];
1431 let b = vec![0x55_u8; symbol_len];
1432 let mut sum = a.clone();
1433 let layout = symbol_add_assign(&mut sum, &b).expect("symbol add");
1434 assert_eq!(
1435 layout,
1436 WideChunkLayout::for_len(symbol_len),
1437 "bead_id={RAPTORQ_BEAD_ID} case=symbol_len_layout_consistency len={symbol_len}"
1438 );
1439 assert!(
1440 sum.iter().all(|byte| *byte == (0xAA_u8 ^ 0x55_u8)),
1441 "bead_id={RAPTORQ_BEAD_ID} case=symbol_add_expected_xor len={symbol_len}"
1442 );
1443 }
1444 }
1445
1446 #[test]
1447 fn test_gf256_arithmetic_matches_asupersync() {
1448 for a in 0_u8..=u8::MAX {
1449 for b in 0_u8..=u8::MAX {
1450 assert_eq!(
1451 gf256_mul_byte(a, b),
1452 (Gf256(a) * Gf256(b)).raw(),
1453 "bead_id={RAPTORQ_BEAD_ID} case=gf256_mul_parity a=0x{a:02X} b=0x{b:02X}"
1454 );
1455 }
1456 }
1457 }
1458
1459 #[test]
1460 fn test_symbol_ops_match_asupersync_gf256_slices() {
1461 for symbol_len in [512_usize, 1024_usize, 4096_usize] {
1462 let src = deterministic_payload(symbol_len, 0xA5A5_0101);
1463 let dst = deterministic_payload(symbol_len, 0x5A5A_0202);
1464
1465 let mut ours_add = dst.clone();
1466 symbol_add_assign(&mut ours_add, &src).expect("symbol add");
1467 let mut as_add = dst.clone();
1468 gf256_add_slice(&mut as_add, &src);
1469 assert_eq!(
1470 ours_add, as_add,
1471 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_add len={symbol_len}"
1472 );
1473
1474 for coeff in [0_u8, 1_u8, 0x53_u8, 0xA7_u8] {
1475 let mut ours_mul = vec![0_u8; symbol_len];
1476 symbol_mul_into(coeff, &src, &mut ours_mul).expect("symbol mul");
1477 let mut as_mul = src.clone();
1478 gf256_mul_slice(&mut as_mul, Gf256(coeff));
1479 assert_eq!(
1480 ours_mul, as_mul,
1481 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_mul len={symbol_len} coeff=0x{coeff:02X}"
1482 );
1483
1484 let mut ours_addmul = dst.clone();
1485 symbol_addmul_assign(&mut ours_addmul, coeff, &src).expect("symbol addmul");
1486 let mut as_addmul = dst.clone();
1487 gf256_addmul_slice(&mut as_addmul, &src, Gf256(coeff));
1488 assert_eq!(
1489 ours_addmul, as_addmul,
1490 "bead_id={RAPTORQ_BEAD_ID} case=asupersync_parity_addmul len={symbol_len} coeff=0x{coeff:02X}"
1491 );
1492 }
1493 }
1494 }
1495
1496 #[test]
1497 fn test_encode_single_source_block() {
1498 let config = raptorq_config(512, 1.25);
1499 let symbol_size = usize::from(config.encoding.symbol_size);
1500 let k = 8_usize;
1501 let data = deterministic_payload(k * symbol_size, 0x0102_0304);
1502 let object_id = AsObjectId::new_for_test(1201);
1503 tracing::info!(
1504 bead_id = RAPTORQ_BEAD_ID,
1505 case = "test_encode_single_source_block",
1506 symbol_size,
1507 requested_k = k,
1508 "encoding source block"
1509 );
1510 let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1511 let (sources, repairs) = split_source_and_repair(&symbols, source_symbols);
1512
1513 assert_eq!(
1514 source_symbols, k,
1515 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_count"
1516 );
1517 assert_eq!(
1518 sources.len(),
1519 source_symbols,
1520 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_source_partition"
1521 );
1522 assert!(
1523 !repairs.is_empty(),
1524 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_repair_present"
1525 );
1526 assert!(
1527 symbols.iter().all(|symbol| symbol.len() == symbol_size),
1528 "bead_id={RAPTORQ_BEAD_ID} case=encode_single_block_symbol_size_consistent"
1529 );
1530 }
1531
1532 #[test]
1533 fn test_decode_exact_k_symbols() {
1534 let symbol_size = 512_usize;
1535 let k = 16_usize;
1536 let seed = 0x0BAD_CAFE_u64;
1537 let source = low_level_source_block(k, symbol_size, seed);
1538 let encoder =
1539 SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1540 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1541 let params = decoder.params();
1542 let base_rows = params.s + params.h;
1543 let constraints = ConstraintMatrix::build(params, seed);
1544 let mut received = decoder.constraint_symbols();
1545 let source_indexes = (0..k).collect::<Vec<_>>();
1546 append_source_received_symbols(
1547 &mut received,
1548 &constraints,
1549 base_rows,
1550 params.k_prime,
1551 symbol_size,
1552 &source,
1553 &source_indexes,
1554 );
1555
1556 tracing::warn!(
1557 bead_id = RAPTORQ_BEAD_ID,
1558 case = "test_decode_exact_k_symbols",
1559 source_symbols = k,
1560 "decoding with minimum symbol count (fragile recovery threshold)"
1561 );
1562 let decode_outcome = decoder
1563 .decode(&received)
1564 .expect("decode exact-k must succeed");
1565 assert_eq!(
1566 decode_outcome.source, source,
1567 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbols_roundtrip"
1568 );
1569 assert_eq!(
1570 decode_outcome.intermediate[0].len(),
1571 symbol_size,
1572 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_symbol_size"
1573 );
1574 assert_eq!(
1575 encoder.intermediate_symbol(0),
1576 decode_outcome.intermediate[0],
1577 "bead_id={RAPTORQ_BEAD_ID} case=decode_exact_k_intermediate_consistency"
1578 );
1579 }
1580
1581 #[test]
1582 fn test_decode_with_repair_symbols() {
1583 let symbol_size = 512_usize;
1584 let k = 16_usize;
1585 let seed = 0xABC0_FED1_u64;
1586 let source = low_level_source_block(k, symbol_size, seed);
1587 let encoder =
1588 SystematicEncoder::new(&source, symbol_size, seed).expect("systematic encoder");
1589 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1590 let params = decoder.params();
1591 let base_rows = params.s + params.h;
1592 let constraints = ConstraintMatrix::build(params, seed);
1593
1594 let mut received = decoder.constraint_symbols();
1595 let source_indexes = (1..k).collect::<Vec<_>>();
1596 append_source_received_symbols(
1597 &mut received,
1598 &constraints,
1599 base_rows,
1600 params.k_prime,
1601 symbol_size,
1602 &source,
1603 &source_indexes,
1604 );
1605
1606 let repair_esi = u32::try_from(k).expect("k fits u32");
1607 let (columns, coefficients) = decoder.repair_equation(repair_esi);
1608 let repair_data = encoder.repair_symbol(repair_esi);
1609 received.push(ReceivedSymbol::repair(
1610 repair_esi,
1611 columns,
1612 coefficients,
1613 repair_data,
1614 ));
1615
1616 let decode_outcome = decoder
1617 .decode(&received)
1618 .expect("decode with one repair must succeed");
1619 assert_eq!(
1620 decode_outcome.source, source,
1621 "bead_id={RAPTORQ_BEAD_ID} case=decode_with_repair_roundtrip"
1622 );
1623 }
1624
1625 #[test]
1626 fn test_decode_insufficient_symbols() {
1627 let symbol_size = 512_usize;
1628 let k = 8_usize;
1629 let seed = 0xDEAD_BEEF_u64;
1630 let source = low_level_source_block(k, symbol_size, seed);
1631 let decoder = InactivationDecoder::new(k, symbol_size, seed);
1632 let params = decoder.params();
1633 let base_rows = params.s + params.h;
1634 let constraints = ConstraintMatrix::build(params, seed);
1635 let mut received = decoder.constraint_symbols();
1636 let source_indexes = (0..k.saturating_sub(1)).collect::<Vec<_>>();
1637 append_source_received_symbols(
1638 &mut received,
1639 &constraints,
1640 base_rows,
1641 params.k_prime,
1642 symbol_size,
1643 &source,
1644 &source_indexes,
1645 );
1646
1647 let decode = decoder.decode(&received);
1648 assert!(
1649 decode.is_err(),
1650 "bead_id={RAPTORQ_BEAD_ID} case=decode_insufficient_symbols unexpectedly succeeded"
1651 );
1652 if let Err(err) = decode {
1653 tracing::error!(
1654 bead_id = RAPTORQ_BEAD_ID,
1655 case = "test_decode_insufficient_symbols",
1656 error = ?err,
1657 "decode failed as expected due to insufficient symbols"
1658 );
1659 }
1660 }
1661
1662 #[test]
1663 fn test_symbol_size_alignment() {
1664 let config = raptorq_config(4096, 1.20);
1665 let symbol_size = usize::from(config.encoding.symbol_size);
1666 let data = deterministic_payload(symbol_size * 3, 0x600D_1111);
1667 let object_id = AsObjectId::new_for_test(1205);
1668 let (symbols, source_symbols) = encode_symbols(config, object_id, &data);
1669
1670 assert_eq!(
1671 source_symbols, 3,
1672 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_source_count"
1673 );
1674 assert!(
1675 symbol_size.is_power_of_two(),
1676 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_power_of_two"
1677 );
1678 assert_eq!(
1679 symbol_size % 512,
1680 0,
1681 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_sector_multiple"
1682 );
1683 assert!(
1684 symbols.iter().all(|symbol| symbol.len() == symbol_size),
1685 "bead_id={RAPTORQ_BEAD_ID} case=symbol_size_alignment_symbol_lengths"
1686 );
1687 }
1688
1689 #[test]
1690 fn prop_encode_decode_roundtrip() {
1691 for seed in [11_u64, 29_u64, 43_u64, 71_u64] {
1692 for k in [8_usize, 16_usize] {
1693 let config = raptorq_config(512, 1.30);
1694 let symbol_size = usize::from(config.encoding.symbol_size);
1695 let data = deterministic_payload(k * symbol_size - 17, seed);
1696 let object_id = AsObjectId::new_for_test(2000 + seed);
1697 let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1698 let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1699 let subset = sources
1700 .iter()
1701 .take(source_symbols)
1702 .cloned()
1703 .collect::<Vec<_>>();
1704 let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1705 .expect("property roundtrip decode");
1706 assert_eq!(
1707 decoded, data,
1708 "bead_id={RAPTORQ_BEAD_ID} case=prop_encode_decode_roundtrip seed={seed} k={k}"
1709 );
1710 }
1711 }
1712 }
1713
1714 #[test]
1715 fn prop_any_k_of_n_suffices() {
1716 let symbol_size = 512_usize;
1717 let k = 16_usize;
1718 let sources = (0..k)
1719 .map(|symbol_idx| {
1720 deterministic_payload(
1721 symbol_size,
1722 0x5000_0000 + u64::try_from(symbol_idx).expect("index fits u64"),
1723 )
1724 })
1725 .collect::<Vec<_>>();
1726
1727 let mut parity = vec![0_u8; symbol_size];
1728 for source in &sources {
1729 symbol_add_assign(&mut parity, source).expect("parity construction");
1730 }
1731
1732 for omitted in 0..=k {
1733 let rebuilt = if omitted == k {
1734 sources.clone()
1735 } else {
1736 let mut recovered = parity.clone();
1737 for (index, source) in sources.iter().enumerate() {
1738 if index != omitted {
1739 symbol_add_assign(&mut recovered, source).expect("single-erasure recovery");
1740 }
1741 }
1742 let mut rebuilt = sources.clone();
1743 rebuilt[omitted] = recovered;
1744 rebuilt
1745 };
1746
1747 assert_eq!(
1748 rebuilt.len(),
1749 k,
1750 "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_subset_size omitted_index={omitted}"
1751 );
1752 assert_eq!(
1753 rebuilt, sources,
1754 "bead_id={RAPTORQ_BEAD_ID} case=prop_any_k_of_n_suffices omitted_index={omitted}"
1755 );
1756 }
1757 }
1758
1759 #[test]
1760 fn prop_symbol_size_consistent() {
1761 for symbol_size in [512_u16, 1024_u16, 4096_u16] {
1762 for k in [4_usize, 8_usize] {
1763 let config = raptorq_config(symbol_size, 1.25);
1764 let size = usize::from(symbol_size);
1765 let object_id = AsObjectId::new_for_test(
1766 u64::from(symbol_size) * 100 + u64::try_from(k).expect("k fits u64"),
1767 );
1768 let data = deterministic_payload(k * size - 3, u64::from(symbol_size));
1769 let (symbols, _) = encode_symbols(config, object_id, &data);
1770 assert!(
1771 symbols.iter().all(|symbol| symbol.len() == size),
1772 "bead_id={RAPTORQ_BEAD_ID} case=prop_symbol_size_consistent symbol_size={symbol_size} k={k}"
1773 );
1774 }
1775 }
1776 }
1777
1778 #[test]
1779 fn test_e2e_symbol_ops_in_encode_decode_roundtrip() {
1780 for (run, k) in [8_usize, 16_usize, 64_usize].iter().copied().enumerate() {
1781 let config = raptorq_config(512, 1.30);
1782 let symbol_size = usize::from(config.encoding.symbol_size);
1783 let data = deterministic_payload(
1784 k * symbol_size,
1785 0x4455_6677 + u64::try_from(run).expect("run fits u64"),
1786 );
1787 let object_id = AsObjectId::new_for_test(3000 + u64::try_from(k).expect("k fits u64"));
1788
1789 tracing::info!(
1790 bead_id = RAPTORQ_BEAD_ID,
1791 case = "test_e2e_symbol_ops_in_encode_decode_roundtrip",
1792 k,
1793 symbol_size,
1794 "starting encode/decode roundtrip"
1795 );
1796 let (symbols, source_symbols) = encode_symbols(config.clone(), object_id, &data);
1797 let (sources, _) = split_source_and_repair(&symbols, source_symbols);
1798 let subset = sources
1799 .iter()
1800 .take(source_symbols)
1801 .cloned()
1802 .collect::<Vec<_>>();
1803 let decoded = decode_symbols(config, object_id, data.len(), source_symbols, subset)
1804 .expect("e2e decode must succeed");
1805
1806 assert_eq!(
1807 decoded, data,
1808 "bead_id={RAPTORQ_BEAD_ID} case=e2e_roundtrip_bytes k={k}"
1809 );
1810
1811 let mut source_parity = vec![0_u8; symbol_size];
1812 for chunk in data.chunks_exact(symbol_size) {
1813 symbol_add_assign(&mut source_parity, chunk).expect("source parity xor");
1814 }
1815
1816 let mut decoded_parity = vec![0_u8; symbol_size];
1817 for chunk in decoded.chunks_exact(symbol_size) {
1818 symbol_add_assign(&mut decoded_parity, chunk).expect("decoded parity xor");
1819 }
1820
1821 assert_eq!(
1822 decoded_parity, source_parity,
1823 "bead_id={RAPTORQ_BEAD_ID} case=e2e_symbol_ops_parity k={k}"
1824 );
1825 }
1826 }
1827}