1use std::sync::Arc;
41use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
42use web_time::{Duration, Instant};
43
44#[cfg(feature = "tracing")]
46use crate::logging::warn;
47#[cfg(not(feature = "tracing"))]
48use crate::warn;
49
/// Process-wide source of context ids. It starts at 1, so the first id handed
/// out is 1.
static NEXT_CX_ID: AtomicU64 = AtomicU64::new(1);

/// Returns a fresh context id.
///
/// `fetch_add` yields the value stored *before* the increment, so consecutive
/// calls return consecutive integers. `Relaxed` is used because only the
/// atomicity of the increment is relied on here.
fn next_cx_id() -> u64 {
    const STEP: u64 = 1;
    NEXT_CX_ID.fetch_add(STEP, Ordering::Relaxed)
}
57
/// Process-wide tally of context cancellations. It is incremented by
/// `CxController::cancel` the first time a given controller is cancelled.
static CX_CANCELLATIONS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Returns the number of cancellations recorded so far in this process.
///
/// The value is read with `Relaxed` ordering and is a monotonically
/// increasing counter.
#[must_use]
pub fn cx_cancellations_total() -> u64 {
    let counter = &CX_CANCELLATIONS_TOTAL;
    counter.load(Ordering::Relaxed)
}
68
69#[derive(Debug, Clone)]
76enum TimeSource {
77 Real,
79 Lab(LabClock),
81}
82
/// A manually driven clock for deterministic tests.
///
/// The clock reports `epoch + offset`. `epoch` is captured once at
/// construction and `offset` changes only through [`LabClock::advance`].
/// Clones share the same offset (it lives behind an `Arc`), so advancing any
/// clone is observed by all of them.
#[derive(Debug, Clone)]
pub struct LabClock {
    /// Instant captured when the clock was created; never modified.
    epoch: Instant,
    /// Microseconds added on top of `epoch`, shared between clones.
    offset_us: Arc<AtomicU64>,
}

impl LabClock {
    /// Creates a clock whose current reading is the real "now" and whose
    /// offset is zero.
    #[must_use]
    pub fn new() -> Self {
        Self {
            epoch: Instant::now(),
            offset_us: Arc::new(AtomicU64::new(0)),
        }
    }

    /// Moves the clock forward by `delta`, at microsecond resolution.
    ///
    /// Both the conversion of `delta` and the accumulated offset saturate at
    /// `u64::MAX` microseconds, so the clock never moves backwards.
    pub fn advance(&self, delta: Duration) {
        // Clamp first, so the narrowing cast below cannot truncate.
        let step_us = delta.as_micros().min(u64::MAX as u128) as u64;
        // A plain `fetch_add` wraps on overflow, which would silently rewind
        // the clock after a very large advance. Saturate instead. The closure
        // always returns `Some`, so the update cannot fail.
        let _ = self
            .offset_us
            .fetch_update(Ordering::Release, Ordering::Relaxed, |current| {
                Some(current.saturating_add(step_us))
            });
    }

    /// Returns the clock's current reading: `epoch` plus the accumulated
    /// offset.
    #[must_use]
    pub fn now(&self) -> Instant {
        let offset_us = self.offset_us.load(Ordering::Acquire);
        self.epoch + Duration::from_micros(offset_us)
    }
}

impl Default for LabClock {
    /// Equivalent to [`LabClock::new`].
    fn default() -> Self {
        Self::new()
    }
}
121
122#[derive(Debug)]
125struct CxInner {
126 id: u64,
127 cancelled: AtomicBool,
128 deadline_us: u64,
130 created_at: Instant,
131 time_source: TimeSource,
132 parent: Option<Arc<CxInner>>,
134}
135
136#[derive(Clone, Debug)]
143pub struct Cx {
144 inner: Arc<CxInner>,
145}
146
147impl Cx {
148 #[must_use]
152 pub fn background() -> (Self, CxController) {
153 Self::new_inner(u64::MAX, TimeSource::Real, None)
154 }
155
156 #[must_use]
158 pub fn with_deadline(deadline: Duration) -> (Self, CxController) {
159 let us = deadline.as_micros().min(u64::MAX as u128) as u64;
160 Self::new_inner(us, TimeSource::Real, None)
161 }
162
163 #[must_use]
165 pub fn lab(clock: &LabClock) -> (Self, CxController) {
166 Self::new_inner(u64::MAX, TimeSource::Lab(clock.clone()), None)
167 }
168
169 #[must_use]
171 pub fn lab_with_deadline(clock: &LabClock, deadline: Duration) -> (Self, CxController) {
172 let us = deadline.as_micros().min(u64::MAX as u128) as u64;
173 Self::new_inner(us, TimeSource::Lab(clock.clone()), None)
174 }
175
176 #[must_use]
181 pub fn child(&self, deadline: Duration) -> (Self, CxController) {
182 let us = deadline.as_micros().min(u64::MAX as u128) as u64;
183 let time_source = match &self.inner.time_source {
184 TimeSource::Real => TimeSource::Real,
185 TimeSource::Lab(c) => TimeSource::Lab(c.clone()),
186 };
187 Self::new_inner(us, time_source, Some(self.inner.clone()))
188 }
189
190 #[must_use]
192 pub fn child_inherit(&self) -> (Self, CxController) {
193 let time_source = match &self.inner.time_source {
194 TimeSource::Real => TimeSource::Real,
195 TimeSource::Lab(c) => TimeSource::Lab(c.clone()),
196 };
197 Self::new_inner(u64::MAX, time_source, Some(self.inner.clone()))
198 }
199
200 fn new_inner(
201 deadline_us: u64,
202 time_source: TimeSource,
203 parent: Option<Arc<CxInner>>,
204 ) -> (Self, CxController) {
205 let now = match &time_source {
206 TimeSource::Real => Instant::now(),
207 TimeSource::Lab(c) => c.now(),
208 };
209 let inner = Arc::new(CxInner {
210 id: next_cx_id(),
211 cancelled: AtomicBool::new(false),
212 deadline_us,
213 created_at: now,
214 time_source,
215 parent,
216 });
217 let cx = Self {
218 inner: inner.clone(),
219 };
220 let ctrl = CxController { inner };
221 (cx, ctrl)
222 }
223
224 #[inline]
228 #[must_use]
229 pub fn id(&self) -> u64 {
230 self.inner.id
231 }
232
233 #[inline]
235 #[must_use]
236 pub fn is_cancelled(&self) -> bool {
237 self.is_cancelled_inner(&self.inner)
238 }
239
240 fn is_cancelled_inner(&self, inner: &CxInner) -> bool {
241 if inner.cancelled.load(Ordering::Acquire) {
242 return true;
243 }
244 if let Some(ref parent) = inner.parent {
245 return self.is_cancelled_inner(parent);
246 }
247 false
248 }
249
250 #[inline]
252 #[must_use]
253 pub fn is_expired(&self) -> bool {
254 self.remaining().is_some_and(|d| d.is_zero())
255 }
256
257 #[inline]
259 #[must_use]
260 pub fn is_done(&self) -> bool {
261 self.is_cancelled() || self.is_expired()
262 }
263
264 #[must_use]
267 pub fn deadline(&self) -> Option<Duration> {
268 let own = self.inner.deadline_us;
269 let parent_remaining = self.parent_remaining_us();
270
271 let now = self.now();
272 let elapsed = now
273 .checked_duration_since(self.inner.created_at)
274 .unwrap_or(Duration::ZERO);
275 let elapsed_us = elapsed.as_micros().min(u64::MAX as u128) as u64;
276
277 let own_remaining = if own == u64::MAX {
279 u64::MAX
280 } else {
281 own.saturating_sub(elapsed_us)
282 };
283
284 let effective = own_remaining.min(parent_remaining);
285 if effective == u64::MAX {
286 None
287 } else {
288 Some(Duration::from_micros(effective))
289 }
290 }
291
292 #[must_use]
295 pub fn remaining(&self) -> Option<Duration> {
296 self.deadline()
297 }
298
299 #[must_use]
301 pub fn remaining_us(&self) -> Option<u64> {
302 self.remaining()
303 .map(|d| d.as_micros().min(u64::MAX as u128) as u64)
304 }
305
306 #[must_use]
308 pub fn now(&self) -> Instant {
309 match &self.inner.time_source {
310 TimeSource::Real => Instant::now(),
311 TimeSource::Lab(c) => c.now(),
312 }
313 }
314
315 #[inline]
317 #[must_use]
318 pub fn is_lab(&self) -> bool {
319 matches!(self.inner.time_source, TimeSource::Lab(_))
320 }
321
322 fn parent_remaining_us(&self) -> u64 {
323 match &self.inner.parent {
324 Some(parent) => {
325 let parent_cx = Cx {
326 inner: parent.clone(),
327 };
328 parent_cx.remaining_us().unwrap_or(u64::MAX)
329 }
330 None => u64::MAX,
331 }
332 }
333
334 pub fn sleep(&self, duration: Duration) -> bool {
341 let effective = match self.remaining() {
342 Some(rem) => duration.min(rem),
343 None => duration,
344 };
345 if effective.is_zero() || self.is_cancelled() {
346 return false;
347 }
348
349 let chunk = Duration::from_millis(10);
351 let mut remaining = effective;
352 while remaining > Duration::ZERO && !self.is_cancelled() {
353 let sleep_time = remaining.min(chunk);
354 std::thread::sleep(sleep_time);
355 remaining = remaining.saturating_sub(sleep_time);
356 }
357 !self.is_cancelled() && remaining.is_zero()
358 }
359}
360
361#[derive(Debug)]
369pub struct CxController {
370 inner: Arc<CxInner>,
371}
372
373impl CxController {
374 pub fn cancel(&self) {
378 let was_cancelled = self.inner.cancelled.swap(true, Ordering::Release);
379 if !was_cancelled {
380 CX_CANCELLATIONS_TOTAL.fetch_add(1, Ordering::Relaxed);
381 warn!(cx_id = self.inner.id, "cx cancelled");
382 }
383 }
384
385 #[inline]
387 #[must_use]
388 pub fn is_cancelled(&self) -> bool {
389 self.inner.cancelled.load(Ordering::Acquire)
390 }
391}
392
/// Reason a context can no longer be used, as reported by `Cx::check`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum CxError {
    /// The context, or one of its ancestors, was cancelled.
    Cancelled,
    /// The effective deadline has passed.
    DeadlineExceeded,
}

impl std::fmt::Display for CxError {
    /// Writes a short, fixed, lowercase description of the error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let message = match self {
            Self::Cancelled => "context cancelled",
            Self::DeadlineExceeded => "deadline exceeded",
        };
        f.write_str(message)
    }
}

impl std::error::Error for CxError {}
414
415impl Cx {
416 pub fn check(&self) -> Result<(), CxError> {
424 if self.is_cancelled() {
425 return Err(CxError::Cancelled);
426 }
427 if self.is_expired() {
428 return Err(CxError::DeadlineExceeded);
429 }
430 Ok(())
431 }
432}
433
/// Unit tests for `Cx`, `CxController`, `LabClock`, and `CxError`.
#[cfg(test)]
mod tests {
    use super::*;

    /// A background context starts live and has no deadline.
    #[test]
    fn background_cx_is_not_cancelled() {
        let (cx, _ctrl) = Cx::background();
        assert!(!cx.is_cancelled());
        assert!(!cx.is_expired());
        assert!(!cx.is_done());
        assert!(cx.deadline().is_none());
    }

    /// Cancelling through the controller is visible on the context.
    #[test]
    fn cancel_propagates() {
        let (cx, ctrl) = Cx::background();
        assert!(!cx.is_cancelled());
        ctrl.cancel();
        assert!(cx.is_cancelled());
        assert!(cx.is_done());
    }

    /// Clones share the same cancellation flag.
    #[test]
    fn clone_shares_cancellation() {
        let (cx, ctrl) = Cx::background();
        let cx2 = cx.clone();
        ctrl.cancel();
        assert!(cx.is_cancelled());
        assert!(cx2.is_cancelled());
    }

    /// A fresh 10 s deadline reports at least 9 s remaining.
    #[test]
    fn deadline_reports_remaining() {
        let (cx, _ctrl) = Cx::with_deadline(Duration::from_secs(10));
        let rem = cx.remaining().expect("should have deadline");
        assert!(rem.as_secs() >= 9);
    }

    /// Cancelling a parent cancels its children.
    #[test]
    fn child_inherits_cancellation() {
        let (parent, parent_ctrl) = Cx::background();
        let (child, _child_ctrl) = parent.child(Duration::from_secs(60));
        assert!(!child.is_cancelled());
        parent_ctrl.cancel();
        assert!(child.is_cancelled());
    }

    /// A child's own deadline applies when it is tighter than the parent's.
    #[test]
    fn child_has_tighter_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_secs(60));
        let (child, _) = parent.child(Duration::from_millis(100));
        let child_rem = child.remaining().expect("child has deadline");
        assert!(child_rem < Duration::from_secs(1));
    }

    /// The parent's deadline applies when it is tighter than the child's.
    #[test]
    fn child_respects_parent_tighter_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_millis(50));
        let (child, _) = parent.child(Duration::from_secs(60));
        let child_rem = child.remaining().expect("child has deadline via parent");
        assert!(child_rem < Duration::from_secs(1));
    }

    /// With a lab clock, remaining time changes only when the clock advances.
    #[test]
    fn lab_clock_deterministic() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(100));

        let r1 = cx.remaining().expect("has deadline");
        assert!(r1 >= Duration::from_millis(90));

        clock.advance(Duration::from_millis(80));
        let r2 = cx.remaining().expect("has deadline");
        assert!(r2 <= Duration::from_millis(25));
        assert!(!cx.is_expired());

        clock.advance(Duration::from_millis(30));
        assert!(cx.is_expired());
        assert!(cx.is_done());
    }

    /// `check` succeeds on a live context.
    #[test]
    fn check_returns_ok_when_live() {
        let (cx, _ctrl) = Cx::background();
        assert!(cx.check().is_ok());
    }

    /// `check` reports cancellation.
    #[test]
    fn check_returns_cancelled() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        assert_eq!(cx.check(), Err(CxError::Cancelled));
    }

    /// `check` reports an expired deadline.
    #[test]
    fn check_returns_deadline_exceeded() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(10));
        clock.advance(Duration::from_millis(20));
        assert_eq!(cx.check(), Err(CxError::DeadlineExceeded));
    }

    /// Two contexts never share an id.
    #[test]
    fn cx_id_is_unique() {
        let (cx1, _) = Cx::background();
        let (cx2, _) = Cx::background();
        assert_ne!(cx1.id(), cx2.id());
    }

    /// `is_lab` distinguishes lab-clock contexts from real-clock ones.
    #[test]
    fn cx_is_lab() {
        let clock = LabClock::new();
        let (cx_lab, _) = Cx::lab(&clock);
        let (cx_real, _) = Cx::background();
        assert!(cx_lab.is_lab());
        assert!(!cx_real.is_lab());
    }

    /// `child_inherit` under a deadline-free parent has no deadline.
    #[test]
    fn child_inherit_no_deadline() {
        let (parent, _) = Cx::background();
        let (child, _) = parent.child_inherit();
        assert!(child.deadline().is_none());
    }

    /// `child_inherit` picks up the parent's deadline.
    #[test]
    fn child_inherit_with_parent_deadline() {
        let (parent, _) = Cx::with_deadline(Duration::from_secs(30));
        let (child, _) = parent.child_inherit();
        let rem = child.remaining().expect("inherits parent deadline");
        assert!(rem > Duration::from_secs(28));
    }

    /// `Display` output for both error variants.
    #[test]
    fn cx_error_display() {
        assert_eq!(CxError::Cancelled.to_string(), "context cancelled");
        assert_eq!(CxError::DeadlineExceeded.to_string(), "deadline exceeded");
    }

    /// Controller and context agree on the cancellation state.
    #[test]
    fn controller_is_cancelled_matches_cx() {
        let (cx, ctrl) = Cx::background();
        assert!(!ctrl.is_cancelled());
        ctrl.cancel();
        assert!(ctrl.is_cancelled());
        assert!(cx.is_cancelled());
    }

    /// Cancelling twice leaves the context cancelled.
    #[test]
    fn double_cancel_is_idempotent() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        ctrl.cancel();
        assert!(cx.is_cancelled());
    }

    /// Successive `advance` calls add up.
    #[test]
    fn lab_clock_advance_accumulates() {
        let clock = LabClock::new();
        let t0 = clock.now();
        clock.advance(Duration::from_millis(100));
        clock.advance(Duration::from_millis(200));
        let elapsed = clock.now().duration_since(t0);
        assert!(elapsed >= Duration::from_millis(290));
        assert!(elapsed <= Duration::from_millis(310));
    }

    /// The first cancel bumps the global counter; repeated cancels do not.
    #[test]
    fn cancellation_counter_increments() {
        let before = cx_cancellations_total();
        let (_cx, ctrl) = Cx::background();
        ctrl.cancel();
        // The counter only grows, so this holds even if other tests also
        // cancel contexts concurrently.
        assert!(cx_cancellations_total() > before);

        // The counter is process-global and other tests cancel contexts on
        // parallel threads, so a single before/after equality check is racy.
        // Retry instead: an idempotent `cancel` leaves the counter unchanged
        // in at least one window, while a non-idempotent one bumps it in
        // every window.
        let saw_quiet_window = (0..64).any(|_| {
            let snapshot = cx_cancellations_total();
            ctrl.cancel();
            cx_cancellations_total() == snapshot
        });
        assert!(
            saw_quiet_window,
            "repeated cancel() must not increment the cancellation counter"
        );
    }

    /// Sleeping on a cancelled context returns immediately with `false`.
    #[test]
    fn sleep_respects_cancellation() {
        let (cx, ctrl) = Cx::background();
        ctrl.cancel();
        let completed = cx.sleep(Duration::from_secs(10));
        assert!(!completed);
    }

    /// Sleeping on an expired lab context returns immediately with `false`.
    #[test]
    fn sleep_respects_lab_deadline() {
        let clock = LabClock::new();
        let (cx, _ctrl) = Cx::lab_with_deadline(&clock, Duration::from_millis(5));
        clock.advance(Duration::from_millis(10));
        let completed = cx.sleep(Duration::from_secs(10));
        assert!(!completed);
    }
}