use crate::task::{TaskId, TaskInfo, TaskState};
use parking_lot::RwLock;
use std::collections::{HashMap, HashSet, VecDeque};
use std::fmt;
use std::io::Write;
use std::path::Path;
use std::sync::Arc;
/// Kind of directed edge between two tasks in a [`TaskGraph`].
///
/// The [`fmt::Display`] text of each variant is the phrase that
/// [`TaskGraph::to_text`] prints between the source and target task names.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RelationshipType {
/// Displayed as "spawned".
Spawned,
/// Displayed as "sends →". Matched against `ChannelReceive` edges by
/// [`TaskGraph::find_channel_pairs`].
ChannelSend,
/// Displayed as "← receives". Matched against `ChannelSend` edges by
/// [`TaskGraph::find_channel_pairs`].
ChannelReceive,
/// Displayed as "shares resource". Followed by the deadlock cycle search and
/// grouped by resource name in [`TaskGraph::find_tasks_sharing_resource`].
SharedResource,
/// Displayed as "data →". Followed by the critical-path search.
DataFlow,
/// Displayed as "awaits". Followed by the critical-path search, the
/// transitive-dependency search and the deadlock cycle search.
AwaitsOn,
/// Displayed as "depends on". Followed by the critical-path search and the
/// transitive-dependency search.
Dependency,
}
/// Renders the relationship as the short phrase used in text reports.
impl fmt::Display for RelationshipType {
    /// Writes the variant's phrase verbatim; width and padding flags on the
    /// formatter are not applied.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let phrase = match *self {
            Self::Spawned => "spawned",
            Self::ChannelSend => "sends →",
            Self::ChannelReceive => "← receives",
            Self::SharedResource => "shares resource",
            Self::DataFlow => "data →",
            Self::AwaitsOn => "awaits",
            Self::Dependency => "depends on",
        };
        f.write_str(phrase)
    }
}
/// A single directed edge of a [`TaskGraph`].
#[derive(Debug, Clone)]
pub struct Relationship {
/// Task the edge starts at.
pub from: TaskId,
/// Task the edge points to.
pub to: TaskId,
/// Kind of edge; decides which graph searches follow it.
pub relationship_type: RelationshipType,
/// Optional name of the channel or resource the edge refers to. Used as an
/// edge label by the exporters and compared by
/// [`TaskGraph::find_tasks_sharing_resource`] and
/// [`TaskGraph::find_channel_pairs`].
pub resource_name: Option<String>,
/// Optional free-form description. None of the `TaskGraph` methods visible in
/// this module read it.
pub data_description: Option<String>,
}
/// Directed multigraph of tasks and the relationships between them.
///
/// Edges are stored three times: once as the full [`Relationship`] list and
/// once in each direction as adjacency lists, all kept in sync by
/// [`TaskGraph::add_relationship`].
#[derive(Debug, Clone)]
pub struct TaskGraph {
// Every edge in insertion order, with its full metadata.
relationships: Vec<Relationship>,
// Task metadata keyed by id; edges may reference ids that are absent here.
tasks: HashMap<TaskId, TaskInfo>,
// Outgoing edges: `from` -> [(to, kind)].
adjacency: HashMap<TaskId, Vec<(TaskId, RelationshipType)>>,
// Incoming edges: `to` -> [(from, kind)].
reverse_adjacency: HashMap<TaskId, Vec<(TaskId, RelationshipType)>>,
}
impl TaskGraph {
/// Creates a graph with no tasks and no relationships.
#[must_use]
pub fn new() -> Self {
    let relationships = Vec::new();
    let tasks = HashMap::new();
    let adjacency = HashMap::new();
    let reverse_adjacency = HashMap::new();
    Self {
        relationships,
        tasks,
        adjacency,
        reverse_adjacency,
    }
}
/// Registers `task` under its own id.
///
/// A task previously stored under the same id is replaced and dropped.
pub fn add_task(&mut self, task: TaskInfo) {
    let id = task.id;
    let _replaced = self.tasks.insert(id, task);
}
/// Adds an edge to the graph.
///
/// The edge is appended to the outgoing list of `relationship.from`, to the
/// incoming list of `relationship.to`, and to the full relationship list.
/// Duplicates are not filtered, and neither endpoint has to be a registered
/// task.
pub fn add_relationship(&mut self, relationship: Relationship) {
    let source = relationship.from;
    let target = relationship.to;
    let kind = relationship.relationship_type;

    let outgoing = self.adjacency.entry(source).or_default();
    outgoing.push((target, kind));

    let incoming = self.reverse_adjacency.entry(target).or_default();
    incoming.push((source, kind));

    self.relationships.push(relationship);
}
/// Returns references to every edge whose kind equals `rel_type`, in
/// insertion order.
#[must_use]
pub fn get_relationships_by_type(&self, rel_type: RelationshipType) -> Vec<&Relationship> {
    let mut matching = Vec::new();
    for rel in &self.relationships {
        if rel.relationship_type == rel_type {
            matching.push(rel);
        }
    }
    matching
}
/// Returns a copy of the outgoing edges of `task_id` as `(target, kind)`
/// pairs, or an empty vector when the task has no outgoing edges.
#[must_use]
pub fn get_related_tasks(&self, task_id: TaskId) -> Vec<(TaskId, RelationshipType)> {
    match self.adjacency.get(&task_id) {
        Some(outgoing) => outgoing.clone(),
        None => Vec::new(),
    }
}
/// Returns a copy of the incoming edges of `task_id` as `(source, kind)`
/// pairs, or an empty vector when no edge points at the task.
#[must_use]
pub fn get_dependent_tasks(&self, task_id: TaskId) -> Vec<(TaskId, RelationshipType)> {
    let incoming = self.reverse_adjacency.get(&task_id);
    incoming.map_or_else(Vec::new, |edges| edges.clone())
}
/// Looks up the metadata registered for `task_id`.
///
/// Returns `None` when no task with that id was added via `add_task`, even if
/// relationships reference the id.
#[must_use]
pub fn get_task(&self, task_id: &TaskId) -> Option<&TaskInfo> {
self.tasks.get(task_id)
}
/// Returns the longest chain of tasks found by following `Dependency`,
/// `DataFlow` and `AwaitsOn` edges.
///
/// Every registered task is tried as a starting point. On ties, the first
/// candidate encountered in map iteration order wins, so the chosen path may
/// vary between runs. The result is empty only when no tasks are registered.
#[must_use]
pub fn find_critical_path(&self) -> Vec<TaskId> {
    // Shared scratch set: the helper leaves it empty again after each call.
    let mut on_current_path = HashSet::new();
    self.tasks.keys().fold(Vec::new(), |best, &start| {
        let candidate = self.find_longest_path(start, &mut on_current_path);
        if candidate.len() > best.len() {
            candidate
        } else {
            best
        }
    })
}
/// Depth-first search for the longest simple path starting at `task_id`.
///
/// `visited` holds the tasks on the path currently being explored. A task
/// already in it yields an empty path, which keeps cycles from recursing
/// forever. The task is removed again before returning, so sibling branches
/// may revisit it.
fn find_longest_path(&self, task_id: TaskId, visited: &mut HashSet<TaskId>) -> Vec<TaskId> {
    // `insert` reports `false` when the task is already on the current path.
    if !visited.insert(task_id) {
        return Vec::new();
    }

    let mut best = vec![task_id];
    if let Some(successors) = self.adjacency.get(&task_id) {
        for &(next_id, kind) in successors {
            let is_followed = matches!(
                kind,
                RelationshipType::Dependency
                    | RelationshipType::DataFlow
                    | RelationshipType::AwaitsOn
            );
            if !is_followed {
                continue;
            }

            let tail = self.find_longest_path(next_id, visited);
            if tail.len() + 1 > best.len() {
                let mut extended = Vec::with_capacity(tail.len() + 1);
                extended.push(task_id);
                extended.extend(tail);
                best = extended;
            }
        }
    }

    visited.remove(&task_id);
    best
}
/// Collects every task reachable from `task_id` through `Dependency` and
/// `AwaitsOn` edges, using a breadth-first traversal.
///
/// The starting task itself is included only when a cycle leads back to it.
/// Tasks without outgoing edges yield an empty set.
#[must_use]
pub fn find_transitive_dependencies(&self, task_id: TaskId) -> HashSet<TaskId> {
    let mut dependencies = HashSet::new();
    let mut queue = VecDeque::new();
    queue.push_back(task_id);

    while let Some(current) = queue.pop_front() {
        if let Some(related) = self.adjacency.get(&current) {
            for (next_id, rel_type) in related {
                let is_dependency_edge = matches!(
                    rel_type,
                    RelationshipType::Dependency | RelationshipType::AwaitsOn
                );
                // `insert` returns `false` for tasks seen before, so each task
                // is queued at most once and cycles terminate.
                if is_dependency_edge && dependencies.insert(*next_id) {
                    queue.push_back(*next_id);
                }
            }
        }
    }

    dependencies
}
/// Returns the distinct tasks on either end of a `SharedResource` edge whose
/// resource name equals `resource_name`.
///
/// The order of the returned ids is unspecified.
#[must_use]
pub fn find_tasks_sharing_resource(&self, resource_name: &str) -> Vec<TaskId> {
    let matching_edges = self.relationships.iter().filter(|rel| {
        rel.relationship_type == RelationshipType::SharedResource
            && rel.resource_name.as_deref() == Some(resource_name)
    });

    let mut sharers = HashSet::new();
    for rel in matching_edges {
        sharers.insert(rel.from);
        sharers.insert(rel.to);
    }
    sharers.into_iter().collect()
}
/// Pairs up senders and receivers that communicate through the same channel.
///
/// A pair `(send.from, recv.to)` is produced for every `ChannelSend` edge and
/// `ChannelReceive` edge where the send edge's target equals the receive
/// edge's source and both carry the same `resource_name` (two `None` names
/// also compare equal). Pairs are ordered by send edge, then by receive edge.
#[must_use]
pub fn find_channel_pairs(&self) -> Vec<(TaskId, TaskId)> {
    let sends = self.get_relationships_by_type(RelationshipType::ChannelSend);
    // Collected once up front rather than rebuilt for every send edge.
    let receives = self.get_relationships_by_type(RelationshipType::ChannelReceive);

    let mut pairs = Vec::new();
    for send in &sends {
        for recv in &receives {
            if send.resource_name == recv.resource_name && send.to == recv.from {
                pairs.push((send.from, recv.to));
            }
        }
    }
    pairs
}
/// Searches for cycles over `SharedResource` and `AwaitsOn` edges, which
/// indicate tasks that may be waiting on each other.
///
/// A depth-first search is started from every registered task that an
/// earlier search has not reached. At most one cycle is reported per search,
/// and each entry is the two endpoints of the edge that closed the cycle, not
/// the full loop.
#[must_use]
pub fn detect_potential_deadlocks(&self) -> Vec<Vec<TaskId>> {
    let mut deadlock_cycles = Vec::new();
    let mut visited = HashSet::new();
    let mut rec_stack = HashSet::new();

    for task_id in self.tasks.keys() {
        if visited.contains(task_id) {
            continue;
        }
        if let Some(cycle) = self.find_cycle(*task_id, &mut visited, &mut rec_stack) {
            deadlock_cycles.push(cycle);
        }
        // A search that reports a cycle can leave its path in `rec_stack`.
        // Reset it so those entries are not mistaken for the active path of
        // the next root, which would report cycles that do not exist.
        rec_stack.clear();
    }

    deadlock_cycles
}
/// Depth-first cycle search over `SharedResource` and `AwaitsOn` edges.
///
/// `visited` accumulates every task reached so far across searches.
/// `rec_stack` holds the tasks on the path currently being explored and is
/// restored to its state on entry before returning, whether or not a cycle
/// was found.
///
/// Returns `Some([task, next])` for the first edge found that points back
/// into the current path, or `None` when no such edge is reachable.
fn find_cycle(
    &self,
    task_id: TaskId,
    visited: &mut HashSet<TaskId>,
    rec_stack: &mut HashSet<TaskId>,
) -> Option<Vec<TaskId>> {
    visited.insert(task_id);
    rec_stack.insert(task_id);

    let mut found = None;
    if let Some(related) = self.adjacency.get(&task_id) {
        for (next_id, rel_type) in related {
            let is_blocking_edge = matches!(
                rel_type,
                RelationshipType::SharedResource | RelationshipType::AwaitsOn
            );
            if !is_blocking_edge {
                continue;
            }

            if !visited.contains(next_id) {
                found = self.find_cycle(*next_id, visited, rec_stack);
            } else if rec_stack.contains(next_id) {
                // Back edge into the active path closes a cycle.
                found = Some(vec![task_id, *next_id]);
            }

            if found.is_some() {
                break;
            }
        }
    }

    // Unwind on every path. Returning early without this would leave stale
    // entries that later searches misread as being on their own path.
    rec_stack.remove(&task_id);
    found
}
#[must_use]
pub fn to_dot(&self) -> String {
let mut dot = String::from("digraph TaskGraph {\n");
dot.push_str(" rankdir=LR;\n");
dot.push_str(" node [shape=box, style=rounded];\n\n");
for (task_id, task) in &self.tasks {
let color = match task.state {
TaskState::Pending => "lightgray",
TaskState::Running => "lightblue",
TaskState::Blocked { .. } => "yellow",
TaskState::Completed => "lightgreen",
TaskState::Failed => "lightcoral",
};
dot.push_str(&format!(
" t{} [label=\"{}\n{:?}\", fillcolor={}, style=\"filled,rounded\"];\n",
task_id.as_u64(),
task.name,
task.state,
color
));
}
dot.push('\n');
for rel in &self.relationships {
let (style, color, label) = match rel.relationship_type {
RelationshipType::Spawned => ("solid", "black", "spawned"),
RelationshipType::ChannelSend => ("dashed", "blue", "→ channel"),
RelationshipType::ChannelReceive => ("dashed", "blue", "channel →"),
RelationshipType::SharedResource => ("dotted", "red", "shares"),
RelationshipType::DataFlow => ("bold", "green", "data →"),
RelationshipType::AwaitsOn => ("solid", "purple", "awaits"),
RelationshipType::Dependency => ("solid", "orange", "depends"),
};
let mut edge_label = label.to_string();
if let Some(ref resource) = rel.resource_name {
edge_label = format!("{label}\n{resource}");
}
dot.push_str(&format!(
" t{} -> t{} [label=\"{}\", style={}, color={}];\n",
rel.from.as_u64(),
rel.to.as_u64(),
edge_label,
style,
color
));
}
let critical_path = self.find_critical_path();
if critical_path.len() > 1 {
dot.push_str("\n // Critical path\n");
for window in critical_path.windows(2) {
dot.push_str(&format!(
" t{} -> t{} [color=red, penwidth=3.0, constraint=false];\n",
window[0].as_u64(),
window[1].as_u64()
));
}
}
dot.push_str("}\n");
dot
}
#[must_use]
pub fn to_text(&self) -> String {
let mut output = String::new();
output.push_str("Task Relationship Graph\n");
output.push_str("=======================\n\n");
for rel_type in &[
RelationshipType::Spawned,
RelationshipType::ChannelSend,
RelationshipType::ChannelReceive,
RelationshipType::SharedResource,
RelationshipType::DataFlow,
RelationshipType::AwaitsOn,
RelationshipType::Dependency,
] {
let rels = self.get_relationships_by_type(*rel_type);
if !rels.is_empty() {
output.push_str(&format!("\n{rel_type} Relationships:\n"));
for rel in rels {
let from_name = self.tasks.get(&rel.from).map_or("?", |t| t.name.as_str());
let to_name = self.tasks.get(&rel.to).map_or("?", |t| t.name.as_str());
output.push_str(&format!(" {from_name} {rel_type} {to_name}"));
if let Some(ref resource) = rel.resource_name {
output.push_str(&format!(" ({resource})"));
}
output.push('\n');
}
}
}
let critical_path = self.find_critical_path();
if !critical_path.is_empty() {
output.push_str("\nCritical Path:\n");
for task_id in &critical_path {
if let Some(task) = self.tasks.get(task_id) {
output.push_str(&format!(" → {} ({:?})\n", task.name, task.state));
}
}
}
let mut resources: HashMap<String, Vec<TaskId>> = HashMap::new();
for rel in &self.relationships {
if rel.relationship_type == RelationshipType::SharedResource {
if let Some(ref name) = rel.resource_name {
resources.entry(name.clone()).or_default().push(rel.from);
resources.entry(name.clone()).or_default().push(rel.to);
}
}
}
if !resources.is_empty() {
output.push_str("\nShared Resources:\n");
for (resource, task_ids) in resources {
let unique_tasks: HashSet<_> = task_ids.into_iter().collect();
output.push_str(&format!(
" {} (accessed by {} tasks):\n",
resource,
unique_tasks.len()
));
for task_id in unique_tasks {
if let Some(task) = self.tasks.get(&task_id) {
output.push_str(&format!(" - {}\n", task.name));
}
}
}
}
output
}
/// Writes the DOT rendering of the graph to `path`, creating the file or
/// truncating an existing one.
///
/// # Errors
///
/// Returns any I/O error raised while creating or writing the file.
pub fn export_dot<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
    std::fs::File::create(path).and_then(|mut file| file.write_all(self.to_dot().as_bytes()))
}
pub fn export_json<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
let json = self.to_json();
let mut file = std::fs::File::create(path)?;
file.write_all(json.as_bytes())?;
Ok(())
}
#[must_use]
pub fn to_json(&self) -> String {
let mut json = String::from("{\n");
json.push_str(" \"nodes\": [\n");
let nodes: Vec<_> = self
.tasks
.iter()
.map(|(id, task)| {
format!(
" {{\"id\": {}, \"name\": \"{}\", \"state\": \"{}\"}}",
id.as_u64(),
task.name.replace('"', "\\\""),
format!("{:?}", task.state).replace('"', "\\\"")
)
})
.collect();
json.push_str(&nodes.join(",\n"));
json.push_str("\n ],\n");
json.push_str(" \"edges\": [\n");
let edges: Vec<_> = self
.relationships
.iter()
.map(|rel| {
let mut edge = format!(
" {{\"from\": {}, \"to\": {}, \"type\": \"{:?}\"",
rel.from.as_u64(),
rel.to.as_u64(),
rel.relationship_type
);
if let Some(ref resource) = rel.resource_name {
edge.push_str(&format!(
", \"resource\": \"{}\"",
resource.replace('"', "\\\"")
));
}
edge.push('}');
edge
})
.collect();
json.push_str(&edges.join(",\n"));
json.push_str("\n ],\n");
json.push_str(" \"stats\": {\n");
json.push_str(&format!(" \"total_tasks\": {},\n", self.tasks.len()));
json.push_str(&format!(
" \"total_relationships\": {},\n",
self.relationships.len()
));
let critical_path = self.find_critical_path();
json.push_str(&format!(
" \"critical_path_length\": {}\n",
critical_path.len()
));
json.push_str(" }\n");
json.push_str("}\n");
json
}
/// Renders the graph as a Mermaid `flowchart LR` diagram.
///
/// Each registered task becomes a node `t<id>` whose bracket shape and CSS
/// class depend on its state; double quotes in task names are replaced with
/// single quotes. Each relationship becomes an arrow whose form depends on its
/// kind, labelled with the resource name when present and otherwise with a
/// short word for the kind. The class definitions are appended at the end.
#[must_use]
pub fn to_mermaid(&self) -> String {
    let mut mermaid = String::from("flowchart LR\n");

    for (task_id, task) in &self.tasks {
        let label = task.name.replace('"', "'");
        // (opening bracket, closing bracket, CSS class) per state.
        let (open, close, class) = match task.state {
            TaskState::Pending => ("[", "]", "pending"),
            TaskState::Running => ("([", "])", "running"),
            TaskState::Blocked { .. } => ("{{", "}}", "blocked"),
            TaskState::Completed => ("[/", "/]", "completed"),
            TaskState::Failed => ("[(", ")]", "failed"),
        };
        mermaid.push_str(&format!(
            " t{}{open}{label}{close}:::{class}\n",
            task_id.as_u64()
        ));
    }
    mermaid.push('\n');

    for rel in &self.relationships {
        // (arrow syntax, label used when the edge has no resource name).
        let (arrow, fallback) = match rel.relationship_type {
            RelationshipType::Spawned => ("-->", "spawned"),
            RelationshipType::ChannelSend => ("-.->", "send"),
            RelationshipType::ChannelReceive => ("-.->", "recv"),
            RelationshipType::SharedResource => ("o--o", "shares"),
            RelationshipType::DataFlow => ("==>", "data"),
            RelationshipType::AwaitsOn => ("-->", "awaits"),
            RelationshipType::Dependency => ("-->", "depends"),
        };
        let text = rel.resource_name.as_deref().unwrap_or(fallback);
        mermaid.push_str(&format!(
            " t{} {arrow}|{text}| t{}\n",
            rel.from.as_u64(),
            rel.to.as_u64()
        ));
    }

    let class_defs = [
        "\n classDef pending fill:#f9f9f9,stroke:#999\n",
        " classDef running fill:#bbdefb,stroke:#1976d2\n",
        " classDef blocked fill:#fff9c4,stroke:#fbc02d\n",
        " classDef completed fill:#c8e6c9,stroke:#388e3c\n",
        " classDef failed fill:#ffcdd2,stroke:#d32f2f\n",
    ];
    for line in class_defs.iter() {
        mermaid.push_str(line);
    }
    mermaid
}
/// Writes the Mermaid rendering of the graph to `path`, creating the file or
/// truncating an existing one.
///
/// # Errors
///
/// Returns any I/O error raised while creating or writing the file.
pub fn export_mermaid<P: AsRef<Path>>(&self, path: P) -> std::io::Result<()> {
    let mut out = std::fs::File::create(path)?;
    let rendered = self.to_mermaid();
    out.write_all(rendered.as_bytes())
}
#[must_use]
pub fn stats(&self) -> GraphStats {
let mut stats = GraphStats::default();
stats.total_tasks = self.tasks.len();
stats.total_relationships = self.relationships.len();
for task in self.tasks.values() {
match task.state {
TaskState::Pending => stats.pending_tasks += 1,
TaskState::Running => stats.running_tasks += 1,
TaskState::Blocked { .. } => stats.blocked_tasks += 1,
TaskState::Completed => stats.completed_tasks += 1,
TaskState::Failed => stats.failed_tasks += 1,
}
}
for rel in &self.relationships {
match rel.relationship_type {
RelationshipType::Spawned => stats.spawn_relationships += 1,
RelationshipType::ChannelSend | RelationshipType::ChannelReceive => {
stats.channel_relationships += 1;
}
RelationshipType::SharedResource => stats.resource_relationships += 1,
RelationshipType::DataFlow => stats.dataflow_relationships += 1,
RelationshipType::AwaitsOn | RelationshipType::Dependency => {
stats.dependency_relationships += 1;
}
}
}
stats.critical_path_length = self.find_critical_path().len();
stats
}
}
/// Summary counters for a [`TaskGraph`], as produced by [`TaskGraph::stats`].
#[derive(Debug, Clone, Default)]
pub struct GraphStats {
/// Number of registered tasks.
pub total_tasks: usize,
/// Number of relationships of any kind.
pub total_relationships: usize,
/// Tasks in the `Pending` state.
pub pending_tasks: usize,
/// Tasks in the `Running` state.
pub running_tasks: usize,
/// Tasks in the `Blocked` state.
pub blocked_tasks: usize,
/// Tasks in the `Completed` state.
pub completed_tasks: usize,
/// Tasks in the `Failed` state.
pub failed_tasks: usize,
/// `Spawned` relationships.
pub spawn_relationships: usize,
/// `ChannelSend` and `ChannelReceive` relationships combined.
pub channel_relationships: usize,
/// `SharedResource` relationships.
pub resource_relationships: usize,
/// `DataFlow` relationships.
pub dataflow_relationships: usize,
/// `AwaitsOn` and `Dependency` relationships combined.
pub dependency_relationships: usize,
/// Number of tasks on the path returned by `find_critical_path`.
pub critical_path_length: usize,
}
/// The default graph is the empty graph.
impl Default for TaskGraph {
/// Equivalent to [`TaskGraph::new`].
fn default() -> Self {
Self::new()
}
}
// Shared graph instance behind a read-write lock. The `Lazy` wrapper builds
// the empty graph on first access rather than at program start.
static GRAPH: once_cell::sync::Lazy<Arc<RwLock<TaskGraph>>> =
once_cell::sync::Lazy::new(|| Arc::new(RwLock::new(TaskGraph::new())));
/// Returns a new handle to the shared graph instance.
///
/// Every call yields an `Arc` pointing at the same `RwLock<TaskGraph>`;
/// callers take the lock themselves to read or modify the graph.
#[must_use]
pub fn global_graph() -> Arc<RwLock<TaskGraph>> {
Arc::clone(&GRAPH)
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::task::{TaskId, TaskInfo, TaskState};
    use std::time::{Duration, Instant};

    /// Builds a minimal running task with the given id and name.
    fn running_task(id: TaskId, name: &str) -> TaskInfo {
        let now = Instant::now();
        TaskInfo {
            id,
            name: name.to_string(),
            state: TaskState::Running,
            created_at: now,
            last_updated: now,
            parent: None,
            location: None,
            poll_count: 0,
            total_run_time: Duration::ZERO,
        }
    }

    /// Builds an edge without a resource name or description.
    fn edge(from: TaskId, to: TaskId, relationship_type: RelationshipType) -> Relationship {
        Relationship {
            from,
            to,
            relationship_type,
            resource_name: None,
            data_description: None,
        }
    }

    #[test]
    fn test_critical_path() {
        let mut graph = TaskGraph::new();
        let t1 = TaskId::from_u64(1);
        let t2 = TaskId::from_u64(2);
        let t3 = TaskId::from_u64(3);
        graph.add_task(running_task(t1, "task1"));
        graph.add_task(running_task(t2, "task2"));
        graph.add_task(running_task(t3, "task3"));
        graph.add_relationship(edge(t1, t2, RelationshipType::Dependency));
        graph.add_relationship(edge(t2, t3, RelationshipType::Dependency));

        // The chain t1 -> t2 -> t3 is the unique longest path, so both the
        // members and their order are fixed regardless of map iteration order.
        let path = graph.find_critical_path();
        assert_eq!(path, vec![t1, t2, t3]);
    }

    #[test]
    fn test_transitive_dependencies() {
        let mut graph = TaskGraph::new();
        let t1 = TaskId::from_u64(1);
        let t2 = TaskId::from_u64(2);
        let t3 = TaskId::from_u64(3);
        graph.add_relationship(edge(t1, t2, RelationshipType::Dependency));
        graph.add_relationship(edge(t2, t3, RelationshipType::AwaitsOn));

        let deps = graph.find_transitive_dependencies(t1);
        assert_eq!(deps.len(), 2);
        assert!(deps.contains(&t2));
        assert!(deps.contains(&t3));
        assert!(graph.find_transitive_dependencies(t3).is_empty());
    }

    #[test]
    fn test_shared_resources() {
        let mut graph = TaskGraph::new();
        let t1 = TaskId::from_u64(1);
        let t2 = TaskId::from_u64(2);
        graph.add_relationship(Relationship {
            from: t1,
            to: t2,
            relationship_type: RelationshipType::SharedResource,
            resource_name: Some("mutex_1".to_string()),
            data_description: None,
        });

        let tasks = graph.find_tasks_sharing_resource("mutex_1");
        assert_eq!(tasks.len(), 2);
        assert!(tasks.contains(&t1));
        assert!(tasks.contains(&t2));
        assert!(graph.find_tasks_sharing_resource("other").is_empty());
    }
}