1use std::marker::PhantomData;
30use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
31use std::sync::{Arc, Mutex, Weak};
32use std::time::Duration;
33
34#[cfg(feature = "native")]
35use asupersync::types::Time as NativeTime;
36#[cfg(feature = "native")]
37use asupersync::types::{CancelKind as NativeCancelKind, CancelReason as NativeCancelReason};
38#[cfg(feature = "native")]
39use asupersync::{Budget as NativeBudget, Cx as NativeCx};
40
#[cfg(not(feature = "native"))]
/// Stand-in for the native runtime context, compiled only when the `native`
/// feature is disabled.
///
/// It offers a cancel flag, an optional cancel reason and a checkpoint, all
/// backed by standard-library primitives.
mod native_cx_shim {
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

    /// Categories of cancellation understood by the shim.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub enum NativeCancelKind {
        User,
        Timeout,
        Deadline,
        PollQuota,
        CostBudget,
        FailFast,
        RaceLost,
        ParentCancelled,
        Shutdown,
        LinkedExit,
        ResourceUnavailable,
    }

    /// Why a context was cancelled; the shim only records the kind.
    #[derive(Debug, Clone, PartialEq, Eq)]
    pub struct NativeCancelReason {
        pub kind: NativeCancelKind,
    }

    impl NativeCancelReason {
        /// Shared constructor behind the named factory functions.
        const fn of_kind(kind: NativeCancelKind) -> Self {
            Self { kind }
        }

        /// Reason with kind [`NativeCancelKind::Timeout`].
        #[must_use]
        pub const fn timeout() -> Self {
            Self::of_kind(NativeCancelKind::Timeout)
        }

        /// Reason with kind [`NativeCancelKind::User`]; the message is discarded.
        #[must_use]
        pub fn user(_message: impl Into<String>) -> Self {
            Self::of_kind(NativeCancelKind::User)
        }

        /// Reason with kind [`NativeCancelKind::ParentCancelled`].
        #[must_use]
        pub const fn parent_cancelled() -> Self {
            Self::of_kind(NativeCancelKind::ParentCancelled)
        }

        /// Reason with kind [`NativeCancelKind::ResourceUnavailable`].
        #[must_use]
        pub const fn resource_unavailable() -> Self {
            Self::of_kind(NativeCancelKind::ResourceUnavailable)
        }
    }

    /// Error returned by [`NativeCx::checkpoint`] once cancellation is requested.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    pub struct NativeCheckpointError;

    /// State shared by every clone of a [`NativeCx`].
    #[derive(Debug, Default)]
    struct NativeCxInner {
        cancel_requested: AtomicBool,
        cancel_reason: Mutex<Option<NativeCancelReason>>,
    }

    /// Cheaply clonable handle; clones observe the same cancellation state.
    #[derive(Debug, Clone, Default)]
    pub struct NativeCx {
        inner: Arc<NativeCxInner>,
    }

    impl NativeCx {
        /// Locks the reason slot, recovering the guard if the mutex is poisoned.
        fn reason_slot(&self) -> MutexGuard<'_, Option<NativeCancelReason>> {
            self.inner
                .cancel_reason
                .lock()
                .unwrap_or_else(PoisonError::into_inner)
        }

        /// Creates a fresh, non-cancelled context.
        #[must_use]
        pub fn for_testing() -> Self {
            Self::default()
        }

        /// Sets the cancel flag; clearing the flag also clears any stored reason.
        pub fn set_cancel_requested(&self, requested: bool) {
            self.inner
                .cancel_requested
                .store(requested, Ordering::Release);
            if !requested {
                *self.reason_slot() = None;
            }
        }

        /// Stores `reason` and then raises the cancel flag.
        pub fn set_cancel_reason(&self, reason: NativeCancelReason) {
            *self.reason_slot() = Some(reason);
            self.inner.cancel_requested.store(true, Ordering::Release);
        }

        /// Returns whether the cancel flag is currently set.
        #[must_use]
        pub fn is_cancel_requested(&self) -> bool {
            self.inner.cancel_requested.load(Ordering::Acquire)
        }

        /// Returns a copy of the stored reason, if any.
        #[must_use]
        pub fn cancel_reason(&self) -> Option<NativeCancelReason> {
            self.reason_slot().clone()
        }

        /// Returns `Err(NativeCheckpointError)` when the cancel flag is set.
        pub fn checkpoint(&self) -> std::result::Result<(), NativeCheckpointError> {
            if !self.is_cancel_requested() {
                return Ok(());
            }
            Err(NativeCheckpointError)
        }
    }
}
161
162#[cfg(not(feature = "native"))]
163use native_cx_shim::NativeCx;
164
165use crate::eprocess::{EProcessDecision, EProcessOracle, EProcessSnapshot};
166
/// SQLite result code (`SQLITE_INTERRUPT`, value 9) that
/// [`Error::sqlite_error_code`] reports for a cancelled operation.
pub const SQLITE_INTERRUPT: i32 = 9;
169
/// Maximum nesting depth of cancellation masks; [`Cx::masked`] panics when a
/// new mask would exceed it.
pub const MAX_MASK_DEPTH: u32 = 64;
174
/// Lifecycle of a context with respect to cancellation.
///
/// Transitions performed in this file: `Created -> Running`,
/// `Created | Running -> CancelRequested` (when a cancel is propagated),
/// `CancelRequested -> Cancelling` (when an unmasked checkpoint observes the
/// cancel), `Cancelling -> Finalizing`, and `Finalizing | Running -> Completed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum CancelState {
    /// Initial state of a freshly built context.
    Created,
    /// Entered via `transition_to_running`.
    Running,
    /// A cancel has been recorded but no unmasked checkpoint has seen it yet.
    CancelRequested,
    /// An unmasked checkpoint observed the cancel and returned an error.
    Cancelling,
    /// Entered via `transition_to_finalizing`.
    Finalizing,
    /// Terminal state, entered via `transition_to_completed`.
    Completed,
}
193
/// Why a context was cancelled.
///
/// The derived ordering follows the discriminants. When several cancels hit
/// the same context, `propagate_cancel` keeps the greatest reason, so a
/// later, smaller reason never replaces an already recorded larger one.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum CancelReason {
    /// Lowest-ranked reason.
    Timeout = 0,
    /// Reason used by [`Cx::cancel`].
    UserInterrupt = 1,
    RegionClose = 2,
    /// Highest-ranked reason; used when the e-process oracle sheds the context.
    Abort = 3,
}
205
/// Type-level capability sets.
///
/// A [`CapSet`](cap::CapSet) encodes five boolean capabilities as const
/// generics. The traits here are sealed, so only `CapSet` instantiations can
/// implement them.
pub mod cap {
    mod sealed {
        /// Marker that keeps the public capability traits closed to outside impls.
        pub trait Sealed {}

        /// Type-level boolean.
        pub struct Bit<const V: bool>;

        /// Implemented for `(Bit<A>, Bit<B>)` exactly when `A` implies `B`.
        pub trait Le {}
        impl Le for (Bit<true>, Bit<true>) {}
        impl Le for (Bit<false>, Bit<true>) {}
        impl Le for (Bit<false>, Bit<false>) {}
    }

    /// Zero-sized description of which capabilities a context carries.
    #[derive(Clone, Copy, Debug, Default)]
    pub struct CapSet<
        const SPAWN: bool,
        const TIME: bool,
        const RANDOM: bool,
        const IO: bool,
        const REMOTE: bool,
    >;

    impl<
        const SPAWN: bool,
        const TIME: bool,
        const RANDOM: bool,
        const IO: bool,
        const REMOTE: bool,
    > sealed::Sealed for CapSet<SPAWN, TIME, RANDOM, IO, REMOTE>
    {
    }

    /// Every capability enabled.
    pub type All = CapSet<true, true, true, true, true>;
    /// No capability enabled.
    pub type None = CapSet<false, false, false, false, false>;

    /// `Self` enables no capability that `Super` lacks.
    pub trait SubsetOf<Super>: sealed::Sealed {}

    // Component-wise comparison: each bit of the subset must be <= the
    // corresponding bit of the superset.
    impl<
        const SUB_SPAWN: bool,
        const SUB_TIME: bool,
        const SUB_RANDOM: bool,
        const SUB_IO: bool,
        const SUB_REMOTE: bool,
        const SUP_SPAWN: bool,
        const SUP_TIME: bool,
        const SUP_RANDOM: bool,
        const SUP_IO: bool,
        const SUP_REMOTE: bool,
    > SubsetOf<CapSet<SUP_SPAWN, SUP_TIME, SUP_RANDOM, SUP_IO, SUP_REMOTE>>
        for CapSet<SUB_SPAWN, SUB_TIME, SUB_RANDOM, SUB_IO, SUB_REMOTE>
    where
        (sealed::Bit<SUB_SPAWN>, sealed::Bit<SUP_SPAWN>): sealed::Le,
        (sealed::Bit<SUB_TIME>, sealed::Bit<SUP_TIME>): sealed::Le,
        (sealed::Bit<SUB_RANDOM>, sealed::Bit<SUP_RANDOM>): sealed::Le,
        (sealed::Bit<SUB_IO>, sealed::Bit<SUP_IO>): sealed::Le,
        (sealed::Bit<SUB_REMOTE>, sealed::Bit<SUP_REMOTE>): sealed::Le,
    {
    }

    /// Implemented by capability sets whose `SPAWN` bit is `true`.
    pub trait HasSpawn: sealed::Sealed {}
    impl<const T: bool, const R: bool, const I: bool, const M: bool> HasSpawn
        for CapSet<true, T, R, I, M>
    {
    }

    /// Implemented by capability sets whose `TIME` bit is `true`.
    pub trait HasTime: sealed::Sealed {}
    impl<const S: bool, const R: bool, const I: bool, const M: bool> HasTime
        for CapSet<S, true, R, I, M>
    {
    }

    /// Implemented by capability sets whose `RANDOM` bit is `true`.
    pub trait HasRandom: sealed::Sealed {}
    impl<const S: bool, const T: bool, const I: bool, const M: bool> HasRandom
        for CapSet<S, T, true, I, M>
    {
    }

    /// Implemented by capability sets whose `IO` bit is `true`.
    pub trait HasIo: sealed::Sealed {}
    impl<const S: bool, const T: bool, const R: bool, const M: bool> HasIo
        for CapSet<S, T, R, true, M>
    {
    }

    /// Implemented by capability sets whose `REMOTE` bit is `true`.
    pub trait HasRemote: sealed::Sealed {}
    impl<const S: bool, const T: bool, const R: bool, const I: bool> HasRemote
        for CapSet<S, T, R, I, true>
    {
    }
}
302
/// Capability set with every capability enabled.
pub type FullCaps = cap::All;
/// Capability set with only the `TIME` and `IO` bits enabled.
pub type StorageCaps = cap::CapSet<false, true, false, true, false>;
/// Capability set with no capability enabled.
pub type ComputeCaps = cap::None;
309
/// Resource limits attached to a context.
///
/// `None` in an optional field means "no limit" for that dimension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct Budget {
    /// Optional time limit.
    pub deadline: Option<Duration>,
    /// Poll limit; `u32::MAX` in [`Budget::INFINITE`].
    pub poll_quota: u32,
    /// Optional cost limit.
    pub cost_quota: Option<u64>,
    /// Priority; [`Budget::meet`] keeps the larger value.
    pub priority: u8,
}

impl Budget {
    /// Budget with no deadline, no cost limit, the maximum poll quota and
    /// priority 0.
    pub const INFINITE: Self = Self {
        deadline: None,
        poll_quota: u32::MAX,
        cost_quota: None,
        priority: 0,
    };

    /// Budget limited to 100 polls, with no deadline or cost limit and
    /// priority 0.
    pub const MINIMAL: Self = Self {
        deadline: None,
        poll_quota: 100,
        cost_quota: None,
        priority: 0,
    };

    /// Returns a copy of `self` with the deadline set to `deadline`.
    #[must_use]
    pub const fn with_deadline(self, deadline: Duration) -> Self {
        Self {
            deadline: Some(deadline),
            poll_quota: self.poll_quota,
            cost_quota: self.cost_quota,
            priority: self.priority,
        }
    }

    /// Returns a copy of `self` with the priority set to `priority`.
    #[must_use]
    pub const fn with_priority(self, priority: u8) -> Self {
        Self {
            deadline: self.deadline,
            poll_quota: self.poll_quota,
            cost_quota: self.cost_quota,
            priority,
        }
    }

    /// Returns a copy of `self` with the poll quota set to `poll_quota`.
    #[must_use]
    pub const fn with_poll_quota(self, poll_quota: u32) -> Self {
        Self {
            deadline: self.deadline,
            poll_quota,
            cost_quota: self.cost_quota,
            priority: self.priority,
        }
    }

    /// Returns a copy of `self` with the cost quota set to `cost_quota`.
    #[must_use]
    pub const fn with_cost_quota(self, cost_quota: u64) -> Self {
        Self {
            deadline: self.deadline,
            poll_quota: self.poll_quota,
            cost_quota: Some(cost_quota),
            priority: self.priority,
        }
    }

    /// Combines two budgets into the tighter of the two.
    ///
    /// Deadline, poll quota and cost quota each take the smaller value (a
    /// missing limit never loosens a present one); priority takes the larger.
    #[must_use]
    pub fn meet(self, other: Self) -> Self {
        /// Smaller of two optional limits, treating `None` as unlimited.
        fn tighter<T: Ord>(lhs: Option<T>, rhs: Option<T>) -> Option<T> {
            match (lhs, rhs) {
                (Some(a), Some(b)) => Some(a.min(b)),
                (only, None) => only,
                (None, only) => only,
            }
        }

        Self {
            deadline: tighter(self.deadline, other.deadline),
            poll_quota: u32::min(self.poll_quota, other.poll_quota),
            cost_quota: tighter(self.cost_quota, other.cost_quota),
            priority: u8::max(self.priority, other.priority),
        }
    }
}
387
388#[derive(Debug, Clone, Copy, PartialEq, Eq)]
389pub enum ErrorKind {
390 Cancelled,
391}
392
393#[derive(Debug, Clone, PartialEq, Eq)]
394pub struct Error {
395 kind: ErrorKind,
396}
397
398impl std::fmt::Display for Error {
399 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400 match self.kind {
401 ErrorKind::Cancelled => write!(f, "operation cancelled"),
402 }
403 }
404}
405
406impl std::error::Error for Error {}
407
408impl Error {
409 #[must_use]
410 pub const fn cancelled() -> Self {
411 Self {
412 kind: ErrorKind::Cancelled,
413 }
414 }
415
416 #[must_use]
417 pub const fn kind(&self) -> ErrorKind {
418 self.kind
419 }
420
421 #[must_use]
422 pub const fn sqlite_error_code(&self) -> i32 {
423 match self.kind {
424 ErrorKind::Cancelled => SQLITE_INTERRUPT,
425 }
426 }
427}
428
429pub type Result<T, E = Error> = std::result::Result<T, E>;
430
431#[derive(Debug)]
432struct CxInner {
433 cancel_requested: AtomicBool,
434 cancel_state: Mutex<CancelState>,
435 cancel_reason: Mutex<Option<CancelReason>>,
436 mask_depth: AtomicU32,
437 children: Mutex<Vec<Weak<Self>>>,
438 last_checkpoint_msg: Mutex<Option<String>>,
439 last_eprocess_decision: Mutex<Option<EProcessDecision>>,
440 eprocess_oracle: std::sync::OnceLock<Arc<EProcessOracle>>,
441 #[cfg(feature = "native")]
442 attached_native_cx: Mutex<Option<NativeCx>>,
443 #[cfg(feature = "native")]
444 fallback_native_cx: std::sync::OnceLock<NativeCx>,
445 unix_millis: AtomicU64,
447}
448
449impl CxInner {
450 fn new() -> Self {
451 Self {
452 cancel_requested: AtomicBool::new(false),
453 cancel_state: Mutex::new(CancelState::Created),
454 cancel_reason: Mutex::new(None),
455 mask_depth: AtomicU32::new(0),
456 children: Mutex::new(Vec::new()),
457 last_checkpoint_msg: Mutex::new(None),
458 last_eprocess_decision: Mutex::new(None),
459 eprocess_oracle: std::sync::OnceLock::new(),
460 #[cfg(feature = "native")]
461 attached_native_cx: Mutex::new(None),
462 #[cfg(feature = "native")]
463 fallback_native_cx: std::sync::OnceLock::new(),
464 unix_millis: AtomicU64::new(0),
465 }
466 }
467}
468
/// Translates a local [`CancelReason`] into the native runtime's reason type.
#[cfg(feature = "native")]
#[must_use]
fn local_reason_to_native(reason: CancelReason) -> NativeCancelReason {
    let native = match reason {
        CancelReason::Abort => NativeCancelReason::resource_unavailable(),
        CancelReason::RegionClose => NativeCancelReason::parent_cancelled(),
        CancelReason::UserInterrupt => NativeCancelReason::user("sqlite interrupt"),
        CancelReason::Timeout => NativeCancelReason::timeout(),
    };
    native
}
479
/// Collapses a native cancel reason onto the four local [`CancelReason`]s.
///
/// Budget- and time-related kinds map to `Timeout`, structural kinds to
/// `RegionClose`, `User` to `UserInterrupt`, `ResourceUnavailable` to `Abort`.
#[cfg(feature = "native")]
#[must_use]
fn native_reason_to_local(reason: &NativeCancelReason) -> CancelReason {
    match reason.kind {
        NativeCancelKind::ResourceUnavailable => CancelReason::Abort,
        NativeCancelKind::User => CancelReason::UserInterrupt,
        NativeCancelKind::CostBudget
        | NativeCancelKind::PollQuota
        | NativeCancelKind::Deadline
        | NativeCancelKind::Timeout => CancelReason::Timeout,
        NativeCancelKind::LinkedExit
        | NativeCancelKind::Shutdown
        | NativeCancelKind::ParentCancelled
        | NativeCancelKind::RaceLost
        | NativeCancelKind::FailFast => CancelReason::RegionClose,
    }
}
497
/// Forwards a cancel to the attached native context and then to the fallback
/// native context, whichever of them exist.
#[cfg(feature = "native")]
fn sync_native_cx_cancel(inner: &CxInner, reason: CancelReason) {
    let notify = |native: &NativeCx| {
        native.set_cancel_reason(local_reason_to_native(reason));
    };

    // Clone the handle out so the mutex is released before calling into it.
    let attached: Option<NativeCx> = inner
        .attached_native_cx
        .lock()
        .unwrap_or_else(std::sync::PoisonError::into_inner)
        .clone();
    if let Some(native) = attached.as_ref() {
        notify(native);
    }

    if let Some(native) = inner.fallback_native_cx.get() {
        notify(native);
    }
}
513
/// Builds a native budget mirroring `budget`.
///
/// Poll quota and priority are always copied; cost quota and deadline are
/// applied only when present.
#[cfg(feature = "native")]
#[must_use]
fn native_budget_from_local(budget: Budget) -> NativeBudget {
    let base = NativeBudget::new()
        .with_poll_quota(budget.poll_quota)
        .with_priority(budget.priority);

    let costed = match budget.cost_quota {
        Some(cost_quota) => base.with_cost_quota(cost_quota),
        None => base,
    };

    match budget.deadline {
        Some(deadline) => costed.with_deadline(local_deadline_to_native_time(deadline)),
        None => costed,
    }
}
528
/// Returns the time elapsed since the Unix epoch according to the system
/// clock, or zero if the clock reports a time before the epoch.
#[cfg(feature = "native")]
#[must_use]
fn wall_clock_now_since_epoch() -> Duration {
    let now = std::time::SystemTime::now();
    match now.duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed,
        Err(_) => Duration::ZERO,
    }
}
536
/// Converts a relative deadline into a native timestamp.
///
/// The deadline is added to the current wall-clock time since the Unix epoch
/// and passed to `NativeTime::from_nanos`; both the addition and the
/// nanosecond count saturate instead of overflowing.
#[cfg(feature = "native")]
#[must_use]
fn local_deadline_to_native_time(deadline: Duration) -> NativeTime {
    let absolute_deadline = wall_clock_now_since_epoch().saturating_add(deadline);
    let nanos = match u64::try_from(absolute_deadline.as_nanos()) {
        Ok(fits) => fits,
        Err(_) => u64::MAX,
    };
    NativeTime::from_nanos(nanos)
}
546
547fn propagate_cancel(inner: &CxInner, reason: CancelReason) {
552 inner.cancel_requested.store(true, Ordering::Release);
554
555 {
557 let mut r = inner
558 .cancel_reason
559 .lock()
560 .unwrap_or_else(std::sync::PoisonError::into_inner);
561 match *r {
562 Some(existing) if existing >= reason => {}
563 _ => *r = Some(reason),
564 }
565 }
566
567 {
569 let mut state = inner
570 .cancel_state
571 .lock()
572 .unwrap_or_else(std::sync::PoisonError::into_inner);
573 if matches!(*state, CancelState::Created | CancelState::Running) {
574 *state = CancelState::CancelRequested;
575 }
576 }
577
578 #[cfg(feature = "native")]
581 sync_native_cx_cancel(inner, reason);
582
583 let children: Vec<Arc<CxInner>> = {
585 let mut guard = inner
586 .children
587 .lock()
588 .unwrap_or_else(std::sync::PoisonError::into_inner);
589 guard.retain(|child| child.strong_count() > 0);
590 guard.iter().filter_map(Weak::upgrade).collect()
591 };
592 for child in &children {
593 propagate_cancel(child, reason);
594 }
595}
596
/// Context handle carrying shared cancellation state, a budget and trace
/// identifiers, tagged at the type level with a capability set.
///
/// `Caps` is stored only as `PhantomData`, so handles with different
/// capability sets have the same layout.
#[derive(Debug)]
pub struct Cx<Caps: cap::SubsetOf<cap::All> = FullCaps> {
    /// State shared with clones, restricted views and scoped views.
    inner: Arc<CxInner>,
    /// Budget of this particular handle.
    budget: Budget,
    /// Identifiers included in tracing events emitted by this handle.
    trace_id: u64,
    decision_id: u64,
    policy_id: u64,
    /// Type-level capability marker; holds no data.
    _caps: PhantomData<fn() -> Caps>,
}
612
613impl<Caps: cap::SubsetOf<cap::All>> Clone for Cx<Caps> {
614 fn clone(&self) -> Self {
615 Self {
616 inner: Arc::clone(&self.inner),
617 budget: self.budget,
618 trace_id: self.trace_id,
619 decision_id: self.decision_id,
620 policy_id: self.policy_id,
621 _caps: PhantomData,
622 }
623 }
624}
625
626impl Default for Cx<FullCaps> {
627 fn default() -> Self {
628 Self::new()
629 }
630}
631
impl Cx<FullCaps> {
    /// Creates a fresh full-capability context with [`Budget::INFINITE`].
    #[must_use]
    pub fn new() -> Self {
        Self::with_budget(Budget::INFINITE)
    }
}
638
639impl<Caps: cap::SubsetOf<cap::All>> Cx<Caps> {
640 #[cfg(feature = "native")]
641 #[must_use]
642 fn effective_native_cx(&self) -> NativeCx {
643 let attached_native = self
644 .inner
645 .attached_native_cx
646 .lock()
647 .unwrap_or_else(std::sync::PoisonError::into_inner)
648 .as_ref()
649 .cloned();
650 if let Some(native) = attached_native {
651 return native;
652 }
653
654 self.inner
655 .fallback_native_cx
656 .get_or_init(|| {
657 let native =
658 NativeCx::for_request_with_budget(native_budget_from_local(self.budget));
659 if let Some(reason) = self.cancel_reason() {
660 native.set_cancel_reason(local_reason_to_native(reason));
661 } else if self.is_cancel_requested() {
662 native.set_cancel_requested(true);
663 }
664 native
665 })
666 .clone()
667 }
668
669 #[must_use]
670 pub fn with_budget(budget: Budget) -> Self {
671 Self {
672 inner: Arc::new(CxInner::new()),
673 budget,
674 trace_id: 0,
675 decision_id: 0,
676 policy_id: 0,
677 _caps: PhantomData,
678 }
679 }
680
681 #[must_use]
682 pub fn budget(&self) -> Budget {
683 self.budget
684 }
685
686 #[must_use]
692 pub fn trace_id(&self) -> u64 {
693 self.trace_id
694 }
695
696 #[must_use]
698 pub fn decision_id(&self) -> u64 {
699 self.decision_id
700 }
701
702 #[must_use]
704 pub fn policy_id(&self) -> u64 {
705 self.policy_id
706 }
707
708 #[must_use]
712 pub fn with_trace_context(mut self, trace_id: u64, decision_id: u64, policy_id: u64) -> Self {
713 self.trace_id = trace_id;
714 self.decision_id = decision_id;
715 self.policy_id = policy_id;
716 self
717 }
718
719 #[must_use]
723 pub fn with_decision_id(mut self, decision_id: u64) -> Self {
724 self.decision_id = decision_id;
725 self
726 }
727
728 #[must_use]
730 pub fn with_policy_id(mut self, policy_id: u64) -> Self {
731 self.policy_id = policy_id;
732 self
733 }
734
735 #[must_use]
741 pub fn scope_with_budget(&self, child: Budget) -> Self {
742 Self {
743 inner: Arc::clone(&self.inner),
744 budget: self.budget.meet(child),
745 trace_id: self.trace_id,
746 decision_id: self.decision_id,
747 policy_id: self.policy_id,
748 _caps: PhantomData,
749 }
750 }
751
752 #[must_use]
754 pub fn cleanup_scope(&self) -> Self {
755 self.scope_with_budget(Budget::MINIMAL)
756 }
757
758 #[must_use]
762 pub fn restrict<NewCaps>(&self) -> Cx<NewCaps>
763 where
764 NewCaps: cap::SubsetOf<cap::All> + cap::SubsetOf<Caps>,
765 {
766 self.retype()
767 }
768
769 #[must_use]
771 fn retype<NewCaps>(&self) -> Cx<NewCaps>
772 where
773 NewCaps: cap::SubsetOf<cap::All>,
774 {
775 Cx {
776 inner: Arc::clone(&self.inner),
777 budget: self.budget,
778 trace_id: self.trace_id,
779 decision_id: self.decision_id,
780 policy_id: self.policy_id,
781 _caps: PhantomData,
782 }
783 }
784
785 #[must_use]
790 pub fn is_cancel_requested(&self) -> bool {
791 self.inner.cancel_requested.load(Ordering::Acquire)
792 }
793
794 pub fn cancel(&self) {
798 self.cancel_with_reason(CancelReason::UserInterrupt);
799 }
800
801 pub fn cancel_with_reason(&self, reason: CancelReason) {
808 propagate_cancel(&self.inner, reason);
809 }
810
811 #[must_use]
813 pub fn cancel_state(&self) -> CancelState {
814 *self
815 .inner
816 .cancel_state
817 .lock()
818 .unwrap_or_else(std::sync::PoisonError::into_inner)
819 }
820
821 #[must_use]
823 pub fn cancel_reason(&self) -> Option<CancelReason> {
824 *self
825 .inner
826 .cancel_reason
827 .lock()
828 .unwrap_or_else(std::sync::PoisonError::into_inner)
829 }
830
831 pub fn transition_to_running(&self) {
833 let mut state = self
834 .inner
835 .cancel_state
836 .lock()
837 .unwrap_or_else(std::sync::PoisonError::into_inner);
838 if *state == CancelState::Created {
839 *state = CancelState::Running;
840 }
841 }
842
843 pub fn transition_to_finalizing(&self) {
845 let mut state = self
846 .inner
847 .cancel_state
848 .lock()
849 .unwrap_or_else(std::sync::PoisonError::into_inner);
850 if *state == CancelState::Cancelling {
851 *state = CancelState::Finalizing;
852 }
853 }
854
855 pub fn transition_to_completed(&self) {
857 let mut state = self
858 .inner
859 .cancel_state
860 .lock()
861 .unwrap_or_else(std::sync::PoisonError::into_inner);
862 if matches!(*state, CancelState::Finalizing | CancelState::Running) {
863 *state = CancelState::Completed;
864 }
865 }
866
867 pub fn set_eprocess_oracle(&self, oracle: Arc<EProcessOracle>) {
869 let _ = self.inner.eprocess_oracle.set(oracle);
870 }
871
872 pub fn clear_eprocess_oracle(&self) {
874 }
877
878 #[cfg(feature = "native")]
880 pub fn set_native_cx(&self, native_cx: NativeCx) {
881 if let Some(reason) = self.cancel_reason() {
882 native_cx.set_cancel_reason(local_reason_to_native(reason));
883 } else if self.is_cancel_requested() {
884 native_cx.set_cancel_requested(true);
885 }
886 *self
887 .inner
888 .attached_native_cx
889 .lock()
890 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(native_cx);
891 }
892
893 #[cfg(not(feature = "native"))]
895 pub fn set_native_cx<T>(&self, _native_cx: T) {}
896
897 #[cfg(feature = "native")]
899 #[must_use]
900 pub fn attached_native_cx(&self) -> Option<NativeCx> {
901 self.inner
902 .attached_native_cx
903 .lock()
904 .unwrap_or_else(std::sync::PoisonError::into_inner)
905 .clone()
906 }
907
908 #[cfg(not(feature = "native"))]
910 #[must_use]
911 pub fn attached_native_cx(&self) -> Option<NativeCx> {
912 None
913 }
914
915 #[cfg(feature = "native")]
917 pub fn clear_native_cx(&self) {
918 *self
919 .inner
920 .attached_native_cx
921 .lock()
922 .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
923 }
924
925 #[cfg(not(feature = "native"))]
927 pub fn clear_native_cx(&self) {}
928
929 #[must_use]
930 fn maybe_cancel_via_eprocess(&self) -> bool {
931 let Some(oracle) = self.inner.eprocess_oracle.get() else {
932 return false;
933 };
934 let decision = oracle.decision(self.budget.priority);
935 self.record_eprocess_decision(decision.clone());
936 tracing::debug!(
937 target: "fsqlite::cx",
938 event = "eprocess_checkpoint",
939 trace_id = self.trace_id,
940 decision_id = self.decision_id,
941 policy_id = self.policy_id,
942 priority = decision.priority,
943 evalue = decision.snapshot.evalue,
944 threshold = decision.snapshot.rejection_threshold,
945 observations = decision.snapshot.observations,
946 priority_threshold = decision.snapshot.priority_threshold,
947 should_shed = decision.should_shed,
948 signal = ?decision.snapshot.last_signal
949 );
950 if decision.should_shed {
951 tracing::info!(
952 target: "fsqlite::cx",
953 event = "eprocess_shedding_triggered",
954 trace_id = self.trace_id,
955 decision_id = self.decision_id,
956 policy_id = self.policy_id,
957 priority = decision.priority,
958 evalue = decision.snapshot.evalue,
959 threshold = decision.snapshot.rejection_threshold,
960 signal = ?decision.snapshot.last_signal
961 );
962 self.cancel_with_reason(CancelReason::Abort);
963 return true;
964 }
965 false
966 }
967
968 #[cfg(feature = "native")]
969 #[must_use]
970 fn maybe_cancel_via_native_cx(&self, masked: bool) -> bool {
971 let native = self.effective_native_cx();
972
973 if masked {
974 if native.is_cancel_requested() {
975 let reason = native
976 .cancel_reason()
977 .as_ref()
978 .map_or(CancelReason::Timeout, native_reason_to_local);
979 self.cancel_with_reason(reason);
980 return true;
981 }
982 return false;
983 }
984
985 if native.checkpoint().is_err() {
986 let reason = native
987 .cancel_reason()
988 .as_ref()
989 .map_or(CancelReason::Timeout, native_reason_to_local);
990 self.cancel_with_reason(reason);
991 return true;
992 }
993 false
994 }
995
996 pub fn checkpoint(&self) -> Result<()> {
1006 let masked = self.inner.mask_depth.load(Ordering::Acquire) > 0;
1007
1008 #[cfg(feature = "native")]
1010 let native_cancel = self.maybe_cancel_via_native_cx(masked);
1011 #[cfg(not(feature = "native"))]
1012 let native_cancel = false;
1013
1014 if !self.inner.cancel_requested.load(Ordering::Acquire)
1015 && !self.maybe_cancel_via_eprocess()
1016 && !native_cancel
1017 {
1018 return Ok(());
1019 }
1020 if masked {
1022 return Ok(());
1023 }
1024 {
1026 let mut state = self
1027 .inner
1028 .cancel_state
1029 .lock()
1030 .unwrap_or_else(std::sync::PoisonError::into_inner);
1031 if *state == CancelState::CancelRequested {
1032 *state = CancelState::Cancelling;
1033 }
1034 }
1035 Err(Error::cancelled())
1036 }
1037
1038 pub fn checkpoint_with(&self, msg: impl Into<String>) -> Result<()> {
1040 {
1041 let mut guard = self
1042 .inner
1043 .last_checkpoint_msg
1044 .lock()
1045 .unwrap_or_else(std::sync::PoisonError::into_inner);
1046 *guard = Some(msg.into());
1047 }
1048 self.checkpoint()
1049 }
1050
1051 #[must_use]
1052 pub fn last_checkpoint_message(&self) -> Option<String> {
1053 self.inner
1054 .last_checkpoint_msg
1055 .lock()
1056 .unwrap_or_else(std::sync::PoisonError::into_inner)
1057 .clone()
1058 }
1059
1060 #[must_use]
1062 pub fn last_eprocess_decision(&self) -> Option<EProcessDecision> {
1063 self.inner
1064 .last_eprocess_decision
1065 .lock()
1066 .unwrap_or_else(std::sync::PoisonError::into_inner)
1067 .clone()
1068 }
1069
1070 #[must_use]
1072 pub fn last_eprocess_snapshot(&self) -> Option<EProcessSnapshot> {
1073 self.last_eprocess_decision()
1074 .map(|decision| decision.snapshot)
1075 }
1076
1077 fn record_eprocess_decision(&self, decision: EProcessDecision) {
1078 *self
1079 .inner
1080 .last_eprocess_decision
1081 .lock()
1082 .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(decision);
1083 }
1084
1085 #[must_use]
1098 pub fn masked(&self) -> MaskGuard<'_> {
1099 let prev = self.inner.mask_depth.fetch_add(1, Ordering::AcqRel);
1100 if prev >= MAX_MASK_DEPTH {
1101 self.inner.mask_depth.fetch_sub(1, Ordering::Release);
1102 assert!(
1103 prev < MAX_MASK_DEPTH,
1104 "MAX_MASK_DEPTH ({MAX_MASK_DEPTH}) exceeded: mask nesting depth would be {}",
1105 prev + 1
1106 );
1107 }
1108 MaskGuard { inner: &self.inner }
1109 }
1110
1111 #[must_use]
1113 pub fn mask_depth(&self) -> u32 {
1114 self.inner.mask_depth.load(Ordering::Acquire)
1115 }
1116
1117 pub fn commit_section<R>(
1126 &self,
1127 poll_quota: u32,
1128 body: impl FnOnce(&CommitCtx) -> R,
1129 finalizer: impl FnOnce(),
1130 ) -> R {
1131 struct FinGuard<G: FnOnce()>(Option<G>);
1132 impl<G: FnOnce()> Drop for FinGuard<G> {
1133 fn drop(&mut self) {
1134 if let Some(f) = self.0.take() {
1135 f();
1136 }
1137 }
1138 }
1139
1140 let _mask = self.masked();
1141 let _fin = FinGuard(Some(finalizer));
1142 let ctx = CommitCtx::new(poll_quota);
1143 body(&ctx)
1144 }
1145
1146 #[must_use]
1154 pub fn create_child(&self) -> Self {
1155 let mut child = Self::with_budget(self.budget);
1156 child.trace_id = self.trace_id;
1157 child.decision_id = self.decision_id;
1158 child.policy_id = self.policy_id;
1159 if let Some(oracle) = self.inner.eprocess_oracle.get().cloned() {
1160 child.set_eprocess_oracle(oracle);
1161 }
1162 #[cfg(feature = "native")]
1163 if let Some(native_cx) = self.attached_native_cx() {
1164 child.set_native_cx(native_cx);
1165 }
1166 {
1167 let mut children = self
1168 .inner
1169 .children
1170 .lock()
1171 .unwrap_or_else(std::sync::PoisonError::into_inner);
1172 children.push(Arc::downgrade(&child.inner));
1173 }
1174 if let Some(reason) = self.cancel_reason() {
1175 child.cancel_with_reason(reason);
1176 } else if self.is_cancel_requested() {
1177 child.cancel();
1178 }
1179 child
1180 }
1181
1182 pub fn set_unix_millis_for_testing(&self, millis: u64)
1184 where
1185 Caps: cap::HasTime,
1186 {
1187 self.inner.unix_millis.store(millis, Ordering::Release);
1188 }
1189
1190 #[must_use]
1192 pub fn current_time_julian_day(&self) -> f64
1193 where
1194 Caps: cap::HasTime,
1195 {
1196 let millis = self.inner.unix_millis.load(Ordering::Acquire);
1197 #[allow(clippy::cast_precision_loss)]
1198 let secs = (millis as f64) / 1000.0;
1199 2_440_587.5 + (secs / 86_400.0)
1201 }
1202}
1203
1204#[derive(Debug)]
1212pub struct MaskGuard<'a> {
1213 inner: &'a CxInner,
1214}
1215
1216impl Drop for MaskGuard<'_> {
1217 fn drop(&mut self) {
1218 self.inner.mask_depth.fetch_sub(1, Ordering::Release);
1219 }
1220}
1221
/// Poll budget handed to the body of [`Cx::commit_section`].
#[derive(Debug)]
pub struct CommitCtx {
    /// Polls still available; never goes below zero.
    poll_remaining: AtomicU32,
}

impl CommitCtx {
    /// Creates a context with `poll_quota` polls available.
    fn new(poll_quota: u32) -> Self {
        Self {
            poll_remaining: AtomicU32::new(poll_quota),
        }
    }

    /// Returns the number of polls still available.
    #[must_use]
    pub fn poll_remaining(&self) -> u32 {
        self.poll_remaining.load(Ordering::Acquire)
    }

    /// Consumes one poll.
    ///
    /// Returns `true` if a poll was available and has been consumed, `false`
    /// if the quota was already exhausted (the counter then stays at zero).
    pub fn tick(&self) -> bool {
        // Check and decrement must be one atomic step. A separate load
        // followed by `fetch_sub` lets two concurrent callers both observe 1
        // and wrap the counter around to `u32::MAX`.
        self.poll_remaining
            .fetch_update(Ordering::AcqRel, Ordering::Acquire, |remaining| {
                remaining.checked_sub(1)
            })
            .is_ok()
    }
}
1257
1258#[cfg(test)]
1259mod tests {
1260 use super::*;
1261 use crate::eprocess::{EProcessConfig, EProcessSignal};
1262 use std::path::{Path, PathBuf};
1263 use std::sync::{Arc, Weak};
1264
1265 #[test]
1266 fn test_cx_checkpoint_observes_cancellation() {
1267 let cx = Cx::new();
1268 assert!(cx.checkpoint().is_ok());
1269 cx.cancel();
1270 let err = cx.checkpoint().unwrap_err();
1271 assert_eq!(err.kind(), ErrorKind::Cancelled);
1272 assert_eq!(err.sqlite_error_code(), SQLITE_INTERRUPT);
1273 }
1274
1275 #[test]
1276 fn test_cx_capability_narrowing_compiles() {
1277 let cx = Cx::<FullCaps>::new();
1278 let _compute = cx.restrict::<ComputeCaps>();
1279 let _storage = cx.restrict::<StorageCaps>();
1280 }
1281
1282 #[test]
1283 fn test_cx_budget_meet_tightens() {
1284 let parent = Budget::INFINITE.with_deadline(Duration::from_millis(100));
1285 let child = Budget::INFINITE.with_deadline(Duration::from_millis(200));
1286 let effective = parent.meet(child);
1287 assert_eq!(effective.deadline, Some(Duration::from_millis(100)));
1288 }
1289
1290 #[test]
1291 fn test_cx_budget_priority_join() {
1292 let parent = Budget::INFINITE.with_priority(2);
1293 let child = Budget::INFINITE.with_priority(5);
1294 let effective = parent.meet(child);
1295 assert_eq!(effective.priority, 5);
1296 }
1297
1298 #[test]
1299 fn test_cx_scope_with_budget_cannot_loosen() {
1300 let cx =
1301 Cx::<FullCaps>::with_budget(Budget::INFINITE.with_deadline(Duration::from_millis(50)));
1302 let child = Budget::INFINITE.with_deadline(Duration::from_millis(100));
1303 let scoped = cx.scope_with_budget(child);
1304 assert_eq!(scoped.budget().deadline, Some(Duration::from_millis(50)));
1305 }
1306
1307 #[test]
1308 fn test_cx_checkpoint_with_message_records_message() {
1309 let cx = Cx::new();
1310 assert!(cx.checkpoint_with("vdbe pc=5").is_ok());
1311 assert_eq!(cx.last_checkpoint_message().as_deref(), Some("vdbe pc=5"));
1312 }
1313
1314 #[test]
1315 fn test_cx_cleanup_uses_minimal_budget() {
1316 let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_poll_quota(10_000));
1317 let cleanup = cx.cleanup_scope();
1318 assert_eq!(cleanup.budget(), Budget::MINIMAL);
1319 }
1320
1321 #[test]
1322 fn test_cx_restrict_storage_to_compute() {
1323 let cx = Cx::<FullCaps>::new();
1324 let storage = cx.restrict::<StorageCaps>();
1325 let _compute = storage.restrict::<ComputeCaps>();
1326 }
1327
1328 #[test]
1329 fn test_cx_restrict_is_zero_cost() {
1330 assert_eq!(
1333 std::mem::size_of::<Cx<FullCaps>>(),
1334 std::mem::size_of::<Cx<ComputeCaps>>()
1335 );
1336 }
1337
1338 #[test]
1339 fn test_budget_mixed_lattice() {
1340 let a = Budget {
1341 deadline: Some(Duration::from_millis(100)),
1342 poll_quota: 500,
1343 cost_quota: Some(1000),
1344 priority: 2,
1345 };
1346 let b = Budget {
1347 deadline: Some(Duration::from_millis(200)),
1348 poll_quota: 300,
1349 cost_quota: Some(2000),
1350 priority: 5,
1351 };
1352 let m = a.meet(b);
1353 assert_eq!(m.deadline, Some(Duration::from_millis(100)));
1355 assert_eq!(m.poll_quota, 300);
1356 assert_eq!(m.cost_quota, Some(1000));
1357 assert_eq!(m.priority, 5);
1359 }
1360
1361 #[test]
1362 fn test_budget_meet_commutative() {
1363 let a = Budget {
1364 deadline: Some(Duration::from_millis(50)),
1365 poll_quota: 400,
1366 cost_quota: Some(800),
1367 priority: 3,
1368 };
1369 let b = Budget {
1370 deadline: Some(Duration::from_millis(150)),
1371 poll_quota: 200,
1372 cost_quota: None,
1373 priority: 7,
1374 };
1375 assert_eq!(a.meet(b), b.meet(a));
1376 }
1377
1378 #[test]
1379 fn test_budget_meet_associative() {
1380 let a = Budget::INFINITE
1381 .with_deadline(Duration::from_millis(50))
1382 .with_poll_quota(100)
1383 .with_priority(1);
1384 let b = Budget::INFINITE
1385 .with_deadline(Duration::from_millis(150))
1386 .with_poll_quota(200)
1387 .with_priority(5);
1388 let c = Budget::INFINITE
1389 .with_deadline(Duration::from_millis(75))
1390 .with_poll_quota(50)
1391 .with_priority(3);
1392 assert_eq!(a.meet(b).meet(c), a.meet(b.meet(c)));
1393 }
1394
1395 #[test]
1396 fn test_budget_minimal_is_stricter_than_normal() {
1397 let normal = Budget::INFINITE.with_poll_quota(10_000);
1398 let effective = normal.meet(Budget::MINIMAL);
1399 assert_eq!(effective.poll_quota, Budget::MINIMAL.poll_quota);
1400 }
1401
1402 #[test]
1403 fn test_cx_cancel_shared_across_clones() {
1404 let cx1 = Cx::<FullCaps>::new();
1405 let cx2 = cx1.clone();
1406 assert!(!cx2.is_cancel_requested());
1407 cx1.cancel();
1408 assert!(cx2.is_cancel_requested());
1409 assert!(cx2.checkpoint().is_err());
1410 }
1411
1412 #[test]
1413 fn test_cx_cancel_shared_across_restrict() {
1414 let cx = Cx::<FullCaps>::new();
1415 let compute = cx.restrict::<ComputeCaps>();
1416 cx.cancel();
1417 assert!(compute.checkpoint().is_err());
1418 }
1419
1420 #[test]
1421 fn test_cx_current_time_julian_day() {
1422 let cx = Cx::<FullCaps>::new();
1423 cx.set_unix_millis_for_testing(0);
1425 let jd = cx.current_time_julian_day();
1426 assert!((jd - 2_440_587.5).abs() < 1e-10);
1427
1428 cx.set_unix_millis_for_testing(86_400_000);
1430 let jd = cx.current_time_julian_day();
1431 assert!((jd - 2_440_588.5).abs() < 1e-10);
1432 }
1433
1434 #[test]
1435 fn test_capset_is_zero_sized() {
1436 assert_eq!(std::mem::size_of::<cap::All>(), 0);
1437 assert_eq!(std::mem::size_of::<cap::None>(), 0);
1438 assert_eq!(
1439 std::mem::size_of::<cap::CapSet<true, false, true, false, true>>(),
1440 0
1441 );
1442 }
1443
1444 #[test]
1445 fn test_cx_checkpoint_not_cancelled() {
1446 let cx = Cx::new();
1447 assert!(cx.checkpoint().is_ok());
1448 assert!(cx.checkpoint_with("still going").is_ok());
1449 }
1450
1451 #[test]
1452 fn test_cx_checkpoint_maps_to_sqlite_interrupt() {
1453 let cx = Cx::new();
1454 cx.cancel();
1455 let err = cx.checkpoint().unwrap_err();
1456 assert_eq!(err.sqlite_error_code(), SQLITE_INTERRUPT);
1457 }
1458
1459 #[test]
1460 fn test_cx_checkpoint_eprocess_sheds_low_priority_context() {
1461 let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
1462 let oracle = Arc::new(EProcessOracle::new(
1463 EProcessConfig {
1464 p0: 0.1,
1465 lambda: 5.0,
1466 alpha: 0.05,
1467 max_evalue: 1e12,
1468 },
1469 1,
1470 ));
1471 let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1472 oracle.observe_signal(signal);
1473 oracle.observe_signal(signal);
1474 cx.set_eprocess_oracle(oracle);
1475 let err = cx.checkpoint().unwrap_err();
1476 assert_eq!(err.kind(), ErrorKind::Cancelled);
1477 assert_eq!(cx.cancel_reason(), Some(CancelReason::Abort));
1478 let decision = cx
1479 .last_eprocess_decision()
1480 .expect("checkpoint should record an e-process decision");
1481 assert!(decision.should_shed);
1482 assert_eq!(decision.snapshot.last_signal, Some(signal));
1483 }
1484
1485 #[test]
1486 fn test_cx_checkpoint_eprocess_respects_priority_threshold() {
1487 let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(1));
1488 let oracle = Arc::new(EProcessOracle::new(
1489 EProcessConfig {
1490 p0: 0.1,
1491 lambda: 5.0,
1492 alpha: 0.05,
1493 max_evalue: 1e12,
1494 },
1495 1,
1496 ));
1497 let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1498 oracle.observe_signal(signal);
1499 oracle.observe_signal(signal);
1500 cx.set_eprocess_oracle(oracle);
1501 assert!(cx.checkpoint().is_ok());
1502 assert!(!cx.is_cancel_requested());
1503 let decision = cx
1504 .last_eprocess_decision()
1505 .expect("checkpoint should still record non-shedding decisions");
1506 assert!(!decision.should_shed);
1507 assert_eq!(decision.priority, 1);
1508 assert_eq!(decision.snapshot.last_signal, Some(signal));
1509 }
1510
1511 #[test]
1512 fn test_cx_checkpoint_eprocess_preserves_masking_semantics() {
1513 let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
1514 let oracle = Arc::new(EProcessOracle::new(
1515 EProcessConfig {
1516 p0: 0.1,
1517 lambda: 5.0,
1518 alpha: 0.05,
1519 max_evalue: 1e12,
1520 },
1521 1,
1522 ));
1523 let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1524 oracle.observe_signal(signal);
1525 oracle.observe_signal(signal);
1526 cx.set_eprocess_oracle(oracle);
1527 {
1528 let _mask = cx.masked();
1529 assert!(cx.checkpoint().is_ok());
1530 assert!(cx.is_cancel_requested());
1531 assert_eq!(cx.cancel_state(), CancelState::CancelRequested);
1532 assert_eq!(
1533 cx.last_eprocess_snapshot()
1534 .expect("checkpoint should record the masked decision")
1535 .last_signal,
1536 Some(signal)
1537 );
1538 }
1539 let err = cx.checkpoint().unwrap_err();
1540 assert_eq!(err.kind(), ErrorKind::Cancelled);
1541 }
1542
1543 #[test]
1544 fn test_create_child_inherits_eprocess_oracle() {
1545 let parent = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
1546 let oracle = Arc::new(EProcessOracle::new(
1547 EProcessConfig {
1548 p0: 0.1,
1549 lambda: 5.0,
1550 alpha: 0.05,
1551 max_evalue: 1e12,
1552 },
1553 1,
1554 ));
1555 let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1556 oracle.observe_signal(signal);
1557 oracle.observe_signal(signal);
1558 parent.set_eprocess_oracle(oracle);
1559
1560 let child = parent.create_child();
1561 let err = child.checkpoint().unwrap_err();
1562 assert_eq!(err.kind(), ErrorKind::Cancelled);
1563 assert_eq!(child.cancel_reason(), Some(CancelReason::Abort));
1564 assert_eq!(
1565 child
1566 .last_eprocess_snapshot()
1567 .expect("child checkpoint should record inherited oracle decision")
1568 .last_signal,
1569 Some(signal)
1570 );
1571 }
1572
1573 #[test]
1574 fn test_create_child_inherits_preexisting_parent_cancellation() {
1575 let parent = Cx::<FullCaps>::new();
1576 parent.cancel_with_reason(CancelReason::RegionClose);
1577
1578 let child = parent.create_child();
1579 assert_eq!(child.cancel_reason(), Some(CancelReason::RegionClose));
1580 assert_eq!(child.cancel_state(), CancelState::CancelRequested);
1581
1582 let err = child.checkpoint().unwrap_err();
1583 assert_eq!(err.kind(), ErrorKind::Cancelled);
1584 }
1585
1586 #[cfg(feature = "native")]
1587 #[test]
1588 fn test_cx_checkpoint_native_cx_cancellation_maps_reason() {
1589 let cx = Cx::<FullCaps>::new();
1590 let native = NativeCx::for_testing();
1591 cx.set_native_cx(native.clone());
1592 native.set_cancel_reason(NativeCancelReason::timeout());
1593
1594 let err = cx.checkpoint().unwrap_err();
1595 assert_eq!(err.kind(), ErrorKind::Cancelled);
1596 assert_eq!(cx.cancel_reason(), Some(CancelReason::Timeout));
1597 }
1598
1599 #[cfg(feature = "native")]
1600 #[test]
1601 fn test_cx_cancel_reason_propagates_to_native_cx() {
1602 let cx = Cx::<FullCaps>::new();
1603 let native = NativeCx::for_testing();
1604 cx.set_native_cx(native.clone());
1605
1606 cx.cancel_with_reason(CancelReason::RegionClose);
1607 let reason = native
1608 .cancel_reason()
1609 .expect("native cancel reason must be set");
1610 assert_eq!(reason.kind, NativeCancelKind::ParentCancelled);
1611 }
1612
1613 #[cfg(feature = "native")]
1614 #[test]
1615 fn test_cx_checkpoint_native_cx_respects_local_masking() {
1616 let cx = Cx::<FullCaps>::new();
1617 let native = NativeCx::for_testing();
1618 cx.set_native_cx(native.clone());
1619 native.set_cancel_reason(NativeCancelReason::user("cancel"));
1620
1621 {
1622 let _mask = cx.masked();
1623 assert!(cx.checkpoint().is_ok());
1624 assert!(cx.is_cancel_requested());
1625 assert_eq!(cx.cancel_state(), CancelState::CancelRequested);
1626 }
1627
1628 let err = cx.checkpoint().unwrap_err();
1629 assert_eq!(err.kind(), ErrorKind::Cancelled);
1630 }
1631
1632 #[cfg(feature = "native")]
1633 #[test]
1634 fn test_cx_effective_native_cx_uses_fallback_without_marking_explicit_attachment() {
1635 let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(7));
1636
1637 assert!(cx.attached_native_cx().is_none());
1638 let native = cx.effective_native_cx();
1639 assert!(cx.attached_native_cx().is_none());
1640 assert!(native.checkpoint().is_ok());
1641 }
1642
1643 #[cfg(feature = "native")]
1644 #[test]
1645 fn test_cx_set_native_cx_replaces_fallback_context() {
1646 let cx = Cx::<FullCaps>::new();
1647 let _ = cx.effective_native_cx();
1648
1649 let replacement = NativeCx::for_testing();
1650 cx.set_native_cx(replacement.clone());
1651 replacement.set_cancel_reason(NativeCancelReason::timeout());
1652
1653 let err = cx.checkpoint().unwrap_err();
1654 assert_eq!(err.kind(), ErrorKind::Cancelled);
1655 assert_eq!(cx.cancel_reason(), Some(CancelReason::Timeout));
1656 }
1657
1658 #[cfg(feature = "native")]
1659 #[test]
1660 fn test_create_child_copies_preexisting_cancellation_into_fallback_native_cx() {
1661 let parent = Cx::<FullCaps>::new();
1662 parent.cancel_with_reason(CancelReason::RegionClose);
1663
1664 let child = parent.create_child();
1665 let reason = child
1666 .effective_native_cx()
1667 .cancel_reason()
1668 .expect("fallback native cx should mirror inherited cancellation");
1669 assert_eq!(reason.kind, NativeCancelKind::ParentCancelled);
1670 }
1671
1672 #[cfg(feature = "native")]
1673 #[test]
1674 fn test_create_child_inherits_explicit_native_cx_attachment() {
1675 let parent = Cx::<FullCaps>::new();
1676 let native = NativeCx::for_testing();
1677 parent.set_native_cx(native.clone());
1678
1679 let child = parent.create_child();
1680 assert!(child.attached_native_cx().is_some());
1681
1682 native.set_cancel_reason(NativeCancelReason::timeout());
1683 let err = child
1684 .checkpoint()
1685 .expect_err("child should observe inherited native cancel");
1686 assert_eq!(err.kind(), ErrorKind::Cancelled);
1687 assert_eq!(child.cancel_reason(), Some(CancelReason::Timeout));
1688 }
1689
1690 #[test]
1691 fn test_budget_infinite_is_identity_for_meet() {
1692 let budget = Budget {
1693 deadline: Some(Duration::from_millis(42)),
1694 poll_quota: 500,
1695 cost_quota: Some(1000),
1696 priority: 7,
1697 };
1698 assert_eq!(budget.meet(Budget::INFINITE), budget);
1699 assert_eq!(Budget::INFINITE.meet(budget), budget);
1700 }
1701
1702 #[test]
1703 fn test_budget_none_constraints_propagate() {
1704 let a = Budget {
1705 deadline: None,
1706 poll_quota: u32::MAX,
1707 cost_quota: None,
1708 priority: 0,
1709 };
1710 let b = Budget {
1711 deadline: Some(Duration::from_millis(50)),
1712 poll_quota: 100,
1713 cost_quota: Some(500),
1714 priority: 3,
1715 };
1716 let m = a.meet(b);
1717 assert_eq!(m.deadline, Some(Duration::from_millis(50)));
1718 assert_eq!(m.poll_quota, 100);
1719 assert_eq!(m.cost_quota, Some(500));
1720 assert_eq!(m.priority, 3);
1721 }
1722
1723 #[test]
1724 fn test_cx_scope_budget_chains() {
1725 let cx = Cx::<FullCaps>::with_budget(
1726 Budget::INFINITE
1727 .with_deadline(Duration::from_millis(100))
1728 .with_poll_quota(1000),
1729 );
1730 let s1 = cx.scope_with_budget(Budget::INFINITE.with_deadline(Duration::from_millis(50)));
1732 assert_eq!(s1.budget().deadline, Some(Duration::from_millis(50)));
1733 assert_eq!(s1.budget().poll_quota, 1000);
1734
1735 let s2 = s1.scope_with_budget(Budget::INFINITE.with_poll_quota(200));
1737 assert_eq!(s2.budget().deadline, Some(Duration::from_millis(50)));
1738 assert_eq!(s2.budget().poll_quota, 200);
1739 }
1740
1741 fn collect_rs_files(dir: &Path, out: &mut Vec<PathBuf>) -> std::io::Result<()> {
1742 for entry in std::fs::read_dir(dir)? {
1743 let entry = entry?;
1744 let path = entry.path();
1745 if path.is_dir() {
1746 collect_rs_files(&path, out)?;
1747 } else if path.extension().is_some_and(|ext| ext == "rs") {
1748 out.push(path);
1749 }
1750 }
1751 Ok(())
1752 }
1753
1754 fn scan_file_outside_cfg_test_items(src: &str, patterns: &[&str]) -> Vec<(usize, String)> {
1755 let mut hits = Vec::new();
1756
1757 let mut brace_depth: i32 = 0;
1758 let mut pending_cfg_test = false;
1759 let mut pending_attr_paren_depth: i32 = 0;
1760 let mut skip_until_depth: Option<i32> = None;
1761
1762 for (idx, line) in src.lines().enumerate() {
1763 let trimmed = line.trim_start();
1764 let paren_delta = i32::try_from(line.matches('(').count()).unwrap_or(i32::MAX)
1765 - i32::try_from(line.matches(')').count()).unwrap_or(i32::MAX);
1766
1767 if skip_until_depth.is_none() {
1768 if trimmed.starts_with("#[cfg(test)]") && trimmed.contains('{') {
1770 pending_cfg_test = false;
1771 pending_attr_paren_depth = 0;
1772 skip_until_depth = Some(brace_depth);
1773 } else if trimmed.contains("fn test_") && trimmed.contains('{') {
1774 skip_until_depth = Some(brace_depth);
1775 } else if trimmed.starts_with("#[cfg(test)]") {
1776 pending_cfg_test = true;
1777 pending_attr_paren_depth = 0;
1778 } else if pending_cfg_test {
1779 if trimmed.starts_with("#[") || pending_attr_paren_depth > 0 {
1781 pending_attr_paren_depth =
1782 pending_attr_paren_depth.saturating_add(paren_delta);
1783 } else if trimmed.is_empty() || trimmed.starts_with("//") {
1784 } else if trimmed.contains('{') {
1786 pending_cfg_test = false;
1787 pending_attr_paren_depth = 0;
1788 skip_until_depth = Some(brace_depth);
1789 } else {
1790 pending_cfg_test = false;
1791 pending_attr_paren_depth = 0;
1792 }
1793 } else {
1794 for &pat in patterns {
1795 if line.contains(pat) {
1796 hits.push((idx + 1, pat.to_string()));
1797 }
1798 }
1799 }
1800 }
1801
1802 let opens = i32::try_from(line.matches('{').count()).unwrap_or(i32::MAX);
1804 let closes = i32::try_from(line.matches('}').count()).unwrap_or(i32::MAX);
1805 brace_depth = brace_depth.saturating_add(opens).saturating_sub(closes);
1806
1807 if let Some(until) = skip_until_depth {
1808 if brace_depth <= until {
1809 skip_until_depth = None;
1810 }
1811 }
1812 }
1813
1814 hits
1815 }
1816
1817 #[test]
1818 fn test_scan_file_outside_cfg_test_items_skips_cfg_test_functions_and_modules() {
1819 let src = r"
1820fn production_path() {
1821 let _ = Cx::new();
1822}
1823
1824#[cfg(test)]
1825fn test_only_helper() {
1826 let _ = Cx::new();
1827}
1828
1829#[cfg(test)]
1830mod tests {
1831 fn nested_test_helper() {
1832 let _ = Cx::default();
1833 }
1834}
1835";
1836
1837 let hits = scan_file_outside_cfg_test_items(src, &["Cx::new(", "Cx::default("]);
1838 assert_eq!(hits, vec![(3, "Cx::new(".to_string())]);
1839 }
1840
1841 #[test]
1842 fn test_no_direct_cx_constructors_in_runtime_production_code() {
1843 let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1844 let repo_root = manifest_dir
1845 .parent()
1846 .and_then(Path::parent)
1847 .expect("fsqlite-types manifest dir must be crates/<name>");
1848 let crates_dir = repo_root.join("crates");
1849 let runtime_crates = [
1850 "fsqlite-core",
1851 "fsqlite-vdbe",
1852 "fsqlite-btree",
1853 "fsqlite-pager",
1854 "fsqlite-wal",
1855 "fsqlite-mvcc",
1856 ];
1857 let forbidden = ["Cx::new(", "Cx::default("];
1858
1859 let mut violations: Vec<String> = Vec::new();
1860 let mut crate_dirs: Vec<PathBuf> = Vec::new();
1861 for entry in std::fs::read_dir(&crates_dir).expect("read crates/ dir") {
1862 let entry = entry.expect("read crates/ entry");
1863 let path = entry.path();
1864 if path.is_dir() {
1865 crate_dirs.push(path);
1866 }
1867 }
1868
1869 for crate_dir in crate_dirs {
1870 let crate_name = crate_dir
1871 .file_name()
1872 .and_then(|s| s.to_str())
1873 .unwrap_or("<unknown>");
1874 if !runtime_crates.contains(&crate_name) {
1875 continue;
1876 }
1877
1878 let src_dir = crate_dir.join("src");
1879 if !src_dir.is_dir() {
1880 continue;
1881 }
1882
1883 let mut files = Vec::new();
1884 collect_rs_files(&src_dir, &mut files).expect("collect rs files");
1885
1886 for file in files {
1887 if file
1888 .file_name()
1889 .and_then(|name| name.to_str())
1890 .is_some_and(|name| name.contains("test"))
1891 {
1892 continue;
1893 }
1894
1895 let src = std::fs::read_to_string(&file).expect("read file");
1896 let rel_path = file.strip_prefix(repo_root).unwrap_or(&file);
1897
1898 for (line, pat) in scan_file_outside_cfg_test_items(&src, &forbidden) {
1899 let line_text = src.lines().nth(line - 1).unwrap_or("").trim();
1900 let allowed_detached_root_constructor = rel_path
1901 == Path::new("crates/fsqlite-core/src/connection.rs")
1902 && pat == "Cx::new("
1903 && line_text.contains("Cx::new().with_trace_context(");
1904
1905 if allowed_detached_root_constructor {
1906 continue;
1907 }
1908
1909 violations.push(format!(
1910 "{crate_name}:{path}:{line} uses forbidden `{pat}` outside cfg(test) code: {line_text}",
1911 path = rel_path.display()
1912 ));
1913 }
1914 }
1915 }
1916
1917 assert!(
1918 violations.is_empty(),
1919 "direct `Cx::new()` / `Cx::default()` production-path violations:\n{}",
1920 violations.join("\n")
1921 );
1922 }
1923
1924 #[test]
1925 fn test_ambient_authority_audit_gate() {
1926 let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1929 let repo_root = manifest_dir
1930 .parent()
1931 .and_then(Path::parent)
1932 .expect("fsqlite-types manifest dir must be crates/<name>");
1933 let crates_dir = repo_root.join("crates");
1934
1935 let always_forbidden = [
1937 "SystemTime::now(",
1938 "Instant::now(",
1939 "thread_rng(",
1940 "getrandom",
1941 "std::net::",
1942 "std::thread::spawn",
1943 "tokio::spawn",
1944 ];
1945
1946 let non_vfs_forbidden = ["std::fs::"];
1948
1949 let exempt_crates = [
1960 "fsqlite-harness",
1961 "fsqlite-cli",
1962 "fsqlite-e2e",
1963 "fsqlite-observability",
1964 "fsqlite-core",
1965 "fsqlite-vdbe",
1966 "fsqlite-mvcc",
1967 "fsqlite-parser",
1968 "fsqlite-planner",
1969 "fsqlite-wal",
1970 "fsqlite-vfs",
1971 ];
1972
1973 let mut violations: Vec<String> = Vec::new();
1974 let mut crate_dirs: Vec<PathBuf> = Vec::new();
1975 for entry in std::fs::read_dir(&crates_dir).expect("read crates/ dir") {
1976 let entry = entry.expect("read crates/ entry");
1977 let path = entry.path();
1978 if path.is_dir() {
1979 crate_dirs.push(path);
1980 }
1981 }
1982
1983 for crate_dir in crate_dirs {
1984 let crate_name = crate_dir
1985 .file_name()
1986 .and_then(|s| s.to_str())
1987 .unwrap_or("<unknown>");
1988 if exempt_crates.contains(&crate_name) {
1989 continue;
1990 }
1991 let src_dir = crate_dir.join("src");
1992 if !src_dir.is_dir() {
1993 continue;
1994 }
1995
1996 let mut files = Vec::new();
1997 collect_rs_files(&src_dir, &mut files).expect("collect rs files");
1998
1999 for file in files {
2000 let src = std::fs::read_to_string(&file).expect("read file");
2001 for (line, pat) in scan_file_outside_cfg_test_items(&src, &always_forbidden) {
2002 violations.push(format!(
2003 "{crate_name}:{path}:{line} uses forbidden `{pat}`",
2004 path = file.display()
2005 ));
2006 }
2007
2008 if crate_name != "fsqlite-vfs" {
2009 for (line, pat) in scan_file_outside_cfg_test_items(&src, &non_vfs_forbidden) {
2010 violations.push(format!(
2011 "{crate_name}:{path}:{line} uses forbidden `{pat}` (non-vfs crate)",
2012 path = file.display()
2013 ));
2014 }
2015 }
2016 }
2017 }
2018
2019 assert!(
2020 violations.is_empty(),
2021 "ambient authority violations (outside cfg(test) modules):\n{}",
2022 violations.join("\n")
2023 );
2024 }
2025
    /// Identifier interpolated into the assertion messages of the tests below
    /// (as `bead_id={BEAD_ID} ...`).
    const BEAD_ID: &str = "bd-samf";
2031
2032 #[test]
2033 fn test_cancel_state_machine_all_transitions() {
2034 let cx = Cx::<FullCaps>::new();
2036 assert_eq!(
2037 cx.cancel_state(),
2038 CancelState::Created,
2039 "bead_id={BEAD_ID} initial_state"
2040 );
2041
2042 cx.transition_to_running();
2043 assert_eq!(
2044 cx.cancel_state(),
2045 CancelState::Running,
2046 "bead_id={BEAD_ID} after_start"
2047 );
2048
2049 cx.cancel_with_reason(CancelReason::UserInterrupt);
2050 assert_eq!(
2051 cx.cancel_state(),
2052 CancelState::CancelRequested,
2053 "bead_id={BEAD_ID} after_cancel"
2054 );
2055
2056 let err = cx.checkpoint();
2058 assert!(err.is_err(), "bead_id={BEAD_ID} checkpoint_returns_err");
2059 assert_eq!(
2060 cx.cancel_state(),
2061 CancelState::Cancelling,
2062 "bead_id={BEAD_ID} after_checkpoint_observation"
2063 );
2064
2065 cx.transition_to_finalizing();
2066 assert_eq!(
2067 cx.cancel_state(),
2068 CancelState::Finalizing,
2069 "bead_id={BEAD_ID} after_finalize_start"
2070 );
2071
2072 cx.transition_to_completed();
2073 assert_eq!(
2074 cx.cancel_state(),
2075 CancelState::Completed,
2076 "bead_id={BEAD_ID} after_complete"
2077 );
2078 }
2079
2080 #[test]
2081 fn test_cancel_propagates_to_children() {
2082 let parent = Cx::<FullCaps>::new();
2084 parent.transition_to_running();
2085
2086 let child1 = parent.create_child();
2087 child1.transition_to_running();
2088 let child2 = parent.create_child();
2089 child2.transition_to_running();
2090 let child3 = parent.create_child();
2091 child3.transition_to_running();
2092
2093 assert!(!child1.is_cancel_requested());
2094 assert!(!child2.is_cancel_requested());
2095 assert!(!child3.is_cancel_requested());
2096
2097 parent.cancel_with_reason(CancelReason::RegionClose);
2098
2099 assert!(
2101 child1.is_cancel_requested(),
2102 "bead_id={BEAD_ID} child1_cancelled"
2103 );
2104 assert!(
2105 child2.is_cancel_requested(),
2106 "bead_id={BEAD_ID} child2_cancelled"
2107 );
2108 assert!(
2109 child3.is_cancel_requested(),
2110 "bead_id={BEAD_ID} child3_cancelled"
2111 );
2112
2113 assert_eq!(child1.cancel_state(), CancelState::CancelRequested);
2115 assert_eq!(child2.cancel_state(), CancelState::CancelRequested);
2116 assert_eq!(child3.cancel_state(), CancelState::CancelRequested);
2117
2118 assert_eq!(child1.cancel_reason(), Some(CancelReason::RegionClose));
2120 }
2121
2122 #[test]
2123 fn test_dropped_children_are_pruned_from_parent_links() {
2124 let parent = Cx::<FullCaps>::new();
2125
2126 let live_child = parent.create_child();
2127 let dropped_child = parent.create_child();
2128 drop(dropped_child);
2129
2130 parent.cancel_with_reason(CancelReason::RegionClose);
2132
2133 let live_count = {
2134 let children = parent
2135 .inner
2136 .children
2137 .lock()
2138 .unwrap_or_else(std::sync::PoisonError::into_inner);
2139 children.iter().filter_map(Weak::upgrade).count()
2140 };
2141 assert_eq!(live_count, 1, "only the live child should remain linked");
2142 assert!(live_child.is_cancel_requested());
2143 }
2144
2145 #[test]
2146 fn test_cancel_idempotent_strongest_wins() {
2147 let cx = Cx::<FullCaps>::new();
2149 cx.transition_to_running();
2150
2151 cx.cancel_with_reason(CancelReason::Timeout);
2152 assert_eq!(
2153 cx.cancel_reason(),
2154 Some(CancelReason::Timeout),
2155 "bead_id={BEAD_ID} first_reason"
2156 );
2157
2158 cx.cancel_with_reason(CancelReason::Abort);
2160 assert_eq!(
2161 cx.cancel_reason(),
2162 Some(CancelReason::Abort),
2163 "bead_id={BEAD_ID} upgraded_reason"
2164 );
2165
2166 cx.cancel_with_reason(CancelReason::UserInterrupt);
2168 assert_eq!(
2169 cx.cancel_reason(),
2170 Some(CancelReason::Abort),
2171 "bead_id={BEAD_ID} reason_stays_strongest"
2172 );
2173 }
2174
2175 #[test]
2176 fn test_losers_drain_on_race() {
2177 use std::sync::atomic::AtomicBool;
2180
2181 let loser_cx = Cx::<FullCaps>::new();
2182 loser_cx.transition_to_running();
2183
2184 let obligation_resolved = Arc::new(AtomicBool::new(false));
2186 let ob_clone = Arc::clone(&obligation_resolved);
2187
2188 loser_cx.cancel_with_reason(CancelReason::RegionClose);
2190
2191 assert!(loser_cx.checkpoint().is_err());
2193 assert_eq!(loser_cx.cancel_state(), CancelState::Cancelling);
2194
2195 ob_clone.store(true, Ordering::Release);
2197 loser_cx.transition_to_finalizing();
2198 loser_cx.transition_to_completed();
2199
2200 assert!(
2201 obligation_resolved.load(Ordering::Acquire),
2202 "bead_id={BEAD_ID} loser_obligation_resolved"
2203 );
2204 assert_eq!(
2205 loser_cx.cancel_state(),
2206 CancelState::Completed,
2207 "bead_id={BEAD_ID} loser_drained"
2208 );
2209 }
2210
2211 #[test]
2212 fn test_vdbe_checkpoint_cancel_observed_at_next_opcode() {
2213 let cx = Cx::<FullCaps>::new();
2216 cx.transition_to_running();
2217
2218 let mut last_executed = 0u32;
2219 for opcode in 0..100u32 {
2220 if cx.checkpoint_with(format!("vdbe pc={opcode}")).is_err() {
2222 last_executed = opcode;
2223 break;
2224 }
2225 last_executed = opcode;
2227 if opcode == 50 {
2229 cx.cancel_with_reason(CancelReason::UserInterrupt);
2230 }
2231 }
2232
2233 assert_eq!(
2234 last_executed, 51,
2235 "bead_id={BEAD_ID} cancel_observed_at_opcode_51"
2236 );
2237 }
2238
2239 #[test]
2240 fn test_btree_checkpoint_cancel_within_one_node() {
2241 let cx = Cx::<FullCaps>::new();
2244 cx.transition_to_running();
2245
2246 let nodes = ["root", "internal_l", "internal_r", "leaf_a", "leaf_b"];
2247 let cancel_at = 2; let mut observed_at = None;
2249
2250 for (i, node) in nodes.iter().enumerate() {
2251 if cx.checkpoint_with(format!("btree node={node}")).is_err() {
2253 observed_at = Some(i);
2254 break;
2255 }
2256 if i == cancel_at {
2259 cx.cancel_with_reason(CancelReason::UserInterrupt);
2260 }
2261 }
2262
2263 assert_eq!(
2264 observed_at,
2265 Some(cancel_at + 1),
2266 "bead_id={BEAD_ID} btree_cancel_within_one_node"
2267 );
2268 }
2269
2270 #[test]
2271 fn test_masked_section_defers_cancel() {
2272 let cx = Cx::<FullCaps>::new();
2275 cx.transition_to_running();
2276
2277 cx.cancel_with_reason(CancelReason::UserInterrupt);
2278 assert!(cx.is_cancel_requested());
2279
2280 {
2282 let _guard = cx.masked();
2283 assert_eq!(cx.mask_depth(), 1);
2284
2285 assert!(
2287 cx.checkpoint().is_ok(),
2288 "bead_id={BEAD_ID} checkpoint_ok_while_masked"
2289 );
2290
2291 {
2293 let _inner = cx.masked();
2294 assert_eq!(cx.mask_depth(), 2);
2295 assert!(cx.checkpoint().is_ok());
2296 }
2297 assert_eq!(cx.mask_depth(), 1);
2298 }
2299 assert_eq!(cx.mask_depth(), 0);
2300
2301 assert!(
2303 cx.checkpoint().is_err(),
2304 "bead_id={BEAD_ID} checkpoint_err_after_mask_exit"
2305 );
2306 }
2307
2308 #[test]
2309 #[should_panic(expected = "MAX_MASK_DEPTH")]
2310 #[allow(clippy::collection_is_never_read)]
2311 fn test_max_mask_depth_exceeded_panics() {
2312 let cx = Cx::<FullCaps>::new();
2314 let mut guards = Vec::new();
2315 for _ in 0..MAX_MASK_DEPTH {
2316 guards.push(cx.masked());
2317 }
2318 let _overflow = cx.masked();
2320 }
2321
2322 #[test]
2323 fn test_commit_section_completes_under_cancel() {
2324 let cx = Cx::<FullCaps>::new();
2326 cx.transition_to_running();
2327
2328 let ops_completed = Arc::new(AtomicU32::new(0));
2329 let finalizer_ran = Arc::new(AtomicBool::new(false));
2330
2331 let ops = Arc::clone(&ops_completed);
2332 let fin = Arc::clone(&finalizer_ran);
2333
2334 cx.commit_section(
2335 10,
2336 |ctx| {
2337 assert!(ctx.tick());
2339 ops.fetch_add(1, Ordering::Release);
2340
2341 cx.cancel_with_reason(CancelReason::UserInterrupt);
2343
2344 assert!(ctx.tick());
2346 ops.fetch_add(1, Ordering::Release);
2347 assert!(
2348 cx.checkpoint().is_ok(),
2349 "bead_id={BEAD_ID} masked_during_commit"
2350 );
2351
2352 assert!(ctx.tick());
2354 ops.fetch_add(1, Ordering::Release);
2355 },
2356 move || {
2357 fin.store(true, Ordering::Release);
2358 },
2359 );
2360
2361 assert_eq!(
2362 ops_completed.load(Ordering::Acquire),
2363 3,
2364 "bead_id={BEAD_ID} all_ops_completed"
2365 );
2366 assert!(
2367 finalizer_ran.load(Ordering::Acquire),
2368 "bead_id={BEAD_ID} finalizer_ran"
2369 );
2370
2371 assert!(cx.checkpoint().is_err());
2373 }
2374
2375 #[test]
2376 fn test_commit_section_enforces_poll_quota() {
2377 let cx = Cx::<FullCaps>::new();
2379 cx.transition_to_running();
2380
2381 let ticks_succeeded = Arc::new(AtomicU32::new(0));
2382 let ts = Arc::clone(&ticks_succeeded);
2383
2384 cx.commit_section(
2385 3,
2386 |ctx| {
2387 assert_eq!(ctx.poll_remaining(), 3);
2388 for _ in 0..5 {
2389 if ctx.tick() {
2390 ts.fetch_add(1, Ordering::Release);
2391 }
2392 }
2393 },
2394 || {},
2395 );
2396
2397 assert_eq!(
2398 ticks_succeeded.load(Ordering::Acquire),
2399 3,
2400 "bead_id={BEAD_ID} poll_quota_enforced"
2401 );
2402 }
2403
2404 #[test]
2405 fn test_cancel_unaware_hot_loop_detected() {
2406 let cx = Cx::<FullCaps>::new();
2409 cx.transition_to_running();
2410
2411 let deadline = 100u32;
2414 let mut iterations_without_checkpoint = 0u32;
2415 let mut detected_unaware = false;
2416
2417 cx.cancel_with_reason(CancelReason::UserInterrupt);
2418
2419 for _i in 0..200u32 {
2420 iterations_without_checkpoint += 1;
2421 if iterations_without_checkpoint >= deadline {
2422 detected_unaware = true;
2423 break;
2424 }
2425 }
2427
2428 assert!(
2429 detected_unaware,
2430 "bead_id={BEAD_ID} cancel_unaware_loop_detected"
2431 );
2432
2433 let cx2 = Cx::<FullCaps>::new();
2435 cx2.transition_to_running();
2436 cx2.cancel_with_reason(CancelReason::UserInterrupt);
2437 let mut compliant_iters = 0u32;
2438 for _ in 0..200u32 {
2439 if cx2.checkpoint().is_err() {
2440 break;
2441 }
2442 compliant_iters += 1;
2443 }
2444 assert_eq!(
2445 compliant_iters, 0,
2446 "bead_id={BEAD_ID} compliant_loop_exits_immediately"
2447 );
2448 }
2449
2450 #[test]
2451 fn test_write_coordinator_commit_section() {
2452 let cx = Cx::<FullCaps>::new();
2455 cx.transition_to_running();
2456
2457 let proof_published = Arc::new(AtomicBool::new(false));
2458 let marker_published = Arc::new(AtomicBool::new(false));
2459 let reservation_released = Arc::new(AtomicBool::new(false));
2460
2461 let proof = Arc::clone(&proof_published);
2462 let marker = Arc::clone(&marker_published);
2463 let release = Arc::clone(&reservation_released);
2464
2465 cx.commit_section(
2466 10,
2467 |ctx| {
2468 assert!(ctx.tick());
2470
2471 cx.cancel_with_reason(CancelReason::RegionClose);
2473
2474 assert!(ctx.tick());
2476 proof.store(true, Ordering::Release);
2477 assert!(cx.checkpoint().is_ok());
2479
2480 assert!(ctx.tick());
2482 marker.store(true, Ordering::Release);
2483 },
2484 move || {
2485 release.store(true, Ordering::Release);
2487 },
2488 );
2489
2490 assert!(
2491 proof_published.load(Ordering::Acquire),
2492 "bead_id={BEAD_ID} proof_published"
2493 );
2494 assert!(
2495 marker_published.load(Ordering::Acquire),
2496 "bead_id={BEAD_ID} marker_published"
2497 );
2498 assert!(
2499 reservation_released.load(Ordering::Acquire),
2500 "bead_id={BEAD_ID} reservation_released"
2501 );
2502
2503 assert!(cx.checkpoint().is_err());
2505 }
2506
2507 #[test]
2512 fn test_trace_ids_default_to_zero() {
2513 let cx = Cx::<FullCaps>::new();
2514 assert_eq!(cx.trace_id(), 0);
2515 assert_eq!(cx.decision_id(), 0);
2516 assert_eq!(cx.policy_id(), 0);
2517 }
2518
2519 #[test]
2520 fn test_with_trace_context_sets_all_ids() {
2521 let cx = Cx::<FullCaps>::new().with_trace_context(42, 99, 7);
2522 assert_eq!(cx.trace_id(), 42);
2523 assert_eq!(cx.decision_id(), 99);
2524 assert_eq!(cx.policy_id(), 7);
2525 }
2526
2527 #[test]
2528 fn test_with_decision_id_preserves_other_ids() {
2529 let cx = Cx::<FullCaps>::new()
2530 .with_trace_context(10, 20, 30)
2531 .with_decision_id(55);
2532 assert_eq!(cx.trace_id(), 10);
2533 assert_eq!(cx.decision_id(), 55);
2534 assert_eq!(cx.policy_id(), 30);
2535 }
2536
2537 #[test]
2538 fn test_with_policy_id_preserves_other_ids() {
2539 let cx = Cx::<FullCaps>::new()
2540 .with_trace_context(100, 200, 300)
2541 .with_policy_id(88);
2542 assert_eq!(cx.trace_id(), 100);
2543 assert_eq!(cx.decision_id(), 200);
2544 assert_eq!(cx.policy_id(), 88);
2545 }
2546
2547 #[test]
2548 #[allow(clippy::redundant_clone)]
2549 fn test_clone_propagates_trace_ids() {
2550 let cx = Cx::<FullCaps>::new().with_trace_context(1, 2, 3);
2551 let cloned = cx.clone();
2552 assert_eq!(cloned.trace_id(), 1);
2553 assert_eq!(cloned.decision_id(), 2);
2554 assert_eq!(cloned.policy_id(), 3);
2555 }
2556
2557 #[test]
2558 fn test_restrict_propagates_trace_ids() {
2559 let cx = Cx::<FullCaps>::new();
2560 let compute = cx.restrict::<ComputeCaps>();
2561 assert_eq!(compute.trace_id(), 0);
2562 assert_eq!(compute.decision_id(), 0);
2563 assert_eq!(compute.policy_id(), 0);
2564 }
2565
2566 #[test]
2567 fn test_scope_with_budget_propagates_trace_ids() {
2568 let cx = Cx::<FullCaps>::new().with_trace_context(5, 6, 7);
2569 let scoped = cx.scope_with_budget(Budget::MINIMAL);
2570 assert_eq!(scoped.trace_id(), 5);
2571 assert_eq!(scoped.decision_id(), 6);
2572 assert_eq!(scoped.policy_id(), 7);
2573 assert_eq!(scoped.budget().poll_quota, Budget::MINIMAL.poll_quota);
2575 }
2576
2577 #[test]
2578 fn test_cleanup_scope_propagates_trace_ids() {
2579 let cx = Cx::<FullCaps>::new().with_trace_context(11, 22, 33);
2580 let cleanup = cx.cleanup_scope();
2581 assert_eq!(cleanup.trace_id(), 11);
2582 assert_eq!(cleanup.decision_id(), 22);
2583 assert_eq!(cleanup.policy_id(), 33);
2584 }
2585
2586 #[test]
2587 fn test_create_child_propagates_trace_ids() {
2588 let parent = Cx::<FullCaps>::new().with_trace_context(50, 60, 70);
2589 let child = parent.create_child();
2590 assert_eq!(child.trace_id(), 50);
2591 assert_eq!(child.decision_id(), 60);
2592 assert_eq!(child.policy_id(), 70);
2593 parent.cancel();
2595 assert!(parent.is_cancel_requested());
2596 assert!(child.is_cancel_requested()); }
2598
2599 #[test]
2600 fn test_trace_ids_independent_across_children() {
2601 let parent = Cx::<FullCaps>::new().with_trace_context(1, 2, 3);
2602 let child1 = parent.create_child().with_decision_id(100);
2603 let child2 = parent.create_child().with_decision_id(200);
2604 assert_eq!(child1.trace_id(), 1);
2606 assert_eq!(child2.trace_id(), 1);
2607 assert_eq!(child1.decision_id(), 100);
2608 assert_eq!(child2.decision_id(), 200);
2609 assert_eq!(parent.decision_id(), 2);
2611 }
2612
2613 #[test]
2614 fn test_with_budget_starts_at_zero_trace_ids() {
2615 let cx = Cx::<FullCaps>::with_budget(Budget::MINIMAL);
2616 assert_eq!(cx.trace_id(), 0);
2617 assert_eq!(cx.decision_id(), 0);
2618 assert_eq!(cx.policy_id(), 0);
2619 }
2620}