use std::collections::{HashMap, HashSet, VecDeque};
use actionqueue_core::ids::TaskId;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CycleError {
task_id: TaskId,
cycle_through: TaskId,
}
impl CycleError {
pub(crate) fn new(task_id: TaskId, cycle_through: TaskId) -> Self {
Self { task_id, cycle_through }
}
pub fn task_id(&self) -> TaskId {
self.task_id
}
pub fn cycle_through(&self) -> TaskId {
self.cycle_through
}
}
impl std::fmt::Display for CycleError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
if self.task_id == self.cycle_through {
write!(f, "task {} cannot depend on itself", self.task_id)
} else {
write!(
f,
"declaring dependency from {} on {} would introduce a cycle ({} is already \
reachable from {})",
self.task_id, self.cycle_through, self.task_id, self.cycle_through
)
}
}
}
impl std::error::Error for CycleError {}
#[derive(Debug, Default, Clone, PartialEq, Eq)]
pub struct DependencyGate {
prerequisites: HashMap<TaskId, HashSet<TaskId>>,
satisfied: HashSet<TaskId>,
failed: HashSet<TaskId>,
}
impl DependencyGate {
pub fn new() -> Self {
Self::default()
}
pub fn declare(&mut self, task_id: TaskId, depends_on: Vec<TaskId>) -> Result<(), CycleError> {
for &prereq in &depends_on {
if self.is_reachable_from(prereq, task_id) {
return Err(CycleError::new(task_id, prereq));
}
}
let entry = self.prerequisites.entry(task_id).or_default();
for prereq in depends_on {
entry.insert(prereq);
}
self.recompute_satisfaction(task_id);
Ok(())
}
pub fn check_cycle(&self, task_id: TaskId, depends_on: &[TaskId]) -> Result<(), CycleError> {
for &prereq in depends_on {
if self.is_reachable_from(prereq, task_id) {
return Err(CycleError::new(task_id, prereq));
}
}
Ok(())
}
#[must_use]
pub fn is_eligible(&self, task_id: TaskId) -> bool {
!self.failed.contains(&task_id)
&& match self.prerequisites.get(&task_id) {
None => true, Some(_) => self.satisfied.contains(&task_id), }
}
#[must_use]
pub fn is_dependency_failed(&self, task_id: TaskId) -> bool {
self.failed.contains(&task_id)
}
#[must_use]
pub fn has_prerequisites(&self, task_id: TaskId) -> bool {
self.prerequisites.contains_key(&task_id)
}
pub fn waiting_task_ids(&self) -> impl Iterator<Item = TaskId> + '_ {
self.prerequisites
.keys()
.copied()
.filter(move |&id| !self.satisfied.contains(&id) && !self.failed.contains(&id))
}
#[must_use]
pub fn notify_completed(&mut self, completed_task_id: TaskId) -> Vec<TaskId> {
self.satisfied.insert(completed_task_id);
let dependents: Vec<TaskId> = self
.prerequisites
.iter()
.filter(|(_, prereqs)| prereqs.contains(&completed_task_id))
.map(|(task_id, _)| *task_id)
.collect();
let mut newly_eligible = Vec::new();
for dep in dependents {
let was_eligible = self.is_eligible(dep);
self.recompute_satisfaction(dep);
if !was_eligible && self.is_eligible(dep) {
newly_eligible.push(dep);
}
}
newly_eligible
}
#[must_use]
pub fn notify_failed(&mut self, failed_task_id: TaskId) -> Vec<TaskId> {
if self.failed.contains(&failed_task_id) {
return Vec::new();
}
self.failed.insert(failed_task_id);
let mut newly_blocked = Vec::new();
let mut queue: VecDeque<TaskId> = VecDeque::new();
queue.push_back(failed_task_id);
while let Some(failed_id) = queue.pop_front() {
let dependents: Vec<TaskId> = self
.prerequisites
.iter()
.filter(|(_, prereqs)| prereqs.contains(&failed_id))
.map(|(task_id, _)| *task_id)
.filter(|task_id| !self.failed.contains(task_id))
.collect();
for dep in dependents {
self.failed.insert(dep);
self.satisfied.remove(&dep);
newly_blocked.push(dep);
queue.push_back(dep);
}
}
newly_blocked
}
#[must_use]
pub fn propagate_failures(&mut self) -> Vec<TaskId> {
let seeds: Vec<TaskId> = self.failed.iter().copied().collect();
let mut newly_blocked = Vec::new();
let mut queue: VecDeque<TaskId> = seeds.into_iter().collect();
while let Some(failed_id) = queue.pop_front() {
let dependents: Vec<TaskId> = self
.prerequisites
.iter()
.filter(|(_, prereqs)| prereqs.contains(&failed_id))
.map(|(task_id, _)| *task_id)
.filter(|task_id| !self.failed.contains(task_id))
.collect();
for dep in dependents {
self.failed.insert(dep);
self.satisfied.remove(&dep);
newly_blocked.push(dep);
queue.push_back(dep);
}
}
newly_blocked
}
pub fn force_satisfy(&mut self, task_id: TaskId) {
self.satisfied.insert(task_id);
self.failed.remove(&task_id);
}
pub fn force_fail(&mut self, task_id: TaskId) {
self.failed.insert(task_id);
self.satisfied.remove(&task_id);
}
pub fn gc_task(&mut self, task_id: TaskId) {
self.prerequisites.remove(&task_id);
self.satisfied.remove(&task_id);
self.failed.remove(&task_id);
for prereqs in self.prerequisites.values_mut() {
prereqs.remove(&task_id);
}
}
pub fn recompute_satisfaction_pub(&mut self, task_id: TaskId) {
self.recompute_satisfaction(task_id);
}
fn is_reachable_from(&self, start: TaskId, target: TaskId) -> bool {
if start == target {
return true;
}
let mut visited = HashSet::new();
let mut queue = VecDeque::new();
queue.push_back(start);
while let Some(current) = queue.pop_front() {
if current == target {
return true;
}
if !visited.insert(current) {
continue;
}
if let Some(prereqs) = self.prerequisites.get(¤t) {
for &prereq in prereqs {
if !visited.contains(&prereq) {
queue.push_back(prereq);
}
}
}
}
false
}
fn recompute_satisfaction(&mut self, task_id: TaskId) {
if self.failed.contains(&task_id) {
return; }
let Some(prereqs) = self.prerequisites.get(&task_id) else {
return; };
let prereqs: Vec<TaskId> = prereqs.iter().copied().collect();
let all_satisfied = prereqs.iter().all(|prereq| self.satisfied.contains(prereq));
if all_satisfied {
self.satisfied.insert(task_id);
} else {
self.satisfied.remove(&task_id);
}
}
}
#[cfg(test)]
mod tests {
use actionqueue_core::ids::TaskId;
use super::*;
fn tid(n: u128) -> TaskId {
TaskId::from_uuid(uuid::Uuid::from_u128(n))
}
#[test]
fn no_dependencies_is_always_eligible() {
let gate = DependencyGate::new();
assert!(gate.is_eligible(tid(1)));
assert!(!gate.has_prerequisites(tid(1)));
}
#[test]
fn declared_dependency_blocks_until_completed() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
assert!(!gate.is_eligible(tid(2)));
let newly_eligible = gate.notify_completed(tid(1));
assert_eq!(newly_eligible, vec![tid(2)]);
assert!(gate.is_eligible(tid(2)));
}
#[test]
fn failed_prerequisite_blocks_dependent() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let blocked = gate.notify_failed(tid(1));
assert_eq!(blocked, vec![tid(2)]);
assert!(!gate.is_eligible(tid(2)));
assert!(gate.is_dependency_failed(tid(2)));
}
#[test]
fn cascading_failure_through_chain() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
gate.declare(tid(3), vec![tid(2)]).expect("no cycle");
let blocked = gate.notify_failed(tid(1));
assert!(blocked.contains(&tid(2)));
assert!(blocked.contains(&tid(3)));
assert!(gate.is_dependency_failed(tid(3)));
}
#[test]
fn cycle_detection_rejects_direct_cycle() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let err = gate.declare(tid(1), vec![tid(2)]).expect_err("cycle should be detected");
assert_eq!(err.task_id(), tid(1));
}
#[test]
fn force_satisfy_and_fail_for_bootstrap() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
assert!(!gate.is_eligible(tid(2)));
gate.force_satisfy(tid(2));
assert!(gate.is_eligible(tid(2)));
}
#[test]
fn check_cycle_detects_cycle_without_mutation() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let snapshot = gate.clone();
let err = gate.check_cycle(tid(1), &[tid(2)]).expect_err("cycle");
assert_eq!(err.task_id(), tid(1));
assert_eq!(err.cycle_through(), tid(2));
assert_eq!(gate, snapshot);
}
#[test]
fn self_loop_display_message() {
let err = CycleError::new(tid(1), tid(1));
let msg = err.to_string();
assert!(
msg.contains("cannot depend on itself"),
"self-loop error should say 'cannot depend on itself', got: {msg}"
);
}
#[test]
fn notify_failed_idempotent_on_second_call() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let first = gate.notify_failed(tid(1));
assert_eq!(first.len(), 1);
let second = gate.notify_failed(tid(1));
assert!(second.is_empty(), "second notify_failed must return empty");
}
#[test]
fn propagate_failures_cascades_transitive_chain() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
gate.declare(tid(3), vec![tid(2)]).expect("no cycle");
gate.force_fail(tid(1));
assert!(gate.is_dependency_failed(tid(1)));
assert!(!gate.is_dependency_failed(tid(2)), "before propagate, tid(2) not failed");
assert!(!gate.is_dependency_failed(tid(3)), "before propagate, tid(3) not failed");
let newly_blocked = gate.propagate_failures();
assert!(newly_blocked.contains(&tid(2)));
assert!(newly_blocked.contains(&tid(3)));
assert!(gate.is_dependency_failed(tid(2)));
assert!(gate.is_dependency_failed(tid(3)));
}
#[test]
fn notify_failed_idempotent_guard_fires_on_root() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let first = gate.notify_failed(tid(1));
assert_eq!(first.len(), 1);
assert!(gate.is_dependency_failed(tid(1)), "root should be in failed set");
let second = gate.notify_failed(tid(1));
assert!(second.is_empty(), "second call must return empty via guard");
}
#[test]
fn gc_task_removes_completed_prerequisite() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let _ = gate.notify_completed(tid(1));
assert!(gate.is_eligible(tid(2)));
gate.gc_task(tid(1));
assert!(gate.is_eligible(tid(2)));
assert!(!gate.has_prerequisites(tid(1)));
}
#[test]
fn gc_task_removes_failed_prerequisite() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let _ = gate.notify_failed(tid(1));
assert!(gate.is_dependency_failed(tid(2)));
gate.gc_task(tid(1));
assert!(gate.is_dependency_failed(tid(2)));
}
#[test]
fn gc_task_is_idempotent() {
let mut gate = DependencyGate::new();
gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
let _ = gate.notify_completed(tid(1));
gate.gc_task(tid(1));
gate.gc_task(tid(1)); assert!(gate.is_eligible(tid(2)));
}
#[test]
fn multiple_prerequisites_require_all_satisfied() {
let mut gate = DependencyGate::new();
gate.declare(tid(3), vec![tid(1), tid(2)]).expect("no cycle");
let r1 = gate.notify_completed(tid(1));
assert!(r1.is_empty(), "tid(3) still blocked on tid(2)");
assert!(!gate.is_eligible(tid(3)));
let r2 = gate.notify_completed(tid(2));
assert_eq!(r2, vec![tid(3)]);
assert!(gate.is_eligible(tid(3)));
}
}