Skip to main content

aivcs_core/multi_repo/
graph.rs

1//! Cross-repo dependency graph and topological execution planning.
2//!
3//! Models repositories as nodes in a directed acyclic graph (DAG). An edge
4//! `A → B` means "B depends on A" — A must complete before B may run.
5//!
6//! Topological ordering is computed via Kahn's algorithm, producing a
7//! level-ordered result so that same-level repos can be parallelized.
8
9use std::collections::{HashMap, HashSet, VecDeque};
10
11use serde::{Deserialize, Serialize};
12
13use crate::multi_repo::error::{MultiRepoError, MultiRepoResult};
14
15/// A single repository node in the dependency graph.
16#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct RepoNode {
18    /// Stable identifier, e.g. `"org/repo-name"`.
19    pub repo_id: String,
20    /// Human-readable display name.
21    pub display_name: String,
22    /// Optional remote URL (for CI polling or webhook targeting).
23    pub remote_url: Option<String>,
24}
25
26impl RepoNode {
27    /// Create a minimal repo node.
28    pub fn new(repo_id: impl Into<String>, display_name: impl Into<String>) -> Self {
29        Self {
30            repo_id: repo_id.into(),
31            display_name: display_name.into(),
32            remote_url: None,
33        }
34    }
35}
36
37/// A step in a repo-aware execution plan.
38///
39/// Mirrors [`crate::role_orchestration::router::RoleStep`] so that the same
40/// parallel-group dispatch logic can drive repo-level execution.
41#[derive(Debug, Clone)]
42pub struct RepoStep {
43    /// 0-indexed position in the plan.
44    pub position: usize,
45    /// The repo assigned to this step.
46    pub repo: RepoNode,
47    /// Repos whose completion this step waits for.
48    pub depends_on: Vec<String>,
49    /// True when this step can run concurrently with sibling steps at the
50    /// same topological level.
51    pub parallelizable: bool,
52}
53
54/// An ordered, validated execution plan for cross-repo operations.
55#[derive(Debug, Clone)]
56pub struct RepoExecutionPlan {
57    /// Human-readable plan title.
58    pub title: String,
59    /// Ordered steps, respecting topological dependency order.
60    pub steps: Vec<RepoStep>,
61}
62
63impl RepoExecutionPlan {
64    /// Partition steps into sequential groups where adjacent parallelizable
65    /// steps form one group and non-parallelizable steps form singletons.
66    ///
67    /// This mirrors `ExecutionPlan::parallel_groups()` from the role
68    /// orchestration module.
69    pub fn parallel_groups(&self) -> Vec<Vec<&RepoStep>> {
70        let mut groups: Vec<Vec<&RepoStep>> = Vec::new();
71        let mut current: Vec<&RepoStep> = Vec::new();
72
73        for step in &self.steps {
74            if step.parallelizable {
75                current.push(step);
76            } else {
77                if !current.is_empty() {
78                    groups.push(std::mem::take(&mut current));
79                }
80                groups.push(vec![step]);
81            }
82        }
83        if !current.is_empty() {
84            groups.push(current);
85        }
86        groups
87    }
88}
89
90/// Directed dependency graph over [`RepoNode`]s.
91///
92/// Edges are stored as `dependency → dependents` adjacency lists.
93/// Cycles are detected at insertion time via DFS.
94#[derive(Debug, Clone, Default)]
95pub struct RepoDependencyGraph {
96    nodes: HashMap<String, RepoNode>,
97    /// `dependency_id → {dependent_id, ...}` (downstream adjacency)
98    downstream: HashMap<String, HashSet<String>>,
99    /// `dependent_id → {dependency_id, ...}` (upstream adjacency)
100    upstream: HashMap<String, HashSet<String>>,
101}
102
103impl RepoDependencyGraph {
104    /// Create an empty graph.
105    pub fn new() -> Self {
106        Self::default()
107    }
108
109    /// Register a [`RepoNode`]. Idempotent — re-registering an existing id
110    /// updates the node metadata.
111    pub fn add_node(&mut self, node: RepoNode) {
112        let id = node.repo_id.clone();
113        self.nodes.insert(id.clone(), node);
114        self.downstream.entry(id.clone()).or_default();
115        self.upstream.entry(id).or_default();
116    }
117
118    /// Add a directed dependency edge: `dependent` depends on `dependency`.
119    ///
120    /// Both nodes must already be registered via [`add_node`].
121    /// Returns [`MultiRepoError::DependencyCycle`] if the edge would introduce
122    /// a cycle (checked via DFS before the edge is committed).
123    /// Returns [`MultiRepoError::RepoNotFound`] if either node is absent.
124    pub fn add_dependency(&mut self, dependency: &str, dependent: &str) -> MultiRepoResult<()> {
125        if !self.nodes.contains_key(dependency) {
126            return Err(MultiRepoError::RepoNotFound {
127                repo: dependency.to_string(),
128            });
129        }
130        if !self.nodes.contains_key(dependent) {
131            return Err(MultiRepoError::RepoNotFound {
132                repo: dependent.to_string(),
133            });
134        }
135
136        // Tentatively add the edge.
137        self.downstream
138            .entry(dependency.to_string())
139            .or_default()
140            .insert(dependent.to_string());
141        self.upstream
142            .entry(dependent.to_string())
143            .or_default()
144            .insert(dependency.to_string());
145
146        // DFS cycle check starting from the newly added dependent.
147        if let Some(cycle) = self.find_cycle_through(dependent) {
148            // Roll back.
149            self.downstream
150                .get_mut(dependency)
151                .unwrap()
152                .remove(dependent);
153            self.upstream.get_mut(dependent).unwrap().remove(dependency);
154            return Err(MultiRepoError::DependencyCycle { repos: cycle });
155        }
156
157        Ok(())
158    }
159
160    /// Return repos in topological order (dependencies before dependents).
161    ///
162    /// Uses Kahn's algorithm. Returns [`MultiRepoError::DependencyCycle`]
163    /// if a cycle is present (should not occur if `add_dependency` is used).
164    pub fn topological_order(&self) -> MultiRepoResult<Vec<RepoNode>> {
165        let mut in_degree: HashMap<&str, usize> =
166            self.nodes.keys().map(|id| (id.as_str(), 0)).collect();
167
168        for (dep, dependents) in &self.downstream {
169            for d in dependents {
170                *in_degree.entry(d.as_str()).or_default() += 1;
171                let _ = dep; // suppress unused warning
172            }
173        }
174        // Also handle nodes with no outgoing edges.
175        for id in self.nodes.keys() {
176            in_degree.entry(id.as_str()).or_default();
177        }
178
179        let mut roots: Vec<&str> = in_degree
180            .iter()
181            .filter(|(_, &deg)| deg == 0)
182            .map(|(&id, _)| id)
183            .collect();
184        roots.sort_unstable();
185        let mut queue: VecDeque<&str> = roots.into_iter().collect();
186
187        let mut sorted = Vec::new();
188
189        while let Some(node_id) = queue.pop_front() {
190            sorted.push(node_id.to_string());
191            if let Some(dependents) = self.downstream.get(node_id) {
192                let mut next: Vec<&str> = Vec::new();
193                for dep in dependents {
194                    let deg = in_degree.get_mut(dep.as_str()).unwrap();
195                    *deg -= 1;
196                    if *deg == 0 {
197                        next.push(dep.as_str());
198                    }
199                }
200                // Stable sort to keep output deterministic.
201                next.sort_unstable();
202                queue.extend(next);
203            }
204        }
205
206        if sorted.len() != self.nodes.len() {
207            return Err(MultiRepoError::DependencyCycle {
208                repos: self.nodes.keys().cloned().collect(),
209            });
210        }
211
212        Ok(sorted
213            .into_iter()
214            .map(|id| self.nodes[&id].clone())
215            .collect())
216    }
217
218    /// Return all direct dependencies of `repo_id` (repos it depends on).
219    pub fn dependencies_of(&self, repo_id: &str) -> MultiRepoResult<Vec<&RepoNode>> {
220        self.nodes
221            .get(repo_id)
222            .ok_or_else(|| MultiRepoError::RepoNotFound {
223                repo: repo_id.to_string(),
224            })?;
225        let deps = self
226            .upstream
227            .get(repo_id)
228            .into_iter()
229            .flatten()
230            .filter_map(|id| self.nodes.get(id))
231            .collect();
232        Ok(deps)
233    }
234
235    /// Return all direct dependents of `repo_id` (repos that depend on it).
236    pub fn dependents_of(&self, repo_id: &str) -> MultiRepoResult<Vec<&RepoNode>> {
237        self.nodes
238            .get(repo_id)
239            .ok_or_else(|| MultiRepoError::RepoNotFound {
240                repo: repo_id.to_string(),
241            })?;
242        let deps = self
243            .downstream
244            .get(repo_id)
245            .into_iter()
246            .flatten()
247            .filter_map(|id| self.nodes.get(id))
248            .collect();
249        Ok(deps)
250    }
251
252    /// All transitive dependents of `repo_id` (BFS over downstream edges).
253    pub fn transitive_dependents_of(&self, repo_id: &str) -> MultiRepoResult<Vec<String>> {
254        self.nodes
255            .get(repo_id)
256            .ok_or_else(|| MultiRepoError::RepoNotFound {
257                repo: repo_id.to_string(),
258            })?;
259
260        let mut visited = HashSet::new();
261        let mut queue = VecDeque::new();
262        queue.push_back(repo_id.to_string());
263
264        while let Some(current) = queue.pop_front() {
265            if let Some(deps) = self.downstream.get(&current) {
266                for dep in deps {
267                    if visited.insert(dep.clone()) {
268                        queue.push_back(dep.clone());
269                    }
270                }
271            }
272        }
273
274        Ok(visited.into_iter().collect())
275    }
276
277    /// Convert to a [`RepoExecutionPlan`] by running topological sort and
278    /// grouping same-level repos as parallelizable steps.
279    ///
280    /// Two steps are in the same "level" if they share exactly the same set
281    /// of transitive upstream dependencies. We approximate this by tracking
282    /// which Kahn wave each node belongs to.
283    pub fn to_execution_plan(&self, title: &str) -> MultiRepoResult<RepoExecutionPlan> {
284        if self.nodes.is_empty() {
285            return Ok(RepoExecutionPlan {
286                title: title.to_string(),
287                steps: Vec::new(),
288            });
289        }
290
291        // Kahn's algorithm with level tracking.
292        let mut in_degree: HashMap<String, usize> =
293            self.nodes.keys().map(|id| (id.clone(), 0)).collect();
294
295        for dependents in self.downstream.values() {
296            for dep in dependents {
297                *in_degree.get_mut(dep).unwrap() += 1;
298            }
299        }
300
301        let mut level_roots: Vec<String> = in_degree
302            .iter()
303            .filter(|(_, &deg)| deg == 0)
304            .map(|(id, _)| id.clone())
305            .collect();
306        level_roots.sort_unstable();
307        let mut level_queue: VecDeque<(String, usize)> =
308            level_roots.into_iter().map(|id| (id, 0usize)).collect();
309
310        let mut node_level: HashMap<String, usize> = HashMap::new();
311        let mut sorted_ids: Vec<String> = Vec::new();
312
313        while let Some((node_id, level)) = level_queue.pop_front() {
314            node_level.insert(node_id.clone(), level);
315            sorted_ids.push(node_id.clone());
316
317            if let Some(dependents) = self.downstream.get(&node_id) {
318                let mut next: Vec<String> = Vec::new();
319                for dep in dependents {
320                    let deg = in_degree.get_mut(dep).unwrap();
321                    *deg -= 1;
322                    if *deg == 0 {
323                        next.push(dep.clone());
324                    }
325                }
326                next.sort_unstable();
327                for dep in next {
328                    level_queue.push_back((dep, level + 1));
329                }
330            }
331        }
332
333        if sorted_ids.len() != self.nodes.len() {
334            return Err(MultiRepoError::DependencyCycle {
335                repos: self.nodes.keys().cloned().collect(),
336            });
337        }
338
339        // Count how many repos are at each level.
340        let mut level_counts: HashMap<usize, usize> = HashMap::new();
341        for l in node_level.values() {
342            *level_counts.entry(*l).or_default() += 1;
343        }
344
345        let steps = sorted_ids
346            .into_iter()
347            .enumerate()
348            .map(|(pos, id)| {
349                let repo = self.nodes[&id].clone();
350                let depends_on = self
351                    .upstream
352                    .get(&id)
353                    .into_iter()
354                    .flatten()
355                    .cloned()
356                    .collect();
357                let level = *node_level.get(&id).unwrap();
358                // Parallelizable when there are multiple repos at the same level.
359                let parallelizable = level_counts.get(&level).copied().unwrap_or(1) > 1;
360                RepoStep {
361                    position: pos,
362                    repo,
363                    depends_on,
364                    parallelizable,
365                }
366            })
367            .collect();
368
369        Ok(RepoExecutionPlan {
370            title: title.to_string(),
371            steps,
372        })
373    }
374
375    /// DFS from `start` to detect cycles. Returns the cycle path if found.
376    fn find_cycle_through(&self, start: &str) -> Option<Vec<String>> {
377        let mut visited = HashSet::new();
378        let mut path = Vec::new();
379        if self.dfs_cycle(start, &mut visited, &mut path) {
380            Some(path)
381        } else {
382            None
383        }
384    }
385
386    fn dfs_cycle<'a>(
387        &'a self,
388        node: &'a str,
389        visited: &mut HashSet<String>,
390        path: &mut Vec<String>,
391    ) -> bool {
392        if path.contains(&node.to_string()) {
393            path.push(node.to_string());
394            return true;
395        }
396        if visited.contains(node) {
397            return false;
398        }
399        visited.insert(node.to_string());
400        path.push(node.to_string());
401
402        if let Some(dependents) = self.downstream.get(node) {
403            for dep in dependents {
404                if self.dfs_cycle(dep, visited, path) {
405                    return true;
406                }
407            }
408        }
409
410        path.pop();
411        false
412    }
413}
414
#[cfg(test)]
mod tests {
    use super::*;

    /// Node whose display name equals its id.
    fn node(id: &str) -> RepoNode {
        RepoNode::new(id, id)
    }

    /// Build a graph from node ids and `(dependency, dependent)` edges.
    fn graph(ids: &[&str], edges: &[(&str, &str)]) -> RepoDependencyGraph {
        let mut g = RepoDependencyGraph::new();
        for id in ids {
            g.add_node(node(id));
        }
        for &(dependency, dependent) in edges {
            g.add_dependency(dependency, dependent).unwrap();
        }
        g
    }

    /// C → B → A  (A depends on B, B depends on C).
    fn three_chain() -> RepoDependencyGraph {
        graph(&["C", "B", "A"], &[("C", "B"), ("B", "A")])
    }

    /// Repo ids of the graph's topological ordering.
    fn order_ids(g: &RepoDependencyGraph) -> Vec<String> {
        g.topological_order()
            .unwrap()
            .into_iter()
            .map(|n| n.repo_id)
            .collect()
    }

    /// Index of `id` within `order`; panics when it is missing.
    fn index_of(order: &[String], id: &str) -> usize {
        order.iter().position(|x| x == id).unwrap()
    }

    /// Sorted repo ids of a list of node references.
    fn sorted_ids(nodes: Vec<&RepoNode>) -> Vec<String> {
        let mut ids: Vec<String> = nodes.into_iter().map(|n| n.repo_id.clone()).collect();
        ids.sort();
        ids
    }

    #[test]
    fn test_topological_order_respects_deps() {
        let order = order_ids(&three_chain());
        assert_eq!(order.len(), 3);
        assert!(
            index_of(&order, "C") < index_of(&order, "B"),
            "C must come before B"
        );
        assert!(
            index_of(&order, "B") < index_of(&order, "A"),
            "B must come before A"
        );
    }

    #[test]
    fn test_cycle_detection_rejects_mutual_dependency() {
        let mut g = graph(&["X", "Y"], &[("X", "Y")]); // Y depends on X
        let result = g.add_dependency("Y", "X"); // X depends on Y → cycle
        assert!(matches!(
            result,
            Err(MultiRepoError::DependencyCycle { .. })
        ));

        // The rejected edge must not linger in the graph.
        assert!(g.dependents_of("Y").unwrap().is_empty());
        assert!(g.dependencies_of("X").unwrap().is_empty());
        assert_eq!(order_ids(&g), vec!["X", "Y"]);
    }

    #[test]
    fn test_self_dependency_is_rejected() {
        let mut g = graph(&["solo"], &[]);
        let result = g.add_dependency("solo", "solo");
        assert!(matches!(
            result,
            Err(MultiRepoError::DependencyCycle { .. })
        ));
        assert!(g.dependencies_of("solo").unwrap().is_empty());
        assert!(g.dependents_of("solo").unwrap().is_empty());
    }

    #[test]
    fn test_parallel_groups_partitions_independent_repos() {
        // A and B have no dependencies on each other → same level → parallelizable.
        let g = graph(&["A", "B"], &[]);
        let plan = g.to_execution_plan("test").unwrap();
        let groups = plan.parallel_groups();
        // Both repos at level 0 → one parallel group.
        assert_eq!(groups.len(), 1);
        assert_eq!(groups[0].len(), 2);
    }

    #[test]
    fn test_single_repo_graph_produces_one_step_plan() {
        let g = graph(&["solo"], &[]);
        let plan = g.to_execution_plan("solo plan").unwrap();
        assert_eq!(plan.steps.len(), 1);
        assert_eq!(plan.steps[0].position, 0);
        assert!(plan.steps[0].depends_on.is_empty());
        assert!(!plan.steps[0].parallelizable);
    }

    #[test]
    fn test_empty_graph_produces_empty_plan() {
        let plan = RepoDependencyGraph::new()
            .to_execution_plan("nothing")
            .unwrap();
        assert!(plan.steps.is_empty());
        assert!(plan.parallel_groups().is_empty());
    }

    #[test]
    fn test_to_execution_plan_title_is_preserved() {
        let g = graph(&["r1"], &[]);
        let plan = g.to_execution_plan("my plan title").unwrap();
        assert_eq!(plan.title, "my plan title");
    }

    #[test]
    fn test_repo_not_found_error_on_missing_node() {
        let mut g = graph(&["A"], &[]);
        // Missing dependent.
        let r = g.add_dependency("A", "missing");
        assert!(matches!(r, Err(MultiRepoError::RepoNotFound { .. })));
        // Missing dependency.
        let r = g.add_dependency("missing", "A");
        assert!(matches!(r, Err(MultiRepoError::RepoNotFound { .. })));
        // Queries on unknown ids fail the same way.
        assert!(matches!(
            g.dependencies_of("missing"),
            Err(MultiRepoError::RepoNotFound { .. })
        ));
        assert!(matches!(
            g.dependents_of("missing"),
            Err(MultiRepoError::RepoNotFound { .. })
        ));
        assert!(matches!(
            g.transitive_dependents_of("missing"),
            Err(MultiRepoError::RepoNotFound { .. })
        ));
    }

    #[test]
    fn test_direct_neighbor_queries() {
        let g = three_chain(); // C → B → A
        assert_eq!(sorted_ids(g.dependencies_of("B").unwrap()), vec!["C"]);
        assert_eq!(sorted_ids(g.dependents_of("B").unwrap()), vec!["A"]);
        assert!(g.dependencies_of("C").unwrap().is_empty());
        assert!(g.dependents_of("A").unwrap().is_empty());
    }

    #[test]
    fn test_transitive_dependents_covers_full_chain() {
        let g = three_chain(); // C → B → A
        let mut trans = g.transitive_dependents_of("C").unwrap();
        trans.sort();
        // Exactly the downstream repos, never the start node itself.
        assert_eq!(trans, vec!["A", "B"]);
        assert!(g.transitive_dependents_of("A").unwrap().is_empty());
    }

    #[test]
    fn test_diamond_graph_resolves_correctly() {
        // A→B, A→C, B→D, C→D
        let g = graph(
            &["A", "B", "C", "D"],
            &[("A", "B"), ("A", "C"), ("B", "D"), ("C", "D")],
        );

        let order = order_ids(&g);
        assert_eq!(order.len(), 4);
        let (a, b, c, d) = (
            index_of(&order, "A"),
            index_of(&order, "B"),
            index_of(&order, "C"),
            index_of(&order, "D"),
        );
        assert!(a < b && a < c, "A must precede both B and C");
        assert!(b < d && c < d, "D must follow both B and C");

        // Levels: {A}, {B, C}, {D} → only the middle level is parallelizable.
        let plan = g.to_execution_plan("diamond").unwrap();
        let flags: Vec<bool> = plan.steps.iter().map(|s| s.parallelizable).collect();
        assert_eq!(flags, vec![false, true, true, false]);
        let group_sizes: Vec<usize> = plan.parallel_groups().iter().map(|g| g.len()).collect();
        assert_eq!(group_sizes, vec![1, 2, 1]);

        let mut d_deps = plan.steps[3].depends_on.clone();
        d_deps.sort();
        assert_eq!(d_deps, vec!["B", "C"]);
    }

    #[test]
    fn test_topological_order_is_deterministic_for_independent_roots() {
        let g = graph(&["B", "A", "C"], &[("A", "C"), ("B", "C")]);

        let first = order_ids(&g);
        let second = order_ids(&g);

        assert_eq!(first, second);
        assert_eq!(first, vec!["A", "B", "C"]);
    }

    #[test]
    fn test_execution_plan_is_deterministic_for_independent_roots() {
        let g = graph(
            &["repo-b", "repo-a", "repo-c"],
            &[("repo-a", "repo-c"), ("repo-b", "repo-c")],
        );

        let plan = g.to_execution_plan("deterministic").unwrap();
        let ids: Vec<String> = plan.steps.into_iter().map(|s| s.repo.repo_id).collect();
        assert_eq!(ids, vec!["repo-a", "repo-b", "repo-c"]);
    }
}