// nexo_taskflow/types.rs

use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use uuid::Uuid;
5
6/// Lifecycle states a flow can be in. Transitions are enforced by
7/// `can_transition_to`.
8#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
9#[serde(rename_all = "snake_case")]
10pub enum FlowStatus {
11    Created,
12    Running,
13    Waiting,
14    Cancelled,
15    Finished,
16    Failed,
17}
18
19impl FlowStatus {
20    pub fn is_terminal(&self) -> bool {
21        matches!(
22            self,
23            FlowStatus::Cancelled | FlowStatus::Finished | FlowStatus::Failed
24        )
25    }
26
27    /// Legal direct transitions from this status. Cancellation is allowed
28    /// from any non-terminal status; the rest are constrained to a small
29    /// graph that mirrors the documented flow lifecycle.
30    pub fn can_transition_to(&self, next: FlowStatus) -> bool {
31        if self.is_terminal() {
32            return false;
33        }
34        if next == FlowStatus::Cancelled {
35            return true; // any non-terminal → Cancelled
36        }
37        matches!(
38            (self, next),
39            (FlowStatus::Created, FlowStatus::Running)
40                | (FlowStatus::Running, FlowStatus::Waiting)
41                | (FlowStatus::Running, FlowStatus::Finished)
42                | (FlowStatus::Running, FlowStatus::Failed)
43                | (FlowStatus::Waiting, FlowStatus::Running)
44                | (FlowStatus::Waiting, FlowStatus::Failed)
45        )
46    }
47
48    pub fn as_str(&self) -> &'static str {
49        match self {
50            FlowStatus::Created => "created",
51            FlowStatus::Running => "running",
52            FlowStatus::Waiting => "waiting",
53            FlowStatus::Cancelled => "cancelled",
54            FlowStatus::Finished => "finished",
55            FlowStatus::Failed => "failed",
56        }
57    }
58
59    #[allow(clippy::should_implement_trait)]
60    pub fn from_str(s: &str) -> Option<Self> {
61        Some(match s {
62            "created" => FlowStatus::Created,
63            "running" => FlowStatus::Running,
64            "waiting" => FlowStatus::Waiting,
65            "cancelled" => FlowStatus::Cancelled,
66            "finished" => FlowStatus::Finished,
67            "failed" => FlowStatus::Failed,
68            _ => return None,
69        })
70    }
71}
72
73/// A flow record as persisted in `flows`. `state_json` and `wait_json` carry
74/// arbitrary controller-specific payloads; the runtime treats them as opaque
75/// blobs and only mutates them through revision-checked APIs.
76#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct Flow {
78    pub id: Uuid,
79    pub controller_id: String,
80    pub goal: String,
81    pub owner_session_key: String,
82    pub requester_origin: String,
83    pub current_step: String,
84    pub state_json: Value,
85    pub wait_json: Option<Value>,
86    pub status: FlowStatus,
87    /// Sticky cancel intent. When `true`, the flow refuses any non-terminal
88    /// transition until it reaches `Cancelled`. Survives restart.
89    pub cancel_requested: bool,
90    pub revision: i64,
91    pub created_at: DateTime<Utc>,
92    pub updated_at: DateTime<Utc>,
93}
94
95impl Flow {
96    /// Validate and apply a status transition. Does not persist — caller is
97    /// responsible for `FlowStore::update_with_revision`. Mutates `self` so
98    /// the caller can immediately persist the resulting state.
99    pub fn transition_to(&mut self, next: FlowStatus) -> Result<(), FlowError> {
100        if self.status.is_terminal() {
101            return Err(FlowError::AlreadyTerminal {
102                id: self.id,
103                status: self.status,
104            });
105        }
106        if self.cancel_requested && next != FlowStatus::Cancelled {
107            return Err(FlowError::CancelPending { id: self.id });
108        }
109        if !self.status.can_transition_to(next) {
110            return Err(FlowError::IllegalTransition {
111                from: self.status,
112                to: next,
113            });
114        }
115        self.status = next;
116        self.updated_at = chrono::Utc::now();
117        Ok(())
118    }
119
120    /// Mark a sticky cancel intent. Survives restart; subsequent transitions
121    /// are restricted to `Cancelled`. Idempotent — calling on an already
122    /// cancel-requested flow is a no-op.
123    pub fn request_cancel(&mut self) {
124        if !self.status.is_terminal() {
125            self.cancel_requested = true;
126            self.updated_at = chrono::Utc::now();
127        }
128    }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
132#[serde(rename_all = "snake_case")]
133pub enum StepRuntime {
134    /// We own the lifecycle: created and driven by the FlowManager.
135    Managed,
136    /// We observe an externally created task and reflect its status here.
137    Mirrored,
138}
139
140impl StepRuntime {
141    pub fn as_str(&self) -> &'static str {
142        match self {
143            StepRuntime::Managed => "managed",
144            StepRuntime::Mirrored => "mirrored",
145        }
146    }
147
148    #[allow(clippy::should_implement_trait)]
149    pub fn from_str(s: &str) -> Option<Self> {
150        Some(match s {
151            "managed" => StepRuntime::Managed,
152            "mirrored" => StepRuntime::Mirrored,
153            _ => return None,
154        })
155    }
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
159#[serde(rename_all = "snake_case")]
160pub enum FlowStepStatus {
161    Pending,
162    Running,
163    Succeeded,
164    Failed,
165    Cancelled,
166}
167
168impl FlowStepStatus {
169    pub fn as_str(&self) -> &'static str {
170        match self {
171            FlowStepStatus::Pending => "pending",
172            FlowStepStatus::Running => "running",
173            FlowStepStatus::Succeeded => "succeeded",
174            FlowStepStatus::Failed => "failed",
175            FlowStepStatus::Cancelled => "cancelled",
176        }
177    }
178
179    #[allow(clippy::should_implement_trait)]
180    pub fn from_str(s: &str) -> Option<Self> {
181        Some(match s {
182            "pending" => FlowStepStatus::Pending,
183            "running" => FlowStepStatus::Running,
184            "succeeded" => FlowStepStatus::Succeeded,
185            "failed" => FlowStepStatus::Failed,
186            "cancelled" => FlowStepStatus::Cancelled,
187            _ => return None,
188        })
189    }
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct FlowStep {
194    pub id: Uuid,
195    pub flow_id: Uuid,
196    pub runtime: StepRuntime,
197    pub child_session_key: Option<String>,
198    pub run_id: String,
199    pub task: String,
200    pub status: FlowStepStatus,
201    pub result_json: Option<Value>,
202    pub created_at: DateTime<Utc>,
203    pub updated_at: DateTime<Utc>,
204}
205
206/// Audit log entry for any flow mutation. Append-only, never updated.
207#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct FlowEvent {
209    pub id: i64,
210    pub flow_id: Uuid,
211    pub kind: String,
212    pub payload_json: Value,
213    pub at: DateTime<Utc>,
214}
215
216#[derive(Debug, thiserror::Error)]
217pub enum FlowError {
218    #[error("flow not found: {0}")]
219    NotFound(Uuid),
220    #[error("revision mismatch: expected {expected}, found {actual}")]
221    RevisionMismatch { expected: i64, actual: i64 },
222    #[error("illegal transition from {from:?} to {to:?}")]
223    IllegalTransition { from: FlowStatus, to: FlowStatus },
224    #[error("flow {id} is already terminal ({status:?})")]
225    AlreadyTerminal { id: Uuid, status: FlowStatus },
226    #[error("flow {id} has cancel_requested; only Cancelled transition allowed")]
227    CancelPending { id: Uuid },
228    #[error("storage error: {0}")]
229    Storage(#[from] sqlx::Error),
230    #[error("invalid data: {0}")]
231    InvalidData(String),
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use uuid::Uuid;

    /// All `FlowStatus` variants, for exhaustive loops.
    const ALL_STATUSES: [FlowStatus; 6] = [
        FlowStatus::Created,
        FlowStatus::Running,
        FlowStatus::Waiting,
        FlowStatus::Cancelled,
        FlowStatus::Finished,
        FlowStatus::Failed,
    ];

    /// Builds a minimal flow in the given status with no cancel intent.
    fn flow(status: FlowStatus) -> Flow {
        let now = chrono::Utc::now();
        Flow {
            id: Uuid::new_v4(),
            controller_id: "test".into(),
            goal: "test".into(),
            owner_session_key: "owner".into(),
            requester_origin: "user".into(),
            current_step: "init".into(),
            state_json: json!({}),
            wait_json: None,
            status,
            cancel_requested: false,
            revision: 0,
            created_at: now,
            updated_at: now,
        }
    }

    #[test]
    fn status_round_trip() {
        for s in ALL_STATUSES {
            assert_eq!(FlowStatus::from_str(s.as_str()), Some(s));
        }
        assert!(FlowStatus::from_str("nope").is_none());
    }

    #[test]
    fn step_enums_round_trip() {
        for r in [StepRuntime::Managed, StepRuntime::Mirrored] {
            assert_eq!(StepRuntime::from_str(r.as_str()), Some(r));
        }
        assert!(StepRuntime::from_str("nope").is_none());

        for s in [
            FlowStepStatus::Pending,
            FlowStepStatus::Running,
            FlowStepStatus::Succeeded,
            FlowStepStatus::Failed,
            FlowStepStatus::Cancelled,
        ] {
            assert_eq!(FlowStepStatus::from_str(s.as_str()), Some(s));
        }
        assert!(FlowStepStatus::from_str("nope").is_none());
    }

    #[test]
    fn as_str_agrees_with_serde_representation() {
        // `as_str` and the serde `snake_case` rename must not drift apart.
        for s in ALL_STATUSES {
            assert_eq!(
                serde_json::to_value(s).expect("serialize status"),
                json!(s.as_str())
            );
        }
    }

    #[test]
    fn terminal_flag_matches_intent() {
        assert!(!FlowStatus::Created.is_terminal());
        assert!(!FlowStatus::Running.is_terminal());
        assert!(!FlowStatus::Waiting.is_terminal());
        assert!(FlowStatus::Cancelled.is_terminal());
        assert!(FlowStatus::Finished.is_terminal());
        assert!(FlowStatus::Failed.is_terminal());
    }

    #[test]
    fn transition_table_is_exact() {
        let legal = [
            (FlowStatus::Created, FlowStatus::Running),
            (FlowStatus::Created, FlowStatus::Cancelled),
            (FlowStatus::Running, FlowStatus::Waiting),
            (FlowStatus::Running, FlowStatus::Finished),
            (FlowStatus::Running, FlowStatus::Failed),
            (FlowStatus::Running, FlowStatus::Cancelled),
            (FlowStatus::Waiting, FlowStatus::Running),
            (FlowStatus::Waiting, FlowStatus::Failed),
            (FlowStatus::Waiting, FlowStatus::Cancelled),
        ];
        for from in ALL_STATUSES {
            for to in ALL_STATUSES {
                assert_eq!(
                    from.can_transition_to(to),
                    legal.contains(&(from, to)),
                    "{from:?}→{to:?}"
                );
            }
        }
    }

    #[test]
    fn legal_transitions_succeed() {
        let mut f = flow(FlowStatus::Created);
        f.transition_to(FlowStatus::Running)
            .expect("created→running");
        f.transition_to(FlowStatus::Waiting)
            .expect("running→waiting");
        f.transition_to(FlowStatus::Running)
            .expect("waiting→running");
        f.transition_to(FlowStatus::Finished)
            .expect("running→finished");
        assert_eq!(f.status, FlowStatus::Finished);
    }

    #[test]
    fn illegal_transitions_are_rejected() {
        let mut f = flow(FlowStatus::Created);
        let err = f
            .transition_to(FlowStatus::Waiting)
            .expect_err("created→waiting illegal");
        assert!(matches!(err, FlowError::IllegalTransition { .. }));
        // A rejected transition must leave the status untouched.
        assert_eq!(f.status, FlowStatus::Created);
    }

    #[test]
    fn cancel_allowed_from_any_non_terminal() {
        for start in [
            FlowStatus::Created,
            FlowStatus::Running,
            FlowStatus::Waiting,
        ] {
            let mut f = flow(start);
            f.transition_to(FlowStatus::Cancelled)
                .unwrap_or_else(|_| panic!("{start:?}→Cancelled must be legal"));
            assert_eq!(f.status, FlowStatus::Cancelled);
        }
    }

    #[test]
    fn terminal_flow_rejects_any_transition() {
        for term in [
            FlowStatus::Cancelled,
            FlowStatus::Finished,
            FlowStatus::Failed,
        ] {
            let mut f = flow(term);
            let err = f
                .transition_to(FlowStatus::Running)
                .expect_err("terminal must reject");
            assert!(matches!(err, FlowError::AlreadyTerminal { .. }));
        }
    }

    #[test]
    fn cancel_requested_blocks_non_cancel_transition() {
        let mut f = flow(FlowStatus::Running);
        f.request_cancel();
        assert!(f.cancel_requested);
        let err = f.transition_to(FlowStatus::Finished).expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));
        // But Cancelled is still allowed.
        f.transition_to(FlowStatus::Cancelled).expect("cancel ok");
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[test]
    fn request_cancel_is_idempotent_and_no_op_on_terminal() {
        let mut f = flow(FlowStatus::Finished);
        let before = f.updated_at;
        f.request_cancel();
        assert!(
            !f.cancel_requested,
            "terminal flow should not gain cancel intent"
        );
        assert_eq!(
            f.updated_at, before,
            "terminal flow should not be touched at all"
        );

        // A repeated request keeps the flag set and must not panic.
        let mut g = flow(FlowStatus::Running);
        g.request_cancel();
        g.request_cancel();
        assert!(g.cancel_requested);
    }
}
365}