Skip to main content

nexo_taskflow/
types.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use uuid::Uuid;
5
6/// Lifecycle states a flow can be in. Transitions are enforced in Phase 14.2.
7#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
8#[serde(rename_all = "snake_case")]
9pub enum FlowStatus {
10    Created,
11    Running,
12    Waiting,
13    Cancelled,
14    Finished,
15    Failed,
16}
17
18impl FlowStatus {
19    pub fn is_terminal(&self) -> bool {
20        matches!(
21            self,
22            FlowStatus::Cancelled | FlowStatus::Finished | FlowStatus::Failed
23        )
24    }
25
26    /// Legal direct transitions from this status. Cancellation is allowed
27    /// from any non-terminal status; the rest are constrained to a small
28    /// graph that mirrors the documented Phase 14 lifecycle.
29    pub fn can_transition_to(&self, next: FlowStatus) -> bool {
30        if self.is_terminal() {
31            return false;
32        }
33        if next == FlowStatus::Cancelled {
34            return true; // any non-terminal → Cancelled
35        }
36        matches!(
37            (self, next),
38            (FlowStatus::Created, FlowStatus::Running)
39                | (FlowStatus::Running, FlowStatus::Waiting)
40                | (FlowStatus::Running, FlowStatus::Finished)
41                | (FlowStatus::Running, FlowStatus::Failed)
42                | (FlowStatus::Waiting, FlowStatus::Running)
43                | (FlowStatus::Waiting, FlowStatus::Failed)
44        )
45    }
46
47    pub fn as_str(&self) -> &'static str {
48        match self {
49            FlowStatus::Created => "created",
50            FlowStatus::Running => "running",
51            FlowStatus::Waiting => "waiting",
52            FlowStatus::Cancelled => "cancelled",
53            FlowStatus::Finished => "finished",
54            FlowStatus::Failed => "failed",
55        }
56    }
57
58    #[allow(clippy::should_implement_trait)]
59    pub fn from_str(s: &str) -> Option<Self> {
60        Some(match s {
61            "created" => FlowStatus::Created,
62            "running" => FlowStatus::Running,
63            "waiting" => FlowStatus::Waiting,
64            "cancelled" => FlowStatus::Cancelled,
65            "finished" => FlowStatus::Finished,
66            "failed" => FlowStatus::Failed,
67            _ => return None,
68        })
69    }
70}
71
72/// A flow record as persisted in `flows`. `state_json` and `wait_json` carry
73/// arbitrary controller-specific payloads; the runtime treats them as opaque
74/// blobs and only mutates them through revision-checked APIs.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct Flow {
77    pub id: Uuid,
78    pub controller_id: String,
79    pub goal: String,
80    pub owner_session_key: String,
81    pub requester_origin: String,
82    pub current_step: String,
83    pub state_json: Value,
84    pub wait_json: Option<Value>,
85    pub status: FlowStatus,
86    /// Sticky cancel intent. When `true`, the flow refuses any non-terminal
87    /// transition until it reaches `Cancelled`. Survives restart.
88    pub cancel_requested: bool,
89    pub revision: i64,
90    pub created_at: DateTime<Utc>,
91    pub updated_at: DateTime<Utc>,
92}
93
94impl Flow {
95    /// Validate and apply a status transition. Does not persist — caller is
96    /// responsible for `FlowStore::update_with_revision`. Mutates `self` so
97    /// the caller can immediately persist the resulting state.
98    pub fn transition_to(&mut self, next: FlowStatus) -> Result<(), FlowError> {
99        if self.status.is_terminal() {
100            return Err(FlowError::AlreadyTerminal {
101                id: self.id,
102                status: self.status,
103            });
104        }
105        if self.cancel_requested && next != FlowStatus::Cancelled {
106            return Err(FlowError::CancelPending { id: self.id });
107        }
108        if !self.status.can_transition_to(next) {
109            return Err(FlowError::IllegalTransition {
110                from: self.status,
111                to: next,
112            });
113        }
114        self.status = next;
115        self.updated_at = chrono::Utc::now();
116        Ok(())
117    }
118
119    /// Mark a sticky cancel intent. Survives restart; subsequent transitions
120    /// are restricted to `Cancelled`. Idempotent — calling on an already
121    /// cancel-requested flow is a no-op.
122    pub fn request_cancel(&mut self) {
123        if !self.status.is_terminal() {
124            self.cancel_requested = true;
125            self.updated_at = chrono::Utc::now();
126        }
127    }
128}
129
130#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
131#[serde(rename_all = "snake_case")]
132pub enum StepRuntime {
133    /// We own the lifecycle: created and driven by the FlowManager.
134    Managed,
135    /// We observe an externally created task and reflect its status here.
136    Mirrored,
137}
138
139impl StepRuntime {
140    pub fn as_str(&self) -> &'static str {
141        match self {
142            StepRuntime::Managed => "managed",
143            StepRuntime::Mirrored => "mirrored",
144        }
145    }
146
147    #[allow(clippy::should_implement_trait)]
148    pub fn from_str(s: &str) -> Option<Self> {
149        Some(match s {
150            "managed" => StepRuntime::Managed,
151            "mirrored" => StepRuntime::Mirrored,
152            _ => return None,
153        })
154    }
155}
156
157#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
158#[serde(rename_all = "snake_case")]
159pub enum FlowStepStatus {
160    Pending,
161    Running,
162    Succeeded,
163    Failed,
164    Cancelled,
165}
166
167impl FlowStepStatus {
168    pub fn as_str(&self) -> &'static str {
169        match self {
170            FlowStepStatus::Pending => "pending",
171            FlowStepStatus::Running => "running",
172            FlowStepStatus::Succeeded => "succeeded",
173            FlowStepStatus::Failed => "failed",
174            FlowStepStatus::Cancelled => "cancelled",
175        }
176    }
177
178    #[allow(clippy::should_implement_trait)]
179    pub fn from_str(s: &str) -> Option<Self> {
180        Some(match s {
181            "pending" => FlowStepStatus::Pending,
182            "running" => FlowStepStatus::Running,
183            "succeeded" => FlowStepStatus::Succeeded,
184            "failed" => FlowStepStatus::Failed,
185            "cancelled" => FlowStepStatus::Cancelled,
186            _ => return None,
187        })
188    }
189}
190
191#[derive(Debug, Clone, Serialize, Deserialize)]
192pub struct FlowStep {
193    pub id: Uuid,
194    pub flow_id: Uuid,
195    pub runtime: StepRuntime,
196    pub child_session_key: Option<String>,
197    pub run_id: String,
198    pub task: String,
199    pub status: FlowStepStatus,
200    pub result_json: Option<Value>,
201    pub created_at: DateTime<Utc>,
202    pub updated_at: DateTime<Utc>,
203}
204
205/// Audit log entry for any flow mutation. Append-only, never updated.
206#[derive(Debug, Clone, Serialize, Deserialize)]
207pub struct FlowEvent {
208    pub id: i64,
209    pub flow_id: Uuid,
210    pub kind: String,
211    pub payload_json: Value,
212    pub at: DateTime<Utc>,
213}
214
215#[derive(Debug, thiserror::Error)]
216pub enum FlowError {
217    #[error("flow not found: {0}")]
218    NotFound(Uuid),
219    #[error("revision mismatch: expected {expected}, found {actual}")]
220    RevisionMismatch { expected: i64, actual: i64 },
221    #[error("illegal transition from {from:?} to {to:?}")]
222    IllegalTransition { from: FlowStatus, to: FlowStatus },
223    #[error("flow {id} is already terminal ({status:?})")]
224    AlreadyTerminal { id: Uuid, status: FlowStatus },
225    #[error("flow {id} has cancel_requested; only Cancelled transition allowed")]
226    CancelPending { id: Uuid },
227    #[error("storage error: {0}")]
228    Storage(#[from] sqlx::Error),
229    #[error("invalid data: {0}")]
230    InvalidData(String),
231}
232
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use uuid::Uuid;

    /// Every flow status, for exhaustive table checks.
    const ALL: [FlowStatus; 6] = [
        FlowStatus::Created,
        FlowStatus::Running,
        FlowStatus::Waiting,
        FlowStatus::Cancelled,
        FlowStatus::Finished,
        FlowStatus::Failed,
    ];

    /// Builds a minimal flow in the given status with no cancel intent.
    fn flow(status: FlowStatus) -> Flow {
        let now = chrono::Utc::now();
        Flow {
            id: Uuid::new_v4(),
            controller_id: "test".into(),
            goal: "test".into(),
            owner_session_key: "owner".into(),
            requester_origin: "user".into(),
            current_step: "init".into(),
            state_json: json!({}),
            wait_json: None,
            status,
            cancel_requested: false,
            revision: 0,
            created_at: now,
            updated_at: now,
        }
    }

    #[test]
    fn status_round_trip() {
        for s in ALL {
            assert_eq!(FlowStatus::from_str(s.as_str()), Some(s));
        }
        assert!(FlowStatus::from_str("nope").is_none());
    }

    #[test]
    fn step_runtime_round_trip() {
        for r in [StepRuntime::Managed, StepRuntime::Mirrored] {
            assert_eq!(StepRuntime::from_str(r.as_str()), Some(r));
        }
        assert!(StepRuntime::from_str("nope").is_none());
    }

    #[test]
    fn step_status_round_trip() {
        for s in [
            FlowStepStatus::Pending,
            FlowStepStatus::Running,
            FlowStepStatus::Succeeded,
            FlowStepStatus::Failed,
            FlowStepStatus::Cancelled,
        ] {
            assert_eq!(FlowStepStatus::from_str(s.as_str()), Some(s));
        }
        assert!(FlowStepStatus::from_str("nope").is_none());
    }

    #[test]
    fn terminal_flag_matches_intent() {
        assert!(!FlowStatus::Created.is_terminal());
        assert!(!FlowStatus::Running.is_terminal());
        assert!(!FlowStatus::Waiting.is_terminal());
        assert!(FlowStatus::Cancelled.is_terminal());
        assert!(FlowStatus::Finished.is_terminal());
        assert!(FlowStatus::Failed.is_terminal());
    }

    /// Checks every (from, to) pair so that an accidental extra or missing
    /// edge in `can_transition_to` is caught.
    #[test]
    fn transition_table_matches_documented_graph() {
        let legal = [
            (FlowStatus::Created, FlowStatus::Running),
            (FlowStatus::Created, FlowStatus::Cancelled),
            (FlowStatus::Running, FlowStatus::Waiting),
            (FlowStatus::Running, FlowStatus::Finished),
            (FlowStatus::Running, FlowStatus::Failed),
            (FlowStatus::Running, FlowStatus::Cancelled),
            (FlowStatus::Waiting, FlowStatus::Running),
            (FlowStatus::Waiting, FlowStatus::Failed),
            (FlowStatus::Waiting, FlowStatus::Cancelled),
        ];
        for from in ALL {
            for to in ALL {
                assert_eq!(
                    from.can_transition_to(to),
                    legal.contains(&(from, to)),
                    "{from:?}→{to:?}"
                );
            }
        }
    }

    #[test]
    fn legal_transitions_succeed() {
        let mut f = flow(FlowStatus::Created);
        f.transition_to(FlowStatus::Running)
            .expect("created→running");
        f.transition_to(FlowStatus::Waiting)
            .expect("running→waiting");
        f.transition_to(FlowStatus::Running)
            .expect("waiting→running");
        f.transition_to(FlowStatus::Finished)
            .expect("running→finished");
        assert_eq!(f.status, FlowStatus::Finished);
    }

    #[test]
    fn failure_edges_succeed() {
        for start in [FlowStatus::Running, FlowStatus::Waiting] {
            let mut f = flow(start);
            f.transition_to(FlowStatus::Failed)
                .unwrap_or_else(|_| panic!("{start:?}→Failed must be legal"));
            assert_eq!(f.status, FlowStatus::Failed);
        }
    }

    #[test]
    fn illegal_transitions_are_rejected() {
        let mut f = flow(FlowStatus::Created);
        let err = f
            .transition_to(FlowStatus::Waiting)
            .expect_err("created→waiting illegal");
        assert!(matches!(
            err,
            FlowError::IllegalTransition {
                from: FlowStatus::Created,
                to: FlowStatus::Waiting,
            }
        ));
        // A rejected transition must leave the flow where it was.
        assert_eq!(f.status, FlowStatus::Created);
    }

    #[test]
    fn cancel_allowed_from_any_non_terminal() {
        for start in [
            FlowStatus::Created,
            FlowStatus::Running,
            FlowStatus::Waiting,
        ] {
            let mut f = flow(start);
            f.transition_to(FlowStatus::Cancelled)
                .unwrap_or_else(|_| panic!("{start:?}→Cancelled must be legal"));
            assert_eq!(f.status, FlowStatus::Cancelled);
        }
    }

    #[test]
    fn terminal_flow_rejects_any_transition() {
        for term in [
            FlowStatus::Cancelled,
            FlowStatus::Finished,
            FlowStatus::Failed,
        ] {
            for next in ALL {
                let mut f = flow(term);
                let err = f.transition_to(next).expect_err("terminal must reject");
                assert!(matches!(err, FlowError::AlreadyTerminal { .. }));
                assert_eq!(f.status, term);
            }
        }
    }

    #[test]
    fn cancel_requested_blocks_non_cancel_transition() {
        let mut f = flow(FlowStatus::Running);
        f.request_cancel();
        assert!(f.cancel_requested);
        let err = f
            .transition_to(FlowStatus::Finished)
            .expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));
        assert_eq!(f.status, FlowStatus::Running);
        // But Cancelled is still allowed.
        f.transition_to(FlowStatus::Cancelled).expect("cancel ok");
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[test]
    fn request_cancel_is_idempotent_and_no_op_on_terminal() {
        let mut f = flow(FlowStatus::Finished);
        let stamp = f.updated_at;
        f.request_cancel();
        assert!(
            !f.cancel_requested,
            "terminal flow should not gain cancel intent"
        );
        assert_eq!(f.updated_at, stamp, "terminal flow must not be touched");

        let mut g = flow(FlowStatus::Running);
        g.request_cancel();
        g.request_cancel();
        // Repeating the request keeps the flag on and never moves the status.
        assert!(g.cancel_requested);
        assert_eq!(g.status, FlowStatus::Running);
    }
}