1use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use web_time::{Duration, Instant};
43
44#[cfg(feature = "tracing")]
46use crate::logging::{info_span, warn};
47#[cfg(not(feature = "tracing"))]
48use crate::{info_span, warn};
49
/// Process-wide source of context identifiers; the first id handed out is 1.
static NEXT_CX_ID: AtomicU64 = AtomicU64::new(1);

/// Hands out the next context identifier.
///
/// Each call returns the current counter value and bumps the counter by one.
/// `Relaxed` ordering is used: only the uniqueness of the returned value
/// matters, not its ordering relative to other memory operations.
fn next_cx_id() -> u64 {
    const STEP: u64 = 1;
    NEXT_CX_ID.fetch_add(STEP, Ordering::Relaxed)
}
57
/// Process-wide count of first-time cancellations, starting at zero.
static CX_CANCELLATIONS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Returns the current value of the process-wide cancellation counter.
///
/// The value is a `Relaxed` snapshot and may already be stale by the time the
/// caller inspects it if other threads are cancelling contexts concurrently.
#[must_use]
pub fn cx_cancellations_total() -> u64 {
    let counter = &CX_CANCELLATIONS_TOTAL;
    counter.load(Ordering::Relaxed)
}
68
/// Where a context reads the current time from.
#[derive(Debug, Clone)]
enum TimeSource {
    /// Read the real clock via `Instant::now()`.
    Real,
    /// Read a manually advanced [`LabClock`].
    Lab(LabClock),
}
82
/// Manually advanced clock.
///
/// The clock reports `epoch + offset`, where `epoch` is captured once at
/// construction and `offset` only changes through [`LabClock::advance`].
/// Clones share the same offset, so advancing any clone advances all of them.
#[derive(Debug, Clone)]
pub struct LabClock {
    /// Instant captured when the clock was created.
    epoch: Instant,
    /// Accumulated manual offset in microseconds, shared between clones.
    offset_us: Arc<AtomicU64>,
}

impl LabClock {
    /// Creates a clock whose current time is the instant of construction.
    #[must_use]
    pub fn new() -> Self {
        Self {
            epoch: Instant::now(),
            offset_us: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Moves the clock forward by `delta` (microsecond resolution).
    ///
    /// The accumulated offset saturates at `u64::MAX` microseconds instead of
    /// wrapping, so the clock can never jump backwards no matter how much
    /// time is added in total.
    pub fn advance(&self, delta: Duration) {
        let us = u64::try_from(delta.as_micros()).unwrap_or(u64::MAX);
        // A plain `fetch_add` would wrap on overflow and rewind the clock.
        // The closure always returns `Some`, so the update cannot fail and the
        // returned previous value is of no interest.
        let _ = self
            .offset_us
            .fetch_update(Ordering::Release, Ordering::Relaxed, |current| {
                Some(current.saturating_add(us))
            });
    }

    /// Returns the clock's current time: construction instant plus the
    /// accumulated offset.
    ///
    /// `Instant + Duration` may panic if the result is not representable on
    /// the platform; that is only reachable with extreme offsets.
    #[must_use]
    pub fn now(&self) -> Instant {
        let offset = Duration::from_micros(self.offset_us.load(Ordering::Acquire));
        self.epoch + offset
    }
}

impl Default for LabClock {
    /// Equivalent to [`LabClock::new`].
    fn default() -> Self {
        Self::new()
    }
}
121
/// State shared by every clone of a `Cx` and by its `CxController`.
#[derive(Debug)]
struct CxInner {
    /// Identifier obtained from `next_cx_id()` when the context is built.
    id: u64,
    /// Cancellation flag; set by `CxController::cancel`.
    cancelled: AtomicBool,
    /// Own time budget in microseconds, counted from `created_at`.
    /// `u64::MAX` is treated as "no own deadline".
    deadline_us: u64,
    /// Reading of `time_source` taken when the context was built.
    created_at: Instant,
    /// Clock this context reads the current time from.
    time_source: TimeSource,
    /// Parent context, if any; its cancellation and remaining time are
    /// consulted in addition to this context's own.
    parent: Option<Arc<CxInner>>,
}
135
136#[derive(Clone, Debug)]
143pub struct Cx {
144 inner: Arc<CxInner>,
145}
146
147impl Cx {
148 #[must_use]
152 pub fn background() -> (Self, CxController) {
153 Self::new_inner(u64::MAX, TimeSource::Real, None)
154 }
155
156 #[must_use]
158 pub fn with_deadline(deadline: Duration) -> (Self, CxController) {
159 let us = deadline.as_micros().min(u64::MAX as u128) as u64;
160 Self::new_inner(us, TimeSource::Real, None)
161 }
162
163 #[must_use]
165 pub fn lab(clock: &LabClock) -> (Self, CxController) {
166 Self::new_inner(u64::MAX, TimeSource::Lab(clock.clone()), None)
167 }
168
169 #[must_use]
171 pub fn lab_with_deadline(clock: &LabClock, deadline: Duration) -> (Self, CxController) {
172 let us = deadline.as_micros().min(u64::MAX as u128) as u64;
173 Self::new_inner(us, TimeSource::Lab(clock.clone()), None)
174 }
175
176 #[must_use]
181 pub fn child(&self, deadline: Duration) -> (Self, CxController) {
182 let us = deadline.as_micros().min(u64::MAX as u128) as u64;
183 let time_source = match &self.inner.time_source {
184 TimeSource::Real => TimeSource::Real,
185 TimeSource::Lab(c) => TimeSource::Lab(c.clone()),
186 };
187 Self::new_inner(us, time_source, Some(self.inner.clone()))
188 }
189
190 #[must_use]
192 pub fn child_inherit(&self) -> (Self, CxController) {
193 let time_source = match &self.inner.time_source {
194 TimeSource::Real => TimeSource::Real,
195 TimeSource::Lab(c) => TimeSource::Lab(c.clone()),
196 };
197 Self::new_inner(u64::MAX, time_source, Some(self.inner.clone()))
198 }
199
200 fn new_inner(
201 deadline_us: u64,
202 time_source: TimeSource,
203 parent: Option<Arc<CxInner>>,
204 ) -> (Self, CxController) {
205 let now = match &time_source {
206 TimeSource::Real => Instant::now(),
207 TimeSource::Lab(c) => c.now(),
208 };
209 let inner = Arc::new(CxInner {
210 id: next_cx_id(),
211 cancelled: AtomicBool::new(false),
212 deadline_us,
213 created_at: now,
214 time_source,
215 parent,
216 });
217 let cx = Self {
218 inner: inner.clone(),
219 };
220 let ctrl = CxController { inner };
221 (cx, ctrl)
222 }
223
224 #[inline]
228 #[must_use]
229 pub fn id(&self) -> u64 {
230 self.inner.id
231 }
232
233 #[inline]
235 #[must_use]
236 pub fn is_cancelled(&self) -> bool {
237 self.is_cancelled_inner(&self.inner)
238 }
239
240 fn is_cancelled_inner(&self, inner: &CxInner) -> bool {
241 if inner.cancelled.load(Ordering::Acquire) {
242 return true;
243 }
244 if let Some(ref parent) = inner.parent {
245 return self.is_cancelled_inner(parent);
246 }
247 false
248 }
249
250 #[inline]
252 #[must_use]
253 pub fn is_expired(&self) -> bool {
254 self.remaining().is_some_and(|d| d.is_zero())
255 }
256
257 #[inline]
259 #[must_use]
260 pub fn is_done(&self) -> bool {
261 self.is_cancelled() || self.is_expired()
262 }
263
264 #[must_use]
267 pub fn deadline(&self) -> Option<Duration> {
268 let own = self.inner.deadline_us;
269 let parent_remaining = self.parent_remaining_us();
270
271 let now = self.now();
272 let elapsed = now
273 .checked_duration_since(self.inner.created_at)
274 .unwrap_or(Duration::ZERO);
275 let elapsed_us = elapsed.as_micros().min(u64::MAX as u128) as u64;
276
277 let own_remaining = if own == u64::MAX {
279 u64::MAX
280 } else {
281 own.saturating_sub(elapsed_us)
282 };
283
284 let effective = own_remaining.min(parent_remaining);
285 if effective == u64::MAX {
286 None
287 } else {
288 Some(Duration::from_micros(effective))
289 }
290 }
291
292 #[must_use]
295 pub fn remaining(&self) -> Option<Duration> {
296 self.deadline()
297 }
298
299 #[must_use]
301 pub fn remaining_us(&self) -> Option<u64> {
302 self.remaining()
303 .map(|d| d.as_micros().min(u64::MAX as u128) as u64)
304 }
305
306 #[must_use]
308 pub fn now(&self) -> Instant {
309 match &self.inner.time_source {
310 TimeSource::Real => Instant::now(),
311 TimeSource::Lab(c) => c.now(),
312 }
313 }
314
315 #[inline]
317 #[must_use]
318 pub fn is_lab(&self) -> bool {
319 matches!(self.inner.time_source, TimeSource::Lab(_))
320 }
321
322 fn parent_remaining_us(&self) -> u64 {
323 match &self.inner.parent {
324 Some(parent) => {
325 let parent_cx = Cx {
326 inner: parent.clone(),
327 };
328 parent_cx.remaining_us().unwrap_or(u64::MAX)
329 }
330 None => u64::MAX,
331 }
332 }
333
334 pub fn sleep(&self, duration: Duration) -> bool {
341 let effective = match self.remaining() {
342 Some(rem) => duration.min(rem),
343 None => duration,
344 };
345 if effective.is_zero() || self.is_cancelled() {
346 return false;
347 }
348
349 let chunk = Duration::from_millis(10);
351 let mut remaining = effective;
352 while remaining > Duration::ZERO && !self.is_cancelled() {
353 let sleep_time = remaining.min(chunk);
354 std::thread::sleep(sleep_time);
355 remaining = remaining.saturating_sub(sleep_time);
356 }
357 !self.is_cancelled() && remaining.is_zero()
358 }
359}
360
361#[derive(Debug)]
369pub struct CxController {
370 inner: Arc<CxInner>,
371}
372
373impl CxController {
374 pub fn cancel(&self) {
378 let was_cancelled = self.inner.cancelled.swap(true, Ordering::Release);
379 if !was_cancelled {
380 CX_CANCELLATIONS_TOTAL.fetch_add(1, Ordering::Relaxed);
381 warn!(cx_id = self.inner.id, "cx cancelled");
382 }
383 }
384
385 #[inline]
387 #[must_use]
388 pub fn is_cancelled(&self) -> bool {
389 self.inner.cancelled.load(Ordering::Acquire)
390 }
391}
392
/// Reason a context can no longer be used.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CxError {
    /// The context (or one of its ancestors) was cancelled.
    Cancelled,
    /// The context's effective deadline has passed.
    DeadlineExceeded,
}

impl std::fmt::Display for CxError {
    /// Writes a short, lowercase, human-readable description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let text = match self {
            Self::Cancelled => "context cancelled",
            Self::DeadlineExceeded => "deadline exceeded",
        };
        f.write_str(text)
    }
}

impl std::error::Error for CxError {}
414
415impl Cx {
416 pub fn check(&self) -> Result<(), CxError> {
424 let cancelled = self.is_cancelled();
425 let deadline_remaining_us = self.remaining_us().unwrap_or(u64::MAX);
426 let propagate_span = info_span!(
427 "cx.propagate",
428 cx_id = self.id(),
429 deadline_remaining_us,
430 cx_cancelled = cancelled
431 );
432 let _propagate_span = propagate_span.enter();
433
434 if cancelled {
435 warn!(
436 cx_id = self.id(),
437 deadline_remaining_us, "cx.propagate cancelled"
438 );
439 return Err(CxError::Cancelled);
440 }
441 if deadline_remaining_us == 0 {
442 return Err(CxError::DeadlineExceeded);
443 }
444 Ok(())
445 }
446}
447
/// Unit tests for contexts, controllers, the lab clock and `Cx::check`.
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::mpsc;
    #[cfg(feature = "tracing-json")]
    use std::sync::{Arc, Mutex};

    #[cfg(feature = "tracing-json")]
    use tracing::Subscriber;
    #[cfg(feature = "tracing-json")]
    use tracing::field::{Field, Visit};
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::filter::LevelFilter;
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::layer::{Context, Layer};
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::prelude::*;
    #[cfg(feature = "tracing-json")]
    use tracing_subscriber::registry::LookupSpan;

    /// Minimal model mutated by the "update" phase helpers below.
    #[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
    struct TestModel {
        value: u32,
    }

    /// Outcome of `choose_render_strategy`.
    #[derive(Debug, Clone, Copy, PartialEq, Eq)]
    enum RenderStrategy {
        Full,
        Degraded,
    }

    /// Error for helpers that may be called without a context at all.
    #[derive(Debug, Clone, PartialEq, Eq)]
    enum MissingCxError {
        MissingContext,
        Cx(CxError),
    }

    /// Applies `delta` to the model only if the context is still live.
    fn update_with_cx(cx: &Cx, model: &mut TestModel, delta: u32) -> Result<(), CxError> {
        cx.check()?;
        model.value = model.value.saturating_add(delta);
        Ok(())
    }

    /// Like `update_with_cx`, but reports a dedicated error when no context
    /// is supplied.
    fn update_with_optional_cx(
        cx: Option<&Cx>,
        model: &mut TestModel,
        delta: u32,
    ) -> Result<(), MissingCxError> {
        let cx = cx.ok_or(MissingCxError::MissingContext)?;
        update_with_cx(cx, model, delta).map_err(MissingCxError::Cx)
    }

    /// Renders `source` into a scratch buffer, checking the context after
    /// every character, and commits to `sink` only if the whole render
    /// succeeded. `on_chunk` is invoked with the character index before each
    /// check.
    fn render_widget_with_cx_atomic(
        cx: &Cx,
        source: &str,
        sink: &mut String,
        mut on_chunk: impl FnMut(usize),
    ) -> Result<(), CxError> {
        let mut scratch = String::new();
        for (idx, ch) in source.chars().enumerate() {
            scratch.push(ch);
            on_chunk(idx);
            cx.check()?;
        }
        sink.push_str(&scratch);
        Ok(())
    }

    /// Sends `payload` only if the context is live; a closed channel is
    /// reported as `CxError::Cancelled`.
    fn subscription_send_with_cx(
        cx: &Cx,
        sender: &mpsc::Sender<u32>,
        payload: u32,
    ) -> Result<(), CxError> {
        cx.check()?;
        sender.send(payload).map_err(|_| CxError::Cancelled)
    }

    /// Picks the degraded strategy once the remaining time is at or below
    /// `downgrade_threshold`; contexts without a deadline always render fully.
    fn choose_render_strategy(cx: &Cx, downgrade_threshold: Duration) -> RenderStrategy {
        match cx.remaining() {
            Some(remaining) if remaining <= downgrade_threshold => RenderStrategy::Degraded,
            _ => RenderStrategy::Full,
        }
    }

    #[test]
    fn background_cx_is_not_cancelled() {
        let (cx, _ctrl) = Cx::background();
        assert!(!cx.is_cancelled());
        assert!(!cx.is_expired());
        assert!(!cx.is_done());
        assert!(cx.deadline().is_none());
    }

    #[test]
    fn cancel_propagates() {
        let (cx, ctrl) = Cx::background();
        assert!(!cx.is_cancelled());
        ctrl.cancel();
        assert!(cx.is_cancelled());
        assert!(cx.is_done());
    }

    #[test]
    fn clone_shares_cancellation() {
        let (cx, ctrl) = Cx::background();
        let cx2 = cx.clone();
        ctrl.cancel();
        assert!(cx.is_cancelled());
        assert!(cx2.is_cancelled());
    }

    #[test]
    fn deadline_reports_remaining() {
        let (cx, _ctrl) = Cx::with_deadline(Duration::from_secs(10));
        let rem = cx.remaining().expect("should have deadline");
        assert!(rem.as_secs() >= 9);
    }

    #[test]
    fn child_inherits_cancellation() {
        let (parent, parent_ctrl) = Cx::background();
        let (child, _child_ctrl) = parent.child(Duration::from_secs(60));
        assert!(!child.is_cancelled());
        parent_ctrl.cancel();
        assert!(child.is_cancelled());
    }

    #[test]
    fn child_has_tighter_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_secs(60));
        let (child, _) = parent.child(Duration::from_millis(100));
        let child_rem = child.remaining().expect("child has deadline");
        assert!(child_rem < Duration::from_secs(1));
    }

    #[test]
    fn child_respects_parent_tighter_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_millis(50));
        let (child, _) = parent.child(Duration::from_secs(60));
        let child_rem = child.remaining().expect("child has deadline via parent");
        assert!(child_rem < Duration::from_secs(1));
    }

    #[test]
    fn lab_clock_deterministic() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(100));

        let r1 = cx.remaining().expect("has deadline");
        assert!(r1 >= Duration::from_millis(90));

        clock.advance(Duration::from_millis(80));
        let r2 = cx.remaining().expect("has deadline");
        assert!(r2 <= Duration::from_millis(25));
        assert!(!cx.is_expired());

        clock.advance(Duration::from_millis(30));
        assert!(cx.is_expired());
        assert!(cx.is_done());
    }

    #[test]
    fn check_returns_ok_when_live() {
        let (cx, _ctrl) = Cx::background();
        assert!(cx.check().is_ok());
    }

    #[test]
    fn check_returns_cancelled() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        assert_eq!(cx.check(), Err(CxError::Cancelled));
    }

    #[test]
    fn check_returns_deadline_exceeded() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(10));
        clock.advance(Duration::from_millis(20));
        assert_eq!(cx.check(), Err(CxError::DeadlineExceeded));
    }

    #[test]
    fn cx_id_is_unique() {
        let (cx1, _) = Cx::background();
        let (cx2, _) = Cx::background();
        assert_ne!(cx1.id(), cx2.id());
    }

    #[test]
    fn cx_is_lab() {
        let clock = LabClock::new();
        let (cx_lab, _) = Cx::lab(&clock);
        let (cx_real, _) = Cx::background();
        assert!(cx_lab.is_lab());
        assert!(!cx_real.is_lab());
    }

    #[test]
    fn child_inherit_no_deadline() {
        let (parent, _) = Cx::background();
        let (child, _) = parent.child_inherit();
        assert!(child.deadline().is_none());
    }

    #[test]
    fn child_inherit_with_parent_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_secs(30));
        let (child, _) = parent.child_inherit();
        let rem = child.remaining().expect("inherits parent deadline");
        assert!(rem > Duration::from_secs(28));
    }

    #[test]
    fn cx_error_display() {
        assert_eq!(CxError::Cancelled.to_string(), "context cancelled");
        assert_eq!(CxError::DeadlineExceeded.to_string(), "deadline exceeded");
    }

    #[test]
    fn controller_is_cancelled_matches_cx() {
        let (cx, ctrl) = Cx::background();
        assert!(!ctrl.is_cancelled());
        ctrl.cancel();
        assert!(ctrl.is_cancelled());
        assert!(cx.is_cancelled());
    }

    #[test]
    fn double_cancel_is_idempotent() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        ctrl.cancel();
        assert!(cx.is_cancelled());
    }

    #[test]
    fn lab_clock_advance_accumulates() {
        let clock = LabClock::new();
        let t0 = clock.now();
        clock.advance(Duration::from_millis(100));
        clock.advance(Duration::from_millis(200));
        let elapsed = clock.now().duration_since(t0);
        assert!(elapsed >= Duration::from_millis(290));
        assert!(elapsed <= Duration::from_millis(310));
    }

    #[test]
    fn cancellation_counter_increments() {
        // The counter is process-global and other tests cancel contexts on
        // parallel test threads, so an exact delta can only be trusted on an
        // attempt that no other thread interleaved with. Retry until such an
        // attempt is observed. If a repeated `cancel()` were counted again,
        // no attempt could ever look clean and the test fails below.
        const MAX_ATTEMPTS: usize = 1_000;

        for _ in 0..MAX_ATTEMPTS {
            let before = cx_cancellations_total();
            let (_cx, ctrl) = Cx::background();

            ctrl.cancel();
            let after_first = cx_cancellations_total();
            // The counter only grows, so this holds even under interference.
            assert!(after_first > before);

            ctrl.cancel();
            let after_second = cx_cancellations_total();

            if after_first == before + 1 && after_second == after_first {
                return;
            }
        }

        panic!(
            "no attempt showed exactly one increment for a doubly cancelled context \
             in {MAX_ATTEMPTS} tries"
        );
    }

    #[test]
    fn sleep_respects_cancellation() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        let completed = cx.sleep(Duration::from_secs(10));
        assert!(!completed);
    }

    #[test]
    fn sleep_respects_lab_deadline() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(5));
        clock.advance(Duration::from_millis(10));
        let completed = cx.sleep(Duration::from_secs(10));
        assert!(!completed);
    }

    #[test]
    fn cx_propagates_across_update_render_and_subscription_phases() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(100));
        let mut model = TestModel::default();

        update_with_cx(&cx, &mut model, 7).expect("update should respect live cx");
        let mut rendered = String::new();
        render_widget_with_cx_atomic(&cx, "ok", &mut rendered, |_| {})
            .expect("render should respect live cx");

        let (tx, rx) = mpsc::channel();
        subscription_send_with_cx(&cx, &tx, model.value)
            .expect("subscription should respect live cx");

        assert_eq!(model.value, 7);
        assert_eq!(rendered, "ok");
        assert_eq!(rx.try_recv().expect("subscription payload"), 7);
    }

    #[test]
    fn cancellation_mid_render_aborts_without_partial_output() {
        let clock = LabClock::new();
        let (cx, ctrl) = Cx::lab(&clock);
        let mut sink = String::from("prefix:");

        let result = render_widget_with_cx_atomic(&cx, "render-me", &mut sink, |idx| {
            if idx == 2 {
                ctrl.cancel();
            }
        });

        assert_eq!(result, Err(CxError::Cancelled));
        assert_eq!(sink, "prefix:");
    }

    #[test]
    fn deadline_enforcement_triggers_strategy_downgrade() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(50));

        assert_eq!(
            choose_render_strategy(&cx, Duration::from_millis(10)),
            RenderStrategy::Full
        );

        clock.advance(Duration::from_millis(45));
        assert_eq!(
            choose_render_strategy(&cx, Duration::from_millis(10)),
            RenderStrategy::Degraded
        );
    }

    #[test]
    fn missing_cx_returns_clear_runtime_error() {
        let mut model = TestModel::default();
        let err = update_with_optional_cx(None, &mut model, 1).expect_err("missing cx");
        assert_eq!(err, MissingCxError::MissingContext);
        assert_eq!(model.value, 0, "state should remain unchanged without cx");
    }

    /// Subscriber layer that records span names and `LEVEL:message` strings
    /// for events so tests can assert on emitted tracing output.
    #[cfg(feature = "tracing-json")]
    #[derive(Default, Clone)]
    struct TraceCaptureLayer {
        spans: Arc<Mutex<Vec<String>>>,
        events: Arc<Mutex<Vec<String>>>,
    }

    /// Field visitor that keeps only an event's `message` field.
    #[cfg(feature = "tracing-json")]
    #[derive(Default)]
    struct EventMessageVisitor {
        message: Option<String>,
    }

    #[cfg(feature = "tracing-json")]
    impl Visit for EventMessageVisitor {
        fn record_str(&mut self, field: &Field, value: &str) {
            if field.name() == "message" {
                self.message = Some(value.to_string());
            }
        }

        fn record_debug(&mut self, field: &Field, value: &dyn std::fmt::Debug) {
            if field.name() == "message" {
                self.message = Some(format!("{value:?}"));
            }
        }
    }

    #[cfg(feature = "tracing-json")]
    impl<S> Layer<S> for TraceCaptureLayer
    where
        S: Subscriber + for<'lookup> LookupSpan<'lookup>,
    {
        /// Records the name of every newly created span.
        fn on_new_span(
            &self,
            attrs: &tracing::span::Attributes<'_>,
            _id: &tracing::span::Id,
            _ctx: Context<'_, S>,
        ) {
            self.spans
                .lock()
                .expect("span capture lock")
                .push(attrs.metadata().name().to_string());
        }

        /// Records every event as `LEVEL:message` (empty message if absent).
        fn on_event(&self, event: &tracing::Event<'_>, _ctx: Context<'_, S>) {
            let mut visitor = EventMessageVisitor::default();
            event.record(&mut visitor);
            let message = visitor.message.unwrap_or_default();
            self.events
                .lock()
                .expect("event capture lock")
                .push(format!("{}:{}", event.metadata().level(), message));
        }
    }

    #[cfg(feature = "tracing-json")]
    #[test]
    fn check_emits_cx_propagate_span_and_warns_on_cancellation() {
        let capture = TraceCaptureLayer::default();
        let subscriber =
            tracing_subscriber::registry().with(capture.clone().with_filter(LevelFilter::TRACE));
        let _guard = tracing::subscriber::set_default(subscriber);

        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        let result = cx.check();
        assert_eq!(result, Err(CxError::Cancelled));

        let spans = capture.spans.lock().expect("span capture lock");
        assert!(
            spans.iter().any(|name| name == "cx.propagate"),
            "expected cx.propagate span, got {spans:?}"
        );
        drop(spans);

        let events = capture.events.lock().expect("event capture lock");
        assert!(
            events
                .iter()
                .any(|event| event.contains("WARN:cx cancelled")),
            "expected cancellation WARN event, got {events:?}"
        );
        assert!(
            events
                .iter()
                .any(|event| event.contains("WARN:cx.propagate cancelled")),
            "expected cx.propagate WARN event, got {events:?}"
        );
    }
}