Skip to main content

asupersync/cx/
cx.rs

1//! The capability context type.
2//!
3//! `Cx` is the token that grants access to runtime capabilities:
4//! - Querying identity (region ID, task ID)
5//! - Checking cancellation status
6//! - Yielding and sleeping
7//! - Tracing
8//!
9//! # Capability Model
10//!
11//! All effectful operations in Asupersync flow through explicit `Cx` tokens.
12//! This design prevents ambient authority and enables:
13//!
14//! - **Effect interception**: Production vs lab runtime can interpret effects differently
15//! - **Cancellation propagation**: Cx carries cancellation signals through the task tree
16//! - **Budget enforcement**: Deadlines and poll quotas flow through Cx
17//! - **Observability**: Tracing and spans are tied to task identity
18//!
19//! # Thread Safety
20//!
21//! `Cx` is `Send + Sync` due to its internal `Arc<RwLock>`. However, the semantic
22//! contract is that a `Cx` is associated with a specific task and should not be
23//! shared across task boundaries. The runtime manages Cx lifetime and ensures
24//! each task receives its own context.
25//!
26//! # Wrapping Cx for Frameworks
27//!
//! Framework authors (e.g., fastapi_rust) should wrap a borrowed `&Cx` in their own context type rather than store an owned `Cx`:
29//!
30//! ```ignore
31//! // CORRECT: Wrap Cx reference, delegate capabilities
32//! pub struct RequestContext<'a> {
33//!     cx: &'a Cx,
34//!     request: &'a Request,
35//!     // framework-specific fields
36//! }
37//!
38//! impl<'a> RequestContext<'a> {
39//!     pub fn check_cancelled(&self) -> bool {
40//!         self.cx.is_cancel_requested()
41//!     }
42//!
43//!     pub fn budget(&self) -> Budget {
44//!         self.cx.budget()
45//!     }
46//! }
47//! ```
48//!
49//! This pattern ensures:
50//! - Cx lifetime is tied to the request scope
51//! - Framework can add domain-specific context
52//! - All capabilities flow through the wrapped Cx
53
54use super::cap;
55use super::macaroon::{MacaroonToken, VerificationContext, VerificationError};
56use super::registry::RegistryHandle;
57use crate::combinator::select::SelectAll;
58use crate::evidence_sink::EvidenceSink;
59#[cfg(feature = "messaging-fabric")]
60use crate::messaging::capability::{
61    FabricCapability, FabricCapabilityGrant, FabricCapabilityGrantError, FabricCapabilityId,
62    FabricCapabilityRegistry, FabricCapabilityScope, GrantedFabricToken, PublishPermit,
63    SubjectFamilyTag, SubscribeToken,
64};
65#[cfg(feature = "messaging-fabric")]
66use crate::messaging::class::DeliveryClass;
67#[cfg(feature = "messaging-fabric")]
68use crate::messaging::ir::CapabilityTokenSchema;
69#[cfg(feature = "messaging-fabric")]
70use crate::messaging::subject::SubjectPattern;
71use crate::observability::{
72    DiagnosticContext, LogCollector, LogEntry, ObservabilityConfig, SpanId,
73};
74use crate::remote::RemoteCap;
75use crate::runtime::blocking_pool::BlockingPoolHandle;
76use crate::runtime::io_driver::IoDriverHandle;
77#[cfg(not(target_arch = "wasm32"))]
78use crate::runtime::io_driver::IoRegistration;
79#[cfg(not(target_arch = "wasm32"))]
80use crate::runtime::reactor::{Interest, Source};
81use crate::runtime::task_handle::JoinError;
82use crate::time::{TimerDriverHandle, timeout};
83use crate::trace::distributed::{LogicalClockHandle, LogicalTime};
84use crate::trace::{TraceBufferHandle, TraceEvent};
85use crate::tracing_compat::{debug, error, info, trace, warn};
86use crate::types::{
87    Budget, CancelKind, CancelReason, CxInner, RegionId, SystemPressure, TaskId, Time,
88};
89use crate::util::{EntropySource, OsEntropy};
90use std::cell::RefCell;
91use std::future::Future;
92use std::marker::PhantomData;
93use std::pin::Pin;
94use std::sync::Arc;
95use std::task::Waker;
96use std::time::Duration;
97
/// A `'static` string label paired with a boxed, pinned, `Send` future that
/// resolves to `T`.
type NamedFuture<T> = (&'static str, Pin<Box<dyn Future<Output = T> + Send>>);
/// A list of [`NamedFuture`]s that all resolve to the same output type `T`.
type NamedFutures<T> = Vec<NamedFuture<T>>;
100
101/// Get the current wall clock time.
102fn wall_clock_now() -> Time {
103    crate::time::wall_now()
104}
105
/// Returns an owned copy of the standard library's no-op waker.
fn noop_waker() -> Waker {
    Waker::clone(Waker::noop())
}
109
/// Grouped handle fields shared behind a single `Arc` to reduce per-clone
/// refcount operations from ~13 to 1 for this bundle.
///
/// The builder-style `with_*` methods on `Cx` update individual fields through
/// `Arc::make_mut`, so a bundle that is still shared is cloned before it is
/// modified.
#[derive(Debug, Clone)]
struct CxHandles {
    /// I/O driver handle, if one was passed to the constructor.
    io_driver: Option<IoDriverHandle>,
    /// I/O capability object, if one was passed to the constructor.
    io_cap: Option<Arc<dyn crate::io::IoCap>>,
    /// Timer driver handle, if one was passed to the constructor.
    timer_driver: Option<TimerDriverHandle>,
    /// Blocking pool handle; constructors leave it `None` and
    /// `with_blocking_pool_handle` sets it.
    blocking_pool: Option<BlockingPoolHandle>,
    /// Entropy source; constructors fall back to `OsEntropy` when none is given.
    entropy: Arc<dyn EntropySource>,
    /// Logical clock handle; constructors use `LogicalClockHandle::default()`.
    logical_clock: LogicalClockHandle,
    /// Remote capability, set via `with_remote_cap` / `with_remote_cap_handle`.
    remote_cap: Option<Arc<RemoteCap>>,
    /// Registry handle, set via `with_registry_handle`.
    registry: Option<RegistryHandle>,
    /// Shared system pressure handle, set via `with_pressure`.
    pressure: Option<Arc<SystemPressure>>,
    /// Evidence sink, set via `with_evidence_sink`.
    evidence_sink: Option<Arc<dyn EvidenceSink>>,
    /// Macaroon capability token, set via `with_macaroon` / `with_macaroon_handle`.
    macaroon: Option<Arc<MacaroonToken>>,
    /// FABRIC capability registry; constructors start from the default registry.
    #[cfg(feature = "messaging-fabric")]
    fabric_capabilities: Arc<FabricCapabilityRegistry>,
}
128
/// The capability context for a task.
///
/// `Cx` provides access to runtime capabilities within Asupersync. All effectful
/// operations flow through `Cx`, ensuring explicit capability security with no
/// ambient authority.
///
/// # Overview
///
/// A `Cx` instance is provided to each task by the runtime. It grants access to:
///
/// - **Identity**: Query the current region and task IDs
/// - **Budget**: Check remaining time/poll quotas
/// - **Cancellation**: Observe and respond to cancellation requests
/// - **Tracing**: Emit trace events for observability
///
/// # Cloning
///
/// `Cx` is cheaply clonable: its state lives behind three `Arc`s (task state,
/// observability state, and the handle bundle), so a clone only bumps
/// reference counts. Clones share the same underlying task state, so
/// cancellation signals and budget updates are visible to all clones.
///
/// # Type parameter
///
/// `Caps` is a phantom marker for the capability set and defaults to
/// `cap::All`; no value of type `Caps` is ever stored.
#[derive(Debug)]
pub struct Cx<Caps = cap::All> {
    /// Shared per-task state (`CxInner`) behind a read/write lock.
    pub(crate) inner: Arc<parking_lot::RwLock<CxInner>>,
    /// Shared observability state behind a read/write lock.
    observability: Arc<parking_lot::RwLock<ObservabilityState>>,
    /// Bundle of driver and capability handles, grouped behind one `Arc`.
    handles: Arc<CxHandles>,
    // Use fn() -> Caps instead of just Caps to ensure Send+Sync regardless of Caps
    _caps: PhantomData<fn() -> Caps>,
}
157
158// Manual Clone impl to avoid requiring `Caps: Clone` (Caps is just a phantom marker type).
159// Only 3 Arc increments instead of ~15.
160impl<Caps> Clone for Cx<Caps> {
161    #[inline]
162    fn clone(&self) -> Self {
163        Self {
164            inner: Arc::clone(&self.inner),
165            observability: Arc::clone(&self.observability),
166            handles: Arc::clone(&self.handles),
167            _caps: PhantomData,
168        }
169    }
170}
171
/// Internal observability state shared by `Cx` clones.
#[derive(Debug, Clone)]
pub struct ObservabilityState {
    /// Optional log collector; `None` unless one is supplied through
    /// `new_with_config` or inherited via `derive_child`.
    collector: Option<LogCollector>,
    /// Diagnostic context carrying the task, region, and span IDs.
    context: DiagnosticContext,
    /// Optional trace buffer handle; the constructors here start with `None`.
    trace: Option<TraceBufferHandle>,
    /// Timestamp flag: `true` by default, or taken from
    /// `ObservabilityConfig::include_timestamps` in `new_with_config`.
    include_timestamps: bool,
}
180
181impl ObservabilityState {
182    fn new(region: RegionId, task: TaskId) -> Self {
183        let context = DiagnosticContext::new()
184            .with_task_id(task)
185            .with_region_id(region)
186            .with_span_id(SpanId::new());
187        Self {
188            collector: None,
189            context,
190            trace: None,
191            include_timestamps: true,
192        }
193    }
194
195    pub(crate) fn new_with_config(
196        region: RegionId,
197        task: TaskId,
198        config: &ObservabilityConfig,
199        collector: Option<LogCollector>,
200    ) -> Self {
201        let context = config
202            .create_diagnostic_context()
203            .with_task_id(task)
204            .with_region_id(region)
205            .with_span_id(SpanId::new());
206        Self {
207            collector,
208            context,
209            trace: None,
210            include_timestamps: config.include_timestamps(),
211        }
212    }
213
214    fn derive_child(&self, region: RegionId, task: TaskId) -> Self {
215        let mut context = self.context.clone().fork();
216        context = context.with_task_id(task).with_region_id(region);
217        Self {
218            collector: self.collector.clone(),
219            context,
220            trace: self.trace.clone(),
221            include_timestamps: self.include_timestamps,
222        }
223    }
224}
225
/// Guard that restores the cancellation mask on drop.
///
/// Dropping the guard decrements `mask_depth` on the borrowed context state.
struct MaskGuard<'a> {
    /// Shared context state whose `mask_depth` is decremented on drop.
    inner: &'a Arc<parking_lot::RwLock<CxInner>>,
}
230
231impl Drop for MaskGuard<'_> {
232    /// Implements `inv.cancel.mask_monotone` (#12): mask_depth only decreases
233    /// during cancel processing. `saturating_sub` ensures no underflow.
234    fn drop(&mut self) {
235        let mut inner = self.inner.write();
236        inner.mask_depth = inner.mask_depth.saturating_sub(1);
237    }
238}
239
/// A context typed with the full capability set (`cap::All`).
type FullCx = Cx<cap::All>;
241
thread_local! {
    /// Per-thread slot holding the currently installed context; starts as `None`.
    static CURRENT_CX: RefCell<Option<FullCx>> = const { RefCell::new(None) };
}
245
/// Guard that restores the previous Cx on drop.
///
/// Returned by `Cx::set_current`; dropping it writes `prev` back into the
/// thread-local slot.
#[cfg_attr(feature = "test-internals", visibility::make(pub))]
pub(crate) struct CurrentCxGuard {
    /// The context that was installed before this guard was created.
    prev: Option<FullCx>,
    /// A raw-pointer `PhantomData` makes the guard neither `Send` nor `Sync`.
    _not_send: std::marker::PhantomData<*mut ()>,
}
252
253impl Drop for CurrentCxGuard {
254    fn drop(&mut self) {
255        let prev = self.prev.take();
256        let _ = CURRENT_CX.try_with(|slot| {
257            *slot.borrow_mut() = prev;
258        });
259    }
260}
261
262impl FullCx {
263    /// Returns the current task context, if one is set.
264    ///
265    /// This is set by the runtime while polling a task.
266    ///
267    /// Returns `None` when no task context is installed and also during
268    /// thread-local teardown, where the ambient context is no longer
269    /// accessible.
270    #[inline]
271    #[must_use]
272    pub fn current() -> Option<Self> {
273        CURRENT_CX
274            .try_with(|slot| slot.borrow().clone())
275            .unwrap_or(None)
276    }
277
278    /// Sets the current task context for the duration of the guard.
279    #[inline]
280    #[must_use]
281    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
282    pub(crate) fn set_current(cx: Option<Self>) -> CurrentCxGuard {
283        let prev = CURRENT_CX.with(|slot| {
284            let mut guard = slot.borrow_mut();
285            let prev = guard.take();
286            *guard = cx;
287            prev
288        });
289        CurrentCxGuard {
290            prev,
291            _not_send: std::marker::PhantomData,
292        }
293    }
294}
295
296impl<Caps> Cx<Caps> {
297    /// Creates a new capability context (internal use).
298    #[must_use]
299    #[allow(dead_code)]
300    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
301    pub(crate) fn new(region: RegionId, task: TaskId, budget: Budget) -> Self {
302        Self::new_with_observability(region, task, budget, None, None, None)
303    }
304
305    /// Creates a new capability context from shared state (internal use).
306    #[allow(dead_code)] // Internal construction path for runtime integration
307    pub(crate) fn from_inner(inner: Arc<parking_lot::RwLock<CxInner>>) -> Self {
308        let (region, task) = {
309            let guard = inner.read();
310            (guard.region, guard.task)
311        };
312        Self {
313            inner,
314            observability: Arc::new(parking_lot::RwLock::new(ObservabilityState::new(
315                region, task,
316            ))),
317            handles: Arc::new(CxHandles {
318                io_driver: None,
319                io_cap: None,
320                timer_driver: None,
321                blocking_pool: None,
322                entropy: Arc::new(OsEntropy),
323                logical_clock: LogicalClockHandle::default(),
324                remote_cap: None,
325                registry: None,
326                pressure: None,
327                evidence_sink: None,
328                macaroon: None,
329                #[cfg(feature = "messaging-fabric")]
330                fabric_capabilities: Arc::new(FabricCapabilityRegistry::default()),
331            }),
332            _caps: PhantomData,
333        }
334    }
335
336    /// Creates a new capability context with optional observability state (internal use).
337    #[must_use]
338    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
339    pub(crate) fn new_with_observability(
340        region: RegionId,
341        task: TaskId,
342        budget: Budget,
343        observability: Option<ObservabilityState>,
344        io_driver: Option<IoDriverHandle>,
345        entropy: Option<Arc<dyn EntropySource>>,
346    ) -> Self {
347        Self::new_with_io(
348            region,
349            task,
350            budget,
351            observability,
352            io_driver,
353            None,
354            entropy,
355        )
356    }
357
358    /// Creates a new capability context with optional I/O capability (internal use).
359    #[must_use]
360    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
361    pub(crate) fn new_with_io(
362        region: RegionId,
363        task: TaskId,
364        budget: Budget,
365        observability: Option<ObservabilityState>,
366        io_driver: Option<IoDriverHandle>,
367        io_cap: Option<Arc<dyn crate::io::IoCap>>,
368        entropy: Option<Arc<dyn EntropySource>>,
369    ) -> Self {
370        Self::new_with_drivers(
371            region,
372            task,
373            budget,
374            observability,
375            io_driver,
376            io_cap,
377            None,
378            entropy,
379        )
380    }
381
382    /// Creates a new capability context with optional I/O and timer drivers (internal use).
383    #[must_use]
384    #[cfg_attr(feature = "test-internals", visibility::make(pub))]
385    #[allow(clippy::too_many_arguments)]
386    pub(crate) fn new_with_drivers(
387        region: RegionId,
388        task: TaskId,
389        budget: Budget,
390        observability: Option<ObservabilityState>,
391        io_driver: Option<IoDriverHandle>,
392        io_cap: Option<Arc<dyn crate::io::IoCap>>,
393        timer_driver: Option<TimerDriverHandle>,
394        entropy: Option<Arc<dyn EntropySource>>,
395    ) -> Self {
396        let inner = Arc::new(parking_lot::RwLock::new(CxInner::new(region, task, budget)));
397        let observability_state =
398            observability.unwrap_or_else(|| ObservabilityState::new(region, task));
399        let observability = Arc::new(parking_lot::RwLock::new(observability_state));
400        let entropy = entropy.unwrap_or_else(|| Arc::new(OsEntropy));
401
402        debug!(
403            task_id = ?task,
404            region_id = ?region,
405            budget_deadline = ?budget.deadline,
406            budget_poll_quota = budget.poll_quota,
407            budget_cost_quota = ?budget.cost_quota,
408            budget_priority = budget.priority,
409            budget_source = "cx_new",
410            "budget initialized for context"
411        );
412
413        Self {
414            inner,
415            observability,
416            handles: Arc::new(CxHandles {
417                io_driver,
418                io_cap,
419                timer_driver,
420                blocking_pool: None,
421                entropy,
422                logical_clock: LogicalClockHandle::default(),
423                remote_cap: None,
424                registry: None,
425                pressure: None,
426                evidence_sink: None,
427                macaroon: None,
428                #[cfg(feature = "messaging-fabric")]
429                fabric_capabilities: Arc::new(FabricCapabilityRegistry::default()),
430            }),
431            _caps: PhantomData,
432        }
433    }
434
435    /// Returns a cloned handle to the I/O driver, if present.
436    #[inline]
437    #[must_use]
438    pub(crate) fn io_driver_handle(&self) -> Option<IoDriverHandle> {
439        self.handles.io_driver.clone()
440    }
441
442    /// Returns a cloned handle to the blocking pool, if present.
443    #[inline]
444    #[must_use]
445    pub(crate) fn blocking_pool_handle(&self) -> Option<BlockingPoolHandle> {
446        self.handles.blocking_pool.clone()
447    }
448
449    /// Attaches a blocking pool handle to this context.
450    #[must_use]
451    pub(crate) fn with_blocking_pool_handle(mut self, handle: Option<BlockingPoolHandle>) -> Self {
452        Arc::make_mut(&mut self.handles).blocking_pool = handle;
453        self
454    }
455
456    /// Attaches a logical clock handle to this context.
457    #[must_use]
458    pub(crate) fn with_logical_clock(mut self, clock: LogicalClockHandle) -> Self {
459        Arc::make_mut(&mut self.handles).logical_clock = clock;
460        self
461    }
462
463    /// Re-type this context to a narrower capability set.
464    ///
465    /// This is a zero-cost type-level restriction. It does not change runtime behavior,
466    /// but removes access to gated APIs at compile time.
467    #[must_use]
468    pub fn restrict<NewCaps>(&self) -> Cx<NewCaps>
469    where
470        NewCaps: cap::SubsetOf<Caps>,
471    {
472        self.retype()
473    }
474
475    /// Internal re-typing helper (no subset enforcement).
476    #[inline]
477    #[must_use]
478    pub(crate) fn retype<NewCaps>(&self) -> Cx<NewCaps> {
479        Cx {
480            inner: self.inner.clone(),
481            observability: self.observability.clone(),
482            handles: self.handles.clone(),
483            _caps: PhantomData,
484        }
485    }
486
487    /// Attaches a registry handle to this context.
488    ///
489    /// This is how Spork-style naming is made capability-scoped (no globals):
490    /// tasks only see a registry if their `Cx` carries one.
491    #[must_use]
492    pub(crate) fn with_registry_handle(mut self, registry: Option<RegistryHandle>) -> Self {
493        Arc::make_mut(&mut self.handles).registry = registry;
494        self
495    }
496
497    /// Attaches a remote capability to this context.
498    ///
499    /// This allows the context to perform remote operations like `spawn_remote`.
500    #[must_use]
501    pub fn with_remote_cap(mut self, cap: RemoteCap) -> Self {
502        Arc::make_mut(&mut self.handles).remote_cap = Some(Arc::new(cap));
503        self
504    }
505
506    /// Attach a system pressure handle for compute budget propagation.
507    ///
508    /// The handle is shared via `Arc` so all clones observe the same pressure
509    /// state. A monitor thread can call [`SystemPressure::set_headroom`] to
510    /// update the value, and any code with `&Cx` can read it lock-free.
511    #[must_use]
512    pub fn with_pressure(mut self, pressure: Arc<SystemPressure>) -> Self {
513        Arc::make_mut(&mut self.handles).pressure = Some(pressure);
514        self
515    }
516
517    /// Read the current system pressure, if attached.
518    ///
519    /// Returns `None` if no pressure handle was attached to this context.
520    #[must_use]
521    #[inline]
522    pub fn pressure(&self) -> Option<&SystemPressure> {
523        self.handles.pressure.as_deref()
524    }
525
526    /// Returns a cloned handle to the configured system pressure source, if any.
527    ///
528    /// This is `pub(crate)` so spawned child tasks can inherit the same shared
529    /// pressure state as their parent. Some build slices currently exercise
530    /// the inheritance path only behind optional runtime wiring/tests.
531    #[allow(dead_code)]
532    #[must_use]
533    pub(crate) fn pressure_handle(&self) -> Option<Arc<SystemPressure>> {
534        self.handles.pressure.clone()
535    }
536
537    /// Returns a cloned handle to the configured remote capability, if any.
538    ///
539    /// This is `pub(crate)` so internal wiring (e.g. spawning child tasks) can
540    /// inherit remote capability without requiring `Caps: HasRemote` bounds.
541    #[inline]
542    #[must_use]
543    pub(crate) fn remote_cap_handle(&self) -> Option<Arc<RemoteCap>> {
544        self.handles.remote_cap.clone()
545    }
546
547    /// Attaches an already-shared remote capability handle to this context.
548    ///
549    /// This is the internal counterpart to [`Cx::with_remote_cap`] used for
550    /// capability propagation to child contexts.
551    #[must_use]
552    pub(crate) fn with_remote_cap_handle(mut self, cap: Option<Arc<RemoteCap>>) -> Self {
553        Arc::make_mut(&mut self.handles).remote_cap = cap;
554        self
555    }
556
557    /// Returns the registry capability handle, if attached.
558    #[inline]
559    #[must_use]
560    pub fn registry_handle(&self) -> Option<RegistryHandle> {
561        self.handles.registry.clone()
562    }
563
564    /// Returns true if a registry handle is attached.
565    #[inline]
566    #[must_use]
567    pub fn has_registry(&self) -> bool {
568        self.handles.registry.is_some()
569    }
570
571    /// Grant a shared FABRIC capability for runtime and distributed-path checks.
572    #[cfg(feature = "messaging-fabric")]
573    pub fn grant_fabric_capability(
574        &self,
575        capability: FabricCapability,
576    ) -> Result<FabricCapabilityGrant, FabricCapabilityGrantError> {
577        self.handles.fabric_capabilities.grant(capability)
578    }
579
580    /// Return the current FABRIC capability grants attached to this context.
581    #[cfg(feature = "messaging-fabric")]
582    #[must_use]
583    pub fn fabric_capabilities(&self) -> Vec<FabricCapabilityGrant> {
584        self.handles.fabric_capabilities.snapshot()
585    }
586
587    /// Grant a publish capability and mint the corresponding linear token.
588    #[cfg(feature = "messaging-fabric")]
589    pub fn grant_publish_capability<S: SubjectFamilyTag>(
590        &self,
591        subject: SubjectPattern,
592        schema: &CapabilityTokenSchema,
593        delivery_class: DeliveryClass,
594    ) -> Result<GrantedFabricToken<PublishPermit<S>>, FabricCapabilityGrantError> {
595        let token = PublishPermit::<S>::authorize(schema, delivery_class)?;
596        let grant = self.grant_fabric_capability(FabricCapability::Publish { subject })?;
597        Ok(GrantedFabricToken::new(grant, token))
598    }
599
600    /// Grant a subscription capability and mint the corresponding linear token.
601    #[cfg(feature = "messaging-fabric")]
602    pub fn grant_subscribe_capability<S: SubjectFamilyTag>(
603        &self,
604        subject: SubjectPattern,
605        schema: &CapabilityTokenSchema,
606        delivery_class: DeliveryClass,
607    ) -> Result<GrantedFabricToken<SubscribeToken<S>>, FabricCapabilityGrantError> {
608        let token = SubscribeToken::<S>::authorize(schema, delivery_class)?;
609        let grant = self.grant_fabric_capability(FabricCapability::Subscribe { subject })?;
610        Ok(GrantedFabricToken::new(grant, token))
611    }
612
613    /// Return true when the requested FABRIC capability is currently attached.
614    #[cfg(feature = "messaging-fabric")]
615    #[must_use]
616    pub fn check_fabric_capability(&self, capability: &FabricCapability) -> bool {
617        self.handles.fabric_capabilities.check(capability)
618    }
619
620    /// Revoke one FABRIC capability by its stable grant identifier.
621    #[cfg(feature = "messaging-fabric")]
622    #[must_use]
623    pub fn revoke_fabric_capability(&self, id: FabricCapabilityId) -> Option<FabricCapability> {
624        self.handles.fabric_capabilities.revoke_by_id(id)
625    }
626
627    /// Revoke every FABRIC capability whose subject space overlaps `subject`.
628    #[cfg(feature = "messaging-fabric")]
629    #[must_use]
630    pub fn revoke_fabric_capability_by_subject(&self, subject: &SubjectPattern) -> usize {
631        self.handles.fabric_capabilities.revoke_by_subject(subject)
632    }
633
634    /// Revoke every FABRIC capability in the provided coarse scope.
635    #[cfg(feature = "messaging-fabric")]
636    #[must_use]
637    pub fn revoke_fabric_capability_scope(&self, scope: FabricCapabilityScope) -> usize {
638        self.handles.fabric_capabilities.revoke_scope(scope)
639    }
640
641    /// Attaches an evidence sink for runtime decision tracing.
642    #[must_use]
643    pub fn with_evidence_sink(mut self, sink: Option<Arc<dyn EvidenceSink>>) -> Self {
644        Arc::make_mut(&mut self.handles).evidence_sink = sink;
645        self
646    }
647
648    /// Returns a cloned handle to the evidence sink, if attached.
649    #[inline]
650    #[must_use]
651    pub(crate) fn evidence_sink_handle(&self) -> Option<Arc<dyn EvidenceSink>> {
652        self.handles.evidence_sink.clone()
653    }
654
655    /// Emit an evidence entry to the attached sink, if any.
656    ///
657    /// This is a no-op if no evidence sink is configured. Errors during
658    /// emission are handled internally by the sink (logged and dropped).
659    pub fn emit_evidence(&self, entry: &franken_evidence::EvidenceLedger) {
660        if let Some(ref sink) = self.handles.evidence_sink {
661            sink.emit(entry);
662        }
663    }
664
665    // -----------------------------------------------------------------
666    // Macaroon-based capability attenuation (bd-2lqyk.2)
667    // -----------------------------------------------------------------
668
669    /// Attaches a Macaroon capability token to this context.
670    ///
671    /// The token is stored in an `Arc` for cheap cloning. Child contexts
672    /// created via [`restrict`](Self::restrict) or [`retype`](Self::retype)
673    /// inherit the macaroon.
674    #[must_use]
675    pub fn with_macaroon(mut self, token: MacaroonToken) -> Self {
676        Arc::make_mut(&mut self.handles).macaroon = Some(Arc::new(token));
677        self
678    }
679
680    /// Attaches a pre-shared Macaroon handle to this context (internal use).
681    #[must_use]
682    #[allow(dead_code)] // Macaroon integration API
683    pub(crate) fn with_macaroon_handle(mut self, handle: Option<Arc<MacaroonToken>>) -> Self {
684        Arc::make_mut(&mut self.handles).macaroon = handle;
685        self
686    }
687
688    /// Returns a reference to the attached Macaroon token, if any.
689    #[inline]
690    #[must_use]
691    pub fn macaroon(&self) -> Option<&MacaroonToken> {
692        self.handles.macaroon.as_deref()
693    }
694
695    /// Returns a cloned `Arc` handle to the macaroon, if any.
696    #[inline]
697    #[must_use]
698    #[allow(dead_code)] // Macaroon integration API
699    pub(crate) fn macaroon_handle(&self) -> Option<Arc<MacaroonToken>> {
700        self.handles.macaroon.clone()
701    }
702
703    /// Attenuate the capability token by adding a caveat.
704    ///
705    /// Returns a new `Cx` with an attenuated macaroon. The original
706    /// context is unchanged. This does **not** require the root key —
707    /// any holder can add caveats (but nobody can remove them).
708    ///
709    /// Returns `None` if no macaroon is attached.
710    #[must_use]
711    pub fn attenuate(&self, predicate: super::macaroon::CaveatPredicate) -> Option<Self> {
712        let token = self.handles.macaroon.as_ref()?;
713        let attenuated = MacaroonToken::clone(token).add_caveat(predicate);
714
715        info!(
716            token_id = %attenuated.identifier(),
717            caveat_count = attenuated.caveat_count(),
718            "capability attenuated"
719        );
720
721        let mut cx = self.clone();
722        Arc::make_mut(&mut cx.handles).macaroon = Some(Arc::new(attenuated));
723        Some(cx)
724    }
725
726    /// Attenuate with a time limit: the token expires at `deadline_ms`.
727    ///
728    /// Convenience wrapper around [`attenuate`](Self::attenuate) with
729    /// [`CaveatPredicate::TimeBefore`].
730    ///
731    /// Returns `None` if no macaroon is attached.
732    #[must_use]
733    pub fn attenuate_time_limit(&self, deadline_ms: u64) -> Option<Self> {
734        self.attenuate(super::macaroon::CaveatPredicate::TimeBefore(deadline_ms))
735    }
736
737    /// Attenuate with a resource scope restriction.
738    ///
739    /// The `pattern` uses simple glob syntax: `*` matches any single segment,
740    /// `**` matches any number of segments.
741    ///
742    /// Returns `None` if no macaroon is attached.
743    #[must_use]
744    pub fn attenuate_scope(&self, pattern: impl Into<String>) -> Option<Self> {
745        self.attenuate(super::macaroon::CaveatPredicate::ResourceScope(
746            pattern.into(),
747        ))
748    }
749
750    /// Attenuate with a windowed rate limit.
751    ///
752    /// Restricts the token to at most `max_count` uses per `window_secs`.
753    /// The caller is responsible for tracking the sliding window and
754    /// providing `window_use_count` in [`VerificationContext`].
755    ///
756    /// Returns `None` if no macaroon is attached.
757    #[must_use]
758    pub fn attenuate_rate_limit(&self, max_count: u32, window_secs: u32) -> Option<Self> {
759        self.attenuate(super::macaroon::CaveatPredicate::RateLimit {
760            max_count,
761            window_secs,
762        })
763    }
764
765    /// Attenuate with the Cx's current budget deadline.
766    ///
767    /// If the Cx has a finite deadline, adds a `TimeBefore` caveat using it.
768    /// If no deadline is set, the macaroon is returned unchanged.
769    ///
770    /// Returns `None` if no macaroon is attached.
771    #[must_use]
772    pub fn attenuate_from_budget(&self) -> Option<Self> {
773        let _ = self.handles.macaroon.as_ref()?;
774        let budget = self.budget();
775        budget.deadline.map_or_else(
776            || Some(self.clone()),
777            |d| self.attenuate_time_limit(d.as_millis()),
778        )
779    }
780
781    /// Verify the attached capability token against a root key and context.
782    ///
783    /// Checks the HMAC chain integrity and evaluates all caveat predicates.
784    /// Emits evidence to the attached sink on both success and failure.
785    ///
786    /// Returns `Ok(())` if the token is valid and all caveats pass.
787    ///
788    /// # Errors
789    ///
790    /// Returns `VerificationError` if verification fails (bad signature or
791    /// failed caveat). Returns `Err(VerificationError::InvalidSignature)` if
792    /// no macaroon is attached.
793    pub fn verify_capability(
794        &self,
795        root_key: &crate::security::key::AuthKey,
796        context: &VerificationContext,
797    ) -> Result<(), VerificationError> {
798        let Some(token) = self.handles.macaroon.as_ref() else {
799            // Emit evidence for the no-macaroon rejection before returning.
800            warn!(
801                task_id = ?self.task_id(),
802                region_id = ?self.region_id(),
803                "capability verification failed: no macaroon attached"
804            );
805            return Err(VerificationError::InvalidSignature);
806        };
807
808        let result = token.verify(root_key, context);
809
810        // Emit evidence for the verification decision.
811        self.emit_macaroon_evidence(token, &result);
812
813        match &result {
814            Ok(()) => {
815                info!(
816                    token_id = %token.identifier(),
817                    caveats_checked = token.caveat_count(),
818                    "macaroon verified successfully"
819                );
820            }
821            Err(VerificationError::InvalidSignature) => {
822                error!(
823                    token_id = %token.identifier(),
824                    "HMAC chain integrity violation — possible tampering"
825                );
826            }
827            #[allow(unused_variables)]
828            Err(VerificationError::CaveatFailed {
829                index,
830                predicate,
831                reason,
832            }) => {
833                info!(
834                    token_id = %token.identifier(),
835                    failed_at_caveat = index,
836                    predicate = %predicate,
837                    reason = %reason,
838                    "macaroon verification failed"
839                );
840            }
841            #[allow(unused_variables)]
842            Err(VerificationError::MissingDischarge { index, identifier }) => {
843                info!(
844                    token_id = %token.identifier(),
845                    failed_at_caveat = index,
846                    discharge_id = %identifier,
847                    "missing discharge macaroon"
848                );
849            }
850            #[allow(unused_variables)]
851            Err(VerificationError::DischargeInvalid { index, identifier }) => {
852                info!(
853                    token_id = %token.identifier(),
854                    failed_at_caveat = index,
855                    discharge_id = %identifier,
856                    "discharge macaroon verification failed"
857                );
858            }
859            #[allow(unused_variables)]
860            Err(VerificationError::DischargeChainTooDeep { depth }) => {
861                info!(
862                    token_id = %token.identifier(),
863                    depth = %depth,
864                    "discharge macaroon chain too deep"
865                );
866            }
867        }
868
869        result
870    }
871
872    /// Emit evidence for a macaroon verification decision.
873    fn emit_macaroon_evidence(
874        &self,
875        token: &MacaroonToken,
876        result: &Result<(), VerificationError>,
877    ) {
878        let Some(ref sink) = self.handles.evidence_sink else {
879            return;
880        };
881
882        let now_ms = wall_clock_now().as_millis();
883
884        let (action, loss) = match result {
885            Ok(()) => ("verify_success".to_string(), 0.0),
886            Err(VerificationError::InvalidSignature) => ("verify_fail_signature".to_string(), 1.0),
887            Err(VerificationError::CaveatFailed { index, .. }) => {
888                (format!("verify_fail_caveat_{index}"), 0.5)
889            }
890            Err(VerificationError::MissingDischarge { index, .. }) => {
891                (format!("verify_fail_missing_discharge_{index}"), 0.8)
892            }
893            Err(VerificationError::DischargeInvalid { index, .. }) => {
894                (format!("verify_fail_discharge_invalid_{index}"), 0.9)
895            }
896            Err(VerificationError::DischargeChainTooDeep { depth }) => {
897                (format!("verify_fail_discharge_chain_too_deep_{depth}"), 1.0)
898            }
899        };
900
901        let entry = franken_evidence::EvidenceLedger {
902            ts_unix_ms: now_ms,
903            component: "cx_macaroon".to_string(),
904            action: action.clone(),
905            posterior: vec![1.0],
906            expected_loss_by_action: std::collections::BTreeMap::from([(action, loss)]),
907            chosen_expected_loss: loss,
908            calibration_score: 1.0,
909            fallback_active: false,
910            #[allow(clippy::cast_precision_loss)]
911            top_features: vec![("caveat_count".to_string(), token.caveat_count() as f64)],
912        };
913        sink.emit(&entry);
914    }
915
916    /// Returns the current logical time without ticking.
917    #[inline]
918    #[must_use]
919    pub fn logical_now(&self) -> LogicalTime {
920        self.handles.logical_clock.now()
921    }
922
923    /// Returns a clone of the task's logical clock handle.
924    #[inline]
925    #[must_use]
926    pub(crate) fn logical_clock_handle(&self) -> LogicalClockHandle {
927        self.handles.logical_clock.clone()
928    }
929
930    /// Records a local logical event and returns the updated time.
931    #[inline]
932    #[must_use]
933    pub fn logical_tick(&self) -> LogicalTime {
934        self.handles.logical_clock.tick()
935    }
936
937    /// Merges a received logical time and returns the updated time.
938    #[inline]
939    #[must_use]
940    pub fn logical_receive(&self, sender_time: &LogicalTime) -> LogicalTime {
941        self.handles.logical_clock.receive(sender_time)
942    }
943
944    /// Returns a cloned handle to the timer driver, if present.
945    ///
946    /// The timer driver provides access to timer registration for async time
947    /// operations like `sleep`, `timeout`, and `interval`. When present, these
948    /// operations use the runtime's timer wheel instead of spawning threads.
949    ///
950    /// # Example
951    ///
952    /// ```ignore
953    /// if let Some(timer) = Cx::current().and_then(|cx| cx.timer_driver()) {
954    ///     let deadline = timer.now() + Duration::from_secs(1);
955    ///     let handle = timer.register(deadline, waker);
956    /// }
957    /// ```
958    #[inline]
959    #[must_use]
960    pub fn timer_driver(&self) -> Option<TimerDriverHandle>
961    where
962        Caps: cap::HasTime,
963    {
964        self.handles.timer_driver.clone()
965    }
966
967    /// Returns true if a timer driver is available.
968    ///
969    /// When true, time operations can use the runtime's timer wheel.
970    /// When false, time operations fall back to OS-level timing.
971    #[inline]
972    #[must_use]
973    pub fn has_timer(&self) -> bool
974    where
975        Caps: cap::HasTime,
976    {
977        self.handles.timer_driver.is_some()
978    }
979
980    /// Returns the I/O capability, if one is configured.
981    ///
982    /// The I/O capability provides access to async I/O operations. If no capability
983    /// is configured, this returns `None` and I/O operations are not available.
984    ///
985    /// # Capability Model
986    ///
987    /// Asupersync uses explicit capability-based I/O:
988    /// - Production runtime configures real I/O capability (via reactor)
989    /// - Lab runtime can configure virtual I/O for deterministic testing
990    /// - Code that needs I/O must explicitly check for and use this capability
991    ///
992    /// # Example
993    ///
994    /// ```ignore
995    /// async fn read_data(cx: &Cx) -> io::Result<Vec<u8>> {
996    ///     let io = cx.io().ok_or_else(|| {
997    ///         io::Error::new(io::ErrorKind::Unsupported, "I/O not available")
998    ///     })?;
999    ///
1000    ///     // Use io capability...
1001    ///     Ok(vec![])
1002    /// }
1003    /// ```
1004    #[inline]
1005    #[must_use]
1006    pub fn io(&self) -> Option<&dyn crate::io::IoCap>
1007    where
1008        Caps: cap::HasIo,
1009    {
1010        self.handles.io_cap.as_ref().map(AsRef::as_ref)
1011    }
1012
1013    /// Returns a cloned handle to the configured I/O capability, if any.
1014    ///
1015    /// This is `pub(crate)` so internal wiring can preserve I/O authority when
1016    /// deriving child task contexts without requiring `Caps: HasIo` bounds.
1017    /// Some build slices currently exercise the inheritance path only behind
1018    /// optional runtime wiring/tests.
1019    #[inline]
1020    #[allow(dead_code)]
1021    #[must_use]
1022    pub(crate) fn io_cap_handle(&self) -> Option<Arc<dyn crate::io::IoCap>> {
1023        self.handles.io_cap.clone()
1024    }
1025
1026    /// Returns true if I/O capability is available.
1027    ///
1028    /// Convenience method to check if I/O operations can be performed.
1029    #[inline]
1030    #[must_use]
1031    pub fn has_io(&self) -> bool
1032    where
1033        Caps: cap::HasIo,
1034    {
1035        self.handles.io_cap.is_some()
1036    }
1037
1038    /// Returns the fetch adapter capability, if one is configured.
1039    ///
1040    /// This is the browser-facing network authority surface. When present,
1041    /// requests must pass explicit origin/method/credential policy checks
1042    /// before any host fetch operation is attempted.
1043    #[inline]
1044    #[must_use]
1045    pub fn fetch_cap(&self) -> Option<&dyn crate::io::FetchIoCap>
1046    where
1047        Caps: cap::HasIo,
1048    {
1049        self.handles.io_cap.as_ref().and_then(|cap| cap.fetch_cap())
1050    }
1051
1052    /// Returns true if a fetch adapter capability is available.
1053    #[inline]
1054    #[must_use]
1055    pub fn has_fetch_cap(&self) -> bool
1056    where
1057        Caps: cap::HasIo,
1058    {
1059        self.fetch_cap().is_some()
1060    }
1061
1062    /// Returns the remote capability, if one is configured.
1063    ///
1064    /// The remote capability authorizes spawning tasks on remote nodes.
1065    /// Without this capability, [`spawn_remote`](crate::remote::spawn_remote)
1066    /// returns [`RemoteError::NoCapability`](crate::remote::RemoteError::NoCapability).
1067    ///
1068    /// # Capability Model
1069    ///
1070    /// Remote execution is an explicit capability:
1071    /// - Production runtime configures remote capability with transport config
1072    /// - Lab runtime can configure it for deterministic distributed testing
1073    /// - Code that needs remote spawning must check for this capability
1074    #[inline]
1075    #[must_use]
1076    pub fn remote(&self) -> Option<&RemoteCap>
1077    where
1078        Caps: cap::HasRemote,
1079    {
1080        self.handles.remote_cap.as_ref().map(AsRef::as_ref)
1081    }
1082
1083    /// Returns true if the remote capability is available.
1084    ///
1085    /// Convenience method to check if remote task operations can be performed.
1086    #[inline]
1087    #[must_use]
1088    pub fn has_remote(&self) -> bool
1089    where
1090        Caps: cap::HasRemote,
1091    {
1092        self.handles.remote_cap.is_some()
1093    }
1094
1095    /// Registers an I/O source with the reactor for the given interest.
1096    ///
1097    /// This method registers a source (such as a socket or file descriptor) with
1098    /// the reactor so that the task can be woken when I/O operations are ready.
1099    ///
1100    /// # Arguments
1101    ///
1102    /// * `source` - The I/O source to register (must implement [`Source`])
1103    /// * `interest` - The I/O operations to monitor for (read, write, or both)
1104    ///
1105    /// # Returns
1106    ///
1107    /// Returns a [`IoRegistration`] handle that represents the active registration.
1108    /// When dropped, the registration is automatically deregistered from the reactor.
1109    ///
1110    /// # Errors
1111    ///
1112    /// Returns an error if:
1113    /// - No reactor is available (reactor not initialized or not present)
1114    /// - The reactor fails to register the source
1115    ///
1116    #[cfg(unix)]
1117    pub fn register_io<S: Source>(
1118        &self,
1119        source: &S,
1120        interest: Interest,
1121    ) -> std::io::Result<IoRegistration>
1122    where
1123        Caps: cap::HasIo,
1124    {
1125        let Some(driver) = self.io_driver_handle() else {
1126            return Err(std::io::Error::new(
1127                std::io::ErrorKind::NotConnected,
1128                "I/O driver not available",
1129            ));
1130        };
1131        driver.register(source, interest, noop_waker())
1132    }
1133
1134    /// Returns the current region ID.
1135    ///
1136    /// The region ID identifies the structured concurrency scope that owns this task.
1137    /// Useful for debugging and for associating task-specific data with region boundaries.
1138    ///
1139    /// # Example
1140    ///
1141    /// ```ignore
1142    /// fn log_context(cx: &Cx) {
1143    ///     println!("Running in region: {:?}", cx.region_id());
1144    /// }
1145    /// ```
1146    #[inline]
1147    #[must_use]
1148    pub fn region_id(&self) -> RegionId {
1149        self.inner.read().region
1150    }
1151
1152    /// Returns the current task ID.
1153    ///
1154    /// The task ID uniquely identifies this task within the runtime. Useful for
1155    /// debugging, tracing, and correlating log entries.
1156    ///
1157    /// # Example
1158    ///
1159    /// ```ignore
1160    /// fn log_task(cx: &Cx) {
1161    ///     println!("Task {:?} starting work", cx.task_id());
1162    /// }
1163    /// ```
1164    #[inline]
1165    #[must_use]
1166    pub fn task_id(&self) -> TaskId {
1167        self.inner.read().task
1168    }
1169
1170    /// Returns the task type label, if one has been set.
1171    ///
1172    /// Task types are optional metadata used by adaptive deadline monitoring
1173    /// and metrics to group similar work.
1174    #[inline]
1175    #[must_use]
1176    pub fn task_type(&self) -> Option<String> {
1177        self.inner.read().task_type.clone()
1178    }
1179
1180    /// Sets a task type label for adaptive monitoring and metrics.
1181    ///
1182    /// This is intended to be called early in task execution to associate
1183    /// a stable label with the task's behavior profile.
1184    pub fn set_task_type(&self, task_type: impl Into<String>) {
1185        let mut inner = self.inner.write();
1186        inner.task_type = Some(task_type.into());
1187    }
1188
1189    /// Returns the current budget.
1190    ///
1191    /// The budget defines resource limits for this task:
1192    /// - `deadline`: Absolute time limit
1193    /// - `poll_quota`: Maximum number of polls
1194    /// - `cost_quota`: Abstract cost units
1195    /// - `priority`: Scheduling priority
1196    ///
1197    /// Frameworks can use the budget to implement request timeouts:
1198    ///
1199    /// # Example
1200    ///
1201    /// ```ignore
1202    /// async fn check_timeout(cx: &Cx) -> Result<(), TimeoutError> {
1203    ///     let budget = cx.budget();
1204    ///     if budget.is_expired() {
1205    ///         return Err(TimeoutError::DeadlineExceeded);
1206    ///     }
1207    ///     Ok(())
1208    /// }
1209    /// ```
1210    #[inline]
1211    #[must_use]
1212    pub fn budget(&self) -> Budget {
1213        self.inner.read().budget
1214    }
1215
1216    /// Returns true if cancellation has been requested.
1217    ///
1218    /// This is a non-blocking check that queries whether a cancellation signal
1219    /// has been sent to this task. Unlike `checkpoint()`, this method does not
1220    /// return an error - it just reports the current state.
1221    ///
1222    /// Frameworks should check this periodically during long-running operations
1223    /// to enable graceful shutdown.
1224    ///
1225    /// # Example
1226    ///
1227    /// ```ignore
1228    /// async fn process_items(cx: &Cx, items: Vec<Item>) -> Result<(), Error> {
1229    ///     for item in items {
1230    ///         // Check for cancellation between items
1231    ///         if cx.is_cancel_requested() {
1232    ///             return Err(Error::Cancelled);
1233    ///         }
1234    ///         process(item).await?;
1235    ///     }
1236    ///     Ok(())
1237    /// }
1238    /// ```
1239    #[inline]
1240    #[must_use]
1241    pub fn is_cancel_requested(&self) -> bool {
1242        self.inner.read().cancel_requested
1243    }
1244
1245    /// Checks for cancellation and returns an error if cancelled.
1246    ///
1247    /// This is a checkpoint where cancellation can be observed. It combines
1248    /// checking the cancellation flag with returning an error, making it
1249    /// convenient for use with the `?` operator.
1250    ///
1251    /// In addition to cancellation checking, this method records progress by
1252    /// updating the checkpoint state. This is useful for:
1253    /// - Detecting stuck/stalled tasks via `checkpoint_state()`
1254    /// - Work-stealing scheduler decisions
1255    /// - Observability and debugging
1256    ///
1257    /// If the context is currently masked (via `masked()`), this method
1258    /// returns `Ok(())` even when cancellation is pending, deferring the
1259    /// cancellation until the mask is released.
1260    ///
1261    /// # Errors
1262    ///
1263    /// Returns an `Err` with kind `ErrorKind::Cancelled` if cancellation is
1264    /// pending and the context is not masked.
1265    ///
1266    /// # Example
1267    ///
1268    /// ```ignore
1269    /// async fn do_work(cx: &Cx) -> Result<(), Error> {
1270    ///     // Use checkpoint with ? for concise cancellation handling
1271    ///     cx.checkpoint()?;
1272    ///
1273    ///     expensive_operation().await?;
1274    ///
1275    ///     cx.checkpoint()?;
1276    ///
1277    ///     another_operation().await?;
1278    ///
1279    ///     Ok(())
1280    /// }
1281    /// ```
1282    /// Implements `rule.cancel.checkpoint_masked` (#10):
1283    /// if cancel_requested and mask_depth == 0, acknowledge cancellation.
1284    /// If mask_depth > 0, cancel remains deferred until mask is unwound.
1285    #[allow(clippy::result_large_err)]
1286    pub fn checkpoint(&self) -> Result<(), crate::error::Error> {
1287        let checkpoint_time = self.current_checkpoint_time();
1288        // Record progress checkpoint and check cancellation under a single lock
1289        let (
1290            cancel_requested,
1291            mask_depth,
1292            task,
1293            region,
1294            budget,
1295            budget_baseline,
1296            cancel_reason,
1297            budget_exhaustion,
1298        ) = {
1299            let mut inner = self.inner.write();
1300            inner.checkpoint_state.record_at(checkpoint_time);
1301            let budget_exhaustion = Self::checkpoint_budget_exhaustion(
1302                inner.region,
1303                inner.task,
1304                inner.budget,
1305                checkpoint_time,
1306            );
1307            if let Some((reason, _, _)) = &budget_exhaustion {
1308                inner.cancel_requested = true;
1309                inner
1310                    .fast_cancel
1311                    .store(true, std::sync::atomic::Ordering::Release);
1312                if let Some(existing) = &mut inner.cancel_reason {
1313                    existing.strengthen(reason);
1314                } else {
1315                    inner.cancel_reason = Some(reason.clone());
1316                }
1317            }
1318            if inner.cancel_requested && inner.mask_depth == 0 {
1319                inner.cancel_acknowledged = true;
1320            }
1321            (
1322                inner.cancel_requested,
1323                inner.mask_depth,
1324                inner.task,
1325                inner.region,
1326                inner.budget,
1327                inner.budget_baseline,
1328                inner.cancel_reason.clone(),
1329                budget_exhaustion.map(|(_, exhaustion_kind, deadline_remaining_ms)| {
1330                    (exhaustion_kind, deadline_remaining_ms)
1331                }),
1332            )
1333        };
1334
1335        if let Some((exhaustion_kind, deadline_remaining_ms)) = budget_exhaustion {
1336            if let Some(ref sink) = self.handles.evidence_sink {
1337                crate::evidence_sink::emit_budget_evidence(
1338                    sink.as_ref(),
1339                    exhaustion_kind,
1340                    budget.poll_quota,
1341                    deadline_remaining_ms,
1342                );
1343            }
1344        }
1345
1346        // Emit evidence for cancellation decisions observed at checkpoint.
1347        if cancel_requested && mask_depth == 0 {
1348            if let Some(ref sink) = self.handles.evidence_sink {
1349                let kind_str = cancel_reason
1350                    .as_ref()
1351                    .map_or_else(|| "unknown".to_string(), |r| format!("{}", r.kind));
1352                crate::evidence_sink::emit_cancel_evidence(
1353                    sink.as_ref(),
1354                    &kind_str,
1355                    budget.poll_quota,
1356                    budget.priority,
1357                );
1358            }
1359        }
1360
1361        Self::check_cancel_from_values(
1362            cancel_requested,
1363            mask_depth,
1364            task,
1365            region,
1366            budget,
1367            budget_baseline,
1368            checkpoint_time,
1369            cancel_reason.as_ref(),
1370        )
1371    }
1372
1373    /// Checks for cancellation with a progress message.
1374    ///
1375    /// This is like [`checkpoint()`](Self::checkpoint) but also records a
1376    /// human-readable message describing the current progress. The message
1377    /// is stored in the checkpoint state and can be retrieved via
1378    /// [`checkpoint_state()`](Self::checkpoint_state).
1379    ///
1380    /// # Errors
1381    ///
1382    /// Returns an `Err` with kind `ErrorKind::Cancelled` if cancellation is
1383    /// pending and the context is not masked.
1384    ///
1385    /// # Example
1386    ///
1387    /// ```ignore
1388    /// async fn process_batch(cx: &Cx, items: &[Item]) -> Result<(), Error> {
1389    ///     for (i, item) in items.iter().enumerate() {
1390    ///         cx.checkpoint_with(format!("Processing item {}/{}", i + 1, items.len()))?;
1391    ///         process(item).await?;
1392    ///     }
1393    ///     Ok(())
1394    /// }
1395    /// ```
1396    #[allow(clippy::result_large_err)]
1397    pub fn checkpoint_with(&self, msg: impl Into<String>) -> Result<(), crate::error::Error> {
1398        let checkpoint_time = self.current_checkpoint_time();
1399        // Record progress checkpoint and check cancellation under a single lock
1400        let (
1401            cancel_requested,
1402            mask_depth,
1403            task,
1404            region,
1405            budget,
1406            budget_baseline,
1407            cancel_reason,
1408            budget_exhaustion,
1409        ) = {
1410            let mut inner = self.inner.write();
1411            inner
1412                .checkpoint_state
1413                .record_with_message_at(msg.into(), checkpoint_time);
1414            let budget_exhaustion = Self::checkpoint_budget_exhaustion(
1415                inner.region,
1416                inner.task,
1417                inner.budget,
1418                checkpoint_time,
1419            );
1420            if let Some((reason, _, _)) = &budget_exhaustion {
1421                inner.cancel_requested = true;
1422                inner
1423                    .fast_cancel
1424                    .store(true, std::sync::atomic::Ordering::Release);
1425                if let Some(existing) = &mut inner.cancel_reason {
1426                    existing.strengthen(reason);
1427                } else {
1428                    inner.cancel_reason = Some(reason.clone());
1429                }
1430            }
1431            if inner.cancel_requested && inner.mask_depth == 0 {
1432                inner.cancel_acknowledged = true;
1433            }
1434            (
1435                inner.cancel_requested,
1436                inner.mask_depth,
1437                inner.task,
1438                inner.region,
1439                inner.budget,
1440                inner.budget_baseline,
1441                inner.cancel_reason.clone(),
1442                budget_exhaustion.map(|(_, exhaustion_kind, deadline_remaining_ms)| {
1443                    (exhaustion_kind, deadline_remaining_ms)
1444                }),
1445            )
1446        };
1447
1448        if let Some((exhaustion_kind, deadline_remaining_ms)) = budget_exhaustion {
1449            if let Some(ref sink) = self.handles.evidence_sink {
1450                crate::evidence_sink::emit_budget_evidence(
1451                    sink.as_ref(),
1452                    exhaustion_kind,
1453                    budget.poll_quota,
1454                    deadline_remaining_ms,
1455                );
1456            }
1457        }
1458
1459        // Emit evidence for cancellation decisions observed at checkpoint.
1460        if cancel_requested && mask_depth == 0 {
1461            if let Some(ref sink) = self.handles.evidence_sink {
1462                let kind_str = cancel_reason
1463                    .as_ref()
1464                    .map_or_else(|| "unknown".to_string(), |r| format!("{}", r.kind));
1465                crate::evidence_sink::emit_cancel_evidence(
1466                    sink.as_ref(),
1467                    &kind_str,
1468                    budget.poll_quota,
1469                    budget.priority,
1470                );
1471            }
1472        }
1473
1474        Self::check_cancel_from_values(
1475            cancel_requested,
1476            mask_depth,
1477            task,
1478            region,
1479            budget,
1480            budget_baseline,
1481            checkpoint_time,
1482            cancel_reason.as_ref(),
1483        )
1484    }
1485
1486    /// Returns a snapshot of the current checkpoint state.
1487    ///
1488    /// The checkpoint state tracks progress reporting checkpoints:
1489    /// - `last_checkpoint`: The runtime time when the last checkpoint was recorded
1490    /// - `last_message`: The message from the last `checkpoint_with()` call
1491    /// - `checkpoint_count`: Total number of checkpoints
1492    ///
1493    /// This is useful for monitoring task progress and detecting stalled tasks.
1494    ///
1495    /// # Example
1496    ///
1497    /// ```ignore
1498    /// fn check_task_health(cx: &Cx) -> bool {
1499    ///     let state = cx.checkpoint_state();
1500    ///     state.last_checkpoint.is_some()
1501    /// }
1502    /// ```
1503    #[must_use]
1504    pub fn checkpoint_state(&self) -> crate::types::CheckpointState {
1505        self.inner.read().checkpoint_state.clone()
1506    }
1507
1508    /// Returns the current physical time according to the configured timer driver,
1509    /// or the wall clock if no timer driver is available.
1510    #[must_use]
1511    pub fn now(&self) -> Time
1512    where
1513        Caps: cap::HasTime,
1514    {
1515        self.handles
1516            .timer_driver
1517            .as_ref()
1518            .map_or_else(wall_clock_now, TimerDriverHandle::now)
1519    }
1520
1521    /// Internal: returns current time for checkpointing.
1522    #[inline]
1523    fn current_checkpoint_time(&self) -> Time {
1524        self.handles
1525            .timer_driver
1526            .as_ref()
1527            .map_or_else(wall_clock_now, TimerDriverHandle::now)
1528    }
1529
1530    #[inline]
1531    fn checkpoint_budget_exhaustion(
1532        region: RegionId,
1533        task: TaskId,
1534        budget: Budget,
1535        now: Time,
1536    ) -> Option<(CancelReason, &'static str, Option<u64>)> {
1537        let deadline_remaining_ms = budget
1538            .remaining_time(now)
1539            .map(Self::duration_millis_saturating);
1540
1541        let mut exhaustion = if budget.is_past_deadline(now) {
1542            Some((
1543                CancelReason::with_origin(CancelKind::Deadline, region, now).with_task(task),
1544                "time",
1545                deadline_remaining_ms,
1546            ))
1547        } else {
1548            None
1549        };
1550
1551        if budget.poll_quota == 0 {
1552            let candidate =
1553                CancelReason::with_origin(CancelKind::PollQuota, region, now).with_task(task);
1554            match &mut exhaustion {
1555                Some((existing, kind, _)) => {
1556                    if existing.strengthen(&candidate) {
1557                        *kind = "poll";
1558                    }
1559                }
1560                None => exhaustion = Some((candidate, "poll", deadline_remaining_ms)),
1561            }
1562        }
1563
1564        if matches!(budget.cost_quota, Some(0)) {
1565            let candidate =
1566                CancelReason::with_origin(CancelKind::CostBudget, region, now).with_task(task);
1567            match &mut exhaustion {
1568                Some((existing, kind, _)) => {
1569                    if existing.strengthen(&candidate) {
1570                        *kind = "cost";
1571                    }
1572                }
1573                None => exhaustion = Some((candidate, "cost", deadline_remaining_ms)),
1574            }
1575        }
1576
1577        exhaustion
1578    }
1579
1580    #[inline]
1581    fn checkpoint_budget_usage(
1582        budget: Budget,
1583        budget_baseline: Budget,
1584        now: Time,
1585    ) -> (Option<u32>, Option<u64>, Option<u64>) {
1586        let polls_used = if budget_baseline.poll_quota == u32::MAX {
1587            None
1588        } else {
1589            Some(budget_baseline.poll_quota.saturating_sub(budget.poll_quota))
1590        };
1591        let cost_used = match (budget_baseline.cost_quota, budget.cost_quota) {
1592            (Some(baseline), Some(remaining)) => Some(baseline.saturating_sub(remaining)),
1593            _ => None,
1594        };
1595        let time_remaining_ms = budget
1596            .remaining_time(now)
1597            .map(Self::duration_millis_saturating);
1598        (polls_used, cost_used, time_remaining_ms)
1599    }
1600
1601    #[inline]
1602    fn duration_millis_saturating(duration: Duration) -> u64 {
1603        u64::try_from(duration.as_millis()).unwrap_or(u64::MAX)
1604    }
1605
1606    /// Internal: checks cancellation from extracted values.
1607    #[allow(clippy::result_large_err)]
1608    #[allow(clippy::too_many_arguments)]
1609    fn check_cancel_from_values(
1610        cancel_requested: bool,
1611        mask_depth: u32,
1612        task: TaskId,
1613        region: RegionId,
1614        budget: Budget,
1615        budget_baseline: Budget,
1616        checkpoint_time: Time,
1617        cancel_reason: Option<&CancelReason>,
1618    ) -> Result<(), crate::error::Error> {
1619        let (polls_used, cost_used, time_remaining_ms) =
1620            Self::checkpoint_budget_usage(budget, budget_baseline, checkpoint_time);
1621
1622        let _ = (
1623            &task,
1624            &region,
1625            &budget,
1626            &budget_baseline,
1627            &polls_used,
1628            &cost_used,
1629            &time_remaining_ms,
1630        );
1631
1632        trace!(
1633            task_id = ?task,
1634            region_id = ?region,
1635            polls_used = ?polls_used,
1636            polls_remaining = budget.poll_quota,
1637            time_remaining_ms = ?time_remaining_ms,
1638            cost_used = ?cost_used,
1639            cost_remaining = ?budget.cost_quota,
1640            deadline = ?budget.deadline,
1641            cancel_reason = ?cancel_reason,
1642            cancel_requested,
1643            mask_depth,
1644            "checkpoint"
1645        );
1646
1647        if cancel_requested {
1648            if mask_depth == 0 {
1649                let cancel_reason_ref = cancel_reason.as_ref();
1650                let exhausted_resource = cancel_reason_ref
1651                    .map_or_else(|| "unknown".to_string(), |r| format!("{:?}", r.kind));
1652                let _ = &exhausted_resource;
1653
1654                info!(
1655                    task_id = ?task,
1656                    region_id = ?region,
1657                    exhausted_resource = %exhausted_resource,
1658                    cancel_reason = ?cancel_reason,
1659                    budget_deadline = ?budget.deadline,
1660                    budget_poll_quota = budget.poll_quota,
1661                    budget_cost_quota = ?budget.cost_quota,
1662                    "cancel observed at checkpoint - task cancelled"
1663                );
1664
1665                trace!(
1666                    task_id = ?task,
1667                    region_id = ?region,
1668                    cancel_reason = ?cancel_reason,
1669                    cancel_kind = ?cancel_reason.as_ref().map(|r| r.kind),
1670                    mask_depth,
1671                    budget_deadline = ?budget.deadline,
1672                    budget_poll_quota = budget.poll_quota,
1673                    budget_cost_quota = ?budget.cost_quota,
1674                    budget_priority = budget.priority,
1675                    "cancel observed at checkpoint"
1676                );
1677                Err(crate::error::Error::new(crate::error::ErrorKind::Cancelled))
1678            } else {
1679                trace!(
1680                    task_id = ?task,
1681                    region_id = ?region,
1682                    cancel_reason = ?cancel_reason,
1683                    cancel_kind = ?cancel_reason.as_ref().map(|r| r.kind),
1684                    mask_depth,
1685                    "cancel observed but masked"
1686                );
1687                Ok(())
1688            }
1689        } else {
1690            Ok(())
1691        }
1692    }
1693
1694    /// Executes a closure with cancellation masked.
1695    ///
1696    /// While masked, `checkpoint()` will return `Ok(())` even if cancellation
1697    /// has been requested. This is used for critical sections that must not
1698    /// be interrupted, such as:
1699    ///
1700    /// - Completing a two-phase commit
1701    /// - Flushing buffered data
1702    /// - Releasing resources in a specific order
1703    ///
1704    /// Masking can be nested - each call to `masked()` increments a depth
1705    /// counter, and cancellation is only observable when depth returns to 0.
1706    ///
1707    /// # Example
1708    ///
1709    /// ```ignore
1710    /// async fn commit_transaction(cx: &Cx, tx: Transaction) -> Result<(), Error> {
1711    ///     // Critical section: must complete even if cancelled
1712    ///     cx.masked(|| {
1713    ///         tx.prepare()?;
1714    ///         tx.commit()?;  // Cannot be interrupted here
1715    ///         Ok(())
1716    ///     })
1717    /// }
1718    /// ```
1719    ///
1720    /// # Note
1721    ///
1722    /// Use masking sparingly. Long-masked sections defeat the purpose of
1723    /// responsive cancellation. Prefer short critical sections followed
1724    /// by a checkpoint.
1725    ///
1726    /// Invariant `inv.cancel.mask_monotone` (#12): mask_depth is monotonically
1727    /// non-increasing during cancel processing. The increment here occurs before
1728    /// cancel acknowledgement; `MaskGuard::drop` decrements via `saturating_sub(1)`.
1729    /// Invariant `inv.cancel.mask_bounded` (#11): mask_depth <= MAX_MASK_DEPTH.
1730    pub fn masked<F, R>(&self, f: F) -> R
1731    where
1732        F: FnOnce() -> R,
1733    {
1734        {
1735            let mut inner = self.inner.write();
1736            assert!(
1737                inner.mask_depth < crate::types::task_context::MAX_MASK_DEPTH,
1738                "mask depth exceeded MAX_MASK_DEPTH ({}): this violates INV-MASK-BOUNDED \
1739                 and prevents cancellation from ever being observed. \
1740                 Reduce nesting of Cx::masked() sections.",
1741                crate::types::task_context::MAX_MASK_DEPTH,
1742            );
1743            inner.mask_depth += 1;
1744        }
1745
1746        let _guard = MaskGuard { inner: &self.inner };
1747        f()
1748    }
1749
1750    /// Traces an event for observability.
1751    ///
1752    /// Trace events are associated with the current task and region, enabling
1753    /// structured observability. In the lab runtime, traces are captured
1754    /// deterministically for replay and debugging.
1755    ///
1756    /// # Example
1757    ///
1758    /// ```ignore
1759    /// async fn process_request(cx: &Cx, request: &Request) -> Response {
1760    ///     cx.trace("Request received");
1761    ///
1762    ///     let result = handle(request).await;
1763    ///
1764    ///     cx.trace("Request processed");
1765    ///
1766    ///     result
1767    /// }
1768    /// ```
1769    ///
1770    /// # Note
1771    ///
1772    /// When a trace buffer is attached to this `Cx`, this writes a structured
1773    /// user trace event into that buffer and also emits to the log collector.
1774    /// Without a trace buffer, it still records the log entry.
1775    pub fn trace(&self, message: &str) {
1776        self.log(LogEntry::trace(message));
1777        let Some(trace) = self.trace_buffer() else {
1778            return;
1779        };
1780        let now = self
1781            .handles
1782            .timer_driver
1783            .as_ref()
1784            .map_or_else(wall_clock_now, TimerDriverHandle::now);
1785        let logical_time = self.logical_tick();
1786        trace.record_event(move |seq| {
1787            TraceEvent::user_trace(seq, now, message).with_logical_time(logical_time)
1788        });
1789    }
1790
1791    /// Logs a trace-level message with structured key-value fields.
1792    ///
1793    /// Each field is attached to the resulting `LogEntry`, making it
1794    /// queryable in the log collector.
1795    ///
1796    /// # Example
1797    ///
1798    /// ```ignore
1799    /// cx.trace_with_fields("request handled", &[
1800    ///     ("method", "GET"),
1801    ///     ("path", "/api/users"),
1802    ///     ("status", "200"),
1803    /// ]);
1804    /// ```
1805    pub fn trace_with_fields(&self, message: &str, fields: &[(&str, &str)]) {
1806        let mut entry = LogEntry::trace(message);
1807        for &(k, v) in fields {
1808            entry = entry.with_field(k, v);
1809        }
1810        self.log(entry);
1811        let Some(trace) = self.trace_buffer() else {
1812            return;
1813        };
1814        let now = self
1815            .handles
1816            .timer_driver
1817            .as_ref()
1818            .map_or_else(wall_clock_now, TimerDriverHandle::now);
1819        let logical_time = self.logical_tick();
1820        trace.record_event(move |seq| {
1821            TraceEvent::user_trace(seq, now, message).with_logical_time(logical_time)
1822        });
1823    }
1824
1825    /// Enters a named span, returning a guard that ends the span on drop.
1826    ///
1827    /// The span forks the current `DiagnosticContext`, assigning a new
1828    /// `SpanId` with the previous span as parent. When the guard is
1829    /// dropped the original context is restored.
1830    ///
1831    /// # Example
1832    ///
1833    /// ```ignore
1834    /// {
1835    ///     let _guard = cx.enter_span("parse_request");
1836    ///     // ... work inside the span ...
1837    /// } // span ends here
1838    /// ```
1839    #[must_use]
1840    pub fn enter_span(&self, name: &str) -> SpanGuard<Caps> {
1841        let prev = self.diagnostic_context();
1842        let child = prev.fork().with_custom("span.name", name);
1843        self.set_diagnostic_context(child);
1844        self.log(LogEntry::debug(format!("span enter: {name}")).with_target("tracing"));
1845        SpanGuard {
1846            cx: self.clone(),
1847            prev,
1848        }
1849    }
1850
1851    /// Sets a request correlation ID on the diagnostic context.
1852    ///
1853    /// The ID propagates to all log entries and child spans created
1854    /// from this context, enabling end-to-end request tracing.
1855    pub fn set_request_id(&self, id: impl Into<String>) {
1856        let mut obs = self.observability.write();
1857        obs.context = obs.context.clone().with_custom("request_id", id);
1858    }
1859
1860    /// Returns the current request correlation ID, if set.
1861    #[inline]
1862    #[must_use]
1863    pub fn request_id(&self) -> Option<String> {
1864        self.diagnostic_context()
1865            .custom("request_id")
1866            .map(String::from)
1867    }
1868
1869    /// Logs a structured entry to the attached collector, if present.
1870    pub fn log(&self, entry: LogEntry) {
1871        let obs = self.observability.read();
1872        let Some(collector) = obs.collector.clone() else {
1873            return;
1874        };
1875        let include_timestamps = obs.include_timestamps;
1876        let context = obs.context.clone();
1877        drop(obs);
1878        let mut entry = entry.with_context(&context);
1879        if include_timestamps && entry.timestamp() == Time::ZERO {
1880            let now = self
1881                .handles
1882                .timer_driver
1883                .as_ref()
1884                .map_or_else(wall_clock_now, TimerDriverHandle::now);
1885            entry = entry.with_timestamp(now);
1886        }
1887        collector.log(entry);
1888    }
1889
1890    /// Returns a snapshot of the current diagnostic context.
1891    #[must_use]
1892    pub fn diagnostic_context(&self) -> DiagnosticContext {
1893        self.observability.read().context.clone()
1894    }
1895
1896    /// Replaces the current diagnostic context.
1897    pub fn set_diagnostic_context(&self, ctx: DiagnosticContext) {
1898        let mut obs = self.observability.write();
1899        obs.context = ctx;
1900    }
1901
1902    /// Attaches a log collector to this context.
1903    pub fn set_log_collector(&self, collector: LogCollector) {
1904        let mut obs = self.observability.write();
1905        obs.collector = Some(collector);
1906    }
1907
1908    /// Returns the current log collector, if attached.
1909    #[inline]
1910    #[must_use]
1911    pub fn log_collector(&self) -> Option<LogCollector> {
1912        self.observability.read().collector.clone()
1913    }
1914
1915    /// Attaches a trace buffer to this context.
1916    pub fn set_trace_buffer(&self, trace: TraceBufferHandle) {
1917        let mut obs = self.observability.write();
1918        obs.trace = Some(trace);
1919    }
1920
1921    /// Returns the current trace buffer handle, if attached.
1922    #[inline]
1923    #[must_use]
1924    pub fn trace_buffer(&self) -> Option<TraceBufferHandle> {
1925        self.observability.read().trace.clone()
1926    }
1927
1928    /// Derives an observability state for a child task.
1929    pub(crate) fn child_observability(&self, region: RegionId, task: TaskId) -> ObservabilityState {
1930        let obs = self.observability.read();
1931        obs.derive_child(region, task)
1932    }
1933
1934    /// Returns the entropy source for this context.
1935    #[inline]
1936    #[must_use]
1937    pub fn entropy(&self) -> &dyn EntropySource
1938    where
1939        Caps: cap::HasRandom,
1940    {
1941        self.handles.entropy.as_ref()
1942    }
1943
1944    /// Derives an entropy source for a child task.
1945    pub(crate) fn child_entropy(&self, task: TaskId) -> Arc<dyn EntropySource> {
1946        self.handles.entropy.fork(task)
1947    }
1948
1949    /// Returns a cloned entropy handle for capability-aware subsystems.
1950    #[inline]
1951    #[must_use]
1952    pub(crate) fn entropy_handle(&self) -> Arc<dyn EntropySource>
1953    where
1954        Caps: cap::HasRandom,
1955    {
1956        self.handles.entropy.clone()
1957    }
1958
1959    /// Generates a random `u64` using the context entropy source.
1960    #[must_use]
1961    pub fn random_u64(&self) -> u64
1962    where
1963        Caps: cap::HasRandom,
1964    {
1965        let value = self.handles.entropy.next_u64();
1966        trace!(
1967            source = self.handles.entropy.source_id(),
1968            task_id = ?self.task_id(),
1969            value,
1970            "entropy_u64"
1971        );
1972        value
1973    }
1974
1975    /// Fills a buffer with random bytes using the context entropy source.
1976    pub fn random_bytes(&self, dest: &mut [u8])
1977    where
1978        Caps: cap::HasRandom,
1979    {
1980        self.handles.entropy.fill_bytes(dest);
1981        trace!(
1982            source = self.handles.entropy.source_id(),
1983            task_id = ?self.task_id(),
1984            len = dest.len(),
1985            "entropy_bytes"
1986        );
1987    }
1988
1989    /// Generates a random `usize` in `[0, bound)` with rejection sampling.
1990    #[must_use]
1991    pub fn random_usize(&self, bound: usize) -> usize
1992    where
1993        Caps: cap::HasRandom,
1994    {
1995        assert!(bound > 0, "bound must be non-zero");
1996        let bound_u64 = bound as u64;
1997        let threshold = u64::MAX - (u64::MAX % bound_u64);
1998        loop {
1999            let value = self.random_u64();
2000            if value < threshold {
2001                return (value % bound_u64) as usize;
2002            }
2003        }
2004    }
2005
2006    /// Generates a random boolean.
2007    #[must_use]
2008    pub fn random_bool(&self) -> bool
2009    where
2010        Caps: cap::HasRandom,
2011    {
2012        self.random_u64() & 1 == 1
2013    }
2014
2015    /// Generates a random `f64` in `[0, 1)`.
2016    #[must_use]
2017    #[allow(clippy::cast_precision_loss)]
2018    pub fn random_f64(&self) -> f64
2019    where
2020        Caps: cap::HasRandom,
2021    {
2022        (self.random_u64() >> 11) as f64 / (1u64 << 53) as f64
2023    }
2024
2025    /// Shuffles a slice in place using Fisher-Yates.
2026    pub fn shuffle<T>(&self, slice: &mut [T])
2027    where
2028        Caps: cap::HasRandom,
2029    {
2030        for i in (1..slice.len()).rev() {
2031            let j = self.random_usize(i + 1);
2032            slice.swap(i, j);
2033        }
2034    }
2035
2036    /// Sets the cancellation flag (internal use).
2037    #[allow(dead_code)]
2038    pub(crate) fn set_cancel_internal(&self, value: bool) {
2039        let mut inner = self.inner.write();
2040        inner.cancel_requested = value;
2041        inner
2042            .fast_cancel
2043            .store(value, std::sync::atomic::Ordering::Release);
2044        if !value {
2045            inner.cancel_reason = None;
2046        }
2047    }
2048
2049    /// Sets the cancellation flag for testing purposes.
2050    ///
2051    /// This method allows tests to simulate cancellation signals. It sets the
2052    /// `cancel_requested` flag, which will cause subsequent `checkpoint()` calls
2053    /// to return an error (unless masked).
2054    ///
2055    /// # Example
2056    ///
2057    /// ```
2058    /// use asupersync::Cx;
2059    ///
2060    /// let cx = Cx::for_testing();
2061    /// assert!(cx.checkpoint().is_ok());
2062    ///
2063    /// cx.set_cancel_requested(true);
2064    /// assert!(cx.checkpoint().is_err());
2065    /// ```
2066    ///
2067    /// # Note
2068    ///
2069    /// This API is intended for testing only. In production, cancellation signals
2070    /// are propagated by the runtime through the task tree.
2071    pub fn set_cancel_requested(&self, value: bool) {
2072        let waker = {
2073            let mut inner = self.inner.write();
2074            inner.cancel_requested = value;
2075            inner
2076                .fast_cancel
2077                .store(value, std::sync::atomic::Ordering::Release);
2078            if !value {
2079                inner.cancel_reason = None;
2080                None
2081            } else {
2082                inner.cancel_waker.clone()
2083            }
2084        };
2085        if let Some(waker) = waker {
2086            waker.wake();
2087        }
2088    }
2089
2090    // ========================================================================
2091    // Cancel Attribution API
2092    // ========================================================================
2093
2094    /// Cancels this context with a detailed reason.
2095    ///
2096    /// This is the preferred method for initiating cancellation, as it provides
2097    /// complete attribution information. The reason includes:
2098    /// - The kind of cancellation (e.g., User, Timeout, Deadline)
2099    /// - An optional message explaining the cancellation
2100    /// - Origin region and task information (automatically set)
2101    ///
2102    /// # Arguments
2103    ///
2104    /// * `kind` - The type of cancellation being initiated
2105    /// * `message` - An optional human-readable message explaining why
2106    ///
2107    /// # Example
2108    ///
2109    /// ```
2110    /// use asupersync::{Cx, types::CancelKind};
2111    ///
2112    /// let cx = Cx::for_testing();
2113    /// cx.cancel_with(CancelKind::User, Some("User pressed Ctrl+C"));
2114    /// assert!(cx.is_cancel_requested());
2115    ///
2116    /// if let Some(reason) = cx.cancel_reason() {
2117    ///     assert_eq!(reason.kind, CancelKind::User);
2118    /// }
2119    /// ```
2120    ///
2121    /// # Note
2122    ///
2123    /// This method only sets the local cancellation flag. In a real runtime,
2124    /// cancellation propagates through the region tree via `cancel_request()`.
2125    pub fn cancel_with(&self, kind: CancelKind, message: Option<&'static str>) {
2126        let (region, task, waker) = {
2127            let mut inner = self.inner.write();
2128            let region = inner.region;
2129            let task = inner.task;
2130
2131            let mut reason = CancelReason::new(kind).with_region(region).with_task(task);
2132            if let Some(msg) = message {
2133                reason = reason.with_message(msg);
2134            }
2135
2136            inner.cancel_requested = true;
2137            inner
2138                .fast_cancel
2139                .store(true, std::sync::atomic::Ordering::Release);
2140            inner.cancel_reason = Some(reason);
2141            let waker = inner.cancel_waker.clone();
2142            drop(inner);
2143            (region, task, waker)
2144        };
2145
2146        if let Some(w) = waker {
2147            w.wake();
2148        }
2149
2150        debug!(
2151            task_id = ?task,
2152            region_id = ?region,
2153            cancel_kind = ?kind,
2154            cancel_message = message,
2155            "cancel initiated via cancel_with"
2156        );
2157        let _ = (region, task);
2158    }
2159
2160    /// Cancels without building a full attribution chain (performance-critical path).
2161    ///
2162    /// Use this when attribution isn't needed and minimizing allocations is important.
2163    /// The cancellation reason will have minimal attribution (kind + region only).
2164    ///
2165    /// # Performance
2166    ///
2167    /// This method avoids:
2168    /// - Message string allocation
2169    /// - Cause chain allocation
2170    /// - Timestamp lookup
2171    ///
2172    /// Use `cancel_with` when you need full attribution for debugging.
2173    ///
2174    /// # Example
2175    ///
2176    /// ```
2177    /// use asupersync::{Cx, types::CancelKind};
2178    ///
2179    /// let cx = Cx::for_testing();
2180    ///
2181    /// // Fast cancellation - no allocation
2182    /// cx.cancel_fast(CancelKind::RaceLost);
2183    /// assert!(cx.is_cancel_requested());
2184    /// ```
2185    pub fn cancel_fast(&self, kind: CancelKind) {
2186        let (region, waker) = {
2187            let mut inner = self.inner.write();
2188            let region = inner.region;
2189
2190            // Minimal attribution: just kind and region
2191            let reason = CancelReason::new(kind).with_region(region);
2192
2193            inner.cancel_requested = true;
2194            inner
2195                .fast_cancel
2196                .store(true, std::sync::atomic::Ordering::Release);
2197            inner.cancel_reason = Some(reason);
2198            let waker = inner.cancel_waker.clone();
2199            drop(inner);
2200            (region, waker)
2201        };
2202
2203        if let Some(w) = waker {
2204            w.wake();
2205        }
2206
2207        trace!(
2208            region_id = ?region,
2209            cancel_kind = ?kind,
2210            "cancel_fast initiated"
2211        );
2212        let _ = region;
2213    }
2214
2215    /// Gets the cancellation reason if this context is cancelled.
2216    ///
2217    /// Returns `None` if the context is not cancelled, or `Some(reason)` if
2218    /// cancellation has been requested. The returned reason includes full
2219    /// attribution (kind, origin region, origin task, timestamp, cause chain).
2220    ///
2221    /// # Example
2222    ///
2223    /// ```
2224    /// use asupersync::{Cx, types::CancelKind};
2225    ///
2226    /// let cx = Cx::for_testing();
2227    /// assert!(cx.cancel_reason().is_none());
2228    ///
2229    /// cx.cancel_with(CancelKind::Timeout, Some("request timeout"));
2230    /// if let Some(reason) = cx.cancel_reason() {
2231    ///     assert_eq!(reason.kind, CancelKind::Timeout);
2232    ///     println!("Cancelled: {:?}", reason.kind);
2233    /// }
2234    /// ```
2235    #[inline]
2236    #[must_use]
2237    pub fn cancel_reason(&self) -> Option<CancelReason> {
2238        let inner = self.inner.read();
2239        inner.cancel_reason.clone()
2240    }
2241
2242    /// Iterates through the full cancellation cause chain.
2243    ///
2244    /// The first element is the immediate reason, followed by parent causes
2245    /// in order (immediate -> root). This is useful for understanding the
2246    /// full propagation path of a cancellation.
2247    ///
2248    /// Returns an empty iterator if the context is not cancelled.
2249    ///
2250    /// # Example
2251    ///
2252    /// ```
2253    /// use asupersync::{Cx, types::{CancelKind, CancelReason}};
2254    ///
2255    /// let cx = Cx::for_testing();
2256    ///
2257    /// // Create a chained reason: ParentCancelled -> Deadline
2258    /// let root_cause = CancelReason::deadline();
2259    /// let chained = CancelReason::parent_cancelled().with_cause(root_cause);
2260    ///
2261    /// // Set it via internal method for testing
2262    /// cx.set_cancel_reason(chained);
2263    ///
2264    /// let chain: Vec<_> = cx.cancel_chain().collect();
2265    /// assert_eq!(chain.len(), 2);
2266    /// assert_eq!(chain[0].kind, CancelKind::ParentCancelled);
2267    /// assert_eq!(chain[1].kind, CancelKind::Deadline);
2268    /// ```
2269    pub fn cancel_chain(&self) -> impl Iterator<Item = CancelReason> {
2270        let cancel_reason = self.inner.read().cancel_reason.clone();
2271        std::iter::successors(cancel_reason, |r| r.cause.as_deref().cloned())
2272    }
2273
2274    /// Gets the root cause of cancellation.
2275    ///
2276    /// This is the original trigger that initiated the cancellation, regardless
2277    /// of how many parent regions the cancellation propagated through. For example,
2278    /// if a grandchild task was cancelled due to a parent timeout, `root_cancel_cause()`
2279    /// returns the original Timeout reason, not the intermediate ParentCancelled reasons.
2280    ///
2281    /// Returns `None` if the context is not cancelled.
2282    ///
2283    /// # Example
2284    ///
2285    /// ```
2286    /// use asupersync::{Cx, types::{CancelKind, CancelReason}};
2287    ///
2288    /// let cx = Cx::for_testing();
2289    ///
2290    /// // Simulate a deep cancellation chain
2291    /// let deadline = CancelReason::deadline();
2292    /// let parent1 = CancelReason::parent_cancelled().with_cause(deadline);
2293    /// let parent2 = CancelReason::parent_cancelled().with_cause(parent1);
2294    ///
2295    /// cx.set_cancel_reason(parent2);
2296    ///
2297    /// // Root cause is the original Deadline, not ParentCancelled
2298    /// if let Some(root) = cx.root_cancel_cause() {
2299    ///     assert_eq!(root.kind, CancelKind::Deadline);
2300    /// }
2301    /// ```
2302    #[must_use]
2303    pub fn root_cancel_cause(&self) -> Option<CancelReason> {
2304        let inner = self.inner.read();
2305        inner.cancel_reason.as_ref().map(|r| r.root_cause().clone())
2306    }
2307
2308    /// Checks if cancellation was due to a specific kind.
2309    ///
2310    /// This checks the immediate reason only, not the cause chain. For example,
2311    /// if a task was cancelled with `ParentCancelled` due to an upstream timeout,
2312    /// `cancelled_by(CancelKind::ParentCancelled)` returns `true` but
2313    /// `cancelled_by(CancelKind::Timeout)` returns `false`.
2314    ///
2315    /// Use `any_cause_is()` to check the full cause chain.
2316    ///
2317    /// # Example
2318    ///
2319    /// ```
2320    /// use asupersync::{Cx, types::CancelKind};
2321    ///
2322    /// let cx = Cx::for_testing();
2323    /// cx.cancel_with(CancelKind::User, Some("manual cancel"));
2324    ///
2325    /// assert!(cx.cancelled_by(CancelKind::User));
2326    /// assert!(!cx.cancelled_by(CancelKind::Timeout));
2327    /// ```
2328    #[must_use]
2329    pub fn cancelled_by(&self, kind: CancelKind) -> bool {
2330        let inner = self.inner.read();
2331        inner.cancel_reason.as_ref().is_some_and(|r| r.kind == kind)
2332    }
2333
2334    /// Checks if any cause in the chain is a specific kind.
2335    ///
2336    /// This searches the entire cause chain, from the immediate reason to the
2337    /// root cause. This is useful for checking if a specific condition (like
2338    /// a timeout) anywhere in the hierarchy caused this cancellation.
2339    ///
2340    /// # Example
2341    ///
2342    /// ```
2343    /// use asupersync::{Cx, types::{CancelKind, CancelReason}};
2344    ///
2345    /// let cx = Cx::for_testing();
2346    ///
2347    /// // Grandchild cancelled due to parent timeout
2348    /// let timeout = CancelReason::timeout();
2349    /// let parent_cancelled = CancelReason::parent_cancelled().with_cause(timeout);
2350    ///
2351    /// cx.set_cancel_reason(parent_cancelled);
2352    ///
2353    /// // Immediate reason is ParentCancelled, but timeout is in the chain
2354    /// assert!(cx.cancelled_by(CancelKind::ParentCancelled));
2355    /// assert!(!cx.cancelled_by(CancelKind::Timeout));  // immediate only
2356    /// assert!(cx.any_cause_is(CancelKind::Timeout));   // searches chain
2357    /// assert!(cx.any_cause_is(CancelKind::ParentCancelled));  // also in chain
2358    /// ```
2359    #[must_use]
2360    pub fn any_cause_is(&self, kind: CancelKind) -> bool {
2361        let inner = self.inner.read();
2362        inner
2363            .cancel_reason
2364            .as_ref()
2365            .is_some_and(|r| r.any_cause_is(kind))
2366    }
2367
2368    /// Sets the cancellation reason (for testing purposes).
2369    ///
2370    /// This method allows tests to set a specific cancellation reason, including
2371    /// complex cause chains. It sets both the `cancel_requested` flag and the
2372    /// `cancel_reason`.
2373    ///
2374    /// # Example
2375    ///
2376    /// ```
2377    /// use asupersync::{Cx, types::{CancelKind, CancelReason}};
2378    ///
2379    /// let cx = Cx::for_testing();
2380    ///
2381    /// // Create a chained reason for testing
2382    /// let root = CancelReason::deadline();
2383    /// let chained = CancelReason::parent_cancelled().with_cause(root);
2384    ///
2385    /// cx.set_cancel_reason(chained);
2386    ///
2387    /// assert!(cx.is_cancel_requested());
2388    /// assert_eq!(cx.cancel_reason().unwrap().kind, CancelKind::ParentCancelled);
2389    /// ```
2390    pub fn set_cancel_reason(&self, reason: CancelReason) {
2391        let waker = {
2392            let mut inner = self.inner.write();
2393            inner.cancel_requested = true;
2394            inner
2395                .fast_cancel
2396                .store(true, std::sync::atomic::Ordering::Release);
2397            inner.cancel_reason = Some(reason);
2398            inner.cancel_waker.clone()
2399        };
2400        if let Some(w) = waker {
2401            w.wake();
2402        }
2403    }
2404
2405    /// Races multiple futures, waiting for the first to complete.
2406    ///
2407    /// This method is used by the `race!` macro. It runs the provided futures
2408    /// concurrently (inline, not spawned) and returns the result of the first
2409    /// one to complete. Losers are dropped (cancelled).
2410    ///
2411    /// # Cancellation vs Draining
2412    ///
2413    /// This method **drops** the losing futures, which cancels them. However,
2414    /// unlike [`Scope::race`](crate::cx::Scope::race), it does not await the
2415    /// losers to ensure they have fully cleaned up ("drained").
2416    ///
2417    /// If you are racing [`TaskHandle`](crate::runtime::TaskHandle)s and require
2418    /// the "Losers are drained" invariant (parent waits for losers to terminate),
2419    /// use [`Scope::race`](crate::cx::Scope::race) or
2420    /// [`Scope::race_all`](crate::cx::Scope::race_all) instead.
2421    pub async fn race<T>(
2422        &self,
2423        futures: Vec<Pin<Box<dyn Future<Output = T> + Send>>>,
2424    ) -> Result<T, JoinError> {
2425        if futures.is_empty() {
2426            return std::future::pending().await;
2427        }
2428        let (res, _) = SelectAll::new(futures)
2429            .await
2430            .map_err(|_| JoinError::PolledAfterCompletion)?;
2431        Ok(res)
2432    }
2433
2434    /// Races multiple named futures.
2435    ///
2436    /// Similar to `race`, but accepts names for tracing purposes.
2437    ///
2438    /// # Cancellation vs Draining
2439    ///
2440    /// This method **drops** the losing futures, which cancels them. However,
2441    /// unlike [`Scope::race`](crate::cx::Scope::race), it does not await the
2442    /// losers to ensure they have fully cleaned up ("drained").
2443    pub async fn race_named<T>(&self, futures: NamedFutures<T>) -> Result<T, JoinError> {
2444        let futures: Vec<_> = futures.into_iter().map(|(_, f)| f).collect();
2445        self.race(futures).await
2446    }
2447
2448    /// Races multiple futures with a timeout.
2449    ///
2450    /// If the timeout expires before any future completes, returns a cancellation error.
2451    ///
2452    /// # Cancellation vs Draining
2453    ///
2454    /// This method **drops** the losing futures (or all futures on timeout),
2455    /// which cancels them. However, it does not await the losers to ensure
2456    /// they have fully cleaned up ("drained").
2457    pub async fn race_timeout<T>(
2458        &self,
2459        duration: Duration,
2460        futures: Vec<Pin<Box<dyn Future<Output = T> + Send>>>,
2461    ) -> Result<T, JoinError>
2462    where
2463        Caps: cap::HasTime,
2464    {
2465        let race_fut = std::pin::pin!(self.race(futures));
2466        let now = self
2467            .handles
2468            .timer_driver
2469            .as_ref()
2470            .map_or_else(wall_clock_now, TimerDriverHandle::now);
2471        timeout(now, duration, race_fut)
2472            .await
2473            .unwrap_or_else(|_| Err(JoinError::Cancelled(CancelReason::timeout())))
2474    }
2475
2476    /// Races multiple named futures with a timeout.
2477    ///
2478    /// # Cancellation vs Draining
2479    ///
2480    /// This method **drops** the losing futures (or all futures on timeout),
2481    /// which cancels them. However, it does not await the losers to ensure
2482    /// they have fully cleaned up ("drained").
2483    pub async fn race_timeout_named<T>(
2484        &self,
2485        duration: Duration,
2486        futures: NamedFutures<T>,
2487    ) -> Result<T, JoinError>
2488    where
2489        Caps: cap::HasTime,
2490    {
2491        let futures: Vec<_> = futures.into_iter().map(|(_, f)| f).collect();
2492        self.race_timeout(duration, futures).await
2493    }
2494
2495    /// Creates a [`Scope`](super::Scope) bound to this context's region.
2496    ///
2497    /// The returned `Scope` can be used to spawn tasks, create child regions,
2498    /// and register finalizers. All spawned tasks will be owned by this
2499    /// context's region.
2500    ///
2501    /// # Example
2502    ///
2503    /// ```ignore
2504    /// // Using the scope! macro (recommended):
2505    /// scope!(cx, {
2506    ///     let handle = scope.spawn(|cx| async { 42 });
2507    ///     handle.await
2508    /// });
2509    ///
2510    /// // Manual scope creation:
2511    /// let scope = cx.scope();
2512    /// // Use scope for spawning...
2513    /// ```
2514    ///
2515    /// # Note
2516    ///
2517    /// In Phase 0, this creates a scope bound to the current region. In later
2518    /// phases, the `scope!` macro will create child regions with proper
2519    /// quiescence guarantees.
2520    #[must_use]
2521    pub fn scope(&self) -> crate::cx::Scope<'static> {
2522        let budget = self.budget();
2523        debug!(
2524            task_id = ?self.task_id(),
2525            region_id = ?self.region_id(),
2526            budget_deadline = ?budget.deadline,
2527            budget_poll_quota = budget.poll_quota,
2528            budget_cost_quota = ?budget.cost_quota,
2529            budget_priority = budget.priority,
2530            budget_source = "inherited",
2531            "scope budget inherited"
2532        );
2533        crate::cx::Scope::new(self.region_id(), budget)
2534    }
2535
2536    /// Creates a [`Scope`](super::Scope) bound to this context's region with a custom budget.
2537    ///
2538    /// This is used by the `scope!` macro when a budget is specified:
2539    /// ```ignore
2540    /// scope!(cx, budget: Budget::with_deadline_secs(5), {
2541    ///     // body
2542    /// })
2543    /// ```
2544    #[must_use]
2545    pub fn scope_with_budget(&self, budget: Budget) -> crate::cx::Scope<'static> {
2546        let parent_budget = self.budget();
2547        let deadline_tightened = match (parent_budget.deadline, budget.deadline) {
2548            (Some(parent), Some(child)) => child < parent,
2549            (None, Some(_)) => true,
2550            _ => false,
2551        };
2552        let poll_tightened = budget.poll_quota < parent_budget.poll_quota;
2553        let cost_tightened = match (parent_budget.cost_quota, budget.cost_quota) {
2554            (Some(parent), Some(child)) => child < parent,
2555            (None, Some(_)) => true,
2556            _ => false,
2557        };
2558        let priority_boosted = budget.priority > parent_budget.priority;
2559        let _ = (
2560            &deadline_tightened,
2561            &poll_tightened,
2562            &cost_tightened,
2563            &priority_boosted,
2564        );
2565
2566        // Clamp child budget to parent constraints (structured concurrency
2567        // invariant: child regions cannot exceed parent resource limits).
2568        // Priority is intentionally unclamped — boosting is allowed.
2569        let clamped_deadline = match (parent_budget.deadline, budget.deadline) {
2570            (Some(parent), Some(child)) => Some(if child < parent { child } else { parent }),
2571            (Some(parent), None) => Some(parent),
2572            (None, child) => child,
2573        };
2574        let clamped_poll_quota = budget.poll_quota.min(parent_budget.poll_quota);
2575        let clamped_cost_quota = match (parent_budget.cost_quota, budget.cost_quota) {
2576            (Some(parent), Some(child)) => Some(child.min(parent)),
2577            (Some(parent), None) => Some(parent),
2578            (None, child) => child,
2579        };
2580        let clamped = Budget {
2581            deadline: clamped_deadline,
2582            poll_quota: clamped_poll_quota,
2583            cost_quota: clamped_cost_quota,
2584            priority: budget.priority,
2585        };
2586
2587        debug!(
2588            task_id = ?self.task_id(),
2589            region_id = ?self.region_id(),
2590            parent_deadline = ?parent_budget.deadline,
2591            parent_poll_quota = parent_budget.poll_quota,
2592            parent_cost_quota = ?parent_budget.cost_quota,
2593            parent_priority = parent_budget.priority,
2594            budget_deadline = ?clamped.deadline,
2595            budget_poll_quota = clamped.poll_quota,
2596            budget_cost_quota = ?clamped.cost_quota,
2597            budget_priority = clamped.priority,
2598            deadline_tightened,
2599            poll_tightened,
2600            cost_tightened,
2601            priority_boosted,
2602            budget_source = "explicit",
2603            "scope budget set"
2604        );
2605        crate::cx::Scope::new(self.region_id(), clamped)
2606    }
2607}
2608
2609impl Cx<cap::All> {
2610    /// Creates a capability context for testing purposes.
2611    ///
2612    /// This constructor creates a Cx with default IDs and an infinite budget,
2613    /// suitable for unit and integration tests. The resulting context is fully
2614    /// functional but not connected to a real runtime.
2615    ///
2616    /// # Example
2617    ///
2618    /// ```
2619    /// use asupersync::Cx;
2620    ///
2621    /// let cx = Cx::for_testing();
2622    /// assert!(!cx.is_cancel_requested());
2623    /// assert!(cx.checkpoint().is_ok());
2624    /// ```
2625    ///
2626    /// # Note
2627    ///
2628    /// This API is intended for testing only. Production code should receive
2629    /// Cx instances from the runtime, not construct them directly.
2630    #[must_use]
2631    pub fn for_testing() -> Self {
2632        Self::new(
2633            RegionId::new_for_test(0, 0),
2634            TaskId::new_for_test(0, 0),
2635            Budget::INFINITE,
2636        )
2637    }
2638
2639    /// Creates a test-only capability context with a specified budget.
2640    ///
2641    /// Similar to [`Self::for_testing()`] but allows specifying a custom budget
2642    /// for testing timeout behavior.
2643    ///
2644    /// # Example
2645    ///
2646    /// ```ignore
2647    /// use asupersync::{Cx, Budget, Time};
2648    ///
2649    /// // Create a context with a 30-second deadline
2650    /// let cx = Cx::for_testing_with_budget(
2651    ///     Budget::new().with_deadline(Time::from_secs(30))
2652    /// );
2653    /// ```
2654    ///
2655    /// # Note
2656    ///
2657    /// This API is intended for testing only. Production code should receive
2658    /// Cx instances from the runtime, not construct them directly.
2659    #[must_use]
2660    pub fn for_testing_with_budget(budget: Budget) -> Self {
2661        Self::new(
2662            RegionId::new_for_test(0, 0),
2663            TaskId::new_for_test(0, 0),
2664            budget,
2665        )
2666    }
2667
2668    /// Creates a test-only capability context with lab I/O capability.
2669    ///
2670    /// This constructor creates a Cx with a `LabIoCap` for testing I/O code paths
2671    /// without performing real I/O.
2672    ///
2673    /// # Example
2674    ///
2675    /// ```ignore
2676    /// use asupersync::Cx;
2677    ///
2678    /// let cx = Cx::for_testing_with_io();
2679    /// assert!(cx.has_io());
2680    /// assert!(!cx.io().unwrap().is_real_io());
2681    /// ```
2682    ///
2683    /// # Note
2684    ///
2685    /// This API is intended for testing only.
2686    #[must_use]
2687    pub fn for_testing_with_io() -> Self {
2688        Self::new_with_io(
2689            RegionId::new_for_test(0, 0),
2690            TaskId::new_for_test(0, 0),
2691            Budget::INFINITE,
2692            None,
2693            None,
2694            Some(Arc::new(crate::io::LabIoCap::new())),
2695            None,
2696        )
2697    }
2698
2699    /// Creates a request-scoped capability context with a specified budget.
2700    ///
2701    /// This is intended for production request handling that needs unique
2702    /// task/region identifiers outside the scheduler.
2703    #[must_use]
2704    pub fn for_request_with_budget(budget: Budget) -> Self {
2705        Self::new(RegionId::new_ephemeral(), TaskId::new_ephemeral(), budget)
2706    }
2707
2708    /// Creates a request-scoped capability context with an infinite budget.
2709    #[must_use]
2710    pub fn for_request() -> Self {
2711        Self::for_request_with_budget(Budget::INFINITE)
2712    }
2713
2714    /// Creates a test-only capability context with a remote capability.
2715    ///
2716    /// This constructor creates a Cx with a [`RemoteCap`] for testing remote
2717    /// task spawning without a real network transport.
2718    ///
2719    /// # Note
2720    ///
2721    /// This API is intended for testing only.
2722    #[must_use]
2723    pub fn for_testing_with_remote(cap: RemoteCap) -> Self {
2724        let mut cx = Self::for_testing();
2725        Arc::make_mut(&mut cx.handles).remote_cap = Some(Arc::new(cap));
2726        cx
2727    }
2728}
2729
/// RAII guard returned by [`Cx::enter_span`].
///
/// On drop, restores the previous `DiagnosticContext` and emits a
/// span-exit log entry.
pub struct SpanGuard<Caps = cap::All> {
    /// Context that is logged to and has its diagnostic context reset when
    /// the guard is dropped.
    cx: Cx<Caps>,
    /// Diagnostic context that is written back to `cx` when the guard is
    /// dropped.
    prev: DiagnosticContext,
}
2738
2739impl<Caps> Drop for SpanGuard<Caps> {
2740    fn drop(&mut self) {
2741        let name = self
2742            .cx
2743            .diagnostic_context()
2744            .custom("span.name")
2745            .unwrap_or("unknown")
2746            .to_owned();
2747        self.cx
2748            .log(LogEntry::debug(format!("span exit: {name}")).with_target("tracing"));
2749        self.cx.set_diagnostic_context(self.prev.clone());
2750    }
2751}
2752
2753#[cfg(test)]
2754mod tests {
2755    use super::*;
2756    use crate::cx::macaroon::CaveatPredicate;
2757    #[cfg(feature = "messaging-fabric")]
2758    use crate::messaging::capability::{CommandFamily, FabricCapability, FabricCapabilityScope};
2759    #[cfg(feature = "messaging-fabric")]
2760    use crate::messaging::class::DeliveryClass;
2761    #[cfg(feature = "messaging-fabric")]
2762    use crate::messaging::ir::{CapabilityPermission, CapabilityTokenSchema, SubjectFamily};
2763    #[cfg(feature = "messaging-fabric")]
2764    use crate::messaging::subject::SubjectPattern;
2765    use crate::trace::TraceBufferHandle;
2766    use crate::util::{ArenaIndex, DetEntropy};
2767    use std::sync::atomic::{AtomicU8, Ordering};
2768
    /// Outcome code written by the `CurrentCxDtorProbe` destructor.
    ///
    /// Starts at 0; the destructor stores a value in `1..=4` describing what
    /// it observed (see the `Drop` impl for the meaning of each code).
    static CURRENT_CX_DTOR_STATE: AtomicU8 = AtomicU8::new(0);

    thread_local! {
        // Per-thread probe instance. A test touches it so that its destructor
        // runs while the thread's locals are being torn down.
        static CURRENT_CX_DTOR_PROBE: CurrentCxDtorProbe = const { CurrentCxDtorProbe };
    }

    /// Zero-sized probe whose only purpose is its `Drop` impl, which inspects
    /// the ambient `Cx` state and records the result in
    /// `CURRENT_CX_DTOR_STATE`.
    struct CurrentCxDtorProbe;
2776
    impl Drop for CurrentCxDtorProbe {
        /// Records what the ambient-`Cx` machinery looks like at the moment
        /// this probe is destroyed.
        ///
        /// Codes stored in `CURRENT_CX_DTOR_STATE`:
        /// - `1`: `CURRENT_CX` was still accessible and held a context
        /// - `2`: `CURRENT_CX` was still accessible and was empty
        /// - `3`: `CURRENT_CX` was inaccessible and `Cx::current()` returned `None`
        /// - `4`: `CURRENT_CX` was inaccessible but `Cx::current()` returned `Some`
        fn drop(&mut self) {
            // `try_with` reports an error instead of panicking when the
            // thread-local can no longer be accessed.
            let state = match CURRENT_CX.try_with(|slot| slot.borrow().clone()) {
                Ok(Some(_)) => 1,
                Ok(None) => 2,
                Err(_) => {
                    // The slot is gone; check that the public accessor
                    // degrades to `None` rather than yielding a context.
                    if Cx::current().is_none() {
                        3
                    } else {
                        4
                    }
                }
            };
            CURRENT_CX_DTOR_STATE.store(state, Ordering::SeqCst);
        }
    }
2793
2794    fn test_cx() -> Cx {
2795        Cx::new(
2796            RegionId::from_arena(ArenaIndex::new(0, 0)),
2797            TaskId::from_arena(ArenaIndex::new(0, 0)),
2798            Budget::INFINITE,
2799        )
2800    }
2801
2802    fn test_cx_with_entropy(seed: u64) -> Cx {
2803        Cx::new_with_observability(
2804            RegionId::from_arena(ArenaIndex::new(0, 0)),
2805            TaskId::from_arena(ArenaIndex::new(0, 0)),
2806            Budget::INFINITE,
2807            None,
2808            None,
2809            Some(Arc::new(DetEntropy::new(seed))),
2810        )
2811    }
2812
2813    fn trace_message(event: &crate::trace::TraceEvent) -> &str {
2814        match &event.data {
2815            crate::trace::TraceData::Message(message) => message,
2816            other => panic!("expected user trace message, got {other:?}"),
2817        }
2818    }
2819
2820    #[cfg(feature = "messaging-fabric")]
2821    fn capability_schema(
2822        families: Vec<SubjectFamily>,
2823        permissions: Vec<CapabilityPermission>,
2824    ) -> CapabilityTokenSchema {
2825        CapabilityTokenSchema {
2826            name: "fabric.cx.demo".to_owned(),
2827            families,
2828            delivery_classes: vec![DeliveryClass::EphemeralInteractive],
2829            permissions,
2830        }
2831    }
2832
2833    #[test]
2834    fn io_not_available_by_default() {
2835        let cx = test_cx();
2836        assert!(!cx.has_io());
2837        assert!(cx.io().is_none());
2838    }
2839
2840    #[test]
2841    fn io_available_with_for_testing_with_io() {
2842        let cx: Cx = Cx::for_testing_with_io();
2843        assert!(cx.has_io());
2844        let io = cx.io().expect("should have io cap");
2845        assert!(!io.is_real_io());
2846        assert_eq!(io.name(), "lab");
2847    }
2848
2849    #[test]
2850    fn checkpoint_without_cancel() {
2851        let cx = test_cx();
2852        assert!(cx.checkpoint().is_ok());
2853    }
2854
2855    #[test]
2856    fn checkpoint_with_cancel() {
2857        let cx = test_cx();
2858        cx.set_cancel_requested(true);
2859        assert!(cx.checkpoint().is_err());
2860    }
2861
2862    #[test]
2863    fn masked_defers_cancel() {
2864        let cx = test_cx();
2865        cx.set_cancel_requested(true);
2866
2867        cx.masked(|| {
2868            assert!(
2869                cx.checkpoint().is_ok(),
2870                "checkpoint should succeed when masked"
2871            );
2872        });
2873
2874        assert!(
2875            cx.checkpoint().is_err(),
2876            "checkpoint should fail after unmasking"
2877        );
2878    }
2879
2880    #[test]
2881    fn trace_attaches_logical_time() {
2882        let cx = test_cx();
2883        let trace = TraceBufferHandle::new(8);
2884        cx.set_trace_buffer(trace.clone());
2885
2886        cx.trace("hello");
2887
2888        let events = trace.snapshot();
2889        let event = events.first().expect("trace event");
2890        assert!(event.logical_time.is_some());
2891    }
2892
2893    #[test]
2894    fn masked_panic_safety() {
2895        use std::panic::{AssertUnwindSafe, catch_unwind};
2896
2897        let cx = test_cx();
2898        cx.set_cancel_requested(true);
2899
2900        // Ensure initial state is cancelled (unmasked)
2901        assert!(cx.checkpoint().is_err());
2902
2903        // Run a masked block that panics
2904        let cx_clone = cx.clone();
2905        let _ = catch_unwind(AssertUnwindSafe(|| {
2906            cx_clone.masked(|| {
2907                // Avoid `panic!/unreachable!` macros (UBS critical). We still
2908                // need an unwind here to validate mask-depth restoration.
2909                std::panic::resume_unwind(Box::new("oops"));
2910            });
2911        }));
2912
2913        // After panic, mask depth should have been restored.
2914        // If it leaked, checkpoint() will return Ok(()) because it thinks it's still masked.
2915        assert!(
2916            cx.checkpoint().is_err(),
2917            "Cx remains masked after panic! mask_depth leaked."
2918        );
2919    }
2920
    /// Checks that `Cx::current()` yields `None` (rather than a context or a
    /// panic) when it is called from a thread-local destructor after
    /// `CURRENT_CX` has become inaccessible.
    ///
    /// The observation is made by `CurrentCxDtorProbe`'s destructor on a
    /// spawned thread and reported through `CURRENT_CX_DTOR_STATE`; code `3`
    /// is the expected outcome.
    #[test]
    fn current_returns_none_during_thread_local_teardown() {
        // Reset the shared outcome code so a stale value cannot satisfy the
        // final assertion.
        CURRENT_CX_DTOR_STATE.store(0, Ordering::SeqCst);

        let join = std::thread::spawn(|| {
            // Initialize the probe first so its destructor runs after CURRENT_CX
            // and exercises ambient lookup during TLS teardown.
            CURRENT_CX_DTOR_PROBE.with(|_| {});

            let cx = test_cx();
            let _guard = Cx::set_current(Some(cx));
            assert!(Cx::current().is_some(), "current cx should be installed");
        });

        // Joining waits for the thread to finish; the probe's stored code is
        // read afterwards.
        join.join()
            .expect("thread-local teardown should not panic when reading Cx");
        assert_eq!(
            CURRENT_CX_DTOR_STATE.load(Ordering::SeqCst),
            3,
            "Cx::current() should fail closed once CURRENT_CX is unavailable"
        );
    }
2943
2944    /// INV-MASK-BOUNDED: exceeding MAX_MASK_DEPTH must panic.
2945    #[test]
2946    #[should_panic(expected = "MAX_MASK_DEPTH")]
2947    fn mask_depth_exceeds_bound_panics() {
2948        let cx = test_cx();
2949
2950        // Directly set mask_depth to the limit, then call masked() once
2951        // to trigger the bound check. This avoids deep nesting which
2952        // would cause double-panic in MaskGuard drops during unwind.
2953        {
2954            let mut inner = cx.inner.write();
2955            inner.mask_depth = crate::types::task_context::MAX_MASK_DEPTH;
2956        }
2957        // This call should panic because mask_depth is already at the limit.
2958        cx.masked(|| {});
2959    }
2960
2961    #[test]
2962    fn random_usize_in_range() {
2963        let cx = test_cx_with_entropy(123);
2964        for _ in 0..100 {
2965            let value = cx.random_usize(7);
2966            assert!(value < 7);
2967        }
2968    }
2969
2970    #[test]
2971    fn shuffle_deterministic() {
2972        let cx1 = test_cx_with_entropy(42);
2973        let cx2 = test_cx_with_entropy(42);
2974
2975        let mut a = [1, 2, 3, 4, 5, 6, 7, 8];
2976        let mut b = [1, 2, 3, 4, 5, 6, 7, 8];
2977
2978        cx1.shuffle(&mut a);
2979        cx2.shuffle(&mut b);
2980
2981        assert_eq!(a, b);
2982    }
2983
2984    #[test]
2985    fn random_f64_range() {
2986        let cx = test_cx_with_entropy(7);
2987        for _ in 0..100 {
2988            let value = cx.random_f64();
2989            assert!((0.0..1.0).contains(&value));
2990        }
2991    }
2992
2993    // ========================================================================
2994    // Cancel Attribution API Tests
2995    // ========================================================================
2996
2997    #[test]
2998    fn cancel_with_sets_reason() {
2999        let cx = test_cx();
3000        assert!(cx.cancel_reason().is_none());
3001
3002        cx.cancel_with(CancelKind::User, Some("manual stop"));
3003
3004        assert!(cx.is_cancel_requested());
3005        let reason = cx.cancel_reason().expect("should have reason");
3006        assert_eq!(reason.kind, CancelKind::User);
3007        assert_eq!(reason.message, Some("manual stop".to_string()));
3008    }
3009
3010    #[test]
3011    fn cancel_with_no_message() {
3012        let cx = test_cx();
3013        cx.cancel_with(CancelKind::Timeout, None);
3014
3015        let reason = cx.cancel_reason().expect("should have reason");
3016        assert_eq!(reason.kind, CancelKind::Timeout);
3017        assert!(reason.message.is_none());
3018    }
3019
3020    #[test]
3021    fn cancel_reason_returns_none_when_not_cancelled() {
3022        let cx = test_cx();
3023        assert!(cx.cancel_reason().is_none());
3024    }
3025
3026    #[test]
3027    fn cancel_chain_empty_when_not_cancelled() {
3028        let cx = test_cx();
3029        assert!(cx.cancel_chain().next().is_none());
3030    }
3031
3032    #[test]
3033    fn cancel_chain_traverses_causes() {
3034        let cx = test_cx();
3035
3036        // Build a chain: ParentCancelled -> ParentCancelled -> Deadline
3037        let deadline = CancelReason::deadline();
3038        let parent1 = CancelReason::parent_cancelled().with_cause(deadline);
3039        let parent2 = CancelReason::parent_cancelled().with_cause(parent1);
3040
3041        cx.set_cancel_reason(parent2);
3042
3043        let chain: Vec<_> = cx.cancel_chain().collect();
3044        assert_eq!(chain.len(), 3);
3045        assert_eq!(chain[0].kind, CancelKind::ParentCancelled);
3046        assert_eq!(chain[1].kind, CancelKind::ParentCancelled);
3047        assert_eq!(chain[2].kind, CancelKind::Deadline);
3048    }
3049
3050    #[test]
3051    fn root_cancel_cause_returns_none_when_not_cancelled() {
3052        let cx = test_cx();
3053        assert!(cx.root_cancel_cause().is_none());
3054    }
3055
3056    #[test]
3057    fn root_cancel_cause_finds_root() {
3058        let cx = test_cx();
3059
3060        // Build: ParentCancelled -> Timeout
3061        let timeout = CancelReason::timeout();
3062        let parent = CancelReason::parent_cancelled().with_cause(timeout);
3063
3064        cx.set_cancel_reason(parent);
3065
3066        let root = cx.root_cancel_cause().expect("should have root");
3067        assert_eq!(root.kind, CancelKind::Timeout);
3068    }
3069
3070    #[test]
3071    fn root_cancel_cause_with_no_chain() {
3072        let cx = test_cx();
3073        cx.cancel_with(CancelKind::Shutdown, None);
3074
3075        let root = cx.root_cancel_cause().expect("should have root");
3076        assert_eq!(root.kind, CancelKind::Shutdown);
3077    }
3078
3079    #[test]
3080    fn cancelled_by_checks_immediate_reason() {
3081        let cx = test_cx();
3082
3083        // Build: ParentCancelled -> Deadline
3084        let deadline = CancelReason::deadline();
3085        let parent = CancelReason::parent_cancelled().with_cause(deadline);
3086
3087        cx.set_cancel_reason(parent);
3088
3089        // Immediate reason is ParentCancelled
3090        assert!(cx.cancelled_by(CancelKind::ParentCancelled));
3091        // Deadline is in chain but not immediate
3092        assert!(!cx.cancelled_by(CancelKind::Deadline));
3093    }
3094
3095    #[test]
3096    fn cancelled_by_returns_false_when_not_cancelled() {
3097        let cx = test_cx();
3098        assert!(!cx.cancelled_by(CancelKind::User));
3099    }
3100
3101    #[test]
3102    fn any_cause_is_searches_chain() {
3103        let cx = test_cx();
3104
3105        // Build: ParentCancelled -> ParentCancelled -> Timeout
3106        let timeout = CancelReason::timeout();
3107        let parent1 = CancelReason::parent_cancelled().with_cause(timeout);
3108        let parent2 = CancelReason::parent_cancelled().with_cause(parent1);
3109
3110        cx.set_cancel_reason(parent2);
3111
3112        // All kinds in the chain return true
3113        assert!(cx.any_cause_is(CancelKind::ParentCancelled));
3114        assert!(cx.any_cause_is(CancelKind::Timeout));
3115
3116        // Kinds not in chain return false
3117        assert!(!cx.any_cause_is(CancelKind::Deadline));
3118        assert!(!cx.any_cause_is(CancelKind::Shutdown));
3119    }
3120
3121    #[test]
3122    fn any_cause_is_returns_false_when_not_cancelled() {
3123        let cx = test_cx();
3124        assert!(!cx.any_cause_is(CancelKind::Timeout));
3125    }
3126
3127    #[test]
3128    fn set_cancel_reason_sets_flag_and_reason() {
3129        let cx = test_cx();
3130        assert!(!cx.is_cancel_requested());
3131
3132        cx.set_cancel_reason(CancelReason::shutdown());
3133
3134        assert!(cx.is_cancel_requested());
3135        assert_eq!(
3136            cx.cancel_reason().expect("should have reason").kind,
3137            CancelKind::Shutdown
3138        );
3139    }
3140
3141    #[test]
3142    fn integration_realistic_usage() {
3143        // Simulate a realistic cancellation scenario:
3144        // 1. Root region times out
3145        // 2. Child task receives ParentCancelled
3146        // 3. Handler inspects the cause chain
3147
3148        let cx = test_cx();
3149
3150        // Simulate runtime setting a chained reason (timeout -> parent_cancelled)
3151        let timeout_reason = CancelReason::timeout().with_message("request timeout");
3152        let child_reason = CancelReason::parent_cancelled().with_cause(timeout_reason);
3153
3154        cx.set_cancel_reason(child_reason);
3155
3156        // Handler code checks various conditions
3157        assert!(cx.is_cancel_requested());
3158
3159        // Immediate reason is ParentCancelled
3160        assert!(cx.cancelled_by(CancelKind::ParentCancelled));
3161
3162        // But we want to know if a timeout caused it
3163        if cx.any_cause_is(CancelKind::Timeout) {
3164            // Log or metric: "Request cancelled due to timeout"
3165            let root = cx.root_cancel_cause().unwrap();
3166            assert_eq!(root.kind, CancelKind::Timeout);
3167            assert_eq!(root.message, Some("request timeout".to_string()));
3168        }
3169
3170        // Full chain inspection
3171        let chain: Vec<_> = cx.cancel_chain().collect();
3172        assert_eq!(chain.len(), 2);
3173        assert_eq!(chain[0].kind, CancelKind::ParentCancelled);
3174        assert_eq!(chain[1].kind, CancelKind::Timeout);
3175    }
3176
3177    #[test]
3178    fn cancel_fast_sets_flag_and_reason() {
3179        let cx = test_cx();
3180        assert!(!cx.is_cancel_requested());
3181        assert!(cx.cancel_reason().is_none());
3182
3183        cx.cancel_fast(CancelKind::Shutdown);
3184
3185        assert!(cx.is_cancel_requested());
3186        let reason = cx.cancel_reason().expect("should have reason");
3187        assert_eq!(reason.kind, CancelKind::Shutdown);
3188    }
3189
3190    #[test]
3191    fn cancel_fast_no_cause_chain() {
3192        // cancel_fast is for the no-attribution path - it shouldn't create cause chains
3193        let cx = test_cx();
3194
3195        cx.cancel_fast(CancelKind::Timeout);
3196
3197        let reason = cx.cancel_reason().expect("should have reason");
3198        // No cause chain
3199        assert!(reason.cause.is_none());
3200        // No message
3201        assert!(reason.message.is_none());
3202        // Not truncated (nothing to truncate)
3203        assert!(!reason.truncated);
3204    }
3205
3206    #[test]
3207    fn cancel_fast_sets_region() {
3208        let cx = test_cx();
3209
3210        cx.cancel_fast(CancelKind::User);
3211
3212        let reason = cx.cancel_reason().expect("should have reason");
3213        // Region should be set from the Cx
3214        let expected_region = RegionId::from_arena(ArenaIndex::new(0, 0));
3215        assert_eq!(reason.origin_region, expected_region);
3216    }
3217
3218    #[test]
3219    fn cancel_fast_minimal_allocation() {
3220        // cancel_fast should create minimal CancelReason without extra allocations
3221        let cx = test_cx();
3222
3223        cx.cancel_fast(CancelKind::Deadline);
3224
3225        let reason = cx.cancel_reason().expect("should have reason");
3226        // Verify minimal structure: just kind, region, no message, no cause, no truncation
3227        assert_eq!(reason.kind, CancelKind::Deadline);
3228        assert!(reason.message.is_none());
3229        assert!(reason.cause.is_none());
3230        assert!(!reason.truncated);
3231        assert!(reason.truncated_at_depth.is_none());
3232
3233        // Memory cost should be minimal (just the struct size, no boxed cause)
3234        let cost = reason.estimated_memory_cost();
3235        // Should be roughly just the size of CancelReason without any heap allocations for cause
3236        assert!(
3237            cost < 200,
3238            "cancel_fast should have minimal memory cost, got {cost}"
3239        );
3240    }
3241
3242    // ========================================================================
3243    // Checkpoint Progress Reporting Tests
3244    // ========================================================================
3245
3246    #[test]
3247    fn checkpoint_records_progress() {
3248        let cx = test_cx();
3249
3250        // Initially no checkpoints
3251        let state = cx.checkpoint_state();
3252        assert!(state.last_checkpoint.is_none());
3253        assert!(state.last_message.is_none());
3254        assert_eq!(state.checkpoint_count, 0);
3255
3256        // Record first checkpoint
3257        assert!(cx.checkpoint().is_ok());
3258        let state = cx.checkpoint_state();
3259        assert!(state.last_checkpoint.is_some());
3260        assert!(state.last_message.is_none());
3261        assert_eq!(state.checkpoint_count, 1);
3262
3263        // Record second checkpoint
3264        assert!(cx.checkpoint().is_ok());
3265        let state = cx.checkpoint_state();
3266        assert_eq!(state.checkpoint_count, 2);
3267    }
3268
3269    #[test]
3270    fn checkpoint_with_records_message() {
3271        let cx = test_cx();
3272
3273        // Record checkpoint with message
3274        assert!(cx.checkpoint_with("processing step 1").is_ok());
3275        let state = cx.checkpoint_state();
3276        assert!(state.last_checkpoint.is_some());
3277        assert_eq!(state.last_message.as_deref(), Some("processing step 1"));
3278        assert_eq!(state.checkpoint_count, 1);
3279
3280        // Second checkpoint overwrites message
3281        assert!(cx.checkpoint_with("processing step 2").is_ok());
3282        let state = cx.checkpoint_state();
3283        assert_eq!(state.last_message.as_deref(), Some("processing step 2"));
3284        assert_eq!(state.checkpoint_count, 2);
3285    }
3286
3287    #[test]
3288    fn checkpoint_clears_message() {
3289        let cx = test_cx();
3290
3291        // Record checkpoint with message
3292        assert!(cx.checkpoint_with("step 1").is_ok());
3293        assert_eq!(
3294            cx.checkpoint_state().last_message.as_deref(),
3295            Some("step 1")
3296        );
3297
3298        // Regular checkpoint clears the message
3299        assert!(cx.checkpoint().is_ok());
3300        assert!(cx.checkpoint_state().last_message.is_none());
3301    }
3302
3303    #[test]
3304    fn checkpoint_with_checks_cancel() {
3305        let cx = test_cx();
3306        cx.set_cancel_requested(true);
3307
3308        // checkpoint_with should return error on cancellation
3309        assert!(cx.checkpoint_with("should fail").is_err());
3310
3311        // But checkpoint state should still be updated
3312        let state = cx.checkpoint_state();
3313        assert_eq!(state.checkpoint_count, 1);
3314        assert_eq!(state.last_message.as_deref(), Some("should fail"));
3315    }
3316
3317    #[test]
3318    fn checkpoint_deadline_exhaustion_sets_cancel_reason() {
3319        let cx = Cx::for_testing_with_budget(Budget::new().with_deadline(Time::ZERO));
3320
3321        assert!(cx.checkpoint().is_err());
3322        let reason = cx
3323            .cancel_reason()
3324            .expect("deadline exhaustion must set reason");
3325        assert_eq!(reason.kind, CancelKind::Deadline);
3326        assert!(cx.is_cancel_requested());
3327    }
3328
3329    #[test]
3330    fn checkpoint_poll_budget_exhaustion_sets_cancel_reason() {
3331        let cx = Cx::for_testing_with_budget(Budget::new().with_poll_quota(0));
3332
3333        assert!(cx.checkpoint().is_err());
3334        let reason = cx
3335            .cancel_reason()
3336            .expect("poll quota exhaustion must set reason");
3337        assert_eq!(reason.kind, CancelKind::PollQuota);
3338        assert!(cx.is_cancel_requested());
3339    }
3340
3341    #[test]
3342    fn checkpoint_cost_budget_exhaustion_sets_cancel_reason() {
3343        let cx = Cx::for_testing_with_budget(Budget::new().with_cost_quota(0));
3344
3345        assert!(cx.checkpoint().is_err());
3346        let reason = cx
3347            .cancel_reason()
3348            .expect("cost budget exhaustion must set reason");
3349        assert_eq!(reason.kind, CancelKind::CostBudget);
3350        assert!(cx.is_cancel_requested());
3351    }
3352
3353    #[test]
3354    fn masked_checkpoint_defers_budget_exhaustion() {
3355        let cx = Cx::for_testing_with_budget(Budget::new().with_deadline(Time::ZERO));
3356
3357        cx.masked(|| {
3358            assert!(
3359                cx.checkpoint().is_ok(),
3360                "budget exhaustion should defer while masked"
3361            );
3362        });
3363
3364        let reason = cx
3365            .cancel_reason()
3366            .expect("masked checkpoint should still record exhaustion reason");
3367        assert_eq!(reason.kind, CancelKind::Deadline);
3368        assert!(
3369            cx.checkpoint().is_err(),
3370            "deadline exhaustion should be observed after unmasking"
3371        );
3372    }
3373
3374    #[test]
3375    fn checkpoint_budget_usage_reports_remaining_time_in_millis() {
3376        let budget = Budget::new()
3377            .with_deadline(Time::from_secs(10))
3378            .with_poll_quota(3)
3379            .with_cost_quota(7);
3380        let baseline = Budget::new()
3381            .with_deadline(Time::from_secs(20))
3382            .with_poll_quota(5)
3383            .with_cost_quota(11);
3384
3385        let (polls_used, cost_used, time_remaining_ms) =
3386            Cx::<cap::All>::checkpoint_budget_usage(budget, baseline, Time::from_secs(7));
3387
3388        assert_eq!(polls_used, Some(2));
3389        assert_eq!(cost_used, Some(4));
3390        assert_eq!(time_remaining_ms, Some(3_000));
3391    }
3392
3393    #[test]
3394    fn set_cancel_requested_wakes_registered_cancel_waker() {
3395        use std::sync::atomic::{AtomicUsize, Ordering};
3396        use std::task::Waker;
3397
3398        struct CountWaker(Arc<AtomicUsize>);
3399
3400        use std::task::Wake;
3401        impl Wake for CountWaker {
3402            fn wake(self: Arc<Self>) {
3403                self.0.fetch_add(1, Ordering::SeqCst);
3404            }
3405
3406            fn wake_by_ref(self: &Arc<Self>) {
3407                self.0.fetch_add(1, Ordering::SeqCst);
3408            }
3409        }
3410
3411        let cx = test_cx();
3412        let wakes = Arc::new(AtomicUsize::new(0));
3413        let waker = Waker::from(Arc::new(CountWaker(Arc::clone(&wakes))));
3414
3415        {
3416            let mut inner = cx.inner.write();
3417            inner.cancel_waker = Some(waker);
3418        }
3419
3420        cx.set_cancel_requested(true);
3421
3422        assert_eq!(
3423            wakes.load(Ordering::SeqCst),
3424            1,
3425            "set_cancel_requested(true) must wake the registered cancel waker"
3426        );
3427
3428        cx.set_cancel_requested(false);
3429
3430        assert_eq!(
3431            wakes.load(Ordering::SeqCst),
3432            1,
3433            "clearing cancellation must not spuriously wake the cancel waker"
3434        );
3435    }
3436
3437    #[test]
3438    fn checkpoint_state_is_snapshot() {
3439        let cx = test_cx();
3440
3441        // Get a snapshot
3442        let snapshot = cx.checkpoint_state();
3443        assert_eq!(snapshot.checkpoint_count, 0);
3444
3445        // Record more checkpoints
3446        assert!(cx.checkpoint().is_ok());
3447        assert!(cx.checkpoint().is_ok());
3448
3449        // Original snapshot should be unchanged
3450        assert_eq!(snapshot.checkpoint_count, 0);
3451
3452        // New snapshot should reflect updates
3453        assert_eq!(cx.checkpoint_state().checkpoint_count, 2);
3454    }
3455
3456    #[test]
3457    fn checkpoint_with_accepts_string_types() {
3458        let cx = test_cx();
3459
3460        // Test &str
3461        assert!(cx.checkpoint_with("literal").is_ok());
3462
3463        // Test String
3464        assert!(cx.checkpoint_with(String::from("owned")).is_ok());
3465
3466        // Test format!
3467        assert!(cx.checkpoint_with(format!("item {}", 42)).is_ok());
3468
3469        assert_eq!(cx.checkpoint_state().checkpoint_count, 3);
3470    }
3471
3472    // -----------------------------------------------------------------
3473    // Macaroon integration tests (bd-2lqyk.2)
3474    // -----------------------------------------------------------------
3475
3476    fn test_root_key() -> crate::security::key::AuthKey {
3477        crate::security::key::AuthKey::from_seed(42)
3478    }
3479
3480    #[test]
3481    fn cx_no_macaroon_by_default() {
3482        let cx = test_cx();
3483        assert!(cx.macaroon().is_none());
3484    }
3485
3486    #[test]
3487    fn cx_with_macaroon_attaches_token() {
3488        let key = test_root_key();
3489        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3490        let cx = test_cx().with_macaroon(token);
3491
3492        let m = cx.macaroon().expect("should have macaroon");
3493        assert_eq!(m.identifier(), "spawn:r1");
3494        assert_eq!(m.location(), "cx/scheduler");
3495    }
3496
3497    #[test]
3498    fn cx_macaroon_survives_clone() {
3499        let key = test_root_key();
3500        let token = MacaroonToken::mint(&key, "io:net", "cx/io");
3501        let cx = test_cx().with_macaroon(token);
3502        let cx2 = cx.clone();
3503
3504        assert_eq!(
3505            cx.macaroon().unwrap().identifier(),
3506            cx2.macaroon().unwrap().identifier()
3507        );
3508    }
3509
3510    #[test]
3511    fn cx_macaroon_survives_restrict() {
3512        let key = test_root_key();
3513        let token = MacaroonToken::mint(&key, "all:cap", "cx/root");
3514        let cx: Cx<cap::All> = test_cx().with_macaroon(token);
3515        let narrow: Cx<cap::None> = cx.restrict();
3516
3517        assert_eq!(
3518            cx.macaroon().unwrap().identifier(),
3519            narrow.macaroon().unwrap().identifier()
3520        );
3521    }
3522
3523    #[test]
3524    fn cx_attenuate_adds_caveat() {
3525        let key = test_root_key();
3526        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3527        let cx = test_cx().with_macaroon(token);
3528
3529        let cx2 = cx
3530            .attenuate(CaveatPredicate::TimeBefore(5000))
3531            .expect("attenuate should succeed");
3532
3533        // Original unchanged
3534        assert_eq!(cx.macaroon().unwrap().caveat_count(), 0);
3535        // Attenuated has one caveat
3536        assert_eq!(cx2.macaroon().unwrap().caveat_count(), 1);
3537        // Both share the same identifier
3538        assert_eq!(
3539            cx.macaroon().unwrap().identifier(),
3540            cx2.macaroon().unwrap().identifier()
3541        );
3542    }
3543
3544    #[test]
3545    fn cx_attenuate_returns_none_without_macaroon() {
3546        let cx = test_cx();
3547        assert!(cx.attenuate(CaveatPredicate::MaxUses(10)).is_none());
3548    }
3549
3550    #[test]
3551    fn cx_attenuate_from_budget_returns_none_without_macaroon() {
3552        let cx = test_cx();
3553        assert!(cx.attenuate_from_budget().is_none());
3554    }
3555
3556    #[test]
3557    fn cx_attenuate_from_budget_preserves_token_without_deadline() {
3558        let key = test_root_key();
3559        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3560        let cx = test_cx().with_macaroon(token);
3561
3562        let attenuated = cx
3563            .attenuate_from_budget()
3564            .expect("macaroon should still be present");
3565        assert_eq!(attenuated.macaroon().unwrap().caveat_count(), 0);
3566        assert_eq!(
3567            attenuated.macaroon().unwrap().identifier(),
3568            cx.macaroon().unwrap().identifier()
3569        );
3570    }
3571
3572    #[test]
3573    fn cx_attenuate_from_budget_adds_deadline_caveat() {
3574        let key = test_root_key();
3575        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3576        let budget = Budget::new().with_deadline(Time::from_millis(5_000));
3577        let cx = Cx::for_testing_with_budget(budget).with_macaroon(token);
3578
3579        let attenuated = cx
3580            .attenuate_from_budget()
3581            .expect("attenuation with deadline should succeed");
3582        assert_eq!(attenuated.macaroon().unwrap().caveat_count(), 1);
3583    }
3584
3585    #[test]
3586    fn cx_verify_capability_succeeds() {
3587        let key = test_root_key();
3588        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3589        let cx = test_cx().with_macaroon(token);
3590
3591        let ctx = VerificationContext::new().with_time(1000);
3592        assert!(cx.verify_capability(&key, &ctx).is_ok());
3593    }
3594
3595    #[test]
3596    fn cx_verify_capability_fails_wrong_key() {
3597        let key = test_root_key();
3598        let wrong_key = crate::security::key::AuthKey::from_seed(99);
3599        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3600        let cx = test_cx().with_macaroon(token);
3601
3602        let ctx = VerificationContext::new();
3603        let err = cx.verify_capability(&wrong_key, &ctx).unwrap_err();
3604        assert!(matches!(err, VerificationError::InvalidSignature));
3605    }
3606
3607    #[test]
3608    fn cx_verify_capability_fails_no_macaroon() {
3609        let key = test_root_key();
3610        let cx = test_cx();
3611
3612        let ctx = VerificationContext::new();
3613        let err = cx.verify_capability(&key, &ctx).unwrap_err();
3614        assert!(matches!(err, VerificationError::InvalidSignature));
3615    }
3616
3617    #[test]
3618    fn cx_verify_with_caveats() {
3619        let key = test_root_key();
3620        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler")
3621            .add_caveat(CaveatPredicate::TimeBefore(5000))
3622            .add_caveat(CaveatPredicate::RegionScope(42));
3623
3624        let cx = test_cx().with_macaroon(token);
3625
3626        // Passes with correct context
3627        let ctx = VerificationContext::new().with_time(1000).with_region(42);
3628        assert!(cx.verify_capability(&key, &ctx).is_ok());
3629
3630        // Fails with expired time
3631        let ctx_expired = VerificationContext::new().with_time(6000).with_region(42);
3632        let err = cx.verify_capability(&key, &ctx_expired).unwrap_err();
3633        assert!(matches!(
3634            err,
3635            VerificationError::CaveatFailed { index: 0, .. }
3636        ));
3637
3638        // Fails with wrong region
3639        let ctx_wrong_region = VerificationContext::new().with_time(1000).with_region(99);
3640        let err = cx.verify_capability(&key, &ctx_wrong_region).unwrap_err();
3641        assert!(matches!(
3642            err,
3643            VerificationError::CaveatFailed { index: 1, .. }
3644        ));
3645    }
3646
3647    #[test]
3648    fn cx_attenuate_then_verify() {
3649        let key = test_root_key();
3650        let token = MacaroonToken::mint(&key, "time:sleep", "cx/time");
3651        let cx = test_cx().with_macaroon(token);
3652
3653        // Attenuate with time limit
3654        let cx2 = cx.attenuate(CaveatPredicate::TimeBefore(3000)).unwrap();
3655
3656        // Further attenuate with max uses
3657        let cx3 = cx2.attenuate(CaveatPredicate::MaxUses(5)).unwrap();
3658
3659        // Original has no restrictions
3660        let ctx = VerificationContext::new().with_time(1000);
3661        assert!(cx.verify_capability(&key, &ctx).is_ok());
3662
3663        // cx2 has time restriction
3664        assert!(cx2.verify_capability(&key, &ctx).is_ok());
3665        let ctx_late = VerificationContext::new().with_time(4000);
3666        assert!(cx2.verify_capability(&key, &ctx_late).is_err());
3667
3668        // cx3 has both time + uses restriction
3669        let ctx_ok = VerificationContext::new().with_time(1000).with_use_count(3);
3670        assert!(cx3.verify_capability(&key, &ctx_ok).is_ok());
3671        let ctx_overuse = VerificationContext::new()
3672            .with_time(1000)
3673            .with_use_count(10);
3674        assert!(cx3.verify_capability(&key, &ctx_overuse).is_err());
3675    }
3676
3677    #[test]
3678    fn cx_verify_emits_evidence() {
3679        use crate::evidence_sink::CollectorSink;
3680
3681        let key = test_root_key();
3682        let token = MacaroonToken::mint(&key, "spawn:r1", "cx/scheduler");
3683        let sink = Arc::new(CollectorSink::new());
3684        let cx = test_cx()
3685            .with_macaroon(token)
3686            .with_evidence_sink(Some(sink.clone() as Arc<dyn EvidenceSink>));
3687
3688        let ctx = VerificationContext::new();
3689
3690        // Successful verification should emit evidence
3691        cx.verify_capability(&key, &ctx).unwrap();
3692        let entries = sink.entries();
3693        assert_eq!(entries.len(), 1);
3694        assert_eq!(entries[0].component, "cx_macaroon");
3695        assert_eq!(entries[0].action, "verify_success");
3696
3697        // Failed verification should also emit evidence
3698        let wrong_key = crate::security::key::AuthKey::from_seed(99);
3699        let _ = cx.verify_capability(&wrong_key, &ctx);
3700        let entries = sink.entries();
3701        assert_eq!(entries.len(), 2);
3702        assert_eq!(entries[1].action, "verify_fail_signature");
3703    }
3704
3705    #[cfg(feature = "messaging-fabric")]
3706    #[test]
3707    fn cx_grant_publish_capability_mints_token_and_runtime_grant() {
3708        let cx = test_cx();
3709        let schema = capability_schema(
3710            vec![SubjectFamily::Command],
3711            vec![CapabilityPermission::Publish],
3712        );
3713
3714        let granted = cx
3715            .grant_publish_capability::<CommandFamily>(
3716                SubjectPattern::new("orders.>"),
3717                &schema,
3718                DeliveryClass::EphemeralInteractive,
3719            )
3720            .expect("publish capability should mint");
3721
3722        assert_eq!(granted.token().family(), SubjectFamily::Command);
3723        assert!(cx.check_fabric_capability(&FabricCapability::Publish {
3724            subject: SubjectPattern::new("orders.created"),
3725        }));
3726        assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
3727            subject: SubjectPattern::new("payments.created"),
3728        }));
3729        assert_eq!(cx.fabric_capabilities().len(), 1);
3730    }
3731
3732    #[cfg(feature = "messaging-fabric")]
3733    #[test]
3734    fn cx_revoke_fabric_capabilities_by_id_and_scope_propagates_to_children() {
3735        let cx = test_cx();
3736        let child = cx.restrict::<cap::None>();
3737        let publish = cx
3738            .grant_fabric_capability(FabricCapability::Publish {
3739                subject: SubjectPattern::new("orders.>"),
3740            })
3741            .expect("publish grant");
3742        let subscribe = cx
3743            .grant_fabric_capability(FabricCapability::Subscribe {
3744                subject: SubjectPattern::new("orders.created"),
3745            })
3746            .expect("subscribe grant");
3747
3748        assert!(child.check_fabric_capability(&FabricCapability::Publish {
3749            subject: SubjectPattern::new("orders.created"),
3750        }));
3751        assert_eq!(
3752            child.revoke_fabric_capability_scope(FabricCapabilityScope::Publish),
3753            1
3754        );
3755        assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
3756            subject: SubjectPattern::new("orders.created"),
3757        }));
3758        assert_eq!(
3759            cx.revoke_fabric_capability(subscribe.id()),
3760            Some(FabricCapability::Subscribe {
3761                subject: SubjectPattern::new("orders.created"),
3762            })
3763        );
3764        assert!(
3765            !child.check_fabric_capability(&FabricCapability::Subscribe {
3766                subject: SubjectPattern::new("orders.created"),
3767            })
3768        );
3769        assert_eq!(publish.id().raw(), 1);
3770    }
3771
3772    #[cfg(feature = "messaging-fabric")]
3773    #[test]
3774    fn cx_revoke_fabric_capability_by_subject_is_overlap_based() {
3775        let cx = test_cx();
3776        cx.grant_fabric_capability(FabricCapability::Publish {
3777            subject: SubjectPattern::new("orders.>"),
3778        })
3779        .expect("publish grant");
3780        cx.grant_fabric_capability(FabricCapability::Subscribe {
3781            subject: SubjectPattern::new("payments.>"),
3782        })
3783        .expect("subscribe grant");
3784
3785        assert_eq!(
3786            cx.revoke_fabric_capability_by_subject(&SubjectPattern::new("orders.created")),
3787            1
3788        );
3789        assert!(!cx.check_fabric_capability(&FabricCapability::Publish {
3790            subject: SubjectPattern::new("orders.created"),
3791        }));
3792        assert!(cx.check_fabric_capability(&FabricCapability::Subscribe {
3793            subject: SubjectPattern::new("payments.captured"),
3794        }));
3795    }
3796
3797    #[cfg(feature = "messaging-fabric")]
3798    #[test]
3799    fn cx_rejects_empty_stream_capability_names() {
3800        let cx = test_cx();
3801
3802        let error = cx
3803            .grant_fabric_capability(FabricCapability::ConsumeStream {
3804                stream: "   ".to_owned(),
3805            })
3806            .expect_err("blank stream names must fail");
3807
3808        assert_eq!(error, FabricCapabilityGrantError::EmptyStreamName);
3809    }
3810
3811    // ========================================================================
3812    // Metamorphic Testing: Cx::trace ordering across scope boundaries
3813    // ========================================================================
3814
3815    /// MR1: Parent-Child Trace Ordering (Inclusive)
3816    /// Transformation: Create child scope
3817    /// Relation: Parent traces precede child traces in logical order
3818    #[test]
3819    fn mr_trace_parent_child_ordering() {
3820        let parent_cx = test_cx();
3821        let trace = TraceBufferHandle::new(16);
3822        parent_cx.set_trace_buffer(trace.clone());
3823
3824        // Parent emits trace first
3825        parent_cx.trace("parent trace 1");
3826
3827        // Create child context (simulating child scope)
3828        let child_cx = parent_cx.clone();
3829        child_cx.trace("child trace 1");
3830        child_cx.trace("child trace 2");
3831
3832        // Parent emits another trace after child
3833        parent_cx.trace("parent trace 2");
3834
3835        let events = trace.snapshot();
3836        assert_eq!(events.len(), 4);
3837
3838        // Extract logical times for ordering verification
3839        let times: Vec<_> = events
3840            .iter()
3841            .map(|e| e.logical_time.as_ref().expect("logical time"))
3842            .collect();
3843
3844        // Verify parent traces have logical time precedence structure
3845        // (In a real parent-child scenario, parent regions would have different region IDs)
3846        // For this test we verify causal ordering through logical time monotonicity
3847        for i in 1..times.len() {
3848            assert!(
3849                times[i - 1] <= times[i],
3850                "Logical time should be monotonically increasing: {:?} > {:?}",
3851                times[i - 1],
3852                times[i]
3853            );
3854        }
3855    }
3856
3857    /// MR2: Deterministic Interleaving (Equivalence)
3858    /// Transformation: Same seed replay
3859    /// Relation: Identical trace order under deterministic execution
3860    #[test]
3861    fn mr_trace_deterministic_interleaving() {
3862        // First execution with entropy seed
3863        let cx1 = test_cx_with_entropy(42);
3864        let trace1 = TraceBufferHandle::new(16);
3865        cx1.set_trace_buffer(trace1.clone());
3866
3867        // Simulate concurrent traces with deterministic randomization
3868        for i in 0..5 {
3869            if cx1.random_usize(2) == 0 {
3870                cx1.trace(&format!("branch_a_{}", i));
3871            } else {
3872                cx1.trace(&format!("branch_b_{}", i));
3873            }
3874        }
3875
3876        // Second execution with same seed
3877        let cx2 = test_cx_with_entropy(42);
3878        let trace2 = TraceBufferHandle::new(16);
3879        cx2.set_trace_buffer(trace2.clone());
3880
3881        for i in 0..5 {
3882            if cx2.random_usize(2) == 0 {
3883                cx2.trace(&format!("branch_a_{}", i));
3884            } else {
3885                cx2.trace(&format!("branch_b_{}", i));
3886            }
3887        }
3888
3889        let events1 = trace1.snapshot();
3890        let events2 = trace2.snapshot();
3891
3892        // Deterministic execution should produce identical trace sequences
3893        assert_eq!(
3894            events1.len(),
3895            events2.len(),
3896            "Trace count should be deterministic"
3897        );
3898
3899        for (i, (e1, e2)) in events1.iter().zip(events2.iter()).enumerate() {
3900            // Note: We compare message content rather than exact logical time
3901            // as time implementation details may vary while maintaining determinism
3902            assert_eq!(
3903                trace_message(e1),
3904                trace_message(e2),
3905                "Trace message at index {} should be deterministic: '{}' vs '{}'",
3906                i,
3907                trace_message(e1),
3908                trace_message(e2)
3909            );
3910        }
3911    }
3912
3913    /// MR3: Macaroon Causal Ordering (Permutative)
3914    /// Transformation: Macaroon attenuation chain
3915    /// Relation: Logical time monotonic through auth flow
3916    #[test]
3917    fn mr_trace_macaroon_causal_ordering() {
3918        use crate::cx::macaroon::{CaveatPredicate, MacaroonToken};
3919        use crate::security::key::AuthKey;
3920
3921        let key = AuthKey::from_seed(42);
3922        let token = MacaroonToken::mint(&key, "trace:emit", "cx/trace");
3923
3924        // Root context with macaroon
3925        let root_cx = test_cx().with_macaroon(token);
3926        let trace = TraceBufferHandle::new(16);
3927        root_cx.set_trace_buffer(trace.clone());
3928
3929        root_cx.trace("root macaroon trace");
3930
3931        // Attenuated context (simulating capability restriction)
3932        let attenuated_cx = root_cx
3933            .attenuate(CaveatPredicate::TimeBefore(5000))
3934            .expect("attenuation should succeed");
3935        attenuated_cx.trace("attenuated trace 1");
3936
3937        // Further attenuated context
3938        let further_attenuated_cx = attenuated_cx
3939            .attenuate(CaveatPredicate::MaxUses(10))
3940            .expect("further attenuation should succeed");
3941        further_attenuated_cx.trace("further attenuated trace");
3942
3943        // Back to less attenuated
3944        attenuated_cx.trace("attenuated trace 2");
3945
3946        let events = trace.snapshot();
3947        assert_eq!(events.len(), 4);
3948
3949        // Verify causal ordering preservation through logical time
3950        let logical_times: Vec<_> = events
3951            .iter()
3952            .map(|e| e.logical_time.as_ref().expect("logical time"))
3953            .collect();
3954
3955        // Logical time should increase monotonically regardless of attenuation level
3956        for i in 1..logical_times.len() {
3957            assert!(
3958                logical_times[i - 1] <= logical_times[i],
3959                "Macaroon attenuation should preserve causal ordering: tick {:?} > {:?}",
3960                logical_times[i - 1],
3961                logical_times[i]
3962            );
3963        }
3964    }
3965
3966    /// MR4: Budget Exhaustion Idempotence (Equivalence)
3967    /// Transformation: Multiple budget exhaust attempts
3968    /// Relation: Single log entry recorded
3969    #[test]
3970    fn mr_trace_budget_exhaustion_idempotence() {
3971        use crate::types::Budget;
3972
3973        // Create context with minimal budget
3974        let budget = Budget::new().with_poll_quota(1);
3975        let cx = Cx::for_testing_with_budget(budget);
3976        let trace = TraceBufferHandle::new(16);
3977        cx.set_trace_buffer(trace.clone());
3978
3979        // First trace that might exhaust budget
3980        cx.trace("pre-exhaustion trace");
3981
3982        // Simulate budget exhaustion (in practice this would happen during task execution)
3983        // For this test, we verify that multiple trace attempts during exhaustion
3984        // don't create duplicate entries
3985        cx.trace("exhaustion trace 1");
3986        cx.trace("exhaustion trace 2"); // Same condition
3987        cx.trace("exhaustion trace 3"); // Same condition
3988
3989        let events = trace.snapshot();
3990
3991        // All trace calls should succeed (budget exhaustion doesn't prevent tracing)
3992        // But this verifies that tracing remains consistent under budget pressure
3993        assert_eq!(events.len(), 4, "All traces should be recorded");
3994
3995        // Verify no duplicate logical times (idempotence of time allocation)
3996        let mut logical_times: Vec<_> = events
3997            .iter()
3998            .map(|e| format!("{:?}", e.logical_time.as_ref().expect("logical time")))
3999            .collect();
4000        logical_times.sort_unstable();
4001        logical_times.dedup();
4002
4003        assert_eq!(
4004            logical_times.len(),
4005            4,
4006            "Logical time allocation should be idempotent (no duplicate times)"
4007        );
4008    }
4009
4010    /// MR5: Clone Trace Equivalence (Equivalence)
4011    /// Transformation: Clone Cx
4012    /// Relation: Same trace patterns produced
4013    #[test]
4014    fn mr_trace_clone_equivalence() {
4015        let original_cx = test_cx_with_entropy(123);
4016        let trace = TraceBufferHandle::new(16);
4017        original_cx.set_trace_buffer(trace.clone());
4018
4019        // Clone the context
4020        let cloned_cx = original_cx.clone();
4021
4022        // Both should share the same trace buffer and produce equivalent patterns
4023        original_cx.trace("original trace 1");
4024        cloned_cx.trace("cloned trace 1");
4025        original_cx.trace("original trace 2");
4026        cloned_cx.trace("cloned trace 2");
4027
4028        let events = trace.snapshot();
4029        assert_eq!(events.len(), 4, "Both contexts should write to same buffer");
4030
4031        // Verify logical time ordering is preserved across clone usage
4032        let logical_times: Vec<_> = events
4033            .iter()
4034            .map(|e| e.logical_time.as_ref().expect("logical time"))
4035            .collect();
4036
4037        for i in 1..logical_times.len() {
4038            assert!(
4039                logical_times[i - 1] <= logical_times[i],
4040                "Clone should preserve logical time ordering: {:?} > {:?}",
4041                logical_times[i - 1],
4042                logical_times[i]
4043            );
4044        }
4045
4046        // Verify cloned context produces same entropy sequence
4047        let val1 = original_cx.random_usize(100);
4048        let val2 = cloned_cx.random_usize(100);
4049
4050        // Both should access the same entropy source
4051        assert_eq!(val1, val2, "Cloned context should share entropy state");
4052    }
4053
4054    /// MR6: Composite Trace Ordering (Composition)
4055    /// Combines parent-child + clone + macaroon relations
4056    #[test]
4057    fn mr_trace_composite_ordering() {
4058        use crate::cx::macaroon::{CaveatPredicate, MacaroonToken};
4059        use crate::security::key::AuthKey;
4060
4061        let key = AuthKey::from_seed(789);
4062        let token = MacaroonToken::mint(&key, "trace:composite", "cx/test");
4063
4064        // Root context with macaroon
4065        let root_cx = test_cx_with_entropy(456).with_macaroon(token);
4066        let trace = TraceBufferHandle::new(32);
4067        root_cx.set_trace_buffer(trace.clone());
4068
4069        // MR1 + MR3: Parent with macaroon
4070        root_cx.trace("parent+macaroon trace");
4071
4072        // MR5: Clone preserves properties
4073        let child_cx = root_cx.clone();
4074
4075        // MR3: Attenuation preserves ordering
4076        let attenuated_child = child_cx
4077            .attenuate(CaveatPredicate::TimeBefore(10000))
4078            .expect("attenuation should work");
4079
4080        // MR1: Child traces after parent
4081        attenuated_child.trace("child+attenuated trace");
4082
4083        // MR2: Deterministic interleaving
4084        for i in 0..3 {
4085            if root_cx.random_usize(2) == 0 {
4086                root_cx.trace(&format!("parent_branch_{}", i));
4087            } else {
4088                attenuated_child.trace(&format!("child_branch_{}", i));
4089            }
4090        }
4091
4092        let events = trace.snapshot();
4093        assert!(
4094            events.len() >= 5,
4095            "Composite test should produce multiple traces"
4096        );
4097
4098        // Verify all metamorphic properties hold in composition:
4099
4100        // 1. Logical time monotonicity (covers MR1, MR3, MR5)
4101        let logical_times: Vec<_> = events
4102            .iter()
4103            .map(|e| e.logical_time.as_ref().expect("logical time"))
4104            .collect();
4105
4106        for i in 1..logical_times.len() {
4107            assert!(
4108                logical_times[i - 1] <= logical_times[i],
4109                "Composite trace ordering should preserve monotonicity: {:?} > {:?}",
4110                logical_times[i - 1],
4111                logical_times[i]
4112            );
4113        }
4114
4115        // 2. All traces recorded (MR4 budget idempotence equivalent)
4116        assert!(
4117            events.iter().all(|e| !trace_message(e).is_empty()),
4118            "All traces should have non-empty messages"
4119        );
4120
4121        // 3. Deterministic branching produces expected pattern (MR2)
4122        let branch_traces = events
4123            .iter()
4124            .filter(|e| trace_message(e).contains("_branch_"))
4125            .count();
4126        assert_eq!(
4127            branch_traces, 3,
4128            "Deterministic branching should produce exactly 3 branch traces"
4129        );
4130    }
4131}