Skip to main content

fsqlite_types/
cx.rs

1//! Capability context (`Cx`) for FrankenSQLite.
2//!
3//! This is a **capability-passing style** context object that:
4//! - threads cancellation checks (`checkpoint`) through long-running operations
5//! - carries a [`Budget`] for deadline/priority propagation
6//! - encodes available effects (spawn/time/random/io/remote) in the type system
7//!   via [`cap::CapSet`], so widening is a **compile-time error**.
8//!
9//! # Compile-time capability narrowing
10//!
11//! Narrowing always succeeds:
12//! ```
13//! use fsqlite_types::cx::{cap, Cx};
14//!
15//! let cx = Cx::<cap::All>::new();
16//! let _compute = cx.restrict::<cap::None>();
17//! ```
18//!
19//! Widening is rejected at compile time:
20//! ```compile_fail
21//! use fsqlite_types::cx::{cap, Cx};
22//!
23//! let cx = Cx::<cap::All>::new();
24//! let compute = cx.restrict::<cap::None>();
25//! let _nope = compute.restrict::<cap::All>();
26//! ```
27//
28
29use std::marker::PhantomData;
30use std::sync::atomic::{AtomicBool, AtomicU32, AtomicU64, Ordering};
31use std::sync::{Arc, Mutex, Weak};
32use std::time::Duration;
33
34#[cfg(feature = "native")]
35use asupersync::types::Time as NativeTime;
36#[cfg(feature = "native")]
37use asupersync::types::{CancelKind as NativeCancelKind, CancelReason as NativeCancelReason};
38#[cfg(feature = "native")]
39use asupersync::{Budget as NativeBudget, Cx as NativeCx};
40
41#[cfg(not(feature = "native"))]
42mod native_cx_shim {
43    use std::sync::atomic::{AtomicBool, Ordering};
44    use std::sync::{Arc, Mutex};
45
46    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
47    pub enum NativeCancelKind {
48        User,
49        Timeout,
50        Deadline,
51        PollQuota,
52        CostBudget,
53        FailFast,
54        RaceLost,
55        ParentCancelled,
56        Shutdown,
57        LinkedExit,
58        ResourceUnavailable,
59    }
60
61    #[derive(Debug, Clone, PartialEq, Eq)]
62    pub struct NativeCancelReason {
63        pub kind: NativeCancelKind,
64    }
65
66    impl NativeCancelReason {
67        #[must_use]
68        pub const fn timeout() -> Self {
69            Self {
70                kind: NativeCancelKind::Timeout,
71            }
72        }
73
74        #[must_use]
75        pub fn user(_message: impl Into<String>) -> Self {
76            Self {
77                kind: NativeCancelKind::User,
78            }
79        }
80
81        #[must_use]
82        pub const fn parent_cancelled() -> Self {
83            Self {
84                kind: NativeCancelKind::ParentCancelled,
85            }
86        }
87
88        #[must_use]
89        pub const fn resource_unavailable() -> Self {
90            Self {
91                kind: NativeCancelKind::ResourceUnavailable,
92            }
93        }
94    }
95
96    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
97    pub struct NativeCheckpointError;
98
99    #[derive(Debug, Default)]
100    struct NativeCxInner {
101        cancel_requested: AtomicBool,
102        cancel_reason: Mutex<Option<NativeCancelReason>>,
103    }
104
105    #[derive(Debug, Clone, Default)]
106    pub struct NativeCx {
107        inner: Arc<NativeCxInner>,
108    }
109
110    impl NativeCx {
111        #[must_use]
112        pub fn for_testing() -> Self {
113            Self::default()
114        }
115
116        pub fn set_cancel_requested(&self, requested: bool) {
117            self.inner
118                .cancel_requested
119                .store(requested, Ordering::Release);
120            if !requested {
121                *self
122                    .inner
123                    .cancel_reason
124                    .lock()
125                    .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
126            }
127        }
128
129        pub fn set_cancel_reason(&self, reason: NativeCancelReason) {
130            *self
131                .inner
132                .cancel_reason
133                .lock()
134                .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(reason);
135            self.inner.cancel_requested.store(true, Ordering::Release);
136        }
137
138        #[must_use]
139        pub fn is_cancel_requested(&self) -> bool {
140            self.inner.cancel_requested.load(Ordering::Acquire)
141        }
142
143        #[must_use]
144        pub fn cancel_reason(&self) -> Option<NativeCancelReason> {
145            self.inner
146                .cancel_reason
147                .lock()
148                .unwrap_or_else(std::sync::PoisonError::into_inner)
149                .clone()
150        }
151
152        pub fn checkpoint(&self) -> std::result::Result<(), NativeCheckpointError> {
153            if self.is_cancel_requested() {
154                Err(NativeCheckpointError)
155            } else {
156                Ok(())
157            }
158        }
159    }
160}
161
162#[cfg(not(feature = "native"))]
163use native_cx_shim::NativeCx;
164
165use crate::eprocess::{EProcessDecision, EProcessOracle, EProcessSnapshot};
166
167/// SQLite error code for `SQLITE_INTERRUPT`.
168pub const SQLITE_INTERRUPT: i32 = 9;
169
170/// Maximum nesting depth for masked cancellation sections (INV-MASK-BOUNDED).
171///
172/// Exceeding this limit panics in lab mode and emits a fatal diagnostic in production.
173pub const MAX_MASK_DEPTH: u32 = 64;
174
175// ---------------------------------------------------------------------------
176// §4.12 Cancellation State Machine
177// ---------------------------------------------------------------------------
178
179/// Observable state of a task's cancellation lifecycle (asupersync oracle model).
180///
181/// ```text
182/// Created → Running → CancelRequested → Cancelling → Finalizing → Completed
183/// ```
184#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
185pub enum CancelState {
186    Created,
187    Running,
188    CancelRequested,
189    Cancelling,
190    Finalizing,
191    Completed,
192}
193
194/// Reason for cancellation, ordered from weakest to strongest.
195///
196/// INV-CANCEL-IDEMPOTENT: multiple cancel requests are monotone — the strongest
197/// reason wins and the reason can never get weaker.
198#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
199pub enum CancelReason {
200    Timeout = 0,
201    UserInterrupt = 1,
202    RegionClose = 2,
203    Abort = 3,
204}
205
206/// Capability set definitions and subset reasoning.
207pub mod cap {
208    mod sealed {
209        pub trait Sealed {}
210
211        pub struct Bit<const V: bool>;
212
213        pub trait Le {}
214        impl Le for (Bit<false>, Bit<false>) {}
215        impl Le for (Bit<false>, Bit<true>) {}
216        impl Le for (Bit<true>, Bit<true>) {}
217    }
218
219    /// Type-level capability set: `[SPAWN, TIME, RANDOM, IO, REMOTE]`.
220    #[derive(Debug, Clone, Copy, Default)]
221    pub struct CapSet<
222        const SPAWN: bool,
223        const TIME: bool,
224        const RANDOM: bool,
225        const IO: bool,
226        const REMOTE: bool,
227    >;
228
229    impl<
230        const SPAWN: bool,
231        const TIME: bool,
232        const RANDOM: bool,
233        const IO: bool,
234        const REMOTE: bool,
235    > sealed::Sealed for CapSet<SPAWN, TIME, RANDOM, IO, REMOTE>
236    {
237    }
238
239    /// Full capability set.
240    pub type All = CapSet<true, true, true, true, true>;
241    /// No capabilities.
242    pub type None = CapSet<false, false, false, false, false>;
243
244    /// Type-level subset relation.
245    ///
246    /// Encodes pointwise ordering on capability bits: `false <= false`, `false <= true`,
247    /// `true <= true`. The missing impl `(true <= false)` forbids widening.
248    pub trait SubsetOf<Super>: sealed::Sealed {}
249
250    impl<
251        const S_SPAWN: bool,
252        const S_TIME: bool,
253        const S_RANDOM: bool,
254        const S_IO: bool,
255        const S_REMOTE: bool,
256        const P_SPAWN: bool,
257        const P_TIME: bool,
258        const P_RANDOM: bool,
259        const P_IO: bool,
260        const P_REMOTE: bool,
261    > SubsetOf<CapSet<P_SPAWN, P_TIME, P_RANDOM, P_IO, P_REMOTE>>
262        for CapSet<S_SPAWN, S_TIME, S_RANDOM, S_IO, S_REMOTE>
263    where
264        (sealed::Bit<S_SPAWN>, sealed::Bit<P_SPAWN>): sealed::Le,
265        (sealed::Bit<S_TIME>, sealed::Bit<P_TIME>): sealed::Le,
266        (sealed::Bit<S_RANDOM>, sealed::Bit<P_RANDOM>): sealed::Le,
267        (sealed::Bit<S_IO>, sealed::Bit<P_IO>): sealed::Le,
268        (sealed::Bit<S_REMOTE>, sealed::Bit<P_REMOTE>): sealed::Le,
269    {
270    }
271
272    pub trait HasSpawn: sealed::Sealed {}
273    impl<const TIME: bool, const RANDOM: bool, const IO: bool, const REMOTE: bool> HasSpawn
274        for CapSet<true, TIME, RANDOM, IO, REMOTE>
275    {
276    }
277
278    pub trait HasTime: sealed::Sealed {}
279    impl<const SPAWN: bool, const RANDOM: bool, const IO: bool, const REMOTE: bool> HasTime
280        for CapSet<SPAWN, true, RANDOM, IO, REMOTE>
281    {
282    }
283
284    pub trait HasRandom: sealed::Sealed {}
285    impl<const SPAWN: bool, const TIME: bool, const IO: bool, const REMOTE: bool> HasRandom
286        for CapSet<SPAWN, TIME, true, IO, REMOTE>
287    {
288    }
289
290    pub trait HasIo: sealed::Sealed {}
291    impl<const SPAWN: bool, const TIME: bool, const RANDOM: bool, const REMOTE: bool> HasIo
292        for CapSet<SPAWN, TIME, RANDOM, true, REMOTE>
293    {
294    }
295
296    pub trait HasRemote: sealed::Sealed {}
297    impl<const SPAWN: bool, const TIME: bool, const RANDOM: bool, const IO: bool> HasRemote
298        for CapSet<SPAWN, TIME, RANDOM, IO, true>
299    {
300    }
301}
302
303/// Connection-level capabilities: everything enabled.
304pub type FullCaps = cap::All;
305/// Storage-layer capabilities: time + I/O only.
306pub type StorageCaps = cap::CapSet<false, true, false, true, false>;
307/// Pure computation capabilities: no I/O, no time, no randomness.
308pub type ComputeCaps = cap::None;
309
310/// A budget for cancellation/deadline/priority propagation.
311///
312/// This is a product lattice with mixed meet/join semantics:
313/// - resource constraints tighten by `min` (deadline/poll/cost)
314/// - priority propagates by `max`
315#[derive(Debug, Clone, Copy, PartialEq, Eq)]
316pub struct Budget {
317    pub deadline: Option<Duration>,
318    pub poll_quota: u32,
319    pub cost_quota: Option<u64>,
320    pub priority: u8,
321}
322
323impl Budget {
324    /// No constraints (identity for [`Self::meet`]).
325    pub const INFINITE: Self = Self {
326        deadline: None,
327        poll_quota: u32::MAX,
328        cost_quota: None,
329        priority: 0,
330    };
331
332    /// Minimal budget for cleanup/finalizers.
333    pub const MINIMAL: Self = Self {
334        deadline: None,
335        poll_quota: 100,
336        cost_quota: None,
337        priority: 0,
338    };
339
340    #[must_use]
341    pub const fn with_deadline(self, deadline: Duration) -> Self {
342        Self {
343            deadline: Some(deadline),
344            ..self
345        }
346    }
347
348    #[must_use]
349    pub const fn with_priority(self, priority: u8) -> Self {
350        Self { priority, ..self }
351    }
352
353    #[must_use]
354    pub const fn with_poll_quota(self, poll_quota: u32) -> Self {
355        Self { poll_quota, ..self }
356    }
357
358    #[must_use]
359    pub const fn with_cost_quota(self, cost_quota: u64) -> Self {
360        Self {
361            cost_quota: Some(cost_quota),
362            ..self
363        }
364    }
365
366    /// Meet (tighten) two budgets.
367    #[must_use]
368    pub fn meet(self, other: Self) -> Self {
369        Self {
370            deadline: match (self.deadline, other.deadline) {
371                (Some(a), Some(b)) => Some(a.min(b)),
372                (Some(a), None) => Some(a),
373                (None, Some(b)) => Some(b),
374                (None, None) => None,
375            },
376            poll_quota: self.poll_quota.min(other.poll_quota),
377            cost_quota: match (self.cost_quota, other.cost_quota) {
378                (Some(a), Some(b)) => Some(a.min(b)),
379                (Some(a), None) => Some(a),
380                (None, Some(b)) => Some(b),
381                (None, None) => None,
382            },
383            priority: self.priority.max(other.priority),
384        }
385    }
386}
387
388#[derive(Debug, Clone, Copy, PartialEq, Eq)]
389pub enum ErrorKind {
390    Cancelled,
391}
392
393#[derive(Debug, Clone, PartialEq, Eq)]
394pub struct Error {
395    kind: ErrorKind,
396}
397
398impl std::fmt::Display for Error {
399    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
400        match self.kind {
401            ErrorKind::Cancelled => write!(f, "operation cancelled"),
402        }
403    }
404}
405
406impl std::error::Error for Error {}
407
408impl Error {
409    #[must_use]
410    pub const fn cancelled() -> Self {
411        Self {
412            kind: ErrorKind::Cancelled,
413        }
414    }
415
416    #[must_use]
417    pub const fn kind(&self) -> ErrorKind {
418        self.kind
419    }
420
421    #[must_use]
422    pub const fn sqlite_error_code(&self) -> i32 {
423        match self.kind {
424            ErrorKind::Cancelled => SQLITE_INTERRUPT,
425        }
426    }
427}
428
429pub type Result<T, E = Error> = std::result::Result<T, E>;
430
431#[derive(Debug)]
432struct CxInner {
433    cancel_requested: AtomicBool,
434    cancel_state: Mutex<CancelState>,
435    cancel_reason: Mutex<Option<CancelReason>>,
436    mask_depth: AtomicU32,
437    children: Mutex<Vec<Weak<Self>>>,
438    last_checkpoint_msg: Mutex<Option<String>>,
439    last_eprocess_decision: Mutex<Option<EProcessDecision>>,
440    eprocess_oracle: std::sync::OnceLock<Arc<EProcessOracle>>,
441    #[cfg(feature = "native")]
442    attached_native_cx: Mutex<Option<NativeCx>>,
443    #[cfg(feature = "native")]
444    fallback_native_cx: std::sync::OnceLock<NativeCx>,
445    // Deterministic clock: milliseconds since epoch for tests.
446    unix_millis: AtomicU64,
447}
448
449impl CxInner {
450    fn new() -> Self {
451        Self {
452            cancel_requested: AtomicBool::new(false),
453            cancel_state: Mutex::new(CancelState::Created),
454            cancel_reason: Mutex::new(None),
455            mask_depth: AtomicU32::new(0),
456            children: Mutex::new(Vec::new()),
457            last_checkpoint_msg: Mutex::new(None),
458            last_eprocess_decision: Mutex::new(None),
459            eprocess_oracle: std::sync::OnceLock::new(),
460            #[cfg(feature = "native")]
461            attached_native_cx: Mutex::new(None),
462            #[cfg(feature = "native")]
463            fallback_native_cx: std::sync::OnceLock::new(),
464            unix_millis: AtomicU64::new(0),
465        }
466    }
467}
468
469#[cfg(feature = "native")]
470#[must_use]
471fn local_reason_to_native(reason: CancelReason) -> NativeCancelReason {
472    match reason {
473        CancelReason::Timeout => NativeCancelReason::timeout(),
474        CancelReason::UserInterrupt => NativeCancelReason::user("sqlite interrupt"),
475        CancelReason::RegionClose => NativeCancelReason::parent_cancelled(),
476        CancelReason::Abort => NativeCancelReason::resource_unavailable(),
477    }
478}
479
480#[cfg(feature = "native")]
481#[must_use]
482fn native_reason_to_local(reason: &NativeCancelReason) -> CancelReason {
483    match reason.kind {
484        NativeCancelKind::User => CancelReason::UserInterrupt,
485        NativeCancelKind::Timeout
486        | NativeCancelKind::Deadline
487        | NativeCancelKind::PollQuota
488        | NativeCancelKind::CostBudget => CancelReason::Timeout,
489        NativeCancelKind::FailFast
490        | NativeCancelKind::RaceLost
491        | NativeCancelKind::ParentCancelled
492        | NativeCancelKind::Shutdown
493        | NativeCancelKind::LinkedExit => CancelReason::RegionClose,
494        NativeCancelKind::ResourceUnavailable => CancelReason::Abort,
495    }
496}
497
498#[cfg(feature = "native")]
499fn sync_native_cx_cancel(inner: &CxInner, reason: CancelReason) {
500    let attached_native = inner
501        .attached_native_cx
502        .lock()
503        .unwrap_or_else(std::sync::PoisonError::into_inner)
504        .as_ref()
505        .cloned();
506    if let Some(native) = attached_native {
507        native.set_cancel_reason(local_reason_to_native(reason));
508    }
509    if let Some(native) = inner.fallback_native_cx.get() {
510        native.set_cancel_reason(local_reason_to_native(reason));
511    }
512}
513
514#[cfg(feature = "native")]
515#[must_use]
516fn native_budget_from_local(budget: Budget) -> NativeBudget {
517    let mut native_budget = NativeBudget::new()
518        .with_poll_quota(budget.poll_quota)
519        .with_priority(budget.priority);
520    if let Some(cost_quota) = budget.cost_quota {
521        native_budget = native_budget.with_cost_quota(cost_quota);
522    }
523    if let Some(deadline) = budget.deadline {
524        native_budget = native_budget.with_deadline(local_deadline_to_native_time(deadline));
525    }
526    native_budget
527}
528
529#[cfg(feature = "native")]
530#[must_use]
531fn wall_clock_now_since_epoch() -> Duration {
532    std::time::SystemTime::now()
533        .duration_since(std::time::UNIX_EPOCH)
534        .unwrap_or(Duration::ZERO)
535}
536
537#[cfg(feature = "native")]
538#[must_use]
539fn local_deadline_to_native_time(deadline: Duration) -> NativeTime {
540    let absolute_deadline = wall_clock_now_since_epoch()
541        .checked_add(deadline)
542        .unwrap_or(Duration::MAX);
543    let nanos = u64::try_from(absolute_deadline.as_nanos()).unwrap_or(u64::MAX);
544    NativeTime::from_nanos(nanos)
545}
546
547/// Propagate cancellation to a `CxInner` node and all its descendants.
548///
549/// We release each node's lock before recursing into children to avoid
550/// lock-ordering issues.
551fn propagate_cancel(inner: &CxInner, reason: CancelReason) {
552    // Set atomic flag (fast-path for checkpoint).
553    inner.cancel_requested.store(true, Ordering::Release);
554
555    // Monotone reason update.
556    {
557        let mut r = inner
558            .cancel_reason
559            .lock()
560            .unwrap_or_else(std::sync::PoisonError::into_inner);
561        match *r {
562            Some(existing) if existing >= reason => {}
563            _ => *r = Some(reason),
564        }
565    }
566
567    // State transition: Created/Running → CancelRequested.
568    {
569        let mut state = inner
570            .cancel_state
571            .lock()
572            .unwrap_or_else(std::sync::PoisonError::into_inner);
573        if matches!(*state, CancelState::Created | CancelState::Running) {
574            *state = CancelState::CancelRequested;
575        }
576    }
577
578    // Keep attached native asupersync context in sync so downstream combinators
579    // observe equivalent cancellation semantics.
580    #[cfg(feature = "native")]
581    sync_native_cx_cancel(inner, reason);
582
583    // Collect children (release lock before recursing).
584    let children: Vec<Arc<CxInner>> = {
585        let mut guard = inner
586            .children
587            .lock()
588            .unwrap_or_else(std::sync::PoisonError::into_inner);
589        guard.retain(|child| child.strong_count() > 0);
590        guard.iter().filter_map(Weak::upgrade).collect()
591    };
592    for child in &children {
593        propagate_cancel(child, reason);
594    }
595}
596
597/// Capability context passed through all effectful operations.
598///
599/// Carries tracing identifiers (`trace_id`, `decision_id`, `policy_id`) that
600/// propagate through all context derivations (clone, restrict, scope, child).
601/// A value of `0` means "unset / not assigned".
602#[derive(Debug)]
603pub struct Cx<Caps: cap::SubsetOf<cap::All> = FullCaps> {
604    inner: Arc<CxInner>,
605    budget: Budget,
606    trace_id: u64,
607    decision_id: u64,
608    policy_id: u64,
609    // fn() -> Caps ensures Send+Sync regardless of Caps marker type.
610    _caps: PhantomData<fn() -> Caps>,
611}
612
613impl<Caps: cap::SubsetOf<cap::All>> Clone for Cx<Caps> {
614    fn clone(&self) -> Self {
615        Self {
616            inner: Arc::clone(&self.inner),
617            budget: self.budget,
618            trace_id: self.trace_id,
619            decision_id: self.decision_id,
620            policy_id: self.policy_id,
621            _caps: PhantomData,
622        }
623    }
624}
625
626impl Default for Cx<FullCaps> {
627    fn default() -> Self {
628        Self::new()
629    }
630}
631
632impl Cx<FullCaps> {
633    #[must_use]
634    pub fn new() -> Self {
635        Self::with_budget(Budget::INFINITE)
636    }
637}
638
639impl<Caps: cap::SubsetOf<cap::All>> Cx<Caps> {
640    #[cfg(feature = "native")]
641    #[must_use]
642    fn effective_native_cx(&self) -> NativeCx {
643        let attached_native = self
644            .inner
645            .attached_native_cx
646            .lock()
647            .unwrap_or_else(std::sync::PoisonError::into_inner)
648            .as_ref()
649            .cloned();
650        if let Some(native) = attached_native {
651            return native;
652        }
653
654        self.inner
655            .fallback_native_cx
656            .get_or_init(|| {
657                let native =
658                    NativeCx::for_request_with_budget(native_budget_from_local(self.budget));
659                if let Some(reason) = self.cancel_reason() {
660                    native.set_cancel_reason(local_reason_to_native(reason));
661                } else if self.is_cancel_requested() {
662                    native.set_cancel_requested(true);
663                }
664                native
665            })
666            .clone()
667    }
668
669    #[must_use]
670    pub fn with_budget(budget: Budget) -> Self {
671        Self {
672            inner: Arc::new(CxInner::new()),
673            budget,
674            trace_id: 0,
675            decision_id: 0,
676            policy_id: 0,
677            _caps: PhantomData,
678        }
679    }
680
681    #[must_use]
682    pub fn budget(&self) -> Budget {
683        self.budget
684    }
685
686    // -----------------------------------------------------------------------
687    // Tracing IDs (§4 Cx capability context threading)
688    // -----------------------------------------------------------------------
689
690    /// The trace ID for this context (0 = unset).
691    #[must_use]
692    pub fn trace_id(&self) -> u64 {
693        self.trace_id
694    }
695
696    /// The decision ID for this context (0 = unset).
697    #[must_use]
698    pub fn decision_id(&self) -> u64 {
699        self.decision_id
700    }
701
702    /// The policy ID for this context (0 = unset).
703    #[must_use]
704    pub fn policy_id(&self) -> u64 {
705        self.policy_id
706    }
707
708    /// Set all three tracing identifiers at once.
709    ///
710    /// Typically called once when a connection or request is initialized.
711    #[must_use]
712    pub fn with_trace_context(mut self, trace_id: u64, decision_id: u64, policy_id: u64) -> Self {
713        self.trace_id = trace_id;
714        self.decision_id = decision_id;
715        self.policy_id = policy_id;
716        self
717    }
718
719    /// Return a new context with only the `decision_id` changed.
720    ///
721    /// Used when starting a new operation within the same trace.
722    #[must_use]
723    pub fn with_decision_id(mut self, decision_id: u64) -> Self {
724        self.decision_id = decision_id;
725        self
726    }
727
728    /// Return a new context with only the `policy_id` changed.
729    #[must_use]
730    pub fn with_policy_id(mut self, policy_id: u64) -> Self {
731        self.policy_id = policy_id;
732        self
733    }
734
735    /// Returns a view of this context with a tighter effective budget.
736    ///
737    /// The effective budget is computed as `self.budget.meet(child)`, so the
738    /// child cannot loosen its parent's constraints.
739    /// Tracing IDs propagate unchanged.
740    #[must_use]
741    pub fn scope_with_budget(&self, child: Budget) -> Self {
742        Self {
743            inner: Arc::clone(&self.inner),
744            budget: self.budget.meet(child),
745            trace_id: self.trace_id,
746            decision_id: self.decision_id,
747            policy_id: self.policy_id,
748            _caps: PhantomData,
749        }
750    }
751
752    /// Returns a cleanup scope that uses [`Budget::MINIMAL`].
753    #[must_use]
754    pub fn cleanup_scope(&self) -> Self {
755        self.scope_with_budget(Budget::MINIMAL)
756    }
757
758    /// Re-type this context to a narrower capability set.
759    ///
760    /// This is zero-cost at runtime and shares cancellation state.
761    #[must_use]
762    pub fn restrict<NewCaps>(&self) -> Cx<NewCaps>
763    where
764        NewCaps: cap::SubsetOf<cap::All> + cap::SubsetOf<Caps>,
765    {
766        self.retype()
767    }
768
769    /// Internal re-typing helper without subset enforcement.
770    #[must_use]
771    fn retype<NewCaps>(&self) -> Cx<NewCaps>
772    where
773        NewCaps: cap::SubsetOf<cap::All>,
774    {
775        Cx {
776            inner: Arc::clone(&self.inner),
777            budget: self.budget,
778            trace_id: self.trace_id,
779            decision_id: self.decision_id,
780            policy_id: self.policy_id,
781            _caps: PhantomData,
782        }
783    }
784
785    // -----------------------------------------------------------------------
786    // Cancellation state machine (§4.12)
787    // -----------------------------------------------------------------------
788
789    #[must_use]
790    pub fn is_cancel_requested(&self) -> bool {
791        self.inner.cancel_requested.load(Ordering::Acquire)
792    }
793
794    /// Request cancellation with the default reason (`UserInterrupt`).
795    ///
796    /// Propagates to all child contexts per INV-CANCEL-PROPAGATES.
797    pub fn cancel(&self) {
798        self.cancel_with_reason(CancelReason::UserInterrupt);
799    }
800
801    /// Request cancellation with an explicit reason.
802    ///
803    /// INV-CANCEL-IDEMPOTENT: the strongest reason wins; weaker reasons are
804    /// ignored once a stronger one has been set.
805    ///
806    /// INV-CANCEL-PROPAGATES: cancellation propagates to all descendants.
807    pub fn cancel_with_reason(&self, reason: CancelReason) {
808        propagate_cancel(&self.inner, reason);
809    }
810
811    /// Current state in the cancellation lifecycle.
812    #[must_use]
813    pub fn cancel_state(&self) -> CancelState {
814        *self
815            .inner
816            .cancel_state
817            .lock()
818            .unwrap_or_else(std::sync::PoisonError::into_inner)
819    }
820
821    /// The strongest cancellation reason set so far, if any.
822    #[must_use]
823    pub fn cancel_reason(&self) -> Option<CancelReason> {
824        *self
825            .inner
826            .cancel_reason
827            .lock()
828            .unwrap_or_else(std::sync::PoisonError::into_inner)
829    }
830
831    /// Transition from `Created` to `Running`.
832    pub fn transition_to_running(&self) {
833        let mut state = self
834            .inner
835            .cancel_state
836            .lock()
837            .unwrap_or_else(std::sync::PoisonError::into_inner);
838        if *state == CancelState::Created {
839            *state = CancelState::Running;
840        }
841    }
842
843    /// Transition from `Cancelling` to `Finalizing`.
844    pub fn transition_to_finalizing(&self) {
845        let mut state = self
846            .inner
847            .cancel_state
848            .lock()
849            .unwrap_or_else(std::sync::PoisonError::into_inner);
850        if *state == CancelState::Cancelling {
851            *state = CancelState::Finalizing;
852        }
853    }
854
855    /// Transition to `Completed` (from `Finalizing` or `Running`).
856    pub fn transition_to_completed(&self) {
857        let mut state = self
858            .inner
859            .cancel_state
860            .lock()
861            .unwrap_or_else(std::sync::PoisonError::into_inner);
862        if matches!(*state, CancelState::Finalizing | CancelState::Running) {
863            *state = CancelState::Completed;
864        }
865    }
866
867    /// Attach an e-process oracle used by [`Self::checkpoint`].
868    pub fn set_eprocess_oracle(&self, oracle: Arc<EProcessOracle>) {
869        let _ = self.inner.eprocess_oracle.set(oracle);
870    }
871
872    /// Remove the currently attached e-process oracle.
873    pub fn clear_eprocess_oracle(&self) {
874        // OnceLock cannot be easily cleared. We just leave it as is.
875        // It's only called in unused methods anyway.
876    }
877
878    /// Attach a native asupersync context used by [`Self::checkpoint`].
879    #[cfg(feature = "native")]
880    pub fn set_native_cx(&self, native_cx: NativeCx) {
881        if let Some(reason) = self.cancel_reason() {
882            native_cx.set_cancel_reason(local_reason_to_native(reason));
883        } else if self.is_cancel_requested() {
884            native_cx.set_cancel_requested(true);
885        }
886        *self
887            .inner
888            .attached_native_cx
889            .lock()
890            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(native_cx);
891    }
892
893    /// Attach a native context shim in non-native builds.
894    #[cfg(not(feature = "native"))]
895    pub fn set_native_cx<T>(&self, _native_cx: T) {}
896
897    /// Return the attached native asupersync context, if one exists.
898    #[cfg(feature = "native")]
899    #[must_use]
900    pub fn attached_native_cx(&self) -> Option<NativeCx> {
901        self.inner
902            .attached_native_cx
903            .lock()
904            .unwrap_or_else(std::sync::PoisonError::into_inner)
905            .clone()
906    }
907
908    /// Return the attached native context shim, if one exists.
909    #[cfg(not(feature = "native"))]
910    #[must_use]
911    pub fn attached_native_cx(&self) -> Option<NativeCx> {
912        None
913    }
914
915    /// Remove the currently attached native asupersync context.
916    #[cfg(feature = "native")]
917    pub fn clear_native_cx(&self) {
918        *self
919            .inner
920            .attached_native_cx
921            .lock()
922            .unwrap_or_else(std::sync::PoisonError::into_inner) = None;
923    }
924
925    /// Remove the currently attached native context shim.
926    #[cfg(not(feature = "native"))]
927    pub fn clear_native_cx(&self) {}
928
929    #[must_use]
930    fn maybe_cancel_via_eprocess(&self) -> bool {
931        let Some(oracle) = self.inner.eprocess_oracle.get() else {
932            return false;
933        };
934        let decision = oracle.decision(self.budget.priority);
935        self.record_eprocess_decision(decision.clone());
936        tracing::debug!(
937            target: "fsqlite::cx",
938            event = "eprocess_checkpoint",
939            trace_id = self.trace_id,
940            decision_id = self.decision_id,
941            policy_id = self.policy_id,
942            priority = decision.priority,
943            evalue = decision.snapshot.evalue,
944            threshold = decision.snapshot.rejection_threshold,
945            observations = decision.snapshot.observations,
946            priority_threshold = decision.snapshot.priority_threshold,
947            should_shed = decision.should_shed,
948            signal = ?decision.snapshot.last_signal
949        );
950        if decision.should_shed {
951            tracing::info!(
952                target: "fsqlite::cx",
953                event = "eprocess_shedding_triggered",
954                trace_id = self.trace_id,
955                decision_id = self.decision_id,
956                policy_id = self.policy_id,
957                priority = decision.priority,
958                evalue = decision.snapshot.evalue,
959                threshold = decision.snapshot.rejection_threshold,
960                signal = ?decision.snapshot.last_signal
961            );
962            self.cancel_with_reason(CancelReason::Abort);
963            return true;
964        }
965        false
966    }
967
968    #[cfg(feature = "native")]
969    #[must_use]
970    fn maybe_cancel_via_native_cx(&self, masked: bool) -> bool {
971        let native = self.effective_native_cx();
972
973        if masked {
974            if native.is_cancel_requested() {
975                let reason = native
976                    .cancel_reason()
977                    .as_ref()
978                    .map_or(CancelReason::Timeout, native_reason_to_local);
979                self.cancel_with_reason(reason);
980                return true;
981            }
982            return false;
983        }
984
985        if native.checkpoint().is_err() {
986            let reason = native
987                .cancel_reason()
988                .as_ref()
989                .map_or(CancelReason::Timeout, native_reason_to_local);
990            self.cancel_with_reason(reason);
991            return true;
992        }
993        false
994    }
995
996    // -----------------------------------------------------------------------
997    // Checkpoints (§4.12.1)
998    // -----------------------------------------------------------------------
999
1000    /// Check for cancellation at a yield point.
1001    ///
1002    /// Returns `Ok(())` when not cancelled **or when inside a masked section**.
1003    /// When cancellation is observed, transitions state from `CancelRequested`
1004    /// to `Cancelling`.
1005    pub fn checkpoint(&self) -> Result<()> {
1006        let masked = self.inner.mask_depth.load(Ordering::Acquire) > 0;
1007
1008        // Fast path: not cancelled and no oracle-based shedding signal.
1009        #[cfg(feature = "native")]
1010        let native_cancel = self.maybe_cancel_via_native_cx(masked);
1011        #[cfg(not(feature = "native"))]
1012        let native_cancel = false;
1013
1014        if !self.inner.cancel_requested.load(Ordering::Acquire)
1015            && !self.maybe_cancel_via_eprocess()
1016            && !native_cancel
1017        {
1018            return Ok(());
1019        }
1020        // Masked: defer cancellation observation.
1021        if masked {
1022            return Ok(());
1023        }
1024        // Slow path: transition CancelRequested → Cancelling.
1025        {
1026            let mut state = self
1027                .inner
1028                .cancel_state
1029                .lock()
1030                .unwrap_or_else(std::sync::PoisonError::into_inner);
1031            if *state == CancelState::CancelRequested {
1032                *state = CancelState::Cancelling;
1033            }
1034        }
1035        Err(Error::cancelled())
1036    }
1037
1038    /// Check for cancellation and record a progress message.
1039    pub fn checkpoint_with(&self, msg: impl Into<String>) -> Result<()> {
1040        {
1041            let mut guard = self
1042                .inner
1043                .last_checkpoint_msg
1044                .lock()
1045                .unwrap_or_else(std::sync::PoisonError::into_inner);
1046            *guard = Some(msg.into());
1047        }
1048        self.checkpoint()
1049    }
1050
1051    #[must_use]
1052    pub fn last_checkpoint_message(&self) -> Option<String> {
1053        self.inner
1054            .last_checkpoint_msg
1055            .lock()
1056            .unwrap_or_else(std::sync::PoisonError::into_inner)
1057            .clone()
1058    }
1059
1060    /// Most recent e-process decision recorded during [`Self::checkpoint`].
1061    #[must_use]
1062    pub fn last_eprocess_decision(&self) -> Option<EProcessDecision> {
1063        self.inner
1064            .last_eprocess_decision
1065            .lock()
1066            .unwrap_or_else(std::sync::PoisonError::into_inner)
1067            .clone()
1068    }
1069
1070    /// Snapshot portion of the most recent e-process decision.
1071    #[must_use]
1072    pub fn last_eprocess_snapshot(&self) -> Option<EProcessSnapshot> {
1073        self.last_eprocess_decision()
1074            .map(|decision| decision.snapshot)
1075    }
1076
1077    fn record_eprocess_decision(&self, decision: EProcessDecision) {
1078        *self
1079            .inner
1080            .last_eprocess_decision
1081            .lock()
1082            .unwrap_or_else(std::sync::PoisonError::into_inner) = Some(decision);
1083    }
1084
1085    // -----------------------------------------------------------------------
1086    // Masked critical sections (§4.12.2)
1087    // -----------------------------------------------------------------------
1088
1089    /// Enter a masked section where `checkpoint()` returns `Ok(())` even if
1090    /// cancellation is requested.
1091    ///
1092    /// Returns a [`MaskGuard`] whose `Drop` restores the mask depth.
1093    ///
1094    /// # Panics
1095    ///
1096    /// Panics if nesting exceeds [`MAX_MASK_DEPTH`] (INV-MASK-BOUNDED).
1097    #[must_use]
1098    pub fn masked(&self) -> MaskGuard<'_> {
1099        let prev = self.inner.mask_depth.fetch_add(1, Ordering::AcqRel);
1100        if prev >= MAX_MASK_DEPTH {
1101            self.inner.mask_depth.fetch_sub(1, Ordering::Release);
1102            assert!(
1103                prev < MAX_MASK_DEPTH,
1104                "MAX_MASK_DEPTH ({MAX_MASK_DEPTH}) exceeded: mask nesting depth would be {}",
1105                prev + 1
1106            );
1107        }
1108        MaskGuard { inner: &self.inner }
1109    }
1110
1111    /// Current mask nesting depth.
1112    #[must_use]
1113    pub fn mask_depth(&self) -> u32 {
1114        self.inner.mask_depth.load(Ordering::Acquire)
1115    }
1116
1117    // -----------------------------------------------------------------------
1118    // Commit sections (§4.12.3)
1119    // -----------------------------------------------------------------------
1120
1121    /// Execute a logically atomic commit section.
1122    ///
1123    /// The section masks cancellation, enforces a poll quota bound, and
1124    /// guarantees the `finalizer` runs even on cancellation or panic.
1125    pub fn commit_section<R>(
1126        &self,
1127        poll_quota: u32,
1128        body: impl FnOnce(&CommitCtx) -> R,
1129        finalizer: impl FnOnce(),
1130    ) -> R {
1131        struct FinGuard<G: FnOnce()>(Option<G>);
1132        impl<G: FnOnce()> Drop for FinGuard<G> {
1133            fn drop(&mut self) {
1134                if let Some(f) = self.0.take() {
1135                    f();
1136                }
1137            }
1138        }
1139
1140        let _mask = self.masked();
1141        let _fin = FinGuard(Some(finalizer));
1142        let ctx = CommitCtx::new(poll_quota);
1143        body(&ctx)
1144    }
1145
1146    // -----------------------------------------------------------------------
1147    // Child context management (INV-CANCEL-PROPAGATES)
1148    // -----------------------------------------------------------------------
1149
1150    /// Create a child `Cx` that shares the parent's budget but has
1151    /// independent cancellation state. Cancelling the parent propagates
1152    /// to this child. Tracing IDs propagate to the child.
1153    #[must_use]
1154    pub fn create_child(&self) -> Self {
1155        let mut child = Self::with_budget(self.budget);
1156        child.trace_id = self.trace_id;
1157        child.decision_id = self.decision_id;
1158        child.policy_id = self.policy_id;
1159        if let Some(oracle) = self.inner.eprocess_oracle.get().cloned() {
1160            child.set_eprocess_oracle(oracle);
1161        }
1162        #[cfg(feature = "native")]
1163        if let Some(native_cx) = self.attached_native_cx() {
1164            child.set_native_cx(native_cx);
1165        }
1166        {
1167            let mut children = self
1168                .inner
1169                .children
1170                .lock()
1171                .unwrap_or_else(std::sync::PoisonError::into_inner);
1172            children.push(Arc::downgrade(&child.inner));
1173        }
1174        if let Some(reason) = self.cancel_reason() {
1175            child.cancel_with_reason(reason);
1176        } else if self.is_cancel_requested() {
1177            child.cancel();
1178        }
1179        child
1180    }
1181
1182    /// Set a deterministic unix time for tests.
1183    pub fn set_unix_millis_for_testing(&self, millis: u64)
1184    where
1185        Caps: cap::HasTime,
1186    {
1187        self.inner.unix_millis.store(millis, Ordering::Release);
1188    }
1189
1190    /// Return current time as a Julian day (via deterministic unix millis).
1191    #[must_use]
1192    pub fn current_time_julian_day(&self) -> f64
1193    where
1194        Caps: cap::HasTime,
1195    {
1196        let millis = self.inner.unix_millis.load(Ordering::Acquire);
1197        #[allow(clippy::cast_precision_loss)]
1198        let secs = (millis as f64) / 1000.0;
1199        // Unix epoch in Julian days: 2440587.5
1200        2_440_587.5 + (secs / 86_400.0)
1201    }
1202}
1203
1204// ---------------------------------------------------------------------------
1205// MaskGuard — RAII guard for masked cancellation sections (§4.12.2)
1206// ---------------------------------------------------------------------------
1207
1208/// RAII guard that keeps the `Cx` masked while alive.
1209///
1210/// Created by [`Cx::masked()`]. On drop, the mask depth is decremented.
1211#[derive(Debug)]
1212pub struct MaskGuard<'a> {
1213    inner: &'a CxInner,
1214}
1215
1216impl Drop for MaskGuard<'_> {
1217    fn drop(&mut self) {
1218        self.inner.mask_depth.fetch_sub(1, Ordering::Release);
1219    }
1220}
1221
1222// ---------------------------------------------------------------------------
1223// CommitCtx — bounded context for commit sections (§4.12.3)
1224// ---------------------------------------------------------------------------
1225
1226/// Context passed to commit-section bodies.
1227///
1228/// Tracks a poll-quota budget that operations can decrement via [`Self::tick`].
1229#[derive(Debug)]
1230pub struct CommitCtx {
1231    poll_remaining: AtomicU32,
1232}
1233
1234impl CommitCtx {
1235    fn new(poll_quota: u32) -> Self {
1236        Self {
1237            poll_remaining: AtomicU32::new(poll_quota),
1238        }
1239    }
1240
1241    /// Remaining poll budget.
1242    #[must_use]
1243    pub fn poll_remaining(&self) -> u32 {
1244        self.poll_remaining.load(Ordering::Acquire)
1245    }
1246
1247    /// Consume one unit of poll budget. Returns `true` if budget remains.
1248    pub fn tick(&self) -> bool {
1249        let prev = self.poll_remaining.load(Ordering::Acquire);
1250        if prev == 0 {
1251            return false;
1252        }
1253        self.poll_remaining.fetch_sub(1, Ordering::AcqRel);
1254        true
1255    }
1256}
1257
1258#[cfg(test)]
1259mod tests {
1260    use super::*;
1261    use crate::eprocess::{EProcessConfig, EProcessSignal};
1262    use std::path::{Path, PathBuf};
1263    use std::sync::{Arc, Weak};
1264
1265    #[test]
1266    fn test_cx_checkpoint_observes_cancellation() {
1267        let cx = Cx::new();
1268        assert!(cx.checkpoint().is_ok());
1269        cx.cancel();
1270        let err = cx.checkpoint().unwrap_err();
1271        assert_eq!(err.kind(), ErrorKind::Cancelled);
1272        assert_eq!(err.sqlite_error_code(), SQLITE_INTERRUPT);
1273    }
1274
1275    #[test]
1276    fn test_cx_capability_narrowing_compiles() {
1277        let cx = Cx::<FullCaps>::new();
1278        let _compute = cx.restrict::<ComputeCaps>();
1279        let _storage = cx.restrict::<StorageCaps>();
1280    }
1281
1282    #[test]
1283    fn test_cx_budget_meet_tightens() {
1284        let parent = Budget::INFINITE.with_deadline(Duration::from_millis(100));
1285        let child = Budget::INFINITE.with_deadline(Duration::from_millis(200));
1286        let effective = parent.meet(child);
1287        assert_eq!(effective.deadline, Some(Duration::from_millis(100)));
1288    }
1289
1290    #[test]
1291    fn test_cx_budget_priority_join() {
1292        let parent = Budget::INFINITE.with_priority(2);
1293        let child = Budget::INFINITE.with_priority(5);
1294        let effective = parent.meet(child);
1295        assert_eq!(effective.priority, 5);
1296    }
1297
1298    #[test]
1299    fn test_cx_scope_with_budget_cannot_loosen() {
1300        let cx =
1301            Cx::<FullCaps>::with_budget(Budget::INFINITE.with_deadline(Duration::from_millis(50)));
1302        let child = Budget::INFINITE.with_deadline(Duration::from_millis(100));
1303        let scoped = cx.scope_with_budget(child);
1304        assert_eq!(scoped.budget().deadline, Some(Duration::from_millis(50)));
1305    }
1306
1307    #[test]
1308    fn test_cx_checkpoint_with_message_records_message() {
1309        let cx = Cx::new();
1310        assert!(cx.checkpoint_with("vdbe pc=5").is_ok());
1311        assert_eq!(cx.last_checkpoint_message().as_deref(), Some("vdbe pc=5"));
1312    }
1313
1314    #[test]
1315    fn test_cx_cleanup_uses_minimal_budget() {
1316        let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_poll_quota(10_000));
1317        let cleanup = cx.cleanup_scope();
1318        assert_eq!(cleanup.budget(), Budget::MINIMAL);
1319    }
1320
1321    #[test]
1322    fn test_cx_restrict_storage_to_compute() {
1323        let cx = Cx::<FullCaps>::new();
1324        let storage = cx.restrict::<StorageCaps>();
1325        let _compute = storage.restrict::<ComputeCaps>();
1326    }
1327
1328    #[test]
1329    fn test_cx_restrict_is_zero_cost() {
1330        // CapSet is a ZST; Cx carries only Arc + Budget + PhantomData.
1331        // Restrict changes only the phantom marker — same size, same pointer.
1332        assert_eq!(
1333            std::mem::size_of::<Cx<FullCaps>>(),
1334            std::mem::size_of::<Cx<ComputeCaps>>()
1335        );
1336    }
1337
1338    #[test]
1339    fn test_budget_mixed_lattice() {
1340        let a = Budget {
1341            deadline: Some(Duration::from_millis(100)),
1342            poll_quota: 500,
1343            cost_quota: Some(1000),
1344            priority: 2,
1345        };
1346        let b = Budget {
1347            deadline: Some(Duration::from_millis(200)),
1348            poll_quota: 300,
1349            cost_quota: Some(2000),
1350            priority: 5,
1351        };
1352        let m = a.meet(b);
1353        // Resources tighten by min.
1354        assert_eq!(m.deadline, Some(Duration::from_millis(100)));
1355        assert_eq!(m.poll_quota, 300);
1356        assert_eq!(m.cost_quota, Some(1000));
1357        // Priority propagates by max (join).
1358        assert_eq!(m.priority, 5);
1359    }
1360
1361    #[test]
1362    fn test_budget_meet_commutative() {
1363        let a = Budget {
1364            deadline: Some(Duration::from_millis(50)),
1365            poll_quota: 400,
1366            cost_quota: Some(800),
1367            priority: 3,
1368        };
1369        let b = Budget {
1370            deadline: Some(Duration::from_millis(150)),
1371            poll_quota: 200,
1372            cost_quota: None,
1373            priority: 7,
1374        };
1375        assert_eq!(a.meet(b), b.meet(a));
1376    }
1377
1378    #[test]
1379    fn test_budget_meet_associative() {
1380        let a = Budget::INFINITE
1381            .with_deadline(Duration::from_millis(50))
1382            .with_poll_quota(100)
1383            .with_priority(1);
1384        let b = Budget::INFINITE
1385            .with_deadline(Duration::from_millis(150))
1386            .with_poll_quota(200)
1387            .with_priority(5);
1388        let c = Budget::INFINITE
1389            .with_deadline(Duration::from_millis(75))
1390            .with_poll_quota(50)
1391            .with_priority(3);
1392        assert_eq!(a.meet(b).meet(c), a.meet(b.meet(c)));
1393    }
1394
1395    #[test]
1396    fn test_budget_minimal_is_stricter_than_normal() {
1397        let normal = Budget::INFINITE.with_poll_quota(10_000);
1398        let effective = normal.meet(Budget::MINIMAL);
1399        assert_eq!(effective.poll_quota, Budget::MINIMAL.poll_quota);
1400    }
1401
1402    #[test]
1403    fn test_cx_cancel_shared_across_clones() {
1404        let cx1 = Cx::<FullCaps>::new();
1405        let cx2 = cx1.clone();
1406        assert!(!cx2.is_cancel_requested());
1407        cx1.cancel();
1408        assert!(cx2.is_cancel_requested());
1409        assert!(cx2.checkpoint().is_err());
1410    }
1411
1412    #[test]
1413    fn test_cx_cancel_shared_across_restrict() {
1414        let cx = Cx::<FullCaps>::new();
1415        let compute = cx.restrict::<ComputeCaps>();
1416        cx.cancel();
1417        assert!(compute.checkpoint().is_err());
1418    }
1419
1420    #[test]
1421    fn test_cx_current_time_julian_day() {
1422        let cx = Cx::<FullCaps>::new();
1423        // Unix epoch = Julian day 2440587.5
1424        cx.set_unix_millis_for_testing(0);
1425        let jd = cx.current_time_julian_day();
1426        assert!((jd - 2_440_587.5).abs() < 1e-10);
1427
1428        // 1 day = 86_400_000 ms
1429        cx.set_unix_millis_for_testing(86_400_000);
1430        let jd = cx.current_time_julian_day();
1431        assert!((jd - 2_440_588.5).abs() < 1e-10);
1432    }
1433
1434    #[test]
1435    fn test_capset_is_zero_sized() {
1436        assert_eq!(std::mem::size_of::<cap::All>(), 0);
1437        assert_eq!(std::mem::size_of::<cap::None>(), 0);
1438        assert_eq!(
1439            std::mem::size_of::<cap::CapSet<true, false, true, false, true>>(),
1440            0
1441        );
1442    }
1443
1444    #[test]
1445    fn test_cx_checkpoint_not_cancelled() {
1446        let cx = Cx::new();
1447        assert!(cx.checkpoint().is_ok());
1448        assert!(cx.checkpoint_with("still going").is_ok());
1449    }
1450
1451    #[test]
1452    fn test_cx_checkpoint_maps_to_sqlite_interrupt() {
1453        let cx = Cx::new();
1454        cx.cancel();
1455        let err = cx.checkpoint().unwrap_err();
1456        assert_eq!(err.sqlite_error_code(), SQLITE_INTERRUPT);
1457    }
1458
1459    #[test]
1460    fn test_cx_checkpoint_eprocess_sheds_low_priority_context() {
1461        let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
1462        let oracle = Arc::new(EProcessOracle::new(
1463            EProcessConfig {
1464                p0: 0.1,
1465                lambda: 5.0,
1466                alpha: 0.05,
1467                max_evalue: 1e12,
1468            },
1469            1,
1470        ));
1471        let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1472        oracle.observe_signal(signal);
1473        oracle.observe_signal(signal);
1474        cx.set_eprocess_oracle(oracle);
1475        let err = cx.checkpoint().unwrap_err();
1476        assert_eq!(err.kind(), ErrorKind::Cancelled);
1477        assert_eq!(cx.cancel_reason(), Some(CancelReason::Abort));
1478        let decision = cx
1479            .last_eprocess_decision()
1480            .expect("checkpoint should record an e-process decision");
1481        assert!(decision.should_shed);
1482        assert_eq!(decision.snapshot.last_signal, Some(signal));
1483    }
1484
1485    #[test]
1486    fn test_cx_checkpoint_eprocess_respects_priority_threshold() {
1487        let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(1));
1488        let oracle = Arc::new(EProcessOracle::new(
1489            EProcessConfig {
1490                p0: 0.1,
1491                lambda: 5.0,
1492                alpha: 0.05,
1493                max_evalue: 1e12,
1494            },
1495            1,
1496        ));
1497        let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1498        oracle.observe_signal(signal);
1499        oracle.observe_signal(signal);
1500        cx.set_eprocess_oracle(oracle);
1501        assert!(cx.checkpoint().is_ok());
1502        assert!(!cx.is_cancel_requested());
1503        let decision = cx
1504            .last_eprocess_decision()
1505            .expect("checkpoint should still record non-shedding decisions");
1506        assert!(!decision.should_shed);
1507        assert_eq!(decision.priority, 1);
1508        assert_eq!(decision.snapshot.last_signal, Some(signal));
1509    }
1510
1511    #[test]
1512    fn test_cx_checkpoint_eprocess_preserves_masking_semantics() {
1513        let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
1514        let oracle = Arc::new(EProcessOracle::new(
1515            EProcessConfig {
1516                p0: 0.1,
1517                lambda: 5.0,
1518                alpha: 0.05,
1519                max_evalue: 1e12,
1520            },
1521            1,
1522        ));
1523        let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1524        oracle.observe_signal(signal);
1525        oracle.observe_signal(signal);
1526        cx.set_eprocess_oracle(oracle);
1527        {
1528            let _mask = cx.masked();
1529            assert!(cx.checkpoint().is_ok());
1530            assert!(cx.is_cancel_requested());
1531            assert_eq!(cx.cancel_state(), CancelState::CancelRequested);
1532            assert_eq!(
1533                cx.last_eprocess_snapshot()
1534                    .expect("checkpoint should record the masked decision")
1535                    .last_signal,
1536                Some(signal)
1537            );
1538        }
1539        let err = cx.checkpoint().unwrap_err();
1540        assert_eq!(err.kind(), ErrorKind::Cancelled);
1541    }
1542
1543    #[test]
1544    fn test_create_child_inherits_eprocess_oracle() {
1545        let parent = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(3));
1546        let oracle = Arc::new(EProcessOracle::new(
1547            EProcessConfig {
1548                p0: 0.1,
1549                lambda: 5.0,
1550                alpha: 0.05,
1551                max_evalue: 1e12,
1552            },
1553            1,
1554        ));
1555        let signal = EProcessSignal::new(1.0, 1.0, 1.0);
1556        oracle.observe_signal(signal);
1557        oracle.observe_signal(signal);
1558        parent.set_eprocess_oracle(oracle);
1559
1560        let child = parent.create_child();
1561        let err = child.checkpoint().unwrap_err();
1562        assert_eq!(err.kind(), ErrorKind::Cancelled);
1563        assert_eq!(child.cancel_reason(), Some(CancelReason::Abort));
1564        assert_eq!(
1565            child
1566                .last_eprocess_snapshot()
1567                .expect("child checkpoint should record inherited oracle decision")
1568                .last_signal,
1569            Some(signal)
1570        );
1571    }
1572
1573    #[test]
1574    fn test_create_child_inherits_preexisting_parent_cancellation() {
1575        let parent = Cx::<FullCaps>::new();
1576        parent.cancel_with_reason(CancelReason::RegionClose);
1577
1578        let child = parent.create_child();
1579        assert_eq!(child.cancel_reason(), Some(CancelReason::RegionClose));
1580        assert_eq!(child.cancel_state(), CancelState::CancelRequested);
1581
1582        let err = child.checkpoint().unwrap_err();
1583        assert_eq!(err.kind(), ErrorKind::Cancelled);
1584    }
1585
1586    #[cfg(feature = "native")]
1587    #[test]
1588    fn test_cx_checkpoint_native_cx_cancellation_maps_reason() {
1589        let cx = Cx::<FullCaps>::new();
1590        let native = NativeCx::for_testing();
1591        cx.set_native_cx(native.clone());
1592        native.set_cancel_reason(NativeCancelReason::timeout());
1593
1594        let err = cx.checkpoint().unwrap_err();
1595        assert_eq!(err.kind(), ErrorKind::Cancelled);
1596        assert_eq!(cx.cancel_reason(), Some(CancelReason::Timeout));
1597    }
1598
1599    #[cfg(feature = "native")]
1600    #[test]
1601    fn test_cx_cancel_reason_propagates_to_native_cx() {
1602        let cx = Cx::<FullCaps>::new();
1603        let native = NativeCx::for_testing();
1604        cx.set_native_cx(native.clone());
1605
1606        cx.cancel_with_reason(CancelReason::RegionClose);
1607        let reason = native
1608            .cancel_reason()
1609            .expect("native cancel reason must be set");
1610        assert_eq!(reason.kind, NativeCancelKind::ParentCancelled);
1611    }
1612
1613    #[cfg(feature = "native")]
1614    #[test]
1615    fn test_cx_checkpoint_native_cx_respects_local_masking() {
1616        let cx = Cx::<FullCaps>::new();
1617        let native = NativeCx::for_testing();
1618        cx.set_native_cx(native.clone());
1619        native.set_cancel_reason(NativeCancelReason::user("cancel"));
1620
1621        {
1622            let _mask = cx.masked();
1623            assert!(cx.checkpoint().is_ok());
1624            assert!(cx.is_cancel_requested());
1625            assert_eq!(cx.cancel_state(), CancelState::CancelRequested);
1626        }
1627
1628        let err = cx.checkpoint().unwrap_err();
1629        assert_eq!(err.kind(), ErrorKind::Cancelled);
1630    }
1631
1632    #[cfg(feature = "native")]
1633    #[test]
1634    fn test_cx_effective_native_cx_uses_fallback_without_marking_explicit_attachment() {
1635        let cx = Cx::<FullCaps>::with_budget(Budget::INFINITE.with_priority(7));
1636
1637        assert!(cx.attached_native_cx().is_none());
1638        let native = cx.effective_native_cx();
1639        assert!(cx.attached_native_cx().is_none());
1640        assert!(native.checkpoint().is_ok());
1641    }
1642
1643    #[cfg(feature = "native")]
1644    #[test]
1645    fn test_cx_set_native_cx_replaces_fallback_context() {
1646        let cx = Cx::<FullCaps>::new();
1647        let _ = cx.effective_native_cx();
1648
1649        let replacement = NativeCx::for_testing();
1650        cx.set_native_cx(replacement.clone());
1651        replacement.set_cancel_reason(NativeCancelReason::timeout());
1652
1653        let err = cx.checkpoint().unwrap_err();
1654        assert_eq!(err.kind(), ErrorKind::Cancelled);
1655        assert_eq!(cx.cancel_reason(), Some(CancelReason::Timeout));
1656    }
1657
1658    #[cfg(feature = "native")]
1659    #[test]
1660    fn test_create_child_copies_preexisting_cancellation_into_fallback_native_cx() {
1661        let parent = Cx::<FullCaps>::new();
1662        parent.cancel_with_reason(CancelReason::RegionClose);
1663
1664        let child = parent.create_child();
1665        let reason = child
1666            .effective_native_cx()
1667            .cancel_reason()
1668            .expect("fallback native cx should mirror inherited cancellation");
1669        assert_eq!(reason.kind, NativeCancelKind::ParentCancelled);
1670    }
1671
1672    #[cfg(feature = "native")]
1673    #[test]
1674    fn test_create_child_inherits_explicit_native_cx_attachment() {
1675        let parent = Cx::<FullCaps>::new();
1676        let native = NativeCx::for_testing();
1677        parent.set_native_cx(native.clone());
1678
1679        let child = parent.create_child();
1680        assert!(child.attached_native_cx().is_some());
1681
1682        native.set_cancel_reason(NativeCancelReason::timeout());
1683        let err = child
1684            .checkpoint()
1685            .expect_err("child should observe inherited native cancel");
1686        assert_eq!(err.kind(), ErrorKind::Cancelled);
1687        assert_eq!(child.cancel_reason(), Some(CancelReason::Timeout));
1688    }
1689
1690    #[test]
1691    fn test_budget_infinite_is_identity_for_meet() {
1692        let budget = Budget {
1693            deadline: Some(Duration::from_millis(42)),
1694            poll_quota: 500,
1695            cost_quota: Some(1000),
1696            priority: 7,
1697        };
1698        assert_eq!(budget.meet(Budget::INFINITE), budget);
1699        assert_eq!(Budget::INFINITE.meet(budget), budget);
1700    }
1701
1702    #[test]
1703    fn test_budget_none_constraints_propagate() {
1704        let a = Budget {
1705            deadline: None,
1706            poll_quota: u32::MAX,
1707            cost_quota: None,
1708            priority: 0,
1709        };
1710        let b = Budget {
1711            deadline: Some(Duration::from_millis(50)),
1712            poll_quota: 100,
1713            cost_quota: Some(500),
1714            priority: 3,
1715        };
1716        let m = a.meet(b);
1717        assert_eq!(m.deadline, Some(Duration::from_millis(50)));
1718        assert_eq!(m.poll_quota, 100);
1719        assert_eq!(m.cost_quota, Some(500));
1720        assert_eq!(m.priority, 3);
1721    }
1722
1723    #[test]
1724    fn test_cx_scope_budget_chains() {
1725        let cx = Cx::<FullCaps>::with_budget(
1726            Budget::INFINITE
1727                .with_deadline(Duration::from_millis(100))
1728                .with_poll_quota(1000),
1729        );
1730        // First scope tightens deadline.
1731        let s1 = cx.scope_with_budget(Budget::INFINITE.with_deadline(Duration::from_millis(50)));
1732        assert_eq!(s1.budget().deadline, Some(Duration::from_millis(50)));
1733        assert_eq!(s1.budget().poll_quota, 1000);
1734
1735        // Second scope tightens poll_quota further.
1736        let s2 = s1.scope_with_budget(Budget::INFINITE.with_poll_quota(200));
1737        assert_eq!(s2.budget().deadline, Some(Duration::from_millis(50)));
1738        assert_eq!(s2.budget().poll_quota, 200);
1739    }
1740
1741    fn collect_rs_files(dir: &Path, out: &mut Vec<PathBuf>) -> std::io::Result<()> {
1742        for entry in std::fs::read_dir(dir)? {
1743            let entry = entry?;
1744            let path = entry.path();
1745            if path.is_dir() {
1746                collect_rs_files(&path, out)?;
1747            } else if path.extension().is_some_and(|ext| ext == "rs") {
1748                out.push(path);
1749            }
1750        }
1751        Ok(())
1752    }
1753
1754    fn scan_file_outside_cfg_test_items(src: &str, patterns: &[&str]) -> Vec<(usize, String)> {
1755        let mut hits = Vec::new();
1756
1757        let mut brace_depth: i32 = 0;
1758        let mut pending_cfg_test = false;
1759        let mut pending_attr_paren_depth: i32 = 0;
1760        let mut skip_until_depth: Option<i32> = None;
1761
1762        for (idx, line) in src.lines().enumerate() {
1763            let trimmed = line.trim_start();
1764            let paren_delta = i32::try_from(line.matches('(').count()).unwrap_or(i32::MAX)
1765                - i32::try_from(line.matches(')').count()).unwrap_or(i32::MAX);
1766
1767            if skip_until_depth.is_none() {
1768                // Handle single-line `#[cfg(test)]` items that open a block immediately.
1769                if trimmed.starts_with("#[cfg(test)]") && trimmed.contains('{') {
1770                    pending_cfg_test = false;
1771                    pending_attr_paren_depth = 0;
1772                    skip_until_depth = Some(brace_depth);
1773                } else if trimmed.contains("fn test_") && trimmed.contains('{') {
1774                    skip_until_depth = Some(brace_depth);
1775                } else if trimmed.starts_with("#[cfg(test)]") {
1776                    pending_cfg_test = true;
1777                    pending_attr_paren_depth = 0;
1778                } else if pending_cfg_test {
1779                    // Allow additional attributes/blank lines before the gated item.
1780                    if trimmed.starts_with("#[") || pending_attr_paren_depth > 0 {
1781                        pending_attr_paren_depth =
1782                            pending_attr_paren_depth.saturating_add(paren_delta);
1783                    } else if trimmed.is_empty() || trimmed.starts_with("//") {
1784                        // keep pending
1785                    } else if trimmed.contains('{') {
1786                        pending_cfg_test = false;
1787                        pending_attr_paren_depth = 0;
1788                        skip_until_depth = Some(brace_depth);
1789                    } else {
1790                        pending_cfg_test = false;
1791                        pending_attr_paren_depth = 0;
1792                    }
1793                } else {
1794                    for &pat in patterns {
1795                        if line.contains(pat) {
1796                            hits.push((idx + 1, pat.to_string()));
1797                        }
1798                    }
1799                }
1800            }
1801
1802            // Update brace depth (coarse; sufficient for `#[cfg(test)] mod ... {}` blocks).
1803            let opens = i32::try_from(line.matches('{').count()).unwrap_or(i32::MAX);
1804            let closes = i32::try_from(line.matches('}').count()).unwrap_or(i32::MAX);
1805            brace_depth = brace_depth.saturating_add(opens).saturating_sub(closes);
1806
1807            if let Some(until) = skip_until_depth {
1808                if brace_depth <= until {
1809                    skip_until_depth = None;
1810                }
1811            }
1812        }
1813
1814        hits
1815    }
1816
1817    #[test]
1818    fn test_scan_file_outside_cfg_test_items_skips_cfg_test_functions_and_modules() {
1819        let src = r"
1820fn production_path() {
1821    let _ = Cx::new();
1822}
1823
1824#[cfg(test)]
1825fn test_only_helper() {
1826    let _ = Cx::new();
1827}
1828
1829#[cfg(test)]
1830mod tests {
1831    fn nested_test_helper() {
1832        let _ = Cx::default();
1833    }
1834}
1835";
1836
1837        let hits = scan_file_outside_cfg_test_items(src, &["Cx::new(", "Cx::default("]);
1838        assert_eq!(hits, vec![(3, "Cx::new(".to_string())]);
1839    }
1840
1841    #[test]
1842    fn test_no_direct_cx_constructors_in_runtime_production_code() {
1843        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1844        let repo_root = manifest_dir
1845            .parent()
1846            .and_then(Path::parent)
1847            .expect("fsqlite-types manifest dir must be crates/<name>");
1848        let crates_dir = repo_root.join("crates");
1849        let runtime_crates = [
1850            "fsqlite-core",
1851            "fsqlite-vdbe",
1852            "fsqlite-btree",
1853            "fsqlite-pager",
1854            "fsqlite-wal",
1855            "fsqlite-mvcc",
1856        ];
1857        let forbidden = ["Cx::new(", "Cx::default("];
1858
1859        let mut violations: Vec<String> = Vec::new();
1860        let mut crate_dirs: Vec<PathBuf> = Vec::new();
1861        for entry in std::fs::read_dir(&crates_dir).expect("read crates/ dir") {
1862            let entry = entry.expect("read crates/ entry");
1863            let path = entry.path();
1864            if path.is_dir() {
1865                crate_dirs.push(path);
1866            }
1867        }
1868
1869        for crate_dir in crate_dirs {
1870            let crate_name = crate_dir
1871                .file_name()
1872                .and_then(|s| s.to_str())
1873                .unwrap_or("<unknown>");
1874            if !runtime_crates.contains(&crate_name) {
1875                continue;
1876            }
1877
1878            let src_dir = crate_dir.join("src");
1879            if !src_dir.is_dir() {
1880                continue;
1881            }
1882
1883            let mut files = Vec::new();
1884            collect_rs_files(&src_dir, &mut files).expect("collect rs files");
1885
1886            for file in files {
1887                if file
1888                    .file_name()
1889                    .and_then(|name| name.to_str())
1890                    .is_some_and(|name| name.contains("test"))
1891                {
1892                    continue;
1893                }
1894
1895                let src = std::fs::read_to_string(&file).expect("read file");
1896                let rel_path = file.strip_prefix(repo_root).unwrap_or(&file);
1897
1898                for (line, pat) in scan_file_outside_cfg_test_items(&src, &forbidden) {
1899                    let line_text = src.lines().nth(line - 1).unwrap_or("").trim();
1900                    let allowed_detached_root_constructor = rel_path
1901                        == Path::new("crates/fsqlite-core/src/connection.rs")
1902                        && pat == "Cx::new("
1903                        && line_text.contains("Cx::new().with_trace_context(");
1904
1905                    if allowed_detached_root_constructor {
1906                        continue;
1907                    }
1908
1909                    violations.push(format!(
1910                        "{crate_name}:{path}:{line} uses forbidden `{pat}` outside cfg(test) code: {line_text}",
1911                        path = rel_path.display()
1912                    ));
1913                }
1914            }
1915        }
1916
1917        assert!(
1918            violations.is_empty(),
1919            "direct `Cx::new()` / `Cx::default()` production-path violations:\n{}",
1920            violations.join("\n")
1921        );
1922    }
1923
1924    #[test]
1925    fn test_ambient_authority_audit_gate() {
1926        // Scan `crates/*/src/**/*.rs` for ambient-authority usage, excluding
1927        // `#[cfg(test)]`-gated items.
1928        let manifest_dir = PathBuf::from(env!("CARGO_MANIFEST_DIR"));
1929        let repo_root = manifest_dir
1930            .parent()
1931            .and_then(Path::parent)
1932            .expect("fsqlite-types manifest dir must be crates/<name>");
1933        let crates_dir = repo_root.join("crates");
1934
1935        // Always forbidden everywhere (outside cfg(test) modules).
1936        let always_forbidden = [
1937            "SystemTime::now(",
1938            "Instant::now(",
1939            "thread_rng(",
1940            "getrandom",
1941            "std::net::",
1942            "std::thread::spawn",
1943            "tokio::spawn",
1944        ];
1945
1946        // Forbidden outside VFS boundary (outside cfg(test) modules).
1947        let non_vfs_forbidden = ["std::fs::"];
1948
1949        // Crates exempt from ambient-authority scanning:
1950        // - test infrastructure (harness, cli, e2e)
1951        // - observability (pure diagnostics, needs Instant::now for timing)
1952        // - core (needs std::fs for WAL bootstrap/MVCC key, Instant::now for tracing)
1953        // - vdbe (needs std::fs for sorter temp files, Instant::now for tracing)
1954        // - mvcc (Instant::now in flat_combining/rcu for latency metrics)
1955        // - parser (Instant::now for lexer span timing)
1956        // - planner (Instant::now for access-path selection, SystemTime for contracts)
1957        // - wal (Instant::now for checkpoint timing)
1958        // - vfs (Instant::now for VFS operation metrics, std::fs allowed by design)
1959        let exempt_crates = [
1960            "fsqlite-harness",
1961            "fsqlite-cli",
1962            "fsqlite-e2e",
1963            "fsqlite-observability",
1964            "fsqlite-core",
1965            "fsqlite-vdbe",
1966            "fsqlite-mvcc",
1967            "fsqlite-parser",
1968            "fsqlite-planner",
1969            "fsqlite-wal",
1970            "fsqlite-vfs",
1971        ];
1972
1973        let mut violations: Vec<String> = Vec::new();
1974        let mut crate_dirs: Vec<PathBuf> = Vec::new();
1975        for entry in std::fs::read_dir(&crates_dir).expect("read crates/ dir") {
1976            let entry = entry.expect("read crates/ entry");
1977            let path = entry.path();
1978            if path.is_dir() {
1979                crate_dirs.push(path);
1980            }
1981        }
1982
1983        for crate_dir in crate_dirs {
1984            let crate_name = crate_dir
1985                .file_name()
1986                .and_then(|s| s.to_str())
1987                .unwrap_or("<unknown>");
1988            if exempt_crates.contains(&crate_name) {
1989                continue;
1990            }
1991            let src_dir = crate_dir.join("src");
1992            if !src_dir.is_dir() {
1993                continue;
1994            }
1995
1996            let mut files = Vec::new();
1997            collect_rs_files(&src_dir, &mut files).expect("collect rs files");
1998
1999            for file in files {
2000                let src = std::fs::read_to_string(&file).expect("read file");
2001                for (line, pat) in scan_file_outside_cfg_test_items(&src, &always_forbidden) {
2002                    violations.push(format!(
2003                        "{crate_name}:{path}:{line} uses forbidden `{pat}`",
2004                        path = file.display()
2005                    ));
2006                }
2007
2008                if crate_name != "fsqlite-vfs" {
2009                    for (line, pat) in scan_file_outside_cfg_test_items(&src, &non_vfs_forbidden) {
2010                        violations.push(format!(
2011                            "{crate_name}:{path}:{line} uses forbidden `{pat}` (non-vfs crate)",
2012                            path = file.display()
2013                        ));
2014                    }
2015                }
2016            }
2017        }
2018
2019        assert!(
2020            violations.is_empty(),
2021            "ambient authority violations (outside cfg(test) modules):\n{}",
2022            violations.join("\n")
2023        );
2024    }
2025
2026    // ===================================================================
2027    // §4.12 Cancellation Protocol Tests (bd-samf)
2028    // ===================================================================
2029
2030    const BEAD_ID: &str = "bd-samf";
2031
2032    #[test]
2033    fn test_cancel_state_machine_all_transitions() {
2034        // Test 1: State machine transitions through all 6 states.
2035        let cx = Cx::<FullCaps>::new();
2036        assert_eq!(
2037            cx.cancel_state(),
2038            CancelState::Created,
2039            "bead_id={BEAD_ID} initial_state"
2040        );
2041
2042        cx.transition_to_running();
2043        assert_eq!(
2044            cx.cancel_state(),
2045            CancelState::Running,
2046            "bead_id={BEAD_ID} after_start"
2047        );
2048
2049        cx.cancel_with_reason(CancelReason::UserInterrupt);
2050        assert_eq!(
2051            cx.cancel_state(),
2052            CancelState::CancelRequested,
2053            "bead_id={BEAD_ID} after_cancel"
2054        );
2055
2056        // Observing cancellation via checkpoint transitions to Cancelling.
2057        let err = cx.checkpoint();
2058        assert!(err.is_err(), "bead_id={BEAD_ID} checkpoint_returns_err");
2059        assert_eq!(
2060            cx.cancel_state(),
2061            CancelState::Cancelling,
2062            "bead_id={BEAD_ID} after_checkpoint_observation"
2063        );
2064
2065        cx.transition_to_finalizing();
2066        assert_eq!(
2067            cx.cancel_state(),
2068            CancelState::Finalizing,
2069            "bead_id={BEAD_ID} after_finalize_start"
2070        );
2071
2072        cx.transition_to_completed();
2073        assert_eq!(
2074            cx.cancel_state(),
2075            CancelState::Completed,
2076            "bead_id={BEAD_ID} after_complete"
2077        );
2078    }
2079
2080    #[test]
2081    fn test_cancel_propagates_to_children() {
2082        // Test 2: Cancel propagates to 3 children within one call.
2083        let parent = Cx::<FullCaps>::new();
2084        parent.transition_to_running();
2085
2086        let child1 = parent.create_child();
2087        child1.transition_to_running();
2088        let child2 = parent.create_child();
2089        child2.transition_to_running();
2090        let child3 = parent.create_child();
2091        child3.transition_to_running();
2092
2093        assert!(!child1.is_cancel_requested());
2094        assert!(!child2.is_cancel_requested());
2095        assert!(!child3.is_cancel_requested());
2096
2097        parent.cancel_with_reason(CancelReason::RegionClose);
2098
2099        // All children must see cancellation (INV-CANCEL-PROPAGATES).
2100        assert!(
2101            child1.is_cancel_requested(),
2102            "bead_id={BEAD_ID} child1_cancelled"
2103        );
2104        assert!(
2105            child2.is_cancel_requested(),
2106            "bead_id={BEAD_ID} child2_cancelled"
2107        );
2108        assert!(
2109            child3.is_cancel_requested(),
2110            "bead_id={BEAD_ID} child3_cancelled"
2111        );
2112
2113        // Children must be in CancelRequested state.
2114        assert_eq!(child1.cancel_state(), CancelState::CancelRequested);
2115        assert_eq!(child2.cancel_state(), CancelState::CancelRequested);
2116        assert_eq!(child3.cancel_state(), CancelState::CancelRequested);
2117
2118        // Reason must propagate.
2119        assert_eq!(child1.cancel_reason(), Some(CancelReason::RegionClose));
2120    }
2121
2122    #[test]
2123    fn test_dropped_children_are_pruned_from_parent_links() {
2124        let parent = Cx::<FullCaps>::new();
2125
2126        let live_child = parent.create_child();
2127        let dropped_child = parent.create_child();
2128        drop(dropped_child);
2129
2130        // Trigger propagation pass, which prunes dead weak child links.
2131        parent.cancel_with_reason(CancelReason::RegionClose);
2132
2133        let live_count = {
2134            let children = parent
2135                .inner
2136                .children
2137                .lock()
2138                .unwrap_or_else(std::sync::PoisonError::into_inner);
2139            children.iter().filter_map(Weak::upgrade).count()
2140        };
2141        assert_eq!(live_count, 1, "only the live child should remain linked");
2142        assert!(live_child.is_cancel_requested());
2143    }
2144
2145    #[test]
2146    fn test_cancel_idempotent_strongest_wins() {
2147        // Test 3: Strongest cancel reason wins, cannot get weaker.
2148        let cx = Cx::<FullCaps>::new();
2149        cx.transition_to_running();
2150
2151        cx.cancel_with_reason(CancelReason::Timeout);
2152        assert_eq!(
2153            cx.cancel_reason(),
2154            Some(CancelReason::Timeout),
2155            "bead_id={BEAD_ID} first_reason"
2156        );
2157
2158        // Stronger reason upgrades.
2159        cx.cancel_with_reason(CancelReason::Abort);
2160        assert_eq!(
2161            cx.cancel_reason(),
2162            Some(CancelReason::Abort),
2163            "bead_id={BEAD_ID} upgraded_reason"
2164        );
2165
2166        // Weaker reason does NOT downgrade.
2167        cx.cancel_with_reason(CancelReason::UserInterrupt);
2168        assert_eq!(
2169            cx.cancel_reason(),
2170            Some(CancelReason::Abort),
2171            "bead_id={BEAD_ID} reason_stays_strongest"
2172        );
2173    }
2174
2175    #[test]
2176    fn test_losers_drain_on_race() {
2177        // Test 4: Simulate race combinator — loser with obligation resolves
2178        // before race returns.
2179        use std::sync::atomic::AtomicBool;
2180
2181        let loser_cx = Cx::<FullCaps>::new();
2182        loser_cx.transition_to_running();
2183
2184        // Simulate an obligation on the loser.
2185        let obligation_resolved = Arc::new(AtomicBool::new(false));
2186        let ob_clone = Arc::clone(&obligation_resolved);
2187
2188        // Winner finishes → cancel loser.
2189        loser_cx.cancel_with_reason(CancelReason::RegionClose);
2190
2191        // Loser observes cancellation at next checkpoint.
2192        assert!(loser_cx.checkpoint().is_err());
2193        assert_eq!(loser_cx.cancel_state(), CancelState::Cancelling);
2194
2195        // Loser drains: resolves obligation.
2196        ob_clone.store(true, Ordering::Release);
2197        loser_cx.transition_to_finalizing();
2198        loser_cx.transition_to_completed();
2199
2200        assert!(
2201            obligation_resolved.load(Ordering::Acquire),
2202            "bead_id={BEAD_ID} loser_obligation_resolved"
2203        );
2204        assert_eq!(
2205            loser_cx.cancel_state(),
2206            CancelState::Completed,
2207            "bead_id={BEAD_ID} loser_drained"
2208        );
2209    }
2210
2211    #[test]
2212    fn test_vdbe_checkpoint_cancel_observed_at_next_opcode() {
2213        // Test 5: Simulate VDBE opcode loop — cancel after opcode 50,
2214        // observed at opcode 51.
2215        let cx = Cx::<FullCaps>::new();
2216        cx.transition_to_running();
2217
2218        let mut last_executed = 0u32;
2219        for opcode in 0..100u32 {
2220            // Checkpoint at start of each opcode.
2221            if cx.checkpoint_with(format!("vdbe pc={opcode}")).is_err() {
2222                last_executed = opcode;
2223                break;
2224            }
2225            // Execute opcode.
2226            last_executed = opcode;
2227            // Cancel arrives at end of opcode 50.
2228            if opcode == 50 {
2229                cx.cancel_with_reason(CancelReason::UserInterrupt);
2230            }
2231        }
2232
2233        assert_eq!(
2234            last_executed, 51,
2235            "bead_id={BEAD_ID} cancel_observed_at_opcode_51"
2236        );
2237    }
2238
2239    #[test]
2240    fn test_btree_checkpoint_cancel_within_one_node() {
2241        // Test 6: Simulate B-tree descent — cancel mid-descent, observed
2242        // within 1 node visit.
2243        let cx = Cx::<FullCaps>::new();
2244        cx.transition_to_running();
2245
2246        let nodes = ["root", "internal_l", "internal_r", "leaf_a", "leaf_b"];
2247        let cancel_at = 2; // Cancel after visiting internal_r.
2248        let mut observed_at = None;
2249
2250        for (i, node) in nodes.iter().enumerate() {
2251            // Checkpoint at start of each node visit.
2252            if cx.checkpoint_with(format!("btree node={node}")).is_err() {
2253                observed_at = Some(i);
2254                break;
2255            }
2256            // Visit node.
2257            // Cancel arrives after visiting node at index cancel_at.
2258            if i == cancel_at {
2259                cx.cancel_with_reason(CancelReason::UserInterrupt);
2260            }
2261        }
2262
2263        assert_eq!(
2264            observed_at,
2265            Some(cancel_at + 1),
2266            "bead_id={BEAD_ID} btree_cancel_within_one_node"
2267        );
2268    }
2269
2270    #[test]
2271    fn test_masked_section_defers_cancel() {
2272        // Test 7: Masked section defers cancel — checkpoint returns Ok inside
2273        // mask, Err after exit.
2274        let cx = Cx::<FullCaps>::new();
2275        cx.transition_to_running();
2276
2277        cx.cancel_with_reason(CancelReason::UserInterrupt);
2278        assert!(cx.is_cancel_requested());
2279
2280        // Enter masked section.
2281        {
2282            let _guard = cx.masked();
2283            assert_eq!(cx.mask_depth(), 1);
2284
2285            // Inside mask, checkpoint succeeds despite cancellation.
2286            assert!(
2287                cx.checkpoint().is_ok(),
2288                "bead_id={BEAD_ID} checkpoint_ok_while_masked"
2289            );
2290
2291            // Nested mask.
2292            {
2293                let _inner = cx.masked();
2294                assert_eq!(cx.mask_depth(), 2);
2295                assert!(cx.checkpoint().is_ok());
2296            }
2297            assert_eq!(cx.mask_depth(), 1);
2298        }
2299        assert_eq!(cx.mask_depth(), 0);
2300
2301        // After mask exit, checkpoint observes cancellation.
2302        assert!(
2303            cx.checkpoint().is_err(),
2304            "bead_id={BEAD_ID} checkpoint_err_after_mask_exit"
2305        );
2306    }
2307
2308    #[test]
2309    #[should_panic(expected = "MAX_MASK_DEPTH")]
2310    #[allow(clippy::collection_is_never_read)]
2311    fn test_max_mask_depth_exceeded_panics() {
2312        // Test 8: MAX_MASK_DEPTH=64 exceeded panics in lab mode.
2313        let cx = Cx::<FullCaps>::new();
2314        let mut guards = Vec::new();
2315        for _ in 0..MAX_MASK_DEPTH {
2316            guards.push(cx.masked());
2317        }
2318        // This 65th mask should panic.
2319        let _overflow = cx.masked();
2320    }
2321
2322    #[test]
2323    fn test_commit_section_completes_under_cancel() {
2324        // Test 9: Cancel after op 1 of 3, all 3 complete + finalizers run.
2325        let cx = Cx::<FullCaps>::new();
2326        cx.transition_to_running();
2327
2328        let ops_completed = Arc::new(AtomicU32::new(0));
2329        let finalizer_ran = Arc::new(AtomicBool::new(false));
2330
2331        let ops = Arc::clone(&ops_completed);
2332        let fin = Arc::clone(&finalizer_ran);
2333
2334        cx.commit_section(
2335            10,
2336            |ctx| {
2337                // Op 1.
2338                assert!(ctx.tick());
2339                ops.fetch_add(1, Ordering::Release);
2340
2341                // Cancel mid-section.
2342                cx.cancel_with_reason(CancelReason::UserInterrupt);
2343
2344                // Op 2: still succeeds because commit section is masked.
2345                assert!(ctx.tick());
2346                ops.fetch_add(1, Ordering::Release);
2347                assert!(
2348                    cx.checkpoint().is_ok(),
2349                    "bead_id={BEAD_ID} masked_during_commit"
2350                );
2351
2352                // Op 3.
2353                assert!(ctx.tick());
2354                ops.fetch_add(1, Ordering::Release);
2355            },
2356            move || {
2357                fin.store(true, Ordering::Release);
2358            },
2359        );
2360
2361        assert_eq!(
2362            ops_completed.load(Ordering::Acquire),
2363            3,
2364            "bead_id={BEAD_ID} all_ops_completed"
2365        );
2366        assert!(
2367            finalizer_ran.load(Ordering::Acquire),
2368            "bead_id={BEAD_ID} finalizer_ran"
2369        );
2370
2371        // After commit section, masking is removed — checkpoint should fail.
2372        assert!(cx.checkpoint().is_err());
2373    }
2374
2375    #[test]
2376    fn test_commit_section_enforces_poll_quota() {
2377        // Test 10: Commit section poll quota is bounded.
2378        let cx = Cx::<FullCaps>::new();
2379        cx.transition_to_running();
2380
2381        let ticks_succeeded = Arc::new(AtomicU32::new(0));
2382        let ts = Arc::clone(&ticks_succeeded);
2383
2384        cx.commit_section(
2385            3,
2386            |ctx| {
2387                assert_eq!(ctx.poll_remaining(), 3);
2388                for _ in 0..5 {
2389                    if ctx.tick() {
2390                        ts.fetch_add(1, Ordering::Release);
2391                    }
2392                }
2393            },
2394            || {},
2395        );
2396
2397        assert_eq!(
2398            ticks_succeeded.load(Ordering::Acquire),
2399            3,
2400            "bead_id={BEAD_ID} poll_quota_enforced"
2401        );
2402    }
2403
2404    #[test]
2405    fn test_cancel_unaware_hot_loop_detected() {
2406        // Test 11: Simulate harness detecting a hot loop that never
2407        // calls checkpoint.
2408        let cx = Cx::<FullCaps>::new();
2409        cx.transition_to_running();
2410
2411        // Harness deadline: if 100 iterations pass without checkpoint,
2412        // the loop is cancel-unaware.
2413        let deadline = 100u32;
2414        let mut iterations_without_checkpoint = 0u32;
2415        let mut detected_unaware = false;
2416
2417        cx.cancel_with_reason(CancelReason::UserInterrupt);
2418
2419        for _i in 0..200u32 {
2420            iterations_without_checkpoint += 1;
2421            if iterations_without_checkpoint >= deadline {
2422                detected_unaware = true;
2423                break;
2424            }
2425            // Bug: no cx.checkpoint() call in the loop body.
2426        }
2427
2428        assert!(
2429            detected_unaware,
2430            "bead_id={BEAD_ID} cancel_unaware_loop_detected"
2431        );
2432
2433        // Contrast: a compliant loop would checkpoint and exit.
2434        let cx2 = Cx::<FullCaps>::new();
2435        cx2.transition_to_running();
2436        cx2.cancel_with_reason(CancelReason::UserInterrupt);
2437        let mut compliant_iters = 0u32;
2438        for _ in 0..200u32 {
2439            if cx2.checkpoint().is_err() {
2440                break;
2441            }
2442            compliant_iters += 1;
2443        }
2444        assert_eq!(
2445            compliant_iters, 0,
2446            "bead_id={BEAD_ID} compliant_loop_exits_immediately"
2447        );
2448    }
2449
2450    #[test]
2451    fn test_write_coordinator_commit_section() {
2452        // Test 12: Simulate WriteCoordinator — cancel mid-publish,
2453        // proof+marker completes atomically via commit section.
2454        let cx = Cx::<FullCaps>::new();
2455        cx.transition_to_running();
2456
2457        let proof_published = Arc::new(AtomicBool::new(false));
2458        let marker_published = Arc::new(AtomicBool::new(false));
2459        let reservation_released = Arc::new(AtomicBool::new(false));
2460
2461        let proof = Arc::clone(&proof_published);
2462        let marker = Arc::clone(&marker_published);
2463        let release = Arc::clone(&reservation_released);
2464
2465        cx.commit_section(
2466            10,
2467            |ctx| {
2468                // Step 1: FCW validation passed, commit_seq allocated.
2469                assert!(ctx.tick());
2470
2471                // Cancel arrives mid-publish.
2472                cx.cancel_with_reason(CancelReason::RegionClose);
2473
2474                // Step 2: Publish proof (must complete).
2475                assert!(ctx.tick());
2476                proof.store(true, Ordering::Release);
2477                // Checkpoint inside commit section succeeds (masked).
2478                assert!(cx.checkpoint().is_ok());
2479
2480                // Step 3: Publish marker (must complete).
2481                assert!(ctx.tick());
2482                marker.store(true, Ordering::Release);
2483            },
2484            move || {
2485                // Finalizer: release reservation.
2486                release.store(true, Ordering::Release);
2487            },
2488        );
2489
2490        assert!(
2491            proof_published.load(Ordering::Acquire),
2492            "bead_id={BEAD_ID} proof_published"
2493        );
2494        assert!(
2495            marker_published.load(Ordering::Acquire),
2496            "bead_id={BEAD_ID} marker_published"
2497        );
2498        assert!(
2499            reservation_released.load(Ordering::Acquire),
2500            "bead_id={BEAD_ID} reservation_released"
2501        );
2502
2503        // After commit section, cancellation is visible.
2504        assert!(cx.checkpoint().is_err());
2505    }
2506
2507    // ===================================================================
2508    // Tracing ID propagation tests (bd-2g5.6)
2509    // ===================================================================
2510
2511    #[test]
2512    fn test_trace_ids_default_to_zero() {
2513        let cx = Cx::<FullCaps>::new();
2514        assert_eq!(cx.trace_id(), 0);
2515        assert_eq!(cx.decision_id(), 0);
2516        assert_eq!(cx.policy_id(), 0);
2517    }
2518
2519    #[test]
2520    fn test_with_trace_context_sets_all_ids() {
2521        let cx = Cx::<FullCaps>::new().with_trace_context(42, 99, 7);
2522        assert_eq!(cx.trace_id(), 42);
2523        assert_eq!(cx.decision_id(), 99);
2524        assert_eq!(cx.policy_id(), 7);
2525    }
2526
2527    #[test]
2528    fn test_with_decision_id_preserves_other_ids() {
2529        let cx = Cx::<FullCaps>::new()
2530            .with_trace_context(10, 20, 30)
2531            .with_decision_id(55);
2532        assert_eq!(cx.trace_id(), 10);
2533        assert_eq!(cx.decision_id(), 55);
2534        assert_eq!(cx.policy_id(), 30);
2535    }
2536
2537    #[test]
2538    fn test_with_policy_id_preserves_other_ids() {
2539        let cx = Cx::<FullCaps>::new()
2540            .with_trace_context(100, 200, 300)
2541            .with_policy_id(88);
2542        assert_eq!(cx.trace_id(), 100);
2543        assert_eq!(cx.decision_id(), 200);
2544        assert_eq!(cx.policy_id(), 88);
2545    }
2546
2547    #[test]
2548    #[allow(clippy::redundant_clone)]
2549    fn test_clone_propagates_trace_ids() {
2550        let cx = Cx::<FullCaps>::new().with_trace_context(1, 2, 3);
2551        let cloned = cx.clone();
2552        assert_eq!(cloned.trace_id(), 1);
2553        assert_eq!(cloned.decision_id(), 2);
2554        assert_eq!(cloned.policy_id(), 3);
2555    }
2556
2557    #[test]
2558    fn test_restrict_propagates_trace_ids() {
2559        let cx = Cx::<FullCaps>::new();
2560        let compute = cx.restrict::<ComputeCaps>();
2561        assert_eq!(compute.trace_id(), 0);
2562        assert_eq!(compute.decision_id(), 0);
2563        assert_eq!(compute.policy_id(), 0);
2564    }
2565
2566    #[test]
2567    fn test_scope_with_budget_propagates_trace_ids() {
2568        let cx = Cx::<FullCaps>::new().with_trace_context(5, 6, 7);
2569        let scoped = cx.scope_with_budget(Budget::MINIMAL);
2570        assert_eq!(scoped.trace_id(), 5);
2571        assert_eq!(scoped.decision_id(), 6);
2572        assert_eq!(scoped.policy_id(), 7);
2573        // Budget should be tightened.
2574        assert_eq!(scoped.budget().poll_quota, Budget::MINIMAL.poll_quota);
2575    }
2576
2577    #[test]
2578    fn test_cleanup_scope_propagates_trace_ids() {
2579        let cx = Cx::<FullCaps>::new().with_trace_context(11, 22, 33);
2580        let cleanup = cx.cleanup_scope();
2581        assert_eq!(cleanup.trace_id(), 11);
2582        assert_eq!(cleanup.decision_id(), 22);
2583        assert_eq!(cleanup.policy_id(), 33);
2584    }
2585
2586    #[test]
2587    fn test_create_child_propagates_trace_ids() {
2588        let parent = Cx::<FullCaps>::new().with_trace_context(50, 60, 70);
2589        let child = parent.create_child();
2590        assert_eq!(child.trace_id(), 50);
2591        assert_eq!(child.decision_id(), 60);
2592        assert_eq!(child.policy_id(), 70);
2593        // Child should have independent cancellation.
2594        parent.cancel();
2595        assert!(parent.is_cancel_requested());
2596        assert!(child.is_cancel_requested()); // Propagated.
2597    }
2598
2599    #[test]
2600    fn test_trace_ids_independent_across_children() {
2601        let parent = Cx::<FullCaps>::new().with_trace_context(1, 2, 3);
2602        let child1 = parent.create_child().with_decision_id(100);
2603        let child2 = parent.create_child().with_decision_id(200);
2604        // Children share trace_id but have different decision_ids.
2605        assert_eq!(child1.trace_id(), 1);
2606        assert_eq!(child2.trace_id(), 1);
2607        assert_eq!(child1.decision_id(), 100);
2608        assert_eq!(child2.decision_id(), 200);
2609        // Parent's decision_id unchanged.
2610        assert_eq!(parent.decision_id(), 2);
2611    }
2612
2613    #[test]
2614    fn test_with_budget_starts_at_zero_trace_ids() {
2615        let cx = Cx::<FullCaps>::with_budget(Budget::MINIMAL);
2616        assert_eq!(cx.trace_id(), 0);
2617        assert_eq!(cx.decision_id(), 0);
2618        assert_eq!(cx.policy_id(), 0);
2619    }
2620}