actionqueue_executor_local/handler/
cancellation.rs1use std::sync::atomic::{AtomicBool, Ordering};
8use std::sync::OnceLock;
9use std::time::{Duration, Instant};
10
11#[derive(Debug, Clone)]
17#[must_use = "cancellation token should be passed to handler for cooperative timeout"]
18pub struct CancellationToken {
19 cancelled: std::sync::Arc<AtomicBool>,
21 observed_cancelled: std::sync::Arc<AtomicBool>,
23 cancellation_requested_at: std::sync::Arc<OnceLock<Instant>>,
25 cancellation_observed_at: std::sync::Arc<OnceLock<Instant>>,
27}
28
29impl CancellationToken {
30 pub fn new() -> Self {
32 Self {
33 cancelled: std::sync::Arc::new(AtomicBool::new(false)),
34 observed_cancelled: std::sync::Arc::new(AtomicBool::new(false)),
35 cancellation_requested_at: std::sync::Arc::new(OnceLock::new()),
36 cancellation_observed_at: std::sync::Arc::new(OnceLock::new()),
37 }
38 }
39
40 pub fn is_cancelled(&self) -> bool {
42 let cancelled = self.cancelled.load(Ordering::SeqCst);
43 if cancelled {
44 self.observed_cancelled.store(true, Ordering::SeqCst);
45 let _ = self.cancellation_observed_at.get_or_init(Instant::now);
46 }
47 cancelled
48 }
49
50 pub fn cancel(&self) {
55 let _ = self.cancellation_requested_at.get_or_init(Instant::now);
56 self.cancelled.store(true, Ordering::SeqCst);
57 }
58
59 pub fn was_cancellation_observed(&self) -> bool {
61 self.observed_cancelled.load(Ordering::SeqCst)
62 }
63
64 pub fn cancellation_observation_latency(&self) -> Option<Duration> {
66 let requested = self.cancellation_requested_at.get().copied()?;
67 let observed = self.cancellation_observed_at.get().copied()?;
68 Some(observed.saturating_duration_since(requested))
69 }
70}
71
72impl Default for CancellationToken {
73 fn default() -> Self {
74 Self::new()
75 }
76}
77
78#[derive(Debug, Clone)]
83pub struct CancellationContext {
84 token: CancellationToken,
85}
86
87impl CancellationContext {
88 pub fn new() -> Self {
90 Self { token: CancellationToken::new() }
91 }
92
93 pub fn token(&self) -> &CancellationToken {
95 &self.token
96 }
97
98 pub fn cancel(&self) {
100 self.token.cancel();
101 }
102
103 pub fn was_cancellation_observed(&self) -> bool {
105 self.token.was_cancellation_observed()
106 }
107
108 pub fn cancellation_observation_latency(&self) -> Option<Duration> {
110 self.token.cancellation_observation_latency()
111 }
112}
113
114impl Default for CancellationContext {
115 fn default() -> Self {
116 Self::new()
117 }
118}
119
120#[cfg(test)]
121mod tests {
122 use super::*;
123
124 #[test]
125 fn cancellation_token_defaults_to_not_cancelled() {
126 let token = CancellationToken::new();
127 assert!(!token.is_cancelled());
128 }
129
130 #[test]
131 fn cancellation_token_can_be_cancelled() {
132 let token = CancellationToken::new();
133 token.cancel();
134 assert!(token.is_cancelled());
135 }
136
137 #[test]
138 fn cancellation_observation_is_recorded_when_polled_after_cancel() {
139 let token = CancellationToken::new();
140 assert!(!token.was_cancellation_observed());
141 assert_eq!(token.cancellation_observation_latency(), None);
142
143 token.cancel();
144 assert!(token.is_cancelled());
145 assert!(token.was_cancellation_observed());
146 assert!(token.cancellation_observation_latency().is_some());
147 }
148
149 #[test]
150 fn cancellation_is_idempotent() {
151 let token = CancellationToken::new();
152 token.cancel();
153 token.cancel();
154 assert!(token.is_cancelled());
155 }
156
157 #[test]
158 fn cancellation_context_creates_fresh_token() {
159 let context = CancellationContext::new();
160 assert!(!context.token().is_cancelled());
161 }
162
163 #[test]
164 fn cancellation_context_can_cancel() {
165 let context = CancellationContext::new();
166 context.cancel();
167 assert!(context.token().is_cancelled());
168 }
169
170 #[test]
171 fn cancellation_context_reports_observation_state() {
172 let context = CancellationContext::new();
173 assert!(!context.was_cancellation_observed());
174 assert_eq!(context.cancellation_observation_latency(), None);
175
176 context.cancel();
177 assert!(context.token().is_cancelled());
178 assert!(context.was_cancellation_observed());
179 assert!(context.cancellation_observation_latency().is_some());
180 }
181}