1use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::thread;
12use std::time::{Duration, Instant};
13
14use crate::handler::CancellationContext;
15
/// Default upper bound on a single wait of the timeout watchdog between checks
/// of the operation-completed flag; overridable via
/// `TimeoutGuard::with_poll_interval`.
const DEFAULT_WATCHDOG_POLL_INTERVAL: Duration = Duration::from_micros(1_000);
18
/// Test-only count of watchdog threads whose closure is currently running.
/// Tests compare it against a baseline to prove every watchdog exits.
#[cfg(test)]
static ACTIVE_WATCHDOG_THREADS: std::sync::atomic::AtomicU64 =
    std::sync::atomic::AtomicU64::new(0);

/// Test-only marker held by a running watchdog; dropping it (normal return,
/// early return, or unwinding out of the closure) decrements
/// [`ACTIVE_WATCHDOG_THREADS`].
#[cfg(test)]
struct ActiveWatchdogThreadGuard;

#[cfg(test)]
impl Drop for ActiveWatchdogThreadGuard {
    fn drop(&mut self) {
        let _previous_count = ACTIVE_WATCHDOG_THREADS.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Test-only snapshot of how many watchdog threads are currently active.
#[cfg(test)]
fn active_watchdog_threads() -> u64 {
    ACTIVE_WATCHDOG_THREADS.load(Ordering::SeqCst)
}
36
37#[derive(Debug)]
38struct WatchdogLifecycle {
39 operation_completed: Arc<AtomicBool>,
40 cancellation_requested: Arc<AtomicBool>,
41 handle: Option<thread::JoinHandle<()>>,
42 watchdog_spawn_failed: bool,
44}
45
46#[derive(Debug, Clone, Copy, PartialEq, Eq)]
47struct WatchdogOutcome {
48 cancellation_requested: bool,
49 watchdog_joined: bool,
50 watchdog_spawn_failed: bool,
51}
52
53impl WatchdogLifecycle {
54 fn new(
55 timeout_secs: Option<u64>,
56 cancellation_context: &CancellationContext,
57 poll_interval: Duration,
58 ) -> Self {
59 let operation_completed = Arc::new(AtomicBool::new(false));
60 let cancellation_requested = Arc::new(AtomicBool::new(false));
61 let mut watchdog_spawn_failed = false;
62
63 let handle = timeout_secs.and_then(|secs| {
64 let completion = Arc::clone(&operation_completed);
65 let requested = Arc::clone(&cancellation_requested);
66 let context = cancellation_context.clone();
67 let deadline = Duration::from_secs(secs);
68
69 match thread::Builder::new().name("actionqueue-timeout-watchdog".to_string()).spawn(
70 move || {
71 #[cfg(test)]
72 ACTIVE_WATCHDOG_THREADS.fetch_add(1, Ordering::SeqCst);
73 #[cfg(test)]
74 let _active_watchdog_guard = ActiveWatchdogThreadGuard;
75
76 let started_at = Instant::now();
77 loop {
78 if completion.load(Ordering::SeqCst) {
79 return;
80 }
81
82 let elapsed = started_at.elapsed();
83 if elapsed >= deadline {
84 break;
85 }
86
87 let remaining = deadline.saturating_sub(elapsed);
88 thread::sleep(remaining.min(poll_interval));
89 }
90
91 if !completion.load(Ordering::SeqCst) {
92 context.cancel();
93 requested.store(true, Ordering::SeqCst);
94 }
95 },
96 ) {
97 Ok(handle) => Some(handle),
98 Err(_) => {
99 watchdog_spawn_failed = true;
100 None
101 }
102 }
103 });
104
105 Self { operation_completed, cancellation_requested, handle, watchdog_spawn_failed }
106 }
107
108 fn mark_operation_completed(&self) {
109 self.operation_completed.store(true, Ordering::SeqCst);
110 }
111
112 fn join_handle(&mut self) {
113 if let Some(handle) = self.handle.take() {
114 if let Err(panic_payload) = handle.join() {
115 if std::thread::panicking() {
116 std::process::abort();
117 }
118 std::panic::resume_unwind(panic_payload);
119 }
120 }
121 }
122
123 fn finish(mut self) -> WatchdogOutcome {
124 let watchdog_started = self.handle.is_some();
125 let spawn_failed = self.watchdog_spawn_failed;
126 self.mark_operation_completed();
127 self.join_handle();
128
129 WatchdogOutcome {
130 cancellation_requested: self.cancellation_requested.load(Ordering::SeqCst),
131 watchdog_joined: watchdog_started,
132 watchdog_spawn_failed: spawn_failed,
133 }
134 }
135}
136
137impl Drop for WatchdogLifecycle {
138 fn drop(&mut self) {
139 self.mark_operation_completed();
140 self.join_handle();
141 }
142}
143
/// Machine-readable reason attached to each [`TimeoutClassification`].
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TimeoutReasonCode {
    /// The measured duration was longer than the configured timeout.
    DeadlineExceeded,
    /// No timeout was configured, so no limit was checked.
    NoTimeoutConfigured,
    /// A timeout was configured and the measured duration stayed within it.
    WithinLimit,
}

/// Details of an operation that ran past its configured timeout.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct TimeoutFailure {
    /// The configured limit, in whole seconds.
    pub timeout_secs: u64,
    /// How long the operation actually took.
    pub elapsed: Duration,
    /// Why this is a failure; `DeadlineExceeded` when built by `classify_timeout`.
    pub reason_code: TimeoutReasonCode,
}

/// Verdict on how an operation's duration relates to its optional timeout.
#[must_use]
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum TimeoutClassification {
    /// No timeout applied; carries the measured duration only.
    NotConfigured {
        /// How long the operation took.
        elapsed: Duration,
        /// Reason code for this outcome.
        reason_code: TimeoutReasonCode,
    },
    /// A timeout applied and the operation finished within it.
    CompletedInTime {
        /// The configured limit, in whole seconds.
        timeout_secs: u64,
        /// How long the operation took.
        elapsed: Duration,
        /// Reason code for this outcome.
        reason_code: TimeoutReasonCode,
    },
    /// A timeout applied and the operation exceeded it.
    TimedOut(TimeoutFailure),
}

impl TimeoutClassification {
    /// Returns `true` only for the [`TimeoutClassification::TimedOut`] variant.
    pub fn is_timed_out(&self) -> bool {
        match self {
            Self::TimedOut(_) => true,
            Self::NotConfigured { .. } | Self::CompletedInTime { .. } => false,
        }
    }
}

/// Value produced by a guarded operation, together with its measured duration
/// and timeout verdict.
#[must_use = "execution result should be inspected for timeout classification"]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GuardedExecution<T> {
    /// What the operation returned.
    pub value: T,
    /// How long the operation took.
    pub elapsed: Duration,
    /// Timeout verdict for `elapsed`.
    pub timeout: TimeoutClassification,
}

/// Like [`GuardedExecution`], plus diagnostics about watchdog-driven
/// cooperative cancellation.
#[must_use = "execution result should be inspected for timeout classification"]
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct CancellableExecution<T> {
    /// What the operation returned.
    pub value: T,
    /// How long the operation took.
    pub elapsed: Duration,
    /// Timeout verdict for `elapsed`.
    pub timeout: TimeoutClassification,
    /// Whether the watchdog requested cancellation because the deadline passed.
    pub cancel_requested: bool,
    /// Whether the cancellation context reports that the operation observed a
    /// cancellation request.
    pub cancellation_observed: bool,
    /// Observation latency as reported by the cancellation context
    /// (NOTE(review): exact start/end points are defined in `handler`).
    pub cancellation_observation_latency: Option<Duration>,
    /// Whether a watchdog thread was started and joined.
    pub watchdog_joined: bool,
    /// Whether a timeout was configured but the watchdog thread failed to spawn.
    pub watchdog_spawn_failed: bool,
}
230
/// Source of elapsed-time measurements for guarded operations.
///
/// Kept abstract so a deterministic clock can be substituted (the tests in
/// this module use a fixed-duration clock).
pub trait TimeoutClock {
    /// Opaque point-in-time token returned by [`TimeoutClock::mark_now`].
    type Mark: Copy;

    /// Captures the current point in time.
    fn mark_now(&self) -> Self::Mark;

    /// Returns how much time has passed since `mark` was captured.
    fn elapsed_since(&self, mark: Self::Mark) -> Duration;
}

/// [`TimeoutClock`] backed by the monotonic [`std::time::Instant`].
#[derive(Clone, Copy, Debug, Default)]
pub struct SystemTimeoutClock;

impl TimeoutClock for SystemTimeoutClock {
    type Mark = Instant;

    fn mark_now(&self) -> Instant {
        Instant::now()
    }

    fn elapsed_since(&self, mark: Instant) -> Duration {
        Instant::now().duration_since(mark)
    }
}
258
259#[derive(Debug, Clone)]
270pub struct TimeoutClassifier<C = SystemTimeoutClock> {
271 clock: C,
272}
273
274impl TimeoutClassifier<SystemTimeoutClock> {
275 pub fn new() -> Self {
277 Self { clock: SystemTimeoutClock }
278 }
279}
280
281impl Default for TimeoutClassifier<SystemTimeoutClock> {
282 fn default() -> Self {
283 Self::new()
284 }
285}
286
287impl<C> TimeoutClassifier<C>
288where
289 C: TimeoutClock,
290{
291 pub fn with_clock(clock: C) -> Self {
293 Self { clock }
294 }
295
296 pub fn classify(&self, timeout_secs: Option<u64>, elapsed: Duration) -> TimeoutClassification {
301 classify_timeout(timeout_secs, elapsed)
302 }
303
304 pub fn execute_and_classify<T, F>(
309 &self,
310 timeout_secs: Option<u64>,
311 operation: F,
312 ) -> GuardedExecution<T>
313 where
314 F: FnOnce() -> T,
315 {
316 let mark = self.clock.mark_now();
317 let value = operation();
318 let elapsed = self.clock.elapsed_since(mark);
319 let timeout = classify_timeout(timeout_secs, elapsed);
320
321 GuardedExecution { value, elapsed, timeout }
322 }
323}
324
325#[derive(Debug, Clone)]
330pub struct TimeoutGuard<C = SystemTimeoutClock> {
331 classifier: TimeoutClassifier<C>,
332 poll_interval: Duration,
333}
334
335impl TimeoutGuard<SystemTimeoutClock> {
336 pub fn new() -> Self {
338 Self { classifier: TimeoutClassifier::new(), poll_interval: DEFAULT_WATCHDOG_POLL_INTERVAL }
339 }
340}
341
342impl Default for TimeoutGuard<SystemTimeoutClock> {
343 fn default() -> Self {
344 Self::new()
345 }
346}
347
348impl<C> TimeoutGuard<C>
349where
350 C: TimeoutClock,
351{
352 pub fn with_clock(clock: C) -> Self {
354 Self {
355 classifier: TimeoutClassifier::with_clock(clock),
356 poll_interval: DEFAULT_WATCHDOG_POLL_INTERVAL,
357 }
358 }
359
360 pub fn with_poll_interval(mut self, interval: Duration) -> Self {
362 self.poll_interval = interval;
363 self
364 }
365
366 pub fn execute<T, F>(&self, timeout_secs: Option<u64>, operation: F) -> GuardedExecution<T>
372 where
373 F: FnOnce() -> T,
374 {
375 self.classifier.execute_and_classify(timeout_secs, operation)
376 }
377
378 pub fn execute_with_cancellation<T, F>(
392 &self,
393 timeout_secs: Option<u64>,
394 operation: F,
395 ) -> CancellableExecution<T>
396 where
397 F: FnOnce(&CancellationContext) -> T,
398 {
399 let context = CancellationContext::new();
400 self.execute_with_external_cancellation(timeout_secs, context, operation)
401 }
402
403 pub fn execute_with_external_cancellation<T, F>(
410 &self,
411 timeout_secs: Option<u64>,
412 cancellation_context: CancellationContext,
413 operation: F,
414 ) -> CancellableExecution<T>
415 where
416 F: FnOnce(&CancellationContext) -> T,
417 {
418 let mark = self.classifier.clock.mark_now();
419 let watchdog =
420 WatchdogLifecycle::new(timeout_secs, &cancellation_context, self.poll_interval);
421
422 let value = operation(&cancellation_context);
423 let watchdog_outcome = watchdog.finish();
424
425 let elapsed = self.classifier.clock.elapsed_since(mark);
426 let timeout = self.classifier.classify(timeout_secs, elapsed);
427 let cancel_requested = watchdog_outcome.cancellation_requested;
428 let cancellation_observed = cancellation_context.was_cancellation_observed();
429 let cancellation_observation_latency =
430 cancellation_context.cancellation_observation_latency();
431
432 CancellableExecution {
433 value,
434 elapsed,
435 timeout,
436 cancel_requested,
437 cancellation_observed,
438 cancellation_observation_latency,
439 watchdog_joined: watchdog_outcome.watchdog_joined,
440 watchdog_spawn_failed: watchdog_outcome.watchdog_spawn_failed,
441 }
442 }
443}
444
445pub fn classify_timeout(timeout_secs: Option<u64>, elapsed: Duration) -> TimeoutClassification {
459 match timeout_secs {
460 None => TimeoutClassification::NotConfigured {
461 elapsed,
462 reason_code: TimeoutReasonCode::NoTimeoutConfigured,
463 },
464 Some(secs) => {
465 let timeout = Duration::from_secs(secs);
466 if elapsed > timeout {
467 TimeoutClassification::TimedOut(TimeoutFailure {
468 timeout_secs: secs,
469 elapsed,
470 reason_code: TimeoutReasonCode::DeadlineExceeded,
471 })
472 } else {
473 TimeoutClassification::CompletedInTime {
474 timeout_secs: secs,
475 elapsed,
476 reason_code: TimeoutReasonCode::WithinLimit,
477 }
478 }
479 }
480 }
481}
482
#[cfg(test)]
mod tests {
    use std::panic::{catch_unwind, AssertUnwindSafe};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    use super::{
        classify_timeout, TimeoutClassification, TimeoutClassifier, TimeoutClock, TimeoutFailure,
        TimeoutGuard, TimeoutReasonCode,
    };

    /// Upper bound on how long a test operation waits for the watchdog to
    /// request cancellation. It is generous so slow or loaded CI hosts do not
    /// fail spuriously; a passing run returns as soon as cancellation is seen.
    const CANCELLATION_WAIT_LIMIT: Duration = Duration::from_secs(5);

    /// `true` while a watchdog-spawning test in this module holds the lock.
    static WATCHDOG_TEST_LOCK: AtomicBool = AtomicBool::new(false);

    /// Serializes the tests in this module that spawn watchdog threads.
    ///
    /// The panic-cleanup test compares the process-global watchdog counter
    /// with a baseline, so concurrently running watchdog tests would make it
    /// flaky. Release happens in `Drop`, so a failing test never leaves the
    /// lock held. NOTE(review): tests in other modules that spawn watchdogs
    /// are not covered by this lock.
    struct WatchdogTestLock;

    impl WatchdogTestLock {
        fn acquire() -> Self {
            while WATCHDOG_TEST_LOCK
                .compare_exchange(false, true, Ordering::SeqCst, Ordering::SeqCst)
                .is_err()
            {
                thread::sleep(Duration::from_millis(1));
            }
            WatchdogTestLock
        }
    }

    impl Drop for WatchdogTestLock {
        fn drop(&mut self) {
            WATCHDOG_TEST_LOCK.store(false, Ordering::SeqCst);
        }
    }

    /// Spins until `condition` holds or `limit` elapses; returns whether it held.
    ///
    /// Bounded by wall-clock time rather than an iteration count, because the
    /// watchdog thread must be spawned and scheduled first, which can take
    /// longer than a few thousand spin iterations.
    fn spin_until(limit: Duration, mut condition: impl FnMut() -> bool) -> bool {
        let give_up_at = Instant::now() + limit;
        while Instant::now() < give_up_at {
            if condition() {
                return true;
            }
            std::hint::spin_loop();
        }
        condition()
    }

    #[derive(Debug, Clone, Copy)]
    struct FixedClock {
        elapsed: Duration,
    }

    impl TimeoutClock for FixedClock {
        type Mark = ();

        fn mark_now(&self) -> Self::Mark {}

        fn elapsed_since(&self, _mark: Self::Mark) -> Duration {
            self.elapsed
        }
    }

    #[test]
    fn classify_timeout_marks_not_configured_when_absent() {
        let elapsed = Duration::from_secs(3);
        let outcome = classify_timeout(None, elapsed);

        assert_eq!(
            outcome,
            TimeoutClassification::NotConfigured {
                elapsed,
                reason_code: TimeoutReasonCode::NoTimeoutConfigured,
            }
        );
        assert!(!outcome.is_timed_out());
    }

    #[test]
    fn classify_timeout_marks_in_time_when_elapsed_meets_limit() {
        let elapsed = Duration::from_secs(5);
        let outcome = classify_timeout(Some(5), elapsed);

        assert_eq!(
            outcome,
            TimeoutClassification::CompletedInTime {
                timeout_secs: 5,
                elapsed,
                reason_code: TimeoutReasonCode::WithinLimit,
            }
        );
        assert!(!outcome.is_timed_out());
    }

    #[test]
    fn classify_timeout_marks_timeout_when_elapsed_exceeds_limit() {
        let elapsed = Duration::from_secs(6);
        let outcome = classify_timeout(Some(5), elapsed);

        assert_eq!(
            outcome,
            TimeoutClassification::TimedOut(TimeoutFailure {
                timeout_secs: 5,
                elapsed,
                reason_code: TimeoutReasonCode::DeadlineExceeded,
            })
        );
        assert!(outcome.is_timed_out());
    }

    #[test]
    fn guard_wraps_operation_and_reports_timeout_data() {
        let guard = TimeoutGuard::with_clock(FixedClock { elapsed: Duration::from_secs(9) });

        let guarded = guard.execute(Some(3), || 42u32);

        assert_eq!(guarded.value, 42);
        assert_eq!(guarded.elapsed, Duration::from_secs(9));
        assert_eq!(
            guarded.timeout,
            TimeoutClassification::TimedOut(TimeoutFailure {
                timeout_secs: 3,
                elapsed: Duration::from_secs(9),
                reason_code: TimeoutReasonCode::DeadlineExceeded,
            })
        );
    }

    #[test]
    fn classifier_provides_side_effect_free_classification() {
        let classifier =
            TimeoutClassifier::with_clock(FixedClock { elapsed: Duration::from_secs(7) });

        let classification = classifier.classify(Some(5), Duration::from_secs(7));

        assert!(classification.is_timed_out());
        assert_eq!(
            classification,
            TimeoutClassification::TimedOut(TimeoutFailure {
                timeout_secs: 5,
                elapsed: Duration::from_secs(7),
                reason_code: TimeoutReasonCode::DeadlineExceeded,
            })
        );
    }

    #[test]
    fn classifier_execute_and_classify_wraps_operation() {
        let classifier =
            TimeoutClassifier::with_clock(FixedClock { elapsed: Duration::from_secs(4) });

        let result = classifier.execute_and_classify(Some(5), || 42u32);

        assert_eq!(result.value, 42);
        assert_eq!(result.elapsed, Duration::from_secs(4));
        assert_eq!(
            result.timeout,
            TimeoutClassification::CompletedInTime {
                timeout_secs: 5,
                elapsed: Duration::from_secs(4),
                reason_code: TimeoutReasonCode::WithinLimit,
            }
        );
    }

    #[test]
    fn execute_with_cancellation_requests_deadline_signal_during_execution() {
        let _serialized = WatchdogTestLock::acquire();
        let guard = TimeoutGuard::new();
        let saw_cancel = Arc::new(AtomicBool::new(false));
        let saw_cancel_clone = Arc::clone(&saw_cancel);

        let result = guard.execute_with_cancellation(Some(0), move |context| {
            if spin_until(CANCELLATION_WAIT_LIMIT, || context.token().is_cancelled()) {
                saw_cancel_clone.store(true, Ordering::SeqCst);
            }
            9u8
        });

        assert_eq!(result.value, 9);
        assert!(result.cancel_requested);
        assert!(result.cancellation_observed);
        assert!(result.cancellation_observation_latency.is_some());
        assert!(result.watchdog_joined);
        assert!(saw_cancel.load(Ordering::SeqCst));
        assert!(result.timeout.is_timed_out());
    }

    #[test]
    fn execute_with_cancellation_does_not_start_watchdog_when_timeout_is_disabled() {
        let guard = TimeoutGuard::new();
        let result = guard.execute_with_cancellation(None, |context| {
            assert!(!context.token().is_cancelled());
            11u8
        });

        assert_eq!(result.value, 11);
        assert!(!result.cancel_requested);
        assert!(!result.cancellation_observed);
        assert_eq!(result.cancellation_observation_latency, None);
        assert!(!result.watchdog_joined);
        assert!(matches!(
            result.timeout,
            TimeoutClassification::NotConfigured {
                reason_code: TimeoutReasonCode::NoTimeoutConfigured,
                ..
            }
        ));
    }

    #[test]
    fn d01_t_n4a_operation_panic_still_joins_watchdog_cleanup() {
        // Serialized so no sibling watchdog test perturbs the global counter
        // between taking the baseline and the final comparison.
        let _serialized = WatchdogTestLock::acquire();
        let baseline = super::active_watchdog_threads();
        let guard = TimeoutGuard::new();

        let panic_result = catch_unwind(AssertUnwindSafe(|| {
            let _ = guard.execute_with_cancellation(Some(30), |_context| -> u8 {
                panic!("operation panic for cleanup-path verification");
            });
        }));

        assert!(panic_result.is_err());

        for _ in 0..100 {
            if super::active_watchdog_threads() == baseline {
                break;
            }
            thread::sleep(Duration::from_millis(2));
        }

        assert_eq!(super::active_watchdog_threads(), baseline);
    }

    #[test]
    fn configurable_poll_interval_is_used() {
        let _serialized = WatchdogTestLock::acquire();
        let interval = Duration::from_millis(10);
        let guard = TimeoutGuard::new().with_poll_interval(interval);
        assert_eq!(guard.poll_interval, interval);

        let result = guard.execute_with_cancellation(Some(0), |context| {
            spin_until(CANCELLATION_WAIT_LIMIT, || context.token().is_cancelled())
        });

        assert!(result.value);
        assert!(result.cancel_requested);
        assert!(result.watchdog_joined);
        assert!(!result.watchdog_spawn_failed);
    }
}