// mana/commands/run/wave.rs

1use std::collections::{HashMap, HashSet};
2use std::path::Path;
3use std::process::Command;
4use std::sync::{Arc, Mutex};
5use std::time::{Duration, Instant};
6
7use anyhow::Result;
8
9use crate::index::Index;
10use crate::stream::{self, StreamEvent};
11use crate::unit::Status;
12use crate::util::natural_cmp;
13
14use super::plan::SizedUnit;
15use super::ready_queue::run_single_direct;
16use super::{AgentResult, SpawnMode, UnitAction};
17
18/// A wave of units that can be dispatched in parallel.
19pub struct Wave {
20    pub units: Vec<SizedUnit>,
21}
22
23/// Compute waves of units grouped by dependency order.
24/// Wave 0: no deps. Wave 1: deps all in wave 0. Etc.
25pub(super) fn compute_waves(units: &[SizedUnit], index: &Index) -> Vec<Wave> {
26    let mut waves = Vec::new();
27    let unit_ids: HashSet<String> = units.iter().map(|b| b.id.clone()).collect();
28
29    // Already-closed units count as completed
30    let mut completed: HashSet<String> = index
31        .units
32        .iter()
33        .filter(|e| e.status == Status::Closed)
34        .map(|e| e.id.clone())
35        .collect();
36
37    let mut remaining: Vec<SizedUnit> = units.to_vec();
38
39    while !remaining.is_empty() {
40        let (ready, blocked): (Vec<SizedUnit>, Vec<SizedUnit>) =
41            remaining.into_iter().partition(|b| {
42                // All explicit deps must be completed or not in our dispatch set
43                let explicit_ok = b
44                    .dependencies
45                    .iter()
46                    .all(|d| completed.contains(d) || !unit_ids.contains(d));
47
48                // All requires must be satisfied (producer completed or not in set)
49                let requires_ok = b.requires.iter().all(|req| {
50                    // Find the sibling producer for this artifact
51                    if let Some(producer) = units.iter().find(|other| {
52                        other.id != b.id && other.parent == b.parent && other.produces.contains(req)
53                    }) {
54                        completed.contains(&producer.id)
55                    } else {
56                        true // No producer in set, assume satisfied
57                    }
58                });
59
60                explicit_ok && requires_ok
61            });
62
63        if ready.is_empty() {
64            // Remaining units have unresolvable deps (cycle or missing)
65            // Add them all as a final wave to avoid losing them
66            eprintln!(
67                "Warning: {} unit(s) have unresolvable dependencies, adding to final wave",
68                blocked.len()
69            );
70            waves.push(Wave { units: blocked });
71            break;
72        }
73
74        for b in &ready {
75            completed.insert(b.id.clone());
76        }
77
78        waves.push(Wave { units: ready });
79        remaining = blocked;
80    }
81
82    // Compute downstream weights for critical-path-aware sorting
83    let weights = compute_downstream_weights(units);
84
85    // Sort units within each wave: priority first, then downstream weight
86    // (descending — units that block the most work get scheduled first),
87    // then ID for stability.
88    for wave in &mut waves {
89        wave.units.sort_by(|a, b| {
90            a.priority
91                .cmp(&b.priority)
92                .then_with(|| {
93                    let wa = weights.get(&a.id).copied().unwrap_or(1);
94                    let wb = weights.get(&b.id).copied().unwrap_or(1);
95                    wb.cmp(&wa) // higher weight first
96                })
97                .then_with(|| natural_cmp(&a.id, &b.id))
98        });
99    }
100
101    waves
102}
103
104/// Compute downstream weight for each unit.
105/// Weight = 1 + count of all transitively dependent units.
106/// Units on the critical path will have the highest weights.
107pub(super) fn compute_downstream_weights(units: &[SizedUnit]) -> HashMap<String, u32> {
108    let unit_ids: HashSet<String> = units.iter().map(|b| b.id.clone()).collect();
109
110    // Build reverse dependency graph: for each unit, which units directly depend on it.
111    let mut reverse_deps: HashMap<String, Vec<String>> = HashMap::new();
112
113    for b in units {
114        reverse_deps.entry(b.id.clone()).or_default();
115
116        // Explicit dependencies: b depends on dep → dep's reverse_deps includes b
117        for dep in &b.dependencies {
118            if unit_ids.contains(dep) {
119                reverse_deps
120                    .entry(dep.clone())
121                    .or_default()
122                    .push(b.id.clone());
123            }
124        }
125
126        // Requires/produces: if b requires artifact X and producer makes X
127        // (same parent), then b depends on producer.
128        for req in &b.requires {
129            if let Some(producer) = units.iter().find(|other| {
130                other.id != b.id && other.parent == b.parent && other.produces.contains(req)
131            }) {
132                if unit_ids.contains(&producer.id) {
133                    reverse_deps
134                        .entry(producer.id.clone())
135                        .or_default()
136                        .push(b.id.clone());
137                }
138            }
139        }
140    }
141
142    // For each unit, count all transitively reachable descendants via BFS.
143    let mut weights: HashMap<String, u32> = HashMap::new();
144
145    for b in units {
146        let mut visited: HashSet<String> = HashSet::new();
147        let mut queue: Vec<String> = Vec::new();
148
149        // Seed with direct dependents
150        for dep in reverse_deps.get(&b.id).unwrap_or(&Vec::new()) {
151            if visited.insert(dep.clone()) {
152                queue.push(dep.clone());
153            }
154        }
155
156        // BFS to find all transitive dependents
157        while let Some(current) = queue.pop() {
158            for next in reverse_deps.get(&current).unwrap_or(&Vec::new()) {
159                if visited.insert(next.clone()) {
160                    queue.push(next.clone());
161                }
162            }
163        }
164
165        weights.insert(b.id.clone(), 1 + visited.len() as u32);
166    }
167
168    weights
169}
170
171/// Compute file conflict groups: files touched by more than one unit.
172/// Returns pairs of (file_path, vec_of_unit_ids).
173pub(super) fn compute_file_conflicts(units: &[SizedUnit]) -> Vec<(String, Vec<String>)> {
174    let mut file_to_units: HashMap<String, Vec<String>> = HashMap::new();
175    for b in units {
176        for p in &b.paths {
177            file_to_units
178                .entry(p.clone())
179                .or_default()
180                .push(b.id.clone());
181        }
182    }
183    let mut conflicts: Vec<(String, Vec<String>)> = file_to_units
184        .into_iter()
185        .filter(|(_, ids)| ids.len() > 1)
186        .collect();
187    conflicts.sort_by(|a, b| a.0.cmp(&b.0));
188    conflicts
189}
190
191/// Compute effective parallelism: max units that can run simultaneously
192/// without file path conflicts. Uses greedy selection.
193pub(super) fn compute_effective_parallelism(units: &[SizedUnit]) -> usize {
194    if units.is_empty() {
195        return 0;
196    }
197    let mut occupied: HashSet<String> = HashSet::new();
198    let mut count = 0;
199    for b in units {
200        if b.paths.is_empty() || !b.paths.iter().any(|p| occupied.contains(p)) {
201            for p in &b.paths {
202                occupied.insert(p.clone());
203            }
204            count += 1;
205        }
206    }
207    count
208}
209
210/// Find the critical path through the dependency graph.
211/// Returns the longest chain of unit IDs from root to leaf.
212pub(super) fn compute_critical_path(units: &[SizedUnit]) -> Vec<String> {
213    if units.is_empty() {
214        return vec![];
215    }
216
217    let weights = compute_downstream_weights(units);
218    let unit_ids: HashSet<String> = units.iter().map(|b| b.id.clone()).collect();
219
220    // Build forward dependency map: unit_id → units that depend on it
221    let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
222    for b in units {
223        for dep in &b.dependencies {
224            if unit_ids.contains(dep) {
225                dependents
226                    .entry(dep.clone())
227                    .or_default()
228                    .push(b.id.clone());
229            }
230        }
231        for req in &b.requires {
232            if let Some(producer) = units.iter().find(|other| {
233                other.id != b.id && other.parent == b.parent && other.produces.contains(req)
234            }) {
235                if unit_ids.contains(&producer.id) {
236                    dependents
237                        .entry(producer.id.clone())
238                        .or_default()
239                        .push(b.id.clone());
240                }
241            }
242        }
243    }
244
245    // Start from unit with highest weight
246    let start = units
247        .iter()
248        .max_by(|a, b| {
249            let wa = weights.get(&a.id).copied().unwrap_or(0);
250            let wb = weights.get(&b.id).copied().unwrap_or(0);
251            wa.cmp(&wb).then_with(|| natural_cmp(&b.id, &a.id))
252        })
253        .unwrap();
254
255    let mut path = vec![start.id.clone()];
256    let mut current = start.id.clone();
257
258    // Follow the dependent with highest weight (greedy critical path)
259    loop {
260        let Some(deps) = dependents.get(&current) else {
261            break;
262        };
263        if deps.is_empty() {
264            break;
265        }
266        // Sort dependents: highest weight first, then natural ID for stability
267        let mut deps_sorted = deps.clone();
268        deps_sorted.sort_by(|a, b| {
269            let wa = weights.get(a).copied().unwrap_or(0);
270            let wb = weights.get(b).copied().unwrap_or(0);
271            wb.cmp(&wa).then_with(|| natural_cmp(a, b))
272        });
273        let next = &deps_sorted[0];
274        path.push(next.clone());
275        current = next.clone();
276    }
277
278    path
279}
280
281// ---------------------------------------------------------------------------
282// Wave execution
283// ---------------------------------------------------------------------------
284
285/// Spawn agents for a wave of units, respecting max parallelism.
286pub(super) fn run_wave(
287    mana_dir: &Path,
288    units: &[SizedUnit],
289    spawn_mode: &SpawnMode,
290    cfg: &super::RunConfig,
291    wave_number: usize,
292) -> Result<Vec<AgentResult>> {
293    match spawn_mode {
294        SpawnMode::Template {
295            run_template,
296            plan_template,
297        } => run_wave_template(
298            units,
299            run_template,
300            plan_template.as_deref(),
301            cfg.max_jobs,
302            cfg.timeout_minutes,
303            cfg.idle_timeout_minutes,
304            cfg.run_model.as_deref(),
305        ),
306        SpawnMode::Direct => run_wave_direct(
307            mana_dir,
308            units,
309            cfg.max_jobs,
310            cfg.timeout_minutes,
311            cfg.idle_timeout_minutes,
312            cfg.run_model.as_deref(),
313            cfg.json_stream,
314            wave_number,
315            cfg.file_locking,
316        ),
317    }
318}
319
320/// Template mode: spawn agents via `sh -c <template>` (backward compat).
321fn run_wave_template(
322    units: &[SizedUnit],
323    run_template: &str,
324    _plan_template: Option<&str>,
325    max_jobs: usize,
326    timeout_minutes: u32,
327    idle_timeout_minutes: u32,
328    config_run_model: Option<&str>,
329) -> Result<Vec<AgentResult>> {
330    let total_timeout = Duration::from_secs(timeout_minutes as u64 * 60);
331    let _idle_timeout = Duration::from_secs(idle_timeout_minutes as u64 * 60);
332    // Note: idle timeout based on stdout activity is only enforced in Direct mode
333    // (via timeout::monitor_process). Template mode enforces total timeout only.
334
335    let mut results = Vec::new();
336    // Track: unit, child process, start time, last stdout activity time
337    let mut children: Vec<(SizedUnit, std::process::Child, Instant, Instant)> = Vec::new();
338
339    let mut pending: Vec<&SizedUnit> = units.iter().collect();
340
341    while !pending.is_empty() || !children.is_empty() {
342        // Check for shutdown signal
343        if super::shutdown_requested() {
344            for (sb, mut child, started, _) in children {
345                let _ = child.kill();
346                let _ = child.wait();
347                results.push(AgentResult {
348                    id: sb.id.clone(),
349                    title: sb.title.clone(),
350                    action: sb.action,
351                    success: false,
352                    duration: started.elapsed(),
353                    total_tokens: None,
354                    total_cost: None,
355                    error: Some("Interrupted by shutdown signal".to_string()),
356                    tool_count: 0,
357                    turns: 0,
358                    failure_summary: None,
359                });
360            }
361            return Ok(results);
362        }
363
364        // Spawn up to max_jobs
365        while children.len() < max_jobs && !pending.is_empty() {
366            let sb = pending.remove(0);
367            let template = match sb.action {
368                UnitAction::Implement => run_template,
369            };
370
371            // Model precedence: unit-level override > config-level > no substitution
372            let effective_model = sb.model.as_deref().or(config_run_model);
373            let cmd =
374                crate::spawner::substitute_template_with_model(template, &sb.id, effective_model);
375            match Command::new("sh").args(["-c", &cmd]).spawn() {
376                Ok(child) => {
377                    let now = Instant::now();
378                    children.push((sb.clone(), child, now, now));
379                }
380                Err(e) => {
381                    eprintln!("  Failed to spawn agent for {}: {}", sb.id, e);
382                    results.push(AgentResult {
383                        id: sb.id.clone(),
384                        title: sb.title.clone(),
385                        action: sb.action,
386                        success: false,
387                        duration: Duration::ZERO,
388                        total_tokens: None,
389                        total_cost: None,
390                        error: Some(format!("Failed to spawn: {}", e)),
391                        tool_count: 0,
392                        turns: 0,
393                        failure_summary: None,
394                    });
395                }
396            }
397        }
398
399        if children.is_empty() {
400            break;
401        }
402
403        // Poll for completions and enforce timeouts
404        let mut still_running = Vec::new();
405        for (sb, mut child, started, last_activity) in children {
406            match child.try_wait() {
407                Ok(Some(status)) => {
408                    let err = if status.success() {
409                        None
410                    } else {
411                        Some(format!("Exit code {}", status.code().unwrap_or(-1)))
412                    };
413                    results.push(AgentResult {
414                        id: sb.id.clone(),
415                        title: sb.title.clone(),
416                        action: sb.action,
417                        success: status.success(),
418                        duration: started.elapsed(),
419                        total_tokens: None,
420                        total_cost: None,
421                        error: err,
422                        tool_count: 0,
423                        turns: 0,
424                        failure_summary: None,
425                    });
426                }
427                Ok(None) => {
428                    // Process still running — check timeouts
429                    let elapsed = started.elapsed();
430
431                    if !total_timeout.is_zero() && elapsed > total_timeout {
432                        eprintln!(
433                            "  ⚠ {} total timeout ({}m) — killing",
434                            sb.id, timeout_minutes
435                        );
436                        let _ = child.kill();
437                        let _ = child.wait();
438                        results.push(AgentResult {
439                            id: sb.id.clone(),
440                            title: sb.title.clone(),
441                            action: sb.action,
442                            success: false,
443                            duration: elapsed,
444                            total_tokens: None,
445                            total_cost: None,
446                            error: Some(format!("Total timeout exceeded ({}m)", timeout_minutes)),
447                            tool_count: 0,
448                            turns: 0,
449                            failure_summary: None,
450                        });
451                    } else {
452                        still_running.push((sb, child, started, last_activity));
453                    }
454                }
455                Err(e) => {
456                    eprintln!("  Error checking agent for {}: {}", sb.id, e);
457                    results.push(AgentResult {
458                        id: sb.id.clone(),
459                        title: sb.title.clone(),
460                        action: sb.action,
461                        success: false,
462                        duration: started.elapsed(),
463                        total_tokens: None,
464                        total_cost: None,
465                        error: Some(format!("Error checking process: {}", e)),
466                        tool_count: 0,
467                        turns: 0,
468                        failure_summary: None,
469                    });
470                }
471            }
472        }
473        children = still_running;
474
475        if !children.is_empty() {
476            std::thread::sleep(Duration::from_millis(500));
477        }
478    }
479
480    Ok(results)
481}
482
483/// Direct mode: spawn pi directly with JSON output and monitoring.
484#[allow(clippy::too_many_arguments)]
485fn run_wave_direct(
486    mana_dir: &Path,
487    units: &[SizedUnit],
488    max_jobs: usize,
489    timeout_minutes: u32,
490    idle_timeout_minutes: u32,
491    config_run_model: Option<&str>,
492    json_stream: bool,
493    wave_number: usize,
494    file_locking: bool,
495) -> Result<Vec<AgentResult>> {
496    let results = Arc::new(Mutex::new(Vec::new()));
497    let mut pending: Vec<SizedUnit> = units.to_vec();
498    let mut handles: Vec<std::thread::JoinHandle<()>> = Vec::new();
499
500    while !pending.is_empty() || !handles.is_empty() {
501        // Check for shutdown signal
502        if super::shutdown_requested() {
503            super::kill_all_children();
504            // Wait for threads to finish (they should exit after children are killed)
505            for handle in handles {
506                let _ = handle.join();
507            }
508            return Ok(Arc::try_unwrap(results).unwrap().into_inner().unwrap());
509        }
510
511        // Spawn up to max_jobs threads
512        while handles.len() < max_jobs && !pending.is_empty() {
513            let sb = pending.remove(0);
514            let mana_dir = mana_dir.to_path_buf();
515            let results = Arc::clone(&results);
516            let timeout_min = timeout_minutes;
517            let idle_min = idle_timeout_minutes;
518            let config_run_model = config_run_model.map(str::to_string);
519
520            if json_stream {
521                stream::emit(&StreamEvent::UnitStart {
522                    id: sb.id.clone(),
523                    title: sb.title.clone(),
524                    round: wave_number,
525                    file_overlaps: None,
526                    attempt: None,
527                    priority: None,
528                });
529            }
530
531            let handle = std::thread::spawn(move || {
532                let result = run_single_direct(
533                    &mana_dir,
534                    &sb,
535                    timeout_min,
536                    idle_min,
537                    config_run_model.as_deref(),
538                    json_stream,
539                    file_locking,
540                    false, // batch_verify not used in template/wave mode
541                );
542                results.lock().unwrap().push(result);
543            });
544            handles.push(handle);
545        }
546
547        // Wait for at least one thread to finish
548        let prev_count = handles.len();
549        let mut still_running = Vec::new();
550        for handle in handles.drain(..) {
551            if handle.is_finished() {
552                let _ = handle.join();
553            } else {
554                still_running.push(handle);
555            }
556        }
557
558        // If nothing finished, wait briefly before polling again
559        if still_running.len() == prev_count && !still_running.is_empty() {
560            std::thread::sleep(Duration::from_millis(200));
561        }
562
563        handles = still_running;
564    }
565
566    // Wait for any remaining threads
567    for handle in handles {
568        let _ = handle.join();
569    }
570
571    Ok(Arc::try_unwrap(results).unwrap().into_inner().unwrap())
572}
573
574#[cfg(test)]
575mod tests {
576    use super::*;
577    use crate::commands::run::UnitAction;
578    use crate::index::Index;
579
580    #[test]
581    fn compute_waves_no_deps() {
582        let index = Index { units: vec![] };
583        let units = vec![
584            SizedUnit {
585                id: "1".to_string(),
586                title: "A".to_string(),
587                action: UnitAction::Implement,
588                priority: 2,
589                dependencies: vec![],
590                parent: None,
591                produces: vec![],
592                requires: vec![],
593                paths: vec![],
594                model: None,
595            },
596            SizedUnit {
597                id: "2".to_string(),
598                title: "B".to_string(),
599                action: UnitAction::Implement,
600                priority: 2,
601                dependencies: vec![],
602                parent: None,
603                produces: vec![],
604                requires: vec![],
605                paths: vec![],
606                model: None,
607            },
608        ];
609        let waves = compute_waves(&units, &index);
610        assert_eq!(waves.len(), 1);
611        assert_eq!(waves[0].units.len(), 2);
612    }
613
614    #[test]
615    fn compute_waves_linear_chain() {
616        let index = Index { units: vec![] };
617        let units = vec![
618            SizedUnit {
619                id: "1".to_string(),
620                title: "A".to_string(),
621                action: UnitAction::Implement,
622                priority: 2,
623                dependencies: vec![],
624                parent: None,
625                produces: vec![],
626                requires: vec![],
627                paths: vec![],
628                model: None,
629            },
630            SizedUnit {
631                id: "2".to_string(),
632                title: "B".to_string(),
633                action: UnitAction::Implement,
634                priority: 2,
635                dependencies: vec!["1".to_string()],
636                parent: None,
637                produces: vec![],
638                requires: vec![],
639                paths: vec![],
640                model: None,
641            },
642            SizedUnit {
643                id: "3".to_string(),
644                title: "C".to_string(),
645                action: UnitAction::Implement,
646                priority: 2,
647                dependencies: vec!["2".to_string()],
648                parent: None,
649                produces: vec![],
650                requires: vec![],
651                paths: vec![],
652                model: None,
653            },
654        ];
655        let waves = compute_waves(&units, &index);
656        assert_eq!(waves.len(), 3);
657        assert_eq!(waves[0].units[0].id, "1");
658        assert_eq!(waves[1].units[0].id, "2");
659        assert_eq!(waves[2].units[0].id, "3");
660    }
661
662    #[test]
663    fn compute_waves_diamond() {
664        let index = Index { units: vec![] };
665        // 1 → (2, 3) → 4
666        let units = vec![
667            SizedUnit {
668                id: "1".to_string(),
669                title: "Root".to_string(),
670                action: UnitAction::Implement,
671                priority: 2,
672                dependencies: vec![],
673                parent: None,
674                produces: vec![],
675                requires: vec![],
676                paths: vec![],
677                model: None,
678            },
679            SizedUnit {
680                id: "2".to_string(),
681                title: "Left".to_string(),
682                action: UnitAction::Implement,
683                priority: 2,
684                dependencies: vec!["1".to_string()],
685                parent: None,
686                produces: vec![],
687                requires: vec![],
688                paths: vec![],
689                model: None,
690            },
691            SizedUnit {
692                id: "3".to_string(),
693                title: "Right".to_string(),
694                action: UnitAction::Implement,
695                priority: 2,
696                dependencies: vec!["1".to_string()],
697                parent: None,
698                produces: vec![],
699                requires: vec![],
700                paths: vec![],
701                model: None,
702            },
703            SizedUnit {
704                id: "4".to_string(),
705                title: "Join".to_string(),
706                action: UnitAction::Implement,
707                priority: 2,
708                dependencies: vec!["2".to_string(), "3".to_string()],
709                parent: None,
710                produces: vec![],
711                requires: vec![],
712                paths: vec![],
713                model: None,
714            },
715        ];
716        let waves = compute_waves(&units, &index);
717        assert_eq!(waves.len(), 3);
718        assert_eq!(waves[0].units.len(), 1); // 1
719        assert_eq!(waves[1].units.len(), 2); // 2, 3
720        assert_eq!(waves[2].units.len(), 1); // 4
721    }
722
723    #[test]
724    fn template_wave_execution_with_echo() {
725        let units = vec![SizedUnit {
726            id: "1".to_string(),
727            title: "Test".to_string(),
728            action: UnitAction::Implement,
729            priority: 2,
730            dependencies: vec![],
731            parent: None,
732            produces: vec![],
733            requires: vec![],
734            paths: vec![],
735            model: None,
736        }];
737
738        let results = run_wave_template(&units, "echo {id}", None, 4, 30, 5, None).unwrap();
739        assert_eq!(results.len(), 1);
740        assert!(results[0].success);
741        assert_eq!(results[0].id, "1");
742    }
743
744    #[test]
745    fn template_wave_runs_implement_action() {
746        let units = vec![SizedUnit {
747            id: "1".to_string(),
748            title: "Test".to_string(),
749            action: UnitAction::Implement,
750            priority: 2,
751            dependencies: vec![],
752            parent: None,
753            produces: vec![],
754            requires: vec![],
755            paths: vec![],
756            model: None,
757        }];
758
759        let results = run_wave_template(&units, "echo {id}", None, 4, 30, 5, None).unwrap();
760        assert_eq!(results.len(), 1);
761        assert!(results[0].success);
762        assert_eq!(results[0].id, "1");
763    }
764
765    #[test]
766    fn template_wave_failed_command() {
767        let units = vec![SizedUnit {
768            id: "1".to_string(),
769            title: "Fail".to_string(),
770            action: UnitAction::Implement,
771            priority: 2,
772            dependencies: vec![],
773            parent: None,
774            produces: vec![],
775            requires: vec![],
776            paths: vec![],
777            model: None,
778        }];
779
780        let results = run_wave_template(&units, "false", None, 4, 30, 5, None).unwrap();
781        assert_eq!(results.len(), 1);
782        assert!(!results[0].success);
783        assert!(results[0].error.is_some());
784    }
785
786    // -- downstream weight tests --
787
788    fn make_unit(id: &str, deps: Vec<&str>, produces: Vec<&str>, requires: Vec<&str>) -> SizedUnit {
789        SizedUnit {
790            id: id.to_string(),
791            title: format!("Unit {}", id),
792            action: UnitAction::Implement,
793            priority: 2,
794            dependencies: deps.into_iter().map(|s| s.to_string()).collect(),
795            parent: Some("p".to_string()),
796            produces: produces.into_iter().map(|s| s.to_string()).collect(),
797            requires: requires.into_iter().map(|s| s.to_string()).collect(),
798            paths: vec![],
799            model: None,
800        }
801    }
802
803    #[test]
804    fn downstream_weights_single_unit() {
805        let units = vec![make_unit("A", vec![], vec![], vec![])];
806        let weights = compute_downstream_weights(&units);
807        assert_eq!(weights.get("A").copied(), Some(1));
808    }
809
810    #[test]
811    fn downstream_weights_linear_chain() {
812        // A → B → C (B depends on A, C depends on B)
813        let units = vec![
814            make_unit("A", vec![], vec![], vec![]),
815            make_unit("B", vec!["A"], vec![], vec![]),
816            make_unit("C", vec!["B"], vec![], vec![]),
817        ];
818        let weights = compute_downstream_weights(&units);
819        assert_eq!(weights.get("A").copied(), Some(3)); // blocks B and C
820        assert_eq!(weights.get("B").copied(), Some(2)); // blocks C
821        assert_eq!(weights.get("C").copied(), Some(1)); // leaf
822    }
823
824    #[test]
825    fn downstream_weights_diamond() {
826        // A → (B, C) → D
827        let units = vec![
828            make_unit("A", vec![], vec![], vec![]),
829            make_unit("B", vec!["A"], vec![], vec![]),
830            make_unit("C", vec!["A"], vec![], vec![]),
831            make_unit("D", vec!["B", "C"], vec![], vec![]),
832        ];
833        let weights = compute_downstream_weights(&units);
834        assert_eq!(weights.get("D").copied(), Some(1)); // leaf
835        assert_eq!(weights.get("B").copied(), Some(2)); // blocks D
836        assert_eq!(weights.get("C").copied(), Some(2)); // blocks D
837        assert_eq!(weights.get("A").copied(), Some(4)); // blocks B, C, D (3 descendants + 1)
838    }
839
840    #[test]
841    fn downstream_weights_independent() {
842        let units = vec![
843            make_unit("A", vec![], vec![], vec![]),
844            make_unit("B", vec![], vec![], vec![]),
845        ];
846        let weights = compute_downstream_weights(&units);
847        assert_eq!(weights.get("A").copied(), Some(1));
848        assert_eq!(weights.get("B").copied(), Some(1));
849    }
850
#[test]
fn downstream_weights_wide_fan() {
    // Fan-out: root A with three direct dependents B, C and D.
    let leaves = ["B", "C", "D"];
    let mut fan = vec![make_unit("A", vec![], vec![], vec![])];
    fan.extend(
        leaves
            .iter()
            .map(|&leaf| make_unit(leaf, vec!["A"], vec![], vec![])),
    );
    let weights = compute_downstream_weights(&fan);

    // The root counts itself plus one for each of its three dependents.
    assert_eq!(weights.get("A").copied(), Some(4));
    // Each dependent is a leaf.
    for &leaf in &leaves {
        assert_eq!(weights.get(leaf).copied(), Some(1));
    }
}
866
867    // -- wave sorting by downstream weight tests --
868
#[test]
fn compute_waves_sorts_by_downstream_weight() {
    // A, B and C have no dependencies and share the same priority, so all
    // three land in the first wave. Their dependents give them different weights:
    //   D depends on A        → A weighs 2
    //   E and F depend on B   → B weighs 3
    //   nothing depends on C  → C weighs 1
    let units = vec![
        make_unit("A", vec![], vec![], vec![]),
        make_unit("B", vec![], vec![], vec![]),
        make_unit("C", vec![], vec![], vec![]),
        make_unit("D", vec!["A"], vec![], vec![]),
        make_unit("E", vec!["B"], vec![], vec![]),
        make_unit("F", vec!["B"], vec![], vec![]),
    ];
    let empty_index = Index { units: vec![] };

    let waves = compute_waves(&units, &empty_index);
    assert_eq!(waves.len(), 2);

    // The first wave is ordered heaviest-first: B(3), A(2), C(1).
    let first_wave = &waves[0].units;
    for (slot, expected_id) in ["B", "A", "C"].iter().enumerate() {
        assert_eq!(first_wave[slot].id, *expected_id);
    }
}
891
#[test]
fn compute_waves_weight_sorting_preserves_priority() {
    // A: priority 1, nothing depends on it (weight 1).
    let mut urgent = make_unit("A", vec![], vec![], vec![]);
    urgent.priority = 1;

    // B: priority 2, but C depends on it (weight 2).
    let mut heavier = make_unit("B", vec![], vec![], vec![]);
    heavier.priority = 2;
    let dependent = make_unit("C", vec!["B"], vec![], vec![]);

    let units = vec![urgent, heavier, dependent];
    let empty_index = Index { units: vec![] };
    let waves = compute_waves(&units, &empty_index);

    // Priority wins over weight: A (pri 1) is placed ahead of B (pri 2)
    // in the first wave even though B has the higher downstream weight.
    let first_wave = &waves[0].units;
    assert_eq!(first_wave[0].id, "A");
    assert_eq!(first_wave[1].id, "B");
}
908
909    // -- file conflict tests --
910
911    fn make_unit_with_paths(id: &str, deps: Vec<&str>, paths: Vec<&str>) -> SizedUnit {
912        SizedUnit {
913            id: id.to_string(),
914            title: format!("Unit {}", id),
915            action: UnitAction::Implement,
916            priority: 2,
917            dependencies: deps.into_iter().map(|s| s.to_string()).collect(),
918            parent: Some("p".to_string()),
919            produces: vec![],
920            requires: vec![],
921            paths: paths.into_iter().map(|s| s.to_string()).collect(),
922            model: None,
923        }
924    }
925
#[test]
fn file_conflicts_detected() {
    // A and B both list src/lib.rs; every other path belongs to one unit only.
    let units = vec![
        make_unit_with_paths("A", vec![], vec!["src/lib.rs", "src/a.rs"]),
        make_unit_with_paths("B", vec![], vec!["src/lib.rs", "src/b.rs"]),
        make_unit_with_paths("C", vec![], vec!["src/c.rs"]),
    ];
    let conflicts = compute_file_conflicts(&units);

    // Exactly one entry: the shared file, paired with the ids of the units
    // that list it.
    assert_eq!(conflicts.len(), 1);
    let shared = &conflicts[0];
    assert_eq!(shared.0, "src/lib.rs");
    for id in &["A", "B"] {
        assert!(shared.1.contains(&String::from(*id)));
    }
}
939
#[test]
fn file_conflicts_empty_when_no_overlap() {
    // Each unit lists a different file, so no path is shared.
    let disjoint = vec![
        make_unit_with_paths("A", vec![], vec!["src/a.rs"]),
        make_unit_with_paths("B", vec![], vec!["src/b.rs"]),
    ];
    assert!(compute_file_conflicts(&disjoint).is_empty());
}
949
#[test]
fn file_conflicts_multiple_files() {
    // A overlaps with B on src/lib.rs and with C on src/mod.rs.
    let units = vec![
        make_unit_with_paths("A", vec![], vec!["src/lib.rs", "src/mod.rs"]),
        make_unit_with_paths("B", vec![], vec!["src/lib.rs"]),
        make_unit_with_paths("C", vec![], vec!["src/mod.rs"]),
    ];
    let conflicts = compute_file_conflicts(&units);
    assert_eq!(conflicts.len(), 2);

    // Entries are expected in file-path order.
    for (slot, expected_file) in ["src/lib.rs", "src/mod.rs"].iter().enumerate() {
        assert_eq!(conflicts[slot].0, *expected_file);
    }
}
963
964    // -- effective parallelism tests --
965
#[test]
fn effective_parallelism_no_conflicts() {
    // Three units, each listing its own file.
    let units: Vec<_> = [("A", "src/a.rs"), ("B", "src/b.rs"), ("C", "src/c.rs")]
        .iter()
        .map(|&(id, path)| make_unit_with_paths(id, vec![], vec![path]))
        .collect();

    // With no shared paths, all three count towards parallelism.
    let parallelism = compute_effective_parallelism(&units);
    assert_eq!(parallelism, 3);
}
975
#[test]
fn effective_parallelism_with_conflict() {
    // A and B both list src/lib.rs; C lists an unrelated file.
    let units = vec![
        make_unit_with_paths("A", vec![], vec!["src/lib.rs"]),
        make_unit_with_paths("B", vec![], vec!["src/lib.rs"]),
        make_unit_with_paths("C", vec![], vec!["src/c.rs"]),
    ];

    // A claims src/lib.rs, which blocks B; C is unaffected → 2.
    let parallelism = compute_effective_parallelism(&units);
    assert_eq!(parallelism, 2);
}
986
#[test]
fn effective_parallelism_all_conflict() {
    // Every unit lists the same single file.
    let units: Vec<_> = ["A", "B", "C"]
        .iter()
        .map(|&id| make_unit_with_paths(id, vec![], vec!["src/shared.rs"]))
        .collect();

    // Only one of them can hold the shared file at a time.
    let parallelism = compute_effective_parallelism(&units);
    assert_eq!(parallelism, 1);
}
997
#[test]
fn effective_parallelism_empty_paths_no_conflict() {
    // A and B declare no paths at all; only C lists a file.
    let units = vec![
        make_unit_with_paths("A", vec![], vec![]),
        make_unit_with_paths("B", vec![], vec![]),
        make_unit_with_paths("C", vec![], vec!["src/c.rs"]),
    ];

    // Units without paths never conflict with anything, so all three count.
    let parallelism = compute_effective_parallelism(&units);
    assert_eq!(parallelism, 3);
}
1008
#[test]
fn effective_parallelism_empty_input() {
    // No units at all yields a parallelism of zero.
    let parallelism = compute_effective_parallelism(&[]);
    assert_eq!(parallelism, 0);
}
1013
1014    // -- critical path tests --
1015
#[test]
fn critical_path_single_unit() {
    // A lone unit is its own critical path.
    let lone = vec![make_unit("A", vec![], vec![], vec![])];
    let expected = vec!["A"];
    assert_eq!(compute_critical_path(&lone), expected);
}
1022
#[test]
fn critical_path_linear_chain() {
    // Chain A → B → C: the critical path is the whole chain, root first.
    let chain = vec![
        make_unit("A", vec![], vec![], vec![]),
        make_unit("B", vec!["A"], vec![], vec![]),
        make_unit("C", vec!["B"], vec![], vec![]),
    ];
    let expected = vec!["A", "B", "C"];
    assert_eq!(compute_critical_path(&chain), expected);
}
1033
#[test]
fn critical_path_diamond() {
    // Diamond: A fans out to B and C, and both of those feed D.
    let diamond = vec![
        make_unit("A", vec![], vec![], vec![]),
        make_unit("B", vec!["A"], vec![], vec![]),
        make_unit("C", vec!["A"], vec![], vec![]),
        make_unit("D", vec!["B", "C"], vec![], vec![]),
    ];
    let path = compute_critical_path(&diamond);

    // Three hops from root to leaf. B and C weigh the same, and the tie is
    // expected to resolve by natural ID order, so the middle hop is B.
    assert_eq!(path, vec!["A", "B", "D"]);
}
1050
#[test]
fn critical_path_picks_heaviest_branch() {
    // Two branches hang off A:
    //   long:  A → B → C
    //   short: A → D
    let units = vec![
        make_unit("A", vec![], vec![], vec![]),
        make_unit("B", vec!["A"], vec![], vec![]),
        make_unit("C", vec!["B"], vec![], vec![]),
        make_unit("D", vec!["A"], vec![], vec![]),
    ];

    // The critical path must follow the long branch through B to C.
    let expected = vec!["A", "B", "C"];
    assert_eq!(compute_critical_path(&units), expected);
}
1065
#[test]
fn critical_path_independent_units() {
    // With no dependencies every unit weighs 1, so the path holds a single
    // unit. Which one is picked (presumably the first by ID) is not
    // asserted here; only the length is checked.
    let isolated = vec![
        make_unit("A", vec![], vec![], vec![]),
        make_unit("B", vec![], vec![], vec![]),
    ];
    assert_eq!(compute_critical_path(&isolated).len(), 1);
}
1076}