1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Serialize};
3use serde_json::Value;
4use uuid::Uuid;
5
6#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
9#[serde(rename_all = "snake_case")]
10pub enum FlowStatus {
11 Created,
12 Running,
13 Waiting,
14 Cancelled,
15 Finished,
16 Failed,
17}
18
19impl FlowStatus {
20 pub fn is_terminal(&self) -> bool {
21 matches!(
22 self,
23 FlowStatus::Cancelled | FlowStatus::Finished | FlowStatus::Failed
24 )
25 }
26
27 pub fn can_transition_to(&self, next: FlowStatus) -> bool {
31 if self.is_terminal() {
32 return false;
33 }
34 if next == FlowStatus::Cancelled {
35 return true; }
37 matches!(
38 (self, next),
39 (FlowStatus::Created, FlowStatus::Running)
40 | (FlowStatus::Running, FlowStatus::Waiting)
41 | (FlowStatus::Running, FlowStatus::Finished)
42 | (FlowStatus::Running, FlowStatus::Failed)
43 | (FlowStatus::Waiting, FlowStatus::Running)
44 | (FlowStatus::Waiting, FlowStatus::Failed)
45 )
46 }
47
48 pub fn as_str(&self) -> &'static str {
49 match self {
50 FlowStatus::Created => "created",
51 FlowStatus::Running => "running",
52 FlowStatus::Waiting => "waiting",
53 FlowStatus::Cancelled => "cancelled",
54 FlowStatus::Finished => "finished",
55 FlowStatus::Failed => "failed",
56 }
57 }
58
59 #[allow(clippy::should_implement_trait)]
60 pub fn from_str(s: &str) -> Option<Self> {
61 Some(match s {
62 "created" => FlowStatus::Created,
63 "running" => FlowStatus::Running,
64 "waiting" => FlowStatus::Waiting,
65 "cancelled" => FlowStatus::Cancelled,
66 "finished" => FlowStatus::Finished,
67 "failed" => FlowStatus::Failed,
68 _ => return None,
69 })
70 }
71}
72
73#[derive(Debug, Clone, Serialize, Deserialize)]
77pub struct Flow {
78 pub id: Uuid,
79 pub controller_id: String,
80 pub goal: String,
81 pub owner_session_key: String,
82 pub requester_origin: String,
83 pub current_step: String,
84 pub state_json: Value,
85 pub wait_json: Option<Value>,
86 pub status: FlowStatus,
87 pub cancel_requested: bool,
90 pub revision: i64,
91 pub created_at: DateTime<Utc>,
92 pub updated_at: DateTime<Utc>,
93}
94
95impl Flow {
96 pub fn transition_to(&mut self, next: FlowStatus) -> Result<(), FlowError> {
100 if self.status.is_terminal() {
101 return Err(FlowError::AlreadyTerminal {
102 id: self.id,
103 status: self.status,
104 });
105 }
106 if self.cancel_requested && next != FlowStatus::Cancelled {
107 return Err(FlowError::CancelPending { id: self.id });
108 }
109 if !self.status.can_transition_to(next) {
110 return Err(FlowError::IllegalTransition {
111 from: self.status,
112 to: next,
113 });
114 }
115 self.status = next;
116 self.updated_at = chrono::Utc::now();
117 Ok(())
118 }
119
120 pub fn request_cancel(&mut self) {
124 if !self.status.is_terminal() {
125 self.cancel_requested = true;
126 self.updated_at = chrono::Utc::now();
127 }
128 }
129}
130
131#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
132#[serde(rename_all = "snake_case")]
133pub enum StepRuntime {
134 Managed,
136 Mirrored,
138}
139
140impl StepRuntime {
141 pub fn as_str(&self) -> &'static str {
142 match self {
143 StepRuntime::Managed => "managed",
144 StepRuntime::Mirrored => "mirrored",
145 }
146 }
147
148 #[allow(clippy::should_implement_trait)]
149 pub fn from_str(s: &str) -> Option<Self> {
150 Some(match s {
151 "managed" => StepRuntime::Managed,
152 "mirrored" => StepRuntime::Mirrored,
153 _ => return None,
154 })
155 }
156}
157
158#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
159#[serde(rename_all = "snake_case")]
160pub enum FlowStepStatus {
161 Pending,
162 Running,
163 Succeeded,
164 Failed,
165 Cancelled,
166}
167
168impl FlowStepStatus {
169 pub fn as_str(&self) -> &'static str {
170 match self {
171 FlowStepStatus::Pending => "pending",
172 FlowStepStatus::Running => "running",
173 FlowStepStatus::Succeeded => "succeeded",
174 FlowStepStatus::Failed => "failed",
175 FlowStepStatus::Cancelled => "cancelled",
176 }
177 }
178
179 #[allow(clippy::should_implement_trait)]
180 pub fn from_str(s: &str) -> Option<Self> {
181 Some(match s {
182 "pending" => FlowStepStatus::Pending,
183 "running" => FlowStepStatus::Running,
184 "succeeded" => FlowStepStatus::Succeeded,
185 "failed" => FlowStepStatus::Failed,
186 "cancelled" => FlowStepStatus::Cancelled,
187 _ => return None,
188 })
189 }
190}
191
192#[derive(Debug, Clone, Serialize, Deserialize)]
193pub struct FlowStep {
194 pub id: Uuid,
195 pub flow_id: Uuid,
196 pub runtime: StepRuntime,
197 pub child_session_key: Option<String>,
198 pub run_id: String,
199 pub task: String,
200 pub status: FlowStepStatus,
201 pub result_json: Option<Value>,
202 pub created_at: DateTime<Utc>,
203 pub updated_at: DateTime<Utc>,
204}
205
206#[derive(Debug, Clone, Serialize, Deserialize)]
208pub struct FlowEvent {
209 pub id: i64,
210 pub flow_id: Uuid,
211 pub kind: String,
212 pub payload_json: Value,
213 pub at: DateTime<Utc>,
214}
215
216#[derive(Debug, thiserror::Error)]
217pub enum FlowError {
218 #[error("flow not found: {0}")]
219 NotFound(Uuid),
220 #[error("revision mismatch: expected {expected}, found {actual}")]
221 RevisionMismatch { expected: i64, actual: i64 },
222 #[error("illegal transition from {from:?} to {to:?}")]
223 IllegalTransition { from: FlowStatus, to: FlowStatus },
224 #[error("flow {id} is already terminal ({status:?})")]
225 AlreadyTerminal { id: Uuid, status: FlowStatus },
226 #[error("flow {id} has cancel_requested; only Cancelled transition allowed")]
227 CancelPending { id: Uuid },
228 #[error("storage error: {0}")]
229 Storage(#[from] sqlx::Error),
230 #[error("invalid data: {0}")]
231 InvalidData(String),
232}
233
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use uuid::Uuid;

    /// Builds a minimal flow in the given status with no cancel request pending.
    fn flow(status: FlowStatus) -> Flow {
        let now = chrono::Utc::now();
        Flow {
            id: Uuid::new_v4(),
            controller_id: "test".into(),
            goal: "test".into(),
            owner_session_key: "owner".into(),
            requester_origin: "user".into(),
            current_step: "init".into(),
            state_json: json!({}),
            wait_json: None,
            status,
            cancel_requested: false,
            revision: 0,
            created_at: now,
            updated_at: now,
        }
    }

    #[test]
    fn status_round_trip() {
        for s in [
            FlowStatus::Created,
            FlowStatus::Running,
            FlowStatus::Waiting,
            FlowStatus::Cancelled,
            FlowStatus::Finished,
            FlowStatus::Failed,
        ] {
            assert_eq!(FlowStatus::from_str(s.as_str()), Some(s));
        }
        assert!(FlowStatus::from_str("nope").is_none());
    }

    /// `as_str` and the serde `snake_case` encoding are two separate sources of
    /// truth for the same names; pin them together so they cannot drift apart.
    #[test]
    fn status_names_match_serde_encoding() {
        for s in [
            FlowStatus::Created,
            FlowStatus::Running,
            FlowStatus::Waiting,
            FlowStatus::Cancelled,
            FlowStatus::Finished,
            FlowStatus::Failed,
        ] {
            let encoded = serde_json::to_value(s).expect("unit variant serializes");
            assert_eq!(encoded, json!(s.as_str()));
        }
    }

    #[test]
    fn step_runtime_round_trip() {
        for r in [StepRuntime::Managed, StepRuntime::Mirrored] {
            assert_eq!(StepRuntime::from_str(r.as_str()), Some(r));
        }
        assert!(StepRuntime::from_str("nope").is_none());
    }

    #[test]
    fn step_status_round_trip() {
        for s in [
            FlowStepStatus::Pending,
            FlowStepStatus::Running,
            FlowStepStatus::Succeeded,
            FlowStepStatus::Failed,
            FlowStepStatus::Cancelled,
        ] {
            assert_eq!(FlowStepStatus::from_str(s.as_str()), Some(s));
        }
        assert!(FlowStepStatus::from_str("nope").is_none());
    }

    #[test]
    fn terminal_flag_matches_intent() {
        assert!(!FlowStatus::Created.is_terminal());
        assert!(!FlowStatus::Running.is_terminal());
        assert!(!FlowStatus::Waiting.is_terminal());
        assert!(FlowStatus::Cancelled.is_terminal());
        assert!(FlowStatus::Finished.is_terminal());
        assert!(FlowStatus::Failed.is_terminal());
    }

    #[test]
    fn legal_transitions_succeed() {
        let mut f = flow(FlowStatus::Created);
        f.transition_to(FlowStatus::Running)
            .expect("created→running");
        f.transition_to(FlowStatus::Waiting)
            .expect("running→waiting");
        f.transition_to(FlowStatus::Running)
            .expect("waiting→running");
        f.transition_to(FlowStatus::Finished)
            .expect("running→finished");
        assert_eq!(f.status, FlowStatus::Finished);
    }

    #[test]
    fn illegal_transitions_are_rejected() {
        let mut f = flow(FlowStatus::Created);
        let before = f.updated_at;
        let err = f
            .transition_to(FlowStatus::Waiting)
            .expect_err("created→waiting illegal");
        assert!(matches!(err, FlowError::IllegalTransition { .. }));
        // A rejected transition must leave the flow exactly as it was.
        assert_eq!(f.status, FlowStatus::Created);
        assert_eq!(f.updated_at, before);
    }

    #[test]
    fn cancel_allowed_from_any_non_terminal() {
        for start in [
            FlowStatus::Created,
            FlowStatus::Running,
            FlowStatus::Waiting,
        ] {
            let mut f = flow(start);
            f.transition_to(FlowStatus::Cancelled)
                .unwrap_or_else(|_| panic!("{start:?}→Cancelled must be legal"));
            assert_eq!(f.status, FlowStatus::Cancelled);
        }
    }

    #[test]
    fn terminal_flow_rejects_any_transition() {
        for term in [
            FlowStatus::Cancelled,
            FlowStatus::Finished,
            FlowStatus::Failed,
        ] {
            // `Cancelled` is included as a target: the "cancel is always
            // allowed" rule must not apply once a flow is terminal.
            for target in [FlowStatus::Running, FlowStatus::Cancelled] {
                let mut f = flow(term);
                let err = f.transition_to(target).expect_err("terminal must reject");
                assert!(matches!(err, FlowError::AlreadyTerminal { .. }));
                assert_eq!(f.status, term);
            }
        }
    }

    #[test]
    fn cancel_requested_blocks_non_cancel_transition() {
        let mut f = flow(FlowStatus::Running);
        f.request_cancel();
        assert!(f.cancel_requested);
        let err = f.transition_to(FlowStatus::Finished).expect_err("blocked");
        assert!(matches!(err, FlowError::CancelPending { .. }));
        f.transition_to(FlowStatus::Cancelled).expect("cancel ok");
        assert_eq!(f.status, FlowStatus::Cancelled);
    }

    #[test]
    fn request_cancel_is_idempotent_and_no_op_on_terminal() {
        let mut f = flow(FlowStatus::Finished);
        let before = f.updated_at;
        f.request_cancel();
        assert!(
            !f.cancel_requested,
            "terminal flow should not gain cancel intent"
        );
        assert_eq!(
            f.updated_at, before,
            "terminal flow should not be re-stamped"
        );

        let mut g = flow(FlowStatus::Running);
        g.request_cancel();
        g.request_cancel();
        assert!(g.cancel_requested);
        // Requesting cancellation records intent only; the status is untouched.
        // `updated_at` is deliberately not compared between the two calls: each
        // call re-stamps it from the wall clock.
        assert_eq!(g.status, FlowStatus::Running);
    }
}