Skip to main content

mana_core/ops/
run.rs

1//! Core orchestration operations for `mana run`.
2//!
3//! This module exposes the dependency-aware scheduling logic that imp_orch
4//! and other library consumers need to compute ready queues and execution plans
5//! without depending on the CLI layer.
6//!
7//! # Key types
8//!
9//! - [`ReadyQueue`] — result of computing which units are ready to dispatch
10//! - [`ReadyUnit`] — a single dispatchable unit with scheduling metadata
11//! - [`RunPlan`] — a full execution plan grouped into waves
12//! - [`RunWave`] — a group of units that can run concurrently
13//!
14//! # Key functions
15//!
16//! - [`compute_ready_queue`] — find all dispatchable units with priority/weight ordering
17//! - [`compute_run_plan`] — group ready units into dependency-ordered waves
18
19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21
22use anyhow::Result;
23
24use crate::blocking::{check_blocked_with_archive, check_scope_warning, ScopeWarning};
25use crate::discovery::find_unit_file;
26use crate::index::{ArchiveIndex, Index, IndexEntry};
27use crate::unit::{AttemptOutcome, AutonomyBlockerCode, Status, Unit};
28use crate::util::natural_cmp;
29
30// ---------------------------------------------------------------------------
31// Public types
32// ---------------------------------------------------------------------------
33
/// Selects which units a run should consider for dispatch.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RunTarget {
    /// No filtering: every candidate unit matches.
    AllReady,
    /// Target a single unit ID. When that unit has descendants that are not
    /// closed, its descendants that themselves have no non-closed descendants
    /// match instead; otherwise only the unit itself matches.
    Unit(String),
    /// Target several unit IDs; an entry matches when it would match at least
    /// one of them as a [`RunTarget::Unit`].
    Explicit(Vec<String>),
}
40
41fn parent_id_for(index: &Index, unit_id: &str) -> Option<String> {
42    index
43        .units
44        .iter()
45        .find(|entry| entry.id == unit_id)
46        .and_then(|entry| entry.parent.clone())
47}
48
49fn is_descendant_of(index: &Index, unit_id: &str, ancestor_id: &str) -> bool {
50    let mut current = parent_id_for(index, unit_id);
51
52    while let Some(parent_id) = current {
53        if parent_id == ancestor_id {
54            return true;
55        }
56        current = parent_id_for(index, &parent_id);
57    }
58
59    false
60}
61
62fn has_open_descendants(index: &Index, unit_id: &str) -> bool {
63    index
64        .units
65        .iter()
66        .any(|entry| entry.status != Status::Closed && is_descendant_of(index, &entry.id, unit_id))
67}
68
69fn matches_target(index: &Index, entry: &IndexEntry, target: &RunTarget) -> bool {
70    match target {
71        RunTarget::AllReady => true,
72        RunTarget::Unit(filter_id) => {
73            let target_has_open_descendants = index.units.iter().any(|candidate| {
74                candidate.status != Status::Closed
75                    && is_descendant_of(index, &candidate.id, filter_id)
76            });
77
78            if target_has_open_descendants {
79                is_descendant_of(index, &entry.id, filter_id)
80                    && !has_open_descendants(index, &entry.id)
81            } else {
82                entry.id == *filter_id
83            }
84        }
85        RunTarget::Explicit(ids) => ids
86            .iter()
87            .any(|id| matches_target(index, entry, &RunTarget::Unit(id.clone()))),
88    }
89}
90
/// A unit that is ready to be dispatched.
///
/// Identity and scheduling metadata are copied from the unit's index entry;
/// the verify commands, retry context, and model override come from the
/// loaded unit file.
#[derive(Debug, Clone, PartialEq)]
pub struct ReadyUnit {
    /// Unit ID as recorded in the index.
    pub id: String,
    /// Unit title as recorded in the index.
    pub title: String,
    /// Lower is higher priority (1 = P1, etc.).
    pub priority: u8,
    /// Downstream dependency weight for critical-path scheduling.
    /// Higher weight = more downstream units blocked = schedule first.
    pub critical_path_weight: u32,
    /// Files this unit will modify (for conflict detection).
    pub paths: Vec<String>,
    /// Artifacts this unit produces.
    pub produces: Vec<String>,
    /// Artifacts this unit requires from siblings.
    pub requires: Vec<String>,
    /// Explicit dependency IDs.
    pub dependencies: Vec<String>,
    /// Parent unit ID (for sibling produces/requires resolution).
    pub parent: Option<String>,
    /// Optional fast verify command to run before the full verify gate.
    pub verify_fast: Option<String>,
    /// Deferred verify command for grouped post-agent verification.
    pub verify_command: Option<String>,
    /// Retry context derived from prior attempts without depending on pool/runtime crates.
    pub retry: RunRetryContext,
    /// Per-unit model override from frontmatter.
    pub model: Option<String>,
}
120
/// Retry context derived from a unit's attempt history.
#[derive(Debug, Clone, PartialEq)]
pub struct RunRetryContext {
    /// The unit's recorded attempt count.
    pub attempt_number: u32,
    /// Notes from the most recent failed or abandoned attempt that has notes,
    /// if there is one.
    pub previous_failure: Option<String>,
    /// Notes from every logged attempt that has them, in attempt-log order.
    pub previous_notes: Vec<String>,
}
128
/// A non-blocking warning for a unit that will still dispatch.
#[derive(Debug, Clone, PartialEq)]
pub struct RunScopeWarning {
    /// ID of the unit the warning applies to.
    pub id: String,
    /// The scope warning reported for that unit.
    pub warning: ScopeWarning,
}
135
/// A unit that was excluded from dispatch.
#[derive(Debug, Clone, PartialEq)]
pub struct BlockedUnit {
    /// ID of the excluded unit.
    pub id: String,
    /// Title of the excluded unit.
    pub title: String,
    /// Human-readable reason the unit was excluded
    /// (`"unresolved_decision"` for units with unresolved decisions).
    pub reason: String,
    /// Canonical autonomy blocker code when this blocked reason maps to the
    /// scheduler-visible autonomy contract.
    pub blocker: Option<AutonomyBlockerCode>,
    /// Unresolved decision prompts when `blocker == Some(UnresolvedDecision)`.
    pub decisions: Vec<String>,
}
148
/// The result of computing the ready queue.
///
/// Every considered candidate ends up either in `units` or in `blocked`;
/// `warnings` only ever refers to units that are in `units`.
#[derive(Debug, Clone)]
pub struct ReadyQueue {
    /// Units ready to dispatch, sorted by priority then critical-path weight.
    pub units: Vec<ReadyUnit>,
    /// Units that are blocked by autonomy/scope/dependency guardrails.
    pub blocked: Vec<BlockedUnit>,
    /// Scope warnings for units that will dispatch.
    pub warnings: Vec<RunScopeWarning>,
}
159
/// A wave of units that can run concurrently (no inter-wave dependencies).
///
/// NOTE: when the dispatch set contains a dependency cycle or otherwise
/// unresolvable dependencies, the final wave holds all leftover units and
/// this guarantee does not apply to it.
#[derive(Debug, Clone)]
pub struct RunWave {
    /// Units in this wave, sorted by priority then critical-path weight.
    pub units: Vec<ReadyUnit>,
}
166
/// A full execution plan grouped into dependency-ordered waves.
#[derive(Debug, Clone)]
pub struct RunPlan {
    /// Ordered waves (wave 0 has no deps, wave 1 depends on wave 0, etc.).
    pub waves: Vec<RunWave>,
    /// Total number of dispatchable units across all waves.
    pub total_units: usize,
    /// Units that cannot be dispatched.
    pub blocked: Vec<BlockedUnit>,
    /// Scope warnings for units that will dispatch.
    pub warnings: Vec<RunScopeWarning>,
}
179
180// ---------------------------------------------------------------------------
181// Core computation
182// ---------------------------------------------------------------------------
183
184/// Check if all dependencies of a unit are satisfied.
185///
186/// A dependency is satisfied if it is closed in the active index, or present
187/// in the archive (archived units are always considered closed). Dependencies
188/// not found in either index are treated as unsatisfied to catch typos.
189pub fn all_deps_closed(entry: &IndexEntry, index: &Index, archive: &ArchiveIndex) -> bool {
190    for dep_id in &entry.dependencies {
191        match index.units.iter().find(|e| e.id == *dep_id) {
192            Some(dep) if dep.status == Status::Closed => {}
193            Some(_) => return false,
194            None => {
195                if !archive.units.iter().any(|e| e.id == *dep_id) {
196                    return false;
197                }
198            }
199        }
200    }
201
202    for required in &entry.requires {
203        if let Some(producer) = index
204            .units
205            .iter()
206            .find(|e| e.id != entry.id && e.parent == entry.parent && e.produces.contains(required))
207        {
208            if producer.status != Status::Closed {
209                return false;
210            }
211        }
212        // If producer is in archive (archived = closed) or not found, treat as satisfied
213    }
214
215    true
216}
217
218/// Compute downstream dependency weights for critical-path scheduling.
219///
220/// Each unit's weight is `1 + count of all transitively dependent units`.
221/// Units on the critical path (most blocked work downstream) get the highest weight.
222pub fn compute_downstream_weights(units: &[ReadyUnit]) -> HashMap<String, u32> {
223    let unit_ids: HashSet<String> = units.iter().map(|u| u.id.clone()).collect();
224
225    // Build reverse dependency graph: dep → Vec<dependents>
226    let mut reverse_deps: HashMap<String, Vec<String>> = HashMap::new();
227
228    for u in units {
229        reverse_deps.entry(u.id.clone()).or_default();
230
231        for dep in &u.dependencies {
232            if unit_ids.contains(dep) {
233                reverse_deps
234                    .entry(dep.clone())
235                    .or_default()
236                    .push(u.id.clone());
237            }
238        }
239
240        for req in &u.requires {
241            if let Some(producer) = units.iter().find(|other| {
242                other.id != u.id && other.parent == u.parent && other.produces.contains(req)
243            }) {
244                if unit_ids.contains(&producer.id) {
245                    reverse_deps
246                        .entry(producer.id.clone())
247                        .or_default()
248                        .push(u.id.clone());
249                }
250            }
251        }
252    }
253
254    let mut weights: HashMap<String, u32> = HashMap::new();
255
256    for u in units {
257        let mut visited: HashSet<String> = HashSet::new();
258        let mut queue: Vec<String> = Vec::new();
259
260        for dep in reverse_deps.get(&u.id).unwrap_or(&Vec::new()) {
261            if visited.insert(dep.clone()) {
262                queue.push(dep.clone());
263            }
264        }
265
266        while let Some(current) = queue.pop() {
267            for next in reverse_deps.get(&current).unwrap_or(&Vec::new()) {
268                if visited.insert(next.clone()) {
269                    queue.push(next.clone());
270                }
271            }
272        }
273
274        weights.insert(u.id.clone(), 1 + visited.len() as u32);
275    }
276
277    weights
278}
279
280/// Check if a unit's dependencies are all satisfied within a dispatch set.
281fn is_unit_ready(
282    unit: &ReadyUnit,
283    completed: &HashSet<String>,
284    all_unit_ids: &HashSet<String>,
285    all_units: &[ReadyUnit],
286) -> bool {
287    let explicit_ok = unit
288        .dependencies
289        .iter()
290        .all(|d| completed.contains(d) || !all_unit_ids.contains(d));
291
292    let requires_ok = unit.requires.iter().all(|req| {
293        if let Some(producer) = all_units.iter().find(|other| {
294            other.id != unit.id && other.parent == unit.parent && other.produces.contains(req)
295        }) {
296            completed.contains(&producer.id)
297        } else {
298            true
299        }
300    });
301
302    explicit_ok && requires_ok
303}
304
305/// Sort a list of units by priority (ascending) then critical-path weight (descending) then ID.
306fn sort_units(units: &mut [ReadyUnit], weights: &HashMap<String, u32>) {
307    units.sort_by(|a, b| {
308        a.priority
309            .cmp(&b.priority)
310            .then_with(|| {
311                let wa = weights.get(&a.id).copied().unwrap_or(1);
312                let wb = weights.get(&b.id).copied().unwrap_or(1);
313                wb.cmp(&wa)
314            })
315            .then_with(|| natural_cmp(&a.id, &b.id))
316    });
317}
318
319fn build_retry_context(unit: &Unit) -> RunRetryContext {
320    RunRetryContext {
321        attempt_number: unit.attempts,
322        previous_failure: unit
323            .attempt_log
324            .iter()
325            .rev()
326            .find_map(|attempt| match attempt.outcome {
327                AttemptOutcome::Failed | AttemptOutcome::Abandoned => attempt.notes.clone(),
328                AttemptOutcome::Success => None,
329            }),
330        previous_notes: unit
331            .attempt_log
332            .iter()
333            .filter_map(|attempt| attempt.notes.clone())
334            .collect(),
335    }
336}
337
338/// Build a `ReadyUnit` from an index entry and the loaded unit file.
339fn build_ready_unit(entry: &IndexEntry, unit: &Unit, weight: u32) -> ReadyUnit {
340    ReadyUnit {
341        id: entry.id.clone(),
342        title: entry.title.clone(),
343        priority: entry.priority,
344        critical_path_weight: weight,
345        paths: entry.paths.clone(),
346        produces: entry.produces.clone(),
347        requires: entry.requires.clone(),
348        dependencies: entry.dependencies.clone(),
349        parent: entry.parent.clone(),
350        verify_fast: unit.verify_fast.clone(),
351        verify_command: unit.verify.clone(),
352        retry: build_retry_context(unit),
353        model: unit.model.clone(),
354    }
355}
356
357/// Build a canonical blocked unit for unresolved durable decisions.
358pub fn blocked_unit_for_unresolved_decisions(
359    entry: &IndexEntry,
360    unit: &Unit,
361) -> Option<BlockedUnit> {
362    if unit.decisions.is_empty() {
363        return None;
364    }
365
366    Some(BlockedUnit {
367        id: entry.id.clone(),
368        title: entry.title.clone(),
369        reason: "unresolved_decision".to_string(),
370        blocker: Some(AutonomyBlockerCode::UnresolvedDecision),
371        decisions: unit.decisions.clone(),
372    })
373}
374
375// ---------------------------------------------------------------------------
376// Public API
377// ---------------------------------------------------------------------------
378
379/// Compute which units are ready to dispatch.
380///
381/// Returns a [`ReadyQueue`] with units sorted by priority then critical-path
382/// weight (highest-weight first within same priority). Optionally filters to
383/// a specific unit ID or its ready children if `filter_id` is a parent.
384///
385/// Set `simulate = true` to include all open units with verify commands —
386/// even those whose deps are not yet met. This is the dry-run mode.
387pub fn compute_ready_queue(
388    mana_dir: &Path,
389    target: &RunTarget,
390    simulate: bool,
391) -> Result<ReadyQueue> {
392    let index = Index::load_or_rebuild(mana_dir)?;
393    let archive = ArchiveIndex::load_or_rebuild(mana_dir)
394        .unwrap_or_else(|_| ArchiveIndex { units: Vec::new() });
395
396    let candidates: Vec<&IndexEntry> = index
397        .units
398        .iter()
399        .filter(|e| {
400            e.kind == crate::unit::UnitType::Task
401                && e.has_verify
402                && e.status == Status::Open
403                && (simulate || all_deps_closed(e, &index, &archive))
404                && !has_open_descendants(&index, &e.id)
405                && matches_target(&index, e, target)
406        })
407        .collect();
408
409    let mut blocked: Vec<BlockedUnit> = Vec::new();
410    let mut warnings: Vec<RunScopeWarning> = Vec::new();
411
412    // Collect dispatchable entries
413    let mut entries_and_units: Vec<(&IndexEntry, Unit)> = Vec::new();
414    for entry in &candidates {
415        let unit_path = find_unit_file(mana_dir, &entry.id)?;
416        let unit = Unit::from_file(&unit_path)?;
417
418        if !simulate {
419            if let Some(unresolved_blocked) = blocked_unit_for_unresolved_decisions(entry, &unit) {
420                blocked.push(unresolved_blocked);
421                continue;
422            }
423
424            if let Some(reason) = check_blocked_with_archive(entry, &index, Some(&archive)) {
425                blocked.push(BlockedUnit {
426                    id: entry.id.clone(),
427                    title: entry.title.clone(),
428                    reason: reason.to_string(),
429                    blocker: None,
430                    decisions: Vec::new(),
431                });
432                continue;
433            }
434        }
435        if let Some(warning) = check_scope_warning(entry) {
436            warnings.push(RunScopeWarning {
437                id: entry.id.clone(),
438                warning,
439            });
440        }
441        entries_and_units.push((entry, unit));
442    }
443
444    // Build provisional list (weight = 1), then compute real weights and update
445    let mut ready_units: Vec<ReadyUnit> = entries_and_units
446        .iter()
447        .map(|(entry, unit)| build_ready_unit(entry, unit, 1))
448        .collect();
449
450    let weights = compute_downstream_weights(&ready_units);
451    for unit in &mut ready_units {
452        unit.critical_path_weight = weights.get(&unit.id).copied().unwrap_or(1);
453    }
454    sort_units(&mut ready_units, &weights);
455
456    Ok(ReadyQueue {
457        units: ready_units,
458        blocked,
459        warnings,
460    })
461}
462
463/// Compute a full execution plan grouped into dependency-ordered waves.
464///
465/// Wave 0 contains units with no unsatisfied deps. Wave 1 depends on wave 0.
466/// And so on. Within each wave, units are sorted by priority then critical-path weight.
467///
468/// Set `simulate = true` for dry-run mode (includes units whose deps are not yet met).
469pub fn compute_run_plan(mana_dir: &Path, target: &RunTarget, simulate: bool) -> Result<RunPlan> {
470    let queue = compute_ready_queue(mana_dir, target, simulate)?;
471    let total_units = queue.units.len();
472    let blocked = queue.blocked;
473    let warnings = queue.warnings;
474
475    let waves = group_into_waves(queue.units);
476
477    Ok(RunPlan {
478        waves,
479        total_units,
480        blocked,
481        warnings,
482    })
483}
484
485/// Group a flat list of ready units into dependency-ordered waves.
486///
487/// Wave 0 has no deps on other units in the set.
488/// Wave N depends only on units in waves 0..N-1.
489///
490/// The full `all_units` slice is passed to `is_unit_ready` so that
491/// sibling produces/requires resolution works correctly across waves.
492fn group_into_waves(units: Vec<ReadyUnit>) -> Vec<RunWave> {
493    let mut waves: Vec<RunWave> = Vec::new();
494    let all_units = units.clone();
495    let unit_ids: HashSet<String> = units.iter().map(|u| u.id.clone()).collect();
496
497    let mut completed: HashSet<String> = HashSet::new();
498    let mut remaining: Vec<ReadyUnit> = units;
499
500    while !remaining.is_empty() {
501        let (ready, blocked): (Vec<ReadyUnit>, Vec<ReadyUnit>) = remaining
502            .into_iter()
503            .partition(|u| is_unit_ready(u, &completed, &unit_ids, &all_units));
504
505        if ready.is_empty() {
506            // Cycle or unresolvable deps — add remaining as a final wave
507            let mut leftover = blocked;
508            let weights = compute_downstream_weights(&leftover);
509            sort_units(&mut leftover, &weights);
510            waves.push(RunWave { units: leftover });
511            break;
512        }
513
514        for u in &ready {
515            completed.insert(u.id.clone());
516        }
517
518        // Sort wave by global weights (not just within the wave) for consistent ordering
519        let weights = compute_downstream_weights(&all_units);
520        let mut wave_units = ready;
521        sort_units(&mut wave_units, &weights);
522        waves.push(RunWave { units: wave_units });
523        remaining = blocked;
524    }
525
526    waves
527}
528
#[cfg(test)]
mod tests {
    use super::*;
    use crate::unit::UnitType;
    use std::collections::HashSet;

    // -- fixtures --

    /// Build a priority-2 `ReadyUnit` under the shared parent `"parent"`.
    fn make_unit(id: &str, deps: Vec<&str>, produces: Vec<&str>, requires: Vec<&str>) -> ReadyUnit {
        ReadyUnit {
            id: id.to_string(),
            title: format!("Unit {}", id),
            priority: 2,
            critical_path_weight: 1,
            paths: vec![],
            produces: produces.into_iter().map(|s| s.to_string()).collect(),
            requires: requires.into_iter().map(|s| s.to_string()).collect(),
            dependencies: deps.into_iter().map(|s| s.to_string()).collect(),
            parent: Some("parent".to_string()),
            verify_fast: None,
            verify_command: None,
            retry: RunRetryContext {
                attempt_number: 0,
                previous_failure: None,
                previous_notes: Vec::new(),
            },
            model: None,
        }
    }

    /// Build a priority-2 task `IndexEntry` that reports having a verify command.
    fn make_index_entry(
        id: &str,
        status: Status,
        deps: Vec<&str>,
        parent: Option<&str>,
        produces: Vec<&str>,
        requires: Vec<&str>,
    ) -> IndexEntry {
        IndexEntry {
            handle: None,
            id: id.to_string(),
            title: format!("Unit {}", id),
            status,
            priority: 2,
            parent: parent.map(|s| s.to_string()),
            dependencies: deps.into_iter().map(|s| s.to_string()).collect(),
            labels: vec![],
            assignee: None,
            updated_at: chrono::Utc::now(),
            produces: produces.into_iter().map(|s| s.to_string()).collect(),
            requires: requires.into_iter().map(|s| s.to_string()).collect(),
            has_verify: true,
            verify: None,
            created_at: chrono::Utc::now(),
            claimed_by: None,
            attempts: 0,
            paths: vec![],
            kind: UnitType::Task,
            feature: false,
            has_decisions: false,
        }
    }

    /// Create a temporary project containing a `.mana` directory with a saved
    /// test config.
    ///
    /// The returned `TempDir` guard must be kept alive for as long as the
    /// returned path is used; dropping it removes the directory.
    fn init_mana_dir() -> (tempfile::TempDir, std::path::PathBuf) {
        let dir = tempfile::tempdir().unwrap();
        let mana_dir = dir.path().join(".mana");
        std::fs::create_dir(&mana_dir).unwrap();

        crate::config::Config {
            project: "test".to_string(),
            next_id: 1,
            auto_close_parent: true,
            run: None,
            plan: None,
            max_loops: 10,
            max_concurrent: 4,
            poll_interval: 30,
            extends: vec![],
            rules_file: None,
            file_locking: false,
            worktree: false,
            on_close: None,
            on_fail: None,
            verify_timeout: None,
            review: None,
            user: None,
            user_email: None,
            auto_commit: false,
            commit_template: None,
            research: None,
            run_model: None,
            plan_model: None,
            review_model: None,
            research_model: None,
            batch_verify: false,
            memory_reserve_mb: 0,
            notify: None,
        }
        .save(&mana_dir)
        .unwrap();

        (dir, mana_dir)
    }

    // -- all_deps_closed tests --

    #[test]
    fn all_deps_closed_archived_dep_satisfied() {
        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
        let index = Index {
            units: vec![entry_a.clone()],
        };
        let archived_b = make_index_entry("B", Status::Closed, vec![], None, vec![], vec![]);
        let archive = ArchiveIndex {
            units: vec![archived_b],
        };
        assert!(all_deps_closed(&entry_a, &index, &archive));
    }

    #[test]
    fn all_deps_closed_missing_dep_unsatisfied() {
        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
        let index = Index {
            units: vec![entry_a.clone()],
        };
        let archive = ArchiveIndex { units: vec![] };
        assert!(!all_deps_closed(&entry_a, &index, &archive));
    }

    #[test]
    fn all_deps_closed_active_closed_dep_satisfied() {
        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
        let entry_b = make_index_entry("B", Status::Closed, vec![], None, vec![], vec![]);
        let index = Index {
            units: vec![entry_a.clone(), entry_b],
        };
        let archive = ArchiveIndex { units: vec![] };
        assert!(all_deps_closed(&entry_a, &index, &archive));
    }

    #[test]
    fn all_deps_closed_active_open_dep_unsatisfied() {
        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
        let entry_b = make_index_entry("B", Status::Open, vec![], None, vec![], vec![]);
        let index = Index {
            units: vec![entry_a.clone(), entry_b],
        };
        let archive = ArchiveIndex { units: vec![] };
        assert!(!all_deps_closed(&entry_a, &index, &archive));
    }

    // -- compute_ready_queue tests --

    #[test]
    fn unresolved_decisions_become_canonical_blocked_reason() {
        let (_dir, mana_dir) = init_mana_dir();

        let mut unit = Unit::new("2", "Dispatchable task with unresolved decisions");
        unit.kind = UnitType::Task;
        unit.verify = Some("cargo test unresolved_decision_blocker".to_string());
        unit.decisions = vec![
            "JWT or sessions?".to_string(),
            "Which provider should be the default?".to_string(),
        ];
        unit.to_file(mana_dir.join("2-dispatchable-task-with-unresolved-decisions.md"))
            .unwrap();

        let queue = compute_ready_queue(&mana_dir, &RunTarget::AllReady, false).unwrap();
        assert!(queue.units.is_empty());
        assert_eq!(queue.blocked.len(), 1);
        assert_eq!(queue.blocked[0].id, "2");
        assert_eq!(queue.blocked[0].reason, "unresolved_decision");
        assert_eq!(
            queue.blocked[0].blocker,
            Some(AutonomyBlockerCode::UnresolvedDecision)
        );
        assert_eq!(
            queue.blocked[0].decisions,
            vec![
                "JWT or sessions?".to_string(),
                "Which provider should be the default?".to_string(),
            ]
        );

        // Dry-run mode skips the blocked checks entirely.
        let simulated = compute_ready_queue(&mana_dir, &RunTarget::AllReady, true).unwrap();
        assert_eq!(simulated.units.len(), 1);
        assert!(simulated.blocked.is_empty());
    }

    #[test]
    fn run_only_dispatches_jobs() {
        let (_dir, mana_dir) = init_mana_dir();

        let mut epic = Unit::new("1", "Epic parent");
        epic.kind = UnitType::Epic;
        epic.verify = Some("cargo test should_not_dispatch_epic".to_string());
        epic.to_file(mana_dir.join("1-epic-parent.md")).unwrap();

        let mut task = Unit::new("2", "Dispatchable task");
        task.kind = UnitType::Task;
        task.verify = Some("cargo test dispatchable_task".to_string());
        task.to_file(mana_dir.join("2-dispatchable-task.md"))
            .unwrap();

        let queue = compute_ready_queue(&mana_dir, &RunTarget::AllReady, false).unwrap();
        assert_eq!(queue.units.len(), 1);
        assert_eq!(queue.units[0].id, "2");
    }

    // -- compute_downstream_weights tests --

    #[test]
    fn weights_single_unit() {
        let units = vec![make_unit("A", vec![], vec![], vec![])];
        let weights = compute_downstream_weights(&units);
        assert_eq!(weights.get("A").copied(), Some(1));
    }

    #[test]
    fn weights_linear_chain() {
        let units = vec![
            make_unit("A", vec![], vec![], vec![]),
            make_unit("B", vec!["A"], vec![], vec![]),
            make_unit("C", vec!["B"], vec![], vec![]),
        ];
        let weights = compute_downstream_weights(&units);
        assert_eq!(weights.get("A").copied(), Some(3));
        assert_eq!(weights.get("B").copied(), Some(2));
        assert_eq!(weights.get("C").copied(), Some(1));
    }

    #[test]
    fn weights_diamond() {
        let units = vec![
            make_unit("A", vec![], vec![], vec![]),
            make_unit("B", vec!["A"], vec![], vec![]),
            make_unit("C", vec!["A"], vec![], vec![]),
            make_unit("D", vec!["B", "C"], vec![], vec![]),
        ];
        let weights = compute_downstream_weights(&units);
        assert_eq!(weights.get("D").copied(), Some(1));
        assert_eq!(weights.get("B").copied(), Some(2));
        assert_eq!(weights.get("C").copied(), Some(2));
        assert_eq!(weights.get("A").copied(), Some(4));
    }

    // -- is_unit_ready tests --

    #[test]
    fn unit_ready_no_deps() {
        let unit = make_unit("1", vec![], vec![], vec![]);
        let all = vec![unit.clone()];
        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
        assert!(is_unit_ready(&unit, &HashSet::new(), &ids, &all));
    }

    #[test]
    fn unit_not_ready_dep_not_completed() {
        let unit = make_unit("2", vec!["1"], vec![], vec![]);
        let dep = make_unit("1", vec![], vec![], vec![]);
        let all = vec![dep, unit.clone()];
        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
        assert!(!is_unit_ready(&unit, &HashSet::new(), &ids, &all));
    }

    #[test]
    fn unit_ready_dep_completed() {
        let unit = make_unit("2", vec!["1"], vec![], vec![]);
        let dep = make_unit("1", vec![], vec![], vec![]);
        let all = vec![dep, unit.clone()];
        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
        let mut completed = HashSet::new();
        completed.insert("1".to_string());
        assert!(is_unit_ready(&unit, &completed, &ids, &all));
    }

    #[test]
    fn unit_ready_dep_outside_dispatch_set() {
        let unit = make_unit("2", vec!["external"], vec![], vec![]);
        let all = vec![unit.clone()];
        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
        // "external" is not in ids → treated as satisfied
        assert!(is_unit_ready(&unit, &HashSet::new(), &ids, &all));
    }

    // -- sort_units tests --

    #[test]
    fn sort_units_by_priority_then_weight() {
        let mut units = vec![
            {
                let mut u = make_unit("B", vec![], vec![], vec![]);
                u.priority = 2;
                u.critical_path_weight = 3;
                u
            },
            {
                let mut u = make_unit("A", vec![], vec![], vec![]);
                u.priority = 1;
                u.critical_path_weight = 1;
                u
            },
        ];
        let weights: HashMap<String, u32> = [("A".to_string(), 1), ("B".to_string(), 3)]
            .into_iter()
            .collect();
        sort_units(&mut units, &weights);
        // Priority 1 before priority 2
        assert_eq!(units[0].id, "A");
        assert_eq!(units[1].id, "B");
    }

    #[test]
    fn sort_units_same_priority_higher_weight_first() {
        let mut units = vec![
            {
                let mut u = make_unit("A", vec![], vec![], vec![]);
                u.priority = 2;
                u.critical_path_weight = 1;
                u
            },
            {
                let mut u = make_unit("B", vec![], vec![], vec![]);
                u.priority = 2;
                u.critical_path_weight = 5;
                u
            },
        ];
        let weights: HashMap<String, u32> = [("A".to_string(), 1), ("B".to_string(), 5)]
            .into_iter()
            .collect();
        sort_units(&mut units, &weights);
        // Higher weight first (B before A)
        assert_eq!(units[0].id, "B");
        assert_eq!(units[1].id, "A");
    }
}