1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::process::Command;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use anyhow::Result;
8
9use crate::index::Index;
10use crate::stream::{self, StreamEvent};
11use crate::unit::Status;
12use crate::util::natural_cmp;
13
14use super::plan::SizedUnit;
15use super::ready_queue::run_single_direct;
16use super::{AgentResult, SpawnMode, UnitAction};
17
/// One scheduling wave: a group of units that `compute_waves` placed together.
///
/// Units in a wave had all of their in-set dependencies satisfied by earlier
/// waves (or by units already closed in the index). The exception is the
/// fallback final wave, which collects units whose dependencies could not be
/// resolved.
pub struct Wave {
    /// Units assigned to this wave, in the order `compute_waves` sorted them.
    pub units: Vec<SizedUnit>,
}
22
23pub(super) fn compute_waves(units: &[SizedUnit], index: &Index) -> Vec<Wave> {
26 let mut waves = Vec::new();
27 let unit_ids: HashSet<String> = units.iter().map(|b| b.id.clone()).collect();
28
29 let mut completed: HashSet<String> = index
31 .units
32 .iter()
33 .filter(|e| e.status == Status::Closed)
34 .map(|e| e.id.clone())
35 .collect();
36
37 let mut remaining: Vec<SizedUnit> = units.to_vec();
38
39 while !remaining.is_empty() {
40 let (ready, blocked): (Vec<SizedUnit>, Vec<SizedUnit>) =
41 remaining.into_iter().partition(|b| {
42 let explicit_ok = b
44 .dependencies
45 .iter()
46 .all(|d| completed.contains(d) || !unit_ids.contains(d));
47
48 let requires_ok = b.requires.iter().all(|req| {
50 if let Some(producer) = units.iter().find(|other| {
52 other.id != b.id && other.parent == b.parent && other.produces.contains(req)
53 }) {
54 completed.contains(&producer.id)
55 } else {
56 true }
58 });
59
60 explicit_ok && requires_ok
61 });
62
63 if ready.is_empty() {
64 eprintln!(
67 "Warning: {} unit(s) have unresolvable dependencies, adding to final wave",
68 blocked.len()
69 );
70 waves.push(Wave { units: blocked });
71 break;
72 }
73
74 for b in &ready {
75 completed.insert(b.id.clone());
76 }
77
78 waves.push(Wave { units: ready });
79 remaining = blocked;
80 }
81
82 let weights = compute_downstream_weights(units);
84
85 for wave in &mut waves {
89 wave.units.sort_by(|a, b| {
90 a.priority
91 .cmp(&b.priority)
92 .then_with(|| {
93 let wa = weights.get(&a.id).copied().unwrap_or(1);
94 let wb = weights.get(&b.id).copied().unwrap_or(1);
95 wb.cmp(&wa) })
97 .then_with(|| natural_cmp(&a.id, &b.id))
98 });
99 }
100
101 waves
102}
103
104pub(super) fn compute_downstream_weights(units: &[SizedUnit]) -> HashMap<String, u32> {
108 let unit_ids: HashSet<String> = units.iter().map(|b| b.id.clone()).collect();
109
110 let mut reverse_deps: HashMap<String, Vec<String>> = HashMap::new();
112
113 for b in units {
114 reverse_deps.entry(b.id.clone()).or_default();
115
116 for dep in &b.dependencies {
118 if unit_ids.contains(dep) {
119 reverse_deps
120 .entry(dep.clone())
121 .or_default()
122 .push(b.id.clone());
123 }
124 }
125
126 for req in &b.requires {
129 if let Some(producer) = units.iter().find(|other| {
130 other.id != b.id && other.parent == b.parent && other.produces.contains(req)
131 }) {
132 if unit_ids.contains(&producer.id) {
133 reverse_deps
134 .entry(producer.id.clone())
135 .or_default()
136 .push(b.id.clone());
137 }
138 }
139 }
140 }
141
142 let mut weights: HashMap<String, u32> = HashMap::new();
144
145 for b in units {
146 let mut visited: HashSet<String> = HashSet::new();
147 let mut queue: Vec<String> = Vec::new();
148
149 for dep in reverse_deps.get(&b.id).unwrap_or(&Vec::new()) {
151 if visited.insert(dep.clone()) {
152 queue.push(dep.clone());
153 }
154 }
155
156 while let Some(current) = queue.pop() {
158 for next in reverse_deps.get(¤t).unwrap_or(&Vec::new()) {
159 if visited.insert(next.clone()) {
160 queue.push(next.clone());
161 }
162 }
163 }
164
165 weights.insert(b.id.clone(), 1 + visited.len() as u32);
166 }
167
168 weights
169}
170
171pub(super) fn compute_file_conflicts(units: &[SizedUnit]) -> Vec<(String, Vec<String>)> {
174 let mut file_to_units: HashMap<String, Vec<String>> = HashMap::new();
175 for b in units {
176 for p in &b.paths {
177 file_to_units
178 .entry(p.clone())
179 .or_default()
180 .push(b.id.clone());
181 }
182 }
183 let mut conflicts: Vec<(String, Vec<String>)> = file_to_units
184 .into_iter()
185 .filter(|(_, ids)| ids.len() > 1)
186 .collect();
187 conflicts.sort_by(|a, b| a.0.cmp(&b.0));
188 conflicts
189}
190
191pub(super) fn compute_effective_parallelism(units: &[SizedUnit]) -> usize {
194 if units.is_empty() {
195 return 0;
196 }
197 let mut occupied: HashSet<String> = HashSet::new();
198 let mut count = 0;
199 for b in units {
200 if b.paths.is_empty() || !b.paths.iter().any(|p| occupied.contains(p)) {
201 for p in &b.paths {
202 occupied.insert(p.clone());
203 }
204 count += 1;
205 }
206 }
207 count
208}
209
210pub(super) fn compute_critical_path(units: &[SizedUnit]) -> Vec<String> {
213 if units.is_empty() {
214 return vec![];
215 }
216
217 let weights = compute_downstream_weights(units);
218 let unit_ids: HashSet<String> = units.iter().map(|b| b.id.clone()).collect();
219
220 let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
222 for b in units {
223 for dep in &b.dependencies {
224 if unit_ids.contains(dep) {
225 dependents
226 .entry(dep.clone())
227 .or_default()
228 .push(b.id.clone());
229 }
230 }
231 for req in &b.requires {
232 if let Some(producer) = units.iter().find(|other| {
233 other.id != b.id && other.parent == b.parent && other.produces.contains(req)
234 }) {
235 if unit_ids.contains(&producer.id) {
236 dependents
237 .entry(producer.id.clone())
238 .or_default()
239 .push(b.id.clone());
240 }
241 }
242 }
243 }
244
245 let start = units
247 .iter()
248 .max_by(|a, b| {
249 let wa = weights.get(&a.id).copied().unwrap_or(0);
250 let wb = weights.get(&b.id).copied().unwrap_or(0);
251 wa.cmp(&wb).then_with(|| natural_cmp(&b.id, &a.id))
252 })
253 .unwrap();
254
255 let mut path = vec![start.id.clone()];
256 let mut current = start.id.clone();
257
258 loop {
260 let Some(deps) = dependents.get(¤t) else {
261 break;
262 };
263 if deps.is_empty() {
264 break;
265 }
266 let mut deps_sorted = deps.clone();
268 deps_sorted.sort_by(|a, b| {
269 let wa = weights.get(a).copied().unwrap_or(0);
270 let wb = weights.get(b).copied().unwrap_or(0);
271 wb.cmp(&wa).then_with(|| natural_cmp(a, b))
272 });
273 let next = &deps_sorted[0];
274 path.push(next.clone());
275 current = next.clone();
276 }
277
278 path
279}
280
281pub(super) fn run_wave(
287 mana_dir: &Path,
288 units: &[SizedUnit],
289 spawn_mode: &SpawnMode,
290 cfg: &super::RunConfig,
291 wave_number: usize,
292) -> Result<Vec<AgentResult>> {
293 match spawn_mode {
294 SpawnMode::Template {
295 run_template,
296 plan_template,
297 } => run_wave_template(
298 units,
299 run_template,
300 plan_template.as_deref(),
301 cfg.max_jobs,
302 cfg.timeout_minutes,
303 cfg.idle_timeout_minutes,
304 cfg.run_model.as_deref(),
305 ),
306 SpawnMode::Direct => run_wave_direct(
307 mana_dir,
308 units,
309 cfg.max_jobs,
310 cfg.timeout_minutes,
311 cfg.idle_timeout_minutes,
312 cfg.run_model.as_deref(),
313 cfg.json_stream,
314 wave_number,
315 cfg.file_locking,
316 ),
317 }
318}
319
320fn run_wave_template(
322 units: &[SizedUnit],
323 run_template: &str,
324 _plan_template: Option<&str>,
325 max_jobs: usize,
326 timeout_minutes: u32,
327 idle_timeout_minutes: u32,
328 config_run_model: Option<&str>,
329) -> Result<Vec<AgentResult>> {
330 let total_timeout = Duration::from_secs(timeout_minutes as u64 * 60);
331 let _idle_timeout = Duration::from_secs(idle_timeout_minutes as u64 * 60);
332 let mut results = Vec::new();
336 let mut children: Vec<(SizedUnit, std::process::Child, Instant, Instant)> = Vec::new();
338
339 let mut pending: Vec<&SizedUnit> = units.iter().collect();
340
341 while !pending.is_empty() || !children.is_empty() {
342 if super::shutdown_requested() {
344 for (sb, mut child, started, _) in children {
345 let _ = child.kill();
346 let _ = child.wait();
347 results.push(AgentResult {
348 id: sb.id.clone(),
349 title: sb.title.clone(),
350 action: sb.action,
351 success: false,
352 duration: started.elapsed(),
353 total_tokens: None,
354 total_cost: None,
355 error: Some("Interrupted by shutdown signal".to_string()),
356 tool_count: 0,
357 turns: 0,
358 failure_summary: None,
359 });
360 }
361 return Ok(results);
362 }
363
364 while children.len() < max_jobs && !pending.is_empty() {
366 let sb = pending.remove(0);
367 let template = match sb.action {
368 UnitAction::Implement => run_template,
369 };
370
371 let effective_model = sb.model.as_deref().or(config_run_model);
373 let cmd =
374 crate::spawner::substitute_template_with_model(template, &sb.id, effective_model);
375 match Command::new("sh").args(["-c", &cmd]).spawn() {
376 Ok(child) => {
377 let now = Instant::now();
378 children.push((sb.clone(), child, now, now));
379 }
380 Err(e) => {
381 eprintln!(" Failed to spawn agent for {}: {}", sb.id, e);
382 results.push(AgentResult {
383 id: sb.id.clone(),
384 title: sb.title.clone(),
385 action: sb.action,
386 success: false,
387 duration: Duration::ZERO,
388 total_tokens: None,
389 total_cost: None,
390 error: Some(format!("Failed to spawn: {}", e)),
391 tool_count: 0,
392 turns: 0,
393 failure_summary: None,
394 });
395 }
396 }
397 }
398
399 if children.is_empty() {
400 break;
401 }
402
403 let mut still_running = Vec::new();
405 for (sb, mut child, started, last_activity) in children {
406 match child.try_wait() {
407 Ok(Some(status)) => {
408 let err = if status.success() {
409 None
410 } else {
411 Some(format!("Exit code {}", status.code().unwrap_or(-1)))
412 };
413 results.push(AgentResult {
414 id: sb.id.clone(),
415 title: sb.title.clone(),
416 action: sb.action,
417 success: status.success(),
418 duration: started.elapsed(),
419 total_tokens: None,
420 total_cost: None,
421 error: err,
422 tool_count: 0,
423 turns: 0,
424 failure_summary: None,
425 });
426 }
427 Ok(None) => {
428 let elapsed = started.elapsed();
430
431 if !total_timeout.is_zero() && elapsed > total_timeout {
432 eprintln!(
433 " ⚠ {} total timeout ({}m) — killing",
434 sb.id, timeout_minutes
435 );
436 let _ = child.kill();
437 let _ = child.wait();
438 results.push(AgentResult {
439 id: sb.id.clone(),
440 title: sb.title.clone(),
441 action: sb.action,
442 success: false,
443 duration: elapsed,
444 total_tokens: None,
445 total_cost: None,
446 error: Some(format!("Total timeout exceeded ({}m)", timeout_minutes)),
447 tool_count: 0,
448 turns: 0,
449 failure_summary: None,
450 });
451 } else {
452 still_running.push((sb, child, started, last_activity));
453 }
454 }
455 Err(e) => {
456 eprintln!(" Error checking agent for {}: {}", sb.id, e);
457 results.push(AgentResult {
458 id: sb.id.clone(),
459 title: sb.title.clone(),
460 action: sb.action,
461 success: false,
462 duration: started.elapsed(),
463 total_tokens: None,
464 total_cost: None,
465 error: Some(format!("Error checking process: {}", e)),
466 tool_count: 0,
467 turns: 0,
468 failure_summary: None,
469 });
470 }
471 }
472 }
473 children = still_running;
474
475 if !children.is_empty() {
476 std::thread::sleep(Duration::from_millis(500));
477 }
478 }
479
480 Ok(results)
481}
482
483#[allow(clippy::too_many_arguments)]
485fn run_wave_direct(
486 mana_dir: &Path,
487 units: &[SizedUnit],
488 max_jobs: usize,
489 timeout_minutes: u32,
490 idle_timeout_minutes: u32,
491 config_run_model: Option<&str>,
492 json_stream: bool,
493 wave_number: usize,
494 file_locking: bool,
495) -> Result<Vec<AgentResult>> {
496 let results = Arc::new(Mutex::new(Vec::new()));
497 let mut pending: Vec<SizedUnit> = units.to_vec();
498 let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
499
500 while !pending.is_empty() || !handles.is_empty() {
501 if super::shutdown_requested() {
503 super::kill_all_children();
504 for handle in handles {
506 let _ = handle.join();
507 }
508 return Ok(Arc::try_unwrap(results).unwrap().into_inner().unwrap());
509 }
510
511 while handles.len() < max_jobs && !pending.is_empty() {
513 let sb = pending.remove(0);
514 let mana_dir = mana_dir.to_path_buf();
515 let results = Arc::clone(&results);
516 let timeout_min = timeout_minutes;
517 let idle_min = idle_timeout_minutes;
518 let config_run_model = config_run_model.map(str::to_string);
519
520 if json_stream {
521 stream::emit(&StreamEvent::UnitStart {
522 id: sb.id.clone(),
523 title: sb.title.clone(),
524 round: wave_number,
525 file_overlaps: None,
526 attempt: None,
527 priority: None,
528 });
529 }
530
531 let handle = std::thread::spawn(move || {
532 let result = run_single_direct(
533 &mana_dir,
534 &sb,
535 timeout_min,
536 idle_min,
537 config_run_model.as_deref(),
538 json_stream,
539 file_locking,
540 false, );
542 results.lock().unwrap().push(result);
543 });
544 handles.push(handle);
545 }
546
547 let prev_count = handles.len();
549 let mut still_running = Vec::new();
550 for handle in handles.drain(..) {
551 if handle.is_finished() {
552 let _ = handle.join();
553 } else {
554 still_running.push(handle);
555 }
556 }
557
558 if still_running.len() == prev_count && !still_running.is_empty() {
560 std::thread::sleep(Duration::from_millis(200));
561 }
562
563 handles = still_running;
564 }
565
566 for handle in handles {
568 let _ = handle.join();
569 }
570
571 Ok(Arc::try_unwrap(results).unwrap().into_inner().unwrap())
572}
573
#[cfg(test)]
mod tests {
    use super::*;
    use crate::commands::run::UnitAction;
    use crate::index::Index;

    // ---------------------------------------------------------------------
    // Fixtures
    // ---------------------------------------------------------------------

    /// Converts a slice of string literals into owned strings.
    fn owned(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// An index that knows about no units at all.
    fn empty_index() -> Index {
        Index { units: vec![] }
    }

    /// A parentless unit with an explicit title and only explicit dependencies.
    fn bare_unit(id: &str, title: &str, deps: &[&str]) -> SizedUnit {
        SizedUnit {
            id: id.to_string(),
            title: title.to_string(),
            action: UnitAction::Implement,
            priority: 2,
            dependencies: owned(deps),
            parent: None,
            produces: vec![],
            requires: vec![],
            paths: vec![],
            model: None,
        }
    }

    /// A unit under parent "p" with dependencies and produced/required artifacts.
    fn make_unit(id: &str, deps: &[&str], produces: &[&str], requires: &[&str]) -> SizedUnit {
        SizedUnit {
            id: id.to_string(),
            title: format!("Unit {}", id),
            action: UnitAction::Implement,
            priority: 2,
            dependencies: owned(deps),
            parent: Some("p".to_string()),
            produces: owned(produces),
            requires: owned(requires),
            paths: vec![],
            model: None,
        }
    }

    /// A unit under parent "p" with dependencies and touched file paths.
    fn make_unit_with_paths(id: &str, deps: &[&str], paths: &[&str]) -> SizedUnit {
        SizedUnit {
            id: id.to_string(),
            title: format!("Unit {}", id),
            action: UnitAction::Implement,
            priority: 2,
            dependencies: owned(deps),
            parent: Some("p".to_string()),
            produces: vec![],
            requires: vec![],
            paths: owned(paths),
            model: None,
        }
    }

    // ---------------------------------------------------------------------
    // compute_waves: basic shapes
    // ---------------------------------------------------------------------

    #[test]
    fn compute_waves_no_deps() {
        let units = [bare_unit("1", "A", &[]), bare_unit("2", "B", &[])];

        let waves = compute_waves(&units, &empty_index());

        assert_eq!(waves.len(), 1);
        assert_eq!(waves[0].units.len(), 2);
    }

    #[test]
    fn compute_waves_linear_chain() {
        let units = [
            bare_unit("1", "A", &[]),
            bare_unit("2", "B", &["1"]),
            bare_unit("3", "C", &["2"]),
        ];

        let waves = compute_waves(&units, &empty_index());

        assert_eq!(waves.len(), 3);
        assert_eq!(waves[0].units[0].id, "1");
        assert_eq!(waves[1].units[0].id, "2");
        assert_eq!(waves[2].units[0].id, "3");
    }

    #[test]
    fn compute_waves_diamond() {
        let units = [
            bare_unit("1", "Root", &[]),
            bare_unit("2", "Left", &["1"]),
            bare_unit("3", "Right", &["1"]),
            bare_unit("4", "Join", &["2", "3"]),
        ];

        let waves = compute_waves(&units, &empty_index());

        assert_eq!(waves.len(), 3);
        // Root alone, then both branches, then the join.
        assert_eq!(waves[0].units.len(), 1);
        assert_eq!(waves[1].units.len(), 2);
        assert_eq!(waves[2].units.len(), 1);
    }

    // ---------------------------------------------------------------------
    // run_wave_template
    // ---------------------------------------------------------------------

    #[test]
    fn template_wave_execution_with_echo() {
        let units = [bare_unit("1", "Test", &[])];

        let results = run_wave_template(&units, "echo {id}", None, 4, 30, 5, None).unwrap();

        assert_eq!(results.len(), 1);
        assert!(results[0].success);
        assert_eq!(results[0].id, "1");
    }

    #[test]
    fn template_wave_runs_implement_action() {
        let units = [bare_unit("1", "Test", &[])];

        let results = run_wave_template(&units, "echo {id}", None, 4, 30, 5, None).unwrap();

        assert_eq!(results.len(), 1);
        assert!(results[0].success);
        assert_eq!(results[0].id, "1");
    }

    #[test]
    fn template_wave_failed_command() {
        let units = [bare_unit("1", "Fail", &[])];

        let results = run_wave_template(&units, "false", None, 4, 30, 5, None).unwrap();

        assert_eq!(results.len(), 1);
        assert!(!results[0].success);
        assert!(results[0].error.is_some());
    }

    // ---------------------------------------------------------------------
    // compute_downstream_weights
    // ---------------------------------------------------------------------

    #[test]
    fn downstream_weights_single_unit() {
        let units = [make_unit("A", &[], &[], &[])];

        let weights = compute_downstream_weights(&units);

        assert_eq!(weights.get("A").copied(), Some(1));
    }

    #[test]
    fn downstream_weights_linear_chain() {
        // A <- B <- C
        let units = [
            make_unit("A", &[], &[], &[]),
            make_unit("B", &["A"], &[], &[]),
            make_unit("C", &["B"], &[], &[]),
        ];

        let weights = compute_downstream_weights(&units);

        assert_eq!(weights.get("A").copied(), Some(3));
        assert_eq!(weights.get("B").copied(), Some(2));
        assert_eq!(weights.get("C").copied(), Some(1));
    }

    #[test]
    fn downstream_weights_diamond() {
        // A <- {B, C} <- D
        let units = [
            make_unit("A", &[], &[], &[]),
            make_unit("B", &["A"], &[], &[]),
            make_unit("C", &["A"], &[], &[]),
            make_unit("D", &["B", "C"], &[], &[]),
        ];

        let weights = compute_downstream_weights(&units);

        assert_eq!(weights.get("D").copied(), Some(1));
        assert_eq!(weights.get("B").copied(), Some(2));
        assert_eq!(weights.get("C").copied(), Some(2));
        assert_eq!(weights.get("A").copied(), Some(4));
    }

    #[test]
    fn downstream_weights_independent() {
        let units = [make_unit("A", &[], &[], &[]), make_unit("B", &[], &[], &[])];

        let weights = compute_downstream_weights(&units);

        assert_eq!(weights.get("A").copied(), Some(1));
        assert_eq!(weights.get("B").copied(), Some(1));
    }

    #[test]
    fn downstream_weights_wide_fan() {
        // A <- {B, C, D}
        let units = [
            make_unit("A", &[], &[], &[]),
            make_unit("B", &["A"], &[], &[]),
            make_unit("C", &["A"], &[], &[]),
            make_unit("D", &["A"], &[], &[]),
        ];

        let weights = compute_downstream_weights(&units);

        assert_eq!(weights.get("A").copied(), Some(4));
        assert_eq!(weights.get("B").copied(), Some(1));
        assert_eq!(weights.get("C").copied(), Some(1));
        assert_eq!(weights.get("D").copied(), Some(1));
    }

    // ---------------------------------------------------------------------
    // compute_waves: ordering inside a wave
    // ---------------------------------------------------------------------

    #[test]
    fn compute_waves_sorts_by_downstream_weight() {
        // B unblocks two units, A unblocks one, C unblocks none.
        let units = [
            make_unit("A", &[], &[], &[]),
            make_unit("B", &[], &[], &[]),
            make_unit("C", &[], &[], &[]),
            make_unit("D", &["A"], &[], &[]),
            make_unit("E", &["B"], &[], &[]),
            make_unit("F", &["B"], &[], &[]),
        ];

        let waves = compute_waves(&units, &empty_index());

        assert_eq!(waves.len(), 2);
        assert_eq!(waves[0].units[0].id, "B");
        assert_eq!(waves[0].units[1].id, "A");
        assert_eq!(waves[0].units[2].id, "C");
    }

    #[test]
    fn compute_waves_weight_sorting_preserves_priority() {
        // A has the better priority even though B has the larger weight.
        let mut a = make_unit("A", &[], &[], &[]);
        a.priority = 1;
        let mut b = make_unit("B", &[], &[], &[]);
        b.priority = 2;
        let c = make_unit("C", &["B"], &[], &[]);
        let units = vec![a, b, c];

        let waves = compute_waves(&units, &empty_index());

        assert_eq!(waves[0].units[0].id, "A");
        assert_eq!(waves[0].units[1].id, "B");
    }

    // ---------------------------------------------------------------------
    // compute_file_conflicts
    // ---------------------------------------------------------------------

    #[test]
    fn file_conflicts_detected() {
        let units = [
            make_unit_with_paths("A", &[], &["src/lib.rs", "src/a.rs"]),
            make_unit_with_paths("B", &[], &["src/lib.rs", "src/b.rs"]),
            make_unit_with_paths("C", &[], &["src/c.rs"]),
        ];

        let conflicts = compute_file_conflicts(&units);

        assert_eq!(conflicts.len(), 1);
        assert_eq!(conflicts[0].0, "src/lib.rs");
        assert!(conflicts[0].1.contains(&"A".to_string()));
        assert!(conflicts[0].1.contains(&"B".to_string()));
    }

    #[test]
    fn file_conflicts_empty_when_no_overlap() {
        let units = [
            make_unit_with_paths("A", &[], &["src/a.rs"]),
            make_unit_with_paths("B", &[], &["src/b.rs"]),
        ];

        let conflicts = compute_file_conflicts(&units);

        assert!(conflicts.is_empty());
    }

    #[test]
    fn file_conflicts_multiple_files() {
        let units = [
            make_unit_with_paths("A", &[], &["src/lib.rs", "src/mod.rs"]),
            make_unit_with_paths("B", &[], &["src/lib.rs"]),
            make_unit_with_paths("C", &[], &["src/mod.rs"]),
        ];

        let conflicts = compute_file_conflicts(&units);

        assert_eq!(conflicts.len(), 2);
        // Sorted by path.
        assert_eq!(conflicts[0].0, "src/lib.rs");
        assert_eq!(conflicts[1].0, "src/mod.rs");
    }

    // ---------------------------------------------------------------------
    // compute_effective_parallelism
    // ---------------------------------------------------------------------

    #[test]
    fn effective_parallelism_no_conflicts() {
        let units = [
            make_unit_with_paths("A", &[], &["src/a.rs"]),
            make_unit_with_paths("B", &[], &["src/b.rs"]),
            make_unit_with_paths("C", &[], &["src/c.rs"]),
        ];

        assert_eq!(compute_effective_parallelism(&units), 3);
    }

    #[test]
    fn effective_parallelism_with_conflict() {
        let units = [
            make_unit_with_paths("A", &[], &["src/lib.rs"]),
            make_unit_with_paths("B", &[], &["src/lib.rs"]),
            make_unit_with_paths("C", &[], &["src/c.rs"]),
        ];

        assert_eq!(compute_effective_parallelism(&units), 2);
    }

    #[test]
    fn effective_parallelism_all_conflict() {
        let units = [
            make_unit_with_paths("A", &[], &["src/shared.rs"]),
            make_unit_with_paths("B", &[], &["src/shared.rs"]),
            make_unit_with_paths("C", &[], &["src/shared.rs"]),
        ];

        assert_eq!(compute_effective_parallelism(&units), 1);
    }

    #[test]
    fn effective_parallelism_empty_paths_no_conflict() {
        let units = [
            make_unit_with_paths("A", &[], &[]),
            make_unit_with_paths("B", &[], &[]),
            make_unit_with_paths("C", &[], &["src/c.rs"]),
        ];

        assert_eq!(compute_effective_parallelism(&units), 3);
    }

    #[test]
    fn effective_parallelism_empty_input() {
        assert_eq!(compute_effective_parallelism(&[]), 0);
    }

    // ---------------------------------------------------------------------
    // compute_critical_path
    // ---------------------------------------------------------------------

    #[test]
    fn critical_path_single_unit() {
        let units = [make_unit("A", &[], &[], &[])];

        let path = compute_critical_path(&units);

        assert_eq!(path, vec!["A"]);
    }

    #[test]
    fn critical_path_linear_chain() {
        let units = [
            make_unit("A", &[], &[], &[]),
            make_unit("B", &["A"], &[], &[]),
            make_unit("C", &["B"], &[], &[]),
        ];

        let path = compute_critical_path(&units);

        assert_eq!(path, vec!["A", "B", "C"]);
    }

    #[test]
    fn critical_path_diamond() {
        let units = [
            make_unit("A", &[], &[], &[]),
            make_unit("B", &["A"], &[], &[]),
            make_unit("C", &["A"], &[], &[]),
            make_unit("D", &["B", "C"], &[], &[]),
        ];

        let path = compute_critical_path(&units);

        assert_eq!(path.len(), 3);
        assert_eq!(path[0], "A");
        // B and C weigh the same; B wins the id tie-break.
        assert_eq!(path[1], "B");
        assert_eq!(path[2], "D");
    }

    #[test]
    fn critical_path_picks_heaviest_branch() {
        // A <- B <- C, plus a short branch A <- D.
        let units = [
            make_unit("A", &[], &[], &[]),
            make_unit("B", &["A"], &[], &[]),
            make_unit("C", &["B"], &[], &[]),
            make_unit("D", &["A"], &[], &[]),
        ];

        let path = compute_critical_path(&units);

        assert_eq!(path, vec!["A", "B", "C"]);
    }

    #[test]
    fn critical_path_independent_units() {
        let units = [make_unit("A", &[], &[], &[]), make_unit("B", &[], &[], &[])];

        let path = compute_critical_path(&units);

        assert_eq!(path.len(), 1);
    }
}