1use std::collections::{HashMap, HashSet};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tracing::{info, warn};
6
7use crate::{
8 agents::{PlannedNode, PlannerGraphOutput},
9 db::graph::{self, GraphNodeRow, NodeState},
10 issues::PipelineIssue,
11};
12
13#[derive(Debug, Clone)]
15pub struct GraphNode {
16 pub issue_number: u32,
17 pub title: String,
18 pub area: String,
19 pub predicted_files: Vec<String>,
20 pub has_migration: bool,
21 pub complexity: String,
22 pub state: NodeState,
23 pub pr_number: Option<u32>,
24 pub run_id: Option<String>,
25 pub issue: Option<PipelineIssue>,
26 pub target_repo: Option<String>,
29}
30
31pub struct DependencyGraph {
36 session_id: String,
37 nodes: HashMap<u32, GraphNode>,
38 edges: HashMap<u32, HashSet<u32>>,
40 reverse_edges: HashMap<u32, HashSet<u32>>,
42}
43
44impl DependencyGraph {
45 pub fn new(session_id: &str) -> Self {
46 Self {
47 session_id: session_id.to_string(),
48 nodes: HashMap::new(),
49 edges: HashMap::new(),
50 reverse_edges: HashMap::new(),
51 }
52 }
53
54 pub fn session_id(&self) -> &str {
55 &self.session_id
56 }
57
58 pub fn contains(&self, issue: u32) -> bool {
59 self.nodes.contains_key(&issue)
60 }
61
62 pub fn node(&self, issue: u32) -> Option<&GraphNode> {
63 self.nodes.get(&issue)
64 }
65
66 pub fn node_count(&self) -> usize {
67 self.nodes.len()
68 }
69
70 pub fn add_node(&mut self, node: GraphNode) {
71 let num = node.issue_number;
72 self.nodes.insert(num, node);
73 self.edges.entry(num).or_default();
74 self.reverse_edges.entry(num).or_default();
75 }
76
77 pub fn add_edge(&mut self, from: u32, to: u32) -> bool {
81 if from == to || self.would_create_cycle(from, to) {
82 return false;
83 }
84 self.edges.entry(from).or_default().insert(to);
85 self.reverse_edges.entry(to).or_default().insert(from);
86 true
87 }
88
89 pub fn would_create_cycle(&self, from: u32, to: u32) -> bool {
93 let mut visited = HashSet::new();
94 let mut stack = vec![to];
95 while let Some(current) = stack.pop() {
96 if current == from {
97 return true;
98 }
99 if visited.insert(current) {
100 if let Some(deps) = self.edges.get(¤t) {
101 for &dep in deps {
102 if !visited.contains(&dep) {
103 stack.push(dep);
104 }
105 }
106 }
107 }
108 }
109 false
110 }
111
112 pub fn ready_issues(&self) -> Vec<u32> {
114 self.nodes
115 .iter()
116 .filter(|(_, node)| node.state == NodeState::Pending)
117 .filter(|(num, _)| {
118 self.edges.get(num).is_none_or(|deps| {
119 deps.iter()
120 .all(|d| self.nodes.get(d).is_some_and(|n| n.state == NodeState::Merged))
121 })
122 })
123 .map(|(num, _)| *num)
124 .collect()
125 }
126
127 pub fn awaiting_merge(&self) -> Vec<u32> {
129 self.nodes
130 .iter()
131 .filter(|(_, node)| node.state == NodeState::AwaitingMerge)
132 .map(|(num, _)| *num)
133 .collect()
134 }
135
136 pub fn transition(&mut self, issue: u32, state: NodeState) {
138 if let Some(node) = self.nodes.get_mut(&issue) {
139 info!(
140 issue,
141 from = %node.state,
142 to = %state,
143 "graph node state transition"
144 );
145 node.state = state;
146 }
147 }
148
149 pub fn set_pr_number(&mut self, issue: u32, pr_number: u32) {
151 if let Some(node) = self.nodes.get_mut(&issue) {
152 node.pr_number = Some(pr_number);
153 }
154 }
155
156 pub fn set_run_id(&mut self, issue: u32, run_id: &str) {
158 if let Some(node) = self.nodes.get_mut(&issue) {
159 node.run_id = Some(run_id.to_string());
160 }
161 }
162
163 pub fn dependencies(&self, issue: u32) -> HashSet<u32> {
165 self.edges.get(&issue).cloned().unwrap_or_default()
166 }
167
168 pub fn dependents(&self, issue: u32) -> HashSet<u32> {
170 self.reverse_edges.get(&issue).cloned().unwrap_or_default()
171 }
172
173 pub fn all_terminal(&self) -> bool {
175 self.nodes.values().all(|n| matches!(n.state, NodeState::Merged | NodeState::Failed))
176 }
177
178 pub fn is_blocked(&self, issue: u32) -> bool {
180 self.edges.get(&issue).is_some_and(|deps| {
181 deps.iter().any(|d| self.nodes.get(d).is_some_and(|n| n.state == NodeState::Failed))
182 })
183 }
184
185 pub fn propagate_failure(&mut self, issue: u32) -> Vec<u32> {
191 use std::collections::VecDeque;
192
193 let mut queue = VecDeque::new();
194 let mut newly_failed = Vec::new();
195
196 if let Some(dependents) = self.reverse_edges.get(&issue) {
198 queue.extend(dependents.iter().copied());
199 }
200
201 let mut visited = HashSet::new();
202 visited.insert(issue);
203
204 while let Some(current) = queue.pop_front() {
205 if !visited.insert(current) {
206 continue;
207 }
208 let dominated = self
209 .nodes
210 .get(¤t)
211 .is_some_and(|n| matches!(n.state, NodeState::Pending | NodeState::InFlight));
212 if !dominated {
213 continue;
214 }
215 self.transition(current, NodeState::Failed);
216 newly_failed.push(current);
217 if let Some(dependents) = self.reverse_edges.get(¤t) {
218 queue.extend(dependents.iter().copied());
219 }
220 }
221
222 newly_failed
223 }
224
225 pub fn remove_node(&mut self, issue: u32) {
227 self.nodes.remove(&issue);
228 if let Some(deps) = self.edges.remove(&issue) {
229 for dep in &deps {
230 if let Some(rev) = self.reverse_edges.get_mut(dep) {
231 rev.remove(&issue);
232 }
233 }
234 }
235 if let Some(dependents) = self.reverse_edges.remove(&issue) {
236 for dependent in &dependents {
237 if let Some(fwd) = self.edges.get_mut(dependent) {
238 fwd.remove(&issue);
239 }
240 }
241 }
242 }
243
244 pub fn all_issues(&self) -> Vec<u32> {
246 let mut nums: Vec<u32> = self.nodes.keys().copied().collect();
247 nums.sort_unstable();
248 nums
249 }
250
251 pub fn from_db(conn: &Connection, session_id: &str) -> Result<Self> {
253 let db_nodes = graph::get_nodes(conn, session_id).context("loading graph nodes")?;
254 let db_edges = graph::get_edges(conn, session_id).context("loading graph edges")?;
255
256 let mut g = Self::new(session_id);
257 for row in &db_nodes {
258 g.add_node(GraphNode {
259 issue_number: row.issue_number,
260 title: row.title.clone(),
261 area: row.area.clone(),
262 predicted_files: row.predicted_files.clone(),
263 has_migration: row.has_migration,
264 complexity: row.complexity.clone(),
265 state: row.state,
266 pr_number: row.pr_number,
267 run_id: row.run_id.clone(),
268 issue: None,
269 target_repo: row.target_repo.clone(),
270 });
271 }
272 for (from, to) in &db_edges {
273 if !g.add_edge(*from, *to) {
274 warn!(from, to, "skipping persisted edge that would create cycle");
275 }
276 }
277
278 Ok(g)
279 }
280
281 pub fn save_to_db(&self, conn: &Connection) -> Result<()> {
285 let tx = conn.unchecked_transaction().context("starting graph save transaction")?;
286
287 graph::delete_session(&tx, &self.session_id)?;
288 for node in self.nodes.values() {
289 let row = GraphNodeRow {
290 issue_number: node.issue_number,
291 session_id: self.session_id.clone(),
292 state: node.state,
293 pr_number: node.pr_number,
294 run_id: node.run_id.clone(),
295 title: node.title.clone(),
296 area: node.area.clone(),
297 predicted_files: node.predicted_files.clone(),
298 has_migration: node.has_migration,
299 complexity: node.complexity.clone(),
300 target_repo: node.target_repo.clone(),
301 };
302 graph::insert_node(&tx, &self.session_id, &row)?;
303 }
304 for (&from, deps) in &self.edges {
305 for &to in deps {
306 graph::insert_edge(&tx, &self.session_id, from, to)?;
307 }
308 }
309
310 tx.commit().context("committing graph save transaction")?;
311 Ok(())
312 }
313
314 pub fn from_planner_output(
316 session_id: &str,
317 plan: &PlannerGraphOutput,
318 issues: &[PipelineIssue],
319 ) -> Self {
320 let issue_map: HashMap<u32, &PipelineIssue> =
321 issues.iter().map(|i| (i.number, i)).collect();
322 let mut g = Self::new(session_id);
323 for node in &plan.nodes {
324 g.add_node(node_from_planned(node, issue_map.get(&node.number).copied()));
325 }
326 add_planned_edges(&mut g, &plan.nodes);
327 g
328 }
329
330 pub fn merge_planner_output(&mut self, plan: &PlannerGraphOutput, issues: &[PipelineIssue]) {
335 let issue_map: HashMap<u32, &PipelineIssue> =
336 issues.iter().map(|i| (i.number, i)).collect();
337 let new_nodes: Vec<&PlannedNode> =
338 plan.nodes.iter().filter(|n| !self.contains(n.number)).collect();
339 for node in &new_nodes {
340 self.add_node(node_from_planned(node, issue_map.get(&node.number).copied()));
341 }
342 add_planned_edges(self, &new_nodes);
343 }
344
345 pub fn display_lines(&self) -> Vec<String> {
347 let mut lines = Vec::new();
348 let issues = self.all_issues();
349
350 for num in issues {
351 let Some(node) = self.nodes.get(&num) else { continue };
352 let blocked = if self.is_blocked(num) { " (blocked)" } else { "" };
353 let state_str = format!("[{}]{blocked}", node.state);
354 lines.push(format!(" #{num} {} {:.<40} {state_str}", node.title, "."));
355 let deps = self.dependencies(num);
356 if !deps.is_empty() {
357 let mut dep_nums: Vec<u32> = deps.into_iter().collect();
358 dep_nums.sort_unstable();
359 let dep_strs: Vec<String> = dep_nums.iter().map(|d| format!("#{d}")).collect();
360 lines.push(format!(" depends on: {}", dep_strs.join(", ")));
361 }
362 }
363 lines
364 }
365
366 pub fn to_graph_context(&self) -> Vec<crate::agents::GraphContextNode> {
371 self.all_issues()
372 .into_iter()
373 .filter_map(|num| {
374 let node = self.nodes.get(&num)?;
375 let depends_on: Vec<u32> = self.edges.get(&num).map_or_else(Vec::new, |deps| {
376 let mut v: Vec<u32> = deps.iter().copied().collect();
377 v.sort_unstable();
378 v
379 });
380 Some(crate::agents::GraphContextNode {
381 number: num,
382 title: node.title.clone(),
383 state: node.state,
384 area: node.area.clone(),
385 predicted_files: node.predicted_files.clone(),
386 has_migration: node.has_migration,
387 depends_on,
388 target_repo: node.target_repo.clone(),
389 })
390 })
391 .collect()
392 }
393}
394
395fn node_from_planned(node: &PlannedNode, issue: Option<&PipelineIssue>) -> GraphNode {
396 GraphNode {
397 issue_number: node.number,
398 title: node.title.clone(),
399 area: node.area.clone(),
400 predicted_files: node.predicted_files.clone(),
401 has_migration: node.has_migration,
402 complexity: node.complexity.to_string(),
403 state: NodeState::Pending,
404 pr_number: None,
405 run_id: None,
406 target_repo: issue.and_then(|i| i.target_repo.clone()),
407 issue: issue.cloned(),
408 }
409}
410
411fn add_planned_edges(graph: &mut DependencyGraph, nodes: &[impl std::borrow::Borrow<PlannedNode>]) {
412 for node in nodes {
413 let node = node.borrow();
414 for &dep in &node.depends_on {
415 if !graph.add_edge(node.number, dep) {
416 warn!(
417 from = node.number,
418 to = dep,
419 "skipping planner edge that would create cycle"
420 );
421 }
422 }
423 }
424}
425
426#[cfg(test)]
427mod tests {
428 use super::*;
429
430 fn make_node(num: u32) -> GraphNode {
431 GraphNode {
432 issue_number: num,
433 title: format!("Issue #{num}"),
434 area: "test".to_string(),
435 predicted_files: vec![],
436 has_migration: false,
437 complexity: "full".to_string(),
438 state: NodeState::Pending,
439 pr_number: None,
440 run_id: None,
441 issue: None,
442 target_repo: None,
443 }
444 }
445
446 #[test]
447 fn add_node_and_check() {
448 let mut g = DependencyGraph::new("test");
449 g.add_node(make_node(1));
450 assert!(g.contains(1));
451 assert!(!g.contains(2));
452 assert_eq!(g.node_count(), 1);
453 }
454
455 #[test]
456 fn add_edge_and_check() {
457 let mut g = DependencyGraph::new("test");
458 g.add_node(make_node(1));
459 g.add_node(make_node(2));
460 assert!(g.add_edge(2, 1)); assert_eq!(g.dependencies(2), HashSet::from([1]));
463 assert_eq!(g.dependents(1), HashSet::from([2]));
464 }
465
466 #[test]
467 fn self_edge_rejected() {
468 let mut g = DependencyGraph::new("test");
469 g.add_node(make_node(1));
470 assert!(!g.add_edge(1, 1));
471 }
472
473 #[test]
474 fn direct_cycle_detected() {
475 let mut g = DependencyGraph::new("test");
476 g.add_node(make_node(1));
477 g.add_node(make_node(2));
478 assert!(g.add_edge(2, 1)); assert!(!g.add_edge(1, 2)); }
481
482 #[test]
483 fn indirect_cycle_detected() {
484 let mut g = DependencyGraph::new("test");
485 g.add_node(make_node(1));
486 g.add_node(make_node(2));
487 g.add_node(make_node(3));
488 assert!(g.add_edge(2, 1)); assert!(g.add_edge(3, 2)); assert!(!g.add_edge(1, 3)); }
492
493 #[test]
494 fn valid_dag_no_false_cycle() {
495 let mut g = DependencyGraph::new("test");
496 g.add_node(make_node(1));
497 g.add_node(make_node(2));
498 g.add_node(make_node(3));
499 assert!(g.add_edge(2, 1));
500 assert!(g.add_edge(3, 1)); assert!(g.add_edge(3, 2)); }
503
504 #[test]
505 fn ready_issues_returns_pending_with_merged_deps() {
506 let mut g = DependencyGraph::new("test");
507 g.add_node(make_node(1));
508 g.add_node(make_node(2));
509 g.add_edge(2, 1);
510
511 assert_eq!(g.ready_issues(), vec![1]);
513
514 g.transition(1, NodeState::Merged);
516 let ready = g.ready_issues();
517 assert_eq!(ready, vec![2]);
518 }
519
520 #[test]
521 fn ready_issues_empty_when_deps_in_flight() {
522 let mut g = DependencyGraph::new("test");
523 g.add_node(make_node(1));
524 g.add_node(make_node(2));
525 g.add_edge(2, 1);
526 g.transition(1, NodeState::InFlight);
527 assert!(g.ready_issues().is_empty());
528 }
529
530 #[test]
531 fn ready_issues_empty_when_deps_awaiting_merge() {
532 let mut g = DependencyGraph::new("test");
533 g.add_node(make_node(1));
534 g.add_node(make_node(2));
535 g.add_edge(2, 1);
536 g.transition(1, NodeState::AwaitingMerge);
537 assert!(g.ready_issues().is_empty());
538 }
539
540 #[test]
541 fn awaiting_merge_returns_correct_nodes() {
542 let mut g = DependencyGraph::new("test");
543 g.add_node(make_node(1));
544 g.add_node(make_node(2));
545 g.transition(1, NodeState::AwaitingMerge);
546 let awaiting = g.awaiting_merge();
547 assert_eq!(awaiting, vec![1]);
548 }
549
550 #[test]
551 fn all_terminal_checks_all_nodes() {
552 let mut g = DependencyGraph::new("test");
553 g.add_node(make_node(1));
554 g.add_node(make_node(2));
555 assert!(!g.all_terminal());
556
557 g.transition(1, NodeState::Merged);
558 assert!(!g.all_terminal());
559
560 g.transition(2, NodeState::Failed);
561 assert!(g.all_terminal());
562 }
563
564 #[test]
565 fn is_blocked_when_dep_failed() {
566 let mut g = DependencyGraph::new("test");
567 g.add_node(make_node(1));
568 g.add_node(make_node(2));
569 g.add_edge(2, 1);
570 g.transition(1, NodeState::Failed);
571 assert!(g.is_blocked(2));
572 assert!(!g.is_blocked(1));
573 }
574
575 #[test]
576 fn remove_node_cleans_edges() {
577 let mut g = DependencyGraph::new("test");
578 g.add_node(make_node(1));
579 g.add_node(make_node(2));
580 g.add_node(make_node(3));
581 g.add_edge(2, 1);
582 g.add_edge(3, 2);
583
584 g.remove_node(2);
585 assert!(!g.contains(2));
586 assert!(g.dependencies(3).is_empty());
588 assert!(g.dependents(1).is_empty());
590 }
591
592 #[test]
593 fn display_lines_format() {
594 let mut g = DependencyGraph::new("test");
595 g.add_node(make_node(1));
596 g.add_node(make_node(2));
597 g.add_edge(2, 1);
598 g.transition(1, NodeState::Merged);
599
600 let lines = g.display_lines();
601 assert!(!lines.is_empty());
602 assert!(lines.iter().any(|l| l.contains("#1")));
603 assert!(lines.iter().any(|l| l.contains("depends on")));
604 }
605
606 #[test]
607 fn db_roundtrip() {
608 let conn = crate::db::open_in_memory().unwrap();
609 let mut g = DependencyGraph::new("test-session");
610 g.add_node(make_node(1));
611 g.add_node(make_node(2));
612 g.add_node(make_node(3));
613 g.add_edge(2, 1);
614 g.add_edge(3, 1);
615 g.add_edge(3, 2);
616 g.transition(1, NodeState::Merged);
617 g.set_pr_number(1, 99);
618 g.set_run_id(1, "abc");
619
620 g.save_to_db(&conn).unwrap();
621
622 let loaded = DependencyGraph::from_db(&conn, "test-session").unwrap();
623 assert_eq!(loaded.node_count(), 3);
624 assert_eq!(loaded.dependencies(2), HashSet::from([1]));
625 assert_eq!(loaded.dependencies(3), HashSet::from([1, 2]));
626 assert_eq!(loaded.node(1).unwrap().state, NodeState::Merged);
627 assert_eq!(loaded.node(1).unwrap().pr_number, Some(99));
628 assert_eq!(loaded.node(1).unwrap().run_id.as_deref(), Some("abc"));
629 }
630
631 #[test]
632 fn diamond_graph_ready_ordering() {
633 let mut g = DependencyGraph::new("test");
635 g.add_node(make_node(1)); g.add_node(make_node(2)); g.add_node(make_node(3)); g.add_node(make_node(4)); g.add_edge(2, 1); g.add_edge(3, 1); g.add_edge(4, 2); g.add_edge(4, 3); assert_eq!(g.ready_issues(), vec![1]);
647
648 g.transition(1, NodeState::Merged);
650 let mut ready = g.ready_issues();
651 ready.sort_unstable();
652 assert_eq!(ready, vec![2, 3]);
653
654 g.transition(2, NodeState::Merged);
656 assert_eq!(g.ready_issues(), vec![3]);
657
658 g.transition(3, NodeState::Merged);
660 assert_eq!(g.ready_issues(), vec![4]);
661 }
662
663 #[test]
664 fn empty_graph_is_all_terminal() {
665 let g = DependencyGraph::new("test");
666 assert!(g.all_terminal());
667 }
668
669 #[test]
670 fn independent_nodes_all_ready() {
671 let mut g = DependencyGraph::new("test");
672 g.add_node(make_node(1));
673 g.add_node(make_node(2));
674 g.add_node(make_node(3));
675
676 let mut ready = g.ready_issues();
677 ready.sort_unstable();
678 assert_eq!(ready, vec![1, 2, 3]);
679 }
680
681 fn make_planned(number: u32, depends_on: Vec<u32>) -> crate::agents::PlannedNode {
682 crate::agents::PlannedNode {
683 number,
684 title: format!("Issue #{number}"),
685 area: "test".to_string(),
686 predicted_files: vec![],
687 has_migration: false,
688 complexity: crate::agents::Complexity::Full,
689 depends_on,
690 reasoning: String::new(),
691 }
692 }
693
694 fn make_issue(number: u32) -> PipelineIssue {
695 PipelineIssue {
696 number,
697 title: format!("Issue #{number}"),
698 body: String::new(),
699 source: crate::issues::IssueOrigin::Github,
700 target_repo: None,
701 author: None,
702 }
703 }
704
705 #[test]
706 fn from_planner_output_basic() {
707 let plan = crate::agents::PlannerGraphOutput {
708 nodes: vec![
709 make_planned(1, vec![]),
710 make_planned(2, vec![]),
711 make_planned(3, vec![1, 2]),
712 ],
713 total_issues: 3,
714 parallel_capacity: 2,
715 };
716 let issues = vec![make_issue(1), make_issue(2), make_issue(3)];
717
718 let g = DependencyGraph::from_planner_output("sess", &plan, &issues);
719 assert_eq!(g.node_count(), 3);
720 assert_eq!(g.dependencies(3), HashSet::from([1, 2]));
721 assert!(g.dependencies(1).is_empty());
722 assert!(g.node(1).unwrap().issue.is_some());
724 assert!(g.node(2).unwrap().issue.is_some());
725 }
726
727 #[test]
728 fn from_planner_output_skips_cycle() {
729 let plan = crate::agents::PlannerGraphOutput {
730 nodes: vec![make_planned(1, vec![2]), make_planned(2, vec![1])],
731 total_issues: 2,
732 parallel_capacity: 1,
733 };
734
735 let g = DependencyGraph::from_planner_output("sess", &plan, &[]);
736 assert_eq!(g.node_count(), 2);
738 let total_edges: usize = [1, 2].iter().map(|n| g.dependencies(*n).len()).sum();
739 assert_eq!(total_edges, 1);
740 }
741
742 #[test]
743 fn merge_planner_output_adds_new_only() {
744 let plan1 = crate::agents::PlannerGraphOutput {
745 nodes: vec![make_planned(1, vec![])],
746 total_issues: 1,
747 parallel_capacity: 1,
748 };
749 let mut g = DependencyGraph::from_planner_output("sess", &plan1, &[make_issue(1)]);
750 g.transition(1, NodeState::InFlight);
751
752 let plan2 = crate::agents::PlannerGraphOutput {
754 nodes: vec![make_planned(1, vec![]), make_planned(2, vec![1])],
755 total_issues: 2,
756 parallel_capacity: 1,
757 };
758 g.merge_planner_output(&plan2, &[make_issue(2)]);
759
760 assert_eq!(g.node_count(), 2);
761 assert_eq!(g.node(1).unwrap().state, NodeState::InFlight);
763 assert_eq!(g.node(2).unwrap().state, NodeState::Pending);
765 assert_eq!(g.dependencies(2), HashSet::from([1]));
766 }
767
768 #[test]
769 fn merge_planner_output_cross_edges() {
770 let mut g = DependencyGraph::new("sess");
771 g.add_node(make_node(1));
772 g.transition(1, NodeState::Merged);
773
774 let plan = crate::agents::PlannerGraphOutput {
775 nodes: vec![make_planned(2, vec![1])],
776 total_issues: 1,
777 parallel_capacity: 1,
778 };
779 g.merge_planner_output(&plan, &[make_issue(2)]);
780
781 assert_eq!(g.dependencies(2), HashSet::from([1]));
782 assert_eq!(g.ready_issues(), vec![2]);
784 }
785
786 #[test]
787 fn propagate_failure_linear_chain() {
788 let mut g = DependencyGraph::new("test");
789 g.add_node(make_node(1));
790 g.add_node(make_node(2));
791 g.add_node(make_node(3));
792 g.add_edge(2, 1);
793 g.add_edge(3, 2);
794
795 g.transition(1, NodeState::Failed);
796 let mut failed = g.propagate_failure(1);
797 failed.sort_unstable();
798 assert_eq!(failed, vec![2, 3]);
799 assert_eq!(g.node(2).unwrap().state, NodeState::Failed);
800 assert_eq!(g.node(3).unwrap().state, NodeState::Failed);
801 }
802
803 #[test]
804 fn propagate_failure_diamond() {
805 let mut g = DependencyGraph::new("test");
807 for i in 1..=4 {
808 g.add_node(make_node(i));
809 }
810 g.add_edge(2, 1);
811 g.add_edge(3, 1);
812 g.add_edge(4, 2);
813 g.add_edge(4, 3);
814
815 g.transition(1, NodeState::Failed);
816 let mut failed = g.propagate_failure(1);
817 failed.sort_unstable();
818 assert_eq!(failed, vec![2, 3, 4]);
819 }
820
821 #[test]
822 fn propagate_failure_partial_branch() {
823 let mut g = DependencyGraph::new("test");
825 for i in 1..=4 {
826 g.add_node(make_node(i));
827 }
828 g.add_edge(3, 1);
829 g.add_edge(4, 2);
830
831 g.transition(1, NodeState::Failed);
832 let failed = g.propagate_failure(1);
833 assert_eq!(failed, vec![3]);
834 assert_eq!(g.node(4).unwrap().state, NodeState::Pending);
836 }
837
838 #[test]
839 fn propagate_failure_skips_merged() {
840 let mut g = DependencyGraph::new("test");
841 g.add_node(make_node(1));
842 g.add_node(make_node(2));
843 g.add_node(make_node(3));
844 g.add_edge(2, 1);
845 g.add_edge(3, 2);
846 g.transition(2, NodeState::Merged);
848
849 g.transition(1, NodeState::Failed);
850 let failed = g.propagate_failure(1);
851 assert!(failed.is_empty());
853 assert_eq!(g.node(2).unwrap().state, NodeState::Merged);
854 assert_eq!(g.node(3).unwrap().state, NodeState::Pending);
855 }
856
857 #[test]
858 fn propagate_failure_returns_newly_failed() {
859 let mut g = DependencyGraph::new("test");
860 g.add_node(make_node(1));
861 g.add_node(make_node(2));
862 g.add_node(make_node(3));
863 g.add_edge(2, 1);
864 g.add_edge(3, 1);
865
866 g.transition(1, NodeState::Failed);
867 let mut failed = g.propagate_failure(1);
868 failed.sort_unstable();
869 assert_eq!(failed, vec![2, 3]);
870 let failed2 = g.propagate_failure(1);
872 assert!(failed2.is_empty());
873 }
874
875 #[test]
876 fn to_graph_context_includes_all_nodes() {
877 let mut g = DependencyGraph::new("test");
878 g.add_node(make_node(1));
879 g.add_node(make_node(2));
880 g.add_node(make_node(3));
881 g.add_edge(2, 1);
882 g.add_edge(3, 1);
883 g.add_edge(3, 2);
884 g.transition(1, NodeState::InFlight);
885
886 let ctx = g.to_graph_context();
887 assert_eq!(ctx.len(), 3);
888
889 let ctx_map: HashMap<u32, &crate::agents::GraphContextNode> =
890 ctx.iter().map(|c| (c.number, c)).collect();
891
892 let c1 = ctx_map[&1];
893 assert_eq!(c1.state, NodeState::InFlight);
894 assert!(c1.depends_on.is_empty());
895
896 let c2 = ctx_map[&2];
897 assert_eq!(c2.state, NodeState::Pending);
898 assert_eq!(c2.depends_on, vec![1]);
899
900 let c3 = ctx_map[&3];
901 assert_eq!(c3.state, NodeState::Pending);
902 assert_eq!(c3.depends_on, vec![1, 2]);
903 }
904
905 #[test]
906 fn save_to_db_is_atomic_on_success() {
907 let conn = crate::db::open_in_memory().unwrap();
908 let mut g = DependencyGraph::new("atomic-test");
909 g.add_node(make_node(1));
910 g.add_node(make_node(2));
911 g.add_edge(2, 1);
912
913 g.save_to_db(&conn).unwrap();
914
915 let mut g2 = DependencyGraph::new("atomic-test");
917 g2.add_node(make_node(10));
918 g2.save_to_db(&conn).unwrap();
919
920 let loaded = DependencyGraph::from_db(&conn, "atomic-test").unwrap();
921 assert_eq!(loaded.node_count(), 1);
923 assert!(loaded.contains(10));
924 assert!(!loaded.contains(1));
925 assert!(!loaded.contains(2));
926 }
927}