Skip to main content

ftui_core/
cx.rs

1//! Capability context (`Cx`) for cooperative cancellation and deadline propagation.
2//!
3//! `Cx` is a lightweight handle threaded through every async operation, I/O call,
4//! and timer in the FrankenTUI stack. It enables:
5//!
6//! - **Cooperative cancellation**: Any holder can check `cx.is_cancelled()` and
7//!   bail out early.
//! - **Deadline propagation**: A parent context's deadline flows to children.
//!   `cx.deadline()` returns the time remaining until the tightest deadline in
//!   the chain.
10//! - **Deterministic testing via Lab**: In `Lab` mode, time is controlled
11//!   externally, enabling fully reproducible test runs.
12//!
13//! # Design
14//!
15//! `Cx` is cheaply cloneable (`Arc` inside) and immutable from the outside.
16//! To cancel or advance Lab time, hold the companion [`CxController`].
17//!
18//! # Tracing
19//!
20//! When the `tracing` feature is active, cancellation emits a `WARN`-level event
21//! and deadline checks emit `cx.propagate` spans with `cx_id` and
22//! `deadline_remaining_us` fields.
23//!
24//! # Example
25//!
26//! ```
27//! use ftui_core::cx::{Cx, CxController};
28//! use web_time::Duration;
29//!
30//! // Create a root context with a 500ms deadline.
31//! let (cx, ctrl) = Cx::with_deadline(Duration::from_millis(500));
32//! assert!(!cx.is_cancelled());
33//! assert!(cx.deadline().is_some());
34//!
35//! // Cancel it.
36//! ctrl.cancel();
37//! assert!(cx.is_cancelled());
38//! ```
39
40use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use web_time::{Duration, Instant};
43
44// Import tracing macros (no-op when tracing feature is disabled).
45#[cfg(feature = "tracing")]
46use crate::logging::{info_span, warn};
47#[cfg(not(feature = "tracing"))]
48use crate::{info_span, warn};
49
50// ─── Cx ID generation ────────────────────────────────────────────────────────
51
/// Process-wide source of context identifiers; the first id handed out is `1`.
static NEXT_CX_ID: AtomicU64 = AtomicU64::new(1);

/// Reserve the next context id and return it.
///
/// Each call returns the counter's current value and bumps it by one, so no
/// two callers receive the same id (until the `u64` counter wraps).
fn next_cx_id() -> u64 {
    // The atomic read-modify-write alone guarantees distinct results; the id
    // does not publish any other data, hence the relaxed ordering.
    NEXT_CX_ID.fetch_add(1, Ordering::Relaxed)
}
57
58// ─── Metrics counters ────────────────────────────────────────────────────────
59
/// Process-wide tally of first-time `Cx` cancellations.
static CX_CANCELLATIONS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Snapshot of the global cancellation tally (for diagnostics/telemetry).
///
/// The value is read with relaxed ordering, so it is a point-in-time counter
/// reading and not a synchronisation point.
#[must_use]
pub fn cx_cancellations_total() -> u64 {
    let total = &CX_CANCELLATIONS_TOTAL;
    total.load(Ordering::Relaxed)
}
68
69// ─── Time source ─────────────────────────────────────────────────────────────
70
/// Time source abstraction for deterministic testing.
///
/// In production, `Cx` uses `web_time::Instant::now()`.
/// In Lab mode, time is controlled via [`LabClock`].
///
/// Child contexts are created with a clone of their parent's time source, so
/// a whole context chain reads the same clock.
#[derive(Debug, Clone)]
enum TimeSource {
    /// Real time, read from `Instant::now()` on every query.
    Real,
    /// Deterministic lab clock for testing; time only moves when the clock is
    /// advanced explicitly.
    Lab(LabClock),
}
82
/// A manually-advanceable clock for deterministic tests.
///
/// Cloning a `LabClock` produces another handle to the same shared offset, so
/// all `Cx` instances sharing the same `LabClock` see the same time.
#[derive(Debug, Clone)]
pub struct LabClock {
    /// Real instant captured when the clock was created; lab time is reported
    /// relative to it.
    epoch: Instant,
    /// Total simulated time added so far, in microseconds. Shared by clones.
    offset_us: Arc<AtomicU64>,
}

impl LabClock {
    /// Create a new lab clock starting at `Instant::now()`.
    #[must_use]
    pub fn new() -> Self {
        Self {
            epoch: Instant::now(),
            offset_us: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Advance the lab clock by `delta`.
    ///
    /// `delta` is truncated to whole microseconds. Both the per-call amount
    /// and the accumulated offset saturate at `u64::MAX` microseconds, so the
    /// clock never moves backwards, no matter how far it is advanced.
    pub fn advance(&self, delta: Duration) {
        let us = u64::try_from(delta.as_micros()).unwrap_or(u64::MAX);
        // A plain `fetch_add` wraps on overflow, which would make lab time
        // jump back towards the epoch. The closure always returns `Some`, so
        // `fetch_update` cannot fail and its result carries no information.
        let _ = self
            .offset_us
            .fetch_update(Ordering::Release, Ordering::Relaxed, |current| {
                Some(current.saturating_add(us))
            });
    }

    /// Current lab time: the creation instant plus everything added through
    /// [`LabClock::advance`].
    #[must_use]
    pub fn now(&self) -> Instant {
        let offset = Duration::from_micros(self.offset_us.load(Ordering::Acquire));
        self.epoch + offset
    }
}

impl Default for LabClock {
    /// Equivalent to [`LabClock::new`].
    fn default() -> Self {
        Self::new()
    }
}
121
122// ─── Inner shared state ──────────────────────────────────────────────────────
123
/// State shared (behind an `Arc`) by a `Cx`, its clones, its `CxController`,
/// and any child contexts that name it as their parent.
#[derive(Debug)]
struct CxInner {
    /// Identifier assigned from the global id counter at construction.
    id: u64,
    /// Cancellation flag; set by the controller and read by cancellation checks.
    cancelled: AtomicBool,
    /// Deadline as microseconds since `created_at`. `u64::MAX` means no deadline.
    deadline_us: u64,
    /// Reading of `time_source` taken when the context was constructed.
    created_at: Instant,
    /// Clock used for all elapsed-time computations of this context.
    time_source: TimeSource,
    /// Optional parent for deadline chain resolution.
    /// The same link is walked when checking for cancellation.
    parent: Option<Arc<CxInner>>,
}
135
136// ─── Cx ──────────────────────────────────────────────────────────────────────
137
/// Capability context handle.
///
/// Cheaply cloneable. Check `is_cancelled()` at natural yield points
/// (loop iterations, before I/O, before expensive computation).
///
/// A clone shares the same inner state as the original, so cancellation is
/// observed through every clone.
#[derive(Clone, Debug)]
pub struct Cx {
    /// Shared state; also referenced by the companion controller and by
    /// children created from this context.
    inner: Arc<CxInner>,
}
146
147impl Cx {
148    // ── Constructors ─────────────────────────────────────────────────
149
150    /// Create a root context with no deadline.
151    #[must_use]
152    pub fn background() -> (Self, CxController) {
153        Self::new_inner(u64::MAX, TimeSource::Real, None)
154    }
155
156    /// Create a root context with a deadline.
157    #[must_use]
158    pub fn with_deadline(deadline: Duration) -> (Self, CxController) {
159        let us = deadline.as_micros().min(u64::MAX as u128) as u64;
160        Self::new_inner(us, TimeSource::Real, None)
161    }
162
163    /// Create a root context using a [`LabClock`] for deterministic testing.
164    #[must_use]
165    pub fn lab(clock: &LabClock) -> (Self, CxController) {
166        Self::new_inner(u64::MAX, TimeSource::Lab(clock.clone()), None)
167    }
168
169    /// Create a lab context with a deadline.
170    #[must_use]
171    pub fn lab_with_deadline(clock: &LabClock, deadline: Duration) -> (Self, CxController) {
172        let us = deadline.as_micros().min(u64::MAX as u128) as u64;
173        Self::new_inner(us, TimeSource::Lab(clock.clone()), None)
174    }
175
176    /// Derive a child context with a tighter deadline.
177    ///
178    /// The effective deadline is `min(parent.deadline(), child_deadline)`.
179    /// Cancelling the parent also cancels the child (checked via chain walk).
180    #[must_use]
181    pub fn child(&self, deadline: Duration) -> (Self, CxController) {
182        let us = deadline.as_micros().min(u64::MAX as u128) as u64;
183        let time_source = match &self.inner.time_source {
184            TimeSource::Real => TimeSource::Real,
185            TimeSource::Lab(c) => TimeSource::Lab(c.clone()),
186        };
187        Self::new_inner(us, time_source, Some(self.inner.clone()))
188    }
189
190    /// Derive a child context that inherits the parent deadline.
191    #[must_use]
192    pub fn child_inherit(&self) -> (Self, CxController) {
193        let time_source = match &self.inner.time_source {
194            TimeSource::Real => TimeSource::Real,
195            TimeSource::Lab(c) => TimeSource::Lab(c.clone()),
196        };
197        Self::new_inner(u64::MAX, time_source, Some(self.inner.clone()))
198    }
199
200    fn new_inner(
201        deadline_us: u64,
202        time_source: TimeSource,
203        parent: Option<Arc<CxInner>>,
204    ) -> (Self, CxController) {
205        let now = match &time_source {
206            TimeSource::Real => Instant::now(),
207            TimeSource::Lab(c) => c.now(),
208        };
209        let inner = Arc::new(CxInner {
210            id: next_cx_id(),
211            cancelled: AtomicBool::new(false),
212            deadline_us,
213            created_at: now,
214            time_source,
215            parent,
216        });
217        let cx = Self {
218            inner: inner.clone(),
219        };
220        let ctrl = CxController { inner };
221        (cx, ctrl)
222    }
223
224    // ── Queries ──────────────────────────────────────────────────────
225
226    /// Unique identifier for this context (for tracing/logging).
227    #[inline]
228    #[must_use]
229    pub fn id(&self) -> u64 {
230        self.inner.id
231    }
232
233    /// Check if this context (or any ancestor) has been cancelled.
234    #[inline]
235    #[must_use]
236    pub fn is_cancelled(&self) -> bool {
237        self.is_cancelled_inner(&self.inner)
238    }
239
240    fn is_cancelled_inner(&self, inner: &CxInner) -> bool {
241        if inner.cancelled.load(Ordering::Acquire) {
242            return true;
243        }
244        if let Some(ref parent) = inner.parent {
245            return self.is_cancelled_inner(parent);
246        }
247        false
248    }
249
250    /// Check if the deadline has passed.
251    #[inline]
252    #[must_use]
253    pub fn is_expired(&self) -> bool {
254        self.remaining().is_some_and(|d| d.is_zero())
255    }
256
257    /// Check if the context is done (cancelled or expired).
258    #[inline]
259    #[must_use]
260    pub fn is_done(&self) -> bool {
261        self.is_cancelled() || self.is_expired()
262    }
263
264    /// Return the effective deadline as a `Duration` from context creation,
265    /// considering the full parent chain. Returns `None` if no deadline is set.
266    #[must_use]
267    pub fn deadline(&self) -> Option<Duration> {
268        let own = self.inner.deadline_us;
269        let parent_remaining = self.parent_remaining_us();
270
271        let now = self.now();
272        let elapsed = now
273            .checked_duration_since(self.inner.created_at)
274            .unwrap_or(Duration::ZERO);
275        let elapsed_us = elapsed.as_micros().min(u64::MAX as u128) as u64;
276
277        // Own remaining
278        let own_remaining = if own == u64::MAX {
279            u64::MAX
280        } else {
281            own.saturating_sub(elapsed_us)
282        };
283
284        let effective = own_remaining.min(parent_remaining);
285        if effective == u64::MAX {
286            None
287        } else {
288            Some(Duration::from_micros(effective))
289        }
290    }
291
292    /// Remaining time until deadline (saturates to zero, never negative).
293    /// Returns `None` if no deadline is set.
294    #[must_use]
295    pub fn remaining(&self) -> Option<Duration> {
296        self.deadline()
297    }
298
299    /// Remaining time in microseconds, or `None` if no deadline.
300    #[must_use]
301    pub fn remaining_us(&self) -> Option<u64> {
302        self.remaining()
303            .map(|d| d.as_micros().min(u64::MAX as u128) as u64)
304    }
305
306    /// Current time according to this context's time source.
307    #[must_use]
308    pub fn now(&self) -> Instant {
309        match &self.inner.time_source {
310            TimeSource::Real => Instant::now(),
311            TimeSource::Lab(c) => c.now(),
312        }
313    }
314
315    /// Whether this context uses a lab clock.
316    #[inline]
317    #[must_use]
318    pub fn is_lab(&self) -> bool {
319        matches!(self.inner.time_source, TimeSource::Lab(_))
320    }
321
322    fn parent_remaining_us(&self) -> u64 {
323        match &self.inner.parent {
324            Some(parent) => {
325                let parent_cx = Cx {
326                    inner: parent.clone(),
327                };
328                parent_cx.remaining_us().unwrap_or(u64::MAX)
329            }
330            None => u64::MAX,
331        }
332    }
333
334    // ── Convenience ──────────────────────────────────────────────────
335
336    /// Sleep for the given duration, respecting cancellation and deadline.
337    ///
338    /// Returns `true` if the full duration elapsed, `false` if cancelled or
339    /// deadline expired early.
340    pub fn sleep(&self, duration: Duration) -> bool {
341        let effective = match self.remaining() {
342            Some(rem) => duration.min(rem),
343            None => duration,
344        };
345        if effective.is_zero() || self.is_cancelled() {
346            return false;
347        }
348
349        // Use small sleep chunks for responsive cancellation checking
350        let chunk = Duration::from_millis(10);
351        let mut remaining = effective;
352        while remaining > Duration::ZERO && !self.is_cancelled() {
353            let sleep_time = remaining.min(chunk);
354            std::thread::sleep(sleep_time);
355            remaining = remaining.saturating_sub(sleep_time);
356        }
357        !self.is_cancelled() && remaining.is_zero()
358    }
359}
360
361// ─── CxController ────────────────────────────────────────────────────────────
362
363/// Control handle for a [`Cx`].
364///
365/// Held by the owner of the context to trigger cancellation.
366/// Dropping the controller does **not** cancel the context — cancellation
367/// is always explicit.
368#[derive(Debug)]
369pub struct CxController {
370    inner: Arc<CxInner>,
371}
372
373impl CxController {
374    /// Cancel the associated context.
375    ///
376    /// All clones of the `Cx` (and children) will observe `is_cancelled() == true`.
377    pub fn cancel(&self) {
378        let was_cancelled = self.inner.cancelled.swap(true, Ordering::Release);
379        if !was_cancelled {
380            CX_CANCELLATIONS_TOTAL.fetch_add(1, Ordering::Relaxed);
381            warn!(cx_id = self.inner.id, "cx cancelled");
382        }
383    }
384
385    /// Whether this context has already been cancelled.
386    #[inline]
387    #[must_use]
388    pub fn is_cancelled(&self) -> bool {
389        self.inner.cancelled.load(Ordering::Acquire)
390    }
391}
392
393// ─── CxError ─────────────────────────────────────────────────────────────────
394
/// Reason an operation guarded by a `Cx` stopped early.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CxError {
    /// The context was explicitly cancelled.
    Cancelled,
    /// The context deadline expired.
    DeadlineExceeded,
}

impl std::fmt::Display for CxError {
    /// Writes a short, lowercase description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let description = match self {
            Self::Cancelled => "context cancelled",
            Self::DeadlineExceeded => "deadline exceeded",
        };
        f.write_str(description)
    }
}

impl std::error::Error for CxError {}
414
415impl Cx {
416    /// Check if the context is still live; return `Err` if cancelled or expired.
417    ///
418    /// Intended for use at yield points:
419    /// ```ignore
420    /// cx.check()?;
421    /// // ... continue work ...
422    /// ```
423    pub fn check(&self) -> Result<(), CxError> {
424        let cancelled = self.is_cancelled();
425        let deadline_remaining_us = self.remaining_us().unwrap_or(u64::MAX);
426        let propagate_span = info_span!(
427            "cx.propagate",
428            cx_id = self.id(),
429            deadline_remaining_us,
430            cx_cancelled = cancelled
431        );
432        let _propagate_span = propagate_span.enter();
433
434        if cancelled {
435            warn!(
436                cx_id = self.id(),
437                deadline_remaining_us, "cx.propagate cancelled"
438            );
439            return Err(CxError::Cancelled);
440        }
441        if deadline_remaining_us == 0 {
442            return Err(CxError::DeadlineExceeded);
443        }
444        Ok(())
445    }
446}
447
448// ─── Tests ───────────────────────────────────────────────────────────────────
449
/// Unit tests for `Cx`, `CxController`, `LabClock`, and `CxError`.
///
/// Time-dependent assertions use a [`LabClock`] wherever an exact outcome is
/// required; tests on the real clock only assert generous bounds.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    #[cfg(feature = "tracing-json")]
    use std::sync::{Arc, Mutex};

    #[cfg(feature = "tracing-json")]
    use tracing::Subscriber;
    #[cfg(feature = "tracing-json")]
    use tracing::field::{Field, Visit};
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::filter::LevelFilter;
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::layer::{Context, Layer};
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::prelude::*;
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::registry::LookupSpan;

    /// Minimal stand-in for application state mutated during an update phase.
    #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
    struct TestModel {
        value: u32,
    }

    /// Rendering quality chosen from the remaining deadline budget.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum RenderStrategy {
        Full,
        Degraded,
    }

    /// Error of an update that may be invoked without any context at all.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum MissingCxError {
        MissingContext,
        Cx(CxError),
    }

    /// Apply `delta` to the model, but only if `cx` is still live.
    fn update_with_cx(cx: &Cx, model: &mut TestModel, delta: u32) -> Result<(), CxError> {
        cx.check()?;
        model.value = model.value.saturating_add(delta);
        Ok(())
    }

    /// Like [`update_with_cx`], but reports a missing context as an error
    /// before touching the model.
    fn update_with_optional_cx(
        cx: Option<&Cx>,
        model: &mut TestModel,
        delta: u32,
    ) -> Result<(), MissingCxError> {
        let cx = cx.ok_or(MissingCxError::MissingContext)?;
        update_with_cx(cx, model, delta).map_err(MissingCxError::Cx)
    }

    /// Render `source` character by character into a scratch buffer, checking
    /// `cx` after every character, and append to `sink` only if the whole
    /// render completed. `on_chunk` is called with each character index.
    fn render_widget_with_cx_atomic(
        cx: &Cx,
        source: &str,
        sink: &mut String,
        mut on_chunk: impl FnMut(usize),
    ) -> Result<(), CxError> {
        let mut scratch = String::new();
        for (idx, ch) in source.chars().enumerate() {
            scratch.push(ch);
            on_chunk(idx);
            cx.check()?;
        }
        sink.push_str(&scratch);
        Ok(())
    }

    /// Send `payload` on `sender` if `cx` is live; a closed channel is
    /// reported as `CxError::Cancelled`.
    fn subscription_send_with_cx(
        cx: &Cx,
        sender: &mpsc::Sender<u32>,
        payload: u32,
    ) -> Result<(), CxError> {
        cx.check()?;
        sender.send(payload).map_err(|_| CxError::Cancelled)
    }

    /// Pick `Degraded` once the remaining time is at or below the threshold;
    /// contexts without a deadline always render in `Full`.
    fn choose_render_strategy(cx: &Cx, downgrade_threshold: Duration) -> RenderStrategy {
        match cx.remaining() {
            Some(remaining) if remaining <= downgrade_threshold => RenderStrategy::Degraded,
            _ => RenderStrategy::Full,
        }
    }

    #[test]
    fn background_cx_is_not_cancelled() {
        let (cx, _ctrl) = Cx::background();
        assert!(!cx.is_cancelled());
        assert!(!cx.is_expired());
        assert!(!cx.is_done());
        assert!(cx.deadline().is_none());
    }

    #[test]
    fn cancel_propagates() {
        let (cx, ctrl) = Cx::background();
        assert!(!cx.is_cancelled());
        ctrl.cancel();
        assert!(cx.is_cancelled());
        assert!(cx.is_done());
    }

    #[test]
    fn clone_shares_cancellation() {
        let (cx, ctrl) = Cx::background();
        let cx2 = cx.clone();
        ctrl.cancel();
        assert!(cx.is_cancelled());
        assert!(cx2.is_cancelled());
    }

    #[test]
    fn deadline_reports_remaining() {
        let (cx, _ctrl) = Cx::with_deadline(Duration::from_secs(10));
        let rem = cx.remaining().expect("should have deadline");
        // Should be close to 10 seconds (minus tiny elapsed)
        assert!(rem.as_secs() >= 9);
    }

    #[test]
    fn child_inherits_cancellation() {
        let (parent, parent_ctrl) = Cx::background();
        let (child, _child_ctrl) = parent.child(Duration::from_secs(60));
        assert!(!child.is_cancelled());
        parent_ctrl.cancel();
        assert!(child.is_cancelled());
    }

    #[test]
    fn child_has_tighter_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_secs(60));
        let (child, _) = parent.child(Duration::from_millis(100));
        let child_rem = child.remaining().expect("child has deadline");
        // Child deadline should be ~100ms, much less than parent's 60s
        assert!(child_rem < Duration::from_secs(1));
    }

    #[test]
    fn child_respects_parent_tighter_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_millis(50));
        let (child, _) = parent.child(Duration::from_secs(60));
        let child_rem = child.remaining().expect("child has deadline via parent");
        // Parent deadline is tighter, child should see ~50ms
        assert!(child_rem < Duration::from_secs(1));
    }

    #[test]
    fn lab_clock_deterministic() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(100));

        // At t=0, should have ~100ms remaining
        let r1 = cx.remaining().expect("has deadline");
        assert!(r1 >= Duration::from_millis(90));

        // Advance 80ms
        clock.advance(Duration::from_millis(80));
        let r2 = cx.remaining().expect("has deadline");
        assert!(r2 <= Duration::from_millis(25));
        assert!(!cx.is_expired());

        // Advance past deadline
        clock.advance(Duration::from_millis(30));
        assert!(cx.is_expired());
        assert!(cx.is_done());
    }

    #[test]
    fn check_returns_ok_when_live() {
        let (cx, _ctrl) = Cx::background();
        assert!(cx.check().is_ok());
    }

    #[test]
    fn check_returns_cancelled() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        assert_eq!(cx.check(), Err(CxError::Cancelled));
    }

    #[test]
    fn check_returns_deadline_exceeded() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(10));
        clock.advance(Duration::from_millis(20));
        assert_eq!(cx.check(), Err(CxError::DeadlineExceeded));
    }

    #[test]
    fn cx_id_is_unique() {
        let (cx1, _) = Cx::background();
        let (cx2, _) = Cx::background();
        assert_ne!(cx1.id(), cx2.id());
    }

    #[test]
    fn cx_is_lab() {
        let clock = LabClock::new();
        let (cx_lab, _) = Cx::lab(&clock);
        let (cx_real, _) = Cx::background();
        assert!(cx_lab.is_lab());
        assert!(!cx_real.is_lab());
    }

    #[test]
    fn child_inherit_no_deadline() {
        let (parent, _) = Cx::background();
        let (child, _) = parent.child_inherit();
        assert!(child.deadline().is_none());
    }

    #[test]
    fn child_inherit_with_parent_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_secs(30));
        let (child, _) = parent.child_inherit();
        // Child has no own deadline but inherits parent's
        let rem = child.remaining().expect("inherits parent deadline");
        assert!(rem > Duration::from_secs(28));
    }

    #[test]
    fn cx_error_display() {
        assert_eq!(CxError::Cancelled.to_string(), "context cancelled");
        assert_eq!(CxError::DeadlineExceeded.to_string(), "deadline exceeded");
    }

    #[test]
    fn controller_is_cancelled_matches_cx() {
        let (cx, ctrl) = Cx::background();
        assert!(!ctrl.is_cancelled());
        ctrl.cancel();
        assert!(ctrl.is_cancelled());
        assert!(cx.is_cancelled());
    }

    #[test]
    fn double_cancel_is_idempotent() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        ctrl.cancel();
        assert!(cx.is_cancelled());
    }

    #[test]
    fn lab_clock_advance_accumulates() {
        let clock = LabClock::new();
        let t0 = clock.now();
        clock.advance(Duration::from_millis(100));
        clock.advance(Duration::from_millis(200));
        let elapsed = clock.now().duration_since(t0);
        // Should be ~300ms
        assert!(elapsed >= Duration::from_millis(290));
        assert!(elapsed <= Duration::from_millis(310));
    }

    #[test]
    fn cancellation_counter_increments() {
        let before = cx_cancellations_total();
        let (_cx, ctrl) = Cx::background();
        ctrl.cancel();
        // The counter only ever grows, so this holds even when other tests
        // cancel contexts at the same time.
        assert!(cx_cancellations_total() > before);

        // Double cancel should not increment again. The counter is
        // process-wide and the test harness runs tests in parallel, so a
        // single before/after equality check can be disturbed by another
        // test's cancellation. If a redundant cancel did bump the counter, it
        // would change on *every* attempt; observing it unchanged once is
        // therefore sufficient proof.
        let unchanged_once = (0..1_000).any(|_| {
            let snapshot = cx_cancellations_total();
            ctrl.cancel();
            cx_cancellations_total() == snapshot
        });
        assert!(
            unchanged_once,
            "redundant cancel must not bump the cancellation counter"
        );
    }

    #[test]
    fn sleep_respects_cancellation() {
        let (cx, ctrl) = Cx::background();
        // Cancel immediately so sleep returns false
        ctrl.cancel();
        let completed = cx.sleep(Duration::from_secs(10));
        assert!(!completed);
    }

    #[test]
    fn sleep_respects_lab_deadline() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(5));
        // Advance past deadline
        clock.advance(Duration::from_millis(10));
        let completed = cx.sleep(Duration::from_secs(10));
        assert!(!completed);
    }

    #[test]
    fn cx_propagates_across_update_render_and_subscription_phases() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(100));
        let mut model = TestModel::default();

        update_with_cx(&cx, &mut model, 7).expect("update should respect live cx");
        let mut rendered = String::new();
        render_widget_with_cx_atomic(&cx, "ok", &mut rendered, |_| {})
            .expect("render should respect live cx");

        let (tx, rx) = mpsc::channel();
        subscription_send_with_cx(&cx, &tx, model.value)
            .expect("subscription should respect live cx");

        assert_eq!(model.value, 7);
        assert_eq!(rendered, "ok");
        assert_eq!(rx.try_recv().expect("subscription payload"), 7);
    }

    #[test]
    fn cancellation_mid_render_aborts_without_partial_output() {
        let clock = LabClock::new();
        let (cx, ctrl) = Cx::lab(&clock);
        let mut sink = String::from("prefix:");

        let result = render_widget_with_cx_atomic(&cx, "render-me", &mut sink, |idx| {
            if idx == 2 {
                ctrl.cancel();
            }
        });

        assert_eq!(result, Err(CxError::Cancelled));
        assert_eq!(sink, "prefix:");
    }

    #[test]
    fn deadline_enforcement_triggers_strategy_downgrade() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(50));

        assert_eq!(
            choose_render_strategy(&cx, Duration::from_millis(10)),
            RenderStrategy::Full
        );

        clock.advance(Duration::from_millis(45));
        assert_eq!(
            choose_render_strategy(&cx, Duration::from_millis(10)),
            RenderStrategy::Degraded
        );
    }

    #[test]
    fn missing_cx_returns_clear_runtime_error() {
        let mut model = TestModel::default();
        let err = update_with_optional_cx(None, &mut model, 1).expect_err("missing cx");
        assert_eq!(err, MissingCxError::MissingContext);
        assert_eq!(model.value, 0, "state should remain unchanged without cx");
    }

    /// Tracing layer that records the names of created spans and a
    /// `LEVEL:message` string for every event.
    #[cfg(feature = "tracing-json")]
    #[derive(Default, Clone)]
    struct TraceCaptureLayer {
        spans: Arc<Mutex<Vec<String>>>,
        events: Arc<Mutex<Vec<String>>>,
    }

    /// Field visitor that extracts only an event's `message` field.
    #[cfg(feature = "tracing-json")]
    #[derive(Default)]
    struct EventMessageVisitor {
        message: Option<String>,
    }

    #[cfg(feature = "tracing-json")]
    impl Visit for EventMessageVisitor {
        fn record_str(&mut self, field: &Field, value: &str) {
            if field.name() == "message" {
                self.message = Some(value.to_string());
            }
        }

        fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
            if field.name() == "message" {
                self.message = Some(format!("{value:?}"));
            }
        }
    }

    #[cfg(feature = "tracing-json")]
    impl<S> Layer<S> for TraceCaptureLayer
    where
        S: Subscriber + for<'lookup> LookupSpan<'lookup>,
    {
        fn on_new_span(
            &self,
            attrs: &tracing::span::Attributes<'_>,
            _id: &tracing::span::Id,
            _ctx: Context<'_, S>,
        ) {
            self.spans
                .lock()
                .expect("span capture lock")
                .push(attrs.metadata().name().to_string());
        }

        fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
            let mut visitor = EventMessageVisitor::default();
            event.record(&mut visitor);
            let message = visitor.message.unwrap_or_default();
            self.events
                .lock()
                .expect("event capture lock")
                .push(format!("{}:{}", event.metadata().level(), message));
        }
    }

    #[cfg(feature = "tracing-json")]
    #[test]
    fn check_emits_cx_propagate_span_and_warns_on_cancellation() {
        let capture = TraceCaptureLayer::default();
        let subscriber =
            tracing_subscriber::registry().with(capture.clone().with_filter(LevelFilter::TRACE));
        let _guard = tracing::subscriber::set_default(subscriber);

        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        let result = cx.check();
        assert_eq!(result, Err(CxError::Cancelled));

        let spans = capture.spans.lock().expect("span capture lock");
        assert!(
            spans.iter().any(|name| name == "cx.propagate"),
            "expected cx.propagate span, got {spans:?}"
        );
        drop(spans);

        let events = capture.events.lock().expect("event capture lock");
        assert!(
            events
                .iter()
                .any(|event| event.contains("WARN:cx cancelled")),
            "expected cancellation WARN event, got {events:?}"
        );
        assert!(
            events
                .iter()
                .any(|event| event.contains("WARN:cx.propagate cancelled")),
            "expected cx.propagate WARN event, got {events:?}"
        );
    }
}