1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use uuid::Uuid;
5
/// Lifecycle status of a [`Flow`].
///
/// Serialized by serde in `snake_case` (for example `Created` is written as
/// `"created"`), the same spelling [`FlowStatus::as_str`] returns.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FlowStatus {
    /// Non-terminal. No status transitions into `Created`; from here a flow
    /// may move to `Running` or `Cancelled`.
    Created,
    /// Non-terminal. May move to `Waiting`, `Finished`, `Failed`, or `Cancelled`.
    Running,
    /// Non-terminal. May move back to `Running`, or to `Failed` or `Cancelled`.
    Waiting,
    /// Terminal. Reachable from every non-terminal status.
    Cancelled,
    /// Terminal. Reachable from `Running`.
    Finished,
    /// Terminal. Reachable from `Running` or `Waiting`.
    Failed,
}
17
18impl FlowStatus {
19 pub fn is_terminal(&self) -> bool {
20 matches!(
21 self,
22 FlowStatus::Cancelled | FlowStatus::Finished | FlowStatus::Failed
23 )
24 }
25
26 pub fn can_transition_to(&self, next: FlowStatus) -> bool {
30 if self.is_terminal() {
31 return false;
32 }
33 if next == FlowStatus::Cancelled {
34 return true; }
36 matches!(
37 (self, next),
38 (FlowStatus::Created, FlowStatus::Running)
39 | (FlowStatus::Running, FlowStatus::Waiting)
40 | (FlowStatus::Running, FlowStatus::Finished)
41 | (FlowStatus::Running, FlowStatus::Failed)
42 | (FlowStatus::Waiting, FlowStatus::Running)
43 | (FlowStatus::Waiting, FlowStatus::Failed)
44 )
45 }
46
47 pub fn as_str(&self) -> &'static str {
48 match self {
49 FlowStatus::Created => "created",
50 FlowStatus::Running => "running",
51 FlowStatus::Waiting => "waiting",
52 FlowStatus::Cancelled => "cancelled",
53 FlowStatus::Finished => "finished",
54 FlowStatus::Failed => "failed",
55 }
56 }
57
58 #[allow(clippy::should_implement_trait)]
59 pub fn from_str(s: &str) -> Option<Self> {
60 Some(match s {
61 "created" => FlowStatus::Created,
62 "running" => FlowStatus::Running,
63 "waiting" => FlowStatus::Waiting,
64 "cancelled" => FlowStatus::Cancelled,
65 "finished" => FlowStatus::Finished,
66 "failed" => FlowStatus::Failed,
67 _ => return None,
68 })
69 }
70}
71
/// A flow record: identity, ownership, progress data, and lifecycle status.
///
/// Status changes should go through [`Flow::transition_to`] and
/// [`Flow::request_cancel`], which enforce the transition rules and refresh
/// `updated_at`; the fields are public, so nothing prevents direct mutation.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Flow {
    /// Unique identifier of this flow.
    pub id: Uuid,
    /// Identifier of the controller associated with this flow.
    /// NOTE(review): presumably the component that drives the flow — confirm.
    pub controller_id: String,
    /// Free-form description of what the flow is meant to achieve.
    pub goal: String,
    /// Session key of the flow's owner.
    /// NOTE(review): exact key format is not visible in this file.
    pub owner_session_key: String,
    /// Origin of the request that created the flow.
    /// NOTE(review): the set of valid values is not visible in this file.
    pub requester_origin: String,
    /// Name of the step the flow is currently on.
    pub current_step: String,
    /// Arbitrary JSON state carried by the flow; schema is not defined here.
    pub state_json: Value,
    /// Optional JSON describing what the flow is waiting on.
    /// NOTE(review): presumably only meaningful while `status` is `Waiting` — confirm.
    pub wait_json: Option<Value>,
    /// Current lifecycle status.
    pub status: FlowStatus,
    /// Set by [`Flow::request_cancel`]; while `true`, [`Flow::transition_to`]
    /// rejects every target other than `Cancelled`.
    pub cancel_requested: bool,
    /// Revision counter. Not modified by any method in this file.
    /// NOTE(review): presumably used for optimistic concurrency (see
    /// `FlowError::RevisionMismatch`) — confirm against the storage layer.
    pub revision: i64,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Timestamp (UTC) refreshed by `transition_to` and `request_cancel`
    /// whenever they modify the flow.
    pub updated_at: DateTime<Utc>,
}
93
94impl Flow {
95 pub fn transition_to(&mut self, next: FlowStatus) -> Result<(), FlowError> {
99 if self.status.is_terminal() {
100 return Err(FlowError::AlreadyTerminal {
101 id: self.id,
102 status: self.status,
103 });
104 }
105 if self.cancel_requested && next != FlowStatus::Cancelled {
106 return Err(FlowError::CancelPending { id: self.id });
107 }
108 if !self.status.can_transition_to(next) {
109 return Err(FlowError::IllegalTransition {
110 from: self.status,
111 to: next,
112 });
113 }
114 self.status = next;
115 self.updated_at = chrono::Utc::now();
116 Ok(())
117 }
118
119 pub fn request_cancel(&mut self) {
123 if !self.status.is_terminal() {
124 self.cancel_requested = true;
125 self.updated_at = chrono::Utc::now();
126 }
127 }
128}
129
/// Runtime kind of a [`FlowStep`].
///
/// Serialized by serde in `snake_case`, the same spelling
/// [`StepRuntime::as_str`] returns.
///
/// NOTE(review): the semantic difference between the two variants is not
/// shown in this file — confirm against the code that executes steps.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum StepRuntime {
    /// Written as `"managed"`.
    Managed,
    /// Written as `"mirrored"`.
    Mirrored,
}
138
139impl StepRuntime {
140 pub fn as_str(&self) -> &'static str {
141 match self {
142 StepRuntime::Managed => "managed",
143 StepRuntime::Mirrored => "mirrored",
144 }
145 }
146
147 #[allow(clippy::should_implement_trait)]
148 pub fn from_str(s: &str) -> Option<Self> {
149 Some(match s {
150 "managed" => StepRuntime::Managed,
151 "mirrored" => StepRuntime::Mirrored,
152 _ => return None,
153 })
154 }
155}
156
/// Status of a single [`FlowStep`].
///
/// Serialized by serde in `snake_case`, the same spelling
/// [`FlowStepStatus::as_str`] returns.
///
/// NOTE(review): unlike `FlowStatus`, no transition rules for step statuses
/// are defined in this file.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum FlowStepStatus {
    /// Written as `"pending"`.
    Pending,
    /// Written as `"running"`.
    Running,
    /// Written as `"succeeded"`.
    Succeeded,
    /// Written as `"failed"`.
    Failed,
    /// Written as `"cancelled"`.
    Cancelled,
}
166
167impl FlowStepStatus {
168 pub fn as_str(&self) -> &'static str {
169 match self {
170 FlowStepStatus::Pending => "pending",
171 FlowStepStatus::Running => "running",
172 FlowStepStatus::Succeeded => "succeeded",
173 FlowStepStatus::Failed => "failed",
174 FlowStepStatus::Cancelled => "cancelled",
175 }
176 }
177
178 #[allow(clippy::should_implement_trait)]
179 pub fn from_str(s: &str) -> Option<Self> {
180 Some(match s {
181 "pending" => FlowStepStatus::Pending,
182 "running" => FlowStepStatus::Running,
183 "succeeded" => FlowStepStatus::Succeeded,
184 "failed" => FlowStepStatus::Failed,
185 "cancelled" => FlowStepStatus::Cancelled,
186 _ => return None,
187 })
188 }
189}
190
/// A single step record attached to a flow.
///
/// NOTE(review): this file defines only the data shape; how steps are created
/// and advanced is not shown here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowStep {
    /// Unique identifier of this step.
    pub id: Uuid,
    /// Identifier of the flow this step is attached to (presumably `Flow::id`).
    pub flow_id: Uuid,
    /// Runtime kind of the step.
    pub runtime: StepRuntime,
    /// Optional session key of a child session.
    /// NOTE(review): when this is `None` vs `Some` is not visible here — confirm.
    pub child_session_key: Option<String>,
    /// Identifier of the run associated with this step; format not defined here.
    pub run_id: String,
    /// Description of the task the step performs.
    pub task: String,
    /// Current status of the step.
    pub status: FlowStepStatus,
    /// Optional JSON result of the step; schema is not defined here.
    pub result_json: Option<Value>,
    /// Creation timestamp (UTC).
    pub created_at: DateTime<Utc>,
    /// Last-update timestamp (UTC).
    pub updated_at: DateTime<Utc>,
}
204
/// An event record associated with a flow.
///
/// NOTE(review): who emits events and which `kind` values exist is not shown
/// in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FlowEvent {
    /// Numeric identifier of the event.
    /// NOTE(review): presumably a storage-assigned sequence number — confirm.
    pub id: i64,
    /// Identifier of the flow the event is attached to (presumably `Flow::id`).
    pub flow_id: Uuid,
    /// Event kind as a free-form string.
    pub kind: String,
    /// JSON payload of the event; schema is not defined here.
    pub payload_json: Value,
    /// Timestamp (UTC) of the event.
    pub at: DateTime<Utc>,
}
214
/// Errors produced by flow operations.
///
/// `Display` and `std::error::Error` are generated by `thiserror` from the
/// `#[error(...)]` attributes below.
#[derive(Debug, thiserror::Error)]
pub enum FlowError {
    /// No flow with the given id was found.
    /// NOTE(review): not constructed in this file; presumably raised by storage lookups.
    #[error("flow not found: {0}")]
    NotFound(Uuid),
    /// The revision supplied by the caller (`expected`) differs from the one
    /// found (`actual`).
    /// NOTE(review): not constructed in this file; presumably an
    /// optimistic-concurrency conflict on `Flow::revision` — confirm.
    #[error("revision mismatch: expected {expected}, found {actual}")]
    RevisionMismatch { expected: i64, actual: i64 },
    /// Returned by `Flow::transition_to` when `FlowStatus::can_transition_to`
    /// rejects the `from → to` edge.
    #[error("illegal transition from {from:?} to {to:?}")]
    IllegalTransition { from: FlowStatus, to: FlowStatus },
    /// Returned by `Flow::transition_to` when the flow is already in a
    /// terminal status.
    #[error("flow {id} is already terminal ({status:?})")]
    AlreadyTerminal { id: Uuid, status: FlowStatus },
    /// Returned by `Flow::transition_to` when `cancel_requested` is set and
    /// the target status is not `Cancelled`.
    #[error("flow {id} has cancel_requested; only Cancelled transition allowed")]
    CancelPending { id: Uuid },
    /// Wraps an underlying `sqlx::Error`; `#[from]` provides the `From`
    /// conversion so `?` can be used on sqlx results.
    #[error("storage error: {0}")]
    Storage(#[from] sqlx::Error),
    /// Data could not be interpreted; the string carries the details.
    /// NOTE(review): not constructed in this file.
    #[error("invalid data: {0}")]
    InvalidData(String),
}
232
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use uuid::Uuid;

    /// Builds a minimal flow in the given status with no cancel intent.
    fn flow(status: FlowStatus) -> Flow {
        let now = chrono::Utc::now();
        Flow {
            id: Uuid::new_v4(),
            controller_id: "test".into(),
            goal: "test".into(),
            owner_session_key: "owner".into(),
            requester_origin: "user".into(),
            current_step: "init".into(),
            state_json: json!({}),
            wait_json: None,
            status,
            cancel_requested: false,
            revision: 0,
            created_at: now,
            updated_at: now,
        }
    }

    /// `as_str` and `from_str` are inverses for every variant, and unknown
    /// names are rejected.
    #[test]
    fn status_round_trip() {
        for s in [
            FlowStatus::Created,
            FlowStatus::Running,
            FlowStatus::Waiting,
            FlowStatus::Cancelled,
            FlowStatus::Finished,
            FlowStatus::Failed,
        ] {
            assert_eq!(FlowStatus::from_str(s.as_str()), Some(s));
        }
        assert!(FlowStatus::from_str("nope").is_none());
    }

    #[test]
    fn terminal_flag_matches_intent() {
        assert!(!FlowStatus::Created.is_terminal());
        assert!(!FlowStatus::Running.is_terminal());
        assert!(!FlowStatus::Waiting.is_terminal());
        assert!(FlowStatus::Cancelled.is_terminal());
        assert!(FlowStatus::Finished.is_terminal());
        assert!(FlowStatus::Failed.is_terminal());
    }

    #[test]
    fn legal_transitions_succeed() {
        let mut f = flow(FlowStatus::Created);
        f.transition_to(FlowStatus::Running)
            .expect("created→running");
        f.transition_to(FlowStatus::Waiting)
            .expect("running→waiting");
        f.transition_to(FlowStatus::Running)
            .expect("waiting→running");
        f.transition_to(FlowStatus::Finished)
            .expect("running→finished");
        assert_eq!(f.status, FlowStatus::Finished);
    }

    #[test]
    fn illegal_transitions_are_rejected() {
        let mut f = flow(FlowStatus::Created);
        let err = f
            .transition_to(FlowStatus::Waiting)
            .expect_err("created→waiting illegal");
        assert!(matches!(err, FlowError::IllegalTransition { .. }));
        // A rejected transition must leave the status as it was.
        assert_eq!(f.status, FlowStatus::Created);
    }

    #[test]
    fn cancel_allowed_from_any_non_terminal() {
        for start in [
            FlowStatus::Created,
            FlowStatus::Running,
            FlowStatus::Waiting,
        ] {
            let mut f = flow(start);
            f.transition_to(FlowStatus::Cancelled)
                .unwrap_or_else(|_| panic!("{start:?}→Cancelled must be legal"));
            assert_eq!(f.status, FlowStatus::Cancelled);
        }
    }

    #[test]
    fn terminal_flow_rejects_any_transition() {
        for term in [
            FlowStatus::Cancelled,
            FlowStatus::Finished,
            FlowStatus::Failed,
        ] {
            let mut f = flow(term);
            let err = f
                .transition_to(FlowStatus::Running)
                .expect_err("terminal must reject");
            assert!(matches!(err, FlowError::AlreadyTerminal { .. }));
            // The terminal status itself must survive the rejected attempt.
            assert_eq!(f.status, term);
        }
    }

    #[test]
    fn cancel_requested_blocks_non_cancel_transition() {
        let mut f = flow(FlowStatus::Running);
        f.request_cancel();
        assert!(f.cancel_requested);
        let err = f
            .transition_to(FlowStatus::Finished)
            .expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));
        // The blocked attempt must not have moved the flow.
        assert_eq!(f.status, FlowStatus::Running);
        f.transition_to(FlowStatus::Cancelled).expect("cancel ok");
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[test]
    fn request_cancel_is_idempotent_and_no_op_on_terminal() {
        // Terminal flow: neither the flag nor the timestamp may change.
        let mut f = flow(FlowStatus::Finished);
        let untouched_ts = f.updated_at;
        f.request_cancel();
        assert!(
            !f.cancel_requested,
            "terminal flow should not gain cancel intent"
        );
        assert_eq!(
            f.updated_at, untouched_ts,
            "terminal flow should not have its timestamp refreshed"
        );

        // Live flow: a repeated request keeps the flag set and never changes
        // the status. `updated_at` is refreshed on every call, so it is
        // deliberately not compared here.
        let mut g = flow(FlowStatus::Running);
        g.request_cancel();
        g.request_cancel();
        assert!(g.cancel_requested);
        assert_eq!(g.status, FlowStatus::Running);
    }
}