Skip to main content

oven_cli/pipeline/
graph.rs

1use std::collections::{HashMap, HashSet};
2
3use anyhow::{Context, Result};
4use rusqlite::Connection;
5use tracing::{info, warn};
6
7use crate::{
8    agents::{PlannedNode, PlannerGraphOutput},
9    db::graph::{self, GraphNodeRow, NodeState},
10    issues::PipelineIssue,
11};
12
13/// A node in the dependency graph, holding all metadata for scheduling.
14#[derive(Debug, Clone)]
15pub struct GraphNode {
16    pub issue_number: u32,
17    pub title: String,
18    pub area: String,
19    pub predicted_files: Vec<String>,
20    pub has_migration: bool,
21    pub complexity: String,
22    pub state: NodeState,
23    pub pr_number: Option<u32>,
24    pub run_id: Option<String>,
25    pub issue: Option<PipelineIssue>,
26    /// Target repo name for multi-repo routing. Persisted separately from `issue`
27    /// so it survives DB round-trips (where `issue` is `None`).
28    pub target_repo: Option<String>,
29}
30
31/// Directed acyclic graph tracking issue dependencies.
32///
33/// Edges point from dependent to dependency: if A depends on B, `edges[A]`
34/// contains B. The graph enforces acyclicity on insertion.
35pub struct DependencyGraph {
36    session_id: String,
37    nodes: HashMap<u32, GraphNode>,
38    /// Forward edges: `edges[a]` = set of issues that `a` depends on.
39    edges: HashMap<u32, HashSet<u32>>,
40    /// Reverse edges: `reverse_edges[b]` = set of issues that depend on `b`.
41    reverse_edges: HashMap<u32, HashSet<u32>>,
42}
43
44impl DependencyGraph {
45    pub fn new(session_id: &str) -> Self {
46        Self {
47            session_id: session_id.to_string(),
48            nodes: HashMap::new(),
49            edges: HashMap::new(),
50            reverse_edges: HashMap::new(),
51        }
52    }
53
54    pub fn session_id(&self) -> &str {
55        &self.session_id
56    }
57
58    pub fn contains(&self, issue: u32) -> bool {
59        self.nodes.contains_key(&issue)
60    }
61
62    pub fn node(&self, issue: u32) -> Option<&GraphNode> {
63        self.nodes.get(&issue)
64    }
65
66    pub fn node_count(&self) -> usize {
67        self.nodes.len()
68    }
69
70    pub fn add_node(&mut self, node: GraphNode) {
71        let num = node.issue_number;
72        self.nodes.insert(num, node);
73        self.edges.entry(num).or_default();
74        self.reverse_edges.entry(num).or_default();
75    }
76
77    /// Add a dependency edge: `from` depends on `to`.
78    ///
79    /// Returns `false` if the edge would create a cycle (edge not added).
80    pub fn add_edge(&mut self, from: u32, to: u32) -> bool {
81        if from == to || self.would_create_cycle(from, to) {
82            return false;
83        }
84        self.edges.entry(from).or_default().insert(to);
85        self.reverse_edges.entry(to).or_default().insert(from);
86        true
87    }
88
89    /// Check if adding an edge from -> to would create a cycle.
90    ///
91    /// A cycle exists if `to` transitively depends on `from`.
92    pub fn would_create_cycle(&self, from: u32, to: u32) -> bool {
93        let mut visited = HashSet::new();
94        let mut stack = vec![to];
95        while let Some(current) = stack.pop() {
96            if current == from {
97                return true;
98            }
99            if visited.insert(current) {
100                if let Some(deps) = self.edges.get(&current) {
101                    for &dep in deps {
102                        if !visited.contains(&dep) {
103                            stack.push(dep);
104                        }
105                    }
106                }
107            }
108        }
109        false
110    }
111
112    /// Issues in `Pending` state whose dependencies are all `Merged`.
113    pub fn ready_issues(&self) -> Vec<u32> {
114        self.nodes
115            .iter()
116            .filter(|(_, node)| node.state == NodeState::Pending)
117            .filter(|(num, _)| {
118                self.edges.get(num).is_none_or(|deps| {
119                    deps.iter()
120                        .all(|d| self.nodes.get(d).is_some_and(|n| n.state == NodeState::Merged))
121                })
122            })
123            .map(|(num, _)| *num)
124            .collect()
125    }
126
127    /// Issues currently in `AwaitingMerge` state.
128    pub fn awaiting_merge(&self) -> Vec<u32> {
129        self.nodes
130            .iter()
131            .filter(|(_, node)| node.state == NodeState::AwaitingMerge)
132            .map(|(num, _)| *num)
133            .collect()
134    }
135
136    /// Transition a node to a new state.
137    pub fn transition(&mut self, issue: u32, state: NodeState) {
138        if let Some(node) = self.nodes.get_mut(&issue) {
139            info!(
140                issue,
141                from = %node.state,
142                to = %state,
143                "graph node state transition"
144            );
145            node.state = state;
146        }
147    }
148
149    /// Set the PR number on a node.
150    pub fn set_pr_number(&mut self, issue: u32, pr_number: u32) {
151        if let Some(node) = self.nodes.get_mut(&issue) {
152            node.pr_number = Some(pr_number);
153        }
154    }
155
156    /// Set the run ID on a node.
157    pub fn set_run_id(&mut self, issue: u32, run_id: &str) {
158        if let Some(node) = self.nodes.get_mut(&issue) {
159            node.run_id = Some(run_id.to_string());
160        }
161    }
162
163    /// Get the set of issues that `issue` depends on.
164    pub fn dependencies(&self, issue: u32) -> HashSet<u32> {
165        self.edges.get(&issue).cloned().unwrap_or_default()
166    }
167
168    /// Get the set of issues that depend on `issue`.
169    pub fn dependents(&self, issue: u32) -> HashSet<u32> {
170        self.reverse_edges.get(&issue).cloned().unwrap_or_default()
171    }
172
173    /// Whether every node is in a terminal state (`Merged` or `Failed`).
174    pub fn all_terminal(&self) -> bool {
175        self.nodes.values().all(|n| matches!(n.state, NodeState::Merged | NodeState::Failed))
176    }
177
178    /// Whether a node is blocked because at least one dependency has failed.
179    pub fn is_blocked(&self, issue: u32) -> bool {
180        self.edges.get(&issue).is_some_and(|deps| {
181            deps.iter().any(|d| self.nodes.get(d).is_some_and(|n| n.state == NodeState::Failed))
182        })
183    }
184
185    /// Propagate failure from a node to all transitive dependents.
186    ///
187    /// Any `Pending` or `InFlight` node reachable via `reverse_edges` from the
188    /// failed node is transitioned to `Failed`. Returns the list of issue
189    /// numbers that were newly failed (excludes the original node).
190    pub fn propagate_failure(&mut self, issue: u32) -> Vec<u32> {
191        use std::collections::VecDeque;
192
193        let mut queue = VecDeque::new();
194        let mut newly_failed = Vec::new();
195
196        // Seed with direct dependents of the failed node
197        if let Some(dependents) = self.reverse_edges.get(&issue) {
198            queue.extend(dependents.iter().copied());
199        }
200
201        let mut visited = HashSet::new();
202        visited.insert(issue);
203
204        while let Some(current) = queue.pop_front() {
205            if !visited.insert(current) {
206                continue;
207            }
208            let dominated = self
209                .nodes
210                .get(&current)
211                .is_some_and(|n| matches!(n.state, NodeState::Pending | NodeState::InFlight));
212            if !dominated {
213                continue;
214            }
215            self.transition(current, NodeState::Failed);
216            newly_failed.push(current);
217            if let Some(dependents) = self.reverse_edges.get(&current) {
218                queue.extend(dependents.iter().copied());
219            }
220        }
221
222        newly_failed
223    }
224
225    /// Remove a node and all its edges (for stale issue cleanup).
226    pub fn remove_node(&mut self, issue: u32) {
227        self.nodes.remove(&issue);
228        if let Some(deps) = self.edges.remove(&issue) {
229            for dep in &deps {
230                if let Some(rev) = self.reverse_edges.get_mut(dep) {
231                    rev.remove(&issue);
232                }
233            }
234        }
235        if let Some(dependents) = self.reverse_edges.remove(&issue) {
236            for dependent in &dependents {
237                if let Some(fwd) = self.edges.get_mut(dependent) {
238                    fwd.remove(&issue);
239                }
240            }
241        }
242    }
243
244    /// All issue numbers in the graph.
245    pub fn all_issues(&self) -> Vec<u32> {
246        let mut nums: Vec<u32> = self.nodes.keys().copied().collect();
247        nums.sort_unstable();
248        nums
249    }
250
251    /// Load graph state from the database.
252    pub fn from_db(conn: &Connection, session_id: &str) -> Result<Self> {
253        let db_nodes = graph::get_nodes(conn, session_id).context("loading graph nodes")?;
254        let db_edges = graph::get_edges(conn, session_id).context("loading graph edges")?;
255
256        let mut g = Self::new(session_id);
257        for row in &db_nodes {
258            g.add_node(GraphNode {
259                issue_number: row.issue_number,
260                title: row.title.clone(),
261                area: row.area.clone(),
262                predicted_files: row.predicted_files.clone(),
263                has_migration: row.has_migration,
264                complexity: row.complexity.clone(),
265                state: row.state,
266                pr_number: row.pr_number,
267                run_id: row.run_id.clone(),
268                issue: None,
269                target_repo: row.target_repo.clone(),
270            });
271        }
272        for (from, to) in &db_edges {
273            if !g.add_edge(*from, *to) {
274                warn!(from, to, "skipping persisted edge that would create cycle");
275            }
276        }
277
278        Ok(g)
279    }
280
281    /// Persist the full graph state to the database, replacing any existing data for
282    /// this session. Runs inside a transaction so a crash mid-save cannot leave a
283    /// partial graph.
284    pub fn save_to_db(&self, conn: &Connection) -> Result<()> {
285        let tx = conn.unchecked_transaction().context("starting graph save transaction")?;
286
287        graph::delete_session(&tx, &self.session_id)?;
288        for node in self.nodes.values() {
289            let row = GraphNodeRow {
290                issue_number: node.issue_number,
291                session_id: self.session_id.clone(),
292                state: node.state,
293                pr_number: node.pr_number,
294                run_id: node.run_id.clone(),
295                title: node.title.clone(),
296                area: node.area.clone(),
297                predicted_files: node.predicted_files.clone(),
298                has_migration: node.has_migration,
299                complexity: node.complexity.clone(),
300                target_repo: node.target_repo.clone(),
301            };
302            graph::insert_node(&tx, &self.session_id, &row)?;
303        }
304        for (&from, deps) in &self.edges {
305            for &to in deps {
306                graph::insert_edge(&tx, &self.session_id, from, to)?;
307            }
308        }
309
310        tx.commit().context("committing graph save transaction")?;
311        Ok(())
312    }
313
314    /// Build a graph from planner output, matching issues by number.
315    pub fn from_planner_output(
316        session_id: &str,
317        plan: &PlannerGraphOutput,
318        issues: &[PipelineIssue],
319    ) -> Self {
320        let issue_map: HashMap<u32, &PipelineIssue> =
321            issues.iter().map(|i| (i.number, i)).collect();
322        let mut g = Self::new(session_id);
323        for node in &plan.nodes {
324            g.add_node(node_from_planned(node, issue_map.get(&node.number).copied()));
325        }
326        add_planned_edges(&mut g, &plan.nodes);
327        g
328    }
329
330    /// Merge new planner output into an existing graph (polling mode).
331    ///
332    /// Only adds nodes not already present. Edges between new nodes and
333    /// existing nodes are added if they don't create cycles.
334    pub fn merge_planner_output(&mut self, plan: &PlannerGraphOutput, issues: &[PipelineIssue]) {
335        let issue_map: HashMap<u32, &PipelineIssue> =
336            issues.iter().map(|i| (i.number, i)).collect();
337        let new_nodes: Vec<&PlannedNode> =
338            plan.nodes.iter().filter(|n| !self.contains(n.number)).collect();
339        for node in &new_nodes {
340            self.add_node(node_from_planned(node, issue_map.get(&node.number).copied()));
341        }
342        add_planned_edges(self, &new_nodes);
343    }
344
345    /// Format the graph for display in CLI output.
346    pub fn display_lines(&self) -> Vec<String> {
347        let mut lines = Vec::new();
348        let issues = self.all_issues();
349
350        for num in issues {
351            let Some(node) = self.nodes.get(&num) else { continue };
352            let blocked = if self.is_blocked(num) { " (blocked)" } else { "" };
353            let state_str = format!("[{}]{blocked}", node.state);
354            lines.push(format!("  #{num} {} {:.<40} {state_str}", node.title, "."));
355            let deps = self.dependencies(num);
356            if !deps.is_empty() {
357                let mut dep_nums: Vec<u32> = deps.into_iter().collect();
358                dep_nums.sort_unstable();
359                let dep_strs: Vec<String> = dep_nums.iter().map(|d| format!("#{d}")).collect();
360                lines.push(format!("    depends on: {}", dep_strs.join(", ")));
361            }
362        }
363        lines
364    }
365
366    /// Build planner context from the current graph state.
367    ///
368    /// Produces one `GraphContextNode` per node so the planner can see
369    /// in-flight work and avoid scheduling conflicts.
370    pub fn to_graph_context(&self) -> Vec<crate::agents::GraphContextNode> {
371        self.all_issues()
372            .into_iter()
373            .filter_map(|num| {
374                let node = self.nodes.get(&num)?;
375                let depends_on: Vec<u32> = self.edges.get(&num).map_or_else(Vec::new, |deps| {
376                    let mut v: Vec<u32> = deps.iter().copied().collect();
377                    v.sort_unstable();
378                    v
379                });
380                Some(crate::agents::GraphContextNode {
381                    number: num,
382                    title: node.title.clone(),
383                    state: node.state,
384                    area: node.area.clone(),
385                    predicted_files: node.predicted_files.clone(),
386                    has_migration: node.has_migration,
387                    depends_on,
388                    target_repo: node.target_repo.clone(),
389                })
390            })
391            .collect()
392    }
393}
394
395fn node_from_planned(node: &PlannedNode, issue: Option<&PipelineIssue>) -> GraphNode {
396    GraphNode {
397        issue_number: node.number,
398        title: node.title.clone(),
399        area: node.area.clone(),
400        predicted_files: node.predicted_files.clone(),
401        has_migration: node.has_migration,
402        complexity: node.complexity.to_string(),
403        state: NodeState::Pending,
404        pr_number: None,
405        run_id: None,
406        target_repo: issue.and_then(|i| i.target_repo.clone()),
407        issue: issue.cloned(),
408    }
409}
410
411fn add_planned_edges(graph: &mut DependencyGraph, nodes: &[impl std::borrow::Borrow<PlannedNode>]) {
412    for node in nodes {
413        let node = node.borrow();
414        for &dep in &node.depends_on {
415            if !graph.add_edge(node.number, dep) {
416                warn!(
417                    from = node.number,
418                    to = dep,
419                    "skipping planner edge that would create cycle"
420                );
421            }
422        }
423    }
424}
425
426#[cfg(test)]
427mod tests {
428    use super::*;
429
430    fn make_node(num: u32) -> GraphNode {
431        GraphNode {
432            issue_number: num,
433            title: format!("Issue #{num}"),
434            area: "test".to_string(),
435            predicted_files: vec![],
436            has_migration: false,
437            complexity: "full".to_string(),
438            state: NodeState::Pending,
439            pr_number: None,
440            run_id: None,
441            issue: None,
442            target_repo: None,
443        }
444    }
445
446    #[test]
447    fn add_node_and_check() {
448        let mut g = DependencyGraph::new("test");
449        g.add_node(make_node(1));
450        assert!(g.contains(1));
451        assert!(!g.contains(2));
452        assert_eq!(g.node_count(), 1);
453    }
454
455    #[test]
456    fn add_edge_and_check() {
457        let mut g = DependencyGraph::new("test");
458        g.add_node(make_node(1));
459        g.add_node(make_node(2));
460        assert!(g.add_edge(2, 1)); // 2 depends on 1
461
462        assert_eq!(g.dependencies(2), HashSet::from([1]));
463        assert_eq!(g.dependents(1), HashSet::from([2]));
464    }
465
466    #[test]
467    fn self_edge_rejected() {
468        let mut g = DependencyGraph::new("test");
469        g.add_node(make_node(1));
470        assert!(!g.add_edge(1, 1));
471    }
472
473    #[test]
474    fn direct_cycle_detected() {
475        let mut g = DependencyGraph::new("test");
476        g.add_node(make_node(1));
477        g.add_node(make_node(2));
478        assert!(g.add_edge(2, 1)); // 2 depends on 1
479        assert!(!g.add_edge(1, 2)); // would create cycle
480    }
481
482    #[test]
483    fn indirect_cycle_detected() {
484        let mut g = DependencyGraph::new("test");
485        g.add_node(make_node(1));
486        g.add_node(make_node(2));
487        g.add_node(make_node(3));
488        assert!(g.add_edge(2, 1)); // 2 depends on 1
489        assert!(g.add_edge(3, 2)); // 3 depends on 2
490        assert!(!g.add_edge(1, 3)); // would create 1 -> 3 -> 2 -> 1 cycle
491    }
492
493    #[test]
494    fn valid_dag_no_false_cycle() {
495        let mut g = DependencyGraph::new("test");
496        g.add_node(make_node(1));
497        g.add_node(make_node(2));
498        g.add_node(make_node(3));
499        assert!(g.add_edge(2, 1));
500        assert!(g.add_edge(3, 1)); // diamond top, both depend on 1
501        assert!(g.add_edge(3, 2)); // 3 also depends on 2
502    }
503
504    #[test]
505    fn ready_issues_returns_pending_with_merged_deps() {
506        let mut g = DependencyGraph::new("test");
507        g.add_node(make_node(1));
508        g.add_node(make_node(2));
509        g.add_edge(2, 1);
510
511        // 1 is pending with no deps, so it's ready
512        assert_eq!(g.ready_issues(), vec![1]);
513
514        // Merge node 1, now node 2 should be ready
515        g.transition(1, NodeState::Merged);
516        let ready = g.ready_issues();
517        assert_eq!(ready, vec![2]);
518    }
519
520    #[test]
521    fn ready_issues_empty_when_deps_in_flight() {
522        let mut g = DependencyGraph::new("test");
523        g.add_node(make_node(1));
524        g.add_node(make_node(2));
525        g.add_edge(2, 1);
526        g.transition(1, NodeState::InFlight);
527        assert!(g.ready_issues().is_empty());
528    }
529
530    #[test]
531    fn ready_issues_empty_when_deps_awaiting_merge() {
532        let mut g = DependencyGraph::new("test");
533        g.add_node(make_node(1));
534        g.add_node(make_node(2));
535        g.add_edge(2, 1);
536        g.transition(1, NodeState::AwaitingMerge);
537        assert!(g.ready_issues().is_empty());
538    }
539
540    #[test]
541    fn awaiting_merge_returns_correct_nodes() {
542        let mut g = DependencyGraph::new("test");
543        g.add_node(make_node(1));
544        g.add_node(make_node(2));
545        g.transition(1, NodeState::AwaitingMerge);
546        let awaiting = g.awaiting_merge();
547        assert_eq!(awaiting, vec![1]);
548    }
549
550    #[test]
551    fn all_terminal_checks_all_nodes() {
552        let mut g = DependencyGraph::new("test");
553        g.add_node(make_node(1));
554        g.add_node(make_node(2));
555        assert!(!g.all_terminal());
556
557        g.transition(1, NodeState::Merged);
558        assert!(!g.all_terminal());
559
560        g.transition(2, NodeState::Failed);
561        assert!(g.all_terminal());
562    }
563
564    #[test]
565    fn is_blocked_when_dep_failed() {
566        let mut g = DependencyGraph::new("test");
567        g.add_node(make_node(1));
568        g.add_node(make_node(2));
569        g.add_edge(2, 1);
570        g.transition(1, NodeState::Failed);
571        assert!(g.is_blocked(2));
572        assert!(!g.is_blocked(1));
573    }
574
575    #[test]
576    fn remove_node_cleans_edges() {
577        let mut g = DependencyGraph::new("test");
578        g.add_node(make_node(1));
579        g.add_node(make_node(2));
580        g.add_node(make_node(3));
581        g.add_edge(2, 1);
582        g.add_edge(3, 2);
583
584        g.remove_node(2);
585        assert!(!g.contains(2));
586        // Edge from 3 to 2 should be gone
587        assert!(g.dependencies(3).is_empty());
588        // Reverse edge from 1 to 2 should be gone
589        assert!(g.dependents(1).is_empty());
590    }
591
592    #[test]
593    fn display_lines_format() {
594        let mut g = DependencyGraph::new("test");
595        g.add_node(make_node(1));
596        g.add_node(make_node(2));
597        g.add_edge(2, 1);
598        g.transition(1, NodeState::Merged);
599
600        let lines = g.display_lines();
601        assert!(!lines.is_empty());
602        assert!(lines.iter().any(|l| l.contains("#1")));
603        assert!(lines.iter().any(|l| l.contains("depends on")));
604    }
605
606    #[test]
607    fn db_roundtrip() {
608        let conn = crate::db::open_in_memory().unwrap();
609        let mut g = DependencyGraph::new("test-session");
610        g.add_node(make_node(1));
611        g.add_node(make_node(2));
612        g.add_node(make_node(3));
613        g.add_edge(2, 1);
614        g.add_edge(3, 1);
615        g.add_edge(3, 2);
616        g.transition(1, NodeState::Merged);
617        g.set_pr_number(1, 99);
618        g.set_run_id(1, "abc");
619
620        g.save_to_db(&conn).unwrap();
621
622        let loaded = DependencyGraph::from_db(&conn, "test-session").unwrap();
623        assert_eq!(loaded.node_count(), 3);
624        assert_eq!(loaded.dependencies(2), HashSet::from([1]));
625        assert_eq!(loaded.dependencies(3), HashSet::from([1, 2]));
626        assert_eq!(loaded.node(1).unwrap().state, NodeState::Merged);
627        assert_eq!(loaded.node(1).unwrap().pr_number, Some(99));
628        assert_eq!(loaded.node(1).unwrap().run_id.as_deref(), Some("abc"));
629    }
630
631    #[test]
632    fn diamond_graph_ready_ordering() {
633        // A -> B, A -> C, B -> D, C -> D (D is the root)
634        let mut g = DependencyGraph::new("test");
635        g.add_node(make_node(1)); // D
636        g.add_node(make_node(2)); // B
637        g.add_node(make_node(3)); // C
638        g.add_node(make_node(4)); // A
639
640        g.add_edge(2, 1); // B depends on D
641        g.add_edge(3, 1); // C depends on D
642        g.add_edge(4, 2); // A depends on B
643        g.add_edge(4, 3); // A depends on C
644
645        // Only D is ready initially
646        assert_eq!(g.ready_issues(), vec![1]);
647
648        // Merge D, B and C become ready
649        g.transition(1, NodeState::Merged);
650        let mut ready = g.ready_issues();
651        ready.sort_unstable();
652        assert_eq!(ready, vec![2, 3]);
653
654        // Merge B, A still waiting on C
655        g.transition(2, NodeState::Merged);
656        assert_eq!(g.ready_issues(), vec![3]);
657
658        // Merge C, now A is ready
659        g.transition(3, NodeState::Merged);
660        assert_eq!(g.ready_issues(), vec![4]);
661    }
662
663    #[test]
664    fn empty_graph_is_all_terminal() {
665        let g = DependencyGraph::new("test");
666        assert!(g.all_terminal());
667    }
668
669    #[test]
670    fn independent_nodes_all_ready() {
671        let mut g = DependencyGraph::new("test");
672        g.add_node(make_node(1));
673        g.add_node(make_node(2));
674        g.add_node(make_node(3));
675
676        let mut ready = g.ready_issues();
677        ready.sort_unstable();
678        assert_eq!(ready, vec![1, 2, 3]);
679    }
680
681    fn make_planned(number: u32, depends_on: Vec<u32>) -> crate::agents::PlannedNode {
682        crate::agents::PlannedNode {
683            number,
684            title: format!("Issue #{number}"),
685            area: "test".to_string(),
686            predicted_files: vec![],
687            has_migration: false,
688            complexity: crate::agents::Complexity::Full,
689            depends_on,
690            reasoning: String::new(),
691        }
692    }
693
694    fn make_issue(number: u32) -> PipelineIssue {
695        PipelineIssue {
696            number,
697            title: format!("Issue #{number}"),
698            body: String::new(),
699            source: crate::issues::IssueOrigin::Github,
700            target_repo: None,
701            author: None,
702        }
703    }
704
705    #[test]
706    fn from_planner_output_basic() {
707        let plan = crate::agents::PlannerGraphOutput {
708            nodes: vec![
709                make_planned(1, vec![]),
710                make_planned(2, vec![]),
711                make_planned(3, vec![1, 2]),
712            ],
713            total_issues: 3,
714            parallel_capacity: 2,
715        };
716        let issues = vec![make_issue(1), make_issue(2), make_issue(3)];
717
718        let g = DependencyGraph::from_planner_output("sess", &plan, &issues);
719        assert_eq!(g.node_count(), 3);
720        assert_eq!(g.dependencies(3), HashSet::from([1, 2]));
721        assert!(g.dependencies(1).is_empty());
722        // Issues should be attached
723        assert!(g.node(1).unwrap().issue.is_some());
724        assert!(g.node(2).unwrap().issue.is_some());
725    }
726
727    #[test]
728    fn from_planner_output_skips_cycle() {
729        let plan = crate::agents::PlannerGraphOutput {
730            nodes: vec![make_planned(1, vec![2]), make_planned(2, vec![1])],
731            total_issues: 2,
732            parallel_capacity: 1,
733        };
734
735        let g = DependencyGraph::from_planner_output("sess", &plan, &[]);
736        // One edge should succeed, the other should be skipped (cycle)
737        assert_eq!(g.node_count(), 2);
738        let total_edges: usize = [1, 2].iter().map(|n| g.dependencies(*n).len()).sum();
739        assert_eq!(total_edges, 1);
740    }
741
742    #[test]
743    fn merge_planner_output_adds_new_only() {
744        let plan1 = crate::agents::PlannerGraphOutput {
745            nodes: vec![make_planned(1, vec![])],
746            total_issues: 1,
747            parallel_capacity: 1,
748        };
749        let mut g = DependencyGraph::from_planner_output("sess", &plan1, &[make_issue(1)]);
750        g.transition(1, NodeState::InFlight);
751
752        // Merge a plan that includes node 1 again and adds node 2
753        let plan2 = crate::agents::PlannerGraphOutput {
754            nodes: vec![make_planned(1, vec![]), make_planned(2, vec![1])],
755            total_issues: 2,
756            parallel_capacity: 1,
757        };
758        g.merge_planner_output(&plan2, &[make_issue(2)]);
759
760        assert_eq!(g.node_count(), 2);
761        // Node 1 should still be InFlight (not overwritten)
762        assert_eq!(g.node(1).unwrap().state, NodeState::InFlight);
763        // Node 2 should be Pending with edge to 1
764        assert_eq!(g.node(2).unwrap().state, NodeState::Pending);
765        assert_eq!(g.dependencies(2), HashSet::from([1]));
766    }
767
768    #[test]
769    fn merge_planner_output_cross_edges() {
770        let mut g = DependencyGraph::new("sess");
771        g.add_node(make_node(1));
772        g.transition(1, NodeState::Merged);
773
774        let plan = crate::agents::PlannerGraphOutput {
775            nodes: vec![make_planned(2, vec![1])],
776            total_issues: 1,
777            parallel_capacity: 1,
778        };
779        g.merge_planner_output(&plan, &[make_issue(2)]);
780
781        assert_eq!(g.dependencies(2), HashSet::from([1]));
782        // Node 2 should be ready since node 1 is merged
783        assert_eq!(g.ready_issues(), vec![2]);
784    }
785
786    #[test]
787    fn propagate_failure_linear_chain() {
788        let mut g = DependencyGraph::new("test");
789        g.add_node(make_node(1));
790        g.add_node(make_node(2));
791        g.add_node(make_node(3));
792        g.add_edge(2, 1);
793        g.add_edge(3, 2);
794
795        g.transition(1, NodeState::Failed);
796        let mut failed = g.propagate_failure(1);
797        failed.sort_unstable();
798        assert_eq!(failed, vec![2, 3]);
799        assert_eq!(g.node(2).unwrap().state, NodeState::Failed);
800        assert_eq!(g.node(3).unwrap().state, NodeState::Failed);
801    }
802
803    #[test]
804    fn propagate_failure_diamond() {
805        // 1 is root, 2 and 3 depend on 1, 4 depends on 2 and 3
806        let mut g = DependencyGraph::new("test");
807        for i in 1..=4 {
808            g.add_node(make_node(i));
809        }
810        g.add_edge(2, 1);
811        g.add_edge(3, 1);
812        g.add_edge(4, 2);
813        g.add_edge(4, 3);
814
815        g.transition(1, NodeState::Failed);
816        let mut failed = g.propagate_failure(1);
817        failed.sort_unstable();
818        assert_eq!(failed, vec![2, 3, 4]);
819    }
820
821    #[test]
822    fn propagate_failure_partial_branch() {
823        // 1 and 2 are roots, 3 depends on 1, 4 depends on 2
824        let mut g = DependencyGraph::new("test");
825        for i in 1..=4 {
826            g.add_node(make_node(i));
827        }
828        g.add_edge(3, 1);
829        g.add_edge(4, 2);
830
831        g.transition(1, NodeState::Failed);
832        let failed = g.propagate_failure(1);
833        assert_eq!(failed, vec![3]);
834        // Node 4 should still be Pending (unrelated branch)
835        assert_eq!(g.node(4).unwrap().state, NodeState::Pending);
836    }
837
838    #[test]
839    fn propagate_failure_skips_merged() {
840        let mut g = DependencyGraph::new("test");
841        g.add_node(make_node(1));
842        g.add_node(make_node(2));
843        g.add_node(make_node(3));
844        g.add_edge(2, 1);
845        g.add_edge(3, 2);
846        // Node 2 already merged before 1 fails (unusual but possible)
847        g.transition(2, NodeState::Merged);
848
849        g.transition(1, NodeState::Failed);
850        let failed = g.propagate_failure(1);
851        // Node 2 is merged, skip. Node 3 depends on 2 (merged), not directly on 1.
852        assert!(failed.is_empty());
853        assert_eq!(g.node(2).unwrap().state, NodeState::Merged);
854        assert_eq!(g.node(3).unwrap().state, NodeState::Pending);
855    }
856
857    #[test]
858    fn propagate_failure_returns_newly_failed() {
859        let mut g = DependencyGraph::new("test");
860        g.add_node(make_node(1));
861        g.add_node(make_node(2));
862        g.add_node(make_node(3));
863        g.add_edge(2, 1);
864        g.add_edge(3, 1);
865
866        g.transition(1, NodeState::Failed);
867        let mut failed = g.propagate_failure(1);
868        failed.sort_unstable();
869        assert_eq!(failed, vec![2, 3]);
870        // Calling again should return empty (already failed)
871        let failed2 = g.propagate_failure(1);
872        assert!(failed2.is_empty());
873    }
874
875    #[test]
876    fn to_graph_context_includes_all_nodes() {
877        let mut g = DependencyGraph::new("test");
878        g.add_node(make_node(1));
879        g.add_node(make_node(2));
880        g.add_node(make_node(3));
881        g.add_edge(2, 1);
882        g.add_edge(3, 1);
883        g.add_edge(3, 2);
884        g.transition(1, NodeState::InFlight);
885
886        let ctx = g.to_graph_context();
887        assert_eq!(ctx.len(), 3);
888
889        let ctx_map: HashMap<u32, &crate::agents::GraphContextNode> =
890            ctx.iter().map(|c| (c.number, c)).collect();
891
892        let c1 = ctx_map[&1];
893        assert_eq!(c1.state, NodeState::InFlight);
894        assert!(c1.depends_on.is_empty());
895
896        let c2 = ctx_map[&2];
897        assert_eq!(c2.state, NodeState::Pending);
898        assert_eq!(c2.depends_on, vec![1]);
899
900        let c3 = ctx_map[&3];
901        assert_eq!(c3.state, NodeState::Pending);
902        assert_eq!(c3.depends_on, vec![1, 2]);
903    }
904
905    #[test]
906    fn save_to_db_is_atomic_on_success() {
907        let conn = crate::db::open_in_memory().unwrap();
908        let mut g = DependencyGraph::new("atomic-test");
909        g.add_node(make_node(1));
910        g.add_node(make_node(2));
911        g.add_edge(2, 1);
912
913        g.save_to_db(&conn).unwrap();
914
915        // Overwrite with a different graph to verify the delete+insert is atomic
916        let mut g2 = DependencyGraph::new("atomic-test");
917        g2.add_node(make_node(10));
918        g2.save_to_db(&conn).unwrap();
919
920        let loaded = DependencyGraph::from_db(&conn, "atomic-test").unwrap();
921        // Old nodes should be gone, only node 10 remains
922        assert_eq!(loaded.node_count(), 1);
923        assert!(loaded.contains(10));
924        assert!(!loaded.contains(1));
925        assert!(!loaded.contains(2));
926    }
927}