1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum FlowState {
15 Created,
17 Running,
19 Paused,
21 FrozenForMerge,
23 Completed,
25 Merged,
27 Aborted,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33#[serde(rename_all = "lowercase")]
34pub enum TaskExecState {
35 Pending,
37 Ready,
39 Running,
41 Verifying,
43 Success,
45 Retry,
47 Failed,
49 Escalated,
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
54#[serde(rename_all = "lowercase")]
55pub enum RetryMode {
56 #[default]
57 Clean,
58 Continue,
59}
60
61#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
62#[serde(rename_all = "lowercase")]
63pub enum RunMode {
64 #[default]
65 Auto,
66 Manual,
67}
68
69#[derive(Debug, Clone, Serialize, Deserialize)]
71pub struct TaskExecution {
72 pub task_id: Uuid,
74 pub state: TaskExecState,
76 pub attempt_count: u32,
78 #[serde(default)]
79 pub retry_mode: RetryMode,
80 #[serde(default)]
81 pub frozen_commit_sha: Option<String>,
82 #[serde(default)]
83 pub integrated_commit_sha: Option<String>,
84 pub updated_at: DateTime<Utc>,
86 pub blocked_reason: Option<String>,
88}
89
90impl TaskExecution {
91 #[must_use]
93 pub fn new(task_id: Uuid) -> Self {
94 Self {
95 task_id,
96 state: TaskExecState::Pending,
97 attempt_count: 0,
98 retry_mode: RetryMode::default(),
99 frozen_commit_sha: None,
100 integrated_commit_sha: None,
101 updated_at: Utc::now(),
102 blocked_reason: None,
103 }
104 }
105
106 pub fn transition(&mut self, new_state: TaskExecState) -> Result<(), FlowError> {
108 if !self.can_transition_to(new_state) {
109 return Err(FlowError::InvalidTransition {
110 from: self.state,
111 to: new_state,
112 });
113 }
114
115 self.state = new_state;
116 self.updated_at = Utc::now();
117
118 if new_state == TaskExecState::Running {
119 self.attempt_count += 1;
120 }
121
122 Ok(())
123 }
124
125 #[must_use]
127 pub fn can_transition_to(&self, new_state: TaskExecState) -> bool {
128 use TaskExecState::{
129 Escalated, Failed, Pending, Ready, Retry, Running, Success, Verifying,
130 };
131 matches!(
132 (self.state, new_state),
133 (Pending, Ready | Running)
134 | (Ready | Retry, Running)
135 | (Running, Verifying)
136 | (Verifying, Success | Retry | Failed)
137 | (Failed, Escalated)
138 )
139 }
140}
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct TaskFlow {
145 pub id: Uuid,
147 pub graph_id: Uuid,
149 pub project_id: Uuid,
151 #[serde(default)]
152 pub base_revision: Option<String>,
153 #[serde(default)]
154 pub run_mode: RunMode,
155 #[serde(default)]
156 pub depends_on_flows: HashSet<Uuid>,
157 pub state: FlowState,
159 pub task_executions: HashMap<Uuid, TaskExecution>,
161 pub created_at: DateTime<Utc>,
163 pub started_at: Option<DateTime<Utc>>,
165 pub completed_at: Option<DateTime<Utc>>,
167 pub updated_at: DateTime<Utc>,
169}
170
171impl TaskFlow {
172 #[must_use]
174 pub fn new(graph_id: Uuid, project_id: Uuid, task_ids: &[Uuid]) -> Self {
175 let now = Utc::now();
176 let mut task_executions = HashMap::new();
177
178 for &task_id in task_ids {
179 task_executions.insert(task_id, TaskExecution::new(task_id));
180 }
181
182 Self {
183 id: Uuid::new_v4(),
184 graph_id,
185 project_id,
186 base_revision: None,
187 run_mode: RunMode::Manual,
188 depends_on_flows: HashSet::new(),
189 state: FlowState::Created,
190 task_executions,
191 created_at: now,
192 started_at: None,
193 completed_at: None,
194 updated_at: now,
195 }
196 }
197
198 pub fn start(&mut self) -> Result<(), FlowError> {
200 if self.state != FlowState::Created {
201 return Err(FlowError::InvalidFlowTransition {
202 from: self.state,
203 to: FlowState::Running,
204 });
205 }
206
207 self.state = FlowState::Running;
208 self.started_at = Some(Utc::now());
209 self.updated_at = Utc::now();
210 Ok(())
211 }
212
213 pub fn pause(&mut self) -> Result<(), FlowError> {
215 if self.state != FlowState::Running {
216 return Err(FlowError::InvalidFlowTransition {
217 from: self.state,
218 to: FlowState::Paused,
219 });
220 }
221
222 self.state = FlowState::Paused;
223 self.updated_at = Utc::now();
224 Ok(())
225 }
226
227 pub fn resume(&mut self) -> Result<(), FlowError> {
229 if self.state != FlowState::Paused {
230 return Err(FlowError::InvalidFlowTransition {
231 from: self.state,
232 to: FlowState::Running,
233 });
234 }
235
236 self.state = FlowState::Running;
237 self.updated_at = Utc::now();
238 Ok(())
239 }
240
241 pub fn abort(&mut self) -> Result<(), FlowError> {
243 if matches!(
244 self.state,
245 FlowState::Completed | FlowState::Merged | FlowState::Aborted
246 ) {
247 return Err(FlowError::InvalidFlowTransition {
248 from: self.state,
249 to: FlowState::Aborted,
250 });
251 }
252
253 self.state = FlowState::Aborted;
254 self.completed_at = Some(Utc::now());
255 self.updated_at = Utc::now();
256 Ok(())
257 }
258
259 pub fn complete(&mut self) -> Result<(), FlowError> {
261 if self.state != FlowState::Running {
262 return Err(FlowError::InvalidFlowTransition {
263 from: self.state,
264 to: FlowState::Completed,
265 });
266 }
267
268 for exec in self.task_executions.values() {
270 if !matches!(
271 exec.state,
272 TaskExecState::Success | TaskExecState::Failed | TaskExecState::Escalated
273 ) {
274 return Err(FlowError::TasksNotComplete);
275 }
276 }
277
278 self.state = FlowState::Completed;
279 self.completed_at = Some(Utc::now());
280 self.updated_at = Utc::now();
281 Ok(())
282 }
283
284 #[must_use]
286 pub fn get_task_execution(&self, task_id: Uuid) -> Option<&TaskExecution> {
287 self.task_executions.get(&task_id)
288 }
289
290 pub fn get_task_execution_mut(&mut self, task_id: Uuid) -> Option<&mut TaskExecution> {
292 self.task_executions.get_mut(&task_id)
293 }
294
295 pub fn transition_task(
297 &mut self,
298 task_id: Uuid,
299 new_state: TaskExecState,
300 ) -> Result<(), FlowError> {
301 let exec = self
302 .task_executions
303 .get_mut(&task_id)
304 .ok_or(FlowError::TaskNotFound(task_id))?;
305
306 exec.transition(new_state)?;
307 self.updated_at = Utc::now();
308 Ok(())
309 }
310
311 #[must_use]
313 pub fn is_terminal(&self) -> bool {
314 matches!(
315 self.state,
316 FlowState::Completed | FlowState::Merged | FlowState::Aborted
317 )
318 }
319
320 #[must_use]
322 pub fn tasks_in_state(&self, state: TaskExecState) -> Vec<Uuid> {
323 self.task_executions
324 .iter()
325 .filter(|(_, exec)| exec.state == state)
326 .map(|(id, _)| *id)
327 .collect()
328 }
329
330 #[must_use]
332 pub fn task_state_counts(&self) -> HashMap<TaskExecState, usize> {
333 let mut counts = HashMap::new();
334 for exec in self.task_executions.values() {
335 *counts.entry(exec.state).or_insert(0) += 1;
336 }
337 counts
338 }
339}
340
341#[derive(Debug, Clone, PartialEq, Eq)]
343pub enum FlowError {
344 InvalidTransition {
346 from: TaskExecState,
347 to: TaskExecState,
348 },
349 InvalidFlowTransition { from: FlowState, to: FlowState },
351 TaskNotFound(Uuid),
353 TasksNotComplete,
355}
356
357impl std::fmt::Display for FlowError {
358 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359 match self {
360 Self::InvalidTransition { from, to } => {
361 write!(f, "Invalid transition from {from:?} to {to:?}")
362 }
363 Self::InvalidFlowTransition { from, to } => {
364 write!(f, "Invalid flow transition from {from:?} to {to:?}")
365 }
366 Self::TaskNotFound(id) => write!(f, "Task not found: {id}"),
367 Self::TasksNotComplete => write!(f, "Cannot complete flow with incomplete tasks"),
368 }
369 }
370}
371
372impl std::error::Error for FlowError {}
373
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a flow with three freshly generated task IDs.
    fn test_flow() -> TaskFlow {
        let task_ids = [Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
        TaskFlow::new(Uuid::new_v4(), Uuid::new_v4(), &task_ids)
    }

    /// Walks one pending task through the full happy path to `Success`.
    fn run_task_to_success(flow: &mut TaskFlow, task_id: Uuid) {
        for state in [
            TaskExecState::Ready,
            TaskExecState::Running,
            TaskExecState::Verifying,
            TaskExecState::Success,
        ] {
            flow.transition_task(task_id, state).unwrap();
        }
    }

    #[test]
    fn create_flow() {
        let flow = test_flow();
        assert_eq!(flow.state, FlowState::Created);
        assert_eq!(flow.task_executions.len(), 3);
        assert!(flow
            .task_executions
            .values()
            .all(|exec| exec.state == TaskExecState::Pending));
        assert!(flow.started_at.is_none());
        assert!(flow.completed_at.is_none());
        assert!(!flow.is_terminal());
    }

    #[test]
    fn flow_lifecycle() {
        let mut flow = test_flow();

        assert_eq!(flow.start(), Ok(()));
        assert_eq!(flow.state, FlowState::Running);
        assert!(flow.started_at.is_some());

        assert_eq!(flow.pause(), Ok(()));
        assert_eq!(flow.state, FlowState::Paused);

        assert_eq!(flow.resume(), Ok(()));
        assert_eq!(flow.state, FlowState::Running);

        assert_eq!(flow.abort(), Ok(()));
        assert_eq!(flow.state, FlowState::Aborted);
        assert!(flow.completed_at.is_some());
        assert!(flow.is_terminal());

        // A terminal flow cannot be aborted again.
        assert_eq!(
            flow.abort(),
            Err(FlowError::InvalidFlowTransition {
                from: FlowState::Aborted,
                to: FlowState::Aborted,
            })
        );
    }

    #[test]
    fn cannot_start_twice() {
        let mut flow = test_flow();
        flow.start().unwrap();

        assert_eq!(
            flow.start(),
            Err(FlowError::InvalidFlowTransition {
                from: FlowState::Running,
                to: FlowState::Running,
            })
        );
        assert_eq!(flow.state, FlowState::Running);
    }

    #[test]
    fn task_execution_transitions() {
        let mut exec = TaskExecution::new(Uuid::new_v4());

        assert_eq!(exec.state, TaskExecState::Pending);
        assert_eq!(exec.attempt_count, 0);

        exec.transition(TaskExecState::Ready).unwrap();
        assert_eq!(exec.state, TaskExecState::Ready);

        exec.transition(TaskExecState::Running).unwrap();
        assert_eq!(exec.state, TaskExecState::Running);
        assert_eq!(exec.attempt_count, 1);

        exec.transition(TaskExecState::Verifying).unwrap();
        assert_eq!(exec.state, TaskExecState::Verifying);

        exec.transition(TaskExecState::Success).unwrap();
        assert_eq!(exec.state, TaskExecState::Success);
        assert_eq!(exec.attempt_count, 1);
    }

    #[test]
    fn task_retry_cycle() {
        let mut exec = TaskExecution::new(Uuid::new_v4());

        exec.transition(TaskExecState::Ready).unwrap();
        exec.transition(TaskExecState::Running).unwrap();
        exec.transition(TaskExecState::Verifying).unwrap();
        exec.transition(TaskExecState::Retry).unwrap();

        assert_eq!(exec.attempt_count, 1);

        exec.transition(TaskExecState::Running).unwrap();
        assert_eq!(exec.attempt_count, 2);
    }

    #[test]
    fn invalid_task_transition() {
        let mut exec = TaskExecution::new(Uuid::new_v4());

        assert_eq!(
            exec.transition(TaskExecState::Success),
            Err(FlowError::InvalidTransition {
                from: TaskExecState::Pending,
                to: TaskExecState::Success,
            })
        );

        // A rejected transition must leave the record untouched.
        assert_eq!(exec.state, TaskExecState::Pending);
        assert_eq!(exec.attempt_count, 0);
    }

    #[test]
    fn flow_task_transition() {
        let mut flow = test_flow();
        let task_id = *flow.task_executions.keys().next().unwrap();

        flow.transition_task(task_id, TaskExecState::Ready).unwrap();

        let exec = flow.get_task_execution(task_id).unwrap();
        assert_eq!(exec.state, TaskExecState::Ready);
    }

    #[test]
    fn transition_unknown_task_is_rejected() {
        let mut flow = test_flow();
        let unknown = Uuid::new_v4();

        assert_eq!(
            flow.transition_task(unknown, TaskExecState::Ready),
            Err(FlowError::TaskNotFound(unknown))
        );
        assert!(flow.get_task_execution(unknown).is_none());
    }

    #[test]
    fn tasks_in_state() {
        let mut flow = test_flow();
        let task_ids: Vec<_> = flow.task_executions.keys().copied().collect();

        flow.transition_task(task_ids[0], TaskExecState::Ready)
            .unwrap();
        flow.transition_task(task_ids[1], TaskExecState::Ready)
            .unwrap();

        let ready_tasks = flow.tasks_in_state(TaskExecState::Ready);
        assert_eq!(ready_tasks.len(), 2);
        assert!(ready_tasks.contains(&task_ids[0]));
        assert!(ready_tasks.contains(&task_ids[1]));

        let pending_tasks = flow.tasks_in_state(TaskExecState::Pending);
        assert_eq!(pending_tasks, vec![task_ids[2]]);

        let counts = flow.task_state_counts();
        assert_eq!(counts.get(&TaskExecState::Ready), Some(&2));
        assert_eq!(counts.get(&TaskExecState::Pending), Some(&1));
        assert_eq!(counts.get(&TaskExecState::Running), None);
    }

    #[test]
    fn flow_completion_requires_terminal_tasks() {
        let mut flow = test_flow();
        flow.start().unwrap();

        assert_eq!(flow.complete(), Err(FlowError::TasksNotComplete));
        assert_eq!(flow.state, FlowState::Running);
        assert!(flow.completed_at.is_none());
    }

    #[test]
    fn flow_completes_when_all_tasks_finished() {
        let mut flow = test_flow();
        flow.start().unwrap();

        let task_ids: Vec<_> = flow.task_executions.keys().copied().collect();
        for task_id in task_ids {
            run_task_to_success(&mut flow, task_id);
        }

        assert_eq!(flow.complete(), Ok(()));
        assert_eq!(flow.state, FlowState::Completed);
        assert!(flow.completed_at.is_some());
        assert!(flow.is_terminal());
    }

    #[test]
    fn flow_serialization() {
        let flow = test_flow();
        let json = serde_json::to_string(&flow).unwrap();
        let restored: TaskFlow = serde_json::from_str(&json).unwrap();

        assert_eq!(flow.id, restored.id);
        assert_eq!(flow.state, restored.state);
        assert_eq!(flow.task_executions.len(), restored.task_executions.len());
    }
}