Skip to main content

bn/commands/run/
mod.rs

1//! `bn run` — Dispatch ready beans to agents.
2//!
3//! Finds ready beans, groups them into waves by dependency order,
4//! and spawns agents for each wave.
5//!
6//! Modes:
7//! - `bn run` — one-shot: dispatch all ready beans, then exit
8//! - `bn run 5.1` — dispatch a single bean (or its ready children if parent)
9//! - `bn run --dry-run` — show plan without spawning
10//! - `bn run --loop` — keep running until no ready beans remain
11//! - `bn run --json-stream` — emit JSON stream events to stdout
12//!
13//! Spawning modes:
14//! - **Template mode** (backward compat): If `config.run` is set, spawn via `sh -c <template>`.
15//! - **Direct mode**: If no template is configured but `pi` is on PATH, spawn pi directly
16//!   with `--mode json --print --no-session`, monitoring with timeouts and parsing events.
17
18mod plan;
19mod ready_queue;
20mod wave;
21
22pub use plan::{DispatchPlan, SizedBean};
23pub use wave::Wave;
24
25use std::fmt;
26use std::path::{Path, PathBuf};
27use std::process::{Command, Stdio};
28use std::time::{Duration, Instant};
29
30use anyhow::Result;
31
32use crate::commands::review::{cmd_review, ReviewArgs};
33use crate::config::Config;
34use crate::stream::{self, StreamEvent};
35
36use plan::{plan_dispatch, print_plan, print_plan_json};
37use ready_queue::run_ready_queue_direct;
38use wave::run_wave;
39
40/// Shared config passed to wave/ready-queue runners.
41pub(super) struct RunConfig {
42    pub max_jobs: usize,
43    pub timeout_minutes: u32,
44    pub idle_timeout_minutes: u32,
45    pub json_stream: bool,
46    pub file_locking: bool,
47}
48
49/// Arguments for cmd_run, matching the CLI definition.
50pub struct RunArgs {
51    pub id: Option<String>,
52    pub jobs: u32,
53    pub dry_run: bool,
54    pub loop_mode: bool,
55    pub auto_plan: bool,
56    pub keep_going: bool,
57    pub timeout: u32,
58    pub idle_timeout: u32,
59    pub json_stream: bool,
60    /// If true, run adversarial review after each successful bean close.
61    pub review: bool,
62}
63
64/// What action to take for a bean.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum BeanAction {
67    Implement,
68    Plan,
69}
70
71impl fmt::Display for BeanAction {
72    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73        match self {
74            BeanAction::Implement => write!(f, "implement"),
75            BeanAction::Plan => write!(f, "plan"),
76        }
77    }
78}
79
80/// Result of a completed agent.
81#[derive(Debug)]
82struct AgentResult {
83    id: String,
84    title: String,
85    action: BeanAction,
86    success: bool,
87    duration: Duration,
88    total_tokens: Option<u64>,
89    total_cost: Option<f64>,
90    error: Option<String>,
91}
92
93/// Which spawning mode to use.
94#[derive(Debug, Clone, PartialEq, Eq)]
95enum SpawnMode {
96    /// Use shell template from config (backward compat).
97    Template {
98        run_template: String,
99        plan_template: Option<String>,
100    },
101    /// Spawn pi directly with JSON output and monitoring.
102    Direct,
103}
104
105/// Execute the `bn run` command.
106pub fn cmd_run(beans_dir: &Path, args: RunArgs) -> Result<()> {
107    // Determine spawn mode
108    let config = Config::load_with_extends(beans_dir)?;
109    let spawn_mode = determine_spawn_mode(&config);
110
111    if spawn_mode == SpawnMode::Direct && !pi_available() {
112        anyhow::bail!(
113            "No agent configured and `pi` not found on PATH.\n\n\
114             Either:\n  \
115               1. Install pi: npm i -g @anthropic/pi\n  \
116               2. Set a run template: bn config set run \"<command>\"\n\n\
117             The command template uses {{id}} as a placeholder for the bean ID.\n\n\
118             Examples:\n  \
119               bn config set run \"pi @.beans/{{id}}-*.md 'implement and bn close {{id}}'\"\n  \
120               bn config set run \"claude -p 'implement bean {{id}} and run bn close {{id}}'\""
121        );
122    }
123
124    if let SpawnMode::Template {
125        ref run_template, ..
126    } = spawn_mode
127    {
128        // Validate template exists (kept for backward compat error message)
129        let _ = run_template;
130    }
131
132    if args.loop_mode {
133        run_loop(beans_dir, &config, &args, &spawn_mode)
134    } else {
135        run_once(beans_dir, &config, &args, &spawn_mode)
136    }
137}
138
139/// Determine the spawn mode based on config.
140fn determine_spawn_mode(config: &Config) -> SpawnMode {
141    if let Some(ref run) = config.run {
142        SpawnMode::Template {
143            run_template: run.clone(),
144            plan_template: config.plan.clone(),
145        }
146    } else {
147        SpawnMode::Direct
148    }
149}
150
151/// Check if `pi` is available on PATH.
152fn pi_available() -> bool {
153    Command::new("pi")
154        .arg("--version")
155        .stdout(Stdio::null())
156        .stderr(Stdio::null())
157        .status()
158        .map(|s| s.success())
159        .unwrap_or(false)
160}
161
162/// Single dispatch pass: plan → print/execute → report.
163fn run_once(
164    beans_dir: &Path,
165    config: &Config,
166    args: &RunArgs,
167    spawn_mode: &SpawnMode,
168) -> Result<()> {
169    let plan = plan_dispatch(
170        beans_dir,
171        config,
172        args.id.as_deref(),
173        args.auto_plan,
174        args.dry_run,
175    )?;
176
177    if plan.waves.is_empty() && plan.skipped.is_empty() {
178        if args.json_stream {
179            stream::emit_error("No ready beans");
180        } else {
181            eprintln!("No ready beans. Use `bn status` to see what's going on.");
182        }
183        return Ok(());
184    }
185
186    if args.dry_run {
187        if args.json_stream {
188            print_plan_json(&plan, args.id.as_deref());
189        } else {
190            print_plan(&plan);
191        }
192        return Ok(());
193    }
194
195    // Report skipped beans
196    if !plan.skipped.is_empty() && !args.json_stream {
197        eprintln!(
198            "{} bean(s) need planning, run `bn plan`:",
199            plan.skipped.len()
200        );
201        for sb in &plan.skipped {
202            eprintln!(
203                "  ⚠ {}  {}  ({}k tokens)",
204                sb.id,
205                sb.title,
206                sb.tokens / 1000
207            );
208        }
209        eprintln!();
210    }
211
212    let total_beans: usize = plan.waves.iter().map(|w| w.beans.len()).sum();
213    let total_waves = plan.waves.len();
214    let parent_id = args.id.as_deref().unwrap_or("all");
215
216    if args.json_stream {
217        let beans_info: Vec<stream::BeanInfo> = plan
218            .waves
219            .iter()
220            .enumerate()
221            .flat_map(|(wave_idx, wave)| {
222                wave.beans.iter().map(move |b| stream::BeanInfo {
223                    id: b.id.clone(),
224                    title: b.title.clone(),
225                    round: wave_idx + 1,
226                })
227            })
228            .collect();
229        stream::emit(&StreamEvent::RunStart {
230            parent_id: parent_id.to_string(),
231            total_beans,
232            total_rounds: total_waves,
233            beans: beans_info,
234        });
235    }
236
237    let run_cfg = RunConfig {
238        max_jobs: args.jobs.min(config.max_concurrent) as usize,
239        timeout_minutes: args.timeout,
240        idle_timeout_minutes: args.idle_timeout,
241        json_stream: args.json_stream,
242        file_locking: config.file_locking,
243    };
244    let run_start = Instant::now();
245    let total_done;
246    let total_failed;
247    let any_failed;
248    // Collect IDs of successfully closed beans for --review post-processing
249    let mut successful_ids: Vec<String> = Vec::new();
250
251    match spawn_mode {
252        SpawnMode::Direct => {
253            // Ready-queue: start each bean as soon as its specific deps finish
254            let (results, had_failure) = run_ready_queue_direct(
255                beans_dir,
256                &plan.all_beans,
257                &plan.index,
258                &run_cfg,
259                args.keep_going,
260            )?;
261
262            let mut done = 0u32;
263            let mut failed = 0u32;
264            for result in &results {
265                let duration = format_duration(result.duration);
266                if result.success {
267                    if args.json_stream {
268                        stream::emit(&StreamEvent::BeanDone {
269                            id: result.id.clone(),
270                            success: true,
271                            duration_secs: result.duration.as_secs(),
272                            error: None,
273                            total_tokens: result.total_tokens,
274                            total_cost: result.total_cost,
275                        });
276                    } else {
277                        eprintln!(
278                            "  ✓ {}  {}  {}  {}",
279                            result.id, result.title, result.action, duration
280                        );
281                    }
282                    done += 1;
283                    successful_ids.push(result.id.clone());
284                } else {
285                    if args.json_stream {
286                        stream::emit(&StreamEvent::BeanDone {
287                            id: result.id.clone(),
288                            success: false,
289                            duration_secs: result.duration.as_secs(),
290                            error: result.error.clone(),
291                            total_tokens: result.total_tokens,
292                            total_cost: result.total_cost,
293                        });
294                    } else {
295                        eprintln!(
296                            "  ✗ {}  {}  {}  {} (failed)",
297                            result.id, result.title, result.action, duration
298                        );
299                    }
300                    failed += 1;
301                }
302            }
303            total_done = done;
304            total_failed = failed;
305            any_failed = had_failure;
306        }
307
308        SpawnMode::Template { .. } => {
309            // Template mode: wave-based execution (legacy)
310            let mut done = 0u32;
311            let mut failed = 0u32;
312            let mut had_failure = false;
313
314            for (wave_idx, wave) in plan.waves.iter().enumerate() {
315                if args.json_stream {
316                    stream::emit(&StreamEvent::RoundStart {
317                        round: wave_idx + 1,
318                        total_rounds: total_waves,
319                        bean_count: wave.beans.len(),
320                    });
321                } else {
322                    eprintln!("Wave {}: {} bean(s)", wave_idx + 1, wave.beans.len());
323                }
324
325                let results = run_wave(beans_dir, &wave.beans, spawn_mode, &run_cfg, wave_idx + 1)?;
326
327                let mut wave_success = 0usize;
328                let mut wave_failed = 0usize;
329
330                for result in &results {
331                    let duration = format_duration(result.duration);
332                    if result.success {
333                        if args.json_stream {
334                            stream::emit(&StreamEvent::BeanDone {
335                                id: result.id.clone(),
336                                success: true,
337                                duration_secs: result.duration.as_secs(),
338                                error: None,
339                                total_tokens: result.total_tokens,
340                                total_cost: result.total_cost,
341                            });
342                        } else {
343                            eprintln!(
344                                "  ✓ {}  {}  {}  {}",
345                                result.id, result.title, result.action, duration
346                            );
347                        }
348                        done += 1;
349                        wave_success += 1;
350                        successful_ids.push(result.id.clone());
351                    } else {
352                        if args.json_stream {
353                            stream::emit(&StreamEvent::BeanDone {
354                                id: result.id.clone(),
355                                success: false,
356                                duration_secs: result.duration.as_secs(),
357                                error: result.error.clone(),
358                                total_tokens: result.total_tokens,
359                                total_cost: result.total_cost,
360                            });
361                        } else {
362                            eprintln!(
363                                "  ✗ {}  {}  {}  {} (failed)",
364                                result.id, result.title, result.action, duration
365                            );
366                        }
367                        failed += 1;
368                        wave_failed += 1;
369                        had_failure = true;
370                    }
371                }
372
373                if args.json_stream {
374                    stream::emit(&StreamEvent::RoundEnd {
375                        round: wave_idx + 1,
376                        success_count: wave_success,
377                        failed_count: wave_failed,
378                    });
379                }
380
381                if had_failure && !args.keep_going {
382                    break;
383                }
384            }
385
386            total_done = done;
387            total_failed = failed;
388            any_failed = had_failure;
389        }
390    }
391
392    // Trigger adversarial review for each successfully closed bean if --review is set.
393    // Review runs synchronously after all beans in this pass complete.
394    if args.review && !successful_ids.is_empty() {
395        for id in &successful_ids {
396            if !args.json_stream {
397                eprintln!("Review: checking {} ...", id);
398            }
399            if let Err(e) = cmd_review(
400                beans_dir,
401                ReviewArgs {
402                    id: id.clone(),
403                    model: None,
404                    diff_only: false,
405                },
406            ) {
407                eprintln!("Review: warning — review of {} failed: {}", id, e);
408            }
409        }
410    }
411
412    if args.json_stream {
413        stream::emit(&StreamEvent::RunEnd {
414            total_success: total_done as usize,
415            total_failed: total_failed as usize,
416            duration_secs: run_start.elapsed().as_secs(),
417        });
418    } else {
419        eprintln!();
420        eprintln!(
421            "Summary: {} done, {} failed, {} skipped",
422            total_done,
423            total_failed,
424            plan.skipped.len()
425        );
426    }
427
428    if any_failed && !args.keep_going {
429        anyhow::bail!("Some agents failed");
430    }
431
432    Ok(())
433}
434
435/// Loop mode: keep dispatching until no ready beans remain.
436fn run_loop(
437    beans_dir: &Path,
438    config: &Config,
439    args: &RunArgs,
440    _spawn_mode: &SpawnMode,
441) -> Result<()> {
442    let max_loops = if config.max_loops == 0 {
443        u32::MAX
444    } else {
445        config.max_loops
446    };
447
448    for iteration in 0..max_loops {
449        if iteration > 0 && !args.json_stream {
450            eprintln!("\n--- Loop iteration {} ---\n", iteration + 1);
451        }
452
453        let plan = plan_dispatch(beans_dir, config, args.id.as_deref(), args.auto_plan, false)?;
454
455        if plan.waves.is_empty() {
456            if !args.json_stream {
457                if iteration == 0 {
458                    eprintln!("No ready beans. Use `bn status` to see what's going on.");
459                } else {
460                    eprintln!("No more ready beans. Stopping.");
461                }
462            }
463            return Ok(());
464        }
465
466        // Run one pass (non-loop, non-dry-run)
467        let inner_args = RunArgs {
468            id: args.id.clone(),
469            jobs: args.jobs,
470            dry_run: false,
471            loop_mode: false,
472            auto_plan: args.auto_plan,
473            keep_going: args.keep_going,
474            timeout: args.timeout,
475            idle_timeout: args.idle_timeout,
476            json_stream: args.json_stream,
477            review: args.review,
478        };
479
480        // Reload config each iteration (agents may have changed beans)
481        let config = Config::load_with_extends(beans_dir)?;
482        let spawn_mode = determine_spawn_mode(&config);
483        match run_once(beans_dir, &config, &inner_args, &spawn_mode) {
484            Ok(()) => {}
485            Err(e) => {
486                if args.keep_going {
487                    eprintln!("Warning: {}", e);
488                } else {
489                    return Err(e);
490                }
491            }
492        }
493    }
494
495    eprintln!("Reached max_loops ({}). Stopping.", max_loops);
496    Ok(())
497}
498
499/// Format a duration as M:SS.
500fn format_duration(d: Duration) -> String {
501    let secs = d.as_secs();
502    format!("{}:{:02}", secs / 60, secs % 60)
503}
504
505/// Find the bean file path. Public wrapper for use in other commands.
506pub fn find_bean_file(beans_dir: &Path, id: &str) -> Result<PathBuf> {
507    crate::discovery::find_bean_file(beans_dir, id)
508}
509
510#[cfg(test)]
511mod tests {
512    use super::*;
513    use std::fs;
514    use tempfile::TempDir;
515
516    fn make_beans_dir() -> (TempDir, std::path::PathBuf) {
517        let dir = TempDir::new().unwrap();
518        let beans_dir = dir.path().join(".beans");
519        fs::create_dir(&beans_dir).unwrap();
520        (dir, beans_dir)
521    }
522
523    fn write_config(beans_dir: &std::path::Path, run: Option<&str>) {
524        let run_line = match run {
525            Some(r) => format!("run: \"{}\"\n", r),
526            None => String::new(),
527        };
528        fs::write(
529            beans_dir.join("config.yaml"),
530            format!("project: test\nnext_id: 1\n{}", run_line),
531        )
532        .unwrap();
533    }
534
535    fn default_args() -> RunArgs {
536        RunArgs {
537            id: None,
538            jobs: 4,
539            dry_run: false,
540            loop_mode: false,
541            auto_plan: false,
542            keep_going: false,
543            timeout: 30,
544            idle_timeout: 5,
545            json_stream: false,
546            review: false,
547        }
548    }
549
550    #[test]
551    fn cmd_run_errors_when_no_run_template_and_no_pi() {
552        let (_dir, beans_dir) = make_beans_dir();
553        write_config(&beans_dir, None);
554
555        let args = default_args();
556
557        let result = cmd_run(&beans_dir, args);
558        // With no template and no pi on PATH, should error
559        // (The exact error depends on whether pi is installed)
560        // In CI/test without pi, it should bail
561        if !pi_available() {
562            assert!(result.is_err());
563            let err = result.unwrap_err().to_string();
564            assert!(
565                err.contains("No agent configured") || err.contains("not found"),
566                "Error should mention missing agent: {}",
567                err
568            );
569        }
570    }
571
572    #[test]
573    fn dry_run_does_not_spawn() {
574        let (_dir, beans_dir) = make_beans_dir();
575        write_config(&beans_dir, Some("echo {id}"));
576
577        // Create a ready bean
578        let mut bean = crate::bean::Bean::new("1", "Test bean");
579        bean.verify = Some("echo ok".to_string());
580        bean.to_file(beans_dir.join("1-test.md")).unwrap();
581
582        let args = RunArgs {
583            dry_run: true,
584            ..default_args()
585        };
586
587        // dry_run should succeed without spawning any processes
588        let result = cmd_run(&beans_dir, args);
589        assert!(result.is_ok());
590    }
591
592    #[test]
593    fn dry_run_with_json_stream() {
594        let (_dir, beans_dir) = make_beans_dir();
595        write_config(&beans_dir, Some("echo {id}"));
596
597        let mut bean = crate::bean::Bean::new("1", "Test bean");
598        bean.verify = Some("echo ok".to_string());
599        bean.to_file(beans_dir.join("1-test.md")).unwrap();
600
601        let args = RunArgs {
602            dry_run: true,
603            json_stream: true,
604            ..default_args()
605        };
606
607        // Should succeed and emit JSON events (captured to stdout)
608        let result = cmd_run(&beans_dir, args);
609        assert!(result.is_ok());
610    }
611
612    #[test]
613    fn format_duration_formats_correctly() {
614        assert_eq!(format_duration(Duration::from_secs(0)), "0:00");
615        assert_eq!(format_duration(Duration::from_secs(32)), "0:32");
616        assert_eq!(format_duration(Duration::from_secs(62)), "1:02");
617        assert_eq!(format_duration(Duration::from_secs(600)), "10:00");
618    }
619
620    #[test]
621    fn determine_spawn_mode_template_when_run_set() {
622        let config = Config {
623            project: "test".to_string(),
624            next_id: 1,
625            auto_close_parent: true,
626            max_tokens: 30000,
627            run: Some("echo {id}".to_string()),
628            plan: Some("plan {id}".to_string()),
629            max_loops: 10,
630            max_concurrent: 4,
631            poll_interval: 30,
632            extends: vec![],
633            rules_file: None,
634            file_locking: false,
635            on_close: None,
636            on_fail: None,
637            post_plan: None,
638            verify_timeout: None,
639            review: None,
640        };
641        let mode = determine_spawn_mode(&config);
642        assert_eq!(
643            mode,
644            SpawnMode::Template {
645                run_template: "echo {id}".to_string(),
646                plan_template: Some("plan {id}".to_string()),
647            }
648        );
649    }
650
651    #[test]
652    fn determine_spawn_mode_direct_when_no_run() {
653        let config = Config {
654            project: "test".to_string(),
655            next_id: 1,
656            auto_close_parent: true,
657            max_tokens: 30000,
658            run: None,
659            plan: None,
660            max_loops: 10,
661            max_concurrent: 4,
662            poll_interval: 30,
663            extends: vec![],
664            rules_file: None,
665            file_locking: false,
666            on_close: None,
667            on_fail: None,
668            post_plan: None,
669            verify_timeout: None,
670            review: None,
671        };
672        let mode = determine_spawn_mode(&config);
673        assert_eq!(mode, SpawnMode::Direct);
674    }
675
676    #[test]
677    fn agent_result_tracks_tokens_and_cost() {
678        let result = AgentResult {
679            id: "1".to_string(),
680            title: "Test".to_string(),
681            action: BeanAction::Implement,
682            success: true,
683            duration: Duration::from_secs(10),
684            total_tokens: Some(5000),
685            total_cost: Some(0.03),
686            error: None,
687        };
688        assert_eq!(result.total_tokens, Some(5000));
689        assert_eq!(result.total_cost, Some(0.03));
690    }
691}