1use std::collections::{HashMap, HashSet, VecDeque};
10
11use serde::{Deserialize, Serialize};
12
13use crate::multi_repo::error::{MultiRepoError, MultiRepoResult};
14
15#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
17pub struct RepoNode {
18 pub repo_id: String,
20 pub display_name: String,
22 pub remote_url: Option<String>,
24}
25
26impl RepoNode {
27 pub fn new(repo_id: impl Into<String>, display_name: impl Into<String>) -> Self {
29 Self {
30 repo_id: repo_id.into(),
31 display_name: display_name.into(),
32 remote_url: None,
33 }
34 }
35}
36
37#[derive(Debug, Clone)]
42pub struct RepoStep {
43 pub position: usize,
45 pub repo: RepoNode,
47 pub depends_on: Vec<String>,
49 pub parallelizable: bool,
52}
53
54#[derive(Debug, Clone)]
56pub struct RepoExecutionPlan {
57 pub title: String,
59 pub steps: Vec<RepoStep>,
61}
62
63impl RepoExecutionPlan {
64 pub fn parallel_groups(&self) -> Vec<Vec<&RepoStep>> {
70 let mut groups: Vec<Vec<&RepoStep>> = Vec::new();
71 let mut current: Vec<&RepoStep> = Vec::new();
72
73 for step in &self.steps {
74 if step.parallelizable {
75 current.push(step);
76 } else {
77 if !current.is_empty() {
78 groups.push(std::mem::take(&mut current));
79 }
80 groups.push(vec![step]);
81 }
82 }
83 if !current.is_empty() {
84 groups.push(current);
85 }
86 groups
87 }
88}
89
90#[derive(Debug, Clone, Default)]
95pub struct RepoDependencyGraph {
96 nodes: HashMap<String, RepoNode>,
97 downstream: HashMap<String, HashSet<String>>,
99 upstream: HashMap<String, HashSet<String>>,
101}
102
103impl RepoDependencyGraph {
104 pub fn new() -> Self {
106 Self::default()
107 }
108
109 pub fn add_node(&mut self, node: RepoNode) {
112 let id = node.repo_id.clone();
113 self.nodes.insert(id.clone(), node);
114 self.downstream.entry(id.clone()).or_default();
115 self.upstream.entry(id).or_default();
116 }
117
118 pub fn add_dependency(&mut self, dependency: &str, dependent: &str) -> MultiRepoResult<()> {
125 if !self.nodes.contains_key(dependency) {
126 return Err(MultiRepoError::RepoNotFound {
127 repo: dependency.to_string(),
128 });
129 }
130 if !self.nodes.contains_key(dependent) {
131 return Err(MultiRepoError::RepoNotFound {
132 repo: dependent.to_string(),
133 });
134 }
135
136 self.downstream
138 .entry(dependency.to_string())
139 .or_default()
140 .insert(dependent.to_string());
141 self.upstream
142 .entry(dependent.to_string())
143 .or_default()
144 .insert(dependency.to_string());
145
146 if let Some(cycle) = self.find_cycle_through(dependent) {
148 self.downstream
150 .get_mut(dependency)
151 .unwrap()
152 .remove(dependent);
153 self.upstream.get_mut(dependent).unwrap().remove(dependency);
154 return Err(MultiRepoError::DependencyCycle { repos: cycle });
155 }
156
157 Ok(())
158 }
159
160 pub fn topological_order(&self) -> MultiRepoResult<Vec<RepoNode>> {
165 let mut in_degree: HashMap<&str, usize> =
166 self.nodes.keys().map(|id| (id.as_str(), 0)).collect();
167
168 for (dep, dependents) in &self.downstream {
169 for d in dependents {
170 *in_degree.entry(d.as_str()).or_default() += 1;
171 let _ = dep; }
173 }
174 for id in self.nodes.keys() {
176 in_degree.entry(id.as_str()).or_default();
177 }
178
179 let mut roots: Vec<&str> = in_degree
180 .iter()
181 .filter(|(_, °)| deg == 0)
182 .map(|(&id, _)| id)
183 .collect();
184 roots.sort_unstable();
185 let mut queue: VecDeque<&str> = roots.into_iter().collect();
186
187 let mut sorted = Vec::new();
188
189 while let Some(node_id) = queue.pop_front() {
190 sorted.push(node_id.to_string());
191 if let Some(dependents) = self.downstream.get(node_id) {
192 let mut next: Vec<&str> = Vec::new();
193 for dep in dependents {
194 let deg = in_degree.get_mut(dep.as_str()).unwrap();
195 *deg -= 1;
196 if *deg == 0 {
197 next.push(dep.as_str());
198 }
199 }
200 next.sort_unstable();
202 queue.extend(next);
203 }
204 }
205
206 if sorted.len() != self.nodes.len() {
207 return Err(MultiRepoError::DependencyCycle {
208 repos: self.nodes.keys().cloned().collect(),
209 });
210 }
211
212 Ok(sorted
213 .into_iter()
214 .map(|id| self.nodes[&id].clone())
215 .collect())
216 }
217
218 pub fn dependencies_of(&self, repo_id: &str) -> MultiRepoResult<Vec<&RepoNode>> {
220 self.nodes
221 .get(repo_id)
222 .ok_or_else(|| MultiRepoError::RepoNotFound {
223 repo: repo_id.to_string(),
224 })?;
225 let deps = self
226 .upstream
227 .get(repo_id)
228 .into_iter()
229 .flatten()
230 .filter_map(|id| self.nodes.get(id))
231 .collect();
232 Ok(deps)
233 }
234
235 pub fn dependents_of(&self, repo_id: &str) -> MultiRepoResult<Vec<&RepoNode>> {
237 self.nodes
238 .get(repo_id)
239 .ok_or_else(|| MultiRepoError::RepoNotFound {
240 repo: repo_id.to_string(),
241 })?;
242 let deps = self
243 .downstream
244 .get(repo_id)
245 .into_iter()
246 .flatten()
247 .filter_map(|id| self.nodes.get(id))
248 .collect();
249 Ok(deps)
250 }
251
252 pub fn transitive_dependents_of(&self, repo_id: &str) -> MultiRepoResult<Vec<String>> {
254 self.nodes
255 .get(repo_id)
256 .ok_or_else(|| MultiRepoError::RepoNotFound {
257 repo: repo_id.to_string(),
258 })?;
259
260 let mut visited = HashSet::new();
261 let mut queue = VecDeque::new();
262 queue.push_back(repo_id.to_string());
263
264 while let Some(current) = queue.pop_front() {
265 if let Some(deps) = self.downstream.get(¤t) {
266 for dep in deps {
267 if visited.insert(dep.clone()) {
268 queue.push_back(dep.clone());
269 }
270 }
271 }
272 }
273
274 Ok(visited.into_iter().collect())
275 }
276
277 pub fn to_execution_plan(&self, title: &str) -> MultiRepoResult<RepoExecutionPlan> {
284 if self.nodes.is_empty() {
285 return Ok(RepoExecutionPlan {
286 title: title.to_string(),
287 steps: Vec::new(),
288 });
289 }
290
291 let mut in_degree: HashMap<String, usize> =
293 self.nodes.keys().map(|id| (id.clone(), 0)).collect();
294
295 for dependents in self.downstream.values() {
296 for dep in dependents {
297 *in_degree.get_mut(dep).unwrap() += 1;
298 }
299 }
300
301 let mut level_roots: Vec<String> = in_degree
302 .iter()
303 .filter(|(_, °)| deg == 0)
304 .map(|(id, _)| id.clone())
305 .collect();
306 level_roots.sort_unstable();
307 let mut level_queue: VecDeque<(String, usize)> =
308 level_roots.into_iter().map(|id| (id, 0usize)).collect();
309
310 let mut node_level: HashMap<String, usize> = HashMap::new();
311 let mut sorted_ids: Vec<String> = Vec::new();
312
313 while let Some((node_id, level)) = level_queue.pop_front() {
314 node_level.insert(node_id.clone(), level);
315 sorted_ids.push(node_id.clone());
316
317 if let Some(dependents) = self.downstream.get(&node_id) {
318 let mut next: Vec<String> = Vec::new();
319 for dep in dependents {
320 let deg = in_degree.get_mut(dep).unwrap();
321 *deg -= 1;
322 if *deg == 0 {
323 next.push(dep.clone());
324 }
325 }
326 next.sort_unstable();
327 for dep in next {
328 level_queue.push_back((dep, level + 1));
329 }
330 }
331 }
332
333 if sorted_ids.len() != self.nodes.len() {
334 return Err(MultiRepoError::DependencyCycle {
335 repos: self.nodes.keys().cloned().collect(),
336 });
337 }
338
339 let mut level_counts: HashMap<usize, usize> = HashMap::new();
341 for l in node_level.values() {
342 *level_counts.entry(*l).or_default() += 1;
343 }
344
345 let steps = sorted_ids
346 .into_iter()
347 .enumerate()
348 .map(|(pos, id)| {
349 let repo = self.nodes[&id].clone();
350 let depends_on = self
351 .upstream
352 .get(&id)
353 .into_iter()
354 .flatten()
355 .cloned()
356 .collect();
357 let level = *node_level.get(&id).unwrap();
358 let parallelizable = level_counts.get(&level).copied().unwrap_or(1) > 1;
360 RepoStep {
361 position: pos,
362 repo,
363 depends_on,
364 parallelizable,
365 }
366 })
367 .collect();
368
369 Ok(RepoExecutionPlan {
370 title: title.to_string(),
371 steps,
372 })
373 }
374
375 fn find_cycle_through(&self, start: &str) -> Option<Vec<String>> {
377 let mut visited = HashSet::new();
378 let mut path = Vec::new();
379 if self.dfs_cycle(start, &mut visited, &mut path) {
380 Some(path)
381 } else {
382 None
383 }
384 }
385
386 fn dfs_cycle<'a>(
387 &'a self,
388 node: &'a str,
389 visited: &mut HashSet<String>,
390 path: &mut Vec<String>,
391 ) -> bool {
392 if path.contains(&node.to_string()) {
393 path.push(node.to_string());
394 return true;
395 }
396 if visited.contains(node) {
397 return false;
398 }
399 visited.insert(node.to_string());
400 path.push(node.to_string());
401
402 if let Some(dependents) = self.downstream.get(node) {
403 for dep in dependents {
404 if self.dfs_cycle(dep, visited, path) {
405 return true;
406 }
407 }
408 }
409
410 path.pop();
411 false
412 }
413}
414
415#[cfg(test)]
416mod tests {
417 use super::*;
418
419 fn node(id: &str) -> RepoNode {
420 RepoNode::new(id, id)
421 }
422
423 fn three_chain() -> RepoDependencyGraph {
424 let mut g = RepoDependencyGraph::new();
426 g.add_node(node("C"));
427 g.add_node(node("B"));
428 g.add_node(node("A"));
429 g.add_dependency("C", "B").unwrap(); g.add_dependency("B", "A").unwrap(); g
432 }
433
434 #[test]
435 fn test_topological_order_respects_deps() {
436 let g = three_chain();
437 let order: Vec<String> = g
438 .topological_order()
439 .unwrap()
440 .into_iter()
441 .map(|n| n.repo_id)
442 .collect();
443 let c_idx = order.iter().position(|x| x == "C").unwrap();
444 let b_idx = order.iter().position(|x| x == "B").unwrap();
445 let a_idx = order.iter().position(|x| x == "A").unwrap();
446 assert!(c_idx < b_idx, "C must come before B");
447 assert!(b_idx < a_idx, "B must come before A");
448 }
449
450 #[test]
451 fn test_cycle_detection_rejects_mutual_dependency() {
452 let mut g = RepoDependencyGraph::new();
453 g.add_node(node("X"));
454 g.add_node(node("Y"));
455 g.add_dependency("X", "Y").unwrap(); let result = g.add_dependency("Y", "X"); assert!(matches!(
458 result,
459 Err(MultiRepoError::DependencyCycle { .. })
460 ));
461 }
462
463 #[test]
464 fn test_parallel_groups_partitions_independent_repos() {
465 let mut g = RepoDependencyGraph::new();
467 g.add_node(node("A"));
468 g.add_node(node("B"));
469 let plan = g.to_execution_plan("test").unwrap();
470 let groups = plan.parallel_groups();
471 assert_eq!(groups.len(), 1);
473 assert_eq!(groups[0].len(), 2);
474 }
475
476 #[test]
477 fn test_single_repo_graph_produces_one_step_plan() {
478 let mut g = RepoDependencyGraph::new();
479 g.add_node(node("solo"));
480 let plan = g.to_execution_plan("solo plan").unwrap();
481 assert_eq!(plan.steps.len(), 1);
482 assert!(!plan.steps[0].parallelizable);
483 }
484
485 #[test]
486 fn test_to_execution_plan_title_is_preserved() {
487 let mut g = RepoDependencyGraph::new();
488 g.add_node(node("r1"));
489 let plan = g.to_execution_plan("my plan title").unwrap();
490 assert_eq!(plan.title, "my plan title");
491 }
492
493 #[test]
494 fn test_repo_not_found_error_on_missing_node() {
495 let mut g = RepoDependencyGraph::new();
496 g.add_node(node("A"));
497 let r = g.add_dependency("A", "missing");
498 assert!(matches!(r, Err(MultiRepoError::RepoNotFound { .. })));
499 }
500
501 #[test]
502 fn test_transitive_dependents_covers_full_chain() {
503 let g = three_chain(); let mut trans = g.transitive_dependents_of("C").unwrap();
505 trans.sort();
506 assert!(trans.contains(&"B".to_string()));
507 assert!(trans.contains(&"A".to_string()));
508 assert!(!trans.contains(&"C".to_string()));
509 }
510
511 #[test]
512 fn test_diamond_graph_resolves_correctly() {
513 let mut g = RepoDependencyGraph::new();
515 for id in &["A", "B", "C", "D"] {
516 g.add_node(node(id));
517 }
518 g.add_dependency("A", "B").unwrap();
519 g.add_dependency("A", "C").unwrap();
520 g.add_dependency("B", "D").unwrap();
521 g.add_dependency("C", "D").unwrap();
522
523 let order = g.topological_order().unwrap();
524 let ids: Vec<&str> = order.iter().map(|n| n.repo_id.as_str()).collect();
525 let a_idx = ids.iter().position(|&x| x == "A").unwrap();
526 let d_idx = ids.iter().position(|&x| x == "D").unwrap();
527 assert!(a_idx < d_idx);
528 }
529
530 #[test]
531 fn test_topological_order_is_deterministic_for_independent_roots() {
532 let mut g = RepoDependencyGraph::new();
533 g.add_node(node("B"));
534 g.add_node(node("A"));
535 g.add_node(node("C"));
536 g.add_dependency("A", "C").unwrap();
537 g.add_dependency("B", "C").unwrap();
538
539 let first: Vec<String> = g
540 .topological_order()
541 .unwrap()
542 .into_iter()
543 .map(|n| n.repo_id)
544 .collect();
545 let second: Vec<String> = g
546 .topological_order()
547 .unwrap()
548 .into_iter()
549 .map(|n| n.repo_id)
550 .collect();
551
552 assert_eq!(first, second);
553 assert_eq!(first, vec!["A", "B", "C"]);
554 }
555
556 #[test]
557 fn test_execution_plan_is_deterministic_for_independent_roots() {
558 let mut g = RepoDependencyGraph::new();
559 g.add_node(node("repo-b"));
560 g.add_node(node("repo-a"));
561 g.add_node(node("repo-c"));
562 g.add_dependency("repo-a", "repo-c").unwrap();
563 g.add_dependency("repo-b", "repo-c").unwrap();
564
565 let plan = g.to_execution_plan("deterministic").unwrap();
566 let ids: Vec<String> = plan.steps.into_iter().map(|s| s.repo.repo_id).collect();
567 assert_eq!(ids, vec!["repo-a", "repo-b", "repo-c"]);
568 }
569}