Skip to main content

hivemind/core/
flow.rs

1//! `TaskFlow` - Runtime instance of a `TaskGraph`.
2//!
3//! A `TaskFlow` binds a `TaskGraph` to execution state. Multiple `TaskFlows`
4//! may exist for the same `TaskGraph`.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use uuid::Uuid;
10
/// `TaskFlow` lifecycle state.
///
/// Serialized with serde's `rename_all = "lowercase"`, so every variant is
/// written as its name lowercased without separators (for example,
/// `FrozenForMerge` becomes `"frozenformerge"`).
///
/// `Completed`, `Merged` and `Aborted` are the states that
/// `TaskFlow::is_terminal` reports as terminal.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum FlowState {
    /// Flow created but not started.
    Created,
    /// Flow is actively executing.
    Running,
    /// Flow is paused (can be resumed).
    Paused,
    /// Flow is frozen for merge preparation/execution.
    FrozenForMerge,
    /// Flow completed successfully.
    Completed,
    /// Flow was merged into the target branch.
    Merged,
    /// Flow was aborted.
    Aborted,
}
30
/// Task execution state within a flow.
///
/// The set of permitted moves between these states is defined by
/// `TaskExecution::can_transition_to`. Values serialize as the lowercased
/// variant name (serde `rename_all = "lowercase"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum TaskExecState {
    /// Task not yet eligible for execution.
    Pending,
    /// Task is ready (dependencies satisfied).
    Ready,
    /// Task is currently executing.
    Running,
    /// Task is being verified.
    Verifying,
    /// Task completed successfully.
    Success,
    /// Task is scheduled for retry.
    Retry,
    /// Task failed permanently.
    Failed,
    /// Task escalated to human.
    Escalated,
}
52
/// Retry mode recorded on a `TaskExecution`.
///
/// Defaults to [`RetryMode::Clean`] and serializes as the lowercased variant
/// name.
///
/// NOTE(review): this file only stores the value. Presumably `Clean` restarts
/// a retried attempt from scratch while `Continue` builds on the previous
/// attempt's work - confirm against the code that performs retries.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum RetryMode {
    /// Default retry mode.
    #[default]
    Clean,
    /// Alternative retry mode (see the review note on the enum).
    Continue,
}
60
/// Run mode recorded on a `TaskFlow`.
///
/// The derived default is [`RunMode::Auto`], which is what serde fills in when
/// the field is missing from serialized data; `TaskFlow::new` sets
/// [`RunMode::Manual`] explicitly instead.
///
/// NOTE(review): nothing in this file reads the value. Presumably `Auto` lets
/// a scheduler advance tasks on its own while `Manual` waits for explicit
/// requests - confirm against the scheduler.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
#[serde(rename_all = "lowercase")]
pub enum RunMode {
    /// Default run mode.
    #[default]
    Auto,
    /// Run mode assigned to flows built with `TaskFlow::new`.
    Manual,
}
68
/// Execution state for a single task within a flow.
///
/// Fields marked `#[serde(default)]` may be absent from serialized data and
/// are then filled with their type's default value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskExecution {
    /// Task ID (from `TaskGraph`).
    pub task_id: Uuid,
    /// Current execution state.
    pub state: TaskExecState,
    /// Number of attempts made.
    ///
    /// Incremented each time the task transitions into `Running`.
    pub attempt_count: u32,
    /// Retry mode for this task (defaults to `RetryMode::Clean`).
    #[serde(default)]
    pub retry_mode: RetryMode,
    /// Optional commit SHA; starts as `None`.
    ///
    /// NOTE(review): presumably the commit captured when the task's work is
    /// frozen for merge - confirm against the merge logic.
    #[serde(default)]
    pub frozen_commit_sha: Option<String>,
    /// Optional commit SHA; starts as `None`.
    ///
    /// NOTE(review): presumably the commit produced when the task's work is
    /// integrated into the flow/target branch - confirm against the merge logic.
    #[serde(default)]
    pub integrated_commit_sha: Option<String>,
    /// Last state change timestamp.
    pub updated_at: DateTime<Utc>,
    /// Blocking reason (if blocked).
    pub blocked_reason: Option<String>,
}
89
90impl TaskExecution {
91    /// Creates a new task execution in Pending state.
92    #[must_use]
93    pub fn new(task_id: Uuid) -> Self {
94        Self {
95            task_id,
96            state: TaskExecState::Pending,
97            attempt_count: 0,
98            retry_mode: RetryMode::default(),
99            frozen_commit_sha: None,
100            integrated_commit_sha: None,
101            updated_at: Utc::now(),
102            blocked_reason: None,
103        }
104    }
105
106    /// Transitions to a new state.
107    pub fn transition(&mut self, new_state: TaskExecState) -> Result<(), FlowError> {
108        if !self.can_transition_to(new_state) {
109            return Err(FlowError::InvalidTransition {
110                from: self.state,
111                to: new_state,
112            });
113        }
114
115        self.state = new_state;
116        self.updated_at = Utc::now();
117
118        if new_state == TaskExecState::Running {
119            self.attempt_count += 1;
120        }
121
122        Ok(())
123    }
124
125    /// Checks if a transition is valid.
126    #[must_use]
127    pub fn can_transition_to(&self, new_state: TaskExecState) -> bool {
128        use TaskExecState::{
129            Escalated, Failed, Pending, Ready, Retry, Running, Success, Verifying,
130        };
131        matches!(
132            (self.state, new_state),
133            (Pending, Ready | Running)
134                | (Ready | Retry, Running)
135                | (Running, Verifying)
136                | (Verifying, Success | Retry | Failed)
137                | (Failed, Escalated)
138        )
139    }
140}
141
/// A `TaskFlow` - runtime instance of a `TaskGraph`.
///
/// Holds the flow-level lifecycle state plus one `TaskExecution` per task,
/// keyed by task ID. Fields marked `#[serde(default)]` may be absent from
/// serialized data and are then filled with their type's default value.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskFlow {
    /// Unique flow ID.
    pub id: Uuid,
    /// Associated `TaskGraph` ID.
    pub graph_id: Uuid,
    /// Associated project ID.
    pub project_id: Uuid,
    /// Optional base revision; `TaskFlow::new` leaves it as `None`.
    ///
    /// NOTE(review): presumably the VCS revision the flow's work is based on -
    /// confirm against the code that sets it.
    #[serde(default)]
    pub base_revision: Option<String>,
    /// Run mode; `TaskFlow::new` sets `RunMode::Manual`, while data missing
    /// this field deserializes to the default `RunMode::Auto`.
    #[serde(default)]
    pub run_mode: RunMode,
    /// IDs of other flows this flow depends on; empty for a new flow.
    ///
    /// NOTE(review): nothing in this file enforces these dependencies -
    /// presumably a scheduler consults them before starting the flow.
    #[serde(default)]
    pub depends_on_flows: HashSet<Uuid>,
    /// Current flow state.
    pub state: FlowState,
    /// Task execution states.
    pub task_executions: HashMap<Uuid, TaskExecution>,
    /// Creation timestamp.
    pub created_at: DateTime<Utc>,
    /// Start timestamp.
    pub started_at: Option<DateTime<Utc>>,
    /// Completion timestamp (set when the flow completes or is aborted).
    pub completed_at: Option<DateTime<Utc>>,
    /// Last update timestamp.
    pub updated_at: DateTime<Utc>,
}
170
171impl TaskFlow {
172    /// Creates a new `TaskFlow` from a graph.
173    #[must_use]
174    pub fn new(graph_id: Uuid, project_id: Uuid, task_ids: &[Uuid]) -> Self {
175        let now = Utc::now();
176        let mut task_executions = HashMap::new();
177
178        for &task_id in task_ids {
179            task_executions.insert(task_id, TaskExecution::new(task_id));
180        }
181
182        Self {
183            id: Uuid::new_v4(),
184            graph_id,
185            project_id,
186            base_revision: None,
187            run_mode: RunMode::Manual,
188            depends_on_flows: HashSet::new(),
189            state: FlowState::Created,
190            task_executions,
191            created_at: now,
192            started_at: None,
193            completed_at: None,
194            updated_at: now,
195        }
196    }
197
198    /// Starts the flow.
199    pub fn start(&mut self) -> Result<(), FlowError> {
200        if self.state != FlowState::Created {
201            return Err(FlowError::InvalidFlowTransition {
202                from: self.state,
203                to: FlowState::Running,
204            });
205        }
206
207        self.state = FlowState::Running;
208        self.started_at = Some(Utc::now());
209        self.updated_at = Utc::now();
210        Ok(())
211    }
212
213    /// Pauses the flow.
214    pub fn pause(&mut self) -> Result<(), FlowError> {
215        if self.state != FlowState::Running {
216            return Err(FlowError::InvalidFlowTransition {
217                from: self.state,
218                to: FlowState::Paused,
219            });
220        }
221
222        self.state = FlowState::Paused;
223        self.updated_at = Utc::now();
224        Ok(())
225    }
226
227    /// Resumes the flow.
228    pub fn resume(&mut self) -> Result<(), FlowError> {
229        if self.state != FlowState::Paused {
230            return Err(FlowError::InvalidFlowTransition {
231                from: self.state,
232                to: FlowState::Running,
233            });
234        }
235
236        self.state = FlowState::Running;
237        self.updated_at = Utc::now();
238        Ok(())
239    }
240
241    /// Aborts the flow.
242    pub fn abort(&mut self) -> Result<(), FlowError> {
243        if matches!(
244            self.state,
245            FlowState::Completed | FlowState::Merged | FlowState::Aborted
246        ) {
247            return Err(FlowError::InvalidFlowTransition {
248                from: self.state,
249                to: FlowState::Aborted,
250            });
251        }
252
253        self.state = FlowState::Aborted;
254        self.completed_at = Some(Utc::now());
255        self.updated_at = Utc::now();
256        Ok(())
257    }
258
259    /// Marks the flow as completed.
260    pub fn complete(&mut self) -> Result<(), FlowError> {
261        if self.state != FlowState::Running {
262            return Err(FlowError::InvalidFlowTransition {
263                from: self.state,
264                to: FlowState::Completed,
265            });
266        }
267
268        // Check all tasks are in terminal state
269        for exec in self.task_executions.values() {
270            if !matches!(
271                exec.state,
272                TaskExecState::Success | TaskExecState::Failed | TaskExecState::Escalated
273            ) {
274                return Err(FlowError::TasksNotComplete);
275            }
276        }
277
278        self.state = FlowState::Completed;
279        self.completed_at = Some(Utc::now());
280        self.updated_at = Utc::now();
281        Ok(())
282    }
283
284    /// Gets the execution state for a task.
285    #[must_use]
286    pub fn get_task_execution(&self, task_id: Uuid) -> Option<&TaskExecution> {
287        self.task_executions.get(&task_id)
288    }
289
290    /// Gets mutable execution state for a task.
291    pub fn get_task_execution_mut(&mut self, task_id: Uuid) -> Option<&mut TaskExecution> {
292        self.task_executions.get_mut(&task_id)
293    }
294
295    /// Transitions a task to a new state.
296    pub fn transition_task(
297        &mut self,
298        task_id: Uuid,
299        new_state: TaskExecState,
300    ) -> Result<(), FlowError> {
301        let exec = self
302            .task_executions
303            .get_mut(&task_id)
304            .ok_or(FlowError::TaskNotFound(task_id))?;
305
306        exec.transition(new_state)?;
307        self.updated_at = Utc::now();
308        Ok(())
309    }
310
311    /// Checks if the flow is in a terminal state.
312    #[must_use]
313    pub fn is_terminal(&self) -> bool {
314        matches!(
315            self.state,
316            FlowState::Completed | FlowState::Merged | FlowState::Aborted
317        )
318    }
319
320    /// Gets tasks in a particular state.
321    #[must_use]
322    pub fn tasks_in_state(&self, state: TaskExecState) -> Vec<Uuid> {
323        self.task_executions
324            .iter()
325            .filter(|(_, exec)| exec.state == state)
326            .map(|(id, _)| *id)
327            .collect()
328    }
329
330    /// Counts tasks by state.
331    #[must_use]
332    pub fn task_state_counts(&self) -> HashMap<TaskExecState, usize> {
333        let mut counts = HashMap::new();
334        for exec in self.task_executions.values() {
335            *counts.entry(exec.state).or_insert(0) += 1;
336        }
337        counts
338    }
339}
340
/// Errors that can occur during flow operations.
///
/// Every failing operation in this file returns its error before mutating
/// anything, so the flow or task is left as it was.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FlowError {
    /// Invalid task state transition.
    InvalidTransition {
        /// State the task was in when the transition was requested.
        from: TaskExecState,
        /// State that was requested.
        to: TaskExecState,
    },
    /// Invalid flow state transition (`from` is the flow's current state,
    /// `to` the state the rejected operation would have produced).
    InvalidFlowTransition { from: FlowState, to: FlowState },
    /// Task not found (carries the task ID that was looked up).
    TaskNotFound(Uuid),
    /// Cannot complete flow with incomplete tasks.
    TasksNotComplete,
}
356
357impl std::fmt::Display for FlowError {
358    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
359        match self {
360            Self::InvalidTransition { from, to } => {
361                write!(f, "Invalid transition from {from:?} to {to:?}")
362            }
363            Self::InvalidFlowTransition { from, to } => {
364                write!(f, "Invalid flow transition from {from:?} to {to:?}")
365            }
366            Self::TaskNotFound(id) => write!(f, "Task not found: {id}"),
367            Self::TasksNotComplete => write!(f, "Cannot complete flow with incomplete tasks"),
368        }
369    }
370}
371
// No methods are overridden, so the trait's provided defaults apply.
impl std::error::Error for FlowError {}
373
#[cfg(test)]
mod tests {
    //! Unit tests for flow and task lifecycle handling.
    //!
    //! Error-path tests pin the exact `FlowError` value and check that a
    //! rejected operation leaves the flow or task unchanged.

    use super::*;

    /// Builds a flow in the `Created` state with three pending tasks.
    fn test_flow() -> TaskFlow {
        let task_ids = vec![Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
        TaskFlow::new(Uuid::new_v4(), Uuid::new_v4(), &task_ids)
    }

    #[test]
    fn create_flow() {
        let flow = test_flow();
        assert_eq!(flow.state, FlowState::Created);
        assert_eq!(flow.task_executions.len(), 3);
    }

    #[test]
    fn flow_lifecycle() {
        let mut flow = test_flow();

        assert!(flow.start().is_ok());
        assert_eq!(flow.state, FlowState::Running);
        assert!(flow.started_at.is_some());

        assert!(flow.pause().is_ok());
        assert_eq!(flow.state, FlowState::Paused);

        assert!(flow.resume().is_ok());
        assert_eq!(flow.state, FlowState::Running);

        assert!(flow.abort().is_ok());
        assert_eq!(flow.state, FlowState::Aborted);
        assert!(flow.completed_at.is_some());
    }

    #[test]
    fn cannot_start_twice() {
        let mut flow = test_flow();
        flow.start().unwrap();

        assert_eq!(
            flow.start(),
            Err(FlowError::InvalidFlowTransition {
                from: FlowState::Running,
                to: FlowState::Running,
            })
        );
        // The rejected call must not disturb the running flow.
        assert_eq!(flow.state, FlowState::Running);
    }

    #[test]
    fn task_execution_transitions() {
        let mut exec = TaskExecution::new(Uuid::new_v4());

        assert_eq!(exec.state, TaskExecState::Pending);

        exec.transition(TaskExecState::Ready).unwrap();
        assert_eq!(exec.state, TaskExecState::Ready);

        exec.transition(TaskExecState::Running).unwrap();
        assert_eq!(exec.state, TaskExecState::Running);
        assert_eq!(exec.attempt_count, 1);

        exec.transition(TaskExecState::Verifying).unwrap();
        assert_eq!(exec.state, TaskExecState::Verifying);

        exec.transition(TaskExecState::Success).unwrap();
        assert_eq!(exec.state, TaskExecState::Success);
    }

    #[test]
    fn task_retry_cycle() {
        let mut exec = TaskExecution::new(Uuid::new_v4());

        exec.transition(TaskExecState::Ready).unwrap();
        exec.transition(TaskExecState::Running).unwrap();
        exec.transition(TaskExecState::Verifying).unwrap();
        exec.transition(TaskExecState::Retry).unwrap();

        assert_eq!(exec.attempt_count, 1);

        exec.transition(TaskExecState::Running).unwrap();
        assert_eq!(exec.attempt_count, 2);
    }

    #[test]
    fn invalid_task_transition() {
        let mut exec = TaskExecution::new(Uuid::new_v4());

        // Cannot go directly from Pending to Success
        assert_eq!(
            exec.transition(TaskExecState::Success),
            Err(FlowError::InvalidTransition {
                from: TaskExecState::Pending,
                to: TaskExecState::Success,
            })
        );
        // A rejected transition leaves the record untouched.
        assert_eq!(exec.state, TaskExecState::Pending);
        assert_eq!(exec.attempt_count, 0);
    }

    #[test]
    fn flow_task_transition() {
        let mut flow = test_flow();
        let task_id = *flow.task_executions.keys().next().unwrap();

        flow.transition_task(task_id, TaskExecState::Ready).unwrap();

        let exec = flow.get_task_execution(task_id).unwrap();
        assert_eq!(exec.state, TaskExecState::Ready);
    }

    #[test]
    fn transition_unknown_task_is_not_found() {
        let mut flow = test_flow();
        let unknown = Uuid::new_v4();

        assert_eq!(
            flow.transition_task(unknown, TaskExecState::Ready),
            Err(FlowError::TaskNotFound(unknown))
        );
        assert!(flow.get_task_execution(unknown).is_none());
    }

    #[test]
    fn tasks_in_state() {
        let mut flow = test_flow();
        let task_ids: Vec<_> = flow.task_executions.keys().copied().collect();

        flow.transition_task(task_ids[0], TaskExecState::Ready)
            .unwrap();
        flow.transition_task(task_ids[1], TaskExecState::Ready)
            .unwrap();

        let ready_tasks = flow.tasks_in_state(TaskExecState::Ready);
        assert_eq!(ready_tasks.len(), 2);

        let pending_tasks = flow.tasks_in_state(TaskExecState::Pending);
        assert_eq!(pending_tasks.len(), 1);
    }

    #[test]
    fn flow_completion_requires_terminal_tasks() {
        let mut flow = test_flow();
        flow.start().unwrap();

        // Cannot complete with pending tasks
        assert_eq!(flow.complete(), Err(FlowError::TasksNotComplete));
        assert_eq!(flow.state, FlowState::Running);
        assert!(flow.completed_at.is_none());
    }

    #[test]
    fn flow_serialization() {
        let flow = test_flow();
        let json = serde_json::to_string(&flow).unwrap();
        let restored: TaskFlow = serde_json::from_str(&json).unwrap();

        assert_eq!(flow.id, restored.id);
        assert_eq!(flow.state, restored.state);
    }
}