Skip to main content

actionqueue_executor_local/handler/
cancellation.rs

//! Cancellation mechanism for attempt execution.
//!
//! This module provides cooperative cancellation support for attempt execution.
//! A cancellation token can be checked by handlers to determine if execution
//! should be terminated early.
6
7use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::OnceLock;
9use std::time::{Duration, Instant};
10
/// Cooperative cancellation signal that handlers poll during execution.
///
/// Every field sits behind an `Arc`, so cloning a token yields another handle
/// onto the same state: a request made through one clone is visible to polls
/// made through any other. All methods take `&self`; state changes go through
/// atomics and write-once cells.
#[derive(Debug, Clone)]
#[must_use = "cancellation token should be passed to handler for cooperative timeout"]
pub struct CancellationToken {
    /// Raised once cancellation has been requested.
    cancelled: std::sync::Arc<AtomicBool>,
    /// Raised once a call to `is_cancelled` has returned `true`.
    observed_cancelled: std::sync::Arc<AtomicBool>,
    /// Instant of the first `cancel` call; written at most once.
    cancellation_requested_at: std::sync::Arc<OnceLock<Instant>>,
    /// Instant of the first poll that saw the flag raised; written at most once.
    cancellation_observed_at: std::sync::Arc<OnceLock<Instant>>,
}

impl CancellationToken {
    /// Builds a token in its initial state: not cancelled, nothing observed,
    /// and no timestamps recorded.
    pub fn new() -> Self {
        let cancelled = std::sync::Arc::new(AtomicBool::new(false));
        let observed_cancelled = std::sync::Arc::new(AtomicBool::new(false));
        let cancellation_requested_at = std::sync::Arc::new(OnceLock::new());
        let cancellation_observed_at = std::sync::Arc::new(OnceLock::new());
        Self {
            cancelled,
            observed_cancelled,
            cancellation_requested_at,
            cancellation_observed_at,
        }
    }

    /// Polls the token, returning `true` when the cancelled flag is raised.
    ///
    /// A poll that returns `true` also marks the cancellation as observed and,
    /// if this is the first such poll, stores the observation instant. A poll
    /// that returns `false` records nothing.
    pub fn is_cancelled(&self) -> bool {
        if !self.cancelled.load(Ordering::SeqCst) {
            return false;
        }
        self.observed_cancelled.store(true, Ordering::SeqCst);
        // Only the first observing poll initialises the cell; later polls keep it.
        let _ = self.cancellation_observed_at.get_or_init(Instant::now);
        true
    }

    /// Requests cancellation.
    ///
    /// Safe to call repeatedly: the request instant is captured only by the
    /// first call, and raising an already-raised flag changes nothing.
    pub fn cancel(&self) {
        // The request instant is recorded before the flag is raised.
        let _ = self.cancellation_requested_at.get_or_init(Instant::now);
        self.cancelled.store(true, Ordering::SeqCst);
    }

    /// Reports whether some earlier `is_cancelled` poll has returned `true`.
    pub fn was_cancellation_observed(&self) -> bool {
        self.observed_cancelled.load(Ordering::SeqCst)
    }

    /// Hands out another `Arc` handle to the cancelled flag itself.
    ///
    /// External systems (e.g. Universal Interface connectors) can wrap this
    /// shared `AtomicBool` in their own cancellation token type and see
    /// cancellation requests live. Note that a store made directly through the
    /// returned flag does not record a request instant, so
    /// `cancellation_observation_latency` stays `None` in that case.
    pub fn cancelled_flag(&self) -> std::sync::Arc<AtomicBool> {
        let flag = &self.cancelled;
        std::sync::Arc::clone(flag)
    }

    /// Time elapsed between the first `cancel` call and the first poll that
    /// observed it.
    ///
    /// Returns `None` until both instants have been recorded. The difference
    /// saturates at zero rather than panicking if the observation instant
    /// precedes the request instant.
    pub fn cancellation_observation_latency(&self) -> Option<Duration> {
        let requested = self.cancellation_requested_at.get();
        let observed = self.cancellation_observed_at.get();
        match (requested, observed) {
            (Some(start), Some(seen)) => Some(seen.saturating_duration_since(*start)),
            _ => None,
        }
    }
}

impl Default for CancellationToken {
    /// Equivalent to [`CancellationToken::new`].
    fn default() -> Self {
        CancellationToken::new()
    }
}
86
87/// A context that provides access to a cancellation token.
88///
89/// This is passed to handlers to allow them to check for cancellation
90/// during execution.
91#[derive(Debug, Clone)]
92pub struct CancellationContext {
93    token: CancellationToken,
94}
95
96impl CancellationContext {
97    /// Creates a new cancellation context with a fresh token.
98    pub fn new() -> Self {
99        Self { token: CancellationToken::new() }
100    }
101
102    /// Returns the cancellation token for this context.
103    pub fn token(&self) -> &CancellationToken {
104        &self.token
105    }
106
107    /// Requests cancellation for this context's token.
108    pub fn cancel(&self) {
109        self.token.cancel();
110    }
111
112    /// Returns `true` once any handler poll has observed cancellation.
113    pub fn was_cancellation_observed(&self) -> bool {
114        self.token.was_cancellation_observed()
115    }
116
117    /// Returns the first observed cancellation-poll latency from request to observation.
118    pub fn cancellation_observation_latency(&self) -> Option<Duration> {
119        self.token.cancellation_observation_latency()
120    }
121}
122
123impl Default for CancellationContext {
124    fn default() -> Self {
125        Self::new()
126    }
127}
128
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly built token must not report cancellation.
    #[test]
    fn cancellation_token_defaults_to_not_cancelled() {
        let fresh = CancellationToken::new();
        let polled = fresh.is_cancelled();
        assert!(!polled);
    }

    /// A single `cancel` call makes the next poll return `true`.
    #[test]
    fn cancellation_token_can_be_cancelled() {
        let subject = CancellationToken::new();
        subject.cancel();
        let polled = subject.is_cancelled();
        assert!(polled);
    }

    /// Polling after a cancel records both the observation and its latency.
    #[test]
    fn cancellation_observation_is_recorded_when_polled_after_cancel() {
        let subject = CancellationToken::new();

        // Before any request: nothing observed, no latency available.
        assert!(!subject.was_cancellation_observed());
        assert!(subject.cancellation_observation_latency().is_none());

        subject.cancel();

        // The poll itself is what records the observation.
        assert!(subject.is_cancelled());
        assert!(subject.was_cancellation_observed());
        assert!(subject.cancellation_observation_latency().is_some());
    }

    /// Cancelling twice leaves the token cancelled.
    #[test]
    fn cancellation_is_idempotent() {
        let subject = CancellationToken::new();
        for _ in 0..2 {
            subject.cancel();
        }
        assert!(subject.is_cancelled());
    }

    /// A new context exposes a token that is not cancelled.
    #[test]
    fn cancellation_context_creates_fresh_token() {
        let ctx = CancellationContext::new();
        let polled = ctx.token().is_cancelled();
        assert!(!polled);
    }

    /// Cancelling through the context is visible on its token.
    #[test]
    fn cancellation_context_can_cancel() {
        let ctx = CancellationContext::new();
        ctx.cancel();
        let polled = ctx.token().is_cancelled();
        assert!(polled);
    }

    /// The context forwards observation state and latency from its token.
    #[test]
    fn cancellation_context_reports_observation_state() {
        let ctx = CancellationContext::new();

        // Before any request: nothing observed, no latency available.
        assert!(!ctx.was_cancellation_observed());
        assert!(ctx.cancellation_observation_latency().is_none());

        ctx.cancel();

        // Polling the token is what records the observation.
        assert!(ctx.token().is_cancelled());
        assert!(ctx.was_cancellation_observed());
        assert!(ctx.cancellation_observation_latency().is_some());
    }
}