use std::collections::{HashMap, HashSet, VecDeque};
use crate::error::{BatchError, Result};
use crate::types::JobId;
/// Scheduling state the dependency manager records for each registered job.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum DependencyStatus {
    /// The job has at least one predecessor that is not yet `Completed`.
    Pending,
    /// The job has no outstanding predecessors; newly registered jobs start here.
    Ready,
    /// The job was reported as finished successfully.
    Completed,
    /// The job was reported as failed, or one of its predecessors failed.
    Failed,
}

impl std::fmt::Display for DependencyStatus {
    /// Writes the variant name verbatim (`"Pending"`, `"Ready"`, ...).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let label = match self {
            Self::Pending => "Pending",
            Self::Ready => "Ready",
            Self::Completed => "Completed",
            Self::Failed => "Failed",
        };
        f.write_str(label)
    }
}
/// Tracks registered jobs, the dependency edges between them, and each job's
/// current [`DependencyStatus`].
///
/// All three maps are keyed by the job id string obtained from `JobId::as_str`.
#[derive(Debug, Default)]
pub struct JobDependencyManager {
/// For each job, the ids of the jobs that depend on it (outgoing edges).
successors: HashMap<String, HashSet<String>>,
/// For each job, the ids of the jobs it depends on (incoming edges).
predecessors: HashMap<String, HashSet<String>>,
/// Current status of every registered job; its key set defines which jobs exist.
status: HashMap<String, DependencyStatus>,
}
impl JobDependencyManager {
#[must_use]
pub fn new() -> Self {
Self::default()
}
/// Registers `job_id` if it is not already known.
///
/// A new job gets empty predecessor and successor sets and starts out as
/// [`DependencyStatus::Ready`]. Registering a job that already exists is a
/// no-op and leaves its status and edges untouched.
pub fn register_job(&mut self, job_id: &JobId) {
    let id = job_id.as_str();
    if !self.status.contains_key(id) {
        let owned = id.to_string();
        self.predecessors.entry(owned.clone()).or_default();
        self.successors.entry(owned.clone()).or_default();
        self.status.insert(owned, DependencyStatus::Ready);
    }
}
/// Declares that `job_id` must wait for `depends_on`.
///
/// Both jobs are registered first if they are not yet known. After the edge
/// is stored, the status of `job_id` is recomputed from its predecessors.
///
/// # Errors
///
/// Returns `BatchError::DependencyError` when `job_id` and `depends_on` are
/// the same job, or when the new edge would close a cycle.
pub fn add_dependency(&mut self, job_id: &JobId, depends_on: &JobId) -> Result<()> {
    if job_id == depends_on {
        let message = format!("Self-dependency not allowed: {}", job_id.as_str());
        return Err(BatchError::DependencyError(message));
    }

    self.register_job(depends_on);
    self.register_job(job_id);

    let upstream = depends_on.as_str().to_string();
    let downstream = job_id.as_str().to_string();

    if self.would_create_cycle(&upstream, &downstream) {
        let message = format!(
            "Adding dependency {} -> {} would create a cycle",
            upstream, downstream
        );
        return Err(BatchError::DependencyError(message));
    }

    // Record the edge in both directions so lookups are cheap either way.
    self.predecessors
        .entry(downstream.clone())
        .or_default()
        .insert(upstream.clone());
    self.successors
        .entry(upstream)
        .or_default()
        .insert(downstream.clone());

    self.recompute_status(&downstream);
    Ok(())
}
/// Returns the current status of `job_id`.
///
/// # Errors
///
/// Returns `BatchError::JobNotFound` when the job was never registered.
pub fn status(&self, job_id: &JobId) -> Result<DependencyStatus> {
    match self.status.get(job_id.as_str()) {
        Some(&state) => Ok(state),
        None => Err(BatchError::JobNotFound(job_id.as_str().to_string())),
    }
}
/// Marks `job_id` as [`DependencyStatus::Completed`] and re-evaluates every
/// job that directly depends on it.
///
/// The job's previous status is overwritten unconditionally.
///
/// # Errors
///
/// Returns `BatchError::JobNotFound` when the job was never registered.
pub fn mark_completed(&mut self, job_id: &JobId) -> Result<()> {
    let id = job_id.as_str();
    match self.status.get_mut(id) {
        Some(state) => *state = DependencyStatus::Completed,
        None => return Err(BatchError::JobNotFound(id.to_string())),
    }

    // Snapshot the direct successors so `self` can be mutated while walking them.
    let downstream: Vec<String> = match self.successors.get(id) {
        Some(set) => set.iter().cloned().collect(),
        None => Vec::new(),
    };
    for job in downstream.iter() {
        self.recompute_status(job);
    }
    Ok(())
}
/// Marks `job_id` as [`DependencyStatus::Failed`] and cascades the failure to
/// every job reachable from it through successor edges.
///
/// Previous statuses of the affected jobs are overwritten unconditionally.
///
/// # Errors
///
/// Returns `BatchError::JobNotFound` when the job was never registered.
pub fn mark_failed(&mut self, job_id: &JobId) -> Result<()> {
    let id = job_id.as_str();
    if !self.status.contains_key(id) {
        return Err(BatchError::JobNotFound(id.to_string()));
    }

    // The failed job itself plus everything downstream of it.
    let mut doomed = self.transitive_dependents(id);
    doomed.push(id.to_string());
    for job in doomed {
        self.status.insert(job, DependencyStatus::Failed);
    }
    Ok(())
}
/// Returns the ids of all jobs whose status is [`DependencyStatus::Ready`].
///
/// The order of the returned ids follows the status map's iteration order
/// and is therefore unspecified.
#[must_use]
pub fn ready_jobs(&self) -> Vec<String> {
    let mut ready = Vec::new();
    for (id, state) in &self.status {
        if *state == DependencyStatus::Ready {
            ready.push(id.clone());
        }
    }
    ready
}
/// Returns the ids of all jobs whose status is [`DependencyStatus::Pending`].
///
/// The order of the returned ids follows the status map's iteration order
/// and is therefore unspecified.
#[must_use]
pub fn pending_jobs(&self) -> Vec<String> {
    let mut pending = Vec::new();
    for (id, state) in &self.status {
        if *state == DependencyStatus::Pending {
            pending.push(id.clone());
        }
    }
    pending
}
/// Returns how many jobs are currently registered.
#[must_use]
pub fn job_count(&self) -> usize {
    let registered = &self.status;
    registered.len()
}
/// Returns the ids of the jobs that `job_id` directly depends on.
///
/// An unknown job yields an empty vector. The order is unspecified.
#[must_use]
pub fn dependencies_of(&self, job_id: &JobId) -> Vec<String> {
    match self.predecessors.get(job_id.as_str()) {
        Some(upstream) => upstream.iter().cloned().collect(),
        None => Vec::new(),
    }
}
/// Returns the ids of the jobs that directly depend on `job_id`.
///
/// An unknown job yields an empty vector. The order is unspecified.
#[must_use]
pub fn dependents_of(&self, job_id: &JobId) -> Vec<String> {
    match self.successors.get(job_id.as_str()) {
        Some(downstream) => downstream.iter().cloned().collect(),
        None => Vec::new(),
    }
}
/// Returns the number of jobs that directly depend on `job_id`
/// (0 for an unknown job).
#[must_use]
pub fn fan_out(&self, job_id: &JobId) -> usize {
    match self.successors.get(job_id.as_str()) {
        Some(downstream) => downstream.len(),
        None => 0,
    }
}
/// Returns the number of jobs that `job_id` directly depends on
/// (0 for an unknown job).
#[must_use]
pub fn fan_in(&self, job_id: &JobId) -> usize {
    match self.predecessors.get(job_id.as_str()) {
        Some(upstream) => upstream.len(),
        None => 0,
    }
}
/// Returns every registered job id in an order where each job appears after
/// all of the jobs it depends on (Kahn's algorithm).
///
/// The result is deterministic for a given graph: jobs without predecessors
/// are visited in ascending id order, and the jobs released by each visited
/// job are enqueued in ascending id order. Without this, the order would
/// follow hash-map iteration order and could differ between calls.
///
/// # Errors
///
/// Returns `BatchError::DependencyError` when not every job could be
/// ordered, which indicates a cycle in the graph.
pub fn topological_sort(&self) -> Result<Vec<String>> {
    // Start every known job at in-degree 0, then count incoming edges.
    let mut in_degree: HashMap<&str, usize> =
        self.status.keys().map(|id| (id.as_str(), 0)).collect();
    for succs in self.successors.values() {
        for s in succs {
            *in_degree.entry(s.as_str()).or_insert(0) += 1;
        }
    }

    // Sort the roots so the starting order does not depend on hashing.
    let mut roots: Vec<&str> = in_degree
        .iter()
        .filter(|(_, degree)| **degree == 0)
        .map(|(id, _)| *id)
        .collect();
    roots.sort_unstable();
    let mut queue: VecDeque<&str> = VecDeque::from(roots);

    let mut order: Vec<String> = Vec::with_capacity(self.status.len());
    // Scratch buffer reused across iterations for the jobs a node releases.
    let mut released: Vec<&str> = Vec::new();
    while let Some(node) = queue.pop_front() {
        order.push(node.to_string());
        if let Some(succs) = self.successors.get(node) {
            for s in succs {
                if let Some(deg) = in_degree.get_mut(s.as_str()) {
                    *deg = deg.saturating_sub(1);
                    if *deg == 0 {
                        released.push(s.as_str());
                    }
                }
            }
            // The successor set iterates in arbitrary order; sort before enqueueing.
            released.sort_unstable();
            queue.extend(released.drain(..));
        }
    }

    if order.len() != self.status.len() {
        return Err(BatchError::DependencyError(
            "Cycle detected during topological sort".to_string(),
        ));
    }
    Ok(order)
}
/// Returns the order in which jobs can be executed so that every job runs
/// after its dependencies; this delegates to `topological_sort`.
///
/// # Errors
///
/// Propagates the `BatchError::DependencyError` from `topological_sort`.
pub fn execution_order(&self) -> Result<Vec<String>> {
    let order = self.topological_sort()?;
    Ok(order)
}
/// Re-derives the status of `node` from the statuses of its predecessors.
///
/// * If any predecessor is `Failed`, `node` becomes `Failed`. When that is a
///   change of state, the failure is also cascaded to every transitive
///   dependent of `node`, matching what `mark_failed` does. Without this,
///   adding a dependency on an already failed job would leave downstream
///   jobs stuck in `Pending`.
/// * Otherwise, a `Pending` node whose predecessors are all `Completed`
///   (or that has none) becomes `Ready`.
/// * Otherwise, a `Ready` node with an unfinished predecessor becomes `Pending`.
///
/// Any other status is left untouched. Nodes without a predecessor entry are
/// ignored.
fn recompute_status(&mut self, node: &str) {
    // Inspect the predecessors by reference; no need to clone the set.
    let (any_failed, all_completed) = match self.predecessors.get(node) {
        Some(preds) => (
            preds
                .iter()
                .any(|p| self.status.get(p) == Some(&DependencyStatus::Failed)),
            // `all` is vacuously true for an empty predecessor set.
            preds
                .iter()
                .all(|p| self.status.get(p) == Some(&DependencyStatus::Completed)),
        ),
        None => return,
    };

    if any_failed {
        let previous = self
            .status
            .insert(node.to_string(), DependencyStatus::Failed);
        // Only walk the graph when the node has just turned Failed.
        if previous != Some(DependencyStatus::Failed) {
            for dependent in self.transitive_dependents(node) {
                self.status.insert(dependent, DependencyStatus::Failed);
            }
        }
        return;
    }

    let next = match (self.status.get(node).copied(), all_completed) {
        (Some(DependencyStatus::Pending), true) => DependencyStatus::Ready,
        (Some(DependencyStatus::Ready), false) => DependencyStatus::Pending,
        _ => return,
    };
    self.status.insert(node.to_string(), next);
}
/// Reports whether adding the edge `from -> to` would close a cycle, i.e.
/// whether `from` is already reachable from `to` along successor edges.
fn would_create_cycle(&self, from: &str, to: &str) -> bool {
    let mut seen: HashSet<&str> = HashSet::new();
    let mut frontier: VecDeque<&str> = VecDeque::new();
    seen.insert(to);
    frontier.push_back(to);

    // Breadth-first walk from `to`, looking for `from`.
    while let Some(current) = frontier.pop_front() {
        if current == from {
            return true;
        }
        if let Some(next) = self.successors.get(current) {
            frontier.extend(
                next.iter()
                    .map(String::as_str)
                    .filter(|id| seen.insert(*id)),
            );
        }
    }
    false
}
fn transitive_dependents(&self, node: &str) -> Vec<String> {
let mut visited = HashSet::new();
let mut queue = VecDeque::new();
if let Some(succs) = self.successors.get(node) {
for s in succs {
if visited.insert(s.clone()) {
queue.push_back(s.clone());
}
}
}
while let Some(current) = queue.pop_front() {
if let Some(succs) = self.successors.get(¤t) {
for s in succs {
if visited.insert(s.clone()) {
queue.push_back(s.clone());
}
}
}
}
visited.into_iter().collect()
}
}
// Unit tests for `JobDependencyManager` and `DependencyStatus`.
#[cfg(test)]
mod tests {
use super::*;
// Shorthand for building a `JobId` from a string literal via `JobId::from`.
fn jid(s: &str) -> JobId {
JobId::from(s)
}
// A freshly registered job has no dependencies and therefore starts as Ready.
#[test]
fn test_register_job_starts_ready() {
let mut mgr = JobDependencyManager::new();
mgr.register_job(&jid("a"));
assert_eq!(
mgr.status(&jid("a")).expect("should exist"),
DependencyStatus::Ready
);
}
// Adding "b depends on a" demotes b to Pending while a stays Ready.
#[test]
fn test_add_dependency_makes_dependent_pending() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
assert_eq!(
mgr.status(&jid("b")).expect("exists"),
DependencyStatus::Pending
);
assert_eq!(
mgr.status(&jid("a")).expect("exists"),
DependencyStatus::Ready
);
}
// A two-node cycle (a -> b -> a) must be rejected.
#[test]
fn test_circular_dependency_detected() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
let result = mgr.add_dependency(&jid("a"), &jid("b"));
assert!(result.is_err());
}
// A job may not depend on itself.
#[test]
fn test_self_dependency_rejected() {
let mut mgr = JobDependencyManager::new();
let result = mgr.add_dependency(&jid("x"), &jid("x"));
assert!(result.is_err());
}
// Completing the only predecessor promotes the dependent from Pending to Ready.
#[test]
fn test_mark_completed_propagates_readiness() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
assert_eq!(
mgr.status(&jid("b")).expect("exists"),
DependencyStatus::Pending
);
mgr.mark_completed(&jid("a")).expect("ok");
assert_eq!(
mgr.status(&jid("b")).expect("exists"),
DependencyStatus::Ready
);
}
// Failing a job fails its direct (b) and indirect (c) dependents.
#[test]
fn test_mark_failed_cascades() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("c"), &jid("b")).expect("ok");
mgr.mark_failed(&jid("a")).expect("ok");
assert_eq!(
mgr.status(&jid("b")).expect("exists"),
DependencyStatus::Failed
);
assert_eq!(
mgr.status(&jid("c")).expect("exists"),
DependencyStatus::Failed
);
}
// For the chain a -> b -> c the sort must place a before b before c.
#[test]
fn test_topological_sort_linear_chain() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("c"), &jid("b")).expect("ok");
let order = mgr.topological_sort().expect("ok");
let pos_a = order.iter().position(|x| x == "a").expect("a in order");
let pos_b = order.iter().position(|x| x == "b").expect("b in order");
let pos_c = order.iter().position(|x| x == "c").expect("c in order");
assert!(pos_a < pos_b);
assert!(pos_b < pos_c);
}
// One job feeding three dependents: all three wait, then all become Ready together.
#[test]
fn test_fan_out_pattern() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("c"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("d"), &jid("a")).expect("ok");
assert_eq!(mgr.fan_out(&jid("a")), 3);
assert_eq!(mgr.fan_in(&jid("b")), 1);
assert_eq!(mgr.pending_jobs().len(), 3);
mgr.mark_completed(&jid("a")).expect("ok");
assert_eq!(mgr.ready_jobs().len(), 3);
}
// A job with three predecessors stays Pending until the last one completes.
#[test]
fn test_fan_in_pattern() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("d"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("d"), &jid("b")).expect("ok");
mgr.add_dependency(&jid("d"), &jid("c")).expect("ok");
assert_eq!(mgr.fan_in(&jid("d")), 3);
assert_eq!(
mgr.status(&jid("d")).expect("exists"),
DependencyStatus::Pending
);
mgr.mark_completed(&jid("a")).expect("ok");
mgr.mark_completed(&jid("b")).expect("ok");
assert_eq!(
mgr.status(&jid("d")).expect("exists"),
DependencyStatus::Pending
);
mgr.mark_completed(&jid("c")).expect("ok");
assert_eq!(
mgr.status(&jid("d")).expect("exists"),
DependencyStatus::Ready
);
}
// `execution_order` must honour both direct and transitive dependencies.
#[test]
fn test_execution_order_respects_deps() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("deploy"), &jid("build"))
.expect("ok");
mgr.add_dependency(&jid("deploy"), &jid("test"))
.expect("ok");
mgr.add_dependency(&jid("test"), &jid("build")).expect("ok");
let order = mgr.execution_order().expect("ok");
let pos = |name: &str| order.iter().position(|x| x == name).expect("in order");
assert!(pos("build") < pos("test"));
assert!(pos("test") < pos("deploy"));
}
// `Display` prints each variant's name verbatim.
#[test]
fn test_dependency_status_display() {
assert_eq!(DependencyStatus::Pending.to_string(), "Pending");
assert_eq!(DependencyStatus::Ready.to_string(), "Ready");
assert_eq!(DependencyStatus::Completed.to_string(), "Completed");
assert_eq!(DependencyStatus::Failed.to_string(), "Failed");
}
// Querying a job that was never registered is an error.
#[test]
fn test_status_unknown_job_returns_error() {
let mgr = JobDependencyManager::new();
assert!(mgr.status(&jid("nonexistent")).is_err());
}
// Registering the same job twice must not create a second entry.
#[test]
fn test_register_idempotent() {
let mut mgr = JobDependencyManager::new();
mgr.register_job(&jid("x"));
mgr.register_job(&jid("x")); assert_eq!(mgr.job_count(), 1);
}
// A cycle spanning three nodes (a -> b -> c -> a) must also be rejected.
#[test]
fn test_three_node_cycle_detected() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("c"), &jid("b")).expect("ok");
let result = mgr.add_dependency(&jid("a"), &jid("c"));
assert!(result.is_err());
}
// Diamond graph: a feeds b and c, which both feed d; every edge must be respected.
#[test]
fn test_diamond_dag() {
let mut mgr = JobDependencyManager::new();
mgr.add_dependency(&jid("b"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("c"), &jid("a")).expect("ok");
mgr.add_dependency(&jid("d"), &jid("b")).expect("ok");
mgr.add_dependency(&jid("d"), &jid("c")).expect("ok");
assert_eq!(mgr.job_count(), 4);
let order = mgr.topological_sort().expect("ok");
assert_eq!(order.len(), 4);
let pos = |n: &str| order.iter().position(|x| x == n).expect("in order");
assert!(pos("a") < pos("b"));
assert!(pos("a") < pos("c"));
assert!(pos("b") < pos("d"));
assert!(pos("c") < pos("d"));
}
}