Skip to main content

mana/commands/run/
mod.rs

1//! `mana run` — Dispatch ready units to agents.
2//!
3//! Finds ready units, groups them into waves by dependency order,
4//! and spawns agents for each wave.
5//!
6//! Modes:
7//! - `mana run` — one-shot: dispatch all ready units, then exit
8//! - `mana run 5.1` — dispatch a single unit (or its ready children if parent)
9//! - `mana run --dry-run` — show plan without spawning
10//! - `mana run --loop` — keep running until no ready units remain
11//! - `mana run --json-stream` — emit JSON stream events to stdout
12//!
13//! Spawning modes:
14//! - **Template mode** (backward compat): If `config.run` is set, spawn via `sh -c <template>`.
15//! - **Direct mode**: If no template is configured but `pi` is on PATH, spawn pi directly
16//!   with `--mode json --print --no-session`, monitoring with timeouts and parsing events.
17
18pub(super) mod memory;
19mod plan;
20mod ready_queue;
21mod wave;
22
23pub use plan::{DispatchPlan, SizedUnit};
24pub use wave::Wave;
25
26use std::fmt;
27use std::io::IsTerminal;
28use std::path::{Path, PathBuf};
29use std::process::{Command, Stdio};
30use std::sync::atomic::{AtomicBool, Ordering};
31use std::sync::Mutex;
32use std::time::{Duration, Instant};
33
34use anyhow::Result;
35
36use crate::commands::review::{cmd_review, ReviewArgs};
37use crate::config::Config;
38use crate::stream::{self, StreamEvent};
39use crate::unit::Unit;
40
41use plan::{plan_dispatch, print_plan, print_plan_json};
42use ready_queue::run_ready_queue_direct;
43use wave::run_wave;
44
45/// Shared config passed to wave/ready-queue runners.
46pub(super) struct RunConfig {
47    pub max_jobs: usize,
48    pub timeout_minutes: u32,
49    pub idle_timeout_minutes: u32,
50    pub json_stream: bool,
51    pub file_locking: bool,
52    /// Config-level model for run/implement (substituted into `{model}` in templates).
53    pub run_model: Option<String>,
54    /// When true, agents defer verify by exiting with AwaitingVerify status.
55    /// The runner collects all deferred units and runs each unique verify command once.
56    pub batch_verify: bool,
57    /// Minimum available system memory (MB) to reserve. 0 = disabled.
58    pub memory_reserve_mb: u64,
59}
60
/// Parsed command-line arguments for `mana run`; mirrors the CLI definition.
pub struct RunArgs {
    /// Optional unit ID to dispatch; `None` means every ready unit.
    pub id: Option<String>,
    /// Requested number of parallel agents (capped by the configured `max_concurrent`).
    pub jobs: u32,
    /// Show the dispatch plan without spawning anything.
    pub dry_run: bool,
    /// Keep dispatching passes until no ready units remain.
    pub loop_mode: bool,
    /// Forwarded to the dispatch planner.
    // NOTE(review): presumably enables automatic planning of oversized units — confirm in `plan`.
    pub auto_plan: bool,
    /// Continue after agent failures instead of stopping and returning an error.
    pub keep_going: bool,
    /// Per-agent timeout in minutes.
    pub timeout: u32,
    /// Per-agent idle timeout in minutes.
    pub idle_timeout: u32,
    /// Emit JSON stream events rather than human-readable progress.
    pub json_stream: bool,
    /// When set, every unit that closes successfully gets an adversarial review afterwards.
    pub review: bool,
}
75
/// The kind of work an agent is asked to perform on a unit.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UnitAction {
    /// Implement the unit.
    Implement,
}

impl fmt::Display for UnitAction {
    /// Writes the lowercase action name.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match self {
            UnitAction::Implement => "implement",
        };
        f.write_str(label)
    }
}
89
90/// Result of a completed agent.
91#[derive(Debug)]
92#[allow(dead_code)]
93struct AgentResult {
94    id: String,
95    title: String,
96    action: UnitAction,
97    success: bool,
98    duration: Duration,
99    total_tokens: Option<u64>,
100    total_cost: Option<f64>,
101    error: Option<String>,
102    tool_count: usize,
103    turns: usize,
104    failure_summary: Option<String>,
105}
106
107// ---------------------------------------------------------------------------
108// Signal handling for clean agent shutdown
109// ---------------------------------------------------------------------------
110
/// Raised by the SIGINT/SIGTERM handler; the execution loops poll it to wind
/// down cleanly instead of being killed mid-flight.
static SHUTDOWN_REQUESTED: AtomicBool = AtomicBool::new(false);

/// Process IDs of the agent children that are currently running, kept so they
/// can be signalled during shutdown.
static CHILD_PIDS: Mutex<Vec<u32>> = Mutex::new(Vec::new());

/// Reports whether a shutdown signal (SIGINT/SIGTERM) has been observed.
fn shutdown_requested() -> bool {
    SHUTDOWN_REQUESTED.load(Ordering::SeqCst)
}
121
122/// Install signal handlers for SIGINT and SIGTERM.
123///
124/// Instead of immediately terminating, the handlers set a flag that's checked
125/// in the execution loops. This allows clean shutdown: kill child agents,
126/// release claims, and print a summary.
127fn install_signal_handlers() {
128    unsafe {
129        libc::signal(
130            libc::SIGINT,
131            signal_handler as *const () as libc::sighandler_t,
132        );
133        libc::signal(
134            libc::SIGTERM,
135            signal_handler as *const () as libc::sighandler_t,
136        );
137    }
138}
139
140extern "C" fn signal_handler(_sig: libc::c_int) {
141    SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
142}
143
144/// Register a child process PID for shutdown tracking.
145fn register_child_pid(pid: u32) {
146    if let Ok(mut pids) = CHILD_PIDS.lock() {
147        pids.push(pid);
148    }
149}
150
151/// Unregister a child process PID after it exits.
152fn unregister_child_pid(pid: u32) {
153    if let Ok(mut pids) = CHILD_PIDS.lock() {
154        pids.retain(|&p| p != pid);
155    }
156}
157
158/// Send SIGTERM to all tracked child processes for graceful shutdown.
159fn kill_all_children() {
160    if let Ok(pids) = CHILD_PIDS.lock() {
161        for &pid in pids.iter() {
162            unsafe {
163                libc::kill(pid as i32, libc::SIGTERM);
164            }
165        }
166    }
167}
168
169/// Send SIGKILL to all tracked child processes (forced shutdown).
170fn force_kill_all_children() {
171    if let Ok(pids) = CHILD_PIDS.lock() {
172        for &pid in pids.iter() {
173            unsafe {
174                libc::kill(pid as i32, libc::SIGKILL);
175            }
176        }
177    }
178}
179
/// How agents get launched for a run.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SpawnMode {
    /// Launch through the shell template from config (kept for backward compatibility).
    Template {
        /// The configured `run` template.
        run_template: String,
        /// The configured `plan` template, if one is set.
        plan_template: Option<String>,
    },
    /// Launch pi directly, with JSON output and monitoring.
    Direct,
}
191
/// A unit that is about to be dispatched while it still lists unresolved decisions.
#[derive(Debug, Clone, PartialEq, Eq)]
struct DecisionWarning {
    /// ID of the unit.
    id: String,
    /// Title of the unit.
    title: String,
    /// The unresolved decisions, as recorded in the unit file.
    decisions: Vec<String>,
}
198
199fn collect_decision_warnings(
200    mana_dir: &Path,
201    units: &[SizedUnit],
202    index: &crate::index::Index,
203) -> Result<Vec<DecisionWarning>> {
204    let mut warnings = Vec::new();
205
206    for unit in units {
207        let Some(entry) = index.units.iter().find(|entry| entry.id == unit.id) else {
208            continue;
209        };
210
211        if !entry.has_decisions {
212            continue;
213        }
214
215        let unit_path = crate::discovery::find_unit_file(mana_dir, &unit.id)?;
216        let unit = Unit::from_file(&unit_path)?;
217        if unit.decisions.is_empty() {
218            continue;
219        }
220
221        warnings.push(DecisionWarning {
222            id: unit.id,
223            title: unit.title,
224            decisions: unit.decisions,
225        });
226    }
227
228    warnings.sort_by(|a, b| crate::util::natural_cmp(&a.id, &b.id));
229    Ok(warnings)
230}
231
232fn format_decision_warning_message(warnings: &[DecisionWarning]) -> String {
233    let mut message = String::new();
234
235    if warnings.len() == 1 {
236        let warning = &warnings[0];
237        message.push_str(&format!(
238            "⚠ Unit {} has {} unresolved decision{} — agent may make wrong choices:\n",
239            warning.id,
240            warning.decisions.len(),
241            if warning.decisions.len() == 1 {
242                ""
243            } else {
244                "s"
245            }
246        ));
247        for (idx, decision) in warning.decisions.iter().enumerate() {
248            message.push_str(&format!("  {}: {}\n", idx, decision));
249        }
250        return message;
251    }
252
253    message.push_str(&format!(
254        "⚠ {} units have unresolved decisions — agents may make wrong choices:\n",
255        warnings.len()
256    ));
257    for warning in warnings {
258        message.push_str(&format!(
259            "Unit {}: {} ({} unresolved)\n",
260            warning.id,
261            warning.title,
262            warning.decisions.len()
263        ));
264        for (idx, decision) in warning.decisions.iter().enumerate() {
265            message.push_str(&format!("  {}: {}\n", idx, decision));
266        }
267    }
268
269    message
270}
271
272fn confirm_dispatch_with_decisions(
273    warnings: &[DecisionWarning],
274    json_stream: bool,
275) -> Result<bool> {
276    if warnings.is_empty() {
277        return Ok(true);
278    }
279
280    eprint!("{}", format_decision_warning_message(warnings));
281
282    if json_stream || !std::io::stdin().is_terminal() {
283        return Ok(true);
284    }
285
286    eprint!("Dispatch anyway? [y/N] ");
287    let mut input = String::new();
288    std::io::stdin().read_line(&mut input)?;
289    Ok(input.trim().eq_ignore_ascii_case("y"))
290}
291
292/// Execute the `mana run` command.
293pub fn cmd_run(mana_dir: &Path, args: RunArgs) -> Result<()> {
294    // Install signal handlers for clean shutdown on Ctrl+C / SIGTERM
295    install_signal_handlers();
296
297    // Determine spawn mode
298    let config = Config::load_with_extends(mana_dir)?;
299    let spawn_mode = determine_spawn_mode(&config);
300
301    if spawn_mode == SpawnMode::Direct && !imp_available() && !pi_available() {
302        anyhow::bail!(
303            "No agent configured and neither `imp` nor `pi` found on PATH.\n\n\
304             Either:\n  \
305               1. Install imp (Rust): cargo install imp-cli\n  \
306               2. Install pi (Node): npm i -g @mariozechner/pi-coding-agent\n  \
307               3. Set a run template: mana config set run \"<command>\"\n\n\
308             The command template uses {{id}} as a placeholder for the unit ID.\n\n\
309             Examples:\n  \
310               mana config set run \"imp run {{id}} && mana close {{id}}\"\n  \
311               mana config set run \"pi @.mana/{{id}}-*.md 'implement and mana close {{id}}'\""
312        );
313    }
314
315    if let SpawnMode::Template {
316        ref run_template, ..
317    } = spawn_mode
318    {
319        // Validate template exists (kept for backward compat error message)
320        let _ = run_template;
321    }
322
323    if args.loop_mode {
324        run_loop(mana_dir, &config, &args, &spawn_mode)
325    } else {
326        run_once(mana_dir, &config, &args, &spawn_mode)
327    }
328}
329
330/// Determine the spawn mode based on config.
331fn determine_spawn_mode(config: &Config) -> SpawnMode {
332    if let Some(ref run) = config.run {
333        SpawnMode::Template {
334            run_template: run.clone(),
335            plan_template: config.plan.clone(),
336        }
337    } else {
338        SpawnMode::Direct
339    }
340}
341
/// Probe for `imp` on PATH by running `imp --version` with its output discarded.
///
/// Returns `true` only if the process could be started and exited successfully.
fn imp_available() -> bool {
    let probe = Command::new("imp")
        .arg("--version")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
352
/// Probe for `pi` on PATH by running `pi --version` with its output discarded.
///
/// Returns `true` only if the process could be started and exited successfully.
fn pi_available() -> bool {
    let probe = Command::new("pi")
        .arg("--version")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();
    matches!(probe, Ok(status) if status.success())
}
363
364/// Single dispatch pass: plan → print/execute → report.
365fn run_once(
366    mana_dir: &Path,
367    config: &Config,
368    args: &RunArgs,
369    spawn_mode: &SpawnMode,
370) -> Result<()> {
371    // Check for shutdown before starting execution
372    if shutdown_requested() {
373        if !args.json_stream {
374            eprintln!("\nShutdown signal received, aborting.");
375        }
376        return Ok(());
377    }
378
379    let plan = plan_dispatch(
380        mana_dir,
381        config,
382        args.id.as_deref(),
383        args.auto_plan,
384        args.dry_run,
385    )?;
386
387    if plan.waves.is_empty() && plan.skipped.is_empty() {
388        if args.json_stream {
389            stream::emit_error("No ready units");
390        } else {
391            eprintln!("No ready units. Use `mana status` to see what's going on.");
392        }
393        return Ok(());
394    }
395
396    if args.dry_run {
397        if args.json_stream {
398            print_plan_json(&plan, args.id.as_deref());
399        } else {
400            print_plan(&plan);
401        }
402        return Ok(());
403    }
404
405    let decision_warnings = collect_decision_warnings(mana_dir, &plan.all_units, &plan.index)?;
406    if !confirm_dispatch_with_decisions(&decision_warnings, args.json_stream)? {
407        if !args.json_stream {
408            eprintln!("Dispatch cancelled.");
409        }
410        return Ok(());
411    }
412
413    // Report blocked units (oversized/unscoped)
414    if !plan.skipped.is_empty() && !args.json_stream {
415        eprintln!("{} unit(s) blocked:", plan.skipped.len());
416        for bb in &plan.skipped {
417            eprintln!("  ⚠ {}  {}  ({})", bb.id, bb.title, bb.reason);
418        }
419        eprintln!();
420    }
421
422    let total_units: usize = plan.waves.iter().map(|w| w.units.len()).sum();
423    let total_waves = plan.waves.len();
424    let parent_id = args.id.as_deref().unwrap_or("all");
425
426    if args.json_stream {
427        let units_info: Vec<stream::UnitInfo> = plan
428            .waves
429            .iter()
430            .enumerate()
431            .flat_map(|(wave_idx, wave)| {
432                wave.units.iter().map(move |b| stream::UnitInfo {
433                    id: b.id.clone(),
434                    title: b.title.clone(),
435                    round: wave_idx + 1,
436                })
437            })
438            .collect();
439        stream::emit(&StreamEvent::RunStart {
440            parent_id: parent_id.to_string(),
441            total_units,
442            total_rounds: total_waves,
443            units: units_info,
444        });
445    }
446
447    let run_cfg = RunConfig {
448        max_jobs: args.jobs.min(config.max_concurrent) as usize,
449        timeout_minutes: args.timeout,
450        idle_timeout_minutes: args.idle_timeout,
451        json_stream: args.json_stream,
452        file_locking: config.file_locking,
453        run_model: config.run_model.clone(),
454        batch_verify: config.batch_verify,
455        memory_reserve_mb: config.memory_reserve_mb,
456    };
457    let run_start = Instant::now();
458    let total_done;
459    let mut total_failed;
460    let mut any_failed;
461    let mut total_tokens: u64 = 0;
462    let mut total_cost: f64 = 0.0;
463    // Collect IDs of successfully closed units for --review post-processing
464    let mut successful_ids: Vec<String> = Vec::new();
465
466    match spawn_mode {
467        SpawnMode::Direct => {
468            if !args.json_stream {
469                eprintln!("Dispatching {} unit(s)...", total_units);
470            }
471
472            // Ready-queue: start each unit as soon as its specific deps finish.
473            // Progress (▸ start, ✓/✗ done) is printed in real-time by the queue.
474            let (results, had_failure) = run_ready_queue_direct(
475                mana_dir,
476                &plan.all_units,
477                &plan.index,
478                &run_cfg,
479                args.keep_going,
480            )?;
481
482            let mut done = 0u32;
483            let mut failed = 0u32;
484            for result in &results {
485                total_tokens += result.total_tokens.unwrap_or(0);
486                total_cost += result.total_cost.unwrap_or(0.0);
487                if result.success {
488                    if args.json_stream {
489                        stream::emit(&StreamEvent::UnitDone {
490                            id: result.id.clone(),
491                            success: true,
492                            duration_secs: result.duration.as_secs(),
493                            error: None,
494                            total_tokens: result.total_tokens,
495                            total_cost: result.total_cost,
496                            tool_count: Some(result.tool_count),
497                            turns: Some(result.turns),
498                            failure_summary: None,
499                        });
500                    }
501                    done += 1;
502                    successful_ids.push(result.id.clone());
503                } else {
504                    if args.json_stream {
505                        stream::emit(&StreamEvent::UnitDone {
506                            id: result.id.clone(),
507                            success: false,
508                            duration_secs: result.duration.as_secs(),
509                            error: result.error.clone(),
510                            total_tokens: result.total_tokens,
511                            total_cost: result.total_cost,
512                            tool_count: Some(result.tool_count),
513                            turns: Some(result.turns),
514                            failure_summary: result.failure_summary.clone(),
515                        });
516                    }
517                    failed += 1;
518                }
519            }
520            total_done = done;
521            total_failed = failed;
522            any_failed = had_failure;
523
524            // After all agents complete, run batch verification if enabled.
525            // Each agent exits with AwaitingVerify status; the runner now resolves them.
526            if run_cfg.batch_verify {
527                match mana_core::ops::batch_verify::batch_verify(mana_dir) {
528                    Ok(bv) => {
529                        // Promote agent successes that passed verify into successful_ids
530                        for id in &bv.passed {
531                            if !successful_ids.contains(id) {
532                                successful_ids.push(id.clone());
533                            }
534                        }
535                        // Failures from batch verify count as failed units
536                        total_failed += bv.failed.len() as u32;
537                        if !bv.failed.is_empty() {
538                            any_failed = true;
539                        }
540
541                        if args.json_stream {
542                            stream::emit(&StreamEvent::BatchVerify {
543                                commands_run: bv.commands_run,
544                                passed: bv.passed.clone(),
545                                failed: bv.failed.iter().map(|f| f.unit_id.clone()).collect(),
546                            });
547                        } else {
548                            print_batch_verify_result(&bv);
549                        }
550                    }
551                    Err(e) => {
552                        eprintln!("Batch verify error: {}", e);
553                        any_failed = true;
554                    }
555                }
556            }
557        }
558
559        SpawnMode::Template { .. } => {
560            // Template mode: wave-based execution (legacy)
561            let mut done = 0u32;
562            let mut failed = 0u32;
563            let mut had_failure = false;
564
565            for (wave_idx, wave) in plan.waves.iter().enumerate() {
566                // Check for shutdown signal between waves
567                if shutdown_requested() {
568                    if !args.json_stream {
569                        eprintln!("\nShutdown signal received, stopping.");
570                    }
571                    had_failure = true;
572                    break;
573                }
574
575                if args.json_stream {
576                    stream::emit(&StreamEvent::RoundStart {
577                        round: wave_idx + 1,
578                        total_rounds: total_waves,
579                        unit_count: wave.units.len(),
580                    });
581                } else {
582                    eprintln!("Wave {}: {} unit(s)", wave_idx + 1, wave.units.len());
583                }
584
585                let results = run_wave(mana_dir, &wave.units, spawn_mode, &run_cfg, wave_idx + 1)?;
586
587                let mut wave_success = 0usize;
588                let mut wave_failed = 0usize;
589
590                for result in &results {
591                    let duration = format_duration(result.duration);
592                    if result.success {
593                        if args.json_stream {
594                            stream::emit(&StreamEvent::UnitDone {
595                                id: result.id.clone(),
596                                success: true,
597                                duration_secs: result.duration.as_secs(),
598                                error: None,
599                                total_tokens: result.total_tokens,
600                                total_cost: result.total_cost,
601                                tool_count: Some(result.tool_count),
602                                turns: Some(result.turns),
603                                failure_summary: None,
604                            });
605                        } else {
606                            eprintln!("  ✓ {}  {}  {}", result.id, result.title, duration);
607                        }
608                        done += 1;
609                        wave_success += 1;
610                        successful_ids.push(result.id.clone());
611                    } else {
612                        if args.json_stream {
613                            stream::emit(&StreamEvent::UnitDone {
614                                id: result.id.clone(),
615                                success: false,
616                                duration_secs: result.duration.as_secs(),
617                                error: result.error.clone(),
618                                total_tokens: result.total_tokens,
619                                total_cost: result.total_cost,
620                                tool_count: Some(result.tool_count),
621                                turns: Some(result.turns),
622                                failure_summary: result.failure_summary.clone(),
623                            });
624                        } else {
625                            let err = result.error.as_deref().unwrap_or("failed");
626                            eprintln!(
627                                "  ✗ {}  {}  {} ({})",
628                                result.id, result.title, duration, err
629                            );
630                        }
631                        failed += 1;
632                        wave_failed += 1;
633                        had_failure = true;
634                    }
635                }
636
637                if args.json_stream {
638                    stream::emit(&StreamEvent::RoundEnd {
639                        round: wave_idx + 1,
640                        success_count: wave_success,
641                        failed_count: wave_failed,
642                    });
643                }
644
645                if had_failure && !args.keep_going {
646                    break;
647                }
648            }
649
650            total_done = done;
651            total_failed = failed;
652            any_failed = had_failure;
653        }
654    }
655
656    // Trigger adversarial review for each successfully closed unit if --review is set.
657    // Review runs synchronously after all units in this pass complete.
658    if args.review && !successful_ids.is_empty() {
659        for id in &successful_ids {
660            if !args.json_stream {
661                eprintln!("Review: checking {} ...", id);
662            }
663            if let Err(e) = cmd_review(
664                mana_dir,
665                ReviewArgs {
666                    id: id.clone(),
667                    model: None,
668                    diff_only: false,
669                },
670            ) {
671                eprintln!("Review: warning — review of {} failed: {}", id, e);
672            }
673        }
674    }
675
676    if args.json_stream {
677        stream::emit(&StreamEvent::RunEnd {
678            total_success: total_done as usize,
679            total_failed: total_failed as usize,
680            duration_secs: run_start.elapsed().as_secs(),
681        });
682    } else {
683        let elapsed = format_duration(run_start.elapsed());
684        let mut summary = format!(
685            "\nDone: {} succeeded, {} failed, {} skipped  ({})",
686            total_done,
687            total_failed,
688            plan.skipped.len(),
689            elapsed,
690        );
691        if total_tokens > 0 || total_cost > 0.0 {
692            let token_str = if total_tokens >= 1_000_000 {
693                format!("{:.1}M tokens", total_tokens as f64 / 1_000_000.0)
694            } else if total_tokens >= 1_000 {
695                format!("{}k tokens", total_tokens / 1_000)
696            } else {
697                format!("{} tokens", total_tokens)
698            };
699            summary.push_str(&format!("  [{}, ${:.2}]", token_str, total_cost));
700        }
701        eprintln!("{}", summary);
702    }
703
704    if any_failed && !args.keep_going {
705        anyhow::bail!("Some agents failed");
706    }
707
708    Ok(())
709}
710
711/// Loop mode: keep dispatching until no ready units remain.
712fn run_loop(
713    mana_dir: &Path,
714    config: &Config,
715    args: &RunArgs,
716    _spawn_mode: &SpawnMode,
717) -> Result<()> {
718    let max_loops = if config.max_loops == 0 {
719        u32::MAX
720    } else {
721        config.max_loops
722    };
723
724    for iteration in 0..max_loops {
725        // Check for shutdown signal between loop iterations
726        if shutdown_requested() {
727            if !args.json_stream {
728                eprintln!("\nShutdown signal received, stopping.");
729            }
730            return Ok(());
731        }
732
733        if iteration > 0 && !args.json_stream {
734            eprintln!("\n--- Loop iteration {} ---\n", iteration + 1);
735        }
736
737        let plan = plan_dispatch(mana_dir, config, args.id.as_deref(), args.auto_plan, false)?;
738
739        if plan.waves.is_empty() {
740            if !args.json_stream {
741                if iteration == 0 {
742                    eprintln!("No ready units. Use `mana status` to see what's going on.");
743                } else {
744                    eprintln!("No more ready units. Stopping.");
745                }
746            }
747            return Ok(());
748        }
749
750        // Run one pass (non-loop, non-dry-run)
751        let inner_args = RunArgs {
752            id: args.id.clone(),
753            jobs: args.jobs,
754            dry_run: false,
755            loop_mode: false,
756            auto_plan: args.auto_plan,
757            keep_going: args.keep_going,
758            timeout: args.timeout,
759            idle_timeout: args.idle_timeout,
760            json_stream: args.json_stream,
761            review: args.review,
762        };
763
764        // Reload config each iteration (agents may have changed units)
765        let config = Config::load_with_extends(mana_dir)?;
766        let spawn_mode = determine_spawn_mode(&config);
767        match run_once(mana_dir, &config, &inner_args, &spawn_mode) {
768            Ok(()) => {}
769            Err(e) => {
770                if args.keep_going {
771                    eprintln!("Warning: {}", e);
772                } else {
773                    return Err(e);
774                }
775            }
776        }
777    }
778
779    eprintln!("Reached max_loops ({}). Stopping.", max_loops);
780    Ok(())
781}
782
783/// Print a human-readable summary of a batch verify run.
784///
785/// Example output:
786///   Batch verify: 2 commands, 3/4 units passed
787///     ✓ cargo check -p mana-cli  (units: 1.1, 1.2, 1.3)
788///     ✗ cargo test -p mana-core  (unit: 1.4) — exit code 1
789fn print_batch_verify_result(result: &mana_core::ops::batch_verify::BatchVerifyResult) {
790    let total = result.passed.len() + result.failed.len();
791    eprintln!(
792        "\nBatch verify: {} command{}, {}/{} unit{} passed",
793        result.commands_run,
794        if result.commands_run == 1 { "" } else { "s" },
795        result.passed.len(),
796        total,
797        if total == 1 { "" } else { "s" },
798    );
799
800    if !result.passed.is_empty() {
801        eprintln!(
802            "  ✓ {} unit{} passed",
803            result.passed.len(),
804            if result.passed.len() == 1 { "" } else { "s" }
805        );
806    }
807
808    // Group failures by verify command for compact display.
809    let mut by_cmd: std::collections::HashMap<&str, Vec<&str>> = std::collections::HashMap::new();
810    for failure in &result.failed {
811        by_cmd
812            .entry(&failure.verify_command)
813            .or_default()
814            .push(&failure.unit_id);
815    }
816
817    // Sort for deterministic output.
818    let mut cmd_entries: Vec<(&str, Vec<&str>)> = by_cmd.into_iter().collect();
819    cmd_entries.sort_by_key(|(cmd, _)| *cmd);
820
821    for (cmd, ids) in cmd_entries {
822        let ids_str = ids.join(", ");
823        let unit_word = if ids.len() == 1 { "unit" } else { "units" };
824        // Find exit code for this command from the first matching failure
825        let exit_info = result
826            .failed
827            .iter()
828            .find(|f| f.verify_command == cmd)
829            .map(|f| {
830                if f.timed_out {
831                    " — timed out".to_string()
832                } else if let Some(code) = f.exit_code {
833                    format!(" — exit code {}", code)
834                } else {
835                    String::new()
836                }
837            })
838            .unwrap_or_default();
839        eprintln!("  ✗ {}  ({}: {}){}", cmd, unit_word, ids_str, exit_info);
840    }
841}
842
843/// Format a duration as M:SS.
844pub(super) fn format_duration(d: Duration) -> String {
845    let secs = d.as_secs();
846    format!("{}:{:02}", secs / 60, secs % 60)
847}
848
849/// Find the unit file path. Public wrapper for use in other commands.
850pub fn find_unit_file(mana_dir: &Path, id: &str) -> Result<PathBuf> {
851    crate::discovery::find_unit_file(mana_dir, id)
852}
853
854#[cfg(test)]
855mod tests {
856    use super::*;
857    use std::fs;
858    use tempfile::TempDir;
859
860    fn make_mana_dir() -> (TempDir, std::path::PathBuf) {
861        let dir = TempDir::new().unwrap();
862        let mana_dir = dir.path().join(".mana");
863        fs::create_dir(&mana_dir).unwrap();
864        (dir, mana_dir)
865    }
866
867    fn write_config(mana_dir: &std::path::Path, run: Option<&str>) {
868        let run_line = match run {
869            Some(r) => format!("run: \"{}\"\n", r),
870            None => String::new(),
871        };
872        fs::write(
873            mana_dir.join("config.yaml"),
874            format!("project: test\nnext_id: 1\n{}", run_line),
875        )
876        .unwrap();
877    }
878
879    fn default_args() -> RunArgs {
880        RunArgs {
881            id: None,
882            jobs: 4,
883            dry_run: false,
884            loop_mode: false,
885            auto_plan: false,
886            keep_going: false,
887            timeout: 30,
888            idle_timeout: 5,
889            json_stream: false,
890            review: false,
891        }
892    }
893
894    #[test]
895    fn cmd_run_errors_when_no_run_template_and_no_pi() {
896        let (_dir, mana_dir) = make_mana_dir();
897        write_config(&mana_dir, None);
898
899        let args = default_args();
900
901        let result = cmd_run(&mana_dir, args);
902        // With no template and no pi on PATH, should error
903        // (The exact error depends on whether pi is installed)
904        // In CI/test without pi, it should bail
905        if !pi_available() && !imp_available() {
906            assert!(result.is_err());
907            let err = result.unwrap_err().to_string();
908            assert!(
909                err.contains("No agent configured") || err.contains("not found"),
910                "Error should mention missing agent: {}",
911                err
912            );
913        }
914    }
915
916    #[test]
917    fn dry_run_does_not_spawn() {
918        let (_dir, mana_dir) = make_mana_dir();
919        write_config(&mana_dir, Some("echo {id}"));
920
921        // Create a ready unit
922        let mut unit = crate::unit::Unit::new("1", "Test unit");
923        unit.verify = Some("echo ok".to_string());
924        unit.to_file(mana_dir.join("1-test.md")).unwrap();
925
926        let args = RunArgs {
927            dry_run: true,
928            ..default_args()
929        };
930
931        // dry_run should succeed without spawning any processes
932        let result = cmd_run(&mana_dir, args);
933        assert!(result.is_ok());
934    }
935
936    #[test]
937    fn dry_run_with_json_stream() {
938        let (_dir, mana_dir) = make_mana_dir();
939        write_config(&mana_dir, Some("echo {id}"));
940
941        let mut unit = crate::unit::Unit::new("1", "Test unit");
942        unit.verify = Some("echo ok".to_string());
943        unit.to_file(mana_dir.join("1-test.md")).unwrap();
944
945        let args = RunArgs {
946            dry_run: true,
947            json_stream: true,
948            ..default_args()
949        };
950
951        // Should succeed and emit JSON events (captured to stdout)
952        let result = cmd_run(&mana_dir, args);
953        assert!(result.is_ok());
954    }
955
956    #[test]
957    fn format_duration_formats_correctly() {
958        assert_eq!(format_duration(Duration::from_secs(0)), "0:00");
959        assert_eq!(format_duration(Duration::from_secs(32)), "0:32");
960        assert_eq!(format_duration(Duration::from_secs(62)), "1:02");
961        assert_eq!(format_duration(Duration::from_secs(600)), "10:00");
962    }
963
964    #[test]
965    fn determine_spawn_mode_template_when_run_set() {
966        let config = Config {
967            project: "test".to_string(),
968            next_id: 1,
969            auto_close_parent: true,
970            run: Some("echo {id}".to_string()),
971            plan: Some("plan {id}".to_string()),
972            max_loops: 10,
973            max_concurrent: 4,
974            poll_interval: 30,
975            extends: vec![],
976            rules_file: None,
977            file_locking: false,
978            worktree: false,
979            on_close: None,
980            on_fail: None,
981            post_plan: None,
982            verify_timeout: None,
983            review: None,
984            user: None,
985            user_email: None,
986            auto_commit: false,
987            commit_template: None,
988            research: None,
989            run_model: None,
990            plan_model: None,
991            review_model: None,
992            research_model: None,
993            batch_verify: false,
994            memory_reserve_mb: 0,
995            notify: None,
996        };
997        let mode = determine_spawn_mode(&config);
998        assert_eq!(
999            mode,
1000            SpawnMode::Template {
1001                run_template: "echo {id}".to_string(),
1002                plan_template: Some("plan {id}".to_string()),
1003            }
1004        );
1005    }
1006
1007    #[test]
1008    fn determine_spawn_mode_direct_when_no_run() {
1009        let config = Config {
1010            project: "test".to_string(),
1011            next_id: 1,
1012            auto_close_parent: true,
1013            run: None,
1014            plan: None,
1015            max_loops: 10,
1016            max_concurrent: 4,
1017            poll_interval: 30,
1018            extends: vec![],
1019            rules_file: None,
1020            file_locking: false,
1021            worktree: false,
1022            on_close: None,
1023            on_fail: None,
1024            post_plan: None,
1025            verify_timeout: None,
1026            review: None,
1027            user: None,
1028            user_email: None,
1029            auto_commit: false,
1030            commit_template: None,
1031            research: None,
1032            run_model: None,
1033            plan_model: None,
1034            review_model: None,
1035            research_model: None,
1036            batch_verify: false,
1037            memory_reserve_mb: 0,
1038            notify: None,
1039        };
1040        let mode = determine_spawn_mode(&config);
1041        assert_eq!(mode, SpawnMode::Direct);
1042    }
1043
1044    #[test]
1045    fn agent_result_tracks_tokens_and_cost() {
1046        let result = AgentResult {
1047            id: "1".to_string(),
1048            title: "Test".to_string(),
1049            action: UnitAction::Implement,
1050            success: true,
1051            duration: Duration::from_secs(10),
1052            total_tokens: Some(5000),
1053            total_cost: Some(0.03),
1054            error: None,
1055            tool_count: 5,
1056            turns: 2,
1057            failure_summary: None,
1058        };
1059        assert_eq!(result.total_tokens, Some(5000));
1060        assert_eq!(result.total_cost, Some(0.03));
1061    }
1062
1063    #[test]
1064    fn collect_decision_warnings_only_returns_dispatch_units_with_decisions() {
1065        let (_dir, mana_dir) = make_mana_dir();
1066        write_config(&mana_dir, Some("echo {id}"));
1067
1068        let mut unit1 = crate::unit::Unit::new("1", "Has decisions");
1069        unit1.verify = Some("echo ok".to_string());
1070        unit1.decisions = vec!["JWT or session cookies?".to_string()];
1071        unit1.to_file(mana_dir.join("1-has-decisions.md")).unwrap();
1072
1073        let mut unit2 = crate::unit::Unit::new("2", "No decisions");
1074        unit2.verify = Some("echo ok".to_string());
1075        unit2.to_file(mana_dir.join("2-no-decisions.md")).unwrap();
1076
1077        let index = crate::index::Index::build(&mana_dir).unwrap();
1078        let units = vec![
1079            SizedUnit {
1080                id: "1".to_string(),
1081                title: "Has decisions".to_string(),
1082                action: UnitAction::Implement,
1083                priority: 2,
1084                dependencies: Vec::new(),
1085                parent: None,
1086                produces: Vec::new(),
1087                requires: Vec::new(),
1088                paths: Vec::new(),
1089                model: None,
1090            },
1091            SizedUnit {
1092                id: "2".to_string(),
1093                title: "No decisions".to_string(),
1094                action: UnitAction::Implement,
1095                priority: 2,
1096                dependencies: Vec::new(),
1097                parent: None,
1098                produces: Vec::new(),
1099                requires: Vec::new(),
1100                paths: Vec::new(),
1101                model: None,
1102            },
1103        ];
1104
1105        let warnings = collect_decision_warnings(&mana_dir, &units, &index).unwrap();
1106        assert_eq!(warnings.len(), 1);
1107        assert_eq!(warnings[0].id, "1");
1108        assert_eq!(warnings[0].decisions, vec!["JWT or session cookies?"]);
1109    }
1110
1111    #[test]
1112    fn format_decision_warning_message_matches_single_unit_prompt() {
1113        let message = format_decision_warning_message(&[DecisionWarning {
1114            id: "42".to_string(),
1115            title: "Implement auth".to_string(),
1116            decisions: vec![
1117                "JWT or session cookies?".to_string(),
1118                "Which JWT library?".to_string(),
1119            ],
1120        }]);
1121
1122        assert!(message.contains("⚠ Unit 42 has 2 unresolved decisions"));
1123        assert!(message.contains("0: JWT or session cookies?"));
1124        assert!(message.contains("1: Which JWT library?"));
1125    }
1126
1127    #[test]
1128    fn signal_flag_defaults_to_false() {
1129        SHUTDOWN_REQUESTED.store(false, Ordering::SeqCst);
1130        assert!(!shutdown_requested());
1131    }
1132
1133    #[test]
1134    fn signal_flag_can_be_toggled() {
1135        SHUTDOWN_REQUESTED.store(true, Ordering::SeqCst);
1136        assert!(shutdown_requested());
1137        // Reset for other tests
1138        SHUTDOWN_REQUESTED.store(false, Ordering::SeqCst);
1139        assert!(!shutdown_requested());
1140    }
1141
1142    #[test]
1143    fn child_pid_tracking() {
1144        // Clear any existing PIDs
1145        if let Ok(mut pids) = CHILD_PIDS.lock() {
1146            pids.clear();
1147        }
1148
1149        register_child_pid(1234);
1150        register_child_pid(5678);
1151
1152        let count = CHILD_PIDS.lock().unwrap().len();
1153        assert_eq!(count, 2);
1154
1155        unregister_child_pid(1234);
1156        let count = CHILD_PIDS.lock().unwrap().len();
1157        assert_eq!(count, 1);
1158
1159        // Unregister non-existent PID is a no-op
1160        unregister_child_pid(9999);
1161        let count = CHILD_PIDS.lock().unwrap().len();
1162        assert_eq!(count, 1);
1163
1164        unregister_child_pid(5678);
1165        let count = CHILD_PIDS.lock().unwrap().len();
1166        assert_eq!(count, 0);
1167    }
1168}