1pub(super) mod memory;
19mod plan;
20mod ready_queue;
21mod wave;
22
23pub use plan::{DispatchPlan, SizedUnit};
24pub use wave::Wave;
25
26use std::fmt;
27use std::io::IsTerminal;
28use std::path::{Path, PathBuf};
29use std::process::{Command, Stdio};
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::Mutex;
32use std::time::{Duration, Instant};
33
34use anyhow::Result;
35
36use crate::commands::review::{cmd_review, ReviewArgs};
37use crate::config::Config;
38use crate::stream::{self, StreamEvent};
39use crate::unit::Unit;
40
41use plan::{plan_dispatch, print_plan, print_plan_json};
42use ready_queue::run_ready_queue_direct;
43use wave::run_wave;
44
/// Per-run execution settings that `run_once` assembles from the CLI arguments
/// and the loaded project configuration before dispatching.
pub(super) struct RunConfig {
    /// Job limit: the smaller of `RunArgs::jobs` and `Config::max_concurrent`.
    pub max_jobs: usize,
    /// Copied from `RunArgs::timeout`; the field name indicates minutes.
    pub timeout_minutes: u32,
    /// Copied from `RunArgs::idle_timeout`; the field name indicates minutes.
    pub idle_timeout_minutes: u32,
    /// Copied from `RunArgs::json_stream`.
    pub json_stream: bool,
    /// Copied from `Config::file_locking`.
    pub file_locking: bool,
    /// Copied from `Config::run_model`.
    /// NOTE(review): presumably a model override for spawned agents — confirm.
    pub run_model: Option<String>,
    /// Copied from `Config::batch_verify`; when set, `run_once` runs a batch
    /// verification pass after direct-mode dispatch.
    pub batch_verify: bool,
    /// Copied from `Config::memory_reserve_mb`.
    pub memory_reserve_mb: u64,
}
60
/// Arguments accepted by [`cmd_run`].
pub struct RunArgs {
    /// Optional unit ID forwarded to planning; reported as `"all"` in the
    /// run-start event when absent.
    pub id: Option<String>,
    /// Requested job count; capped by `Config::max_concurrent` in `run_once`.
    pub jobs: u32,
    /// Print the dispatch plan and return without dispatching anything.
    pub dry_run: bool,
    /// Use `run_loop` (repeated passes) instead of a single `run_once` pass.
    pub loop_mode: bool,
    /// Forwarded to `plan_dispatch`.
    pub auto_plan: bool,
    /// Continue after failures and do not turn them into an error result.
    pub keep_going: bool,
    /// Copied into `RunConfig::timeout_minutes`.
    pub timeout: u32,
    /// Copied into `RunConfig::idle_timeout_minutes`.
    pub idle_timeout: u32,
    /// Emit stream events instead of human-readable progress on stderr.
    pub json_stream: bool,
    /// Run `cmd_review` on every successfully completed unit after dispatch.
    pub review: bool,
}
75
/// The kind of work requested for a unit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnitAction {
    /// Rendered as `implement` by the `Display` impl.
    Implement,
}

impl fmt::Display for UnitAction {
    /// Writes the lowercase label of the action.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            Self::Implement => "implement",
        };
        f.write_str(label)
    }
}
89
/// Outcome of running one agent on one unit.
///
/// `run_once` forwards most of these fields into `StreamEvent::UnitDone`.
#[derive(Debug)]
// NOTE(review): not every field is read in this file (e.g. `action`), which is
// presumably why dead-code warnings are silenced — confirm.
#[allow(dead_code)]
struct AgentResult {
    /// ID of the unit the agent worked on.
    id: String,
    /// Title of the unit, used in human-readable progress lines.
    title: String,
    /// The action that was requested for the unit.
    action: UnitAction,
    /// Whether the agent run counted as successful.
    success: bool,
    /// Duration reported for the agent run.
    duration: Duration,
    /// Token usage, when known; summed into the run summary.
    total_tokens: Option<u64>,
    /// Cost, when known; summed into the run summary and printed with a `$` prefix.
    total_cost: Option<f64>,
    /// Error text shown for failed runs (`"failed"` is printed when absent).
    error: Option<String>,
    /// Forwarded as `tool_count` in the unit-done event.
    tool_count: usize,
    /// Forwarded as `turns` in the unit-done event.
    turns: usize,
    /// Forwarded as `failure_summary` in the unit-done event for failed runs.
    failure_summary: Option<String>,
}
106
/// Process-wide flag set to `true` by `signal_handler` and read through
/// `shutdown_requested`.
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);

/// PIDs added by `register_child_pid` and removed by `unregister_child_pid`;
/// the kill helpers send signals to every PID in this list.
static CHILD_PIDS: Mutex<Vec<u32>> = Mutex::new(Vec::new());
116
/// Returns `true` once the shutdown flag has been set (see `signal_handler`).
fn shutdown_requested() -> bool {
    SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
121
122fn install_signal_handlers() {
128 unsafe {
129 libc::signal(
130 libc::SIGINT,
131 signal_handler as *const () as libc::sighandler_t,
132 );
133 libc::signal(
134 libc::SIGTERM,
135 signal_handler as *const () as libc::sighandler_t,
136 );
137 }
138}
139
/// Signal handler registered by `install_signal_handlers`.
///
/// Only records the request by setting `SHUTDOWN_REQUESTED`; the signal number
/// is ignored.
extern "C" fn signal_handler(_sig: libc::c_int) {
    SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
}
143
144fn register_child_pid(pid: u32) {
146 if let Ok(mut pids) = CHILD_PIDS.lock() {
147 pids.push(pid);
148 }
149}
150
151fn unregister_child_pid(pid: u32) {
153 if let Ok(mut pids) = CHILD_PIDS.lock() {
154 pids.retain(|&p| p != pid);
155 }
156}
157
158fn kill_all_children() {
160 if let Ok(pids) = CHILD_PIDS.lock() {
161 for &pid in pids.iter() {
162 unsafe {
163 libc::kill(pid as i32, libc::SIGTERM);
164 }
165 }
166 }
167}
168
169fn force_kill_all_children() {
171 if let Ok(pids) = CHILD_PIDS.lock() {
172 for &pid in pids.iter() {
173 unsafe {
174 libc::kill(pid as i32, libc::SIGKILL);
175 }
176 }
177 }
178}
179
/// How agents are launched, as decided by `determine_spawn_mode`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SpawnMode {
    /// A run template is configured (`Config::run` is set).
    Template {
        /// Value of `Config::run`.
        run_template: String,
        /// Value of `Config::plan`, if any.
        plan_template: Option<String>,
    },
    /// No run template is configured; `cmd_run` then requires `imp` or `pi`
    /// to be available on PATH.
    Direct,
}
191
/// A unit scheduled for dispatch that still lists unresolved decisions.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DecisionWarning {
    /// ID of the unit, as read from its file.
    id: String,
    /// Title of the unit, as read from its file.
    title: String,
    /// The unit's decision entries (never empty for a collected warning).
    decisions: Vec<String>,
}
198
199fn collect_decision_warnings(
200 mana_dir: &Path,
201 units: &[SizedUnit],
202 index: &crate::index::Index,
203) -> Result<Vec<DecisionWarning>> {
204 let mut warnings = Vec::new();
205
206 for unit in units {
207 let Some(entry) = index.units.iter().find(|entry| entry.id == unit.id) else {
208 continue;
209 };
210
211 if !entry.has_decisions {
212 continue;
213 }
214
215 let unit_path = crate::discovery::find_unit_file(mana_dir, &unit.id)?;
216 let unit = Unit::from_file(&unit_path)?;
217 if unit.decisions.is_empty() {
218 continue;
219 }
220
221 warnings.push(DecisionWarning {
222 id: unit.id,
223 title: unit.title,
224 decisions: unit.decisions,
225 });
226 }
227
228 warnings.sort_by(|a, b| crate::util::natural_cmp(&a.id, &b.id));
229 Ok(warnings)
230}
231
232fn format_decision_warning_message(warnings: &[DecisionWarning]) -> String {
233 let mut message = String::new();
234
235 if warnings.len() == 1 {
236 let warning = &warnings[0];
237 message.push_str(&format!(
238 "⚠ Unit {} has {} unresolved decision{} — agent may make wrong choices:\n",
239 warning.id,
240 warning.decisions.len(),
241 if warning.decisions.len() == 1 {
242 ""
243 } else {
244 "s"
245 }
246 ));
247 for (idx, decision) in warning.decisions.iter().enumerate() {
248 message.push_str(&format!(" {}: {}\n", idx, decision));
249 }
250 return message;
251 }
252
253 message.push_str(&format!(
254 "⚠ {} units have unresolved decisions — agents may make wrong choices:\n",
255 warnings.len()
256 ));
257 for warning in warnings {
258 message.push_str(&format!(
259 "Unit {}: {} ({} unresolved)\n",
260 warning.id,
261 warning.title,
262 warning.decisions.len()
263 ));
264 for (idx, decision) in warning.decisions.iter().enumerate() {
265 message.push_str(&format!(" {}: {}\n", idx, decision));
266 }
267 }
268
269 message
270}
271
272fn confirm_dispatch_with_decisions(
273 warnings: &[DecisionWarning],
274 json_stream: bool,
275) -> Result<bool> {
276 if warnings.is_empty() {
277 return Ok(true);
278 }
279
280 eprint!("{}", format_decision_warning_message(warnings));
281
282 if json_stream || !std::io::stdin().is_terminal() {
283 return Ok(true);
284 }
285
286 eprint!("Dispatch anyway? [y/N] ");
287 let mut input = String::new();
288 std::io::stdin().read_line(&mut input)?;
289 Ok(input.trim().eq_ignore_ascii_case("y"))
290}
291
292pub fn cmd_run(mana_dir: &Path, args: RunArgs) -> Result<()> {
294 install_signal_handlers();
296
297 let config = Config::load_with_extends(mana_dir)?;
299 let spawn_mode = determine_spawn_mode(&config);
300
301 if spawn_mode == SpawnMode::Direct && !imp_available() && !pi_available() {
302 anyhow::bail!(
303 "No agent configured and neither `imp` nor `pi` found on PATH.\n\n\
304 Either:\n \
305 1. Install imp (Rust): cargo install imp-cli\n \
306 2. Install pi (Node): npm i -g @mariozechner/pi-coding-agent\n \
307 3. Set a run template: mana config set run \"<command>\"\n\n\
308 The command template uses {{id}} as a placeholder for the unit ID.\n\n\
309 Examples:\n \
310 mana config set run \"imp run {{id}} && mana close {{id}}\"\n \
311 mana config set run \"pi @.mana/{{id}}-*.md 'implement and mana close {{id}}'\""
312 );
313 }
314
315 if let SpawnMode::Template {
316 ref run_template, ..
317 } = spawn_mode
318 {
319 let _ = run_template;
321 }
322
323 if args.loop_mode {
324 run_loop(mana_dir, &config, &args, &spawn_mode)
325 } else {
326 run_once(mana_dir, &config, &args, &spawn_mode)
327 }
328}
329
330fn determine_spawn_mode(config: &Config) -> SpawnMode {
332 if let Some(ref run) = config.run {
333 SpawnMode::Template {
334 run_template: run.clone(),
335 plan_template: config.plan.clone(),
336 }
337 } else {
338 SpawnMode::Direct
339 }
340}
341
/// Reports whether `imp --version` can be executed and exits successfully.
///
/// The probe's stdout and stderr are discarded; failure to spawn counts as
/// "not available".
fn imp_available() -> bool {
    let probe = Command::new("imp")
        .arg("--version")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
352
/// Reports whether `pi --version` can be executed and exits successfully.
///
/// The probe's stdout and stderr are discarded; failure to spawn counts as
/// "not available".
fn pi_available() -> bool {
    let probe = Command::new("pi")
        .arg("--version")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
363
364fn run_once(
366 mana_dir: &Path,
367 config: &Config,
368 args: &RunArgs,
369 spawn_mode: &SpawnMode,
370) -> Result<()> {
371 if shutdown_requested() {
373 if !args.json_stream {
374 eprintln!("\nShutdown signal received, aborting.");
375 }
376 return Ok(());
377 }
378
379 let plan = plan_dispatch(
380 mana_dir,
381 config,
382 args.id.as_deref(),
383 args.auto_plan,
384 args.dry_run,
385 )?;
386
387 if plan.waves.is_empty() && plan.skipped.is_empty() {
388 if args.json_stream {
389 stream::emit_error("No ready units");
390 } else {
391 eprintln!("No ready units. Use `mana status` to see what's going on.");
392 }
393 return Ok(());
394 }
395
396 if args.dry_run {
397 if args.json_stream {
398 print_plan_json(&plan, args.id.as_deref());
399 } else {
400 print_plan(&plan);
401 }
402 return Ok(());
403 }
404
405 let decision_warnings = collect_decision_warnings(mana_dir, &plan.all_units, &plan.index)?;
406 if !confirm_dispatch_with_decisions(&decision_warnings, args.json_stream)? {
407 if !args.json_stream {
408 eprintln!("Dispatch cancelled.");
409 }
410 return Ok(());
411 }
412
413 if !plan.skipped.is_empty() && !args.json_stream {
415 eprintln!("{} unit(s) blocked:", plan.skipped.len());
416 for bb in &plan.skipped {
417 eprintln!(" ⚠ {} {} ({})", bb.id, bb.title, bb.reason);
418 }
419 eprintln!();
420 }
421
422 let total_units: usize = plan.waves.iter().map(|w| w.units.len()).sum();
423 let total_waves = plan.waves.len();
424 let parent_id = args.id.as_deref().unwrap_or("all");
425
426 if args.json_stream {
427 let units_info: Vec<stream::UnitInfo> = plan
428 .waves
429 .iter()
430 .enumerate()
431 .flat_map(|(wave_idx, wave)| {
432 wave.units.iter().map(move |b| stream::UnitInfo {
433 id: b.id.clone(),
434 title: b.title.clone(),
435 round: wave_idx + 1,
436 })
437 })
438 .collect();
439 stream::emit(&StreamEvent::RunStart {
440 parent_id: parent_id.to_string(),
441 total_units,
442 total_rounds: total_waves,
443 units: units_info,
444 });
445 }
446
447 let run_cfg = RunConfig {
448 max_jobs: args.jobs.min(config.max_concurrent) as usize,
449 timeout_minutes: args.timeout,
450 idle_timeout_minutes: args.idle_timeout,
451 json_stream: args.json_stream,
452 file_locking: config.file_locking,
453 run_model: config.run_model.clone(),
454 batch_verify: config.batch_verify,
455 memory_reserve_mb: config.memory_reserve_mb,
456 };
457 let run_start = Instant::now();
458 let total_done;
459 let mut total_failed;
460 let mut any_failed;
461 let mut total_tokens: u64 = 0;
462 let mut total_cost: f64 = 0.0;
463 let mut successful_ids: Vec<String> = Vec::new();
465
466 match spawn_mode {
467 SpawnMode::Direct => {
468 if !args.json_stream {
469 eprintln!("Dispatching {} unit(s)...", total_units);
470 }
471
472 let (results, had_failure) = run_ready_queue_direct(
475 mana_dir,
476 &plan.all_units,
477 &plan.index,
478 &run_cfg,
479 args.keep_going,
480 )?;
481
482 let mut done = 0u32;
483 let mut failed = 0u32;
484 for result in &results {
485 total_tokens += result.total_tokens.unwrap_or(0);
486 total_cost += result.total_cost.unwrap_or(0.0);
487 if result.success {
488 if args.json_stream {
489 stream::emit(&StreamEvent::UnitDone {
490 id: result.id.clone(),
491 success: true,
492 duration_secs: result.duration.as_secs(),
493 error: None,
494 total_tokens: result.total_tokens,
495 total_cost: result.total_cost,
496 tool_count: Some(result.tool_count),
497 turns: Some(result.turns),
498 failure_summary: None,
499 });
500 }
501 done += 1;
502 successful_ids.push(result.id.clone());
503 } else {
504 if args.json_stream {
505 stream::emit(&StreamEvent::UnitDone {
506 id: result.id.clone(),
507 success: false,
508 duration_secs: result.duration.as_secs(),
509 error: result.error.clone(),
510 total_tokens: result.total_tokens,
511 total_cost: result.total_cost,
512 tool_count: Some(result.tool_count),
513 turns: Some(result.turns),
514 failure_summary: result.failure_summary.clone(),
515 });
516 }
517 failed += 1;
518 }
519 }
520 total_done = done;
521 total_failed = failed;
522 any_failed = had_failure;
523
524 if run_cfg.batch_verify {
527 match mana_core::ops::batch_verify::batch_verify(mana_dir) {
528 Ok(bv) => {
529 for id in &bv.passed {
531 if !successful_ids.contains(id) {
532 successful_ids.push(id.clone());
533 }
534 }
535 total_failed += bv.failed.len() as u32;
537 if !bv.failed.is_empty() {
538 any_failed = true;
539 }
540
541 if args.json_stream {
542 stream::emit(&StreamEvent::BatchVerify {
543 commands_run: bv.commands_run,
544 passed: bv.passed.clone(),
545 failed: bv.failed.iter().map(|f| f.unit_id.clone()).collect(),
546 });
547 } else {
548 print_batch_verify_result(&bv);
549 }
550 }
551 Err(e) => {
552 eprintln!("Batch verify error: {}", e);
553 any_failed = true;
554 }
555 }
556 }
557 }
558
559 SpawnMode::Template { .. } => {
560 let mut done = 0u32;
562 let mut failed = 0u32;
563 let mut had_failure = false;
564
565 for (wave_idx, wave) in plan.waves.iter().enumerate() {
566 if shutdown_requested() {
568 if !args.json_stream {
569 eprintln!("\nShutdown signal received, stopping.");
570 }
571 had_failure = true;
572 break;
573 }
574
575 if args.json_stream {
576 stream::emit(&StreamEvent::RoundStart {
577 round: wave_idx + 1,
578 total_rounds: total_waves,
579 unit_count: wave.units.len(),
580 });
581 } else {
582 eprintln!("Wave {}: {} unit(s)", wave_idx + 1, wave.units.len());
583 }
584
585 let results = run_wave(mana_dir, &wave.units, spawn_mode, &run_cfg, wave_idx + 1)?;
586
587 let mut wave_success = 0usize;
588 let mut wave_failed = 0usize;
589
590 for result in &results {
591 let duration = format_duration(result.duration);
592 if result.success {
593 if args.json_stream {
594 stream::emit(&StreamEvent::UnitDone {
595 id: result.id.clone(),
596 success: true,
597 duration_secs: result.duration.as_secs(),
598 error: None,
599 total_tokens: result.total_tokens,
600 total_cost: result.total_cost,
601 tool_count: Some(result.tool_count),
602 turns: Some(result.turns),
603 failure_summary: None,
604 });
605 } else {
606 eprintln!(" ✓ {} {} {}", result.id, result.title, duration);
607 }
608 done += 1;
609 wave_success += 1;
610 successful_ids.push(result.id.clone());
611 } else {
612 if args.json_stream {
613 stream::emit(&StreamEvent::UnitDone {
614 id: result.id.clone(),
615 success: false,
616 duration_secs: result.duration.as_secs(),
617 error: result.error.clone(),
618 total_tokens: result.total_tokens,
619 total_cost: result.total_cost,
620 tool_count: Some(result.tool_count),
621 turns: Some(result.turns),
622 failure_summary: result.failure_summary.clone(),
623 });
624 } else {
625 let err = result.error.as_deref().unwrap_or("failed");
626 eprintln!(
627 " ✗ {} {} {} ({})",
628 result.id, result.title, duration, err
629 );
630 }
631 failed += 1;
632 wave_failed += 1;
633 had_failure = true;
634 }
635 }
636
637 if args.json_stream {
638 stream::emit(&StreamEvent::RoundEnd {
639 round: wave_idx + 1,
640 success_count: wave_success,
641 failed_count: wave_failed,
642 });
643 }
644
645 if had_failure && !args.keep_going {
646 break;
647 }
648 }
649
650 total_done = done;
651 total_failed = failed;
652 any_failed = had_failure;
653 }
654 }
655
656 if args.review && !successful_ids.is_empty() {
659 for id in &successful_ids {
660 if !args.json_stream {
661 eprintln!("Review: checking {} ...", id);
662 }
663 if let Err(e) = cmd_review(
664 mana_dir,
665 ReviewArgs {
666 id: id.clone(),
667 model: None,
668 diff_only: false,
669 },
670 ) {
671 eprintln!("Review: warning — review of {} failed: {}", id, e);
672 }
673 }
674 }
675
676 if args.json_stream {
677 stream::emit(&StreamEvent::RunEnd {
678 total_success: total_done as usize,
679 total_failed: total_failed as usize,
680 duration_secs: run_start.elapsed().as_secs(),
681 });
682 } else {
683 let elapsed = format_duration(run_start.elapsed());
684 let mut summary = format!(
685 "\nDone: {} succeeded, {} failed, {} skipped ({})",
686 total_done,
687 total_failed,
688 plan.skipped.len(),
689 elapsed,
690 );
691 if total_tokens > 0 || total_cost > 0.0 {
692 let token_str = if total_tokens >= 1_000_000 {
693 format!("{:.1}M tokens", total_tokens as f64 / 1_000_000.0)
694 } else if total_tokens >= 1_000 {
695 format!("{}k tokens", total_tokens / 1_000)
696 } else {
697 format!("{} tokens", total_tokens)
698 };
699 summary.push_str(&format!(" [{}, ${:.2}]", token_str, total_cost));
700 }
701 eprintln!("{}", summary);
702 }
703
704 if any_failed && !args.keep_going {
705 anyhow::bail!("Some agents failed");
706 }
707
708 Ok(())
709}
710
711fn run_loop(
713 mana_dir: &Path,
714 config: &Config,
715 args: &RunArgs,
716 _spawn_mode: &SpawnMode,
717) -> Result<()> {
718 let max_loops = if config.max_loops == 0 {
719 u32::MAX
720 } else {
721 config.max_loops
722 };
723
724 for iteration in 0..max_loops {
725 if shutdown_requested() {
727 if !args.json_stream {
728 eprintln!("\nShutdown signal received, stopping.");
729 }
730 return Ok(());
731 }
732
733 if iteration > 0 && !args.json_stream {
734 eprintln!("\n--- Loop iteration {} ---\n", iteration + 1);
735 }
736
737 let plan = plan_dispatch(mana_dir, config, args.id.as_deref(), args.auto_plan, false)?;
738
739 if plan.waves.is_empty() {
740 if !args.json_stream {
741 if iteration == 0 {
742 eprintln!("No ready units. Use `mana status` to see what's going on.");
743 } else {
744 eprintln!("No more ready units. Stopping.");
745 }
746 }
747 return Ok(());
748 }
749
750 let inner_args = RunArgs {
752 id: args.id.clone(),
753 jobs: args.jobs,
754 dry_run: false,
755 loop_mode: false,
756 auto_plan: args.auto_plan,
757 keep_going: args.keep_going,
758 timeout: args.timeout,
759 idle_timeout: args.idle_timeout,
760 json_stream: args.json_stream,
761 review: args.review,
762 };
763
764 let config = Config::load_with_extends(mana_dir)?;
766 let spawn_mode = determine_spawn_mode(&config);
767 match run_once(mana_dir, &config, &inner_args, &spawn_mode) {
768 Ok(()) => {}
769 Err(e) => {
770 if args.keep_going {
771 eprintln!("Warning: {}", e);
772 } else {
773 return Err(e);
774 }
775 }
776 }
777 }
778
779 eprintln!("Reached max_loops ({}). Stopping.", max_loops);
780 Ok(())
781}
782
783fn print_batch_verify_result(result: &mana_core::ops::batch_verify::BatchVerifyResult) {
790 let total = result.passed.len() + result.failed.len();
791 eprintln!(
792 "\nBatch verify: {} command{}, {}/{} unit{} passed",
793 result.commands_run,
794 if result.commands_run == 1 { "" } else { "s" },
795 result.passed.len(),
796 total,
797 if total == 1 { "" } else { "s" },
798 );
799
800 if !result.passed.is_empty() {
801 eprintln!(
802 " ✓ {} unit{} passed",
803 result.passed.len(),
804 if result.passed.len() == 1 { "" } else { "s" }
805 );
806 }
807
808 let mut by_cmd: std::collections::HashMap<&str, Vec<&str>> = std::collections::HashMap::new();
810 for failure in &result.failed {
811 by_cmd
812 .entry(&failure.verify_command)
813 .or_default()
814 .push(&failure.unit_id);
815 }
816
817 let mut cmd_entries: Vec<(&str, Vec<&str>)> = by_cmd.into_iter().collect();
819 cmd_entries.sort_by_key(|(cmd, _)| *cmd);
820
821 for (cmd, ids) in cmd_entries {
822 let ids_str = ids.join(", ");
823 let unit_word = if ids.len() == 1 { "unit" } else { "units" };
824 let exit_info = result
826 .failed
827 .iter()
828 .find(|f| f.verify_command == cmd)
829 .map(|f| {
830 if f.timed_out {
831 " — timed out".to_string()
832 } else if let Some(code) = f.exit_code {
833 format!(" — exit code {}", code)
834 } else {
835 String::new()
836 }
837 })
838 .unwrap_or_default();
839 eprintln!(" ✗ {} ({}: {}){}", cmd, unit_word, ids_str, exit_info);
840 }
841}
842
843pub(super) fn format_duration(d: Duration) -> String {
845 let secs = d.as_secs();
846 format!("{}:{:02}", secs / 60, secs % 60)
847}
848
/// Locates the file of unit `id` under `mana_dir`.
///
/// Thin public wrapper: delegates to `crate::discovery::find_unit_file` and
/// returns its result unchanged.
pub fn find_unit_file(mana_dir: &Path, id: &str) -> Result<PathBuf> {
    crate::discovery::find_unit_file(mana_dir, id)
}
853
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Serializes the tests that write to the process-wide `SHUTDOWN_REQUESTED`
    /// flag; the test harness runs tests in parallel, so unsynchronized
    /// store/assert sequences on the shared flag would race.
    static SIGNAL_FLAG_LOCK: std::sync::Mutex<()> = std::sync::Mutex::new(());

    /// Acquires `SIGNAL_FLAG_LOCK`, ignoring poisoning from a failed sibling test.
    fn lock_signal_flag() -> std::sync::MutexGuard<'static, ()> {
        SIGNAL_FLAG_LOCK
            .lock()
            .unwrap_or_else(|poisoned| poisoned.into_inner())
    }

    /// Creates a temporary directory containing an empty `.mana` directory.
    /// The `TempDir` must be kept alive for as long as the path is used.
    fn make_mana_dir() -> (TempDir, std::path::PathBuf) {
        let dir = TempDir::new().unwrap();
        let mana_dir = dir.path().join(".mana");
        fs::create_dir(&mana_dir).unwrap();
        (dir, mana_dir)
    }

    /// Writes a minimal `config.yaml`, optionally with a `run` template.
    fn write_config(mana_dir: &std::path::Path, run: Option<&str>) {
        let run_line = match run {
            Some(r) => format!("run: \"{}\"\n", r),
            None => String::new(),
        };
        fs::write(
            mana_dir.join("config.yaml"),
            format!("project: test\nnext_id: 1\n{}", run_line),
        )
        .unwrap();
    }

    /// Baseline arguments shared by the `cmd_run` tests.
    fn default_args() -> RunArgs {
        RunArgs {
            id: None,
            jobs: 4,
            dry_run: false,
            loop_mode: false,
            auto_plan: false,
            keep_going: false,
            timeout: 30,
            idle_timeout: 5,
            json_stream: false,
            review: false,
        }
    }

    /// Builds the in-memory configuration used by the spawn-mode tests; only
    /// the `run` and `plan` templates vary between them.
    fn test_config(run: Option<&str>, plan: Option<&str>) -> Config {
        Config {
            project: "test".to_string(),
            next_id: 1,
            auto_close_parent: true,
            run: run.map(str::to_string),
            plan: plan.map(str::to_string),
            max_loops: 10,
            max_concurrent: 4,
            poll_interval: 30,
            extends: vec![],
            rules_file: None,
            file_locking: false,
            worktree: false,
            on_close: None,
            on_fail: None,
            post_plan: None,
            verify_timeout: None,
            review: None,
            user: None,
            user_email: None,
            auto_commit: false,
            commit_template: None,
            research: None,
            run_model: None,
            plan_model: None,
            review_model: None,
            research_model: None,
            batch_verify: false,
            memory_reserve_mb: 0,
            notify: None,
        }
    }

    /// Builds a dispatchable unit with default scheduling attributes.
    fn sized_unit(id: &str, title: &str) -> SizedUnit {
        SizedUnit {
            id: id.to_string(),
            title: title.to_string(),
            action: UnitAction::Implement,
            priority: 2,
            dependencies: Vec::new(),
            parent: None,
            produces: Vec::new(),
            requires: Vec::new(),
            paths: Vec::new(),
            model: None,
        }
    }

    #[test]
    fn cmd_run_errors_when_no_run_template_and_no_pi() {
        let (_dir, mana_dir) = make_mana_dir();
        write_config(&mana_dir, None);

        let args = default_args();

        let result = cmd_run(&mana_dir, args);
        // The error path can only be observed on machines without an agent.
        if !pi_available() && !imp_available() {
            assert!(result.is_err());
            let err = result.unwrap_err().to_string();
            assert!(
                err.contains("No agent configured") || err.contains("not found"),
                "Error should mention missing agent: {}",
                err
            );
        }
    }

    #[test]
    fn dry_run_does_not_spawn() {
        let (_dir, mana_dir) = make_mana_dir();
        write_config(&mana_dir, Some("echo {id}"));

        let mut unit = crate::unit::Unit::new("1", "Test unit");
        unit.verify = Some("echo ok".to_string());
        unit.to_file(mana_dir.join("1-test.md")).unwrap();

        let args = RunArgs {
            dry_run: true,
            ..default_args()
        };

        let result = cmd_run(&mana_dir, args);
        assert!(result.is_ok());
    }

    #[test]
    fn dry_run_with_json_stream() {
        let (_dir, mana_dir) = make_mana_dir();
        write_config(&mana_dir, Some("echo {id}"));

        let mut unit = crate::unit::Unit::new("1", "Test unit");
        unit.verify = Some("echo ok".to_string());
        unit.to_file(mana_dir.join("1-test.md")).unwrap();

        let args = RunArgs {
            dry_run: true,
            json_stream: true,
            ..default_args()
        };

        let result = cmd_run(&mana_dir, args);
        assert!(result.is_ok());
    }

    #[test]
    fn format_duration_formats_correctly() {
        assert_eq!(format_duration(Duration::from_secs(0)), "0:00");
        assert_eq!(format_duration(Duration::from_secs(32)), "0:32");
        assert_eq!(format_duration(Duration::from_secs(62)), "1:02");
        assert_eq!(format_duration(Duration::from_secs(600)), "10:00");
    }

    #[test]
    fn determine_spawn_mode_template_when_run_set() {
        let config = test_config(Some("echo {id}"), Some("plan {id}"));
        let mode = determine_spawn_mode(&config);
        assert_eq!(
            mode,
            SpawnMode::Template {
                run_template: "echo {id}".to_string(),
                plan_template: Some("plan {id}".to_string()),
            }
        );
    }

    #[test]
    fn determine_spawn_mode_direct_when_no_run() {
        let config = test_config(None, None);
        let mode = determine_spawn_mode(&config);
        assert_eq!(mode, SpawnMode::Direct);
    }

    #[test]
    fn agent_result_tracks_tokens_and_cost() {
        let result = AgentResult {
            id: "1".to_string(),
            title: "Test".to_string(),
            action: UnitAction::Implement,
            success: true,
            duration: Duration::from_secs(10),
            total_tokens: Some(5000),
            total_cost: Some(0.03),
            error: None,
            tool_count: 5,
            turns: 2,
            failure_summary: None,
        };
        assert_eq!(result.total_tokens, Some(5000));
        assert_eq!(result.total_cost, Some(0.03));
    }

    #[test]
    fn collect_decision_warnings_only_returns_dispatch_units_with_decisions() {
        let (_dir, mana_dir) = make_mana_dir();
        write_config(&mana_dir, Some("echo {id}"));

        let mut unit1 = crate::unit::Unit::new("1", "Has decisions");
        unit1.verify = Some("echo ok".to_string());
        unit1.decisions = vec!["JWT or session cookies?".to_string()];
        unit1.to_file(mana_dir.join("1-has-decisions.md")).unwrap();

        let mut unit2 = crate::unit::Unit::new("2", "No decisions");
        unit2.verify = Some("echo ok".to_string());
        unit2.to_file(mana_dir.join("2-no-decisions.md")).unwrap();

        let index = crate::index::Index::build(&mana_dir).unwrap();
        let units = vec![
            sized_unit("1", "Has decisions"),
            sized_unit("2", "No decisions"),
        ];

        let warnings = collect_decision_warnings(&mana_dir, &units, &index).unwrap();
        assert_eq!(warnings.len(), 1);
        assert_eq!(warnings[0].id, "1");
        assert_eq!(warnings[0].decisions, vec!["JWT or session cookies?"]);
    }

    #[test]
    fn format_decision_warning_message_matches_single_unit_prompt() {
        let message = format_decision_warning_message(&[DecisionWarning {
            id: "42".to_string(),
            title: "Implement auth".to_string(),
            decisions: vec![
                "JWT or session cookies?".to_string(),
                "Which JWT library?".to_string(),
            ],
        }]);

        assert!(message.contains("⚠ Unit 42 has 2 unresolved decisions"));
        assert!(message.contains("0: JWT or session cookies?"));
        assert!(message.contains("1: Which JWT library?"));
    }

    #[test]
    fn signal_flag_defaults_to_false() {
        let _guard = lock_signal_flag();
        SHUTDOWN_REQUESTED.store(false, Ordering::SeqCst);
        assert!(!shutdown_requested());
    }

    #[test]
    fn signal_flag_can_be_toggled() {
        let _guard = lock_signal_flag();
        SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
        assert!(shutdown_requested());
        // Reset so other tests do not observe a pending shutdown.
        SHUTDOWN_REQUESTED.store(false, Ordering::SeqCst);
        assert!(!shutdown_requested());
    }

    #[test]
    fn child_pid_tracking() {
        // Start from a clean list in case it was left populated.
        if let Ok(mut pids) = CHILD_PIDS.lock() {
            pids.clear();
        }

        register_child_pid(1234);
        register_child_pid(5678);

        let count = CHILD_PIDS.lock().unwrap().len();
        assert_eq!(count, 2);

        unregister_child_pid(1234);
        let count = CHILD_PIDS.lock().unwrap().len();
        assert_eq!(count, 1);

        // Removing an unknown PID must be a no-op.
        unregister_child_pid(9999);
        let count = CHILD_PIDS.lock().unwrap().len();
        assert_eq!(count, 1);

        unregister_child_pid(5678);
        let count = CHILD_PIDS.lock().unwrap().len();
        assert_eq!(count, 0);
    }
}