// actionqueue_workflow/hierarchy.rs

//! Parent-child task lifecycle coupling.
//!
//! [`HierarchyTracker`] maintains the parent → children mapping and enforces
//! lifecycle rules:
//!
//! - **Cascading cancellation**: canceling a parent cascades to all non-terminal
//!   descendants via [`HierarchyTracker::collect_cancellation_cascade`].
//! - **Orphan prevention**: [`HierarchyTracker::register_child`] rejects children
//!   of tasks already marked terminal.
//! - **Depth limit**: configurable maximum nesting depth (default 8; a root is
//!   at depth 0, so the default allows nine levels in total).
//!
//! # Invariants
//!
//! - Tree structure is acyclic by construction (only append, no reparenting).
//! - Terminal state is tracked via [`HierarchyTracker::mark_terminal`], called
//!   by the dispatch loop after all runs for a task reach terminal state.
//! - At bootstrap, tree registration happens before terminal marking, so
//!   orphan prevention never fires during WAL replay.
19
20use std::collections::{HashMap, HashSet, VecDeque};
21
22use actionqueue_core::ids::TaskId;
23
/// Default maximum nesting depth for task hierarchies.
///
/// Depth is counted in parent links, so a root task is at depth 0 and this
/// default admits tasks at depths 0 through 8.
pub const DEFAULT_MAX_DEPTH: usize = 8;
26
27/// Error returned when registering a parent-child relationship fails.
28#[derive(Debug, Clone, PartialEq, Eq)]
29pub enum HierarchyError {
30    /// Parent task has already reached a terminal state; no new children allowed.
31    OrphanPrevention {
32        /// The child task that was rejected.
33        child: TaskId,
34        /// The terminal parent task.
35        parent: TaskId,
36    },
37    /// Adding this child would exceed the configured maximum nesting depth.
38    DepthLimitExceeded {
39        /// The child task that was rejected.
40        child: TaskId,
41        /// The parent task that would have hosted the child.
42        parent: TaskId,
43        /// The depth the child would have reached.
44        depth: usize,
45        /// The configured maximum allowed depth.
46        limit: usize,
47    },
48}
49
50impl std::fmt::Display for HierarchyError {
51    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
52        match self {
53            HierarchyError::OrphanPrevention { child, parent } => {
54                write!(
55                    f,
56                    "cannot register child {child} under terminal parent {parent} (orphan \
57                     prevention)"
58                )
59            }
60            HierarchyError::DepthLimitExceeded { child, parent, depth, limit } => {
61                write!(
62                    f,
63                    "registering child {child} under {parent} would reach depth {depth} (limit: \
64                     {limit})"
65                )
66            }
67        }
68    }
69}
70
71impl std::error::Error for HierarchyError {}
72
73/// Tracks parent-child task relationships and enforces lifecycle coupling.
74///
75/// The tracker is **not** persisted directly. At bootstrap it is reconstructed
76/// from the `parent_task_id` field present on each `TaskSpec` in the WAL, then
77/// terminal status is inferred from run states in the projection.
78///
79/// The dispatch loop calls [`mark_terminal`][HierarchyTracker::mark_terminal]
80/// whenever a task's runs all reach terminal state, keeping orphan prevention
81/// accurate throughout the engine lifetime.
82#[derive(Debug, Default, Clone, PartialEq, Eq)]
83pub struct HierarchyTracker {
84    /// parent_task_id → set of direct child task_ids.
85    children: HashMap<TaskId, HashSet<TaskId>>,
86    /// child_task_id → parent_task_id.
87    parents: HashMap<TaskId, TaskId>,
88    /// task_ids that have reached terminal state.
89    terminal_tasks: HashSet<TaskId>,
90    /// Maximum allowed nesting depth (root = 0).
91    max_depth: usize,
92}
93
94impl HierarchyTracker {
95    /// Creates a new tracker with the default maximum depth (8).
96    pub fn new() -> Self {
97        Self::with_max_depth(DEFAULT_MAX_DEPTH)
98    }
99
100    /// Creates a new tracker with the given maximum nesting depth.
101    ///
102    /// A root task is at depth 0. A direct child is at depth 1.
103    /// `max_depth = 8` allows up to depth 8 (nine levels total).
104    pub fn with_max_depth(max_depth: usize) -> Self {
105        Self {
106            children: HashMap::new(),
107            parents: HashMap::new(),
108            terminal_tasks: HashSet::new(),
109            max_depth,
110        }
111    }
112
113    /// Registers a parent-child relationship.
114    ///
115    /// # Errors
116    ///
117    /// - [`HierarchyError::OrphanPrevention`] if the parent is already terminal.
118    /// - [`HierarchyError::DepthLimitExceeded`] if adding this child would
119    ///   exceed the configured maximum nesting depth.
120    pub fn register_child(&mut self, parent: TaskId, child: TaskId) -> Result<(), HierarchyError> {
121        if self.terminal_tasks.contains(&parent) {
122            return Err(HierarchyError::OrphanPrevention { child, parent });
123        }
124
125        let child_depth = self.depth(parent) + 1;
126        if child_depth > self.max_depth {
127            return Err(HierarchyError::DepthLimitExceeded {
128                child,
129                parent,
130                depth: child_depth,
131                limit: self.max_depth,
132            });
133        }
134
135        self.parents.insert(child, parent);
136        self.children.entry(parent).or_default().insert(child);
137        Ok(())
138    }
139
140    /// Returns an iterator over the direct children of `parent`.
141    pub fn children_of(&self, parent: TaskId) -> impl Iterator<Item = TaskId> + '_ {
142        self.children.get(&parent).into_iter().flat_map(|s| s.iter().copied())
143    }
144
145    /// Returns the parent of `child`, if any.
146    pub fn parent_of(&self, child: TaskId) -> Option<TaskId> {
147        self.parents.get(&child).copied()
148    }
149
150    /// Returns `true` if `task_id` has any registered children.
151    #[must_use]
152    pub fn has_children(&self, task_id: TaskId) -> bool {
153        self.children.contains_key(&task_id)
154    }
155
156    /// Returns `true` if `task_id` has been marked terminal.
157    #[must_use]
158    pub fn is_terminal(&self, task_id: TaskId) -> bool {
159        self.terminal_tasks.contains(&task_id)
160    }
161
162    /// Marks `task_id` as having reached a terminal state.
163    ///
164    /// Called by the dispatch loop when all runs for a task become terminal.
165    /// Once marked terminal, the task cannot accept new children.
166    pub fn mark_terminal(&mut self, task_id: TaskId) {
167        self.terminal_tasks.insert(task_id);
168    }
169
170    /// Returns the nesting depth of `task_id` (0 = root, no parent).
171    ///
172    /// Walks up the parent chain; O(depth) time. With `max_depth` capped at 8,
173    /// this is effectively constant-time and caching is unnecessary.
174    pub fn depth(&self, task_id: TaskId) -> usize {
175        let mut depth = 0usize;
176        let mut current = task_id;
177        while let Some(&parent) = self.parents.get(&current) {
178            depth += 1;
179            current = parent;
180        }
181        depth
182    }
183
184    /// Removes all tracker state for a fully-terminal task and its descendants.
185    ///
186    /// **Safety constraint:** Only call when `task_id` AND all its descendants
187    /// are terminal (i.e., `collect_cancellation_cascade(task_id)` returns empty).
188    /// Violating this precondition can remove entries still needed for cascade
189    /// protection or orphan prevention of in-flight tasks.
190    ///
191    /// Called by the dispatch loop after the hierarchy cascade for `task_id`
192    /// is fully quenched (all descendants terminal).
193    pub fn gc_subtree(&mut self, task_id: TaskId) {
194        // Collect all task_ids in the subtree (task_id + all descendants).
195        let mut to_remove = Vec::new();
196        let mut queue = VecDeque::new();
197        queue.push_back(task_id);
198
199        while let Some(current) = queue.pop_front() {
200            to_remove.push(current);
201            if let Some(children) = self.children.get(&current) {
202                for &child in children {
203                    queue.push_back(child);
204                }
205            }
206        }
207
208        for id in to_remove {
209            self.children.remove(&id);
210            self.parents.remove(&id);
211            self.terminal_tasks.remove(&id);
212        }
213    }
214
215    /// Collects all non-terminal descendants of `task_id` for cascading cancellation.
216    ///
217    /// Traverses the subtree in **breadth-first order** (BFS) starting from
218    /// `task_id`'s direct children. Results are returned in BFS visitation
219    /// order. Descendants already marked terminal
220    /// (via [`mark_terminal`][HierarchyTracker::mark_terminal]) are excluded
221    /// from the result, as they require no further action.
222    ///
223    /// Returns an empty `Vec` when `task_id` has no children or all descendants
224    /// are already terminal (self-quenching: repeated calls are safe and cheap).
225    pub fn collect_cancellation_cascade(&self, task_id: TaskId) -> Vec<TaskId> {
226        let mut result = Vec::new();
227        let mut queue = VecDeque::new();
228
229        // Seed the queue with direct children.
230        if let Some(children) = self.children.get(&task_id) {
231            for &child in children {
232                queue.push_back(child);
233            }
234        }
235
236        while let Some(current) = queue.pop_front() {
237            if self.terminal_tasks.contains(&current) {
238                // Terminal descendants need no cascading; still descend their children
239                // because a deeper non-terminal descendant may still need canceling.
240                // (A terminal descendant may itself have had non-terminal children
241                // registered before it went terminal — those children are still live.)
242            } else {
243                result.push(current);
244            }
245
246            // Always traverse children regardless of terminal status (to reach
247            // deeper non-terminal descendants through terminal intermediaries).
248            if let Some(children) = self.children.get(&current) {
249                for &child in children {
250                    queue.push_back(child);
251                }
252            }
253        }
254
255        result
256    }
257}
258
#[cfg(test)]
mod tests {
    use actionqueue_core::ids::TaskId;

    use super::*;

    /// Builds a deterministic `TaskId` from a small integer.
    fn tid(n: u128) -> TaskId {
        TaskId::from_uuid(uuid::Uuid::from_u128(n))
    }

    #[test]
    fn new_tracker_has_no_children() {
        let tracker = HierarchyTracker::new();
        assert!(!tracker.has_children(tid(1)));
        assert!(tracker.parent_of(tid(2)).is_none());
        assert_eq!(tracker.depth(tid(1)), 0);
    }

    #[test]
    fn register_child_records_relationship() {
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");
        assert!(tracker.has_children(tid(1)));
        assert_eq!(tracker.parent_of(tid(2)), Some(tid(1)));
        assert_eq!(tracker.depth(tid(2)), 1);
    }

    #[test]
    fn depth_increases_down_chain() {
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");
        tracker.register_child(tid(2), tid(3)).expect("no error");
        tracker.register_child(tid(3), tid(4)).expect("no error");
        assert_eq!(tracker.depth(tid(1)), 0);
        assert_eq!(tracker.depth(tid(2)), 1);
        assert_eq!(tracker.depth(tid(3)), 2);
        assert_eq!(tracker.depth(tid(4)), 3);
    }

    #[test]
    fn orphan_prevention_rejects_child_of_terminal() {
        let mut tracker = HierarchyTracker::new();
        tracker.mark_terminal(tid(1));
        let err = tracker.register_child(tid(1), tid(2)).expect_err("should fail");
        assert!(matches!(err, HierarchyError::OrphanPrevention { child, parent }
            if child == tid(2) && parent == tid(1)));
    }

    #[test]
    fn depth_limit_rejects_excessive_nesting() {
        let mut tracker = HierarchyTracker::with_max_depth(2);
        tracker.register_child(tid(1), tid(2)).expect("depth 1 OK");
        tracker.register_child(tid(2), tid(3)).expect("depth 2 OK");
        let err = tracker.register_child(tid(3), tid(4)).expect_err("depth 3 exceeds limit 2");
        assert!(matches!(err, HierarchyError::DepthLimitExceeded { depth: 3, limit: 2, .. }));
    }

    #[test]
    fn error_messages_name_the_violated_rule() {
        let mut tracker = HierarchyTracker::with_max_depth(2);
        tracker.register_child(tid(1), tid(2)).expect("depth 1 OK");
        tracker.register_child(tid(2), tid(3)).expect("depth 2 OK");
        let depth_err = tracker.register_child(tid(3), tid(4)).expect_err("too deep");
        assert!(depth_err.to_string().contains("depth 3 (limit: 2)"));

        tracker.mark_terminal(tid(1));
        let orphan_err = tracker.register_child(tid(1), tid(5)).expect_err("terminal parent");
        assert!(orphan_err.to_string().contains("orphan prevention"));
    }

    #[test]
    fn cascade_returns_non_terminal_descendants() {
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");
        tracker.register_child(tid(1), tid(3)).expect("no error");
        tracker.register_child(tid(2), tid(4)).expect("no error");

        let cascade = tracker.collect_cancellation_cascade(tid(1));
        assert!(cascade.contains(&tid(2)));
        assert!(cascade.contains(&tid(3)));
        assert!(cascade.contains(&tid(4)));
        assert_eq!(cascade.len(), 3);
    }

    #[test]
    fn cascade_excludes_terminal_descendants() {
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");
        tracker.register_child(tid(1), tid(3)).expect("no error");
        tracker.mark_terminal(tid(2));

        let cascade = tracker.collect_cancellation_cascade(tid(1));
        assert!(!cascade.contains(&tid(2)), "tid(2) is terminal, excluded");
        assert!(cascade.contains(&tid(3)));
        assert_eq!(cascade.len(), 1);
    }

    #[test]
    fn cascade_traverses_through_terminal_intermediary() {
        // Even if a mid-chain task is terminal, its non-terminal children
        // should still be collected (they are still live descendants).
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");
        tracker.register_child(tid(2), tid(3)).expect("no error");
        tracker.mark_terminal(tid(2));

        let cascade = tracker.collect_cancellation_cascade(tid(1));
        assert!(!cascade.contains(&tid(2)), "terminal");
        assert!(cascade.contains(&tid(3)), "tid(3) is non-terminal leaf under terminal tid(2)");
    }

    #[test]
    fn cascade_returns_empty_for_leaf_task() {
        let tracker = HierarchyTracker::new();
        assert!(tracker.collect_cancellation_cascade(tid(99)).is_empty());
    }

    #[test]
    fn cascade_self_quenches_after_mark_terminal() {
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");

        let first = tracker.collect_cancellation_cascade(tid(1));
        assert_eq!(first, vec![tid(2)]);

        tracker.mark_terminal(tid(2));
        let second = tracker.collect_cancellation_cascade(tid(1));
        assert!(second.is_empty(), "self-quenches after mark_terminal");
    }

    #[test]
    fn bootstrap_registration_succeeds_with_empty_terminal_set() {
        // During bootstrap, terminal_tasks is empty, so all register_child calls
        // succeed regardless of actual terminal state — matching WAL replay semantics.
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error during bootstrap");
        // After registering, populate terminal status.
        tracker.mark_terminal(tid(1));
        tracker.mark_terminal(tid(2));
        // Now orphan prevention works for new submissions.
        tracker.register_child(tid(1), tid(3)).expect_err("parent is terminal");
    }

    #[test]
    fn gc_subtree_removes_terminal_subtree() {
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");
        tracker.register_child(tid(2), tid(3)).expect("no error");
        tracker.mark_terminal(tid(1));
        tracker.mark_terminal(tid(2));
        tracker.mark_terminal(tid(3));

        // Cascade is quenched (all terminal).
        assert!(tracker.collect_cancellation_cascade(tid(1)).is_empty());

        tracker.gc_subtree(tid(1));

        // All tracker state removed.
        assert!(!tracker.has_children(tid(1)));
        assert!(!tracker.has_children(tid(2)));
        assert!(tracker.parent_of(tid(2)).is_none());
        assert!(tracker.parent_of(tid(3)).is_none());
        assert!(!tracker.is_terminal(tid(1)));
        assert!(!tracker.is_terminal(tid(2)));
        assert!(!tracker.is_terminal(tid(3)));
    }

    #[test]
    fn gc_subtree_is_idempotent() {
        let mut tracker = HierarchyTracker::new();
        tracker.register_child(tid(1), tid(2)).expect("no error");
        tracker.mark_terminal(tid(1));
        tracker.mark_terminal(tid(2));

        tracker.gc_subtree(tid(1));
        tracker.gc_subtree(tid(1)); // second call must not panic

        // The second call must also leave the tracker empty.
        assert!(!tracker.has_children(tid(1)));
        assert!(tracker.parent_of(tid(2)).is_none());
        assert!(!tracker.is_terminal(tid(1)));
        assert!(!tracker.is_terminal(tid(2)));
    }

    #[test]
    fn max_depth_default_is_eight() {
        let tracker = HierarchyTracker::new();
        assert_eq!(tracker.max_depth, DEFAULT_MAX_DEPTH);
    }
}