Skip to main content

forge_agent/workflow/
deadlock.rs

1//! Deadlock detection and prevention for workflow execution.
2//!
3//! This module provides deadlock detection for parallel workflow execution:
4//! - Dependency cycle detection (before execution)
5//! - Resource deadlock analysis (heuristic-based warnings)
6//! - Timeout-based abort (runtime deadlock prevention)
7
8use crate::workflow::dag::{Workflow, WorkflowError};
9use crate::workflow::task::TaskId;
10use petgraph::algo::tarjan_scc;
11use std::collections::HashSet;
12use std::time::Duration;
13use thiserror::Error;
14
15/// Error types for deadlock detection.
16#[derive(Error, Debug)]
17pub enum DeadlockError {
18    /// Dependency cycle detected in workflow
19    #[error("Dependency cycle detected: {0:?}")]
20    DependencyCycle(Vec<TaskId>),
21
22    /// Resource deadlock detected at runtime
23    #[error("Resource deadlock detected: {0}")]
24    ResourceDeadlock(String),
25
26    /// Potential deadlock warning (heuristic-based)
27    #[error("Potential deadlock: {0}")]
28    PotentialDeadlock(String),
29}
30
31impl From<DeadlockError> for WorkflowError {
32    fn from(err: DeadlockError) -> Self {
33        match err {
34            DeadlockError::DependencyCycle(cycle) => WorkflowError::CycleDetected(cycle),
35            DeadlockError::ResourceDeadlock(msg) => {
36                WorkflowError::Timeout(crate::workflow::timeout::TimeoutError::WorkflowTimeout {
37                    timeout: Duration::from_secs(300), // Default 5 minute timeout
38                })
39            }
40            DeadlockError::PotentialDeadlock(_) => WorkflowError::CycleDetected(Vec::new()),
41        }
42    }
43}
44
/// Category of a potential-deadlock warning.
///
/// The deadlock detector in this module currently only emits
/// `LongDependencyChain`. The other variants are defined but not produced
/// by its analysis.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub enum DeadlockWarningType {
    /// Tasks share the named resource (potential contention).
    SharedResource(String),
    /// The task ends a long chain of dependent tasks (risk of timeout).
    LongDependencyChain {
        /// Depth of the chain, measured in execution layers.
        length: usize,
    },
    /// The task has no timeout configured (risk of hanging).
    NoTimeout,
}
55
56/// A deadlock warning with context and suggestions.
57#[derive(Clone, Debug)]
58pub struct DeadlockWarning {
59    /// Task ID that triggered the warning
60    pub task_id: TaskId,
61    /// Type of warning
62    pub warning_type: DeadlockWarningType,
63    /// Human-readable suggestion
64    pub suggestion: String,
65}
66
67impl DeadlockWarning {
68    /// Creates a new deadlock warning.
69    fn new(task_id: TaskId, warning_type: DeadlockWarningType, suggestion: String) -> Self {
70        Self {
71            task_id,
72            warning_type,
73            suggestion,
74        }
75    }
76
77    /// Returns a human-readable description of the warning.
78    pub fn description(&self) -> String {
79        match &self.warning_type {
80            DeadlockWarningType::SharedResource(resource) => {
81                format!("Task '{}' shares resource '{}': {}", self.task_id, resource, self.suggestion)
82            }
83            DeadlockWarningType::LongDependencyChain { length } => {
84                format!(
85                    "Task '{}' has a long dependency chain ({} tasks): {}",
86                    self.task_id, length, self.suggestion
87                )
88            }
89            DeadlockWarningType::NoTimeout => {
90                format!("Task '{}' has no timeout: {}", self.task_id, self.suggestion)
91            }
92        }
93    }
94}
95
/// Deadlock detector for workflow analysis.
///
/// Performs static analysis of a workflow's structure to detect:
/// - dependency cycles (hard error: execution is impossible),
/// - long dependency chains (warning: execution may continue).
///
/// The detector is a stateless unit value, so it is cheap to copy and share.
#[derive(Debug, Clone, Copy)]
pub struct DeadlockDetector;
103
104impl DeadlockDetector {
105    /// Creates a new deadlock detector.
106    pub fn new() -> Self {
107        Self
108    }
109
110    /// Detects dependency cycles in the workflow DAG.
111    ///
112    /// Uses Tarjan's strongly connected components algorithm to find cycles.
113    /// A cycle indicates tasks that directly or indirectly depend on each other,
114    /// making execution impossible.
115    ///
116    /// # Arguments
117    ///
118    /// * `workflow` - The workflow to analyze
119    ///
120    /// # Returns
121    ///
122    /// - `Ok(())` if no cycles detected
123    /// - `Err(DeadlockError::DependencyCycle)` with cycle path if cycle found
124    ///
125    /// # Example
126    ///
127    /// ```ignore
128    /// let detector = DeadlockDetector::new();
129    /// if let Err(e) = detector.detect_dependency_cycles(&workflow) {
130    ///     println!("Cycle detected: {:?}", e);
131    /// }
132    /// ```
133    pub fn detect_dependency_cycles(&self, workflow: &Workflow) -> Result<(), DeadlockError> {
134        // Use tarjan_scc to find strongly connected components
135        let sccs = tarjan_scc(&workflow.graph);
136
137        // Find SCCs with more than one node (these are cycles)
138        for scc in &sccs {
139            if scc.len() > 1 {
140                // Extract task IDs from the cycle
141                let cycle_tasks: Vec<TaskId> = scc
142                    .iter()
143                    .filter_map(|&idx| workflow.graph.node_weight(idx))
144                    .map(|node| node.id().clone())
145                    .collect();
146
147                if !cycle_tasks.is_empty() {
148                    return Err(DeadlockError::DependencyCycle(cycle_tasks));
149                }
150            }
151        }
152
153        // Also check for self-loops (single-node SCCs with edges to themselves)
154        for scc in &sccs {
155            if scc.len() == 1 {
156                let idx = scc[0];
157                // Check if this node has an edge to itself
158                if workflow
159                    .graph
160                    .find_edge(idx, idx)
161                    .is_some()
162                {
163                    if let Some(node) = workflow.graph.node_weight(idx) {
164                        return Err(DeadlockError::DependencyCycle(vec![node.id().clone()]));
165                    }
166                }
167            }
168        }
169
170        Ok(())
171    }
172
173    /// Analyzes workflow for potential resource deadlocks.
174    ///
175    /// This is a heuristic analysis that generates warnings for:
176    /// 1. Tasks that might share resources (no direct access in our model)
177    /// 2. Long chains of dependent tasks (timeout risk)
178    /// 3. Tasks with no timeout (hang risk)
179    ///
180    /// Note: This function does NOT prevent execution - warnings are informational.
181    ///
182    /// # Arguments
183    ///
184    /// * `workflow` - The workflow to analyze
185    ///
186    /// # Returns
187    ///
188    /// Vector of deadlock warnings (may be empty)
189    ///
190    /// # Example
191    ///
192    /// ```ignore
193    /// let detector = DeadlockDetector::new();
194    /// let warnings = detector.detect_resource_deadlocks(&workflow)?;
195    /// for warning in warnings {
196    ///     println!("Warning: {}", warning.description());
197    /// }
198    /// ```
199    pub fn detect_resource_deadlocks(
200        &self,
201        workflow: &Workflow,
202    ) -> Result<Vec<DeadlockWarning>, DeadlockError> {
203        let mut warnings = Vec::new();
204
205        // Check for long dependency chains
206        let chain_warnings = self.detect_long_chains(workflow);
207        warnings.extend(chain_warnings);
208
209        // Note: We can't detect resource sharing or missing timeouts
210        // from the Workflow structure alone - that would require task metadata
211        // which we don't have access to in the DAG.
212
213        Ok(warnings)
214    }
215
216    /// Detects long dependency chains that might timeout.
217    ///
218    /// Finds tasks with the longest distance from any root.
219    fn detect_long_chains(&self, workflow: &Workflow) -> Vec<DeadlockWarning> {
220        let mut warnings = Vec::new();
221
222        // Get execution layers to find the deepest tasks
223        if let Ok(layers) = workflow.execution_layers() {
224            let max_layer = layers.len();
225
226            // Tasks in the deepest layer have the longest chain
227            if max_layer > 5 {
228                // Warn about very deep chains
229                for task_id in &layers[max_layer - 1] {
230                    warnings.push(DeadlockWarning::new(
231                        task_id.clone(),
232                        DeadlockWarningType::LongDependencyChain { length: max_layer },
233                        format!(
234                            "Consider splitting this workflow or increasing deadlock_timeout (current depth: {})",
235                            max_layer
236                        ),
237                    ));
238                }
239            }
240        }
241
242        warnings
243    }
244
245    /// Finds all nodes involved in a cycle.
246    ///
247    /// Used for detailed error reporting when a cycle is detected.
248    fn find_cycle_nodes(&self, workflow: &Workflow) -> Vec<TaskId> {
249        let sccs = tarjan_scc(&workflow.graph);
250
251        sccs
252            .into_iter()
253            .filter(|scc| scc.len() > 1)
254            .flat_map(|scc| {
255                scc.into_iter()
256                    .filter_map(|idx| workflow.graph.node_weight(idx))
257                    .map(|node| node.id().clone())
258            })
259            .collect()
260    }
261
262    /// Validates that a workflow is deadlock-free.
263    ///
264    /// This is a convenience method that combines cycle detection
265    /// and resource deadlock analysis.
266    ///
267    /// # Arguments
268    ///
269    /// * `workflow` - The workflow to validate
270    ///
271    /// # Returns
272    ///
273    /// - `Ok(Vec<DeadlockWarning>)` - Warnings (may be empty) if no hard errors
274    /// - `Err(DeadlockError::DependencyCycle)` - If cycle detected
275    ///
276    /// # Example
277    ///
278    /// ```ignore
279    /// let detector = DeadlockDetector::new();
280    /// match detector.validate_workflow(&workflow) {
281    ///     Ok(warnings) => {
282    ///         for warning in warnings {
283    ///             println!("Warning: {}", warning.description());
284    ///         }
285    ///         // Execute workflow
286    ///     }
287    ///     Err(e) => {
288    ///         eprintln!("Cannot execute: {}", e);
289    ///     }
290    /// }
291    /// ```
292    pub fn validate_workflow(
293        &self,
294        workflow: &Workflow,
295    ) -> Result<Vec<DeadlockWarning>, DeadlockError> {
296        // Check for dependency cycles (hard error)
297        self.detect_dependency_cycles(workflow)?;
298
299        // Analyze for potential issues (warnings only)
300        let warnings = self.detect_resource_deadlocks(workflow)?;
301
302        Ok(warnings)
303    }
304}
305
306impl Default for DeadlockDetector {
307    fn default() -> Self {
308        Self::new()
309    }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::task::{TaskContext, TaskError, TaskResult, WorkflowTask};
    use async_trait::async_trait;

    /// Minimal task used to populate test workflows; `execute` always succeeds.
    struct MockTask {
        id: TaskId,
        name: String,
        deps: Vec<TaskId>,
    }

    impl MockTask {
        fn new(id: impl Into<TaskId>, name: &str) -> Self {
            Self {
                id: id.into(),
                name: name.to_string(),
                deps: Vec::new(),
            }
        }

        // Not used by the current tests; kept so tasks that declare their own
        // dependencies can be built without touching the mock.
        #[allow(dead_code)]
        fn with_dep(mut self, dep: impl Into<TaskId>) -> Self {
            self.deps.push(dep.into());
            self
        }
    }

    #[async_trait]
    impl WorkflowTask for MockTask {
        async fn execute(&self, _context: &TaskContext) -> Result<TaskResult, TaskError> {
            Ok(TaskResult::Success)
        }

        fn id(&self) -> TaskId {
            self.id.clone()
        }

        fn name(&self) -> &str {
            &self.name
        }

        fn dependencies(&self) -> Vec<TaskId> {
            self.deps.clone()
        }
    }

    /// Builds a linear workflow `task-0 -> task-1 -> ... -> task-{len-1}`.
    fn linear_chain(len: usize) -> Workflow {
        let mut workflow = Workflow::new();
        for i in 0..len {
            workflow.add_task(Box::new(MockTask::new(format!("task-{}", i), &format!("Task {}", i))));
        }
        for i in 1..len {
            workflow
                .add_dependency(format!("task-{}", i - 1), format!("task-{}", i))
                .unwrap();
        }
        workflow
    }

    #[test]
    fn test_deadlock_detector_creation() {
        let _detector = DeadlockDetector::new();
        let _detector2 = DeadlockDetector::default();
    }

    #[test]
    fn test_detect_cycle_simple() {
        let mut workflow = Workflow::new();
        workflow.add_task(Box::new(MockTask::new("a", "Task A")));
        workflow.add_task(Box::new(MockTask::new("b", "Task B")));
        workflow.add_task(Box::new(MockTask::new("c", "Task C")));

        // Create a -> b -> c -> a cycle
        workflow.add_dependency("a", "b").unwrap();
        workflow.add_dependency("b", "c").unwrap();

        let a_idx = workflow.task_map.get(&TaskId::new("a")).copied().unwrap();
        let c_idx = workflow.task_map.get(&TaskId::new("c")).copied().unwrap();
        workflow.graph.add_edge(c_idx, a_idx, ()); // Creates the cycle

        let detector = DeadlockDetector::new();
        let result = detector.detect_dependency_cycles(&workflow);

        assert!(result.is_err());
        match result {
            Err(DeadlockError::DependencyCycle(cycle)) => {
                assert!(!cycle.is_empty());
            }
            _ => panic!("Expected DependencyCycle error"),
        }
    }

    #[test]
    fn test_detect_cycle_none_diamond() {
        let mut workflow = Workflow::new();

        // Diamond pattern: a -> [b, c] -> d
        workflow.add_task(Box::new(MockTask::new("a", "Task A")));
        workflow.add_task(Box::new(MockTask::new("b", "Task B")));
        workflow.add_task(Box::new(MockTask::new("c", "Task C")));
        workflow.add_task(Box::new(MockTask::new("d", "Task D")));

        workflow.add_dependency("a", "b").unwrap();
        workflow.add_dependency("a", "c").unwrap();
        workflow.add_dependency("b", "d").unwrap();
        workflow.add_dependency("c", "d").unwrap();

        let detector = DeadlockDetector::new();
        let result = detector.detect_dependency_cycles(&workflow);

        assert!(result.is_ok());
    }

    #[test]
    fn test_detect_cycle_complex() {
        let mut workflow = Workflow::new();
        workflow.add_task(Box::new(MockTask::new("a", "Task A")));
        workflow.add_task(Box::new(MockTask::new("b", "Task B")));
        workflow.add_task(Box::new(MockTask::new("c", "Task C")));
        workflow.add_task(Box::new(MockTask::new("d", "Task D")));

        // Create a -> b -> c -> d -> b cycle (b is part of the cycle)
        workflow.add_dependency("a", "b").unwrap();
        workflow.add_dependency("b", "c").unwrap();

        let b_idx = workflow.task_map.get(&TaskId::new("b")).copied().unwrap();
        let c_idx = workflow.task_map.get(&TaskId::new("c")).copied().unwrap();
        let d_idx = workflow.task_map.get(&TaskId::new("d")).copied().unwrap();
        workflow.graph.add_edge(c_idx, d_idx, ());
        workflow.graph.add_edge(d_idx, b_idx, ()); // Creates the cycle

        let detector = DeadlockDetector::new();
        let result = detector.detect_dependency_cycles(&workflow);

        assert!(result.is_err());
        match result {
            Err(DeadlockError::DependencyCycle(cycle)) => {
                assert!(!cycle.is_empty());
                // The cycle should involve b, c, d
                let cycle_ids: HashSet<_> = cycle.iter().collect();
                assert!(cycle_ids.contains(&TaskId::new("b")));
                assert!(cycle_ids.contains(&TaskId::new("c")));
                assert!(cycle_ids.contains(&TaskId::new("d")));
            }
            _ => panic!("Expected DependencyCycle error"),
        }
    }

    #[test]
    fn test_detect_self_loop() {
        let mut workflow = Workflow::new();
        workflow.add_task(Box::new(MockTask::new("a", "Task A")));

        // Add self-loop
        let a_idx = workflow.task_map.get(&TaskId::new("a")).copied().unwrap();
        workflow.graph.add_edge(a_idx, a_idx, ());

        let detector = DeadlockDetector::new();
        let result = detector.detect_dependency_cycles(&workflow);

        assert!(result.is_err());
        match result {
            Err(DeadlockError::DependencyCycle(cycle)) => {
                assert_eq!(cycle, vec![TaskId::new("a")]);
            }
            _ => panic!("Expected DependencyCycle error"),
        }
    }

    #[test]
    fn test_detect_long_chain_warning() {
        // Chain of 7 tasks: 0 -> 1 -> 2 -> 3 -> 4 -> 5 -> 6
        let workflow = linear_chain(7);

        let detector = DeadlockDetector::new();
        let warnings = detector.detect_resource_deadlocks(&workflow).unwrap();

        // Should warn about the last task being too deep
        assert!(!warnings.is_empty());
        assert!(warnings.iter().any(|w| matches!(
            w.warning_type,
            DeadlockWarningType::LongDependencyChain { length: 7 }
        )));
    }

    #[test]
    fn test_long_chain_warns_only_deepest_layer() {
        // In a linear chain the deepest layer holds exactly one task.
        let workflow = linear_chain(7);

        let detector = DeadlockDetector::new();
        let warnings = detector.detect_resource_deadlocks(&workflow).unwrap();

        assert_eq!(warnings.len(), 1);
    }

    #[test]
    fn test_validate_workflow_no_issues() {
        let mut workflow = Workflow::new();
        workflow.add_task(Box::new(MockTask::new("a", "Task A")));
        workflow.add_task(Box::new(MockTask::new("b", "Task B")));
        workflow.add_task(Box::new(MockTask::new("c", "Task C")));

        workflow.add_dependency("a", "b").unwrap();

        let detector = DeadlockDetector::new();
        let result = detector.validate_workflow(&workflow);

        assert!(result.is_ok());
        let warnings = result.unwrap();
        // A shallow acyclic workflow produces no warnings at all.
        assert!(warnings.is_empty());
    }

    #[test]
    fn test_validate_workflow_with_cycle() {
        let mut workflow = Workflow::new();
        workflow.add_task(Box::new(MockTask::new("a", "Task A")));
        workflow.add_task(Box::new(MockTask::new("b", "Task B")));
        workflow.add_task(Box::new(MockTask::new("c", "Task C")));

        workflow.add_dependency("a", "b").unwrap();
        workflow.add_dependency("b", "c").unwrap();

        let a_idx = workflow.task_map.get(&TaskId::new("a")).copied().unwrap();
        let c_idx = workflow.task_map.get(&TaskId::new("c")).copied().unwrap();
        workflow.graph.add_edge(c_idx, a_idx, ());

        let detector = DeadlockDetector::new();
        let result = detector.validate_workflow(&workflow);

        assert!(result.is_err());
    }

    #[test]
    fn test_warning_description() {
        let warning = DeadlockWarning::new(
            TaskId::new("task-1"),
            DeadlockWarningType::LongDependencyChain { length: 10 },
            "Consider splitting the workflow".to_string(),
        );

        let desc = warning.description();
        assert!(desc.contains("task-1"));
        assert!(desc.contains("10"));
        assert!(desc.contains("splitting"));
    }

    #[test]
    fn test_warning_description_other_variants() {
        let shared = DeadlockWarning::new(
            TaskId::new("task-2"),
            DeadlockWarningType::SharedResource("db".to_string()),
            "serialize access".to_string(),
        );
        let desc = shared.description();
        assert!(desc.contains("task-2"));
        assert!(desc.contains("shares resource 'db'"));
        assert!(desc.contains("serialize access"));

        let no_timeout = DeadlockWarning::new(
            TaskId::new("task-3"),
            DeadlockWarningType::NoTimeout,
            "set a timeout".to_string(),
        );
        let desc = no_timeout.description();
        assert!(desc.contains("task-3"));
        assert!(desc.contains("has no timeout"));
        assert!(desc.contains("set a timeout"));
    }

    #[test]
    fn test_no_warning_for_short_chain() {
        // Chain of 3 tasks (short enough, no warning)
        let workflow = linear_chain(3);

        let detector = DeadlockDetector::new();
        let warnings = detector.detect_resource_deadlocks(&workflow).unwrap();

        assert!(warnings.is_empty());
    }

    #[test]
    fn test_no_warning_at_threshold_depth_5() {
        // Exactly LONG_CHAIN_THRESHOLD layers: the comparison is strict, so no warning.
        let workflow = linear_chain(5);

        let detector = DeadlockDetector::new();
        let warnings = detector.detect_resource_deadlocks(&workflow).unwrap();

        assert!(warnings.is_empty());
    }

    #[test]
    fn test_warning_boundary_at_depth_6() {
        // Chain of exactly 6 tasks (first depth that triggers a warning)
        let workflow = linear_chain(6);

        let detector = DeadlockDetector::new();
        let warnings = detector.detect_resource_deadlocks(&workflow).unwrap();

        // Warnings fire for depth > 5, i.e. 6 or more layers
        assert!(!warnings.is_empty());
    }

    #[test]
    fn test_deadlock_error_into_workflow_error() {
        let cycle = vec![TaskId::new("a"), TaskId::new("b")];
        let err: WorkflowError = DeadlockError::DependencyCycle(cycle.clone()).into();
        assert!(matches!(err, WorkflowError::CycleDetected(ref tasks) if *tasks == cycle));

        let err: WorkflowError = DeadlockError::PotentialDeadlock("maybe".to_string()).into();
        assert!(matches!(err, WorkflowError::CycleDetected(ref tasks) if tasks.is_empty()));

        let err: WorkflowError = DeadlockError::ResourceDeadlock("stuck".to_string()).into();
        assert!(matches!(
            err,
            WorkflowError::Timeout(crate::workflow::timeout::TimeoutError::WorkflowTimeout {
                timeout,
                ..
            }) if timeout == Duration::from_secs(300)
        ));
    }
}
585
586        // Should warn about depth >= 5
587        assert!(!warnings.is_empty());
588    }
589}