Skip to main content

fsqlite_core/
raptorq_codec.rs

1//! Production `SymbolCodec` implementation backed by asupersync RaptorQ (bd-3sj9w).
2//!
3//! This module lifts the test-only `AsupersyncCodec` from `raptorq_integration`
4//! into a public, production-ready codec.  The codec wraps asupersync's
5//! `RaptorQSenderBuilder` / `RaptorQReceiverBuilder` behind the
6//! `SymbolCodec` trait, translating between FrankenSQLite's packed symbol
7//! key format and asupersync's `Symbol` / `SymbolId` types.
8
9use std::collections::VecDeque;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use asupersync::error::ErrorKind as AsErrorKind;
15use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
16use asupersync::security::AuthenticationTag;
17use asupersync::security::authenticated::AuthenticatedSymbol;
18use asupersync::transport::error::{SinkError, StreamError};
19use asupersync::transport::sink::SymbolSink;
20use asupersync::transport::stream::SymbolStream;
21use asupersync::types::Time as AsTime;
22use asupersync::types::{
23    CancelKind as AsCancelKind, CancelReason as AsCancelReason, ObjectId as AsObjectId,
24    ObjectParams, Symbol, SymbolId, SymbolKind,
25};
26use asupersync::{Budget as AsBudget, Cx as AsCx, RaptorQConfig};
27
28use fsqlite_error::{FrankenError, Result};
29use fsqlite_types::cx::Cx;
30
31use crate::raptorq_integration::{
32    CodecDecodeResult, CodecEncodeResult, DecodeFailureReason, SymbolCodec,
33};
34
35const BEAD_ID: &str = "bd-3sj9w";
36
37/// Fixed object ID for production codec operations.  The object ID is not
38/// semantically meaningful for FrankenSQLite's page-level FEC — each WAL
39/// commit group is a standalone encode/decode unit — so we use a constant
40/// derived from the bead lineage.
41const PRODUCTION_OBJECT_ID: u64 = 0xF5_3D9A_0001;
42
43// ---------------------------------------------------------------------------
44// Packed symbol key format
45// ---------------------------------------------------------------------------
46//
47// 32-bit key layout:
48//   [31]     = kind (0 = source, 1 = repair)
49//   [30..23] = source block number (SBN, 8 bits)
50//   [22..0]  = encoding symbol ID (ESI, 23 bits)
51
52const PACKED_KIND_REPAIR_BIT: u32 = 1_u32 << 31;
53const PACKED_SBN_SHIFT: u32 = 23;
54const PACKED_SBN_MASK: u32 = 0xFF;
55const PACKED_ESI_MASK: u32 = 0x7F_FFFF;
56
57/// Pack a `(kind, sbn, esi)` triple into a 32-bit key.
58///
59/// Returns an error if `esi` exceeds 23 bits.
60pub fn pack_symbol_key(kind: SymbolKind, sbn: u8, esi: u32) -> Result<u32> {
61    if esi > PACKED_ESI_MASK {
62        return Err(FrankenError::OutOfRange {
63            what: "packed symbol esi (must fit 23 bits)".to_owned(),
64            value: esi.to_string(),
65        });
66    }
67
68    let kind_bit = if kind.is_repair() {
69        PACKED_KIND_REPAIR_BIT
70    } else {
71        0
72    };
73    Ok(kind_bit | (u32::from(sbn) << PACKED_SBN_SHIFT) | esi)
74}
75
76/// Unpack a 32-bit key into `(kind, sbn, esi)`.
77#[must_use]
78pub fn unpack_symbol_key(packed: u32) -> (SymbolKind, u8, u32) {
79    let kind = if packed & PACKED_KIND_REPAIR_BIT == 0 {
80        SymbolKind::Source
81    } else {
82        SymbolKind::Repair
83    };
84    #[allow(clippy::cast_possible_truncation)]
85    let sbn = ((packed >> PACKED_SBN_SHIFT) & PACKED_SBN_MASK) as u8;
86    let esi = packed & PACKED_ESI_MASK;
87    (kind, sbn, esi)
88}
89
90// ---------------------------------------------------------------------------
91// In-memory transport adapters
92// ---------------------------------------------------------------------------
93
94/// In-memory symbol sink that collects symbols into a `Vec`.
95#[derive(Debug)]
96struct VecTransportSink {
97    symbols: Vec<Symbol>,
98}
99
100impl VecTransportSink {
101    fn new() -> Self {
102        Self {
103            symbols: Vec::new(),
104        }
105    }
106}
107
108impl SymbolSink for VecTransportSink {
109    fn poll_send(
110        mut self: Pin<&mut Self>,
111        _cx: &mut Context<'_>,
112        symbol: AuthenticatedSymbol,
113    ) -> Poll<std::result::Result<(), SinkError>> {
114        self.symbols.push(symbol.into_symbol());
115        Poll::Ready(Ok(()))
116    }
117
118    fn poll_flush(
119        self: Pin<&mut Self>,
120        _cx: &mut Context<'_>,
121    ) -> Poll<std::result::Result<(), SinkError>> {
122        Poll::Ready(Ok(()))
123    }
124
125    fn poll_close(
126        self: Pin<&mut Self>,
127        _cx: &mut Context<'_>,
128    ) -> Poll<std::result::Result<(), SinkError>> {
129        Poll::Ready(Ok(()))
130    }
131
132    fn poll_ready(
133        self: Pin<&mut Self>,
134        _cx: &mut Context<'_>,
135    ) -> Poll<std::result::Result<(), SinkError>> {
136        Poll::Ready(Ok(()))
137    }
138}
139
140/// In-memory symbol stream that drains from a `VecDeque`.
141#[derive(Debug)]
142struct VecTransportStream {
143    symbols: VecDeque<AuthenticatedSymbol>,
144}
145
146impl VecTransportStream {
147    fn new(symbols: Vec<Symbol>) -> Self {
148        let symbols = symbols
149            .into_iter()
150            .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
151            .collect();
152        Self { symbols }
153    }
154}
155
156impl SymbolStream for VecTransportStream {
157    fn poll_next(
158        mut self: Pin<&mut Self>,
159        _cx: &mut Context<'_>,
160    ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
161        match self.symbols.pop_front() {
162            Some(symbol) => Poll::Ready(Some(Ok(symbol))),
163            None => Poll::Ready(None),
164        }
165    }
166
167    fn size_hint(&self) -> (usize, Option<usize>) {
168        (self.symbols.len(), Some(self.symbols.len()))
169    }
170
171    fn is_exhausted(&self) -> bool {
172        self.symbols.is_empty()
173    }
174}
175
176// ---------------------------------------------------------------------------
177// Production SymbolCodec
178// ---------------------------------------------------------------------------
179
180/// Production [`SymbolCodec`] backed by asupersync's RaptorQ encoder/decoder.
181///
182/// Wraps `RaptorQSenderBuilder` for encode and `RaptorQReceiverBuilder` for
183/// decode, using in-memory transports.  The codec is stateless and can be
184/// shared across threads (`Send + Sync`).
185///
186/// # Configuration
187///
188/// - `max_block_size`: Maximum source block size in bytes (default: 64 KiB).
189///   This controls how asupersync partitions large objects into source blocks.
190///   For page-level FEC where each encode call handles a single commit group
191///   (typically a few pages), the default is sufficient.
192#[derive(Debug, Clone)]
193pub struct AsupersyncCodec {
194    /// Maximum source block size in bytes.
195    max_block_size: usize,
196}
197
198impl AsupersyncCodec {
199    /// Create a codec with the given maximum block size.
200    #[must_use]
201    pub const fn new(max_block_size: usize) -> Self {
202        Self { max_block_size }
203    }
204}
205
206impl Default for AsupersyncCodec {
207    fn default() -> Self {
208        Self::new(64 * 1024)
209    }
210}
211
212fn native_budget_from_local(cx: &Cx) -> AsBudget {
213    let budget = cx.budget();
214    let mut native_budget = AsBudget::new()
215        .with_poll_quota(budget.poll_quota)
216        .with_priority(budget.priority);
217    if let Some(cost_quota) = budget.cost_quota {
218        native_budget = native_budget.with_cost_quota(cost_quota);
219    }
220    if let Some(deadline) = budget.deadline {
221        native_budget = native_budget.with_deadline(local_deadline_to_native_time(deadline));
222    }
223    native_budget
224}
225
226fn wall_clock_now_since_epoch() -> Duration {
227    SystemTime::now()
228        .duration_since(UNIX_EPOCH)
229        .unwrap_or(Duration::ZERO)
230}
231
232fn local_deadline_to_native_time(deadline: Duration) -> AsTime {
233    let absolute_deadline = wall_clock_now_since_epoch()
234        .checked_add(deadline)
235        .unwrap_or(Duration::MAX);
236    let nanos = u64::try_from(absolute_deadline.as_nanos()).unwrap_or(u64::MAX);
237    AsTime::from_nanos(nanos)
238}
239
240fn is_native_abort(kind: AsErrorKind) -> bool {
241    matches!(
242        kind,
243        AsErrorKind::Cancelled
244            | AsErrorKind::CancelTimeout
245            | AsErrorKind::DeadlineExceeded
246            | AsErrorKind::PollQuotaExhausted
247            | AsErrorKind::CostQuotaExhausted
248    )
249}
250
251fn native_reason_to_local(reason: &AsCancelReason) -> fsqlite_types::cx::CancelReason {
252    match reason.kind {
253        AsCancelKind::User => fsqlite_types::cx::CancelReason::UserInterrupt,
254        AsCancelKind::Timeout
255        | AsCancelKind::Deadline
256        | AsCancelKind::PollQuota
257        | AsCancelKind::CostBudget => fsqlite_types::cx::CancelReason::Timeout,
258        AsCancelKind::FailFast
259        | AsCancelKind::RaceLost
260        | AsCancelKind::ParentCancelled
261        | AsCancelKind::Shutdown
262        | AsCancelKind::LinkedExit => fsqlite_types::cx::CancelReason::RegionClose,
263        AsCancelKind::ResourceUnavailable => fsqlite_types::cx::CancelReason::Abort,
264    }
265}
266
267fn sync_local_cancel_from_attached_native(codec_cx: &Cx, native_cx: &AsCx) {
268    if let Some(reason) = native_cx.cancel_reason() {
269        codec_cx.cancel_with_reason(native_reason_to_local(&reason));
270    } else if native_cx.is_cancel_requested() {
271        codec_cx.cancel();
272    }
273}
274
275fn derive_native_request_cx(cx: &Cx) -> (Cx, AsCx) {
276    let codec_cx = cx.create_child();
277    if let Some(reason) = cx.cancel_reason() {
278        codec_cx.cancel_with_reason(reason);
279    } else if cx.is_cancel_requested() {
280        codec_cx.cancel();
281    }
282    let attached_native_cx = cx.attached_native_cx();
283    if let Some(native_cx) = attached_native_cx.as_ref() {
284        sync_local_cancel_from_attached_native(&codec_cx, native_cx);
285    }
286    let native_cx = attached_native_cx
287        .unwrap_or_else(|| AsCx::for_request_with_budget(native_budget_from_local(&codec_cx)));
288    codec_cx.set_native_cx(native_cx.clone());
289    (codec_cx, native_cx)
290}
291
292fn decode_object_params(
293    object_id: AsObjectId,
294    k_source: u32,
295    symbol_size: u32,
296    max_block_size: usize,
297) -> Result<ObjectParams> {
298    let object_size = u64::from(k_source)
299        .checked_mul(u64::from(symbol_size))
300        .ok_or_else(|| FrankenError::OutOfRange {
301            what: "object_size for decode params".to_owned(),
302            value: format!("{k_source}*{symbol_size}"),
303        })?;
304    let symbol_size_u16 = u16::try_from(symbol_size).map_err(|_| FrankenError::OutOfRange {
305        what: "symbol_size as u16".to_owned(),
306        value: symbol_size.to_string(),
307    })?;
308    if object_size == 0 {
309        return Ok(ObjectParams::new(object_id, 0, symbol_size_u16, 0, 0));
310    }
311    if max_block_size == 0 {
312        return Err(FrankenError::OutOfRange {
313            what: "max_block_size (must be > 0)".to_owned(),
314            value: "0".to_owned(),
315        });
316    }
317
318    let max_block_size_u64 =
319        u64::try_from(max_block_size).map_err(|_| FrankenError::OutOfRange {
320            what: "max_block_size as u64".to_owned(),
321            value: max_block_size.to_string(),
322        })?;
323    let source_blocks = u16::try_from(object_size.div_ceil(max_block_size_u64)).map_err(|_| {
324        FrankenError::OutOfRange {
325            what: "source_blocks as u16".to_owned(),
326            value: object_size.div_ceil(max_block_size_u64).to_string(),
327        }
328    })?;
329    let symbols_per_block = u16::try_from(
330        object_size
331            .min(max_block_size_u64)
332            .div_ceil(u64::from(symbol_size_u16)),
333    )
334    .map_err(|_| FrankenError::OutOfRange {
335        what: "symbols_per_block as u16".to_owned(),
336        value: object_size
337            .min(max_block_size_u64)
338            .div_ceil(u64::from(symbol_size_u16))
339            .to_string(),
340    })?;
341
342    Ok(ObjectParams::new(
343        object_id,
344        object_size,
345        symbol_size_u16,
346        source_blocks,
347        symbols_per_block,
348    ))
349}
350
351#[allow(
352    clippy::cast_possible_truncation,
353    clippy::cast_lossless,
354    clippy::cast_precision_loss,
355    clippy::cast_sign_loss
356)]
357impl SymbolCodec for AsupersyncCodec {
358    fn encode(
359        &self,
360        cx: &Cx,
361        source_data: &[u8],
362        symbol_size: u32,
363        repair_overhead: f64,
364    ) -> Result<CodecEncodeResult> {
365        if symbol_size == 0 {
366            return Err(FrankenError::OutOfRange {
367                what: "symbol_size (must be > 0)".to_owned(),
368                value: "0".to_owned(),
369            });
370        }
371        let mut config = RaptorQConfig::default();
372        config.encoding.symbol_size = symbol_size as u16;
373        config.encoding.max_block_size = self.max_block_size;
374        config.encoding.repair_overhead = repair_overhead;
375
376        let (codec_cx, native_cx) = derive_native_request_cx(cx);
377        codec_cx.checkpoint().map_err(|_| FrankenError::Abort)?;
378        let object_id = AsObjectId::new_for_test(PRODUCTION_OBJECT_ID);
379        let mut sender = RaptorQSenderBuilder::new()
380            .config(config)
381            .transport(VecTransportSink::new())
382            .build()
383            .map_err(|e| FrankenError::Internal(format!("{BEAD_ID}: sender build: {e}")))?;
384
385        let outcome = sender
386            .send_object(&native_cx, object_id, source_data)
387            .map_err(|e| {
388                if is_native_abort(e.kind()) {
389                    FrankenError::Abort
390                } else {
391                    FrankenError::Internal(format!("{BEAD_ID}: send_object: {e}"))
392                }
393            })?;
394
395        let symbols = std::mem::take(&mut sender.transport_mut().symbols);
396        let k = outcome.source_symbols as u32;
397
398        let mut source_symbols = Vec::new();
399        let mut repair_symbols = Vec::new();
400        for s in &symbols {
401            let packed_key = pack_symbol_key(s.kind(), s.sbn(), s.esi())?;
402            if s.kind().is_source() {
403                source_symbols.push((packed_key, s.data().to_vec()));
404            } else {
405                repair_symbols.push((packed_key, s.data().to_vec()));
406            }
407        }
408
409        Ok(CodecEncodeResult {
410            source_symbols,
411            repair_symbols,
412            k_source: k,
413        })
414    }
415
416    fn decode(
417        &self,
418        cx: &Cx,
419        symbols: &[(u32, Vec<u8>)],
420        k_source: u32,
421        symbol_size: u32,
422    ) -> Result<CodecDecodeResult> {
423        if symbols.is_empty() {
424            return Ok(CodecDecodeResult::Failure {
425                reason: DecodeFailureReason::InsufficientSymbols,
426                symbols_received: 0,
427                k_required: k_source,
428            });
429        }
430
431        if symbol_size == 0 {
432            return Err(FrankenError::OutOfRange {
433                what: "symbol_size (must be > 0)".to_owned(),
434                value: "0".to_owned(),
435            });
436        }
437
438        let object_id = AsObjectId::new_for_test(PRODUCTION_OBJECT_ID);
439        let mut config = RaptorQConfig::default();
440        config.encoding.symbol_size = symbol_size as u16;
441        config.encoding.max_block_size = self.max_block_size;
442        let params = decode_object_params(object_id, k_source, symbol_size, self.max_block_size)?;
443
444        let mut rebuilt = Vec::with_capacity(symbols.len());
445        for (packed, data) in symbols {
446            let (kind, sbn, esi) = unpack_symbol_key(*packed);
447            rebuilt.push(Symbol::new(
448                SymbolId::new(object_id, sbn, esi),
449                data.clone(),
450                kind,
451            ));
452        }
453
454        let (codec_cx, native_cx) = derive_native_request_cx(cx);
455        codec_cx.checkpoint().map_err(|_| FrankenError::Abort)?;
456        let mut receiver = RaptorQReceiverBuilder::new()
457            .config(config)
458            .source(VecTransportStream::new(rebuilt))
459            .build()
460            .map_err(|e| FrankenError::Internal(format!("{BEAD_ID}: receiver build: {e}")))?;
461
462        match receiver.receive_object(&native_cx, &params) {
463            Ok(outcome) => Ok(CodecDecodeResult::Success {
464                data: outcome.data,
465                symbols_used: outcome.symbols_received as u32,
466                peeled_count: 0,
467                inactivated_count: 0,
468            }),
469            Err(err) if is_native_abort(err.kind()) => Err(FrankenError::Abort),
470            Err(err) => {
471                let reason = match err.kind() {
472                    AsErrorKind::InsufficientSymbols => DecodeFailureReason::InsufficientSymbols,
473                    _ => DecodeFailureReason::SingularMatrix,
474                };
475                Ok(CodecDecodeResult::Failure {
476                    reason,
477                    symbols_received: symbols.len() as u32,
478                    k_required: k_source,
479                })
480            }
481        }
482    }
483}
484
485// ---------------------------------------------------------------------------
486// Tests
487// ---------------------------------------------------------------------------
488
489#[cfg(test)]
490mod tests {
491    use super::*;
492    use fsqlite_types::cx::{CancelReason, Cx};
493
494    fn test_cx() -> Cx {
495        Cx::new()
496    }
497
498    #[test]
499    fn test_pack_unpack_source_symbol() {
500        let packed = pack_symbol_key(SymbolKind::Source, 0, 42).unwrap();
501        let (kind, sbn, esi) = unpack_symbol_key(packed);
502        assert_eq!(kind, SymbolKind::Source);
503        assert_eq!(sbn, 0);
504        assert_eq!(esi, 42);
505    }
506
507    #[test]
508    fn test_pack_unpack_repair_symbol() {
509        let packed = pack_symbol_key(SymbolKind::Repair, 3, 100).unwrap();
510        let (kind, sbn, esi) = unpack_symbol_key(packed);
511        assert_eq!(kind, SymbolKind::Repair);
512        assert_eq!(sbn, 3);
513        assert_eq!(esi, 100);
514    }
515
516    #[test]
517    fn test_pack_esi_overflow() {
518        let result = pack_symbol_key(SymbolKind::Source, 0, PACKED_ESI_MASK + 1);
519        assert!(result.is_err());
520    }
521
522    #[test]
523    fn test_pack_max_esi() {
524        let packed = pack_symbol_key(SymbolKind::Source, 0, PACKED_ESI_MASK).unwrap();
525        let (_, _, esi) = unpack_symbol_key(packed);
526        assert_eq!(esi, PACKED_ESI_MASK);
527    }
528
529    #[test]
530    fn test_pack_max_sbn() {
531        let packed = pack_symbol_key(SymbolKind::Repair, 255, 0).unwrap();
532        let (kind, sbn, esi) = unpack_symbol_key(packed);
533        assert_eq!(kind, SymbolKind::Repair);
534        assert_eq!(sbn, 255);
535        assert_eq!(esi, 0);
536    }
537
538    #[test]
539    fn test_codec_encode_decode_roundtrip() {
540        let codec = AsupersyncCodec::default();
541        let cx = test_cx();
542        let data = vec![0xAB_u8; 4096];
543        let symbol_size = 512_u32;
544        let repair_overhead = 1.25;
545
546        let encoded = codec
547            .encode(&cx, &data, symbol_size, repair_overhead)
548            .unwrap();
549        assert!(encoded.k_source > 0);
550        assert!(!encoded.source_symbols.is_empty());
551        assert!(!encoded.repair_symbols.is_empty());
552
553        // Decode with all symbols (source + repair).
554        let mut all_symbols: Vec<(u32, Vec<u8>)> = encoded.source_symbols.clone();
555        all_symbols.extend(encoded.repair_symbols.clone());
556
557        let decoded = codec
558            .decode(&cx, &all_symbols, encoded.k_source, symbol_size)
559            .unwrap();
560        match decoded {
561            CodecDecodeResult::Success {
562                data: recovered, ..
563            } => {
564                assert_eq!(recovered, data);
565            }
566            CodecDecodeResult::Failure { reason, .. } => {
567                panic!("decode failed: {reason:?}");
568            }
569        }
570    }
571
572    #[test]
573    fn test_codec_decode_source_only() {
574        let codec = AsupersyncCodec::default();
575        let cx = test_cx();
576        let data = vec![0xCD_u8; 2048];
577        let symbol_size = 512_u32;
578
579        let encoded = codec.encode(&cx, &data, symbol_size, 1.25).unwrap();
580
581        // Decode with source symbols only (no repair needed).
582        let decoded = codec
583            .decode(&cx, &encoded.source_symbols, encoded.k_source, symbol_size)
584            .unwrap();
585        match decoded {
586            CodecDecodeResult::Success {
587                data: recovered, ..
588            } => {
589                assert_eq!(recovered, data);
590            }
591            CodecDecodeResult::Failure { reason, .. } => {
592                panic!("source-only decode failed: {reason:?}");
593            }
594        }
595    }
596
597    #[test]
598    fn test_codec_decode_with_erasures() {
599        let codec = AsupersyncCodec::default();
600        let cx = test_cx();
601        let data = vec![0xEF_u8; 4096];
602        let symbol_size = 512_u32;
603
604        let encoded = codec.encode(&cx, &data, symbol_size, 1.5).unwrap();
605        let k = encoded.k_source as usize;
606
607        // Drop first source symbol, replace with repair symbols.
608        let mut symbols: Vec<(u32, Vec<u8>)> = encoded.source_symbols[1..].to_vec();
609        symbols.extend(encoded.repair_symbols.iter().take(2).cloned());
610
611        assert!(symbols.len() >= k, "need at least K symbols");
612
613        let decoded = codec
614            .decode(&cx, &symbols, encoded.k_source, symbol_size)
615            .unwrap();
616        match decoded {
617            CodecDecodeResult::Success {
618                data: recovered, ..
619            } => {
620                assert_eq!(recovered, data);
621            }
622            CodecDecodeResult::Failure { reason, .. } => {
623                panic!("erasure decode failed: {reason:?}");
624            }
625        }
626    }
627
628    #[test]
629    fn test_codec_decode_empty() {
630        let codec = AsupersyncCodec::default();
631        let cx = test_cx();
632        let result = codec.decode(&cx, &[], 4, 512).unwrap();
633        assert!(matches!(
634            result,
635            CodecDecodeResult::Failure {
636                reason: DecodeFailureReason::InsufficientSymbols,
637                ..
638            }
639        ));
640    }
641
642    #[test]
643    fn test_codec_default_max_block_size() {
644        let codec = AsupersyncCodec::default();
645        assert_eq!(codec.max_block_size, 64 * 1024);
646    }
647
648    #[test]
649    fn test_codec_custom_max_block_size() {
650        let codec = AsupersyncCodec::new(128 * 1024);
651        let cx = test_cx();
652        assert_eq!(codec.max_block_size, 128 * 1024);
653
654        // Should still encode/decode correctly.
655        let data = vec![0x42_u8; 2048];
656        let encoded = codec.encode(&cx, &data, 512, 1.25).unwrap();
657        let decoded = codec
658            .decode(&cx, &encoded.source_symbols, encoded.k_source, 512)
659            .unwrap();
660        assert!(matches!(decoded, CodecDecodeResult::Success { .. }));
661    }
662
663    #[test]
664    fn test_codec_send_sync() {
665        // SymbolCodec requires Send + Sync.
666        fn assert_send_sync<T: Send + Sync>() {}
667        assert_send_sync::<AsupersyncCodec>();
668    }
669
670    #[test]
671    fn test_codec_large_data_4096_page() {
672        let codec = AsupersyncCodec::default();
673        let cx = test_cx();
674        // 4 pages of 4096 bytes each.
675        let data = vec![0x77_u8; 4 * 4096];
676        let encoded = codec.encode(&cx, &data, 4096, 1.25).unwrap();
677        assert!(encoded.k_source >= 4);
678
679        let decoded = codec
680            .decode(&cx, &encoded.source_symbols, encoded.k_source, 4096)
681            .unwrap();
682        match decoded {
683            CodecDecodeResult::Success {
684                data: recovered, ..
685            } => {
686                assert_eq!(recovered, data);
687            }
688            CodecDecodeResult::Failure { reason, .. } => {
689                panic!("large page decode failed: {reason:?}");
690            }
691        }
692    }
693
694    #[test]
695    fn test_codec_repair_symbol_count_scales_with_overhead() {
696        let codec = AsupersyncCodec::default();
697        let cx = test_cx();
698        let data = vec![0x55_u8; 8192];
699
700        let low = codec.encode(&cx, &data, 512, 1.1).unwrap();
701        let high = codec.encode(&cx, &data, 512, 2.0).unwrap();
702
703        // Higher overhead should produce more repair symbols.
704        assert!(
705            high.repair_symbols.len() > low.repair_symbols.len(),
706            "2.0x overhead ({}) should produce more repairs than 1.1x ({})",
707            high.repair_symbols.len(),
708            low.repair_symbols.len()
709        );
710    }
711
712    #[test]
713    fn test_codec_decode_multiple_source_blocks_roundtrip() {
714        let codec = AsupersyncCodec::new(1024);
715        let cx = test_cx();
716        let data = vec![0x5A_u8; 3 * 1024];
717        let symbol_size = 512_u32;
718
719        let encoded = codec.encode(&cx, &data, symbol_size, 1.25).unwrap();
720        assert!(
721            encoded.source_symbols.iter().any(|(packed, _)| {
722                let (_, sbn, _) = unpack_symbol_key(*packed);
723                sbn > 0
724            }),
725            "test data should span multiple source blocks"
726        );
727
728        let decoded = codec
729            .decode(&cx, &encoded.source_symbols, encoded.k_source, symbol_size)
730            .unwrap();
731        match decoded {
732            CodecDecodeResult::Success {
733                data: recovered, ..
734            } => {
735                assert_eq!(recovered, data);
736            }
737            CodecDecodeResult::Failure { reason, .. } => {
738                panic!("multi-block decode failed: {reason:?}");
739            }
740        }
741    }
742
743    #[test]
744    fn test_pack_all_bits_combined() {
745        // Test with all bit fields populated.
746        let packed = pack_symbol_key(SymbolKind::Repair, 127, 0x3F_FFFF).unwrap();
747        let (kind, sbn, esi) = unpack_symbol_key(packed);
748        assert_eq!(kind, SymbolKind::Repair);
749        assert_eq!(sbn, 127);
750        assert_eq!(esi, 0x3F_FFFF);
751    }
752
753    #[test]
754    fn test_codec_encode_respects_cancelled_cx() {
755        let codec = AsupersyncCodec::default();
756        let cx = test_cx();
757        cx.cancel_with_reason(CancelReason::Abort);
758
759        let err = codec.encode(&cx, &[0xAB; 512], 512, 1.25).unwrap_err();
760        assert!(matches!(err, FrankenError::Abort));
761    }
762
763    #[test]
764    fn test_codec_decode_respects_cancelled_cx() {
765        let codec = AsupersyncCodec::default();
766        let setup_cx = test_cx();
767        let encoded = codec.encode(&setup_cx, &[0xBC; 512], 512, 1.25).unwrap();
768
769        let cx = test_cx();
770        cx.cancel_with_reason(CancelReason::Abort);
771
772        let err = codec
773            .decode(&cx, &encoded.source_symbols, encoded.k_source, 512)
774            .unwrap_err();
775        assert!(matches!(err, FrankenError::Abort));
776    }
777
778    #[test]
779    fn test_local_deadline_converts_to_future_native_time() {
780        let before = wall_clock_now_since_epoch();
781        let cx = Cx::with_budget(
782            fsqlite_types::cx::Budget::INFINITE.with_deadline(Duration::from_millis(50)),
783        );
784
785        let native_budget = native_budget_from_local(&cx);
786        let native_deadline = Duration::from_nanos(
787            native_budget
788                .deadline
789                .expect("native budget should carry a deadline")
790                .as_nanos(),
791        );
792        let lower_bound = before
793            .checked_add(Duration::from_millis(25))
794            .unwrap_or(Duration::MAX);
795
796        assert!(
797            native_deadline >= lower_bound,
798            "native deadline should be an absolute future instant, got {native_deadline:?}"
799        );
800    }
801
802    #[test]
803    fn test_codec_encode_respects_attached_native_cancellation() {
804        let codec = AsupersyncCodec::default();
805        let cx = test_cx();
806        let native = AsCx::for_testing();
807        cx.set_native_cx(native.clone());
808        native.set_cancel_reason(AsCancelReason::timeout());
809
810        let err = codec.encode(&cx, &[0xAB; 512], 512, 1.25).unwrap_err();
811        assert!(matches!(err, FrankenError::Abort));
812    }
813
814    #[test]
815    fn test_codec_decode_respects_attached_native_cancellation() {
816        let codec = AsupersyncCodec::default();
817        let setup_cx = test_cx();
818        let encoded = codec.encode(&setup_cx, &[0xBC; 512], 512, 1.25).unwrap();
819
820        let cx = test_cx();
821        let native = AsCx::for_testing();
822        cx.set_native_cx(native.clone());
823        native.set_cancel_reason(AsCancelReason::timeout());
824
825        let err = codec
826            .decode(&cx, &encoded.source_symbols, encoded.k_source, 512)
827            .unwrap_err();
828        assert!(matches!(err, FrankenError::Abort));
829    }
830
831    #[test]
832    fn test_derive_native_request_cx_mirrors_attached_native_cancellation() {
833        let cx = test_cx();
834        let native = AsCx::for_testing();
835        cx.set_native_cx(native.clone());
836        native.set_cancel_reason(AsCancelReason::timeout());
837
838        let (codec_cx, derived_native) = derive_native_request_cx(&cx);
839
840        assert_eq!(codec_cx.cancel_reason(), Some(CancelReason::Timeout));
841        assert!(codec_cx.is_cancel_requested());
842        assert!(codec_cx.checkpoint().is_err());
843        assert!(derived_native.is_cancel_requested());
844    }
845}