1use std::collections::VecDeque;
10use std::pin::Pin;
11use std::task::{Context, Poll};
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14use asupersync::error::ErrorKind as AsErrorKind;
15use asupersync::raptorq::{RaptorQReceiverBuilder, RaptorQSenderBuilder};
16use asupersync::security::AuthenticationTag;
17use asupersync::security::authenticated::AuthenticatedSymbol;
18use asupersync::transport::error::{SinkError, StreamError};
19use asupersync::transport::sink::SymbolSink;
20use asupersync::transport::stream::SymbolStream;
21use asupersync::types::Time as AsTime;
22use asupersync::types::{
23 CancelKind as AsCancelKind, CancelReason as AsCancelReason, ObjectId as AsObjectId,
24 ObjectParams, Symbol, SymbolId, SymbolKind,
25};
26use asupersync::{Budget as AsBudget, Cx as AsCx, RaptorQConfig};
27
28use fsqlite_error::{FrankenError, Result};
29use fsqlite_types::cx::Cx;
30
31use crate::raptorq_integration::{
32 CodecDecodeResult, CodecEncodeResult, DecodeFailureReason, SymbolCodec,
33};
34
35const BEAD_ID: &str = "bd-3sj9w";
36
37const PRODUCTION_OBJECT_ID: u64 = 0xF5_3D9A_0001;
42
43const PACKED_KIND_REPAIR_BIT: u32 = 1_u32 << 31;
53const PACKED_SBN_SHIFT: u32 = 23;
54const PACKED_SBN_MASK: u32 = 0xFF;
55const PACKED_ESI_MASK: u32 = 0x7F_FFFF;
56
57pub fn pack_symbol_key(kind: SymbolKind, sbn: u8, esi: u32) -> Result<u32> {
61 if esi > PACKED_ESI_MASK {
62 return Err(FrankenError::OutOfRange {
63 what: "packed symbol esi (must fit 23 bits)".to_owned(),
64 value: esi.to_string(),
65 });
66 }
67
68 let kind_bit = if kind.is_repair() {
69 PACKED_KIND_REPAIR_BIT
70 } else {
71 0
72 };
73 Ok(kind_bit | (u32::from(sbn) << PACKED_SBN_SHIFT) | esi)
74}
75
76#[must_use]
78pub fn unpack_symbol_key(packed: u32) -> (SymbolKind, u8, u32) {
79 let kind = if packed & PACKED_KIND_REPAIR_BIT == 0 {
80 SymbolKind::Source
81 } else {
82 SymbolKind::Repair
83 };
84 #[allow(clippy::cast_possible_truncation)]
85 let sbn = ((packed >> PACKED_SBN_SHIFT) & PACKED_SBN_MASK) as u8;
86 let esi = packed & PACKED_ESI_MASK;
87 (kind, sbn, esi)
88}
89
90#[derive(Debug)]
96struct VecTransportSink {
97 symbols: Vec<Symbol>,
98}
99
100impl VecTransportSink {
101 fn new() -> Self {
102 Self {
103 symbols: Vec::new(),
104 }
105 }
106}
107
108impl SymbolSink for VecTransportSink {
109 fn poll_send(
110 mut self: Pin<&mut Self>,
111 _cx: &mut Context<'_>,
112 symbol: AuthenticatedSymbol,
113 ) -> Poll<std::result::Result<(), SinkError>> {
114 self.symbols.push(symbol.into_symbol());
115 Poll::Ready(Ok(()))
116 }
117
118 fn poll_flush(
119 self: Pin<&mut Self>,
120 _cx: &mut Context<'_>,
121 ) -> Poll<std::result::Result<(), SinkError>> {
122 Poll::Ready(Ok(()))
123 }
124
125 fn poll_close(
126 self: Pin<&mut Self>,
127 _cx: &mut Context<'_>,
128 ) -> Poll<std::result::Result<(), SinkError>> {
129 Poll::Ready(Ok(()))
130 }
131
132 fn poll_ready(
133 self: Pin<&mut Self>,
134 _cx: &mut Context<'_>,
135 ) -> Poll<std::result::Result<(), SinkError>> {
136 Poll::Ready(Ok(()))
137 }
138}
139
140#[derive(Debug)]
142struct VecTransportStream {
143 symbols: VecDeque<AuthenticatedSymbol>,
144}
145
146impl VecTransportStream {
147 fn new(symbols: Vec<Symbol>) -> Self {
148 let symbols = symbols
149 .into_iter()
150 .map(|symbol| AuthenticatedSymbol::new_verified(symbol, AuthenticationTag::zero()))
151 .collect();
152 Self { symbols }
153 }
154}
155
156impl SymbolStream for VecTransportStream {
157 fn poll_next(
158 mut self: Pin<&mut Self>,
159 _cx: &mut Context<'_>,
160 ) -> Poll<Option<std::result::Result<AuthenticatedSymbol, StreamError>>> {
161 match self.symbols.pop_front() {
162 Some(symbol) => Poll::Ready(Some(Ok(symbol))),
163 None => Poll::Ready(None),
164 }
165 }
166
167 fn size_hint(&self) -> (usize, Option<usize>) {
168 (self.symbols.len(), Some(self.symbols.len()))
169 }
170
171 fn is_exhausted(&self) -> bool {
172 self.symbols.is_empty()
173 }
174}
175
176#[derive(Debug, Clone)]
193pub struct AsupersyncCodec {
194 max_block_size: usize,
196}
197
198impl AsupersyncCodec {
199 #[must_use]
201 pub const fn new(max_block_size: usize) -> Self {
202 Self { max_block_size }
203 }
204}
205
206impl Default for AsupersyncCodec {
207 fn default() -> Self {
208 Self::new(64 * 1024)
209 }
210}
211
212fn native_budget_from_local(cx: &Cx) -> AsBudget {
213 let budget = cx.budget();
214 let mut native_budget = AsBudget::new()
215 .with_poll_quota(budget.poll_quota)
216 .with_priority(budget.priority);
217 if let Some(cost_quota) = budget.cost_quota {
218 native_budget = native_budget.with_cost_quota(cost_quota);
219 }
220 if let Some(deadline) = budget.deadline {
221 native_budget = native_budget.with_deadline(local_deadline_to_native_time(deadline));
222 }
223 native_budget
224}
225
226fn wall_clock_now_since_epoch() -> Duration {
227 SystemTime::now()
228 .duration_since(UNIX_EPOCH)
229 .unwrap_or(Duration::ZERO)
230}
231
232fn local_deadline_to_native_time(deadline: Duration) -> AsTime {
233 let absolute_deadline = wall_clock_now_since_epoch()
234 .checked_add(deadline)
235 .unwrap_or(Duration::MAX);
236 let nanos = u64::try_from(absolute_deadline.as_nanos()).unwrap_or(u64::MAX);
237 AsTime::from_nanos(nanos)
238}
239
240fn is_native_abort(kind: AsErrorKind) -> bool {
241 matches!(
242 kind,
243 AsErrorKind::Cancelled
244 | AsErrorKind::CancelTimeout
245 | AsErrorKind::DeadlineExceeded
246 | AsErrorKind::PollQuotaExhausted
247 | AsErrorKind::CostQuotaExhausted
248 )
249}
250
251fn native_reason_to_local(reason: &AsCancelReason) -> fsqlite_types::cx::CancelReason {
252 match reason.kind {
253 AsCancelKind::User => fsqlite_types::cx::CancelReason::UserInterrupt,
254 AsCancelKind::Timeout
255 | AsCancelKind::Deadline
256 | AsCancelKind::PollQuota
257 | AsCancelKind::CostBudget => fsqlite_types::cx::CancelReason::Timeout,
258 AsCancelKind::FailFast
259 | AsCancelKind::RaceLost
260 | AsCancelKind::ParentCancelled
261 | AsCancelKind::Shutdown
262 | AsCancelKind::LinkedExit => fsqlite_types::cx::CancelReason::RegionClose,
263 AsCancelKind::ResourceUnavailable => fsqlite_types::cx::CancelReason::Abort,
264 }
265}
266
267fn sync_local_cancel_from_attached_native(codec_cx: &Cx, native_cx: &AsCx) {
268 if let Some(reason) = native_cx.cancel_reason() {
269 codec_cx.cancel_with_reason(native_reason_to_local(&reason));
270 } else if native_cx.is_cancel_requested() {
271 codec_cx.cancel();
272 }
273}
274
275fn derive_native_request_cx(cx: &Cx) -> (Cx, AsCx) {
276 let codec_cx = cx.create_child();
277 if let Some(reason) = cx.cancel_reason() {
278 codec_cx.cancel_with_reason(reason);
279 } else if cx.is_cancel_requested() {
280 codec_cx.cancel();
281 }
282 let attached_native_cx = cx.attached_native_cx();
283 if let Some(native_cx) = attached_native_cx.as_ref() {
284 sync_local_cancel_from_attached_native(&codec_cx, native_cx);
285 }
286 let native_cx = attached_native_cx
287 .unwrap_or_else(|| AsCx::for_request_with_budget(native_budget_from_local(&codec_cx)));
288 codec_cx.set_native_cx(native_cx.clone());
289 (codec_cx, native_cx)
290}
291
292fn decode_object_params(
293 object_id: AsObjectId,
294 k_source: u32,
295 symbol_size: u32,
296 max_block_size: usize,
297) -> Result<ObjectParams> {
298 let object_size = u64::from(k_source)
299 .checked_mul(u64::from(symbol_size))
300 .ok_or_else(|| FrankenError::OutOfRange {
301 what: "object_size for decode params".to_owned(),
302 value: format!("{k_source}*{symbol_size}"),
303 })?;
304 let symbol_size_u16 = u16::try_from(symbol_size).map_err(|_| FrankenError::OutOfRange {
305 what: "symbol_size as u16".to_owned(),
306 value: symbol_size.to_string(),
307 })?;
308 if object_size == 0 {
309 return Ok(ObjectParams::new(object_id, 0, symbol_size_u16, 0, 0));
310 }
311 if max_block_size == 0 {
312 return Err(FrankenError::OutOfRange {
313 what: "max_block_size (must be > 0)".to_owned(),
314 value: "0".to_owned(),
315 });
316 }
317
318 let max_block_size_u64 =
319 u64::try_from(max_block_size).map_err(|_| FrankenError::OutOfRange {
320 what: "max_block_size as u64".to_owned(),
321 value: max_block_size.to_string(),
322 })?;
323 let source_blocks = u16::try_from(object_size.div_ceil(max_block_size_u64)).map_err(|_| {
324 FrankenError::OutOfRange {
325 what: "source_blocks as u16".to_owned(),
326 value: object_size.div_ceil(max_block_size_u64).to_string(),
327 }
328 })?;
329 let symbols_per_block = u16::try_from(
330 object_size
331 .min(max_block_size_u64)
332 .div_ceil(u64::from(symbol_size_u16)),
333 )
334 .map_err(|_| FrankenError::OutOfRange {
335 what: "symbols_per_block as u16".to_owned(),
336 value: object_size
337 .min(max_block_size_u64)
338 .div_ceil(u64::from(symbol_size_u16))
339 .to_string(),
340 })?;
341
342 Ok(ObjectParams::new(
343 object_id,
344 object_size,
345 symbol_size_u16,
346 source_blocks,
347 symbols_per_block,
348 ))
349}
350
351#[allow(
352 clippy::cast_possible_truncation,
353 clippy::cast_lossless,
354 clippy::cast_precision_loss,
355 clippy::cast_sign_loss
356)]
357impl SymbolCodec for AsupersyncCodec {
358 fn encode(
359 &self,
360 cx: &Cx,
361 source_data: &[u8],
362 symbol_size: u32,
363 repair_overhead: f64,
364 ) -> Result<CodecEncodeResult> {
365 if symbol_size == 0 {
366 return Err(FrankenError::OutOfRange {
367 what: "symbol_size (must be > 0)".to_owned(),
368 value: "0".to_owned(),
369 });
370 }
371 let mut config = RaptorQConfig::default();
372 config.encoding.symbol_size = symbol_size as u16;
373 config.encoding.max_block_size = self.max_block_size;
374 config.encoding.repair_overhead = repair_overhead;
375
376 let (codec_cx, native_cx) = derive_native_request_cx(cx);
377 codec_cx.checkpoint().map_err(|_| FrankenError::Abort)?;
378 let object_id = AsObjectId::new_for_test(PRODUCTION_OBJECT_ID);
379 let mut sender = RaptorQSenderBuilder::new()
380 .config(config)
381 .transport(VecTransportSink::new())
382 .build()
383 .map_err(|e| FrankenError::Internal(format!("{BEAD_ID}: sender build: {e}")))?;
384
385 let outcome = sender
386 .send_object(&native_cx, object_id, source_data)
387 .map_err(|e| {
388 if is_native_abort(e.kind()) {
389 FrankenError::Abort
390 } else {
391 FrankenError::Internal(format!("{BEAD_ID}: send_object: {e}"))
392 }
393 })?;
394
395 let symbols = std::mem::take(&mut sender.transport_mut().symbols);
396 let k = outcome.source_symbols as u32;
397
398 let mut source_symbols = Vec::new();
399 let mut repair_symbols = Vec::new();
400 for s in &symbols {
401 let packed_key = pack_symbol_key(s.kind(), s.sbn(), s.esi())?;
402 if s.kind().is_source() {
403 source_symbols.push((packed_key, s.data().to_vec()));
404 } else {
405 repair_symbols.push((packed_key, s.data().to_vec()));
406 }
407 }
408
409 Ok(CodecEncodeResult {
410 source_symbols,
411 repair_symbols,
412 k_source: k,
413 })
414 }
415
416 fn decode(
417 &self,
418 cx: &Cx,
419 symbols: &[(u32, Vec<u8>)],
420 k_source: u32,
421 symbol_size: u32,
422 ) -> Result<CodecDecodeResult> {
423 if symbols.is_empty() {
424 return Ok(CodecDecodeResult::Failure {
425 reason: DecodeFailureReason::InsufficientSymbols,
426 symbols_received: 0,
427 k_required: k_source,
428 });
429 }
430
431 if symbol_size == 0 {
432 return Err(FrankenError::OutOfRange {
433 what: "symbol_size (must be > 0)".to_owned(),
434 value: "0".to_owned(),
435 });
436 }
437
438 let object_id = AsObjectId::new_for_test(PRODUCTION_OBJECT_ID);
439 let mut config = RaptorQConfig::default();
440 config.encoding.symbol_size = symbol_size as u16;
441 config.encoding.max_block_size = self.max_block_size;
442 let params = decode_object_params(object_id, k_source, symbol_size, self.max_block_size)?;
443
444 let mut rebuilt = Vec::with_capacity(symbols.len());
445 for (packed, data) in symbols {
446 let (kind, sbn, esi) = unpack_symbol_key(*packed);
447 rebuilt.push(Symbol::new(
448 SymbolId::new(object_id, sbn, esi),
449 data.clone(),
450 kind,
451 ));
452 }
453
454 let (codec_cx, native_cx) = derive_native_request_cx(cx);
455 codec_cx.checkpoint().map_err(|_| FrankenError::Abort)?;
456 let mut receiver = RaptorQReceiverBuilder::new()
457 .config(config)
458 .source(VecTransportStream::new(rebuilt))
459 .build()
460 .map_err(|e| FrankenError::Internal(format!("{BEAD_ID}: receiver build: {e}")))?;
461
462 match receiver.receive_object(&native_cx, ¶ms) {
463 Ok(outcome) => Ok(CodecDecodeResult::Success {
464 data: outcome.data,
465 symbols_used: outcome.symbols_received as u32,
466 peeled_count: 0,
467 inactivated_count: 0,
468 }),
469 Err(err) if is_native_abort(err.kind()) => Err(FrankenError::Abort),
470 Err(err) => {
471 let reason = match err.kind() {
472 AsErrorKind::InsufficientSymbols => DecodeFailureReason::InsufficientSymbols,
473 _ => DecodeFailureReason::SingularMatrix,
474 };
475 Ok(CodecDecodeResult::Failure {
476 reason,
477 symbols_received: symbols.len() as u32,
478 k_required: k_source,
479 })
480 }
481 }
482 }
483}
484
485#[cfg(test)]
490mod tests {
491 use super::*;
492 use fsqlite_types::cx::{CancelReason, Cx};
493
494 fn test_cx() -> Cx {
495 Cx::new()
496 }
497
498 #[test]
499 fn test_pack_unpack_source_symbol() {
500 let packed = pack_symbol_key(SymbolKind::Source, 0, 42).unwrap();
501 let (kind, sbn, esi) = unpack_symbol_key(packed);
502 assert_eq!(kind, SymbolKind::Source);
503 assert_eq!(sbn, 0);
504 assert_eq!(esi, 42);
505 }
506
507 #[test]
508 fn test_pack_unpack_repair_symbol() {
509 let packed = pack_symbol_key(SymbolKind::Repair, 3, 100).unwrap();
510 let (kind, sbn, esi) = unpack_symbol_key(packed);
511 assert_eq!(kind, SymbolKind::Repair);
512 assert_eq!(sbn, 3);
513 assert_eq!(esi, 100);
514 }
515
516 #[test]
517 fn test_pack_esi_overflow() {
518 let result = pack_symbol_key(SymbolKind::Source, 0, PACKED_ESI_MASK + 1);
519 assert!(result.is_err());
520 }
521
522 #[test]
523 fn test_pack_max_esi() {
524 let packed = pack_symbol_key(SymbolKind::Source, 0, PACKED_ESI_MASK).unwrap();
525 let (_, _, esi) = unpack_symbol_key(packed);
526 assert_eq!(esi, PACKED_ESI_MASK);
527 }
528
529 #[test]
530 fn test_pack_max_sbn() {
531 let packed = pack_symbol_key(SymbolKind::Repair, 255, 0).unwrap();
532 let (kind, sbn, esi) = unpack_symbol_key(packed);
533 assert_eq!(kind, SymbolKind::Repair);
534 assert_eq!(sbn, 255);
535 assert_eq!(esi, 0);
536 }
537
538 #[test]
539 fn test_codec_encode_decode_roundtrip() {
540 let codec = AsupersyncCodec::default();
541 let cx = test_cx();
542 let data = vec![0xAB_u8; 4096];
543 let symbol_size = 512_u32;
544 let repair_overhead = 1.25;
545
546 let encoded = codec
547 .encode(&cx, &data, symbol_size, repair_overhead)
548 .unwrap();
549 assert!(encoded.k_source > 0);
550 assert!(!encoded.source_symbols.is_empty());
551 assert!(!encoded.repair_symbols.is_empty());
552
553 let mut all_symbols: Vec<(u32, Vec<u8>)> = encoded.source_symbols.clone();
555 all_symbols.extend(encoded.repair_symbols.clone());
556
557 let decoded = codec
558 .decode(&cx, &all_symbols, encoded.k_source, symbol_size)
559 .unwrap();
560 match decoded {
561 CodecDecodeResult::Success {
562 data: recovered, ..
563 } => {
564 assert_eq!(recovered, data);
565 }
566 CodecDecodeResult::Failure { reason, .. } => {
567 panic!("decode failed: {reason:?}");
568 }
569 }
570 }
571
572 #[test]
573 fn test_codec_decode_source_only() {
574 let codec = AsupersyncCodec::default();
575 let cx = test_cx();
576 let data = vec![0xCD_u8; 2048];
577 let symbol_size = 512_u32;
578
579 let encoded = codec.encode(&cx, &data, symbol_size, 1.25).unwrap();
580
581 let decoded = codec
583 .decode(&cx, &encoded.source_symbols, encoded.k_source, symbol_size)
584 .unwrap();
585 match decoded {
586 CodecDecodeResult::Success {
587 data: recovered, ..
588 } => {
589 assert_eq!(recovered, data);
590 }
591 CodecDecodeResult::Failure { reason, .. } => {
592 panic!("source-only decode failed: {reason:?}");
593 }
594 }
595 }
596
597 #[test]
598 fn test_codec_decode_with_erasures() {
599 let codec = AsupersyncCodec::default();
600 let cx = test_cx();
601 let data = vec![0xEF_u8; 4096];
602 let symbol_size = 512_u32;
603
604 let encoded = codec.encode(&cx, &data, symbol_size, 1.5).unwrap();
605 let k = encoded.k_source as usize;
606
607 let mut symbols: Vec<(u32, Vec<u8>)> = encoded.source_symbols[1..].to_vec();
609 symbols.extend(encoded.repair_symbols.iter().take(2).cloned());
610
611 assert!(symbols.len() >= k, "need at least K symbols");
612
613 let decoded = codec
614 .decode(&cx, &symbols, encoded.k_source, symbol_size)
615 .unwrap();
616 match decoded {
617 CodecDecodeResult::Success {
618 data: recovered, ..
619 } => {
620 assert_eq!(recovered, data);
621 }
622 CodecDecodeResult::Failure { reason, .. } => {
623 panic!("erasure decode failed: {reason:?}");
624 }
625 }
626 }
627
628 #[test]
629 fn test_codec_decode_empty() {
630 let codec = AsupersyncCodec::default();
631 let cx = test_cx();
632 let result = codec.decode(&cx, &[], 4, 512).unwrap();
633 assert!(matches!(
634 result,
635 CodecDecodeResult::Failure {
636 reason: DecodeFailureReason::InsufficientSymbols,
637 ..
638 }
639 ));
640 }
641
642 #[test]
643 fn test_codec_default_max_block_size() {
644 let codec = AsupersyncCodec::default();
645 assert_eq!(codec.max_block_size, 64 * 1024);
646 }
647
648 #[test]
649 fn test_codec_custom_max_block_size() {
650 let codec = AsupersyncCodec::new(128 * 1024);
651 let cx = test_cx();
652 assert_eq!(codec.max_block_size, 128 * 1024);
653
654 let data = vec![0x42_u8; 2048];
656 let encoded = codec.encode(&cx, &data, 512, 1.25).unwrap();
657 let decoded = codec
658 .decode(&cx, &encoded.source_symbols, encoded.k_source, 512)
659 .unwrap();
660 assert!(matches!(decoded, CodecDecodeResult::Success { .. }));
661 }
662
663 #[test]
664 fn test_codec_send_sync() {
665 fn assert_send_sync<T: Send + Sync>() {}
667 assert_send_sync::<AsupersyncCodec>();
668 }
669
670 #[test]
671 fn test_codec_large_data_4096_page() {
672 let codec = AsupersyncCodec::default();
673 let cx = test_cx();
674 let data = vec![0x77_u8; 4 * 4096];
676 let encoded = codec.encode(&cx, &data, 4096, 1.25).unwrap();
677 assert!(encoded.k_source >= 4);
678
679 let decoded = codec
680 .decode(&cx, &encoded.source_symbols, encoded.k_source, 4096)
681 .unwrap();
682 match decoded {
683 CodecDecodeResult::Success {
684 data: recovered, ..
685 } => {
686 assert_eq!(recovered, data);
687 }
688 CodecDecodeResult::Failure { reason, .. } => {
689 panic!("large page decode failed: {reason:?}");
690 }
691 }
692 }
693
694 #[test]
695 fn test_codec_repair_symbol_count_scales_with_overhead() {
696 let codec = AsupersyncCodec::default();
697 let cx = test_cx();
698 let data = vec![0x55_u8; 8192];
699
700 let low = codec.encode(&cx, &data, 512, 1.1).unwrap();
701 let high = codec.encode(&cx, &data, 512, 2.0).unwrap();
702
703 assert!(
705 high.repair_symbols.len() > low.repair_symbols.len(),
706 "2.0x overhead ({}) should produce more repairs than 1.1x ({})",
707 high.repair_symbols.len(),
708 low.repair_symbols.len()
709 );
710 }
711
712 #[test]
713 fn test_codec_decode_multiple_source_blocks_roundtrip() {
714 let codec = AsupersyncCodec::new(1024);
715 let cx = test_cx();
716 let data = vec![0x5A_u8; 3 * 1024];
717 let symbol_size = 512_u32;
718
719 let encoded = codec.encode(&cx, &data, symbol_size, 1.25).unwrap();
720 assert!(
721 encoded.source_symbols.iter().any(|(packed, _)| {
722 let (_, sbn, _) = unpack_symbol_key(*packed);
723 sbn > 0
724 }),
725 "test data should span multiple source blocks"
726 );
727
728 let decoded = codec
729 .decode(&cx, &encoded.source_symbols, encoded.k_source, symbol_size)
730 .unwrap();
731 match decoded {
732 CodecDecodeResult::Success {
733 data: recovered, ..
734 } => {
735 assert_eq!(recovered, data);
736 }
737 CodecDecodeResult::Failure { reason, .. } => {
738 panic!("multi-block decode failed: {reason:?}");
739 }
740 }
741 }
742
743 #[test]
744 fn test_pack_all_bits_combined() {
745 let packed = pack_symbol_key(SymbolKind::Repair, 127, 0x3F_FFFF).unwrap();
747 let (kind, sbn, esi) = unpack_symbol_key(packed);
748 assert_eq!(kind, SymbolKind::Repair);
749 assert_eq!(sbn, 127);
750 assert_eq!(esi, 0x3F_FFFF);
751 }
752
753 #[test]
754 fn test_codec_encode_respects_cancelled_cx() {
755 let codec = AsupersyncCodec::default();
756 let cx = test_cx();
757 cx.cancel_with_reason(CancelReason::Abort);
758
759 let err = codec.encode(&cx, &[0xAB; 512], 512, 1.25).unwrap_err();
760 assert!(matches!(err, FrankenError::Abort));
761 }
762
763 #[test]
764 fn test_codec_decode_respects_cancelled_cx() {
765 let codec = AsupersyncCodec::default();
766 let setup_cx = test_cx();
767 let encoded = codec.encode(&setup_cx, &[0xBC; 512], 512, 1.25).unwrap();
768
769 let cx = test_cx();
770 cx.cancel_with_reason(CancelReason::Abort);
771
772 let err = codec
773 .decode(&cx, &encoded.source_symbols, encoded.k_source, 512)
774 .unwrap_err();
775 assert!(matches!(err, FrankenError::Abort));
776 }
777
778 #[test]
779 fn test_local_deadline_converts_to_future_native_time() {
780 let before = wall_clock_now_since_epoch();
781 let cx = Cx::with_budget(
782 fsqlite_types::cx::Budget::INFINITE.with_deadline(Duration::from_millis(50)),
783 );
784
785 let native_budget = native_budget_from_local(&cx);
786 let native_deadline = Duration::from_nanos(
787 native_budget
788 .deadline
789 .expect("native budget should carry a deadline")
790 .as_nanos(),
791 );
792 let lower_bound = before
793 .checked_add(Duration::from_millis(25))
794 .unwrap_or(Duration::MAX);
795
796 assert!(
797 native_deadline >= lower_bound,
798 "native deadline should be an absolute future instant, got {native_deadline:?}"
799 );
800 }
801
802 #[test]
803 fn test_codec_encode_respects_attached_native_cancellation() {
804 let codec = AsupersyncCodec::default();
805 let cx = test_cx();
806 let native = AsCx::for_testing();
807 cx.set_native_cx(native.clone());
808 native.set_cancel_reason(AsCancelReason::timeout());
809
810 let err = codec.encode(&cx, &[0xAB; 512], 512, 1.25).unwrap_err();
811 assert!(matches!(err, FrankenError::Abort));
812 }
813
814 #[test]
815 fn test_codec_decode_respects_attached_native_cancellation() {
816 let codec = AsupersyncCodec::default();
817 let setup_cx = test_cx();
818 let encoded = codec.encode(&setup_cx, &[0xBC; 512], 512, 1.25).unwrap();
819
820 let cx = test_cx();
821 let native = AsCx::for_testing();
822 cx.set_native_cx(native.clone());
823 native.set_cancel_reason(AsCancelReason::timeout());
824
825 let err = codec
826 .decode(&cx, &encoded.source_symbols, encoded.k_source, 512)
827 .unwrap_err();
828 assert!(matches!(err, FrankenError::Abort));
829 }
830
831 #[test]
832 fn test_derive_native_request_cx_mirrors_attached_native_cancellation() {
833 let cx = test_cx();
834 let native = AsCx::for_testing();
835 cx.set_native_cx(native.clone());
836 native.set_cancel_reason(AsCancelReason::timeout());
837
838 let (codec_cx, derived_native) = derive_native_request_cx(&cx);
839
840 assert_eq!(codec_cx.cancel_reason(), Some(CancelReason::Timeout));
841 assert!(codec_cx.is_cancel_requested());
842 assert!(codec_cx.checkpoint().is_err());
843 assert!(derived_native.is_cancel_requested());
844 }
845}