1use crate::task::{TaskId, TaskInfo, TaskState};
7use parking_lot::RwLock;
8use std::collections::{HashMap, HashSet, VecDeque};
9use std::fmt;
10use std::io::Write;
11use std::path::Path;
12use std::sync::Arc;
13
/// The kind of directed edge that links two tasks in a [`TaskGraph`].
///
/// Each variant describes how the `from` task of a [`Relationship`] relates
/// to its `to` task.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub enum RelationshipType {
    /// `from` spawned `to`.
    Spawned,
    /// `from` sends on a channel towards `to`.
    ChannelSend,
    /// Receiving side of a channel edge.
    ChannelReceive,
    /// `from` and `to` share a resource (usually named by `resource_name`).
    SharedResource,
    /// Data flows from `from` to `to`.
    DataFlow,
    /// `from` awaits `to`.
    AwaitsOn,
    /// `from` depends on `to`.
    Dependency,
}
32
33impl fmt::Display for RelationshipType {
34 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
35 match self {
36 Self::Spawned => write!(f, "spawned"),
37 Self::ChannelSend => write!(f, "sends →"),
38 Self::ChannelReceive => write!(f, "← receives"),
39 Self::SharedResource => write!(f, "shares resource"),
40 Self::DataFlow => write!(f, "data →"),
41 Self::AwaitsOn => write!(f, "awaits"),
42 Self::Dependency => write!(f, "depends on"),
43 }
44 }
45}
46
47#[derive(Debug, Clone)]
49pub struct Relationship {
50 pub from: TaskId,
52 pub to: TaskId,
54 pub relationship_type: RelationshipType,
56 pub resource_name: Option<String>,
58 pub data_description: Option<String>,
60}
61
62#[derive(Debug, Clone)]
64pub struct TaskGraph {
65 relationships: Vec<Relationship>,
67 tasks: HashMap<TaskId, TaskInfo>,
69 adjacency: HashMap<TaskId, Vec<(TaskId, RelationshipType)>>,
71 reverse_adjacency: HashMap<TaskId, Vec<(TaskId, RelationshipType)>>,
73}
74
75impl TaskGraph {
76 #[must_use]
78 pub fn new() -> Self {
79 Self {
80 relationships: Vec::new(),
81 tasks: HashMap::new(),
82 adjacency: HashMap::new(),
83 reverse_adjacency: HashMap::new(),
84 }
85 }
86
87 pub fn add_task(&mut self, task: TaskInfo) {
89 self.tasks.insert(task.id, task);
90 }
91
92 pub fn add_relationship(&mut self, relationship: Relationship) {
94 self.adjacency
96 .entry(relationship.from)
97 .or_default()
98 .push((relationship.to, relationship.relationship_type));
99
100 self.reverse_adjacency
101 .entry(relationship.to)
102 .or_default()
103 .push((relationship.from, relationship.relationship_type));
104
105 self.relationships.push(relationship);
106 }
107
108 #[must_use]
110 pub fn get_relationships_by_type(&self, rel_type: RelationshipType) -> Vec<&Relationship> {
111 self.relationships
112 .iter()
113 .filter(|r| r.relationship_type == rel_type)
114 .collect()
115 }
116
117 #[must_use]
119 pub fn get_related_tasks(&self, task_id: TaskId) -> Vec<(TaskId, RelationshipType)> {
120 self.adjacency.get(&task_id).cloned().unwrap_or_default()
121 }
122
123 #[must_use]
125 pub fn get_dependent_tasks(&self, task_id: TaskId) -> Vec<(TaskId, RelationshipType)> {
126 self.reverse_adjacency
127 .get(&task_id)
128 .cloned()
129 .unwrap_or_default()
130 }
131
132 #[must_use]
134 pub fn get_task(&self, task_id: &TaskId) -> Option<&TaskInfo> {
135 self.tasks.get(task_id)
136 }
137
138 #[must_use]
140 pub fn find_critical_path(&self) -> Vec<TaskId> {
141 let mut longest_path = Vec::new();
142 let mut visited = HashSet::new();
143
144 for task_id in self.tasks.keys() {
145 let path = self.find_longest_path(*task_id, &mut visited);
146 if path.len() > longest_path.len() {
147 longest_path = path;
148 }
149 }
150
151 longest_path
152 }
153
154 fn find_longest_path(&self, task_id: TaskId, visited: &mut HashSet<TaskId>) -> Vec<TaskId> {
156 if visited.contains(&task_id) {
157 return vec![];
158 }
159
160 visited.insert(task_id);
161 let mut longest = vec![task_id];
162
163 if let Some(related) = self.adjacency.get(&task_id) {
164 for (next_id, rel_type) in related {
165 if matches!(
167 rel_type,
168 RelationshipType::Dependency
169 | RelationshipType::DataFlow
170 | RelationshipType::AwaitsOn
171 ) {
172 let mut path = self.find_longest_path(*next_id, visited);
173 if path.len() + 1 > longest.len() {
174 path.insert(0, task_id);
175 longest = path;
176 }
177 }
178 }
179 }
180
181 visited.remove(&task_id);
182 longest
183 }
184
185 #[must_use]
187 pub fn find_transitive_dependencies(&self, task_id: TaskId) -> HashSet<TaskId> {
188 let mut dependencies = HashSet::new();
189 let mut queue = VecDeque::new();
190 queue.push_back(task_id);
191
192 while let Some(current) = queue.pop_front() {
193 if let Some(related) = self.adjacency.get(¤t) {
194 for (next_id, rel_type) in related {
195 if matches!(
196 rel_type,
197 RelationshipType::Dependency | RelationshipType::AwaitsOn
198 ) && dependencies.insert(*next_id)
199 {
200 queue.push_back(*next_id);
201 }
202 }
203 }
204 }
205
206 dependencies
207 }
208
209 #[must_use]
211 pub fn find_tasks_sharing_resource(&self, resource_name: &str) -> Vec<TaskId> {
212 let mut tasks = HashSet::new();
213
214 for rel in &self.relationships {
215 if rel.relationship_type == RelationshipType::SharedResource {
216 if let Some(ref name) = rel.resource_name {
217 if name == resource_name {
218 tasks.insert(rel.from);
219 tasks.insert(rel.to);
220 }
221 }
222 }
223 }
224
225 tasks.into_iter().collect()
226 }
227
228 #[must_use]
230 pub fn find_channel_pairs(&self) -> Vec<(TaskId, TaskId)> {
231 let mut pairs = Vec::new();
232
233 for send in self.get_relationships_by_type(RelationshipType::ChannelSend) {
234 for recv in self.get_relationships_by_type(RelationshipType::ChannelReceive) {
235 if send.resource_name == recv.resource_name && send.to == recv.from {
237 pairs.push((send.from, recv.to));
238 }
239 }
240 }
241
242 pairs
243 }
244
245 #[must_use]
247 pub fn detect_potential_deadlocks(&self) -> Vec<Vec<TaskId>> {
248 let mut deadlock_cycles = Vec::new();
249 let mut visited = HashSet::new();
250 let mut rec_stack = HashSet::new();
251
252 for task_id in self.tasks.keys() {
253 if !visited.contains(task_id) {
254 if let Some(cycle) = self.find_cycle(*task_id, &mut visited, &mut rec_stack) {
255 deadlock_cycles.push(cycle);
256 }
257 }
258 }
259
260 deadlock_cycles
261 }
262
263 fn find_cycle(
265 &self,
266 task_id: TaskId,
267 visited: &mut HashSet<TaskId>,
268 rec_stack: &mut HashSet<TaskId>,
269 ) -> Option<Vec<TaskId>> {
270 visited.insert(task_id);
271 rec_stack.insert(task_id);
272
273 if let Some(related) = self.adjacency.get(&task_id) {
274 for (next_id, rel_type) in related {
275 if matches!(
277 rel_type,
278 RelationshipType::SharedResource | RelationshipType::AwaitsOn
279 ) {
280 if !visited.contains(next_id) {
281 if let Some(cycle) = self.find_cycle(*next_id, visited, rec_stack) {
282 return Some(cycle);
283 }
284 } else if rec_stack.contains(next_id) {
285 return Some(vec![task_id, *next_id]);
287 }
288 }
289 }
290 }
291
292 rec_stack.remove(&task_id);
293 None
294 }
295
296 #[must_use]
298 pub fn to_dot(&self) -> String {
299 let mut dot = String::from("digraph TaskGraph {\n");
300 dot.push_str(" rankdir=LR;\n");
301 dot.push_str(" node [shape=box, style=rounded];\n\n");
302
303 for (task_id, task) in &self.tasks {
305 let color = match task.state {
306 TaskState::Pending => "lightgray",
307 TaskState::Running => "lightblue",
308 TaskState::Blocked { .. } => "yellow",
309 TaskState::Completed => "lightgreen",
310 TaskState::Failed => "lightcoral",
311 };
312
313 dot.push_str(&format!(
314 " t{} [label=\"{}\n{:?}\", fillcolor={}, style=\"filled,rounded\"];\n",
315 task_id.as_u64(),
316 task.name,
317 task.state,
318 color
319 ));
320 }
321
322 dot.push('\n');
323
324 for rel in &self.relationships {
326 let (style, color, label) = match rel.relationship_type {
327 RelationshipType::Spawned => ("solid", "black", "spawned"),
328 RelationshipType::ChannelSend => ("dashed", "blue", "→ channel"),
329 RelationshipType::ChannelReceive => ("dashed", "blue", "channel →"),
330 RelationshipType::SharedResource => ("dotted", "red", "shares"),
331 RelationshipType::DataFlow => ("bold", "green", "data →"),
332 RelationshipType::AwaitsOn => ("solid", "purple", "awaits"),
333 RelationshipType::Dependency => ("solid", "orange", "depends"),
334 };
335
336 let mut edge_label = label.to_string();
337 if let Some(ref resource) = rel.resource_name {
338 edge_label = format!("{label}\n{resource}");
339 }
340
341 dot.push_str(&format!(
342 " t{} -> t{} [label=\"{}\", style={}, color={}];\n",
343 rel.from.as_u64(),
344 rel.to.as_u64(),
345 edge_label,
346 style,
347 color
348 ));
349 }
350
351 let critical_path = self.find_critical_path();
353 if critical_path.len() > 1 {
354 dot.push_str("\n // Critical path\n");
355 for window in critical_path.windows(2) {
356 dot.push_str(&format!(
357 " t{} -> t{} [color=red, penwidth=3.0, constraint=false];\n",
358 window[0].as_u64(),
359 window[1].as_u64()
360 ));
361 }
362 }
363
364 dot.push_str("}\n");
365 dot
366 }
367
368 #[must_use]
370 pub fn to_text(&self) -> String {
371 let mut output = String::new();
372 output.push_str("Task Relationship Graph\n");
373 output.push_str("=======================\n\n");
374
375 for rel_type in &[
377 RelationshipType::Spawned,
378 RelationshipType::ChannelSend,
379 RelationshipType::ChannelReceive,
380 RelationshipType::SharedResource,
381 RelationshipType::DataFlow,
382 RelationshipType::AwaitsOn,
383 RelationshipType::Dependency,
384 ] {
385 let rels = self.get_relationships_by_type(*rel_type);
386 if !rels.is_empty() {
387 output.push_str(&format!("\n{rel_type} Relationships:\n"));
388 for rel in rels {
389 let from_name = self.tasks.get(&rel.from).map_or("?", |t| t.name.as_str());
390 let to_name = self.tasks.get(&rel.to).map_or("?", |t| t.name.as_str());
391
392 output.push_str(&format!(" {from_name} {rel_type} {to_name}"));
393
394 if let Some(ref resource) = rel.resource_name {
395 output.push_str(&format!(" ({resource})"));
396 }
397 output.push('\n');
398 }
399 }
400 }
401
402 let critical_path = self.find_critical_path();
404 if !critical_path.is_empty() {
405 output.push_str("\nCritical Path:\n");
406 for task_id in &critical_path {
407 if let Some(task) = self.tasks.get(task_id) {
408 output.push_str(&format!(" → {} ({:?})\n", task.name, task.state));
409 }
410 }
411 }
412
413 let mut resources: HashMap<String, Vec<TaskId>> = HashMap::new();
415 for rel in &self.relationships {
416 if rel.relationship_type == RelationshipType::SharedResource {
417 if let Some(ref name) = rel.resource_name {
418 resources.entry(name.clone()).or_default().push(rel.from);
419 resources.entry(name.clone()).or_default().push(rel.to);
420 }
421 }
422 }
423
424 if !resources.is_empty() {
425 output.push_str("\nShared Resources:\n");
426 for (resource, task_ids) in resources {
427 let unique_tasks: HashSet<_> = task_ids.into_iter().collect();
428 output.push_str(&format!(
429 " {} (accessed by {} tasks):\n",
430 resource,
431 unique_tasks.len()
432 ));
433 for task_id in unique_tasks {
434 if let Some(task) = self.tasks.get(&task_id) {
435 output.push_str(&format!(" - {}\n", task.name));
436 }
437 }
438 }
439 }
440
441 output
442 }
443
444 pub fn export_dot<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
459 let mut file = std::fs::File::create(path)?;
460 file.write_all(self.to_dot().as_bytes())?;
461 Ok(())
462 }
463
464 pub fn export_json<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
470 let json = self.to_json();
471 let mut file = std::fs::File::create(path)?;
472 file.write_all(json.as_bytes())?;
473 Ok(())
474 }
475
476 #[must_use]
478 pub fn to_json(&self) -> String {
479 let mut json = String::from("{\n");
480
481 json.push_str(" \"nodes\": [\n");
483 let nodes: Vec<_> = self
484 .tasks
485 .iter()
486 .map(|(id, task)| {
487 format!(
488 " {{\"id\": {}, \"name\": \"{}\", \"state\": \"{}\"}}",
489 id.as_u64(),
490 task.name.replace('"', "\\\""),
491 format!("{:?}", task.state).replace('"', "\\\"")
492 )
493 })
494 .collect();
495 json.push_str(&nodes.join(",\n"));
496 json.push_str("\n ],\n");
497
498 json.push_str(" \"edges\": [\n");
500 let edges: Vec<_> = self
501 .relationships
502 .iter()
503 .map(|rel| {
504 let mut edge = format!(
505 " {{\"from\": {}, \"to\": {}, \"type\": \"{:?}\"",
506 rel.from.as_u64(),
507 rel.to.as_u64(),
508 rel.relationship_type
509 );
510 if let Some(ref resource) = rel.resource_name {
511 edge.push_str(&format!(
512 ", \"resource\": \"{}\"",
513 resource.replace('"', "\\\"")
514 ));
515 }
516 edge.push('}');
517 edge
518 })
519 .collect();
520 json.push_str(&edges.join(",\n"));
521 json.push_str("\n ],\n");
522
523 json.push_str(" \"stats\": {\n");
525 json.push_str(&format!(" \"total_tasks\": {},\n", self.tasks.len()));
526 json.push_str(&format!(
527 " \"total_relationships\": {},\n",
528 self.relationships.len()
529 ));
530
531 let critical_path = self.find_critical_path();
532 json.push_str(&format!(
533 " \"critical_path_length\": {}\n",
534 critical_path.len()
535 ));
536 json.push_str(" }\n");
537
538 json.push_str("}\n");
539 json
540 }
541
542 #[must_use]
547 pub fn to_mermaid(&self) -> String {
548 let mut mermaid = String::from("flowchart LR\n");
549
550 for (task_id, task) in &self.tasks {
552 let id = format!("t{}", task_id.as_u64());
553 let label = task.name.replace('"', "'");
554
555 let node_def = match task.state {
556 TaskState::Pending => format!(" {id}[{label}]:::pending\n"),
557 TaskState::Running => format!(" {id}([{label}]):::running\n"),
558 TaskState::Blocked { .. } => format!(" {id}{{{{{label}}}}}:::blocked\n"),
559 TaskState::Completed => format!(" {id}[/{label}/]:::completed\n"),
560 TaskState::Failed => format!(" {id}[({label})]:::failed\n"),
561 };
562 mermaid.push_str(&node_def);
563 }
564
565 mermaid.push('\n');
566
567 for rel in &self.relationships {
569 let from = format!("t{}", rel.from.as_u64());
570 let to = format!("t{}", rel.to.as_u64());
571
572 let arrow = match rel.relationship_type {
573 RelationshipType::Spawned => "-->",
574 RelationshipType::ChannelSend | RelationshipType::ChannelReceive => "-.->",
575 RelationshipType::SharedResource => "o--o",
576 RelationshipType::DataFlow => "==>",
577 RelationshipType::AwaitsOn => "-->",
578 RelationshipType::Dependency => "-->",
579 };
580
581 let label = match (&rel.relationship_type, &rel.resource_name) {
582 (_, Some(resource)) => format!("|{resource}|"),
583 (RelationshipType::Spawned, None) => "|spawned|".to_string(),
584 (RelationshipType::ChannelSend, None) => "|send|".to_string(),
585 (RelationshipType::ChannelReceive, None) => "|recv|".to_string(),
586 (RelationshipType::SharedResource, None) => "|shares|".to_string(),
587 (RelationshipType::DataFlow, None) => "|data|".to_string(),
588 (RelationshipType::AwaitsOn, None) => "|awaits|".to_string(),
589 (RelationshipType::Dependency, None) => "|depends|".to_string(),
590 };
591
592 mermaid.push_str(&format!(" {from} {arrow}{label} {to}\n"));
593 }
594
595 mermaid.push_str("\n classDef pending fill:#f9f9f9,stroke:#999\n");
597 mermaid.push_str(" classDef running fill:#bbdefb,stroke:#1976d2\n");
598 mermaid.push_str(" classDef blocked fill:#fff9c4,stroke:#fbc02d\n");
599 mermaid.push_str(" classDef completed fill:#c8e6c9,stroke:#388e3c\n");
600 mermaid.push_str(" classDef failed fill:#ffcdd2,stroke:#d32f2f\n");
601
602 mermaid
603 }
604
605 pub fn export_mermaid<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
611 let mut file = std::fs::File::create(path)?;
612 file.write_all(self.to_mermaid().as_bytes())?;
613 Ok(())
614 }
615
616 #[must_use]
618 pub fn stats(&self) -> GraphStats {
619 let mut stats = GraphStats::default();
620 stats.total_tasks = self.tasks.len();
621 stats.total_relationships = self.relationships.len();
622
623 for task in self.tasks.values() {
624 match task.state {
625 TaskState::Pending => stats.pending_tasks += 1,
626 TaskState::Running => stats.running_tasks += 1,
627 TaskState::Blocked { .. } => stats.blocked_tasks += 1,
628 TaskState::Completed => stats.completed_tasks += 1,
629 TaskState::Failed => stats.failed_tasks += 1,
630 }
631 }
632
633 for rel in &self.relationships {
634 match rel.relationship_type {
635 RelationshipType::Spawned => stats.spawn_relationships += 1,
636 RelationshipType::ChannelSend | RelationshipType::ChannelReceive => {
637 stats.channel_relationships += 1;
638 }
639 RelationshipType::SharedResource => stats.resource_relationships += 1,
640 RelationshipType::DataFlow => stats.dataflow_relationships += 1,
641 RelationshipType::AwaitsOn | RelationshipType::Dependency => {
642 stats.dependency_relationships += 1;
643 }
644 }
645 }
646
647 stats.critical_path_length = self.find_critical_path().len();
648 stats
649 }
650}
651
/// Summary counters describing a task graph.
///
/// All counters start at zero in the `Default` value.
#[derive(Clone, Debug, Default)]
pub struct GraphStats {
    /// Number of registered tasks.
    pub total_tasks: usize,
    /// Number of relationships of any type.
    pub total_relationships: usize,
    /// Tasks in the `Pending` state.
    pub pending_tasks: usize,
    /// Tasks in the `Running` state.
    pub running_tasks: usize,
    /// Tasks in the `Blocked` state.
    pub blocked_tasks: usize,
    /// Tasks in the `Completed` state.
    pub completed_tasks: usize,
    /// Tasks in the `Failed` state.
    pub failed_tasks: usize,
    /// `Spawned` relationships.
    pub spawn_relationships: usize,
    /// `ChannelSend` plus `ChannelReceive` relationships.
    pub channel_relationships: usize,
    /// `SharedResource` relationships.
    pub resource_relationships: usize,
    /// `DataFlow` relationships.
    pub dataflow_relationships: usize,
    /// `AwaitsOn` plus `Dependency` relationships.
    pub dependency_relationships: usize,
    /// Number of tasks on the critical path.
    pub critical_path_length: usize,
}
682
683impl Default for TaskGraph {
684 fn default() -> Self {
685 Self::new()
686 }
687}
688
689static GRAPH: once_cell::sync::Lazy<Arc<RwLock<TaskGraph>>> =
691 once_cell::sync::Lazy::new(|| Arc::new(RwLock::new(TaskGraph::new())));
692
693#[must_use]
695pub fn global_graph() -> Arc<RwLock<TaskGraph>> {
696 Arc::clone(&GRAPH)
697}
698
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::{TaskId, TaskInfo, TaskState};
    use std::time::{Duration, Instant};

    /// Builds a running task with the given id and name and otherwise
    /// neutral bookkeeping values.
    fn running_task(id: TaskId, name: &str, now: Instant) -> TaskInfo {
        TaskInfo {
            id,
            name: name.to_string(),
            state: TaskState::Running,
            created_at: now,
            last_updated: now,
            parent: None,
            location: None,
            poll_count: 0,
            total_run_time: Duration::ZERO,
        }
    }

    /// Builds an edge of the given type with an optional resource name and
    /// no data description.
    fn edge(
        from: TaskId,
        to: TaskId,
        relationship_type: RelationshipType,
        resource_name: Option<&str>,
    ) -> Relationship {
        Relationship {
            from,
            to,
            relationship_type,
            resource_name: resource_name.map(str::to_string),
            data_description: None,
        }
    }

    /// A three-task dependency chain must appear in full on the critical path.
    #[test]
    fn test_critical_path() {
        let mut graph = TaskGraph::new();

        let t1 = TaskId::from_u64(1);
        let t2 = TaskId::from_u64(2);
        let t3 = TaskId::from_u64(3);

        let now = Instant::now();
        graph.add_task(running_task(t1, "task1", now));
        graph.add_task(running_task(t2, "task2", now));
        graph.add_task(running_task(t3, "task3", now));

        graph.add_relationship(edge(t1, t2, RelationshipType::Dependency, None));
        graph.add_relationship(edge(t2, t3, RelationshipType::Dependency, None));

        let path = graph.find_critical_path();
        for expected in [t1, t2, t3] {
            assert!(path.contains(&expected));
        }
    }

    /// Both endpoints of a shared-resource edge are reported for its name.
    #[test]
    fn test_shared_resources() {
        let mut graph = TaskGraph::new();

        let t1 = TaskId::from_u64(1);
        let t2 = TaskId::from_u64(2);

        graph.add_relationship(edge(
            t1,
            t2,
            RelationshipType::SharedResource,
            Some("mutex_1"),
        ));

        let tasks = graph.find_tasks_sharing_resource("mutex_1");
        assert_eq!(tasks.len(), 2);
    }
}