// pureflow_core/context.rs

//! Execution context and cancellation boundary types.
//!
//! ## Fragment: context-runtime-boundary
//!
//! This module keeps the runtime-facing context deliberately small: workflow
//! identity, node identity, execution identity, and cancellation state. That
//! is enough for the foundation beads to define what a node is executing
//! without prematurely choosing an async runtime, scheduler, or transport.
//!
//! ## Fragment: context-cancellation-shape
//!
//! Cancellation is represented as a Pureflow-owned shared signal rather than as
//! an exposed async-runtime context. Runtime supervisors can request
//! cancellation after a node starts, and cloned parent or child contexts observe
//! the same request, but node APIs still see only Pureflow `NodeContext`
//! semantics rather than raw `asupersync::Cx`.
//!
//! ## Fragment: context-attempt-numbering
//!
//! Execution attempts are one-based on purpose. Retry counts are usually read
//! by humans in logs and diagnostics, and `attempt = 1` is less error-prone
//! than forcing every downstream consumer to translate from zero-based storage.

use std::num::NonZeroU32;
use std::sync::{Arc, Mutex, MutexGuard, PoisonError};

use pureflow_types::{ExecutionId, NodeId, WorkflowId};

/// One-based attempt number for an execution boundary.
///
/// The value is stored as a [`NonZeroU32`], so an attempt of `0` cannot be
/// represented and readers never need to translate from zero-based storage.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub struct ExecutionAttempt(NonZeroU32);

impl ExecutionAttempt {
    /// Wrap an already non-zero, one-based attempt number.
    #[must_use]
    pub const fn new(value: NonZeroU32) -> Self {
        Self(value)
    }

    /// Attempt number of the very first try, which is always `1`.
    #[must_use]
    pub const fn first() -> Self {
        Self(NonZeroU32::MIN)
    }

    /// Return the one-based attempt number as a plain `u32` (never zero).
    #[must_use]
    pub const fn get(self) -> u32 {
        self.0.get()
    }
}

53/// Metadata that identifies one workflow execution attempt.
54#[derive(Debug, Clone, PartialEq, Eq)]
55pub struct ExecutionMetadata {
56    execution_id: ExecutionId,
57    attempt: ExecutionAttempt,
58}
59
60impl ExecutionMetadata {
61    /// Create execution metadata for an explicit attempt.
62    #[must_use]
63    pub const fn new(execution_id: ExecutionId, attempt: ExecutionAttempt) -> Self {
64        Self {
65            execution_id,
66            attempt,
67        }
68    }
69
70    /// Create execution metadata for the first attempt.
71    #[must_use]
72    pub const fn first_attempt(execution_id: ExecutionId) -> Self {
73        Self::new(execution_id, ExecutionAttempt::first())
74    }
75
76    /// Identifier for this workflow execution.
77    #[must_use]
78    pub const fn execution_id(&self) -> &ExecutionId {
79        &self.execution_id
80    }
81
82    /// One-based attempt for this workflow execution.
83    #[must_use]
84    pub const fn attempt(&self) -> ExecutionAttempt {
85        self.attempt
86    }
87}
88
/// Cancellation request visible at the runtime boundary.
///
/// Carries only a human-readable reason.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CancellationRequest {
    reason: String,
}

impl CancellationRequest {
    /// Create a cancellation request from anything convertible to a `String`.
    ///
    /// The reason is stored as given; no trimming or validation is applied.
    #[must_use]
    pub fn new(reason: impl Into<String>) -> Self {
        let reason = reason.into();
        Self { reason }
    }

    /// Borrow the human-readable reason supplied at construction.
    #[must_use]
    pub fn reason(&self) -> &str {
        self.reason.as_str()
    }
}

111/// Cancellation state carried by a node execution context.
112#[derive(Debug, Clone, PartialEq, Eq)]
113pub enum CancellationState {
114    /// No cancellation has been requested.
115    Active,
116    /// Cancellation has been requested at the runtime boundary.
117    Requested(CancellationRequest),
118}
119
120impl CancellationState {
121    /// Return whether cancellation has been requested.
122    #[must_use]
123    pub const fn is_requested(&self) -> bool {
124        matches!(self, Self::Requested(_))
125    }
126}
127
128#[derive(Debug, Default)]
129struct CancellationSignal {
130    request: Mutex<Option<CancellationRequest>>,
131}
132
133/// Read-only cancellation view carried by a node execution context.
134#[derive(Debug, Clone)]
135pub struct CancellationToken {
136    signal: Arc<CancellationSignal>,
137}
138
139impl CancellationToken {
140    /// Create an active cancellation token.
141    #[must_use]
142    pub fn active() -> Self {
143        Self {
144            signal: Arc::new(CancellationSignal::default()),
145        }
146    }
147
148    /// Create a token that already has cancellation requested.
149    #[must_use]
150    pub fn cancelled(request: CancellationRequest) -> Self {
151        let token: Self = Self::active();
152        let _first_request: bool = token.request_cancellation(request);
153        token
154    }
155
156    /// Return the current cancellation request, if any.
157    #[must_use]
158    pub fn request(&self) -> Option<CancellationRequest> {
159        self.signal
160            .request
161            .lock()
162            .unwrap_or_else(PoisonError::into_inner)
163            .clone()
164    }
165
166    /// Return the current cancellation state.
167    #[must_use]
168    pub fn state(&self) -> CancellationState {
169        self.request()
170            .map_or(CancellationState::Active, |request: CancellationRequest| {
171                CancellationState::Requested(request)
172            })
173    }
174
175    /// Return whether cancellation has been requested.
176    #[must_use]
177    pub fn is_cancelled(&self) -> bool {
178        self.request().is_some()
179    }
180
181    fn request_cancellation(&self, request: CancellationRequest) -> bool {
182        let mut guard: MutexGuard<'_, Option<CancellationRequest>> = self
183            .signal
184            .request
185            .lock()
186            .unwrap_or_else(PoisonError::into_inner);
187
188        if guard.is_some() {
189            return false;
190        }
191
192        *guard = Some(request);
193        true
194    }
195}
196
197impl Default for CancellationToken {
198    fn default() -> Self {
199        Self::active()
200    }
201}
202
203impl PartialEq for CancellationToken {
204    fn eq(&self, other: &Self) -> bool {
205        self.request() == other.request()
206    }
207}
208
209impl Eq for CancellationToken {}
210
211/// Runtime-owned handle that can request cancellation for shared contexts.
212#[derive(Debug, Clone, PartialEq, Eq)]
213pub struct CancellationHandle {
214    token: CancellationToken,
215}
216
217impl CancellationHandle {
218    /// Create a cancellation handle with an active token.
219    #[must_use]
220    pub fn new() -> Self {
221        Self {
222            token: CancellationToken::active(),
223        }
224    }
225
226    /// Return a read-only token suitable for attaching to a `NodeContext`.
227    #[must_use]
228    pub fn token(&self) -> CancellationToken {
229        self.token.clone()
230    }
231
232    /// Request cancellation for every context sharing this handle's token.
233    ///
234    /// Returns `true` when this call recorded the first cancellation request.
235    #[must_use]
236    pub fn cancel(&self, request: CancellationRequest) -> bool {
237        self.token.request_cancellation(request)
238    }
239
240    /// Return whether cancellation has been requested.
241    #[must_use]
242    pub fn is_cancelled(&self) -> bool {
243        self.token.is_cancelled()
244    }
245
246    /// Return the current cancellation request, if any.
247    #[must_use]
248    pub fn request(&self) -> Option<CancellationRequest> {
249        self.token.request()
250    }
251}
252
253impl Default for CancellationHandle {
254    fn default() -> Self {
255        Self::new()
256    }
257}
258
259/// Minimal execution context passed to runtime-managed nodes.
260#[derive(Debug, Clone, PartialEq, Eq)]
261pub struct NodeContext {
262    workflow_id: WorkflowId,
263    node_id: NodeId,
264    execution: ExecutionMetadata,
265    cancellation: CancellationToken,
266}
267
268impl NodeContext {
269    /// Create an active node context for one execution attempt.
270    #[must_use]
271    pub fn new(workflow_id: WorkflowId, node_id: NodeId, execution: ExecutionMetadata) -> Self {
272        Self {
273            workflow_id,
274            node_id,
275            execution,
276            cancellation: CancellationToken::active(),
277        }
278    }
279
280    /// Create a copy of this context with cancellation requested.
281    #[must_use]
282    pub fn with_cancellation(mut self, request: CancellationRequest) -> Self {
283        self.cancellation = CancellationToken::cancelled(request);
284        self
285    }
286
287    /// Attach a shared cancellation token to this context.
288    #[must_use]
289    pub fn with_cancellation_token(mut self, token: CancellationToken) -> Self {
290        self.cancellation = token;
291        self
292    }
293
294    /// Workflow currently being executed.
295    #[must_use]
296    pub const fn workflow_id(&self) -> &WorkflowId {
297        &self.workflow_id
298    }
299
300    /// Node currently being executed.
301    #[must_use]
302    pub const fn node_id(&self) -> &NodeId {
303        &self.node_id
304    }
305
306    /// Execution metadata shared by nodes in the same run.
307    #[must_use]
308    pub const fn execution(&self) -> &ExecutionMetadata {
309        &self.execution
310    }
311
312    /// Cancellation state visible to this node.
313    #[must_use]
314    pub fn cancellation(&self) -> CancellationState {
315        self.cancellation.state()
316    }
317
318    /// Shared cancellation token visible to this node.
319    #[must_use]
320    pub fn cancellation_token(&self) -> CancellationToken {
321        self.cancellation.clone()
322    }
323
324    /// Return whether cancellation has been requested.
325    #[must_use]
326    pub fn is_cancelled(&self) -> bool {
327        self.cancellation.is_cancelled()
328    }
329}
330
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `ExecutionId` fixture, panicking if the literal is rejected.
    fn execution_id(value: &str) -> ExecutionId {
        ExecutionId::new(value).expect("valid execution id")
    }

    /// Build a `NodeId` fixture, panicking if the literal is rejected.
    fn node_id(value: &str) -> NodeId {
        NodeId::new(value).expect("valid node id")
    }

    /// Build a `WorkflowId` fixture, panicking if the literal is rejected.
    fn workflow_id(value: &str) -> WorkflowId {
        WorkflowId::new(value).expect("valid workflow id")
    }

    /// First-attempt metadata shared by the context tests.
    fn execution() -> ExecutionMetadata {
        ExecutionMetadata::first_attempt(execution_id("run-1"))
    }

    #[test]
    fn first_execution_attempt_is_one_based() {
        assert_eq!(ExecutionAttempt::first().get(), 1);
    }

    #[test]
    fn node_context_starts_active_and_can_carry_cancellation() {
        let ctx = NodeContext::new(workflow_id("flow"), node_id("node"), execution());

        assert!(!ctx.is_cancelled());
        assert!(matches!(ctx.cancellation(), CancellationState::Active));

        let cancelled = ctx.with_cancellation(CancellationRequest::new("shutdown requested"));

        assert!(cancelled.is_cancelled());
        assert!(matches!(
            cancelled.cancellation(),
            CancellationState::Requested(request) if request.reason() == "shutdown requested"
        ));
    }

    #[test]
    fn shared_cancellation_handle_reaches_parent_and_child_contexts() {
        let handle = CancellationHandle::new();
        let parent = NodeContext::new(workflow_id("flow"), node_id("parent"), execution())
            .with_cancellation_token(handle.token());
        let child = NodeContext::new(workflow_id("flow"), node_id("child"), execution())
            .with_cancellation_token(parent.cancellation_token());

        assert!(!parent.is_cancelled());
        assert!(!child.is_cancelled());

        // Only the first request is recorded; the duplicate is ignored.
        assert!(handle.cancel(CancellationRequest::new("supervisor shutdown")));
        assert!(!handle.cancel(CancellationRequest::new("ignored duplicate")));

        assert!(parent.is_cancelled());
        assert!(child.is_cancelled());
        assert!(matches!(
            child.cancellation(),
            CancellationState::Requested(request) if request.reason() == "supervisor shutdown"
        ));
    }
}