1use std::collections::{HashMap, HashSet};
20use std::path::Path;
21
22use anyhow::Result;
23
24use crate::blocking::check_blocked_with_archive;
25use crate::discovery::find_unit_file;
26use crate::index::{ArchiveIndex, Index, IndexEntry};
27use crate::unit::{Status, Unit};
28use crate::util::natural_cmp;
29
/// A unit that has been selected for dispatch, together with the metadata the
/// scheduler in this module uses to order units and group them into waves.
///
/// Equality is field-by-field; every field is totally comparable, so the type
/// is `Eq` as well as `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadyUnit {
    /// Unit identifier, copied from the index entry.
    pub id: String,
    /// Unit title, copied from the index entry.
    pub title: String,
    /// Scheduling priority. `sort_units` orders ascending, so lower values are
    /// dispatched first.
    pub priority: u8,
    /// One plus the number of units that transitively wait on this unit, as
    /// computed by `compute_downstream_weights`.
    pub critical_path_weight: u32,
    /// Paths recorded on the index entry (presumably the files the unit
    /// touches; confirm against the index module).
    pub paths: Vec<String>,
    /// Artifacts this unit produces; matched against the `requires` lists of
    /// units that share the same `parent`.
    pub produces: Vec<String>,
    /// Artifacts this unit needs from a unit with the same `parent` that
    /// lists them in `produces`.
    pub requires: Vec<String>,
    /// IDs of the units this unit explicitly depends on.
    pub dependencies: Vec<String>,
    /// ID of the parent unit, if any.
    pub parent: Option<String>,
    /// Value of the unit file's `model` field, if set.
    pub model: Option<String>,
}
57
/// A candidate unit that was held back from dispatch, with the reason why.
///
/// Equality is field-by-field; every field is a `String`, so the type is `Eq`
/// as well as `PartialEq`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BlockedUnit {
    /// Unit identifier, copied from the index entry.
    pub id: String,
    /// Unit title, copied from the index entry.
    pub title: String,
    /// Human-readable explanation of why the unit is blocked (the stringified
    /// result of `check_blocked_with_archive`).
    pub reason: String,
}
65
66#[derive(Debug, Clone)]
68pub struct ReadyQueue {
69 pub units: Vec<ReadyUnit>,
71 pub blocked: Vec<BlockedUnit>,
73}
74
75#[derive(Debug, Clone)]
77pub struct RunWave {
78 pub units: Vec<ReadyUnit>,
80}
81
82#[derive(Debug, Clone)]
84pub struct RunPlan {
85 pub waves: Vec<RunWave>,
87 pub total_units: usize,
89 pub blocked: Vec<BlockedUnit>,
91}
92
93pub fn all_deps_closed(entry: &IndexEntry, index: &Index, archive: &ArchiveIndex) -> bool {
103 for dep_id in &entry.dependencies {
104 match index.units.iter().find(|e| e.id == *dep_id) {
105 Some(dep) if dep.status == Status::Closed => {}
106 Some(_) => return false,
107 None => {
108 if !archive.units.iter().any(|e| e.id == *dep_id) {
109 return false;
110 }
111 }
112 }
113 }
114
115 for required in &entry.requires {
116 if let Some(producer) = index
117 .units
118 .iter()
119 .find(|e| e.id != entry.id && e.parent == entry.parent && e.produces.contains(required))
120 {
121 if producer.status != Status::Closed {
122 return false;
123 }
124 }
125 }
127
128 true
129}
130
131pub fn compute_downstream_weights(units: &[ReadyUnit]) -> HashMap<String, u32> {
136 let unit_ids: HashSet<String> = units.iter().map(|u| u.id.clone()).collect();
137
138 let mut reverse_deps: HashMap<String, Vec<String>> = HashMap::new();
140
141 for u in units {
142 reverse_deps.entry(u.id.clone()).or_default();
143
144 for dep in &u.dependencies {
145 if unit_ids.contains(dep) {
146 reverse_deps
147 .entry(dep.clone())
148 .or_default()
149 .push(u.id.clone());
150 }
151 }
152
153 for req in &u.requires {
154 if let Some(producer) = units.iter().find(|other| {
155 other.id != u.id && other.parent == u.parent && other.produces.contains(req)
156 }) {
157 if unit_ids.contains(&producer.id) {
158 reverse_deps
159 .entry(producer.id.clone())
160 .or_default()
161 .push(u.id.clone());
162 }
163 }
164 }
165 }
166
167 let mut weights: HashMap<String, u32> = HashMap::new();
168
169 for u in units {
170 let mut visited: HashSet<String> = HashSet::new();
171 let mut queue: Vec<String> = Vec::new();
172
173 for dep in reverse_deps.get(&u.id).unwrap_or(&Vec::new()) {
174 if visited.insert(dep.clone()) {
175 queue.push(dep.clone());
176 }
177 }
178
179 while let Some(current) = queue.pop() {
180 for next in reverse_deps.get(¤t).unwrap_or(&Vec::new()) {
181 if visited.insert(next.clone()) {
182 queue.push(next.clone());
183 }
184 }
185 }
186
187 weights.insert(u.id.clone(), 1 + visited.len() as u32);
188 }
189
190 weights
191}
192
193fn is_unit_ready(
195 unit: &ReadyUnit,
196 completed: &HashSet<String>,
197 all_unit_ids: &HashSet<String>,
198 all_units: &[ReadyUnit],
199) -> bool {
200 let explicit_ok = unit
201 .dependencies
202 .iter()
203 .all(|d| completed.contains(d) || !all_unit_ids.contains(d));
204
205 let requires_ok = unit.requires.iter().all(|req| {
206 if let Some(producer) = all_units.iter().find(|other| {
207 other.id != unit.id && other.parent == unit.parent && other.produces.contains(req)
208 }) {
209 completed.contains(&producer.id)
210 } else {
211 true
212 }
213 });
214
215 explicit_ok && requires_ok
216}
217
218fn sort_units(units: &mut [ReadyUnit], weights: &HashMap<String, u32>) {
220 units.sort_by(|a, b| {
221 a.priority
222 .cmp(&b.priority)
223 .then_with(|| {
224 let wa = weights.get(&a.id).copied().unwrap_or(1);
225 let wb = weights.get(&b.id).copied().unwrap_or(1);
226 wb.cmp(&wa)
227 })
228 .then_with(|| natural_cmp(&a.id, &b.id))
229 });
230}
231
232fn build_ready_unit(entry: &IndexEntry, unit: &Unit, weight: u32) -> ReadyUnit {
234 ReadyUnit {
235 id: entry.id.clone(),
236 title: entry.title.clone(),
237 priority: entry.priority,
238 critical_path_weight: weight,
239 paths: entry.paths.clone(),
240 produces: entry.produces.clone(),
241 requires: entry.requires.clone(),
242 dependencies: entry.dependencies.clone(),
243 parent: entry.parent.clone(),
244 model: unit.model.clone(),
245 }
246}
247
248pub fn compute_ready_queue(
261 mana_dir: &Path,
262 filter_id: Option<&str>,
263 simulate: bool,
264) -> Result<ReadyQueue> {
265 let index = Index::load_or_rebuild(mana_dir)?;
266 let archive = ArchiveIndex::load_or_rebuild(mana_dir)
267 .unwrap_or_else(|_| ArchiveIndex { units: Vec::new() });
268
269 let mut candidates: Vec<&IndexEntry> = index
270 .units
271 .iter()
272 .filter(|e| {
273 e.has_verify
274 && e.status == Status::Open
275 && (simulate || all_deps_closed(e, &index, &archive))
276 })
277 .collect();
278
279 if let Some(filter_id) = filter_id {
280 let is_parent = index
281 .units
282 .iter()
283 .any(|e| e.parent.as_deref() == Some(filter_id));
284 if is_parent {
285 candidates.retain(|e| e.parent.as_deref() == Some(filter_id));
286 } else {
287 candidates.retain(|e| e.id == filter_id);
288 }
289 }
290
291 let mut blocked: Vec<BlockedUnit> = Vec::new();
292
293 let mut entries_and_units: Vec<(&IndexEntry, Unit)> = Vec::new();
295 for entry in &candidates {
296 if !simulate {
297 if let Some(reason) = check_blocked_with_archive(entry, &index, Some(&archive)) {
298 blocked.push(BlockedUnit {
299 id: entry.id.clone(),
300 title: entry.title.clone(),
301 reason: reason.to_string(),
302 });
303 continue;
304 }
305 }
306 let unit_path = find_unit_file(mana_dir, &entry.id)?;
307 let unit = Unit::from_file(&unit_path)?;
308 entries_and_units.push((entry, unit));
309 }
310
311 let mut ready_units: Vec<ReadyUnit> = entries_and_units
313 .iter()
314 .map(|(entry, unit)| build_ready_unit(entry, unit, 1))
315 .collect();
316
317 let weights = compute_downstream_weights(&ready_units);
318 for unit in &mut ready_units {
319 unit.critical_path_weight = weights.get(&unit.id).copied().unwrap_or(1);
320 }
321 sort_units(&mut ready_units, &weights);
322
323 Ok(ReadyQueue {
324 units: ready_units,
325 blocked,
326 })
327}
328
329pub fn compute_run_plan(
336 mana_dir: &Path,
337 filter_id: Option<&str>,
338 simulate: bool,
339) -> Result<RunPlan> {
340 let queue = compute_ready_queue(mana_dir, filter_id, simulate)?;
341 let total_units = queue.units.len();
342 let blocked = queue.blocked;
343
344 let waves = group_into_waves(queue.units);
345
346 Ok(RunPlan {
347 waves,
348 total_units,
349 blocked,
350 })
351}
352
353fn group_into_waves(units: Vec<ReadyUnit>) -> Vec<RunWave> {
361 let mut waves: Vec<RunWave> = Vec::new();
362 let all_units = units.clone();
363 let unit_ids: HashSet<String> = units.iter().map(|u| u.id.clone()).collect();
364
365 let mut completed: HashSet<String> = HashSet::new();
366 let mut remaining: Vec<ReadyUnit> = units;
367
368 while !remaining.is_empty() {
369 let (ready, blocked): (Vec<ReadyUnit>, Vec<ReadyUnit>) = remaining
370 .into_iter()
371 .partition(|u| is_unit_ready(u, &completed, &unit_ids, &all_units));
372
373 if ready.is_empty() {
374 let mut leftover = blocked;
376 let weights = compute_downstream_weights(&leftover);
377 sort_units(&mut leftover, &weights);
378 waves.push(RunWave { units: leftover });
379 break;
380 }
381
382 for u in &ready {
383 completed.insert(u.id.clone());
384 }
385
386 let weights = compute_downstream_weights(&all_units);
388 let mut wave_units = ready;
389 sort_units(&mut wave_units, &weights);
390 waves.push(RunWave { units: wave_units });
391 remaining = blocked;
392 }
393
394 waves
395}
396
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashSet;

    // ---- fixtures ---------------------------------------------------------

    /// Converts fixture string slices into the owned strings the structs store.
    fn owned(items: Vec<&str>) -> Vec<String> {
        items.into_iter().map(String::from).collect()
    }

    /// Collects the IDs of `units`, i.e. the dispatch set for `is_unit_ready`.
    fn id_set(units: &[ReadyUnit]) -> HashSet<String> {
        units.iter().map(|u| u.id.clone()).collect()
    }

    /// Builds a priority-2, weight-1 `ReadyUnit` under the parent `"parent"`.
    fn make_unit(id: &str, deps: Vec<&str>, produces: Vec<&str>, requires: Vec<&str>) -> ReadyUnit {
        ReadyUnit {
            id: id.to_string(),
            title: format!("Unit {}", id),
            priority: 2,
            critical_path_weight: 1,
            paths: Vec::new(),
            produces: owned(produces),
            requires: owned(requires),
            dependencies: owned(deps),
            parent: Some("parent".to_string()),
            model: None,
        }
    }

    /// Builds an `IndexEntry` with a verify step and otherwise neutral fields.
    fn make_index_entry(
        id: &str,
        status: Status,
        deps: Vec<&str>,
        parent: Option<&str>,
        produces: Vec<&str>,
        requires: Vec<&str>,
    ) -> IndexEntry {
        IndexEntry {
            id: id.to_string(),
            title: format!("Unit {}", id),
            status,
            priority: 2,
            parent: parent.map(String::from),
            dependencies: owned(deps),
            labels: Vec::new(),
            assignee: None,
            updated_at: chrono::Utc::now(),
            produces: owned(produces),
            requires: owned(requires),
            has_verify: true,
            verify: None,
            created_at: chrono::Utc::now(),
            claimed_by: None,
            attempts: 0,
            paths: Vec::new(),
            feature: false,
            has_decisions: false,
        }
    }

    /// Parentless entry with no produced or required artifacts.
    fn plain_entry(id: &str, status: Status, deps: Vec<&str>) -> IndexEntry {
        make_index_entry(id, status, deps, None, vec![], vec![])
    }

    /// Unit with explicit dependencies only.
    fn unit_with_deps(id: &str, deps: Vec<&str>) -> ReadyUnit {
        make_unit(id, deps, vec![], vec![])
    }

    // ---- all_deps_closed --------------------------------------------------

    #[test]
    fn all_deps_closed_archived_dep_satisfied() {
        let entry_a = plain_entry("A", Status::Open, vec!["B"]);
        let index = Index {
            units: vec![entry_a.clone()],
        };
        let archive = ArchiveIndex {
            units: vec![plain_entry("B", Status::Closed, vec![])],
        };

        assert!(all_deps_closed(&entry_a, &index, &archive));
    }

    #[test]
    fn all_deps_closed_missing_dep_unsatisfied() {
        let entry_a = plain_entry("A", Status::Open, vec!["B"]);
        let index = Index {
            units: vec![entry_a.clone()],
        };
        let archive = ArchiveIndex { units: Vec::new() };

        assert!(!all_deps_closed(&entry_a, &index, &archive));
    }

    #[test]
    fn all_deps_closed_active_closed_dep_satisfied() {
        let entry_a = plain_entry("A", Status::Open, vec!["B"]);
        let index = Index {
            units: vec![entry_a.clone(), plain_entry("B", Status::Closed, vec![])],
        };
        let archive = ArchiveIndex { units: Vec::new() };

        assert!(all_deps_closed(&entry_a, &index, &archive));
    }

    #[test]
    fn all_deps_closed_active_open_dep_unsatisfied() {
        let entry_a = plain_entry("A", Status::Open, vec!["B"]);
        let index = Index {
            units: vec![entry_a.clone(), plain_entry("B", Status::Open, vec![])],
        };
        let archive = ArchiveIndex { units: Vec::new() };

        assert!(!all_deps_closed(&entry_a, &index, &archive));
    }

    // ---- compute_downstream_weights ---------------------------------------

    #[test]
    fn weights_single_unit() {
        let weights = compute_downstream_weights(&[unit_with_deps("A", vec![])]);

        assert_eq!(weights.get("A"), Some(&1));
    }

    #[test]
    fn weights_linear_chain() {
        let chain = vec![
            unit_with_deps("A", vec![]),
            unit_with_deps("B", vec!["A"]),
            unit_with_deps("C", vec!["B"]),
        ];
        let weights = compute_downstream_weights(&chain);

        assert_eq!(weights.get("A"), Some(&3));
        assert_eq!(weights.get("B"), Some(&2));
        assert_eq!(weights.get("C"), Some(&1));
    }

    #[test]
    fn weights_diamond() {
        let diamond = vec![
            unit_with_deps("A", vec![]),
            unit_with_deps("B", vec!["A"]),
            unit_with_deps("C", vec!["A"]),
            unit_with_deps("D", vec!["B", "C"]),
        ];
        let weights = compute_downstream_weights(&diamond);

        assert_eq!(weights.get("D"), Some(&1));
        assert_eq!(weights.get("B"), Some(&2));
        assert_eq!(weights.get("C"), Some(&2));
        assert_eq!(weights.get("A"), Some(&4));
    }

    // ---- is_unit_ready ----------------------------------------------------

    #[test]
    fn unit_ready_no_deps() {
        let unit = unit_with_deps("1", vec![]);
        let all = vec![unit.clone()];
        let ids = id_set(&all);

        assert!(is_unit_ready(&unit, &HashSet::new(), &ids, &all));
    }

    #[test]
    fn unit_not_ready_dep_not_completed() {
        let unit = unit_with_deps("2", vec!["1"]);
        let all = vec![unit_with_deps("1", vec![]), unit.clone()];
        let ids = id_set(&all);

        assert!(!is_unit_ready(&unit, &HashSet::new(), &ids, &all));
    }

    #[test]
    fn unit_ready_dep_completed() {
        let unit = unit_with_deps("2", vec!["1"]);
        let all = vec![unit_with_deps("1", vec![]), unit.clone()];
        let ids = id_set(&all);
        let completed: HashSet<String> = std::iter::once("1".to_string()).collect();

        assert!(is_unit_ready(&unit, &completed, &ids, &all));
    }

    #[test]
    fn unit_ready_dep_outside_dispatch_set() {
        // "external" is not among the dispatched units, so it must not block.
        let unit = unit_with_deps("2", vec!["external"]);
        let all = vec![unit.clone()];
        let ids = id_set(&all);

        assert!(is_unit_ready(&unit, &HashSet::new(), &ids, &all));
    }

    // ---- sort_units -------------------------------------------------------

    #[test]
    fn sort_units_by_priority_then_weight() {
        let mut units = vec![
            ReadyUnit {
                priority: 2,
                critical_path_weight: 3,
                ..unit_with_deps("B", vec![])
            },
            ReadyUnit {
                priority: 1,
                critical_path_weight: 1,
                ..unit_with_deps("A", vec![])
            },
        ];
        let mut weights: HashMap<String, u32> = HashMap::new();
        weights.insert("A".to_string(), 1);
        weights.insert("B".to_string(), 3);

        sort_units(&mut units, &weights);

        // Lower priority value wins even against a heavier unit.
        assert_eq!(units[0].id, "A");
        assert_eq!(units[1].id, "B");
    }

    #[test]
    fn sort_units_same_priority_higher_weight_first() {
        let mut units = vec![
            ReadyUnit {
                priority: 2,
                critical_path_weight: 1,
                ..unit_with_deps("A", vec![])
            },
            ReadyUnit {
                priority: 2,
                critical_path_weight: 5,
                ..unit_with_deps("B", vec![])
            },
        ];
        let mut weights: HashMap<String, u32> = HashMap::new();
        weights.insert("A".to_string(), 1);
        weights.insert("B".to_string(), 5);

        sort_units(&mut units, &weights);

        // Equal priority: the heavier unit is dispatched first.
        assert_eq!(units[0].id, "B");
        assert_eq!(units[1].id, "A");
    }
}