1use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use uuid::Uuid;
10
11#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum FlowState {
15 Created,
17 Running,
19 Paused,
21 FrozenForMerge,
23 Completed,
25 Merged,
27 Aborted,
29}
30
31#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33#[serde(rename_all = "lowercase")]
34pub enum TaskExecState {
35 Pending,
37 Ready,
39 Running,
41 Verifying,
43 Success,
45 Retry,
47 Failed,
49 Escalated,
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
54#[serde(rename_all = "lowercase")]
55pub enum RetryMode {
56 #[default]
57 Clean,
58 Continue,
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct TaskExecution {
64 pub task_id: Uuid,
66 pub state: TaskExecState,
68 pub attempt_count: u32,
70 #[serde(default)]
71 pub retry_mode: RetryMode,
72 #[serde(default)]
73 pub frozen_commit_sha: Option<String>,
74 #[serde(default)]
75 pub integrated_commit_sha: Option<String>,
76 pub updated_at: DateTime<Utc>,
78 pub blocked_reason: Option<String>,
80}
81
82impl TaskExecution {
83 #[must_use]
85 pub fn new(task_id: Uuid) -> Self {
86 Self {
87 task_id,
88 state: TaskExecState::Pending,
89 attempt_count: 0,
90 retry_mode: RetryMode::default(),
91 frozen_commit_sha: None,
92 integrated_commit_sha: None,
93 updated_at: Utc::now(),
94 blocked_reason: None,
95 }
96 }
97
98 pub fn transition(&mut self, new_state: TaskExecState) -> Result<(), FlowError> {
100 if !self.can_transition_to(new_state) {
101 return Err(FlowError::InvalidTransition {
102 from: self.state,
103 to: new_state,
104 });
105 }
106
107 self.state = new_state;
108 self.updated_at = Utc::now();
109
110 if new_state == TaskExecState::Running {
111 self.attempt_count += 1;
112 }
113
114 Ok(())
115 }
116
117 #[must_use]
119 pub fn can_transition_to(&self, new_state: TaskExecState) -> bool {
120 use TaskExecState::{
121 Escalated, Failed, Pending, Ready, Retry, Running, Success, Verifying,
122 };
123 matches!(
124 (self.state, new_state),
125 (Pending, Ready | Running)
126 | (Ready | Retry, Running)
127 | (Running, Verifying)
128 | (Verifying, Success | Retry | Failed)
129 | (Failed, Escalated)
130 )
131 }
132}
133
134#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct TaskFlow {
137 pub id: Uuid,
139 pub graph_id: Uuid,
141 pub project_id: Uuid,
143 #[serde(default)]
144 pub base_revision: Option<String>,
145 pub state: FlowState,
147 pub task_executions: HashMap<Uuid, TaskExecution>,
149 pub created_at: DateTime<Utc>,
151 pub started_at: Option<DateTime<Utc>>,
153 pub completed_at: Option<DateTime<Utc>>,
155 pub updated_at: DateTime<Utc>,
157}
158
159impl TaskFlow {
160 #[must_use]
162 pub fn new(graph_id: Uuid, project_id: Uuid, task_ids: &[Uuid]) -> Self {
163 let now = Utc::now();
164 let mut task_executions = HashMap::new();
165
166 for &task_id in task_ids {
167 task_executions.insert(task_id, TaskExecution::new(task_id));
168 }
169
170 Self {
171 id: Uuid::new_v4(),
172 graph_id,
173 project_id,
174 base_revision: None,
175 state: FlowState::Created,
176 task_executions,
177 created_at: now,
178 started_at: None,
179 completed_at: None,
180 updated_at: now,
181 }
182 }
183
184 pub fn start(&mut self) -> Result<(), FlowError> {
186 if self.state != FlowState::Created {
187 return Err(FlowError::InvalidFlowTransition {
188 from: self.state,
189 to: FlowState::Running,
190 });
191 }
192
193 self.state = FlowState::Running;
194 self.started_at = Some(Utc::now());
195 self.updated_at = Utc::now();
196 Ok(())
197 }
198
199 pub fn pause(&mut self) -> Result<(), FlowError> {
201 if self.state != FlowState::Running {
202 return Err(FlowError::InvalidFlowTransition {
203 from: self.state,
204 to: FlowState::Paused,
205 });
206 }
207
208 self.state = FlowState::Paused;
209 self.updated_at = Utc::now();
210 Ok(())
211 }
212
213 pub fn resume(&mut self) -> Result<(), FlowError> {
215 if self.state != FlowState::Paused {
216 return Err(FlowError::InvalidFlowTransition {
217 from: self.state,
218 to: FlowState::Running,
219 });
220 }
221
222 self.state = FlowState::Running;
223 self.updated_at = Utc::now();
224 Ok(())
225 }
226
227 pub fn abort(&mut self) -> Result<(), FlowError> {
229 if matches!(
230 self.state,
231 FlowState::Completed | FlowState::Merged | FlowState::Aborted
232 ) {
233 return Err(FlowError::InvalidFlowTransition {
234 from: self.state,
235 to: FlowState::Aborted,
236 });
237 }
238
239 self.state = FlowState::Aborted;
240 self.completed_at = Some(Utc::now());
241 self.updated_at = Utc::now();
242 Ok(())
243 }
244
245 pub fn complete(&mut self) -> Result<(), FlowError> {
247 if self.state != FlowState::Running {
248 return Err(FlowError::InvalidFlowTransition {
249 from: self.state,
250 to: FlowState::Completed,
251 });
252 }
253
254 for exec in self.task_executions.values() {
256 if !matches!(
257 exec.state,
258 TaskExecState::Success | TaskExecState::Failed | TaskExecState::Escalated
259 ) {
260 return Err(FlowError::TasksNotComplete);
261 }
262 }
263
264 self.state = FlowState::Completed;
265 self.completed_at = Some(Utc::now());
266 self.updated_at = Utc::now();
267 Ok(())
268 }
269
270 #[must_use]
272 pub fn get_task_execution(&self, task_id: Uuid) -> Option<&TaskExecution> {
273 self.task_executions.get(&task_id)
274 }
275
276 pub fn get_task_execution_mut(&mut self, task_id: Uuid) -> Option<&mut TaskExecution> {
278 self.task_executions.get_mut(&task_id)
279 }
280
281 pub fn transition_task(
283 &mut self,
284 task_id: Uuid,
285 new_state: TaskExecState,
286 ) -> Result<(), FlowError> {
287 let exec = self
288 .task_executions
289 .get_mut(&task_id)
290 .ok_or(FlowError::TaskNotFound(task_id))?;
291
292 exec.transition(new_state)?;
293 self.updated_at = Utc::now();
294 Ok(())
295 }
296
297 #[must_use]
299 pub fn is_terminal(&self) -> bool {
300 matches!(
301 self.state,
302 FlowState::Completed | FlowState::Merged | FlowState::Aborted
303 )
304 }
305
306 #[must_use]
308 pub fn tasks_in_state(&self, state: TaskExecState) -> Vec<Uuid> {
309 self.task_executions
310 .iter()
311 .filter(|(_, exec)| exec.state == state)
312 .map(|(id, _)| *id)
313 .collect()
314 }
315
316 #[must_use]
318 pub fn task_state_counts(&self) -> HashMap<TaskExecState, usize> {
319 let mut counts = HashMap::new();
320 for exec in self.task_executions.values() {
321 *counts.entry(exec.state).or_insert(0) += 1;
322 }
323 counts
324 }
325}
326
327#[derive(Debug, Clone, PartialEq, Eq)]
329pub enum FlowError {
330 InvalidTransition {
332 from: TaskExecState,
333 to: TaskExecState,
334 },
335 InvalidFlowTransition { from: FlowState, to: FlowState },
337 TaskNotFound(Uuid),
339 TasksNotComplete,
341}
342
343impl std::fmt::Display for FlowError {
344 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345 match self {
346 Self::InvalidTransition { from, to } => {
347 write!(f, "Invalid transition from {from:?} to {to:?}")
348 }
349 Self::InvalidFlowTransition { from, to } => {
350 write!(f, "Invalid flow transition from {from:?} to {to:?}")
351 }
352 Self::TaskNotFound(id) => write!(f, "Task not found: {id}"),
353 Self::TasksNotComplete => write!(f, "Cannot complete flow with incomplete tasks"),
354 }
355 }
356}
357
358impl std::error::Error for FlowError {}
359
360#[cfg(test)]
361mod tests {
362 use super::*;
363
364 fn test_flow() -> TaskFlow {
365 let task_ids = vec![Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
366 TaskFlow::new(Uuid::new_v4(), Uuid::new_v4(), &task_ids)
367 }
368
369 #[test]
370 fn create_flow() {
371 let flow = test_flow();
372 assert_eq!(flow.state, FlowState::Created);
373 assert_eq!(flow.task_executions.len(), 3);
374 }
375
376 #[test]
377 fn flow_lifecycle() {
378 let mut flow = test_flow();
379
380 assert!(flow.start().is_ok());
381 assert_eq!(flow.state, FlowState::Running);
382 assert!(flow.started_at.is_some());
383
384 assert!(flow.pause().is_ok());
385 assert_eq!(flow.state, FlowState::Paused);
386
387 assert!(flow.resume().is_ok());
388 assert_eq!(flow.state, FlowState::Running);
389
390 assert!(flow.abort().is_ok());
391 assert_eq!(flow.state, FlowState::Aborted);
392 assert!(flow.completed_at.is_some());
393 }
394
395 #[test]
396 fn cannot_start_twice() {
397 let mut flow = test_flow();
398 flow.start().unwrap();
399
400 let result = flow.start();
401 assert!(result.is_err());
402 }
403
404 #[test]
405 fn task_execution_transitions() {
406 let mut exec = TaskExecution::new(Uuid::new_v4());
407
408 assert_eq!(exec.state, TaskExecState::Pending);
409
410 exec.transition(TaskExecState::Ready).unwrap();
411 assert_eq!(exec.state, TaskExecState::Ready);
412
413 exec.transition(TaskExecState::Running).unwrap();
414 assert_eq!(exec.state, TaskExecState::Running);
415 assert_eq!(exec.attempt_count, 1);
416
417 exec.transition(TaskExecState::Verifying).unwrap();
418 assert_eq!(exec.state, TaskExecState::Verifying);
419
420 exec.transition(TaskExecState::Success).unwrap();
421 assert_eq!(exec.state, TaskExecState::Success);
422 }
423
424 #[test]
425 fn task_retry_cycle() {
426 let mut exec = TaskExecution::new(Uuid::new_v4());
427
428 exec.transition(TaskExecState::Ready).unwrap();
429 exec.transition(TaskExecState::Running).unwrap();
430 exec.transition(TaskExecState::Verifying).unwrap();
431 exec.transition(TaskExecState::Retry).unwrap();
432
433 assert_eq!(exec.attempt_count, 1);
434
435 exec.transition(TaskExecState::Running).unwrap();
436 assert_eq!(exec.attempt_count, 2);
437 }
438
439 #[test]
440 fn invalid_task_transition() {
441 let mut exec = TaskExecution::new(Uuid::new_v4());
442
443 let result = exec.transition(TaskExecState::Success);
445 assert!(result.is_err());
446 }
447
448 #[test]
449 fn flow_task_transition() {
450 let mut flow = test_flow();
451 let task_id = *flow.task_executions.keys().next().unwrap();
452
453 flow.transition_task(task_id, TaskExecState::Ready).unwrap();
454
455 let exec = flow.get_task_execution(task_id).unwrap();
456 assert_eq!(exec.state, TaskExecState::Ready);
457 }
458
459 #[test]
460 fn tasks_in_state() {
461 let mut flow = test_flow();
462 let task_ids: Vec<_> = flow.task_executions.keys().copied().collect();
463
464 flow.transition_task(task_ids[0], TaskExecState::Ready)
465 .unwrap();
466 flow.transition_task(task_ids[1], TaskExecState::Ready)
467 .unwrap();
468
469 let ready_tasks = flow.tasks_in_state(TaskExecState::Ready);
470 assert_eq!(ready_tasks.len(), 2);
471
472 let pending_tasks = flow.tasks_in_state(TaskExecState::Pending);
473 assert_eq!(pending_tasks.len(), 1);
474 }
475
476 #[test]
477 fn flow_completion_requires_terminal_tasks() {
478 let mut flow = test_flow();
479 flow.start().unwrap();
480
481 let result = flow.complete();
483 assert!(result.is_err());
484 }
485
486 #[test]
487 fn flow_serialization() {
488 let flow = test_flow();
489 let json = serde_json::to_string(&flow).unwrap();
490 let restored: TaskFlow = serde_json::from_str(&json).unwrap();
491
492 assert_eq!(flow.id, restored.id);
493 assert_eq!(flow.state, restored.state);
494 }
495}