Skip to main content

actionqueue_executor_local/handler/
cancellation.rs

1//! Cancellation mechanism for attempt execution.
2//!
3//! This module provides cooperative cancellation support for attempt execution.
4//! A cancellation token can be checked by handlers to determine if execution
5//! should be terminated early.
6
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::OnceLock;
9use std::time::{Duration, Instant};
10
/// A cloneable handle used to signal cooperative cancellation to an ongoing operation.
///
/// Every clone shares the same underlying state (each field lives behind an
/// `Arc`), so a cancellation requested through one clone is visible to all
/// others. All methods take `&self`; the state is updated through atomics and
/// write-once cells.
///
/// Besides the cancellation flag itself, the token records whether any poll
/// has observed the cancellation and when the request and the first
/// observation happened, so the request-to-observation latency can be reported.
#[derive(Debug, Clone)]
#[must_use = "cancellation token should be passed to handler for cooperative timeout"]
pub struct CancellationToken {
    /// Set to `true` once cancellation has been requested.
    cancelled: std::sync::Arc<AtomicBool>,
    /// Set to `true` once any call to `is_cancelled` has returned `true`.
    observed_cancelled: std::sync::Arc<AtomicBool>,
    /// Timestamp of the first cancellation request.
    cancellation_requested_at: std::sync::Arc<OnceLock<Instant>>,
    /// Timestamp of the first poll that observed cancellation.
    cancellation_observed_at: std::sync::Arc<OnceLock<Instant>>,
}

impl CancellationToken {
    /// Creates a new token that is not cancelled and has no recorded timestamps.
    pub fn new() -> Self {
        Self {
            cancelled: std::sync::Arc::new(AtomicBool::new(false)),
            observed_cancelled: std::sync::Arc::new(AtomicBool::new(false)),
            cancellation_requested_at: std::sync::Arc::new(OnceLock::new()),
            cancellation_observed_at: std::sync::Arc::new(OnceLock::new()),
        }
    }

    /// Returns `true` if cancellation has been requested.
    ///
    /// This is the poll that handlers are expected to call. As a side effect,
    /// the first poll that returns `true` records the observation timestamp,
    /// and every such poll marks the cancellation as observed.
    pub fn is_cancelled(&self) -> bool {
        let cancelled = self.cancelled.load(Ordering::SeqCst);
        if cancelled {
            // Record the timestamp *before* publishing the flag (the same
            // order `cancel` uses), so that anyone who sees
            // `was_cancellation_observed() == true` is guaranteed to find the
            // observation timestamp already initialised.
            let _ = self.cancellation_observed_at.get_or_init(Instant::now);
            self.observed_cancelled.store(true, Ordering::SeqCst);
        }
        cancelled
    }

    /// Requests cancellation for this token.
    ///
    /// This is idempotent - calling it multiple times has the same effect
    /// as calling it once: only the first call's timestamp is kept.
    pub fn cancel(&self) {
        // Timestamp first, flag second: a poll that sees the flag set will
        // therefore always find the request timestamp present.
        let _ = self.cancellation_requested_at.get_or_init(Instant::now);
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Returns `true` once any poll has observed cancellation as requested.
    pub fn was_cancellation_observed(&self) -> bool {
        self.observed_cancelled.load(Ordering::SeqCst)
    }

    /// Returns the latency between the first cancellation request and the
    /// first poll that observed it.
    ///
    /// Returns `None` until both a request and an observation have been
    /// recorded. The subtraction saturates, so the result is zero rather than
    /// a panic if the observation timestamp does not come after the request.
    pub fn cancellation_observation_latency(&self) -> Option<Duration> {
        let requested = self.cancellation_requested_at.get().copied()?;
        let observed = self.cancellation_observed_at.get().copied()?;
        Some(observed.saturating_duration_since(requested))
    }
}

impl Default for CancellationToken {
    /// Equivalent to [`CancellationToken::new`].
    fn default() -> Self {
        Self::new()
    }
}
77
78/// A context that provides access to a cancellation token.
79///
80/// This is passed to handlers to allow them to check for cancellation
81/// during execution.
82#[derive(Debug, Clone)]
83pub struct CancellationContext {
84    token: CancellationToken,
85}
86
87impl CancellationContext {
88    /// Creates a new cancellation context with a fresh token.
89    pub fn new() -> Self {
90        Self { token: CancellationToken::new() }
91    }
92
93    /// Returns the cancellation token for this context.
94    pub fn token(&self) -> &CancellationToken {
95        &self.token
96    }
97
98    /// Requests cancellation for this context's token.
99    pub fn cancel(&self) {
100        self.token.cancel();
101    }
102
103    /// Returns `true` once any handler poll has observed cancellation.
104    pub fn was_cancellation_observed(&self) -> bool {
105        self.token.was_cancellation_observed()
106    }
107
108    /// Returns the first observed cancellation-poll latency from request to observation.
109    pub fn cancellation_observation_latency(&self) -> Option<Duration> {
110        self.token.cancellation_observation_latency()
111    }
112}
113
114impl Default for CancellationContext {
115    fn default() -> Self {
116        Self::new()
117    }
118}
119
#[cfg(test)]
mod tests {
    use super::*;

    /// A brand-new token must report that nothing has been cancelled.
    #[test]
    fn cancellation_token_defaults_to_not_cancelled() {
        let fresh = CancellationToken::new();
        let cancelled = fresh.is_cancelled();
        assert!(!cancelled);
    }

    /// A single `cancel` call flips the token into the cancelled state.
    #[test]
    fn cancellation_token_can_be_cancelled() {
        let tok = CancellationToken::new();
        tok.cancel();
        let cancelled = tok.is_cancelled();
        assert!(cancelled);
    }

    /// Observation state is empty before cancellation and populated by the
    /// first poll that follows it.
    #[test]
    fn cancellation_observation_is_recorded_when_polled_after_cancel() {
        let tok = CancellationToken::new();

        // Nothing requested, nothing observed.
        assert!(!tok.was_cancellation_observed());
        assert_eq!(tok.cancellation_observation_latency(), None);

        tok.cancel();

        // The poll itself is what records the observation.
        assert!(tok.is_cancelled());
        assert!(tok.was_cancellation_observed());
        assert!(tok.cancellation_observation_latency().is_some());
    }

    /// Cancelling twice leaves the token cancelled, exactly like cancelling once.
    #[test]
    fn cancellation_is_idempotent() {
        let tok = CancellationToken::new();
        tok.cancel();
        tok.cancel();
        let cancelled = tok.is_cancelled();
        assert!(cancelled);
    }

    /// The token inside a new context starts out not cancelled.
    #[test]
    fn cancellation_context_creates_fresh_token() {
        let ctx = CancellationContext::new();
        let cancelled = ctx.token().is_cancelled();
        assert!(!cancelled);
    }

    /// Cancelling through the context is visible on its token.
    #[test]
    fn cancellation_context_can_cancel() {
        let ctx = CancellationContext::new();
        ctx.cancel();
        let cancelled = ctx.token().is_cancelled();
        assert!(cancelled);
    }

    /// The context forwards the token's observation flag and latency.
    #[test]
    fn cancellation_context_reports_observation_state() {
        let ctx = CancellationContext::new();

        // Nothing requested, nothing observed.
        assert!(!ctx.was_cancellation_observed());
        assert_eq!(ctx.cancellation_observation_latency(), None);

        ctx.cancel();

        // Polling the token records the observation reported by the context.
        assert!(ctx.token().is_cancelled());
        assert!(ctx.was_cancellation_observed());
        assert!(ctx.cancellation_observation_latency().is_some());
    }
}