Skip to main content

actionqueue_executor_local/
timeout.rs

1//! Timeout classification and enforcement APIs.
2//!
3//! This module provides two distinct concerns for timeout handling:
4//!
5//! - **Classification**: Pure measurement of elapsed time and timeout status
6//!   without side effects. See [`TimeoutClassifier`].
7//! - **Enforcement**: Active timeout control via cancellation. See [`TimeoutGuard`].
8
9use std::sync::atomic::{AtomicBool, Ordering};
10use std::sync::Arc;
11use std::thread;
12use std::time::{Duration, Instant};
13
14use crate::handler::CancellationContext;
15
/// Default watchdog poll interval.
///
/// [`TimeoutGuard::new`] and [`TimeoutGuard::with_clock`] start from this value;
/// [`TimeoutGuard::with_poll_interval`] overrides it. With this default the
/// watchdog waits at most one millisecond between checks of the completion flag.
const DEFAULT_WATCHDOG_POLL_INTERVAL: Duration = Duration::from_millis(1);
18
/// Test-only count of watchdog threads that are currently inside their closure.
///
/// The watchdog closure increments it on entry; [`ActiveWatchdogThreadGuard`]
/// decrements it when the closure's locals are dropped.
#[cfg(test)]
static ACTIVE_WATCHDOG_THREADS: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

/// Test-only guard whose `Drop` decrements [`ACTIVE_WATCHDOG_THREADS`].
///
/// Constructing the guard does not touch the counter; the matching increment is
/// a separate `fetch_add` performed by the watchdog closure just before the
/// guard is created.
#[cfg(test)]
struct ActiveWatchdogThreadGuard;

#[cfg(test)]
impl Drop for ActiveWatchdogThreadGuard {
    fn drop(&mut self) {
        ACTIVE_WATCHDOG_THREADS.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Returns the current value of the test-only watchdog thread counter.
#[cfg(test)]
fn active_watchdog_threads() -> u64 {
    ACTIVE_WATCHDOG_THREADS.load(Ordering::SeqCst)
}
36
/// Owns the watchdog thread for a single guarded attempt.
///
/// Both flags are shared with the watchdog thread through `Arc` clones.
/// Dropping the value sets the completion flag and joins the thread.
#[derive(Debug)]
struct WatchdogLifecycle {
    /// Set once the guarded operation has finished; the watchdog exits when it sees this.
    operation_completed: Arc<AtomicBool>,
    /// Set by the watchdog right after it has called `cancel()` on the context.
    cancellation_requested: Arc<AtomicBool>,
    /// Join handle of the watchdog thread. `None` when no timeout was configured,
    /// when the spawn failed, or once the thread has been joined.
    handle: Option<thread::JoinHandle<()>>,
    /// True if thread::spawn failed (graceful degradation — no watchdog active).
    watchdog_spawn_failed: bool,
}
45
/// Snapshot returned by `WatchdogLifecycle::finish` after the watchdog has been joined.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
struct WatchdogOutcome {
    /// Whether the watchdog fired and requested cancellation.
    cancellation_requested: bool,
    /// Whether a watchdog thread existed for this attempt (and was therefore joined).
    watchdog_joined: bool,
    /// Whether spawning the watchdog thread failed.
    watchdog_spawn_failed: bool,
}
52
53impl WatchdogLifecycle {
54    fn new(
55        timeout_secs: Option<u64>,
56        cancellation_context: &CancellationContext,
57        poll_interval: Duration,
58    ) -> Self {
59        let operation_completed = Arc::new(AtomicBool::new(false));
60        let cancellation_requested = Arc::new(AtomicBool::new(false));
61        let mut watchdog_spawn_failed = false;
62
63        let handle = timeout_secs.and_then(|secs| {
64            let completion = Arc::clone(&operation_completed);
65            let requested = Arc::clone(&cancellation_requested);
66            let context = cancellation_context.clone();
67            let deadline = Duration::from_secs(secs);
68
69            match thread::Builder::new().name("actionqueue-timeout-watchdog".to_string()).spawn(
70                move || {
71                    #[cfg(test)]
72                    ACTIVE_WATCHDOG_THREADS.fetch_add(1, Ordering::SeqCst);
73                    #[cfg(test)]
74                    let _active_watchdog_guard = ActiveWatchdogThreadGuard;
75
76                    let started_at = Instant::now();
77                    loop {
78                        if completion.load(Ordering::SeqCst) {
79                            return;
80                        }
81
82                        let elapsed = started_at.elapsed();
83                        if elapsed >= deadline {
84                            break;
85                        }
86
87                        let remaining = deadline.saturating_sub(elapsed);
88                        thread::sleep(remaining.min(poll_interval));
89                    }
90
91                    if !completion.load(Ordering::SeqCst) {
92                        context.cancel();
93                        requested.store(true, Ordering::SeqCst);
94                    }
95                },
96            ) {
97                Ok(handle) => Some(handle),
98                Err(_) => {
99                    watchdog_spawn_failed = true;
100                    None
101                }
102            }
103        });
104
105        Self { operation_completed, cancellation_requested, handle, watchdog_spawn_failed }
106    }
107
108    fn mark_operation_completed(&self) {
109        self.operation_completed.store(true, Ordering::SeqCst);
110    }
111
112    fn join_handle(&mut self) {
113        if let Some(handle) = self.handle.take() {
114            if let Err(panic_payload) = handle.join() {
115                if std::thread::panicking() {
116                    std::process::abort();
117                }
118                std::panic::resume_unwind(panic_payload);
119            }
120        }
121    }
122
123    fn finish(mut self) -> WatchdogOutcome {
124        let watchdog_started = self.handle.is_some();
125        let spawn_failed = self.watchdog_spawn_failed;
126        self.mark_operation_completed();
127        self.join_handle();
128
129        WatchdogOutcome {
130            cancellation_requested: self.cancellation_requested.load(Ordering::SeqCst),
131            watchdog_joined: watchdog_started,
132            watchdog_spawn_failed: spawn_failed,
133        }
134    }
135}
136
/// Teardown for paths that never reach `finish`, such as a panic in the guarded operation.
impl Drop for WatchdogLifecycle {
    fn drop(&mut self) {
        // Signal completion first so the watchdog leaves its loop, then wait for it.
        // After `finish` the handle has already been taken, so the join is a no-op.
        self.mark_operation_completed();
        self.join_handle();
    }
}
143
/// Inspectable reason codes for timeout classification outcomes.
///
/// [`classify_timeout`] pairs each code with exactly one
/// [`TimeoutClassification`] variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum TimeoutReasonCode {
    /// The execution exceeded the configured timeout deadline.
    ///
    /// Paired with [`TimeoutClassification::TimedOut`].
    DeadlineExceeded,
    /// No timeout was configured for the execution.
    ///
    /// Paired with [`TimeoutClassification::NotConfigured`].
    NoTimeoutConfigured,
    /// A timeout was configured and the execution completed within the limit.
    ///
    /// Paired with [`TimeoutClassification::CompletedInTime`], which includes the
    /// boundary case where the elapsed time equals the timeout exactly.
    WithinLimit,
}
154
/// Explicit timeout failure data.
///
/// Payload of [`TimeoutClassification::TimedOut`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct TimeoutFailure {
    /// Configured timeout in seconds.
    pub timeout_secs: u64,
    /// Measured elapsed execution duration.
    pub elapsed: Duration,
    /// Stable reason code for timeout inspection.
    ///
    /// [`classify_timeout`] always sets this to [`TimeoutReasonCode::DeadlineExceeded`].
    pub reason_code: TimeoutReasonCode,
}
165
/// Classification of attempt execution relative to timeout constraints.
///
/// Produced by [`classify_timeout`]. Every variant carries the measured elapsed
/// duration and a [`TimeoutReasonCode`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use]
pub enum TimeoutClassification {
    /// No timeout was configured.
    NotConfigured {
        /// Measured elapsed execution duration.
        elapsed: Duration,
        /// Stable reason code for inspectability.
        reason_code: TimeoutReasonCode,
    },
    /// Timeout was configured and execution completed in time.
    ///
    /// Also used when the elapsed time equals the timeout exactly.
    CompletedInTime {
        /// Configured timeout in seconds.
        timeout_secs: u64,
        /// Measured elapsed execution duration.
        elapsed: Duration,
        /// Stable reason code for inspectability.
        reason_code: TimeoutReasonCode,
    },
    /// Timeout was configured and execution exceeded the deadline.
    TimedOut(TimeoutFailure),
}
189
190impl TimeoutClassification {
191    /// Returns true when classification indicates a timeout failure.
192    pub fn is_timed_out(&self) -> bool {
193        matches!(self, TimeoutClassification::TimedOut(_))
194    }
195}
196
/// Output of a guarded execution.
///
/// Returned by [`TimeoutClassifier::execute_and_classify`] and
/// [`TimeoutGuard::execute`], neither of which performs any enforcement.
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use = "execution result should be inspected for timeout classification"]
pub struct GuardedExecution<T> {
    /// Return value produced by the wrapped operation.
    pub value: T,
    /// Measured elapsed execution duration.
    pub elapsed: Duration,
    /// Explicit timeout classification for the execution.
    ///
    /// Derived from the same `elapsed` value stored in this struct.
    pub timeout: TimeoutClassification,
}
208
/// Output of a guarded execution with cancellation flag.
///
/// Returned by [`TimeoutGuard::execute_with_cancellation`] and
/// [`TimeoutGuard::execute_with_external_cancellation`].
#[derive(Debug, Clone, PartialEq, Eq)]
#[must_use = "execution result should be inspected for timeout classification"]
pub struct CancellableExecution<T> {
    /// Return value produced by the wrapped operation.
    pub value: T,
    /// Measured elapsed execution duration.
    pub elapsed: Duration,
    /// Explicit timeout classification for the execution.
    pub timeout: TimeoutClassification,
    /// True if deadline signaling requested cancellation while operation was still running.
    ///
    /// Reflects the watchdog only; a cancellation signalled by another holder of
    /// the context is not reported through this flag.
    pub cancel_requested: bool,
    /// True if any handler poll observed cancellation while running.
    ///
    /// Read from the cancellation context after the operation has returned.
    pub cancellation_observed: bool,
    /// Latency between the first cancellation request and first observed cancellation poll.
    ///
    /// Read from the cancellation context after the operation has returned.
    pub cancellation_observation_latency: Option<Duration>,
    /// True if a watchdog worker was started and deterministically joined.
    ///
    /// False when no timeout was configured or the watchdog failed to spawn.
    pub watchdog_joined: bool,
    /// True if the watchdog thread failed to spawn (graceful degradation).
    pub watchdog_spawn_failed: bool,
}
230
/// Monotonic clock abstraction used by timeout classification.
///
/// [`TimeoutClassifier`] and [`TimeoutGuard`] call [`mark_now`](Self::mark_now)
/// before running the wrapped operation and
/// [`elapsed_since`](Self::elapsed_since) once it has returned. Supplying a custom
/// implementation makes the measured duration controllable, e.g. in tests.
pub trait TimeoutClock {
    /// Opaque mark captured before execution begins.
    type Mark: Copy;

    /// Captures the current mark.
    fn mark_now(&self) -> Self::Mark;

    /// Returns elapsed duration since a previously captured mark.
    fn elapsed_since(&self, mark: Self::Mark) -> Duration;
}
242
243/// System clock implementation for timeout classification using [`Instant`].
244#[derive(Debug, Clone, Copy, Default)]
245pub struct SystemTimeoutClock;
246
247impl TimeoutClock for SystemTimeoutClock {
248    type Mark = Instant;
249
250    fn mark_now(&self) -> Self::Mark {
251        Instant::now()
252    }
253
254    fn elapsed_since(&self, mark: Self::Mark) -> Duration {
255        mark.elapsed()
256    }
257}
258
/// Pure timeout classifier for observability.
///
/// The [`TimeoutClassifier`] provides side-effect free timeout classification
/// based on elapsed execution time. It is used to inspect execution duration
/// relative to configured timeouts without performing any enforcement actions.
///
/// The clock type defaults to [`SystemTimeoutClock`]; use
/// [`TimeoutClassifier::with_clock`] to supply a different [`TimeoutClock`].
///
/// # Invariants
///
/// - Classification is purely observational - no side effects or state changes.
/// - Given `(timeout_secs, elapsed)`, classification is deterministic.
#[derive(Debug, Clone)]
pub struct TimeoutClassifier<C = SystemTimeoutClock> {
    /// Clock used to measure elapsed time around the wrapped operation.
    clock: C,
}
273
274impl TimeoutClassifier<SystemTimeoutClock> {
275    /// Creates a timeout classifier using the default system clock.
276    pub fn new() -> Self {
277        Self { clock: SystemTimeoutClock }
278    }
279}
280
281impl Default for TimeoutClassifier<SystemTimeoutClock> {
282    fn default() -> Self {
283        Self::new()
284    }
285}
286
287impl<C> TimeoutClassifier<C>
288where
289    C: TimeoutClock,
290{
291    /// Creates a timeout classifier with an explicit clock implementation.
292    pub fn with_clock(clock: C) -> Self {
293        Self { clock }
294    }
295
296    /// Classifies elapsed execution relative to a configured timeout.
297    ///
298    /// This is a pure, side-effect free operation that only inspects
299    /// elapsed time and returns a deterministic classification.
300    pub fn classify(&self, timeout_secs: Option<u64>, elapsed: Duration) -> TimeoutClassification {
301        classify_timeout(timeout_secs, elapsed)
302    }
303
304    /// Executes `operation`, captures elapsed time, and classifies timeout outcome.
305    ///
306    /// The timeout decision is deterministic for a given `(timeout_secs, elapsed)`.
307    /// This method wraps the operation with timing but performs no enforcement.
308    pub fn execute_and_classify<T, F>(
309        &self,
310        timeout_secs: Option<u64>,
311        operation: F,
312    ) -> GuardedExecution<T>
313    where
314        F: FnOnce() -> T,
315    {
316        let mark = self.clock.mark_now();
317        let value = operation();
318        let elapsed = self.clock.elapsed_since(mark);
319        let timeout = classify_timeout(timeout_secs, elapsed);
320
321        GuardedExecution { value, elapsed, timeout }
322    }
323}
324
/// Deterministic timeout guard wrapper with enforcement.
///
/// The [`TimeoutGuard`] extends [`TimeoutClassifier`] with active timeout enforcement
/// via cancellation signaling when a timeout occurs.
#[derive(Debug, Clone)]
pub struct TimeoutGuard<C = SystemTimeoutClock> {
    /// Classifier (and clock) used to time and classify each execution.
    classifier: TimeoutClassifier<C>,
    /// Poll interval handed to the watchdog of each cancellable execution.
    poll_interval: Duration,
}
334
335impl TimeoutGuard<SystemTimeoutClock> {
336    /// Creates a timeout guard using the default system clock.
337    pub fn new() -> Self {
338        Self { classifier: TimeoutClassifier::new(), poll_interval: DEFAULT_WATCHDOG_POLL_INTERVAL }
339    }
340}
341
342impl Default for TimeoutGuard<SystemTimeoutClock> {
343    fn default() -> Self {
344        Self::new()
345    }
346}
347
348impl<C> TimeoutGuard<C>
349where
350    C: TimeoutClock,
351{
352    /// Creates a timeout guard with an explicit clock implementation.
353    pub fn with_clock(clock: C) -> Self {
354        Self {
355            classifier: TimeoutClassifier::with_clock(clock),
356            poll_interval: DEFAULT_WATCHDOG_POLL_INTERVAL,
357        }
358    }
359
360    /// Sets the watchdog poll interval. Default is 1ms.
361    pub fn with_poll_interval(mut self, interval: Duration) -> Self {
362        self.poll_interval = interval;
363        self
364    }
365
366    /// Executes `operation`, captures elapsed time, and classifies timeout outcome.
367    ///
368    /// The timeout decision is deterministic for a given `(timeout_secs, elapsed)`.
369    /// This method is provided for backward compatibility; use [`TimeoutClassifier::execute_and_classify`]
370    /// when only classification is needed without any enforcement semantics.
371    pub fn execute<T, F>(&self, timeout_secs: Option<u64>, operation: F) -> GuardedExecution<T>
372    where
373        F: FnOnce() -> T,
374    {
375        self.classifier.execute_and_classify(timeout_secs, operation)
376    }
377
378    /// Executes `operation` with cancellation support, capturing elapsed time and classifying timeout.
379    ///
380    /// When a timeout is configured, this method starts an attempt-local watchdog worker.
381    /// The watchdog sleeps until the timeout deadline and requests cancellation if the
382    /// operation has not completed yet. The watchdog is deterministically joined before
383    /// returning from this method.
384    ///
385    /// Timeout classification remains inspectable output, but enforcement does not depend
386    /// on post-return classification. Enforcement (cancellation request) can happen during
387    /// active handler execution.
388    ///
389    /// Cleanup invariant: watchdog lifecycle is panic-safe and always joined via scoped
390    /// lifecycle teardown before return or unwind propagation.
391    pub fn execute_with_cancellation<T, F>(
392        &self,
393        timeout_secs: Option<u64>,
394        operation: F,
395    ) -> CancellableExecution<T>
396    where
397        F: FnOnce(&CancellationContext) -> T,
398    {
399        let context = CancellationContext::new();
400        self.execute_with_external_cancellation(timeout_secs, context, operation)
401    }
402
403    /// Executes `operation` with an externally-provided cancellation context.
404    ///
405    /// Identical to [`execute_with_cancellation`](Self::execute_with_cancellation)
406    /// except the caller supplies the [`CancellationContext`]. This allows the
407    /// dispatch loop to retain a clone of the context so it can signal
408    /// cancellation (e.g. budget exhaustion) while the handler is running.
409    pub fn execute_with_external_cancellation<T, F>(
410        &self,
411        timeout_secs: Option<u64>,
412        cancellation_context: CancellationContext,
413        operation: F,
414    ) -> CancellableExecution<T>
415    where
416        F: FnOnce(&CancellationContext) -> T,
417    {
418        let mark = self.classifier.clock.mark_now();
419        let watchdog =
420            WatchdogLifecycle::new(timeout_secs, &cancellation_context, self.poll_interval);
421
422        let value = operation(&cancellation_context);
423        let watchdog_outcome = watchdog.finish();
424
425        let elapsed = self.classifier.clock.elapsed_since(mark);
426        let timeout = self.classifier.classify(timeout_secs, elapsed);
427        let cancel_requested = watchdog_outcome.cancellation_requested;
428        let cancellation_observed = cancellation_context.was_cancellation_observed();
429        let cancellation_observation_latency =
430            cancellation_context.cancellation_observation_latency();
431
432        CancellableExecution {
433            value,
434            elapsed,
435            timeout,
436            cancel_requested,
437            cancellation_observed,
438            cancellation_observation_latency,
439            watchdog_joined: watchdog_outcome.watchdog_joined,
440            watchdog_spawn_failed: watchdog_outcome.watchdog_spawn_failed,
441        }
442    }
443}
444
445/// Deterministically classifies elapsed execution relative to configured timeout.
446///
447/// This is a pure, side-effect free operation suitable for standalone classification
448/// without any enforcement machinery.
449///
450/// # Boundary behavior
451///
452/// When `elapsed == timeout`, the result is [`TimeoutClassification::CompletedInTime`].
453/// Only `elapsed > timeout` yields [`TimeoutClassification::TimedOut`]. Note that the
454/// watchdog thread fires at `elapsed >= deadline`, so cancellation may have been
455/// requested even for executions classified as `CompletedInTime` (a narrow race where
456/// the operation finishes in the same tick the watchdog fires). Callers should inspect
457/// [`CancellableExecution::cancel_requested`] for the authoritative cancellation signal.
458pub fn classify_timeout(timeout_secs: Option<u64>, elapsed: Duration) -> TimeoutClassification {
459    match timeout_secs {
460        None => TimeoutClassification::NotConfigured {
461            elapsed,
462            reason_code: TimeoutReasonCode::NoTimeoutConfigured,
463        },
464        Some(secs) => {
465            let timeout = Duration::from_secs(secs);
466            if elapsed > timeout {
467                TimeoutClassification::TimedOut(TimeoutFailure {
468                    timeout_secs: secs,
469                    elapsed,
470                    reason_code: TimeoutReasonCode::DeadlineExceeded,
471                })
472            } else {
473                TimeoutClassification::CompletedInTime {
474                    timeout_secs: secs,
475                    elapsed,
476                    reason_code: TimeoutReasonCode::WithinLimit,
477                }
478            }
479        }
480    }
481}
482
#[cfg(test)]
mod tests {
    use std::panic::{catch_unwind, AssertUnwindSafe};
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::sync::Arc;
    use std::thread;
    use std::time::{Duration, Instant};

    use super::{
        classify_timeout, TimeoutClassification, TimeoutClassifier, TimeoutClock, TimeoutFailure,
        TimeoutGuard, TimeoutReasonCode,
    };

    /// Upper bound on how long a test waits for the watchdog's cancellation signal.
    ///
    /// A passing run leaves the wait loop as soon as cancellation is visible; the
    /// bound only matters when the watchdog never fires. Waiting on wall-clock time
    /// rather than a fixed number of spin iterations keeps the tests independent of
    /// how quickly the scheduler starts the watchdog thread.
    const CANCEL_WAIT_LIMIT: Duration = Duration::from_secs(5);

    /// Clock that reports the same elapsed duration regardless of real time.
    #[derive(Debug, Clone, Copy)]
    struct FixedClock {
        elapsed: Duration,
    }

    impl TimeoutClock for FixedClock {
        type Mark = ();

        fn mark_now(&self) -> Self::Mark {}

        fn elapsed_since(&self, _mark: Self::Mark) -> Duration {
            self.elapsed
        }
    }

    #[test]
    fn classify_timeout_marks_not_configured_when_absent() {
        let elapsed = Duration::from_secs(3);
        let outcome = classify_timeout(None, elapsed);

        assert_eq!(
            outcome,
            TimeoutClassification::NotConfigured {
                elapsed,
                reason_code: TimeoutReasonCode::NoTimeoutConfigured,
            }
        );
        assert!(!outcome.is_timed_out());
    }

    #[test]
    fn classify_timeout_marks_in_time_when_elapsed_meets_limit() {
        let elapsed = Duration::from_secs(5);
        let outcome = classify_timeout(Some(5), elapsed);

        assert_eq!(
            outcome,
            TimeoutClassification::CompletedInTime {
                timeout_secs: 5,
                elapsed,
                reason_code: TimeoutReasonCode::WithinLimit,
            }
        );
        assert!(!outcome.is_timed_out());
    }

    #[test]
    fn classify_timeout_marks_timeout_when_elapsed_exceeds_limit() {
        let elapsed = Duration::from_secs(6);
        let outcome = classify_timeout(Some(5), elapsed);

        assert_eq!(
            outcome,
            TimeoutClassification::TimedOut(TimeoutFailure {
                timeout_secs: 5,
                elapsed,
                reason_code: TimeoutReasonCode::DeadlineExceeded,
            })
        );
        assert!(outcome.is_timed_out());
    }

    #[test]
    fn guard_wraps_operation_and_reports_timeout_data() {
        let guard = TimeoutGuard::with_clock(FixedClock { elapsed: Duration::from_secs(9) });

        let guarded = guard.execute(Some(3), || 42u32);

        assert_eq!(guarded.value, 42);
        assert_eq!(guarded.elapsed, Duration::from_secs(9));
        assert_eq!(
            guarded.timeout,
            TimeoutClassification::TimedOut(TimeoutFailure {
                timeout_secs: 3,
                elapsed: Duration::from_secs(9),
                reason_code: TimeoutReasonCode::DeadlineExceeded,
            })
        );
    }

    #[test]
    fn classifier_provides_side_effect_free_classification() {
        let classifier =
            TimeoutClassifier::with_clock(FixedClock { elapsed: Duration::from_secs(7) });

        let classification = classifier.classify(Some(5), Duration::from_secs(7));

        assert!(classification.is_timed_out());
        assert_eq!(
            classification,
            TimeoutClassification::TimedOut(TimeoutFailure {
                timeout_secs: 5,
                elapsed: Duration::from_secs(7),
                reason_code: TimeoutReasonCode::DeadlineExceeded,
            })
        );
    }

    #[test]
    fn classifier_execute_and_classify_wraps_operation() {
        let classifier =
            TimeoutClassifier::with_clock(FixedClock { elapsed: Duration::from_secs(4) });

        let result = classifier.execute_and_classify(Some(5), || 42u32);

        assert_eq!(result.value, 42);
        assert_eq!(result.elapsed, Duration::from_secs(4));
        assert_eq!(
            result.timeout,
            TimeoutClassification::CompletedInTime {
                timeout_secs: 5,
                elapsed: Duration::from_secs(4),
                reason_code: TimeoutReasonCode::WithinLimit,
            }
        );
    }

    #[test]
    fn execute_with_cancellation_requests_deadline_signal_during_execution() {
        let guard = TimeoutGuard::new();
        let saw_cancel = Arc::new(AtomicBool::new(false));
        let saw_cancel_clone = Arc::clone(&saw_cancel);

        let result = guard.execute_with_cancellation(Some(0), move |context| {
            // Keep the operation running until the watchdog has had a chance to
            // start and fire, bounded by wall-clock time.
            let wait_started = Instant::now();
            while wait_started.elapsed() < CANCEL_WAIT_LIMIT {
                if context.token().is_cancelled() {
                    saw_cancel_clone.store(true, Ordering::SeqCst);
                    break;
                }
                std::hint::spin_loop();
            }
            9u8
        });

        assert_eq!(result.value, 9);
        assert!(result.cancel_requested);
        assert!(result.cancellation_observed);
        assert!(result.cancellation_observation_latency.is_some());
        assert!(result.watchdog_joined);
        assert!(saw_cancel.load(Ordering::SeqCst));
        assert!(result.timeout.is_timed_out());
    }

    #[test]
    fn execute_with_cancellation_does_not_start_watchdog_when_timeout_is_disabled() {
        let guard = TimeoutGuard::new();
        let result = guard.execute_with_cancellation(None, |context| {
            assert!(!context.token().is_cancelled());
            11u8
        });

        assert_eq!(result.value, 11);
        assert!(!result.cancel_requested);
        assert!(!result.cancellation_observed);
        assert_eq!(result.cancellation_observation_latency, None);
        assert!(!result.watchdog_joined);
        assert!(matches!(
            result.timeout,
            TimeoutClassification::NotConfigured {
                reason_code: TimeoutReasonCode::NoTimeoutConfigured,
                ..
            }
        ));
    }

    #[test]
    fn d01_t_n4a_operation_panic_still_joins_watchdog_cleanup() {
        let baseline = super::active_watchdog_threads();
        let guard = TimeoutGuard::new();

        let panic_result = catch_unwind(AssertUnwindSafe(|| {
            let _ = guard.execute_with_cancellation(Some(30), |_context| -> u8 {
                panic!("operation panic for cleanup-path verification");
            });
        }));

        assert!(panic_result.is_err());

        // The counter is shared with every other test in this module, so give
        // concurrently running watchdogs a moment to settle before comparing.
        for _ in 0..100 {
            if super::active_watchdog_threads() == baseline {
                break;
            }
            thread::sleep(Duration::from_millis(2));
        }

        assert_eq!(super::active_watchdog_threads(), baseline);
    }

    #[test]
    fn configurable_poll_interval_is_used() {
        let guard = TimeoutGuard::new().with_poll_interval(Duration::from_millis(10));

        let result = guard.execute_with_cancellation(Some(0), move |context| {
            // With a 10ms poll interval, the watchdog should still eventually fire
            let wait_started = Instant::now();
            while wait_started.elapsed() < CANCEL_WAIT_LIMIT {
                if context.token().is_cancelled() {
                    return true;
                }
                std::hint::spin_loop();
            }
            false
        });

        assert!(result.cancel_requested);
        assert!(result.watchdog_joined);
        assert!(!result.watchdog_spawn_failed);
    }
}