Skip to main content

actionqueue_workflow/
dag.rs

//! Task dependency graph and satisfaction tracking.
//!
//! [`DependencyGate`] tracks task-to-task dependencies and gates run promotion.
//! A task with unmet prerequisites stays in `Scheduled` state even when its
//! `scheduled_at` has elapsed. When all prerequisites reach terminal success,
//! the gate marks the task eligible for promotion.
//!
//! # Invariants
//!
//! - Circular dependencies are rejected at declaration time (BFS reachability
//!   check over the already-declared prerequisite edges).
//! - Dependency satisfaction is deterministic: the gate state is reconstructible
//!   from `DependencyDeclared` WAL events + run terminal states replayed via the
//!   recovery reducer. Satisfaction and failure are ephemeral projections, not
//!   independent WAL events.
//! - A failed prerequisite cascades failure to all direct and transitive dependents.
16
17use std::collections::{HashMap, HashSet, VecDeque};
18
19use actionqueue_core::ids::TaskId;
20
21/// Error when a circular dependency is detected.
22#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct CycleError {
24    task_id: TaskId,
25    cycle_through: TaskId,
26}
27
28impl CycleError {
29    pub(crate) fn new(task_id: TaskId, cycle_through: TaskId) -> Self {
30        Self { task_id, cycle_through }
31    }
32
33    /// The task whose declaration would introduce a cycle.
34    pub fn task_id(&self) -> TaskId {
35        self.task_id
36    }
37
38    /// The prerequisite that would close the cycle.
39    pub fn cycle_through(&self) -> TaskId {
40        self.cycle_through
41    }
42}
43
44impl std::fmt::Display for CycleError {
45    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        if self.task_id == self.cycle_through {
47            write!(f, "task {} cannot depend on itself", self.task_id)
48        } else {
49            write!(
50                f,
51                "declaring dependency from {} on {} would introduce a cycle ({} is already \
52                 reachable from {})",
53                self.task_id, self.cycle_through, self.task_id, self.cycle_through
54            )
55        }
56    }
57}
58
59impl std::error::Error for CycleError {}
60
61/// Gates task promotion based on declared inter-task dependencies.
62///
63/// Tasks with unsatisfied prerequisites are blocked from Scheduled → Ready
64/// promotion in the dispatch loop. The gate is notified when tasks complete
65/// or fail, updating eligibility accordingly.
66///
67/// The gate is **not** persisted directly — it is reconstructed at bootstrap
68/// from WAL events stored in the recovery reducer.
69#[derive(Debug, Default, Clone, PartialEq, Eq)]
70pub struct DependencyGate {
71    /// task_id → set of prerequisite task_ids that must complete first.
72    ///
73    /// Only contains tasks with declared dependencies (not all tasks).
74    prerequisites: HashMap<TaskId, HashSet<TaskId>>,
75
76    /// task_ids all of whose prerequisites have reached terminal success.
77    satisfied: HashSet<TaskId>,
78
79    /// task_ids blocked because at least one prerequisite failed/canceled.
80    failed: HashSet<TaskId>,
81}
82
83impl DependencyGate {
84    /// Creates an empty gate (no dependencies, all tasks implicitly eligible).
85    pub fn new() -> Self {
86        Self::default()
87    }
88
89    /// Declares that `task_id` depends on all tasks in `depends_on`.
90    ///
91    /// # Errors
92    ///
93    /// Returns [`CycleError`] if adding this dependency would create a cycle
94    /// in the dependency graph.
95    pub fn declare(&mut self, task_id: TaskId, depends_on: Vec<TaskId>) -> Result<(), CycleError> {
96        // Cycle detection: for each new prerequisite, check whether `task_id`
97        // is reachable from the prerequisite (i.e., the prereq already depends
98        // on `task_id` directly or transitively). If so, adding this edge
99        // would close a cycle.
100        for &prereq in &depends_on {
101            if self.is_reachable_from(prereq, task_id) {
102                return Err(CycleError::new(task_id, prereq));
103            }
104        }
105
106        let entry = self.prerequisites.entry(task_id).or_default();
107        for prereq in depends_on {
108            entry.insert(prereq);
109        }
110
111        // Re-evaluate satisfaction now that prerequisites have changed.
112        self.recompute_satisfaction(task_id);
113        Ok(())
114    }
115
116    /// Checks whether declaring these dependencies would create a cycle,
117    /// without modifying the gate.
118    ///
119    /// Use this to pre-validate before a WAL append, so that cycles are
120    /// rejected before being durably persisted.
121    ///
122    /// # Errors
123    ///
124    /// Returns [`CycleError`] if any prerequisite in `depends_on` can already
125    /// reach `task_id` through existing edges (i.e., adding these edges would
126    /// close a cycle).
127    pub fn check_cycle(&self, task_id: TaskId, depends_on: &[TaskId]) -> Result<(), CycleError> {
128        for &prereq in depends_on {
129            if self.is_reachable_from(prereq, task_id) {
130                return Err(CycleError::new(task_id, prereq));
131            }
132        }
133        Ok(())
134    }
135
136    /// Returns `true` if `task_id` may be promoted to Ready.
137    ///
138    /// A task is eligible when it has no declared dependencies, OR when all
139    /// its prerequisites have been satisfied. Tasks with failed prerequisites
140    /// are NOT eligible (they will be canceled by the dispatch loop instead).
141    #[must_use]
142    pub fn is_eligible(&self, task_id: TaskId) -> bool {
143        !self.failed.contains(&task_id)
144            && match self.prerequisites.get(&task_id) {
145                None => true,                                 // No dependencies declared
146                Some(_) => self.satisfied.contains(&task_id), // All satisfied
147            }
148    }
149
150    /// Returns `true` if `task_id`'s prerequisites have permanently failed.
151    ///
152    /// When this returns `true`, the dispatch loop should cancel the task's
153    /// non-terminal runs rather than waiting for prerequisites to be satisfied.
154    #[must_use]
155    pub fn is_dependency_failed(&self, task_id: TaskId) -> bool {
156        self.failed.contains(&task_id)
157    }
158
159    /// Returns `true` if `task_id` has any declared prerequisites.
160    #[must_use]
161    pub fn has_prerequisites(&self, task_id: TaskId) -> bool {
162        self.prerequisites.contains_key(&task_id)
163    }
164
165    /// Returns all task_ids that are waiting for their prerequisites to be met.
166    ///
167    /// Excludes tasks whose prerequisites have all been satisfied and tasks
168    /// whose prerequisites have failed (those are in the `failed` set).
169    pub fn waiting_task_ids(&self) -> impl Iterator<Item = TaskId> + '_ {
170        self.prerequisites
171            .keys()
172            .copied()
173            .filter(move |&id| !self.satisfied.contains(&id) && !self.failed.contains(&id))
174    }
175
176    /// Called when a prerequisite task has reached terminal success.
177    ///
178    /// Returns the list of task_ids that became newly eligible as a result
179    /// (all prerequisites now satisfied).
180    #[must_use]
181    pub fn notify_completed(&mut self, completed_task_id: TaskId) -> Vec<TaskId> {
182        // Mark the completed task as satisfied (it is now eligible as a prerequisite).
183        self.satisfied.insert(completed_task_id);
184
185        // Recompute satisfaction for all tasks that declare this task as a prerequisite.
186        let dependents: Vec<TaskId> = self
187            .prerequisites
188            .iter()
189            .filter(|(_, prereqs)| prereqs.contains(&completed_task_id))
190            .map(|(task_id, _)| *task_id)
191            .collect();
192
193        let mut newly_eligible = Vec::new();
194        for dep in dependents {
195            let was_eligible = self.is_eligible(dep);
196            self.recompute_satisfaction(dep);
197            if !was_eligible && self.is_eligible(dep) {
198                newly_eligible.push(dep);
199            }
200        }
201        newly_eligible
202    }
203
204    /// Called when a prerequisite task has permanently failed (all runs
205    /// reached Failed or Canceled, with no successful completion).
206    ///
207    /// Returns the list of task_ids that are now permanently blocked
208    /// (their prerequisites will never succeed).
209    #[must_use]
210    pub fn notify_failed(&mut self, failed_task_id: TaskId) -> Vec<TaskId> {
211        // Early exit if this task was already processed as failed.
212        if self.failed.contains(&failed_task_id) {
213            return Vec::new();
214        }
215
216        // Mark the root as failed for idempotency on repeated calls.
217        // The root itself is NOT added to newly_blocked — it is the
218        // prerequisite that failed, not a dependent that became blocked.
219        self.failed.insert(failed_task_id);
220
221        let mut newly_blocked = Vec::new();
222        let mut queue: VecDeque<TaskId> = VecDeque::new();
223        queue.push_back(failed_task_id);
224
225        while let Some(failed_id) = queue.pop_front() {
226            // Find all tasks that depend on this failed task.
227            let dependents: Vec<TaskId> = self
228                .prerequisites
229                .iter()
230                .filter(|(_, prereqs)| prereqs.contains(&failed_id))
231                .map(|(task_id, _)| *task_id)
232                .filter(|task_id| !self.failed.contains(task_id))
233                .collect();
234
235            for dep in dependents {
236                self.failed.insert(dep);
237                self.satisfied.remove(&dep);
238                newly_blocked.push(dep);
239                // Cascade: dependents of the newly-failed task also fail.
240                queue.push_back(dep);
241            }
242        }
243
244        newly_blocked
245    }
246
247    /// Cascades the `failed` set to all transitive dependents.
248    ///
249    /// After bootstrap populates `force_fail` for directly-failed
250    /// prerequisites, this method BFS-cascades to ensure all reachable
251    /// dependents are also marked as failed.
252    ///
253    /// Returns the list of newly-failed (dependent) task IDs.
254    #[must_use]
255    pub fn propagate_failures(&mut self) -> Vec<TaskId> {
256        let seeds: Vec<TaskId> = self.failed.iter().copied().collect();
257        let mut newly_blocked = Vec::new();
258        let mut queue: VecDeque<TaskId> = seeds.into_iter().collect();
259        while let Some(failed_id) = queue.pop_front() {
260            let dependents: Vec<TaskId> = self
261                .prerequisites
262                .iter()
263                .filter(|(_, prereqs)| prereqs.contains(&failed_id))
264                .map(|(task_id, _)| *task_id)
265                .filter(|task_id| !self.failed.contains(task_id))
266                .collect();
267            for dep in dependents {
268                self.failed.insert(dep);
269                self.satisfied.remove(&dep);
270                newly_blocked.push(dep);
271                queue.push_back(dep);
272            }
273        }
274        newly_blocked
275    }
276
277    /// Directly marks a task as satisfied (used during gate reconstruction
278    /// from WAL events at bootstrap — bypasses cycle check since declarations
279    /// are already validated).
280    pub fn force_satisfy(&mut self, task_id: TaskId) {
281        self.satisfied.insert(task_id);
282        self.failed.remove(&task_id);
283    }
284
285    /// Directly marks a task as failed (used during gate reconstruction).
286    pub fn force_fail(&mut self, task_id: TaskId) {
287        self.failed.insert(task_id);
288        self.satisfied.remove(&task_id);
289    }
290
291    /// Removes all gate state for a fully-terminal task.
292    ///
293    /// Called by the dispatch loop after a task reaches full terminal state and
294    /// all dependency notifications have been issued. Safe to call because:
295    /// - If the task completed (`satisfied`), all dependents already received
296    ///   `notify_completed` and their eligibility was recomputed.
297    /// - If the task failed (`failed`), all dependents already received
298    ///   `notify_failed` and cascades are complete.
299    /// - Removing a satisfied prerequisite from a dependent's set is safe;
300    ///   the remaining prerequisites still gate eligibility correctly.
301    pub fn gc_task(&mut self, task_id: TaskId) {
302        // Remove from direct state sets.
303        self.prerequisites.remove(&task_id);
304        self.satisfied.remove(&task_id);
305        self.failed.remove(&task_id);
306
307        // Remove task_id from other tasks' prerequisite sets.
308        for prereqs in self.prerequisites.values_mut() {
309            prereqs.remove(&task_id);
310        }
311    }
312
313    /// Re-evaluates satisfaction for `task_id` using the current `satisfied` set.
314    ///
315    /// Called by the dispatch loop after restoring prerequisite satisfaction state
316    /// from the projection (e.g., when declaring a dependency after a prerequisite
317    /// completed and was GC'd from the `satisfied` set).
318    pub fn recompute_satisfaction_pub(&mut self, task_id: TaskId) {
319        self.recompute_satisfaction(task_id);
320    }
321
322    // ── Private helpers ──────────────────────────────────────────────────
323
324    /// BFS check: can we reach `target` starting from `start`?
325    fn is_reachable_from(&self, start: TaskId, target: TaskId) -> bool {
326        if start == target {
327            return true;
328        }
329        let mut visited = HashSet::new();
330        let mut queue = VecDeque::new();
331        queue.push_back(start);
332
333        while let Some(current) = queue.pop_front() {
334            if current == target {
335                return true;
336            }
337            if !visited.insert(current) {
338                continue;
339            }
340            if let Some(prereqs) = self.prerequisites.get(&current) {
341                for &prereq in prereqs {
342                    if !visited.contains(&prereq) {
343                        queue.push_back(prereq);
344                    }
345                }
346            }
347        }
348        false
349    }
350
351    /// Recomputes whether `task_id` has all prerequisites satisfied and
352    /// updates the `satisfied` set for `task_id` itself (not for leaf tasks).
353    ///
354    /// A task with declared prerequisites is "satisfied" (eligible as a future
355    /// prerequisite) when ALL its own prerequisites are in the `satisfied` set.
356    /// Leaf tasks (no declared prerequisites) are implicitly satisfied when they
357    /// complete — they are added to `satisfied` directly by `notify_completed`.
358    fn recompute_satisfaction(&mut self, task_id: TaskId) {
359        if self.failed.contains(&task_id) {
360            return; // Already permanently blocked.
361        }
362        let Some(prereqs) = self.prerequisites.get(&task_id) else {
363            return; // No declared prerequisites — eligibility decided by `is_eligible`.
364        };
365        let prereqs: Vec<TaskId> = prereqs.iter().copied().collect();
366
367        // A task's prerequisites are all met only if every prereq is in the
368        // `satisfied` set (meaning it has completed successfully).
369        let all_satisfied = prereqs.iter().all(|prereq| self.satisfied.contains(prereq));
370
371        if all_satisfied {
372            self.satisfied.insert(task_id);
373        } else {
374            self.satisfied.remove(&task_id);
375        }
376    }
377}
378
#[cfg(test)]
mod tests {
    //! Unit tests for [`DependencyGate`] and [`CycleError`].

    use actionqueue_core::ids::TaskId;

    use super::*;

    /// Builds a deterministic `TaskId` from a small integer.
    fn tid(n: u128) -> TaskId {
        TaskId::from_uuid(uuid::Uuid::from_u128(n))
    }

    #[test]
    fn no_dependencies_is_always_eligible() {
        let gate = DependencyGate::new();
        assert!(gate.is_eligible(tid(1)));
        assert!(!gate.has_prerequisites(tid(1)));
    }

    #[test]
    fn declared_dependency_blocks_until_completed() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        assert!(!gate.is_eligible(tid(2)));

        let newly_eligible = gate.notify_completed(tid(1));
        assert_eq!(newly_eligible, vec![tid(2)]);
        assert!(gate.is_eligible(tid(2)));
    }

    #[test]
    fn failed_prerequisite_blocks_dependent() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");

        let blocked = gate.notify_failed(tid(1));
        assert_eq!(blocked, vec![tid(2)]);
        assert!(!gate.is_eligible(tid(2)));
        assert!(gate.is_dependency_failed(tid(2)));
    }

    #[test]
    fn cascading_failure_through_chain() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        gate.declare(tid(3), vec![tid(2)]).expect("no cycle");

        let blocked = gate.notify_failed(tid(1));
        assert!(blocked.contains(&tid(2)));
        assert!(blocked.contains(&tid(3)));
        assert!(gate.is_dependency_failed(tid(3)));
    }

    #[test]
    fn cycle_detection_rejects_direct_cycle() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        let err = gate.declare(tid(1), vec![tid(2)]).expect_err("cycle should be detected");
        assert_eq!(err.task_id(), tid(1));
    }

    #[test]
    fn cycle_detection_rejects_transitive_cycle() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        gate.declare(tid(3), vec![tid(2)]).expect("no cycle");
        let snapshot = gate.clone();

        // 1 → 3 would close the loop 1 → 3 → 2 → 1.
        let err = gate.declare(tid(1), vec![tid(3)]).expect_err("cycle should be detected");
        assert_eq!(err.task_id(), tid(1));
        assert_eq!(err.cycle_through(), tid(3));

        // A rejected declaration must not leave partial state behind.
        assert_eq!(gate, snapshot);
    }

    #[test]
    fn self_dependency_is_rejected() {
        let mut gate = DependencyGate::new();
        let err = gate.declare(tid(1), vec![tid(1)]).expect_err("self-loop should be rejected");
        assert_eq!(err.task_id(), tid(1));
        assert_eq!(err.cycle_through(), tid(1));
        assert!(!gate.has_prerequisites(tid(1)));
    }

    #[test]
    fn force_satisfy_and_fail_for_bootstrap() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        assert!(!gate.is_eligible(tid(2)));

        gate.force_satisfy(tid(2));
        assert!(gate.is_eligible(tid(2)));
        assert!(!gate.is_dependency_failed(tid(2)));

        // force_fail overrides a previous force_satisfy...
        gate.force_fail(tid(2));
        assert!(!gate.is_eligible(tid(2)));
        assert!(gate.is_dependency_failed(tid(2)));

        // ...and force_satisfy clears the failed mark again.
        gate.force_satisfy(tid(2));
        assert!(gate.is_eligible(tid(2)));
        assert!(!gate.is_dependency_failed(tid(2)));
    }

    #[test]
    fn check_cycle_detects_cycle_without_mutation() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        let snapshot = gate.clone();

        // check_cycle detects the cycle...
        let err = gate.check_cycle(tid(1), &[tid(2)]).expect_err("cycle");
        assert_eq!(err.task_id(), tid(1));
        assert_eq!(err.cycle_through(), tid(2));

        // ...but the gate is unchanged.
        assert_eq!(gate, snapshot);
    }

    #[test]
    fn self_loop_display_message() {
        let err = CycleError::new(tid(1), tid(1));
        let msg = err.to_string();
        assert!(
            msg.contains("cannot depend on itself"),
            "self-loop error should say 'cannot depend on itself', got: {msg}"
        );
    }

    #[test]
    fn notify_failed_idempotent_on_second_call() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");

        let first = gate.notify_failed(tid(1));
        assert_eq!(first.len(), 1);

        // Second call for same task returns empty (already processed).
        let second = gate.notify_failed(tid(1));
        assert!(second.is_empty(), "second notify_failed must return empty");
    }

    #[test]
    fn propagate_failures_cascades_transitive_chain() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        gate.declare(tid(3), vec![tid(2)]).expect("no cycle");

        // force_fail only marks tid(1), does NOT cascade.
        gate.force_fail(tid(1));
        assert!(gate.is_dependency_failed(tid(1)));
        assert!(!gate.is_dependency_failed(tid(2)), "before propagate, tid(2) not failed");
        assert!(!gate.is_dependency_failed(tid(3)), "before propagate, tid(3) not failed");

        // propagate_failures cascades to transitive dependents.
        let newly_blocked = gate.propagate_failures();
        assert!(newly_blocked.contains(&tid(2)));
        assert!(newly_blocked.contains(&tid(3)));
        assert!(gate.is_dependency_failed(tid(2)));
        assert!(gate.is_dependency_failed(tid(3)));
    }

    #[test]
    fn notify_failed_idempotent_guard_fires_on_root() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");

        let first = gate.notify_failed(tid(1));
        assert_eq!(first.len(), 1);
        assert!(gate.is_dependency_failed(tid(1)), "root should be in failed set");

        // Second call returns empty via the early-exit guard on the root.
        let second = gate.notify_failed(tid(1));
        assert!(second.is_empty(), "second call must return empty via guard");
    }

    #[test]
    fn gc_task_removes_completed_prerequisite() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        let _ = gate.notify_completed(tid(1));
        assert!(gate.is_eligible(tid(2)));

        gate.gc_task(tid(1));

        // tid(2) prerequisites are now empty (tid(1) removed), still eligible.
        assert!(gate.is_eligible(tid(2)));
        // tid(1) no longer tracked anywhere.
        assert!(!gate.has_prerequisites(tid(1)));
    }

    #[test]
    fn gc_task_removes_failed_prerequisite() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        let _ = gate.notify_failed(tid(1));
        assert!(gate.is_dependency_failed(tid(2)));

        gate.gc_task(tid(1));

        // tid(2) is still in failed set (gc_task only removes the root).
        assert!(gate.is_dependency_failed(tid(2)));
    }

    #[test]
    fn gc_task_is_idempotent() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        let _ = gate.notify_completed(tid(1));

        gate.gc_task(tid(1));
        gate.gc_task(tid(1)); // second call must not panic
        assert!(gate.is_eligible(tid(2)));
    }

    #[test]
    fn multiple_prerequisites_require_all_satisfied() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(3), vec![tid(1), tid(2)]).expect("no cycle");

        let r1 = gate.notify_completed(tid(1));
        assert!(r1.is_empty(), "tid(3) still blocked on tid(2)");
        assert!(!gate.is_eligible(tid(3)));

        let r2 = gate.notify_completed(tid(2));
        assert_eq!(r2, vec![tid(3)]);
        assert!(gate.is_eligible(tid(3)));
    }

    #[test]
    fn waiting_task_ids_excludes_satisfied_and_failed() {
        let mut gate = DependencyGate::new();
        gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
        gate.declare(tid(4), vec![tid(3)]).expect("no cycle");

        let waiting: Vec<TaskId> = gate.waiting_task_ids().collect();
        assert_eq!(waiting.len(), 2);
        assert!(waiting.contains(&tid(2)));
        assert!(waiting.contains(&tid(4)));

        // A satisfied task stops waiting.
        let _ = gate.notify_completed(tid(1));
        let waiting: Vec<TaskId> = gate.waiting_task_ids().collect();
        assert_eq!(waiting, vec![tid(4)]);

        // A task blocked by a failed prerequisite is not "waiting" either.
        let _ = gate.notify_failed(tid(3));
        assert_eq!(gate.waiting_task_ids().count(), 0);
    }
}