1use std::num::NonZeroU32;
25use std::sync::{Arc, Mutex, MutexGuard, PoisonError};
26
27use pureflow_types::{ExecutionId, NodeId, WorkflowId};
28
/// One-based ordinal of an execution attempt.
///
/// The number is stored as a [`NonZeroU32`], so an attempt number of zero cannot be
/// represented.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ExecutionAttempt(NonZeroU32);

impl ExecutionAttempt {
    /// Wraps a non-zero attempt number.
    #[must_use]
    pub const fn new(value: NonZeroU32) -> Self {
        Self(value)
    }

    /// Returns the lowest representable attempt, whose number is `1`.
    #[must_use]
    pub const fn first() -> Self {
        Self::new(NonZeroU32::MIN)
    }

    /// Returns the attempt number as a plain `u32`.
    #[must_use]
    pub const fn get(self) -> u32 {
        let Self(number) = self;
        number.get()
    }
}
52
53#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct ExecutionMetadata {
56 execution_id: ExecutionId,
57 attempt: ExecutionAttempt,
58}
59
60impl ExecutionMetadata {
61 #[must_use]
63 pub const fn new(execution_id: ExecutionId, attempt: ExecutionAttempt) -> Self {
64 Self {
65 execution_id,
66 attempt,
67 }
68 }
69
70 #[must_use]
72 pub const fn first_attempt(execution_id: ExecutionId) -> Self {
73 Self::new(execution_id, ExecutionAttempt::first())
74 }
75
76 #[must_use]
78 pub const fn execution_id(&self) -> &ExecutionId {
79 &self.execution_id
80 }
81
82 #[must_use]
84 pub const fn attempt(&self) -> ExecutionAttempt {
85 self.attempt
86 }
87}
88
/// A request to cancel, carrying a free-form textual reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancellationRequest {
    /// Reason text supplied when the request was created.
    reason: String,
}

impl CancellationRequest {
    /// Creates a request from anything convertible into a `String`.
    ///
    /// The reason is stored as given; no validation or trimming is applied.
    #[must_use]
    pub fn new(reason: impl Into<String>) -> Self {
        let reason: String = reason.into();
        Self { reason }
    }

    /// Borrows the reason text.
    #[must_use]
    pub fn reason(&self) -> &str {
        self.reason.as_str()
    }
}
110
111#[derive(Debug, Clone, PartialEq, Eq)]
113pub enum CancellationState {
114 Active,
116 Requested(CancellationRequest),
118}
119
120impl CancellationState {
121 #[must_use]
123 pub const fn is_requested(&self) -> bool {
124 matches!(self, Self::Requested(_))
125 }
126}
127
128#[derive(Debug, Default)]
129struct CancellationSignal {
130 request: Mutex<Option<CancellationRequest>>,
131}
132
133#[derive(Debug, Clone)]
135pub struct CancellationToken {
136 signal: Arc<CancellationSignal>,
137}
138
139impl CancellationToken {
140 #[must_use]
142 pub fn active() -> Self {
143 Self {
144 signal: Arc::new(CancellationSignal::default()),
145 }
146 }
147
148 #[must_use]
150 pub fn cancelled(request: CancellationRequest) -> Self {
151 let token: Self = Self::active();
152 let _first_request: bool = token.request_cancellation(request);
153 token
154 }
155
156 #[must_use]
158 pub fn request(&self) -> Option<CancellationRequest> {
159 self.signal
160 .request
161 .lock()
162 .unwrap_or_else(PoisonError::into_inner)
163 .clone()
164 }
165
166 #[must_use]
168 pub fn state(&self) -> CancellationState {
169 self.request()
170 .map_or(CancellationState::Active, |request: CancellationRequest| {
171 CancellationState::Requested(request)
172 })
173 }
174
175 #[must_use]
177 pub fn is_cancelled(&self) -> bool {
178 self.request().is_some()
179 }
180
181 fn request_cancellation(&self, request: CancellationRequest) -> bool {
182 let mut guard: MutexGuard<'_, Option<CancellationRequest>> = self
183 .signal
184 .request
185 .lock()
186 .unwrap_or_else(PoisonError::into_inner);
187
188 if guard.is_some() {
189 return false;
190 }
191
192 *guard = Some(request);
193 true
194 }
195}
196
197impl Default for CancellationToken {
198 fn default() -> Self {
199 Self::active()
200 }
201}
202
203impl PartialEq for CancellationToken {
204 fn eq(&self, other: &Self) -> bool {
205 self.request() == other.request()
206 }
207}
208
209impl Eq for CancellationToken {}
210
211#[derive(Debug, Clone, PartialEq, Eq)]
213pub struct CancellationHandle {
214 token: CancellationToken,
215}
216
217impl CancellationHandle {
218 #[must_use]
220 pub fn new() -> Self {
221 Self {
222 token: CancellationToken::active(),
223 }
224 }
225
226 #[must_use]
228 pub fn token(&self) -> CancellationToken {
229 self.token.clone()
230 }
231
232 #[must_use]
236 pub fn cancel(&self, request: CancellationRequest) -> bool {
237 self.token.request_cancellation(request)
238 }
239
240 #[must_use]
242 pub fn is_cancelled(&self) -> bool {
243 self.token.is_cancelled()
244 }
245
246 #[must_use]
248 pub fn request(&self) -> Option<CancellationRequest> {
249 self.token.request()
250 }
251}
252
253impl Default for CancellationHandle {
254 fn default() -> Self {
255 Self::new()
256 }
257}
258
259#[derive(Debug, Clone, PartialEq, Eq)]
261pub struct NodeContext {
262 workflow_id: WorkflowId,
263 node_id: NodeId,
264 execution: ExecutionMetadata,
265 cancellation: CancellationToken,
266}
267
268impl NodeContext {
269 #[must_use]
271 pub fn new(workflow_id: WorkflowId, node_id: NodeId, execution: ExecutionMetadata) -> Self {
272 Self {
273 workflow_id,
274 node_id,
275 execution,
276 cancellation: CancellationToken::active(),
277 }
278 }
279
280 #[must_use]
282 pub fn with_cancellation(mut self, request: CancellationRequest) -> Self {
283 self.cancellation = CancellationToken::cancelled(request);
284 self
285 }
286
287 #[must_use]
289 pub fn with_cancellation_token(mut self, token: CancellationToken) -> Self {
290 self.cancellation = token;
291 self
292 }
293
294 #[must_use]
296 pub const fn workflow_id(&self) -> &WorkflowId {
297 &self.workflow_id
298 }
299
300 #[must_use]
302 pub const fn node_id(&self) -> &NodeId {
303 &self.node_id
304 }
305
306 #[must_use]
308 pub const fn execution(&self) -> &ExecutionMetadata {
309 &self.execution
310 }
311
312 #[must_use]
314 pub fn cancellation(&self) -> CancellationState {
315 self.cancellation.state()
316 }
317
318 #[must_use]
320 pub fn cancellation_token(&self) -> CancellationToken {
321 self.cancellation.clone()
322 }
323
324 #[must_use]
326 pub fn is_cancelled(&self) -> bool {
327 self.cancellation.is_cancelled()
328 }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a workflow id, panicking if `value` is rejected.
    fn workflow_id(value: &str) -> WorkflowId {
        WorkflowId::new(value).expect("valid workflow id")
    }

    /// Builds a node id, panicking if `value` is rejected.
    fn node_id(value: &str) -> NodeId {
        NodeId::new(value).expect("valid node id")
    }

    /// Builds an execution id, panicking if `value` is rejected.
    fn execution_id(value: &str) -> ExecutionId {
        ExecutionId::new(value).expect("valid execution id")
    }

    /// First-attempt metadata for a fixed execution id.
    fn execution() -> ExecutionMetadata {
        let id = execution_id("run-1");
        ExecutionMetadata::first_attempt(id)
    }

    #[test]
    fn first_execution_attempt_is_one_based() {
        let first = ExecutionAttempt::first();

        assert_eq!(first.get(), 1);
    }

    #[test]
    fn node_context_starts_active_and_can_carry_cancellation() {
        let active = NodeContext::new(workflow_id("flow"), node_id("node"), execution());

        assert!(!active.is_cancelled());
        assert_eq!(active.cancellation(), CancellationState::Active);

        let cancelled = active.with_cancellation(CancellationRequest::new("shutdown requested"));

        assert!(cancelled.is_cancelled());
        assert_eq!(
            cancelled.cancellation(),
            CancellationState::Requested(CancellationRequest::new("shutdown requested"))
        );
    }

    #[test]
    fn shared_cancellation_handle_reaches_parent_and_child_contexts() {
        let handle = CancellationHandle::new();

        // The child takes its token from the parent, which took it from the handle, so all
        // three observe the same signal.
        let parent = NodeContext::new(workflow_id("flow"), node_id("parent"), execution())
            .with_cancellation_token(handle.token());
        let child = NodeContext::new(workflow_id("flow"), node_id("child"), execution())
            .with_cancellation_token(parent.cancellation_token());

        assert!(!parent.is_cancelled());
        assert!(!child.is_cancelled());

        // Only the first request is accepted; the duplicate is reported as rejected.
        let first_accepted = handle.cancel(CancellationRequest::new("supervisor shutdown"));
        assert!(first_accepted);
        let duplicate_accepted = handle.cancel(CancellationRequest::new("ignored duplicate"));
        assert!(!duplicate_accepted);

        assert!(parent.is_cancelled());
        assert!(child.is_cancelled());
        assert_eq!(
            child.cancellation(),
            CancellationState::Requested(CancellationRequest::new("supervisor shutdown"))
        );
    }
}