Skip to main content

hivemind/core/
flow.rs

1//! `TaskFlow` - Runtime instance of a `TaskGraph`.
2//!
3//! A `TaskFlow` binds a `TaskGraph` to execution state. Multiple `TaskFlows`
4//! may exist for the same `TaskGraph`.
5
6use chrono::{DateTime, Utc};
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use uuid::Uuid;
10
11/// `TaskFlow` lifecycle state.
12#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
13#[serde(rename_all = "lowercase")]
14pub enum FlowState {
15    /// Flow created but not started.
16    Created,
17    /// Flow is actively executing.
18    Running,
19    /// Flow is paused (can be resumed).
20    Paused,
21    /// Flow is frozen for merge preparation/execution.
22    FrozenForMerge,
23    /// Flow completed successfully.
24    Completed,
25    /// Flow was merged into the target branch.
26    Merged,
27    /// Flow was aborted.
28    Aborted,
29}
30
31/// Task execution state within a flow.
32#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
33#[serde(rename_all = "lowercase")]
34pub enum TaskExecState {
35    /// Task not yet eligible for execution.
36    Pending,
37    /// Task is ready (dependencies satisfied).
38    Ready,
39    /// Task is currently executing.
40    Running,
41    /// Task is being verified.
42    Verifying,
43    /// Task completed successfully.
44    Success,
45    /// Task is scheduled for retry.
46    Retry,
47    /// Task failed permanently.
48    Failed,
49    /// Task escalated to human.
50    Escalated,
51}
52
53#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, Default)]
54#[serde(rename_all = "lowercase")]
55pub enum RetryMode {
56    #[default]
57    Clean,
58    Continue,
59}
60
61/// Execution state for a single task within a flow.
62#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct TaskExecution {
64    /// Task ID (from `TaskGraph`).
65    pub task_id: Uuid,
66    /// Current execution state.
67    pub state: TaskExecState,
68    /// Number of attempts made.
69    pub attempt_count: u32,
70    #[serde(default)]
71    pub retry_mode: RetryMode,
72    #[serde(default)]
73    pub frozen_commit_sha: Option<String>,
74    #[serde(default)]
75    pub integrated_commit_sha: Option<String>,
76    /// Last state change timestamp.
77    pub updated_at: DateTime<Utc>,
78    /// Blocking reason (if blocked).
79    pub blocked_reason: Option<String>,
80}
81
82impl TaskExecution {
83    /// Creates a new task execution in Pending state.
84    #[must_use]
85    pub fn new(task_id: Uuid) -> Self {
86        Self {
87            task_id,
88            state: TaskExecState::Pending,
89            attempt_count: 0,
90            retry_mode: RetryMode::default(),
91            frozen_commit_sha: None,
92            integrated_commit_sha: None,
93            updated_at: Utc::now(),
94            blocked_reason: None,
95        }
96    }
97
98    /// Transitions to a new state.
99    pub fn transition(&mut self, new_state: TaskExecState) -> Result<(), FlowError> {
100        if !self.can_transition_to(new_state) {
101            return Err(FlowError::InvalidTransition {
102                from: self.state,
103                to: new_state,
104            });
105        }
106
107        self.state = new_state;
108        self.updated_at = Utc::now();
109
110        if new_state == TaskExecState::Running {
111            self.attempt_count += 1;
112        }
113
114        Ok(())
115    }
116
117    /// Checks if a transition is valid.
118    #[must_use]
119    pub fn can_transition_to(&self, new_state: TaskExecState) -> bool {
120        use TaskExecState::{
121            Escalated, Failed, Pending, Ready, Retry, Running, Success, Verifying,
122        };
123        matches!(
124            (self.state, new_state),
125            (Pending, Ready | Running)
126                | (Ready | Retry, Running)
127                | (Running, Verifying)
128                | (Verifying, Success | Retry | Failed)
129                | (Failed, Escalated)
130        )
131    }
132}
133
134/// A `TaskFlow` - runtime instance of a `TaskGraph`.
135#[derive(Debug, Clone, Serialize, Deserialize)]
136pub struct TaskFlow {
137    /// Unique flow ID.
138    pub id: Uuid,
139    /// Associated `TaskGraph` ID.
140    pub graph_id: Uuid,
141    /// Associated project ID.
142    pub project_id: Uuid,
143    #[serde(default)]
144    pub base_revision: Option<String>,
145    /// Current flow state.
146    pub state: FlowState,
147    /// Task execution states.
148    pub task_executions: HashMap<Uuid, TaskExecution>,
149    /// Creation timestamp.
150    pub created_at: DateTime<Utc>,
151    /// Start timestamp.
152    pub started_at: Option<DateTime<Utc>>,
153    /// Completion timestamp.
154    pub completed_at: Option<DateTime<Utc>>,
155    /// Last update timestamp.
156    pub updated_at: DateTime<Utc>,
157}
158
159impl TaskFlow {
160    /// Creates a new `TaskFlow` from a graph.
161    #[must_use]
162    pub fn new(graph_id: Uuid, project_id: Uuid, task_ids: &[Uuid]) -> Self {
163        let now = Utc::now();
164        let mut task_executions = HashMap::new();
165
166        for &task_id in task_ids {
167            task_executions.insert(task_id, TaskExecution::new(task_id));
168        }
169
170        Self {
171            id: Uuid::new_v4(),
172            graph_id,
173            project_id,
174            base_revision: None,
175            state: FlowState::Created,
176            task_executions,
177            created_at: now,
178            started_at: None,
179            completed_at: None,
180            updated_at: now,
181        }
182    }
183
184    /// Starts the flow.
185    pub fn start(&mut self) -> Result<(), FlowError> {
186        if self.state != FlowState::Created {
187            return Err(FlowError::InvalidFlowTransition {
188                from: self.state,
189                to: FlowState::Running,
190            });
191        }
192
193        self.state = FlowState::Running;
194        self.started_at = Some(Utc::now());
195        self.updated_at = Utc::now();
196        Ok(())
197    }
198
199    /// Pauses the flow.
200    pub fn pause(&mut self) -> Result<(), FlowError> {
201        if self.state != FlowState::Running {
202            return Err(FlowError::InvalidFlowTransition {
203                from: self.state,
204                to: FlowState::Paused,
205            });
206        }
207
208        self.state = FlowState::Paused;
209        self.updated_at = Utc::now();
210        Ok(())
211    }
212
213    /// Resumes the flow.
214    pub fn resume(&mut self) -> Result<(), FlowError> {
215        if self.state != FlowState::Paused {
216            return Err(FlowError::InvalidFlowTransition {
217                from: self.state,
218                to: FlowState::Running,
219            });
220        }
221
222        self.state = FlowState::Running;
223        self.updated_at = Utc::now();
224        Ok(())
225    }
226
227    /// Aborts the flow.
228    pub fn abort(&mut self) -> Result<(), FlowError> {
229        if matches!(
230            self.state,
231            FlowState::Completed | FlowState::Merged | FlowState::Aborted
232        ) {
233            return Err(FlowError::InvalidFlowTransition {
234                from: self.state,
235                to: FlowState::Aborted,
236            });
237        }
238
239        self.state = FlowState::Aborted;
240        self.completed_at = Some(Utc::now());
241        self.updated_at = Utc::now();
242        Ok(())
243    }
244
245    /// Marks the flow as completed.
246    pub fn complete(&mut self) -> Result<(), FlowError> {
247        if self.state != FlowState::Running {
248            return Err(FlowError::InvalidFlowTransition {
249                from: self.state,
250                to: FlowState::Completed,
251            });
252        }
253
254        // Check all tasks are in terminal state
255        for exec in self.task_executions.values() {
256            if !matches!(
257                exec.state,
258                TaskExecState::Success | TaskExecState::Failed | TaskExecState::Escalated
259            ) {
260                return Err(FlowError::TasksNotComplete);
261            }
262        }
263
264        self.state = FlowState::Completed;
265        self.completed_at = Some(Utc::now());
266        self.updated_at = Utc::now();
267        Ok(())
268    }
269
270    /// Gets the execution state for a task.
271    #[must_use]
272    pub fn get_task_execution(&self, task_id: Uuid) -> Option<&TaskExecution> {
273        self.task_executions.get(&task_id)
274    }
275
276    /// Gets mutable execution state for a task.
277    pub fn get_task_execution_mut(&mut self, task_id: Uuid) -> Option<&mut TaskExecution> {
278        self.task_executions.get_mut(&task_id)
279    }
280
281    /// Transitions a task to a new state.
282    pub fn transition_task(
283        &mut self,
284        task_id: Uuid,
285        new_state: TaskExecState,
286    ) -> Result<(), FlowError> {
287        let exec = self
288            .task_executions
289            .get_mut(&task_id)
290            .ok_or(FlowError::TaskNotFound(task_id))?;
291
292        exec.transition(new_state)?;
293        self.updated_at = Utc::now();
294        Ok(())
295    }
296
297    /// Checks if the flow is in a terminal state.
298    #[must_use]
299    pub fn is_terminal(&self) -> bool {
300        matches!(
301            self.state,
302            FlowState::Completed | FlowState::Merged | FlowState::Aborted
303        )
304    }
305
306    /// Gets tasks in a particular state.
307    #[must_use]
308    pub fn tasks_in_state(&self, state: TaskExecState) -> Vec<Uuid> {
309        self.task_executions
310            .iter()
311            .filter(|(_, exec)| exec.state == state)
312            .map(|(id, _)| *id)
313            .collect()
314    }
315
316    /// Counts tasks by state.
317    #[must_use]
318    pub fn task_state_counts(&self) -> HashMap<TaskExecState, usize> {
319        let mut counts = HashMap::new();
320        for exec in self.task_executions.values() {
321            *counts.entry(exec.state).or_insert(0) += 1;
322        }
323        counts
324    }
325}
326
327/// Errors that can occur during flow operations.
328#[derive(Debug, Clone, PartialEq, Eq)]
329pub enum FlowError {
330    /// Invalid task state transition.
331    InvalidTransition {
332        from: TaskExecState,
333        to: TaskExecState,
334    },
335    /// Invalid flow state transition.
336    InvalidFlowTransition { from: FlowState, to: FlowState },
337    /// Task not found.
338    TaskNotFound(Uuid),
339    /// Cannot complete flow with incomplete tasks.
340    TasksNotComplete,
341}
342
343impl std::fmt::Display for FlowError {
344    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
345        match self {
346            Self::InvalidTransition { from, to } => {
347                write!(f, "Invalid transition from {from:?} to {to:?}")
348            }
349            Self::InvalidFlowTransition { from, to } => {
350                write!(f, "Invalid flow transition from {from:?} to {to:?}")
351            }
352            Self::TaskNotFound(id) => write!(f, "Task not found: {id}"),
353            Self::TasksNotComplete => write!(f, "Cannot complete flow with incomplete tasks"),
354        }
355    }
356}
357
358impl std::error::Error for FlowError {}
359
360#[cfg(test)]
361mod tests {
362    use super::*;
363
364    fn test_flow() -> TaskFlow {
365        let task_ids = vec![Uuid::new_v4(), Uuid::new_v4(), Uuid::new_v4()];
366        TaskFlow::new(Uuid::new_v4(), Uuid::new_v4(), &task_ids)
367    }
368
369    #[test]
370    fn create_flow() {
371        let flow = test_flow();
372        assert_eq!(flow.state, FlowState::Created);
373        assert_eq!(flow.task_executions.len(), 3);
374    }
375
376    #[test]
377    fn flow_lifecycle() {
378        let mut flow = test_flow();
379
380        assert!(flow.start().is_ok());
381        assert_eq!(flow.state, FlowState::Running);
382        assert!(flow.started_at.is_some());
383
384        assert!(flow.pause().is_ok());
385        assert_eq!(flow.state, FlowState::Paused);
386
387        assert!(flow.resume().is_ok());
388        assert_eq!(flow.state, FlowState::Running);
389
390        assert!(flow.abort().is_ok());
391        assert_eq!(flow.state, FlowState::Aborted);
392        assert!(flow.completed_at.is_some());
393    }
394
395    #[test]
396    fn cannot_start_twice() {
397        let mut flow = test_flow();
398        flow.start().unwrap();
399
400        let result = flow.start();
401        assert!(result.is_err());
402    }
403
404    #[test]
405    fn task_execution_transitions() {
406        let mut exec = TaskExecution::new(Uuid::new_v4());
407
408        assert_eq!(exec.state, TaskExecState::Pending);
409
410        exec.transition(TaskExecState::Ready).unwrap();
411        assert_eq!(exec.state, TaskExecState::Ready);
412
413        exec.transition(TaskExecState::Running).unwrap();
414        assert_eq!(exec.state, TaskExecState::Running);
415        assert_eq!(exec.attempt_count, 1);
416
417        exec.transition(TaskExecState::Verifying).unwrap();
418        assert_eq!(exec.state, TaskExecState::Verifying);
419
420        exec.transition(TaskExecState::Success).unwrap();
421        assert_eq!(exec.state, TaskExecState::Success);
422    }
423
424    #[test]
425    fn task_retry_cycle() {
426        let mut exec = TaskExecution::new(Uuid::new_v4());
427
428        exec.transition(TaskExecState::Ready).unwrap();
429        exec.transition(TaskExecState::Running).unwrap();
430        exec.transition(TaskExecState::Verifying).unwrap();
431        exec.transition(TaskExecState::Retry).unwrap();
432
433        assert_eq!(exec.attempt_count, 1);
434
435        exec.transition(TaskExecState::Running).unwrap();
436        assert_eq!(exec.attempt_count, 2);
437    }
438
439    #[test]
440    fn invalid_task_transition() {
441        let mut exec = TaskExecution::new(Uuid::new_v4());
442
443        // Cannot go directly from Pending to Success
444        let result = exec.transition(TaskExecState::Success);
445        assert!(result.is_err());
446    }
447
448    #[test]
449    fn flow_task_transition() {
450        let mut flow = test_flow();
451        let task_id = *flow.task_executions.keys().next().unwrap();
452
453        flow.transition_task(task_id, TaskExecState::Ready).unwrap();
454
455        let exec = flow.get_task_execution(task_id).unwrap();
456        assert_eq!(exec.state, TaskExecState::Ready);
457    }
458
459    #[test]
460    fn tasks_in_state() {
461        let mut flow = test_flow();
462        let task_ids: Vec<_> = flow.task_executions.keys().copied().collect();
463
464        flow.transition_task(task_ids[0], TaskExecState::Ready)
465            .unwrap();
466        flow.transition_task(task_ids[1], TaskExecState::Ready)
467            .unwrap();
468
469        let ready_tasks = flow.tasks_in_state(TaskExecState::Ready);
470        assert_eq!(ready_tasks.len(), 2);
471
472        let pending_tasks = flow.tasks_in_state(TaskExecState::Pending);
473        assert_eq!(pending_tasks.len(), 1);
474    }
475
476    #[test]
477    fn flow_completion_requires_terminal_tasks() {
478        let mut flow = test_flow();
479        flow.start().unwrap();
480
481        // Cannot complete with pending tasks
482        let result = flow.complete();
483        assert!(result.is_err());
484    }
485
486    #[test]
487    fn flow_serialization() {
488        let flow = test_flow();
489        let json = serde_json::to_string(&flow).unwrap();
490        let restored: TaskFlow = serde_json::from_str(&json).unwrap();
491
492        assert_eq!(flow.id, restored.id);
493        assert_eq!(flow.state, restored.state);
494    }
495}