Skip to main content

mana_core/ops/
run.rs

1//! Core orchestration operations for `mana run`.
2//!
3//! This module exposes the dependency-aware scheduling logic that imp_orch
4//! and other library consumers need to compute ready queues and execution plans
5//! without depending on the CLI layer.
6//!
7//! # Key types
8//!
9//! - [`ReadyQueue`] — result of computing which units are ready to dispatch
10//! - [`ReadyUnit`] — a single dispatchable unit with scheduling metadata
11//! - [`RunPlan`] — a full execution plan grouped into waves
12//! - [`RunWave`] — a group of units that can run concurrently
13//!
14//! # Key functions
15//!
16//! - [`compute_ready_queue`] — find all dispatchable units with priority/weight ordering
17//! - [`compute_run_plan`] — group ready units into dependency-ordered waves
18
19use std::collections::{HashMap, HashSet};
20use std::path::Path;
21
22use anyhow::Result;
23
24use crate::blocking::check_blocked_with_archive;
25use crate::discovery::find_unit_file;
26use crate::index::{ArchiveIndex, Index, IndexEntry};
27use crate::unit::{Status, Unit};
28use crate::util::natural_cmp;
29
30// ---------------------------------------------------------------------------
31// Public types
32// ---------------------------------------------------------------------------
33
34/// A unit that is ready to be dispatched.
35#[derive(Debug, Clone, PartialEq)]
36pub struct ReadyUnit {
37    pub id: String,
38    pub title: String,
39    /// Lower is higher priority (1 = P1, etc.).
40    pub priority: u8,
41    /// Downstream dependency weight for critical-path scheduling.
42    /// Higher weight = more downstream units blocked = schedule first.
43    pub critical_path_weight: u32,
44    /// Files this unit will modify (for conflict detection).
45    pub paths: Vec<String>,
46    /// Artifacts this unit produces.
47    pub produces: Vec<String>,
48    /// Artifacts this unit requires from siblings.
49    pub requires: Vec<String>,
50    /// Explicit dependency IDs.
51    pub dependencies: Vec<String>,
52    /// Parent unit ID (for sibling produces/requires resolution).
53    pub parent: Option<String>,
54    /// Per-unit model override from frontmatter.
55    pub model: Option<String>,
56}
57
58/// A unit that was excluded from dispatch.
59#[derive(Debug, Clone, PartialEq)]
60pub struct BlockedUnit {
61    pub id: String,
62    pub title: String,
63    pub reason: String,
64}
65
66/// The result of computing the ready queue.
67#[derive(Debug, Clone)]
68pub struct ReadyQueue {
69    /// Units ready to dispatch, sorted by priority then critical-path weight.
70    pub units: Vec<ReadyUnit>,
71    /// Units that are blocked (deps not met, claimed, etc.).
72    pub blocked: Vec<BlockedUnit>,
73}
74
75/// A wave of units that can run concurrently (no inter-wave dependencies).
76#[derive(Debug, Clone)]
77pub struct RunWave {
78    /// Units in this wave, sorted by priority then critical-path weight.
79    pub units: Vec<ReadyUnit>,
80}
81
82/// A full execution plan grouped into dependency-ordered waves.
83#[derive(Debug, Clone)]
84pub struct RunPlan {
85    /// Ordered waves (wave 0 has no deps, wave 1 depends on wave 0, etc.).
86    pub waves: Vec<RunWave>,
87    /// Total number of dispatchable units across all waves.
88    pub total_units: usize,
89    /// Units that cannot be dispatched.
90    pub blocked: Vec<BlockedUnit>,
91}
92
93// ---------------------------------------------------------------------------
94// Core computation
95// ---------------------------------------------------------------------------
96
97/// Check if all dependencies of a unit are satisfied.
98///
99/// A dependency is satisfied if it is closed in the active index, or present
100/// in the archive (archived units are always considered closed). Dependencies
101/// not found in either index are treated as unsatisfied to catch typos.
102pub fn all_deps_closed(entry: &IndexEntry, index: &Index, archive: &ArchiveIndex) -> bool {
103    for dep_id in &entry.dependencies {
104        match index.units.iter().find(|e| e.id == *dep_id) {
105            Some(dep) if dep.status == Status::Closed => {}
106            Some(_) => return false,
107            None => {
108                if !archive.units.iter().any(|e| e.id == *dep_id) {
109                    return false;
110                }
111            }
112        }
113    }
114
115    for required in &entry.requires {
116        if let Some(producer) = index
117            .units
118            .iter()
119            .find(|e| e.id != entry.id && e.parent == entry.parent && e.produces.contains(required))
120        {
121            if producer.status != Status::Closed {
122                return false;
123            }
124        }
125        // If producer is in archive (archived = closed) or not found, treat as satisfied
126    }
127
128    true
129}
130
131/// Compute downstream dependency weights for critical-path scheduling.
132///
133/// Each unit's weight is `1 + count of all transitively dependent units`.
134/// Units on the critical path (most blocked work downstream) get the highest weight.
135pub fn compute_downstream_weights(units: &[ReadyUnit]) -> HashMap<String, u32> {
136    let unit_ids: HashSet<String> = units.iter().map(|u| u.id.clone()).collect();
137
138    // Build reverse dependency graph: dep → Vec<dependents>
139    let mut reverse_deps: HashMap<String, Vec<String>> = HashMap::new();
140
141    for u in units {
142        reverse_deps.entry(u.id.clone()).or_default();
143
144        for dep in &u.dependencies {
145            if unit_ids.contains(dep) {
146                reverse_deps
147                    .entry(dep.clone())
148                    .or_default()
149                    .push(u.id.clone());
150            }
151        }
152
153        for req in &u.requires {
154            if let Some(producer) = units.iter().find(|other| {
155                other.id != u.id && other.parent == u.parent && other.produces.contains(req)
156            }) {
157                if unit_ids.contains(&producer.id) {
158                    reverse_deps
159                        .entry(producer.id.clone())
160                        .or_default()
161                        .push(u.id.clone());
162                }
163            }
164        }
165    }
166
167    let mut weights: HashMap<String, u32> = HashMap::new();
168
169    for u in units {
170        let mut visited: HashSet<String> = HashSet::new();
171        let mut queue: Vec<String> = Vec::new();
172
173        for dep in reverse_deps.get(&u.id).unwrap_or(&Vec::new()) {
174            if visited.insert(dep.clone()) {
175                queue.push(dep.clone());
176            }
177        }
178
179        while let Some(current) = queue.pop() {
180            for next in reverse_deps.get(&current).unwrap_or(&Vec::new()) {
181                if visited.insert(next.clone()) {
182                    queue.push(next.clone());
183                }
184            }
185        }
186
187        weights.insert(u.id.clone(), 1 + visited.len() as u32);
188    }
189
190    weights
191}
192
193/// Check if a unit's dependencies are all satisfied within a dispatch set.
194fn is_unit_ready(
195    unit: &ReadyUnit,
196    completed: &HashSet<String>,
197    all_unit_ids: &HashSet<String>,
198    all_units: &[ReadyUnit],
199) -> bool {
200    let explicit_ok = unit
201        .dependencies
202        .iter()
203        .all(|d| completed.contains(d) || !all_unit_ids.contains(d));
204
205    let requires_ok = unit.requires.iter().all(|req| {
206        if let Some(producer) = all_units.iter().find(|other| {
207            other.id != unit.id && other.parent == unit.parent && other.produces.contains(req)
208        }) {
209            completed.contains(&producer.id)
210        } else {
211            true
212        }
213    });
214
215    explicit_ok && requires_ok
216}
217
218/// Sort a list of units by priority (ascending) then critical-path weight (descending) then ID.
219fn sort_units(units: &mut [ReadyUnit], weights: &HashMap<String, u32>) {
220    units.sort_by(|a, b| {
221        a.priority
222            .cmp(&b.priority)
223            .then_with(|| {
224                let wa = weights.get(&a.id).copied().unwrap_or(1);
225                let wb = weights.get(&b.id).copied().unwrap_or(1);
226                wb.cmp(&wa)
227            })
228            .then_with(|| natural_cmp(&a.id, &b.id))
229    });
230}
231
232/// Build a `ReadyUnit` from an index entry and the loaded unit file.
233fn build_ready_unit(entry: &IndexEntry, unit: &Unit, weight: u32) -> ReadyUnit {
234    ReadyUnit {
235        id: entry.id.clone(),
236        title: entry.title.clone(),
237        priority: entry.priority,
238        critical_path_weight: weight,
239        paths: entry.paths.clone(),
240        produces: entry.produces.clone(),
241        requires: entry.requires.clone(),
242        dependencies: entry.dependencies.clone(),
243        parent: entry.parent.clone(),
244        model: unit.model.clone(),
245    }
246}
247
248// ---------------------------------------------------------------------------
249// Public API
250// ---------------------------------------------------------------------------
251
252/// Compute which units are ready to dispatch.
253///
254/// Returns a [`ReadyQueue`] with units sorted by priority then critical-path
255/// weight (highest-weight first within same priority). Optionally filters to
256/// a specific unit ID or its ready children if `filter_id` is a parent.
257///
258/// Set `simulate = true` to include all open units with verify commands —
259/// even those whose deps are not yet met. This is the dry-run mode.
260pub fn compute_ready_queue(
261    mana_dir: &Path,
262    filter_id: Option<&str>,
263    simulate: bool,
264) -> Result<ReadyQueue> {
265    let index = Index::load_or_rebuild(mana_dir)?;
266    let archive = ArchiveIndex::load_or_rebuild(mana_dir)
267        .unwrap_or_else(|_| ArchiveIndex { units: Vec::new() });
268
269    let mut candidates: Vec<&IndexEntry> = index
270        .units
271        .iter()
272        .filter(|e| {
273            e.has_verify
274                && e.status == Status::Open
275                && (simulate || all_deps_closed(e, &index, &archive))
276        })
277        .collect();
278
279    if let Some(filter_id) = filter_id {
280        let is_parent = index
281            .units
282            .iter()
283            .any(|e| e.parent.as_deref() == Some(filter_id));
284        if is_parent {
285            candidates.retain(|e| e.parent.as_deref() == Some(filter_id));
286        } else {
287            candidates.retain(|e| e.id == filter_id);
288        }
289    }
290
291    let mut blocked: Vec<BlockedUnit> = Vec::new();
292
293    // Collect dispatchable entries
294    let mut entries_and_units: Vec<(&IndexEntry, Unit)> = Vec::new();
295    for entry in &candidates {
296        if !simulate {
297            if let Some(reason) = check_blocked_with_archive(entry, &index, Some(&archive)) {
298                blocked.push(BlockedUnit {
299                    id: entry.id.clone(),
300                    title: entry.title.clone(),
301                    reason: reason.to_string(),
302                });
303                continue;
304            }
305        }
306        let unit_path = find_unit_file(mana_dir, &entry.id)?;
307        let unit = Unit::from_file(&unit_path)?;
308        entries_and_units.push((entry, unit));
309    }
310
311    // Build provisional list (weight = 1), then compute real weights and update
312    let mut ready_units: Vec<ReadyUnit> = entries_and_units
313        .iter()
314        .map(|(entry, unit)| build_ready_unit(entry, unit, 1))
315        .collect();
316
317    let weights = compute_downstream_weights(&ready_units);
318    for unit in &mut ready_units {
319        unit.critical_path_weight = weights.get(&unit.id).copied().unwrap_or(1);
320    }
321    sort_units(&mut ready_units, &weights);
322
323    Ok(ReadyQueue {
324        units: ready_units,
325        blocked,
326    })
327}
328
329/// Compute a full execution plan grouped into dependency-ordered waves.
330///
331/// Wave 0 contains units with no unsatisfied deps. Wave 1 depends on wave 0.
332/// And so on. Within each wave, units are sorted by priority then critical-path weight.
333///
334/// Set `simulate = true` for dry-run mode (includes units whose deps are not yet met).
335pub fn compute_run_plan(
336    mana_dir: &Path,
337    filter_id: Option<&str>,
338    simulate: bool,
339) -> Result<RunPlan> {
340    let queue = compute_ready_queue(mana_dir, filter_id, simulate)?;
341    let total_units = queue.units.len();
342    let blocked = queue.blocked;
343
344    let waves = group_into_waves(queue.units);
345
346    Ok(RunPlan {
347        waves,
348        total_units,
349        blocked,
350    })
351}
352
353/// Group a flat list of ready units into dependency-ordered waves.
354///
355/// Wave 0 has no deps on other units in the set.
356/// Wave N depends only on units in waves 0..N-1.
357///
358/// The full `all_units` slice is passed to `is_unit_ready` so that
359/// sibling produces/requires resolution works correctly across waves.
360fn group_into_waves(units: Vec<ReadyUnit>) -> Vec<RunWave> {
361    let mut waves: Vec<RunWave> = Vec::new();
362    let all_units = units.clone();
363    let unit_ids: HashSet<String> = units.iter().map(|u| u.id.clone()).collect();
364
365    let mut completed: HashSet<String> = HashSet::new();
366    let mut remaining: Vec<ReadyUnit> = units;
367
368    while !remaining.is_empty() {
369        let (ready, blocked): (Vec<ReadyUnit>, Vec<ReadyUnit>) = remaining
370            .into_iter()
371            .partition(|u| is_unit_ready(u, &completed, &unit_ids, &all_units));
372
373        if ready.is_empty() {
374            // Cycle or unresolvable deps — add remaining as a final wave
375            let mut leftover = blocked;
376            let weights = compute_downstream_weights(&leftover);
377            sort_units(&mut leftover, &weights);
378            waves.push(RunWave { units: leftover });
379            break;
380        }
381
382        for u in &ready {
383            completed.insert(u.id.clone());
384        }
385
386        // Sort wave by global weights (not just within the wave) for consistent ordering
387        let weights = compute_downstream_weights(&all_units);
388        let mut wave_units = ready;
389        sort_units(&mut wave_units, &weights);
390        waves.push(RunWave { units: wave_units });
391        remaining = blocked;
392    }
393
394    waves
395}
396
397#[cfg(test)]
398mod tests {
399    use super::*;
400    use std::collections::HashSet;
401
402    fn make_unit(id: &str, deps: Vec<&str>, produces: Vec<&str>, requires: Vec<&str>) -> ReadyUnit {
403        ReadyUnit {
404            id: id.to_string(),
405            title: format!("Unit {}", id),
406            priority: 2,
407            critical_path_weight: 1,
408            paths: vec![],
409            produces: produces.into_iter().map(|s| s.to_string()).collect(),
410            requires: requires.into_iter().map(|s| s.to_string()).collect(),
411            dependencies: deps.into_iter().map(|s| s.to_string()).collect(),
412            parent: Some("parent".to_string()),
413            model: None,
414        }
415    }
416
417    // -- all_deps_closed tests --
418
419    fn make_index_entry(
420        id: &str,
421        status: Status,
422        deps: Vec<&str>,
423        parent: Option<&str>,
424        produces: Vec<&str>,
425        requires: Vec<&str>,
426    ) -> IndexEntry {
427        IndexEntry {
428            id: id.to_string(),
429            title: format!("Unit {}", id),
430            status,
431            priority: 2,
432            parent: parent.map(|s| s.to_string()),
433            dependencies: deps.into_iter().map(|s| s.to_string()).collect(),
434            labels: vec![],
435            assignee: None,
436            updated_at: chrono::Utc::now(),
437            produces: produces.into_iter().map(|s| s.to_string()).collect(),
438            requires: requires.into_iter().map(|s| s.to_string()).collect(),
439            has_verify: true,
440            verify: None,
441            created_at: chrono::Utc::now(),
442            claimed_by: None,
443            attempts: 0,
444            paths: vec![],
445            feature: false,
446            has_decisions: false,
447        }
448    }
449
450    #[test]
451    fn all_deps_closed_archived_dep_satisfied() {
452        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
453        let index = Index {
454            units: vec![entry_a.clone()],
455        };
456        let archived_b = make_index_entry("B", Status::Closed, vec![], None, vec![], vec![]);
457        let archive = ArchiveIndex {
458            units: vec![archived_b],
459        };
460        assert!(all_deps_closed(&entry_a, &index, &archive));
461    }
462
463    #[test]
464    fn all_deps_closed_missing_dep_unsatisfied() {
465        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
466        let index = Index {
467            units: vec![entry_a.clone()],
468        };
469        let archive = ArchiveIndex { units: vec![] };
470        assert!(!all_deps_closed(&entry_a, &index, &archive));
471    }
472
473    #[test]
474    fn all_deps_closed_active_closed_dep_satisfied() {
475        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
476        let entry_b = make_index_entry("B", Status::Closed, vec![], None, vec![], vec![]);
477        let index = Index {
478            units: vec![entry_a.clone(), entry_b],
479        };
480        let archive = ArchiveIndex { units: vec![] };
481        assert!(all_deps_closed(&entry_a, &index, &archive));
482    }
483
484    #[test]
485    fn all_deps_closed_active_open_dep_unsatisfied() {
486        let entry_a = make_index_entry("A", Status::Open, vec!["B"], None, vec![], vec![]);
487        let entry_b = make_index_entry("B", Status::Open, vec![], None, vec![], vec![]);
488        let index = Index {
489            units: vec![entry_a.clone(), entry_b],
490        };
491        let archive = ArchiveIndex { units: vec![] };
492        assert!(!all_deps_closed(&entry_a, &index, &archive));
493    }
494
495    // -- compute_downstream_weights tests --
496
497    #[test]
498    fn weights_single_unit() {
499        let units = vec![make_unit("A", vec![], vec![], vec![])];
500        let weights = compute_downstream_weights(&units);
501        assert_eq!(weights.get("A").copied(), Some(1));
502    }
503
504    #[test]
505    fn weights_linear_chain() {
506        let units = vec![
507            make_unit("A", vec![], vec![], vec![]),
508            make_unit("B", vec!["A"], vec![], vec![]),
509            make_unit("C", vec!["B"], vec![], vec![]),
510        ];
511        let weights = compute_downstream_weights(&units);
512        assert_eq!(weights.get("A").copied(), Some(3));
513        assert_eq!(weights.get("B").copied(), Some(2));
514        assert_eq!(weights.get("C").copied(), Some(1));
515    }
516
517    #[test]
518    fn weights_diamond() {
519        let units = vec![
520            make_unit("A", vec![], vec![], vec![]),
521            make_unit("B", vec!["A"], vec![], vec![]),
522            make_unit("C", vec!["A"], vec![], vec![]),
523            make_unit("D", vec!["B", "C"], vec![], vec![]),
524        ];
525        let weights = compute_downstream_weights(&units);
526        assert_eq!(weights.get("D").copied(), Some(1));
527        assert_eq!(weights.get("B").copied(), Some(2));
528        assert_eq!(weights.get("C").copied(), Some(2));
529        assert_eq!(weights.get("A").copied(), Some(4));
530    }
531
532    // -- is_unit_ready tests --
533
534    #[test]
535    fn unit_ready_no_deps() {
536        let unit = make_unit("1", vec![], vec![], vec![]);
537        let all = vec![unit.clone()];
538        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
539        assert!(is_unit_ready(&unit, &HashSet::new(), &ids, &all));
540    }
541
542    #[test]
543    fn unit_not_ready_dep_not_completed() {
544        let unit = make_unit("2", vec!["1"], vec![], vec![]);
545        let dep = make_unit("1", vec![], vec![], vec![]);
546        let all = vec![dep, unit.clone()];
547        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
548        assert!(!is_unit_ready(&unit, &HashSet::new(), &ids, &all));
549    }
550
551    #[test]
552    fn unit_ready_dep_completed() {
553        let unit = make_unit("2", vec!["1"], vec![], vec![]);
554        let dep = make_unit("1", vec![], vec![], vec![]);
555        let all = vec![dep, unit.clone()];
556        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
557        let mut completed = HashSet::new();
558        completed.insert("1".to_string());
559        assert!(is_unit_ready(&unit, &completed, &ids, &all));
560    }
561
562    #[test]
563    fn unit_ready_dep_outside_dispatch_set() {
564        let unit = make_unit("2", vec!["external"], vec![], vec![]);
565        let all = vec![unit.clone()];
566        let ids: HashSet<String> = all.iter().map(|u| u.id.clone()).collect();
567        // "external" is not in ids → treated as satisfied
568        assert!(is_unit_ready(&unit, &HashSet::new(), &ids, &all));
569    }
570
571    // -- sort_units tests --
572
573    #[test]
574    fn sort_units_by_priority_then_weight() {
575        let mut units = vec![
576            {
577                let mut u = make_unit("B", vec![], vec![], vec![]);
578                u.priority = 2;
579                u.critical_path_weight = 3;
580                u
581            },
582            {
583                let mut u = make_unit("A", vec![], vec![], vec![]);
584                u.priority = 1;
585                u.critical_path_weight = 1;
586                u
587            },
588        ];
589        let weights: HashMap<String, u32> = [("A".to_string(), 1), ("B".to_string(), 3)]
590            .into_iter()
591            .collect();
592        sort_units(&mut units, &weights);
593        // Priority 1 before priority 2
594        assert_eq!(units[0].id, "A");
595        assert_eq!(units[1].id, "B");
596    }
597
598    #[test]
599    fn sort_units_same_priority_higher_weight_first() {
600        let mut units = vec![
601            {
602                let mut u = make_unit("A", vec![], vec![], vec![]);
603                u.priority = 2;
604                u.critical_path_weight = 1;
605                u
606            },
607            {
608                let mut u = make_unit("B", vec![], vec![], vec![]);
609                u.priority = 2;
610                u.critical_path_weight = 5;
611                u
612            },
613        ];
614        let weights: HashMap<String, u32> = [("A".to_string(), 1), ("B".to_string(), 5)]
615            .into_iter()
616            .collect();
617        sort_units(&mut units, &weights);
618        // Higher weight first (B before A)
619        assert_eq!(units[0].id, "B");
620        assert_eq!(units[1].id, "A");
621    }
622}