Skip to main content

worldinterface_connector/
context.rs

1//! Invocation context and cancellation token for connector invocations.
2
3use std::sync::atomic::{AtomicBool, Ordering};
4use std::sync::Arc;
5
6use uuid::Uuid;
7use worldinterface_core::id::{FlowRunId, NodeId, StepRunId};
8
9/// Context provided to every connector invocation.
10///
11/// Contains identity fields for idempotency (Invariant 3), tracing, and
12/// cooperative cancellation.
13#[derive(Debug, Clone)]
14pub struct InvocationContext {
15    /// Which flow run this invocation belongs to.
16    pub flow_run_id: FlowRunId,
17    /// Which node in the FlowSpec graph is being executed.
18    pub node_id: NodeId,
19    /// The step execution identity (deterministic from FlowRunId + NodeId).
20    pub step_run_id: StepRunId,
21    /// The AQ RunId — the idempotency key for this invocation.
22    /// Stored as raw UUID to avoid coupling worldinterface-connector to actionqueue-core.
23    pub run_id: Uuid,
24    /// The AQ AttemptId — unique per retry attempt. Used for logging, NOT
25    /// for idempotency.
26    pub attempt_id: Uuid,
27    /// Which attempt this is (1-indexed). Useful for connectors that want
28    /// to log retry information.
29    pub attempt_number: u32,
30    /// Cooperative cancellation signal. Connectors performing long operations
31    /// should poll this at bounded intervals.
32    pub cancellation: CancellationToken,
33}
34
35/// Cooperative cancellation signal for connector invocations.
36///
37/// Wraps an atomic flag that the runtime sets when a timeout or cancellation
38/// is requested. Connectors performing long operations should check
39/// `is_cancelled()` at bounded intervals.
40#[derive(Debug, Clone)]
41pub struct CancellationToken {
42    cancelled: Arc<AtomicBool>,
43}
44
45impl CancellationToken {
46    pub fn new() -> Self {
47        Self { cancelled: Arc::new(AtomicBool::new(false)) }
48    }
49
50    pub fn is_cancelled(&self) -> bool {
51        self.cancelled.load(Ordering::Acquire)
52    }
53
54    pub fn cancel(&self) {
55        self.cancelled.store(true, Ordering::Release);
56    }
57
58    /// Construct from an external atomic flag. Used by the Step handler
59    /// (Sprint 4) to bridge AQ's CancellationToken.
60    pub fn from_flag(flag: Arc<AtomicBool>) -> Self {
61        Self { cancelled: flag }
62    }
63}
64
65impl Default for CancellationToken {
66    fn default() -> Self {
67        Self::new()
68    }
69}
70
71#[cfg(test)]
72mod tests {
73    use super::*;
74
75    #[test]
76    fn invocation_context_carries_all_fields() {
77        let ctx = InvocationContext {
78            flow_run_id: FlowRunId::new(),
79            node_id: NodeId::new(),
80            step_run_id: StepRunId::new(),
81            run_id: Uuid::new_v4(),
82            attempt_id: Uuid::new_v4(),
83            attempt_number: 1,
84            cancellation: CancellationToken::new(),
85        };
86        // All fields are accessible
87        let _ = ctx.flow_run_id;
88        let _ = ctx.node_id;
89        let _ = ctx.step_run_id;
90        let _ = ctx.run_id;
91        let _ = ctx.attempt_id;
92        assert_eq!(ctx.attempt_number, 1);
93        assert!(!ctx.cancellation.is_cancelled());
94    }
95
96    #[test]
97    fn cancellation_token_starts_uncancelled() {
98        let token = CancellationToken::new();
99        assert!(!token.is_cancelled());
100    }
101
102    #[test]
103    fn cancellation_token_cancel_sets_flag() {
104        let token = CancellationToken::new();
105        token.cancel();
106        assert!(token.is_cancelled());
107    }
108
109    #[test]
110    fn cancellation_token_from_flag() {
111        let flag = Arc::new(AtomicBool::new(false));
112        let token = CancellationToken::from_flag(Arc::clone(&flag));
113        assert!(!token.is_cancelled());
114        flag.store(true, Ordering::Release);
115        assert!(token.is_cancelled());
116    }
117
118    #[test]
119    fn cancellation_token_is_clone() {
120        let token = CancellationToken::new();
121        let clone = token.clone();
122        clone.cancel();
123        assert!(token.is_cancelled());
124    }
125}