actionqueue_executor_local/handler/
cancellation.rs1use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::OnceLock;
9use std::time::{Duration, Instant};
10
11#[derive(Debug, Clone)]
17#[must_use = "cancellation token should be passed to handler for cooperative timeout"]
18pub struct CancellationToken {
19 cancelled: std::sync::Arc<AtomicBool>,
21 observed_cancelled: std::sync::Arc<AtomicBool>,
23 cancellation_requested_at: std::sync::Arc<OnceLock<Instant>>,
25 cancellation_observed_at: std::sync::Arc<OnceLock<Instant>>,
27}
28
29impl CancellationToken {
30 pub fn new() -> Self {
32 Self {
33 cancelled: std::sync::Arc::new(AtomicBool::new(false)),
34 observed_cancelled: std::sync::Arc::new(AtomicBool::new(false)),
35 cancellation_requested_at: std::sync::Arc::new(OnceLock::new()),
36 cancellation_observed_at: std::sync::Arc::new(OnceLock::new()),
37 }
38 }
39
40 pub fn is_cancelled(&self) -> bool {
42 let cancelled = self.cancelled.load(Ordering::SeqCst);
43 if cancelled {
44 self.observed_cancelled.store(true, Ordering::SeqCst);
45 let _ = self.cancellation_observed_at.get_or_init(Instant::now);
46 }
47 cancelled
48 }
49
50 pub fn cancel(&self) {
55 let _ = self.cancellation_requested_at.get_or_init(Instant::now);
56 self.cancelled.store(true, Ordering::SeqCst);
57 }
58
59 pub fn was_cancellation_observed(&self) -> bool {
61 self.observed_cancelled.load(Ordering::SeqCst)
62 }
63
64 pub fn cancelled_flag(&self) -> std::sync::Arc<AtomicBool> {
70 std::sync::Arc::clone(&self.cancelled)
71 }
72
73 pub fn cancellation_observation_latency(&self) -> Option<Duration> {
75 let requested = self.cancellation_requested_at.get().copied()?;
76 let observed = self.cancellation_observed_at.get().copied()?;
77 Some(observed.saturating_duration_since(requested))
78 }
79}
80
81impl Default for CancellationToken {
82 fn default() -> Self {
83 Self::new()
84 }
85}
86
87#[derive(Debug, Clone)]
92pub struct CancellationContext {
93 token: CancellationToken,
94}
95
96impl CancellationContext {
97 pub fn new() -> Self {
99 Self { token: CancellationToken::new() }
100 }
101
102 pub fn token(&self) -> &CancellationToken {
104 &self.token
105 }
106
107 pub fn cancel(&self) {
109 self.token.cancel();
110 }
111
112 pub fn was_cancellation_observed(&self) -> bool {
114 self.token.was_cancellation_observed()
115 }
116
117 pub fn cancellation_observation_latency(&self) -> Option<Duration> {
119 self.token.cancellation_observation_latency()
120 }
121}
122
123impl Default for CancellationContext {
124 fn default() -> Self {
125 Self::new()
126 }
127}
128
129#[cfg(test)]
130mod tests {
131 use super::*;
132
133 #[test]
134 fn cancellation_token_defaults_to_not_cancelled() {
135 let token = CancellationToken::new();
136 assert!(!token.is_cancelled());
137 }
138
139 #[test]
140 fn cancellation_token_can_be_cancelled() {
141 let token = CancellationToken::new();
142 token.cancel();
143 assert!(token.is_cancelled());
144 }
145
146 #[test]
147 fn cancellation_observation_is_recorded_when_polled_after_cancel() {
148 let token = CancellationToken::new();
149 assert!(!token.was_cancellation_observed());
150 assert_eq!(token.cancellation_observation_latency(), None);
151
152 token.cancel();
153 assert!(token.is_cancelled());
154 assert!(token.was_cancellation_observed());
155 assert!(token.cancellation_observation_latency().is_some());
156 }
157
158 #[test]
159 fn cancellation_is_idempotent() {
160 let token = CancellationToken::new();
161 token.cancel();
162 token.cancel();
163 assert!(token.is_cancelled());
164 }
165
166 #[test]
167 fn cancellation_context_creates_fresh_token() {
168 let context = CancellationContext::new();
169 assert!(!context.token().is_cancelled());
170 }
171
172 #[test]
173 fn cancellation_context_can_cancel() {
174 let context = CancellationContext::new();
175 context.cancel();
176 assert!(context.token().is_cancelled());
177 }
178
179 #[test]
180 fn cancellation_context_reports_observation_state() {
181 let context = CancellationContext::new();
182 assert!(!context.was_cancellation_observed());
183 assert_eq!(context.cancellation_observation_latency(), None);
184
185 context.cancel();
186 assert!(context.token().is_cancelled());
187 assert!(context.was_cancellation_observed());
188 assert!(context.cancellation_observation_latency().is_some());
189 }
190}