1use std::collections::{HashMap, HashSet, VecDeque};
18
19use actionqueue_core::ids::TaskId;
20
21#[derive(Debug, Clone, PartialEq, Eq)]
23pub struct CycleError {
24 task_id: TaskId,
25 cycle_through: TaskId,
26}
27
28impl CycleError {
29 pub(crate) fn new(task_id: TaskId, cycle_through: TaskId) -> Self {
30 Self { task_id, cycle_through }
31 }
32
33 pub fn task_id(&self) -> TaskId {
35 self.task_id
36 }
37
38 pub fn cycle_through(&self) -> TaskId {
40 self.cycle_through
41 }
42}
43
44impl std::fmt::Display for CycleError {
45 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 if self.task_id == self.cycle_through {
47 write!(f, "task {} cannot depend on itself", self.task_id)
48 } else {
49 write!(
50 f,
51 "declaring dependency from {} on {} would introduce a cycle ({} is already \
52 reachable from {})",
53 self.task_id, self.cycle_through, self.task_id, self.cycle_through
54 )
55 }
56 }
57}
58
59impl std::error::Error for CycleError {}
60
61#[derive(Debug, Default, Clone, PartialEq, Eq)]
70pub struct DependencyGate {
71 prerequisites: HashMap<TaskId, HashSet<TaskId>>,
75
76 satisfied: HashSet<TaskId>,
78
79 failed: HashSet<TaskId>,
81}
82
83impl DependencyGate {
84 pub fn new() -> Self {
86 Self::default()
87 }
88
89 pub fn declare(&mut self, task_id: TaskId, depends_on: Vec<TaskId>) -> Result<(), CycleError> {
96 for &prereq in &depends_on {
101 if self.is_reachable_from(prereq, task_id) {
102 return Err(CycleError::new(task_id, prereq));
103 }
104 }
105
106 let entry = self.prerequisites.entry(task_id).or_default();
107 for prereq in depends_on {
108 entry.insert(prereq);
109 }
110
111 self.recompute_satisfaction(task_id);
113 Ok(())
114 }
115
116 pub fn check_cycle(&self, task_id: TaskId, depends_on: &[TaskId]) -> Result<(), CycleError> {
128 for &prereq in depends_on {
129 if self.is_reachable_from(prereq, task_id) {
130 return Err(CycleError::new(task_id, prereq));
131 }
132 }
133 Ok(())
134 }
135
136 #[must_use]
142 pub fn is_eligible(&self, task_id: TaskId) -> bool {
143 !self.failed.contains(&task_id)
144 && match self.prerequisites.get(&task_id) {
145 None => true, Some(_) => self.satisfied.contains(&task_id), }
148 }
149
150 #[must_use]
155 pub fn is_dependency_failed(&self, task_id: TaskId) -> bool {
156 self.failed.contains(&task_id)
157 }
158
159 #[must_use]
161 pub fn has_prerequisites(&self, task_id: TaskId) -> bool {
162 self.prerequisites.contains_key(&task_id)
163 }
164
165 pub fn waiting_task_ids(&self) -> impl Iterator<Item = TaskId> + '_ {
170 self.prerequisites
171 .keys()
172 .copied()
173 .filter(move |&id| !self.satisfied.contains(&id) && !self.failed.contains(&id))
174 }
175
176 #[must_use]
181 pub fn notify_completed(&mut self, completed_task_id: TaskId) -> Vec<TaskId> {
182 self.satisfied.insert(completed_task_id);
184
185 let dependents: Vec<TaskId> = self
187 .prerequisites
188 .iter()
189 .filter(|(_, prereqs)| prereqs.contains(&completed_task_id))
190 .map(|(task_id, _)| *task_id)
191 .collect();
192
193 let mut newly_eligible = Vec::new();
194 for dep in dependents {
195 let was_eligible = self.is_eligible(dep);
196 self.recompute_satisfaction(dep);
197 if !was_eligible && self.is_eligible(dep) {
198 newly_eligible.push(dep);
199 }
200 }
201 newly_eligible
202 }
203
204 #[must_use]
210 pub fn notify_failed(&mut self, failed_task_id: TaskId) -> Vec<TaskId> {
211 if self.failed.contains(&failed_task_id) {
213 return Vec::new();
214 }
215
216 self.failed.insert(failed_task_id);
220
221 let mut newly_blocked = Vec::new();
222 let mut queue: VecDeque<TaskId> = VecDeque::new();
223 queue.push_back(failed_task_id);
224
225 while let Some(failed_id) = queue.pop_front() {
226 let dependents: Vec<TaskId> = self
228 .prerequisites
229 .iter()
230 .filter(|(_, prereqs)| prereqs.contains(&failed_id))
231 .map(|(task_id, _)| *task_id)
232 .filter(|task_id| !self.failed.contains(task_id))
233 .collect();
234
235 for dep in dependents {
236 self.failed.insert(dep);
237 self.satisfied.remove(&dep);
238 newly_blocked.push(dep);
239 queue.push_back(dep);
241 }
242 }
243
244 newly_blocked
245 }
246
247 #[must_use]
255 pub fn propagate_failures(&mut self) -> Vec<TaskId> {
256 let seeds: Vec<TaskId> = self.failed.iter().copied().collect();
257 let mut newly_blocked = Vec::new();
258 let mut queue: VecDeque<TaskId> = seeds.into_iter().collect();
259 while let Some(failed_id) = queue.pop_front() {
260 let dependents: Vec<TaskId> = self
261 .prerequisites
262 .iter()
263 .filter(|(_, prereqs)| prereqs.contains(&failed_id))
264 .map(|(task_id, _)| *task_id)
265 .filter(|task_id| !self.failed.contains(task_id))
266 .collect();
267 for dep in dependents {
268 self.failed.insert(dep);
269 self.satisfied.remove(&dep);
270 newly_blocked.push(dep);
271 queue.push_back(dep);
272 }
273 }
274 newly_blocked
275 }
276
277 pub fn force_satisfy(&mut self, task_id: TaskId) {
281 self.satisfied.insert(task_id);
282 self.failed.remove(&task_id);
283 }
284
285 pub fn force_fail(&mut self, task_id: TaskId) {
287 self.failed.insert(task_id);
288 self.satisfied.remove(&task_id);
289 }
290
291 pub fn gc_task(&mut self, task_id: TaskId) {
302 self.prerequisites.remove(&task_id);
304 self.satisfied.remove(&task_id);
305 self.failed.remove(&task_id);
306
307 for prereqs in self.prerequisites.values_mut() {
309 prereqs.remove(&task_id);
310 }
311 }
312
313 pub fn recompute_satisfaction_pub(&mut self, task_id: TaskId) {
319 self.recompute_satisfaction(task_id);
320 }
321
322 fn is_reachable_from(&self, start: TaskId, target: TaskId) -> bool {
326 if start == target {
327 return true;
328 }
329 let mut visited = HashSet::new();
330 let mut queue = VecDeque::new();
331 queue.push_back(start);
332
333 while let Some(current) = queue.pop_front() {
334 if current == target {
335 return true;
336 }
337 if !visited.insert(current) {
338 continue;
339 }
340 if let Some(prereqs) = self.prerequisites.get(¤t) {
341 for &prereq in prereqs {
342 if !visited.contains(&prereq) {
343 queue.push_back(prereq);
344 }
345 }
346 }
347 }
348 false
349 }
350
351 fn recompute_satisfaction(&mut self, task_id: TaskId) {
359 if self.failed.contains(&task_id) {
360 return; }
362 let Some(prereqs) = self.prerequisites.get(&task_id) else {
363 return; };
365 let prereqs: Vec<TaskId> = prereqs.iter().copied().collect();
366
367 let all_satisfied = prereqs.iter().all(|prereq| self.satisfied.contains(prereq));
370
371 if all_satisfied {
372 self.satisfied.insert(task_id);
373 } else {
374 self.satisfied.remove(&task_id);
375 }
376 }
377}
378
379#[cfg(test)]
380mod tests {
381 use actionqueue_core::ids::TaskId;
382
383 use super::*;
384
385 fn tid(n: u128) -> TaskId {
386 TaskId::from_uuid(uuid::Uuid::from_u128(n))
387 }
388
389 #[test]
390 fn no_dependencies_is_always_eligible() {
391 let gate = DependencyGate::new();
392 assert!(gate.is_eligible(tid(1)));
393 assert!(!gate.has_prerequisites(tid(1)));
394 }
395
396 #[test]
397 fn declared_dependency_blocks_until_completed() {
398 let mut gate = DependencyGate::new();
399 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
400 assert!(!gate.is_eligible(tid(2)));
401
402 let newly_eligible = gate.notify_completed(tid(1));
403 assert_eq!(newly_eligible, vec![tid(2)]);
404 assert!(gate.is_eligible(tid(2)));
405 }
406
407 #[test]
408 fn failed_prerequisite_blocks_dependent() {
409 let mut gate = DependencyGate::new();
410 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
411
412 let blocked = gate.notify_failed(tid(1));
413 assert_eq!(blocked, vec![tid(2)]);
414 assert!(!gate.is_eligible(tid(2)));
415 assert!(gate.is_dependency_failed(tid(2)));
416 }
417
418 #[test]
419 fn cascading_failure_through_chain() {
420 let mut gate = DependencyGate::new();
421 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
422 gate.declare(tid(3), vec![tid(2)]).expect("no cycle");
423
424 let blocked = gate.notify_failed(tid(1));
425 assert!(blocked.contains(&tid(2)));
426 assert!(blocked.contains(&tid(3)));
427 assert!(gate.is_dependency_failed(tid(3)));
428 }
429
430 #[test]
431 fn cycle_detection_rejects_direct_cycle() {
432 let mut gate = DependencyGate::new();
433 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
434 let err = gate.declare(tid(1), vec![tid(2)]).expect_err("cycle should be detected");
435 assert_eq!(err.task_id(), tid(1));
436 }
437
438 #[test]
439 fn force_satisfy_and_fail_for_bootstrap() {
440 let mut gate = DependencyGate::new();
441 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
442 assert!(!gate.is_eligible(tid(2)));
443
444 gate.force_satisfy(tid(2));
445 assert!(gate.is_eligible(tid(2)));
446 }
447
448 #[test]
449 fn check_cycle_detects_cycle_without_mutation() {
450 let mut gate = DependencyGate::new();
451 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
452 let snapshot = gate.clone();
453
454 let err = gate.check_cycle(tid(1), &[tid(2)]).expect_err("cycle");
456 assert_eq!(err.task_id(), tid(1));
457 assert_eq!(err.cycle_through(), tid(2));
458
459 assert_eq!(gate, snapshot);
461 }
462
463 #[test]
464 fn self_loop_display_message() {
465 let err = CycleError::new(tid(1), tid(1));
466 let msg = err.to_string();
467 assert!(
468 msg.contains("cannot depend on itself"),
469 "self-loop error should say 'cannot depend on itself', got: {msg}"
470 );
471 }
472
473 #[test]
474 fn notify_failed_idempotent_on_second_call() {
475 let mut gate = DependencyGate::new();
476 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
477
478 let first = gate.notify_failed(tid(1));
479 assert_eq!(first.len(), 1);
480
481 let second = gate.notify_failed(tid(1));
483 assert!(second.is_empty(), "second notify_failed must return empty");
484 }
485
486 #[test]
487 fn propagate_failures_cascades_transitive_chain() {
488 let mut gate = DependencyGate::new();
489 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
490 gate.declare(tid(3), vec![tid(2)]).expect("no cycle");
491
492 gate.force_fail(tid(1));
494 assert!(gate.is_dependency_failed(tid(1)));
495 assert!(!gate.is_dependency_failed(tid(2)), "before propagate, tid(2) not failed");
496 assert!(!gate.is_dependency_failed(tid(3)), "before propagate, tid(3) not failed");
497
498 let newly_blocked = gate.propagate_failures();
500 assert!(newly_blocked.contains(&tid(2)));
501 assert!(newly_blocked.contains(&tid(3)));
502 assert!(gate.is_dependency_failed(tid(2)));
503 assert!(gate.is_dependency_failed(tid(3)));
504 }
505
506 #[test]
507 fn notify_failed_idempotent_guard_fires_on_root() {
508 let mut gate = DependencyGate::new();
509 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
510
511 let first = gate.notify_failed(tid(1));
512 assert_eq!(first.len(), 1);
513 assert!(gate.is_dependency_failed(tid(1)), "root should be in failed set");
514
515 let second = gate.notify_failed(tid(1));
517 assert!(second.is_empty(), "second call must return empty via guard");
518 }
519
520 #[test]
521 fn gc_task_removes_completed_prerequisite() {
522 let mut gate = DependencyGate::new();
523 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
524 let _ = gate.notify_completed(tid(1));
525 assert!(gate.is_eligible(tid(2)));
526
527 gate.gc_task(tid(1));
528
529 assert!(gate.is_eligible(tid(2)));
531 assert!(!gate.has_prerequisites(tid(1)));
533 }
534
535 #[test]
536 fn gc_task_removes_failed_prerequisite() {
537 let mut gate = DependencyGate::new();
538 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
539 let _ = gate.notify_failed(tid(1));
540 assert!(gate.is_dependency_failed(tid(2)));
541
542 gate.gc_task(tid(1));
543
544 assert!(gate.is_dependency_failed(tid(2)));
546 }
547
548 #[test]
549 fn gc_task_is_idempotent() {
550 let mut gate = DependencyGate::new();
551 gate.declare(tid(2), vec![tid(1)]).expect("no cycle");
552 let _ = gate.notify_completed(tid(1));
553
554 gate.gc_task(tid(1));
555 gate.gc_task(tid(1)); assert!(gate.is_eligible(tid(2)));
557 }
558
559 #[test]
560 fn multiple_prerequisites_require_all_satisfied() {
561 let mut gate = DependencyGate::new();
562 gate.declare(tid(3), vec![tid(1), tid(2)]).expect("no cycle");
563
564 let r1 = gate.notify_completed(tid(1));
565 assert!(r1.is_empty(), "tid(3) still blocked on tid(2)");
566 assert!(!gate.is_eligible(tid(3)));
567
568 let r2 = gate.notify_completed(tid(2));
569 assert_eq!(r2, vec![tid(3)]);
570 assert!(gate.is_eligible(tid(3)));
571 }
572}