1use crate::workflow::dag::{Workflow, WorkflowError};
9use crate::workflow::task::TaskId;
10use petgraph::algo::tarjan_scc;
11use std::collections::HashSet;
12use std::time::Duration;
13use thiserror::Error;
14
15#[derive(Error, Debug)]
17pub enum DeadlockError {
18 #[error("Dependency cycle detected: {0:?}")]
20 DependencyCycle(Vec<TaskId>),
21
22 #[error("Resource deadlock detected: {0}")]
24 ResourceDeadlock(String),
25
26 #[error("Potential deadlock: {0}")]
28 PotentialDeadlock(String),
29}
30
31impl From<DeadlockError> for WorkflowError {
32 fn from(err: DeadlockError) -> Self {
33 match err {
34 DeadlockError::DependencyCycle(cycle) => WorkflowError::CycleDetected(cycle),
35 DeadlockError::ResourceDeadlock(msg) => {
36 WorkflowError::Timeout(crate::workflow::timeout::TimeoutError::WorkflowTimeout {
37 timeout: Duration::from_secs(300), })
39 }
40 DeadlockError::PotentialDeadlock(_) => WorkflowError::CycleDetected(Vec::new()),
41 }
42 }
43}
44
/// Category of risk described by a [`DeadlockWarning`], with any category-specific detail.
#[derive(Debug, Clone)]
pub enum DeadlockWarningType {
    /// The task shares the named resource with other work.
    SharedResource(String),
    /// The task sits at the end of an unusually deep dependency chain.
    LongDependencyChain {
        /// Depth of the chain, measured in execution layers.
        length: usize,
    },
    /// The task has no timeout configured.
    NoTimeout,
}
55
56#[derive(Clone, Debug)]
58pub struct DeadlockWarning {
59 pub task_id: TaskId,
61 pub warning_type: DeadlockWarningType,
63 pub suggestion: String,
65}
66
67impl DeadlockWarning {
68 fn new(task_id: TaskId, warning_type: DeadlockWarningType, suggestion: String) -> Self {
70 Self {
71 task_id,
72 warning_type,
73 suggestion,
74 }
75 }
76
77 pub fn description(&self) -> String {
79 match &self.warning_type {
80 DeadlockWarningType::SharedResource(resource) => {
81 format!("Task '{}' shares resource '{}': {}", self.task_id, resource, self.suggestion)
82 }
83 DeadlockWarningType::LongDependencyChain { length } => {
84 format!(
85 "Task '{}' has a long dependency chain ({} tasks): {}",
86 self.task_id, length, self.suggestion
87 )
88 }
89 DeadlockWarningType::NoTimeout => {
90 format!("Task '{}' has no timeout: {}", self.task_id, self.suggestion)
91 }
92 }
93 }
94}
95
/// Stateless analyzer that checks a `Workflow` for dependency cycles and for structure that
/// makes deadlocks or stalls more likely (currently: very deep dependency chains).
///
/// The detector holds no state, so it is cheap to copy and to construct.
#[derive(Debug, Clone, Copy)]
pub struct DeadlockDetector;
103
104impl DeadlockDetector {
105 pub fn new() -> Self {
107 Self
108 }
109
110 pub fn detect_dependency_cycles(&self, workflow: &Workflow) -> Result<(), DeadlockError> {
134 let sccs = tarjan_scc(&workflow.graph);
136
137 for scc in &sccs {
139 if scc.len() > 1 {
140 let cycle_tasks: Vec<TaskId> = scc
142 .iter()
143 .filter_map(|&idx| workflow.graph.node_weight(idx))
144 .map(|node| node.id().clone())
145 .collect();
146
147 if !cycle_tasks.is_empty() {
148 return Err(DeadlockError::DependencyCycle(cycle_tasks));
149 }
150 }
151 }
152
153 for scc in &sccs {
155 if scc.len() == 1 {
156 let idx = scc[0];
157 if workflow
159 .graph
160 .find_edge(idx, idx)
161 .is_some()
162 {
163 if let Some(node) = workflow.graph.node_weight(idx) {
164 return Err(DeadlockError::DependencyCycle(vec![node.id().clone()]));
165 }
166 }
167 }
168 }
169
170 Ok(())
171 }
172
173 pub fn detect_resource_deadlocks(
200 &self,
201 workflow: &Workflow,
202 ) -> Result<Vec<DeadlockWarning>, DeadlockError> {
203 let mut warnings = Vec::new();
204
205 let chain_warnings = self.detect_long_chains(workflow);
207 warnings.extend(chain_warnings);
208
209 Ok(warnings)
214 }
215
216 fn detect_long_chains(&self, workflow: &Workflow) -> Vec<DeadlockWarning> {
220 let mut warnings = Vec::new();
221
222 if let Ok(layers) = workflow.execution_layers() {
224 let max_layer = layers.len();
225
226 if max_layer > 5 {
228 for task_id in &layers[max_layer - 1] {
230 warnings.push(DeadlockWarning::new(
231 task_id.clone(),
232 DeadlockWarningType::LongDependencyChain { length: max_layer },
233 format!(
234 "Consider splitting this workflow or increasing deadlock_timeout (current depth: {})",
235 max_layer
236 ),
237 ));
238 }
239 }
240 }
241
242 warnings
243 }
244
245 fn find_cycle_nodes(&self, workflow: &Workflow) -> Vec<TaskId> {
249 let sccs = tarjan_scc(&workflow.graph);
250
251 sccs
252 .into_iter()
253 .filter(|scc| scc.len() > 1)
254 .flat_map(|scc| {
255 scc.into_iter()
256 .filter_map(|idx| workflow.graph.node_weight(idx))
257 .map(|node| node.id().clone())
258 })
259 .collect()
260 }
261
262 pub fn validate_workflow(
293 &self,
294 workflow: &Workflow,
295 ) -> Result<Vec<DeadlockWarning>, DeadlockError> {
296 self.detect_dependency_cycles(workflow)?;
298
299 let warnings = self.detect_resource_deadlocks(workflow)?;
301
302 Ok(warnings)
303 }
304}
305
306impl Default for DeadlockDetector {
307 fn default() -> Self {
308 Self::new()
309 }
310}
311
#[cfg(test)]
mod tests {
    use super::*;
    use crate::workflow::task::{TaskContext, TaskError, TaskResult, WorkflowTask};
    use async_trait::async_trait;
    use std::collections::HashSet;

    /// Minimal task used to populate workflows; `execute` always succeeds.
    struct MockTask {
        id: TaskId,
        name: String,
        deps: Vec<TaskId>,
    }

    impl MockTask {
        fn new(id: impl Into<TaskId>, name: &str) -> Self {
            Self {
                id: id.into(),
                name: name.to_string(),
                deps: Vec::new(),
            }
        }
    }

    #[async_trait]
    impl WorkflowTask for MockTask {
        async fn execute(&self, _context: &TaskContext) -> Result<TaskResult, TaskError> {
            Ok(TaskResult::Success)
        }

        fn id(&self) -> TaskId {
            self.id.clone()
        }

        fn name(&self) -> &str {
            &self.name
        }

        fn dependencies(&self) -> Vec<TaskId> {
            self.deps.clone()
        }
    }

    /// Builds a workflow with one `MockTask` per id (named "Task <ID>") and no dependencies.
    fn workflow_with(ids: &[&'static str]) -> Workflow {
        let mut workflow = Workflow::new();
        for id in ids {
            workflow.add_task(Box::new(MockTask::new(
                *id,
                &format!("Task {}", id.to_uppercase()),
            )));
        }
        workflow
    }

    /// Builds `task-0 .. task-{len-1}` linked by `add_dependency(task-{i-1}, task-{i})`.
    fn linear_chain(len: usize) -> Workflow {
        let mut workflow = Workflow::new();
        for i in 0..len {
            workflow.add_task(Box::new(MockTask::new(
                format!("task-{}", i),
                &format!("Task {}", i),
            )));
        }
        for i in 1..len {
            workflow
                .add_dependency(format!("task-{}", i - 1), format!("task-{}", i))
                .unwrap();
        }
        workflow
    }

    /// Inserts a raw graph edge, bypassing `add_dependency`. The cycle tests need this, since
    /// `add_dependency` presumably refuses to create cycles.
    fn force_edge(workflow: &mut Workflow, from: &'static str, to: &'static str) {
        let from_idx = workflow.task_map.get(&TaskId::new(from)).copied().unwrap();
        let to_idx = workflow.task_map.get(&TaskId::new(to)).copied().unwrap();
        workflow.graph.add_edge(from_idx, to_idx, ());
    }

    /// Builds `a -> b -> c` through the public API, then closes the loop with a raw `c -> a`.
    fn triangle_cycle(extra_ids: &[&'static str]) -> Workflow {
        let mut ids = vec!["a", "b", "c"];
        ids.extend_from_slice(extra_ids);
        let mut workflow = workflow_with(&ids);
        workflow.add_dependency("a", "b").unwrap();
        workflow.add_dependency("b", "c").unwrap();
        force_edge(&mut workflow, "c", "a");
        workflow
    }

    #[test]
    fn test_deadlock_detector_creation() {
        let _detector = DeadlockDetector::new();
        let _detector2 = DeadlockDetector::default();
    }

    #[test]
    fn test_detect_cycle_simple() {
        let workflow = triangle_cycle(&[]);

        match DeadlockDetector::new().detect_dependency_cycles(&workflow) {
            Err(DeadlockError::DependencyCycle(cycle)) => {
                // The forced edge closes a loop through all three tasks.
                let ids: HashSet<_> = cycle.iter().collect();
                assert_eq!(ids.len(), 3);
                for id in &["a", "b", "c"] {
                    assert!(ids.contains(&TaskId::new(*id)));
                }
            }
            other => panic!("Expected DependencyCycle error, got {:?}", other),
        }
    }

    #[test]
    fn test_detect_cycle_none_diamond() {
        let mut workflow = workflow_with(&["a", "b", "c", "d"]);
        workflow.add_dependency("a", "b").unwrap();
        workflow.add_dependency("a", "c").unwrap();
        workflow.add_dependency("b", "d").unwrap();
        workflow.add_dependency("c", "d").unwrap();

        let result = DeadlockDetector::new().detect_dependency_cycles(&workflow);

        assert!(result.is_ok());
    }

    #[test]
    fn test_detect_cycle_complex() {
        let mut workflow = workflow_with(&["a", "b", "c", "d"]);
        workflow.add_dependency("a", "b").unwrap();
        workflow.add_dependency("b", "c").unwrap();
        force_edge(&mut workflow, "c", "d");
        force_edge(&mut workflow, "d", "b");

        match DeadlockDetector::new().detect_dependency_cycles(&workflow) {
            Err(DeadlockError::DependencyCycle(cycle)) => {
                let cycle_ids: HashSet<_> = cycle.iter().collect();
                assert!(cycle_ids.contains(&TaskId::new("b")));
                assert!(cycle_ids.contains(&TaskId::new("c")));
                assert!(cycle_ids.contains(&TaskId::new("d")));
                // `a` only feeds into the cycle; it is not part of it.
                assert!(!cycle_ids.contains(&TaskId::new("a")));
            }
            other => panic!("Expected DependencyCycle error, got {:?}", other),
        }
    }

    #[test]
    fn test_detect_self_loop() {
        let mut workflow = workflow_with(&["a"]);
        force_edge(&mut workflow, "a", "a");

        match DeadlockDetector::new().detect_dependency_cycles(&workflow) {
            Err(DeadlockError::DependencyCycle(cycle)) => {
                assert_eq!(cycle, vec![TaskId::new("a")]);
            }
            other => panic!("Expected DependencyCycle error, got {:?}", other),
        }
    }

    #[test]
    fn test_find_cycle_nodes_reports_only_cyclic_tasks() {
        let detector = DeadlockDetector::new();

        let acyclic = linear_chain(4);
        assert!(detector.find_cycle_nodes(&acyclic).is_empty());

        let workflow = triangle_cycle(&["d"]);
        let ids: HashSet<_> = detector.find_cycle_nodes(&workflow).into_iter().collect();
        let expected: HashSet<_> = ["a", "b", "c"].iter().map(|id| TaskId::new(*id)).collect();
        assert_eq!(ids, expected);
    }

    #[test]
    fn test_detect_long_chain_warning() {
        let workflow = linear_chain(7);

        let warnings = DeadlockDetector::new()
            .detect_resource_deadlocks(&workflow)
            .unwrap();

        // A linear chain has exactly one task in its deepest layer.
        assert_eq!(warnings.len(), 1);
        assert!(warnings.iter().any(|w| matches!(
            w.warning_type,
            DeadlockWarningType::LongDependencyChain { length: 7 }
        )));
    }

    #[test]
    fn test_validate_workflow_no_issues() {
        let mut workflow = workflow_with(&["a", "b", "c"]);
        workflow.add_dependency("a", "b").unwrap();

        let warnings = DeadlockDetector::new()
            .validate_workflow(&workflow)
            .expect("acyclic, shallow workflow must validate");

        assert!(warnings.is_empty());
    }

    #[test]
    fn test_validate_workflow_with_cycle() {
        let workflow = triangle_cycle(&[]);

        let result = DeadlockDetector::new().validate_workflow(&workflow);

        assert!(matches!(result, Err(DeadlockError::DependencyCycle(_))));
    }

    #[test]
    fn test_warning_description() {
        let warning = DeadlockWarning::new(
            TaskId::new("task-1"),
            DeadlockWarningType::LongDependencyChain { length: 10 },
            "Consider splitting the workflow".to_string(),
        );

        let desc = warning.description();
        assert!(desc.contains("task-1"));
        assert!(desc.contains("10"));
        assert!(desc.contains("splitting"));
    }

    #[test]
    fn test_warning_description_other_variants() {
        let shared = DeadlockWarning::new(
            TaskId::new("task-2"),
            DeadlockWarningType::SharedResource("db".to_string()),
            "Serialize access".to_string(),
        );
        let desc = shared.description();
        assert!(desc.contains("task-2"));
        assert!(desc.contains("shares resource 'db'"));
        assert!(desc.ends_with(": Serialize access"));

        let no_timeout = DeadlockWarning::new(
            TaskId::new("task-3"),
            DeadlockWarningType::NoTimeout,
            "Add a timeout".to_string(),
        );
        let desc = no_timeout.description();
        assert!(desc.contains("task-3"));
        assert!(desc.contains("has no timeout: Add a timeout"));
    }

    #[test]
    fn test_no_warning_for_short_chain() {
        let workflow = linear_chain(3);

        let warnings = DeadlockDetector::new()
            .detect_resource_deadlocks(&workflow)
            .unwrap();

        assert!(warnings.is_empty());
    }

    #[test]
    fn test_no_warning_at_threshold_depth_5() {
        let workflow = linear_chain(5);

        let warnings = DeadlockDetector::new()
            .detect_resource_deadlocks(&workflow)
            .unwrap();

        assert!(warnings.is_empty());
    }

    #[test]
    fn test_warning_boundary_at_depth_6() {
        let workflow = linear_chain(6);

        let warnings = DeadlockDetector::new()
            .detect_resource_deadlocks(&workflow)
            .unwrap();

        assert!(!warnings.is_empty());
    }
}