Skip to main content

bn/commands/run/
mod.rs

1//! `bn run` — Dispatch ready beans to agents.
2//!
3//! Finds ready beans, groups them into waves by dependency order,
4//! and spawns agents for each wave.
5//!
6//! Modes:
7//! - `bn run` — one-shot: dispatch all ready beans, then exit
8//! - `bn run 5.1` — dispatch a single bean (or its ready children if parent)
9//! - `bn run --dry-run` — show plan without spawning
10//! - `bn run --loop` — keep running until no ready beans remain
11//! - `bn run --json-stream` — emit JSON stream events to stdout
12//!
13//! Spawning modes:
14//! - **Template mode** (backward compat): If `config.run` is set, spawn via `sh -c <template>`.
15//! - **Direct mode**: If no template is configured but `pi` is on PATH, spawn pi directly
16//!   with `--mode json --print --no-session`, monitoring with timeouts and parsing events.
17
18mod plan;
19mod ready_queue;
20mod wave;
21
22pub use plan::{DispatchPlan, SizedBean};
23pub use wave::Wave;
24
25use std::fmt;
26use std::path::{Path, PathBuf};
27use std::process::{Command, Stdio};
28use std::time::{Duration, Instant};
29
30use anyhow::Result;
31
32use crate::commands::review::{cmd_review, ReviewArgs};
33use crate::config::Config;
34use crate::stream::{self, StreamEvent};
35
36use plan::{plan_dispatch, print_plan, print_plan_json};
37use ready_queue::run_ready_queue_direct;
38use wave::run_wave;
39
40/// Shared config passed to wave/ready-queue runners.
41pub(super) struct RunConfig {
42    pub max_jobs: usize,
43    pub timeout_minutes: u32,
44    pub idle_timeout_minutes: u32,
45    pub json_stream: bool,
46    pub file_locking: bool,
47}
48
49/// Arguments for cmd_run, matching the CLI definition.
50pub struct RunArgs {
51    pub id: Option<String>,
52    pub jobs: u32,
53    pub dry_run: bool,
54    pub loop_mode: bool,
55    pub auto_plan: bool,
56    pub keep_going: bool,
57    pub timeout: u32,
58    pub idle_timeout: u32,
59    pub json_stream: bool,
60    /// If true, run adversarial review after each successful bean close.
61    pub review: bool,
62}
63
64/// What action to take for a bean.
65#[derive(Debug, Clone, Copy, PartialEq, Eq)]
66pub enum BeanAction {
67    Implement,
68}
69
70impl fmt::Display for BeanAction {
71    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
72        match self {
73            BeanAction::Implement => write!(f, "implement"),
74        }
75    }
76}
77
78/// Result of a completed agent.
79#[derive(Debug)]
80struct AgentResult {
81    id: String,
82    title: String,
83    action: BeanAction,
84    success: bool,
85    duration: Duration,
86    total_tokens: Option<u64>,
87    total_cost: Option<f64>,
88    error: Option<String>,
89    tool_count: usize,
90    turns: usize,
91    failure_summary: Option<String>,
92}
93
94/// Which spawning mode to use.
95#[derive(Debug, Clone, PartialEq, Eq)]
96enum SpawnMode {
97    /// Use shell template from config (backward compat).
98    Template {
99        run_template: String,
100        plan_template: Option<String>,
101    },
102    /// Spawn pi directly with JSON output and monitoring.
103    Direct,
104}
105
106/// Execute the `bn run` command.
107pub fn cmd_run(beans_dir: &Path, args: RunArgs) -> Result<()> {
108    // Determine spawn mode
109    let config = Config::load_with_extends(beans_dir)?;
110    let spawn_mode = determine_spawn_mode(&config);
111
112    if spawn_mode == SpawnMode::Direct && !pi_available() {
113        anyhow::bail!(
114            "No agent configured and `pi` not found on PATH.\n\n\
115             Either:\n  \
116               1. Install pi: npm i -g @anthropic/pi\n  \
117               2. Set a run template: bn config set run \"<command>\"\n\n\
118             The command template uses {{id}} as a placeholder for the bean ID.\n\n\
119             Examples:\n  \
120               bn config set run \"pi @.beans/{{id}}-*.md 'implement and bn close {{id}}'\"\n  \
121               bn config set run \"claude -p 'implement bean {{id}} and run bn close {{id}}'\""
122        );
123    }
124
125    if let SpawnMode::Template {
126        ref run_template, ..
127    } = spawn_mode
128    {
129        // Validate template exists (kept for backward compat error message)
130        let _ = run_template;
131    }
132
133    if args.loop_mode {
134        run_loop(beans_dir, &config, &args, &spawn_mode)
135    } else {
136        run_once(beans_dir, &config, &args, &spawn_mode)
137    }
138}
139
140/// Determine the spawn mode based on config.
141fn determine_spawn_mode(config: &Config) -> SpawnMode {
142    if let Some(ref run) = config.run {
143        SpawnMode::Template {
144            run_template: run.clone(),
145            plan_template: config.plan.clone(),
146        }
147    } else {
148        SpawnMode::Direct
149    }
150}
151
152/// Check if `pi` is available on PATH.
153fn pi_available() -> bool {
154    Command::new("pi")
155        .arg("--version")
156        .stdout(Stdio::null())
157        .stderr(Stdio::null())
158        .status()
159        .map(|s| s.success())
160        .unwrap_or(false)
161}
162
163/// Single dispatch pass: plan → print/execute → report.
164fn run_once(
165    beans_dir: &Path,
166    config: &Config,
167    args: &RunArgs,
168    spawn_mode: &SpawnMode,
169) -> Result<()> {
170    let plan = plan_dispatch(
171        beans_dir,
172        config,
173        args.id.as_deref(),
174        args.auto_plan,
175        args.dry_run,
176    )?;
177
178    if plan.waves.is_empty() && plan.skipped.is_empty() {
179        if args.json_stream {
180            stream::emit_error("No ready beans");
181        } else {
182            eprintln!("No ready beans. Use `bn status` to see what's going on.");
183        }
184        return Ok(());
185    }
186
187    if args.dry_run {
188        if args.json_stream {
189            print_plan_json(&plan, args.id.as_deref());
190        } else {
191            print_plan(&plan);
192        }
193        return Ok(());
194    }
195
196    // Report blocked beans (oversized/unscoped)
197    if !plan.skipped.is_empty() && !args.json_stream {
198        eprintln!("{} bean(s) blocked:", plan.skipped.len());
199        for bb in &plan.skipped {
200            eprintln!("  ⚠ {}  {}  ({})", bb.id, bb.title, bb.reason);
201        }
202        eprintln!();
203    }
204
205    let total_beans: usize = plan.waves.iter().map(|w| w.beans.len()).sum();
206    let total_waves = plan.waves.len();
207    let parent_id = args.id.as_deref().unwrap_or("all");
208
209    if args.json_stream {
210        let beans_info: Vec<stream::BeanInfo> = plan
211            .waves
212            .iter()
213            .enumerate()
214            .flat_map(|(wave_idx, wave)| {
215                wave.beans.iter().map(move |b| stream::BeanInfo {
216                    id: b.id.clone(),
217                    title: b.title.clone(),
218                    round: wave_idx + 1,
219                })
220            })
221            .collect();
222        stream::emit(&StreamEvent::RunStart {
223            parent_id: parent_id.to_string(),
224            total_beans,
225            total_rounds: total_waves,
226            beans: beans_info,
227        });
228    }
229
230    let run_cfg = RunConfig {
231        max_jobs: args.jobs.min(config.max_concurrent) as usize,
232        timeout_minutes: args.timeout,
233        idle_timeout_minutes: args.idle_timeout,
234        json_stream: args.json_stream,
235        file_locking: config.file_locking,
236    };
237    let run_start = Instant::now();
238    let total_done;
239    let total_failed;
240    let any_failed;
241    // Collect IDs of successfully closed beans for --review post-processing
242    let mut successful_ids: Vec<String> = Vec::new();
243
244    match spawn_mode {
245        SpawnMode::Direct => {
246            if !args.json_stream {
247                eprintln!("Dispatching {} bean(s)...", total_beans);
248            }
249
250            // Ready-queue: start each bean as soon as its specific deps finish.
251            // Progress (▸ start, ✓/✗ done) is printed in real-time by the queue.
252            let (results, had_failure) = run_ready_queue_direct(
253                beans_dir,
254                &plan.all_beans,
255                &plan.index,
256                &run_cfg,
257                args.keep_going,
258            )?;
259
260            let mut done = 0u32;
261            let mut failed = 0u32;
262            for result in &results {
263                if result.success {
264                    if args.json_stream {
265                        stream::emit(&StreamEvent::BeanDone {
266                            id: result.id.clone(),
267                            success: true,
268                            duration_secs: result.duration.as_secs(),
269                            error: None,
270                            total_tokens: result.total_tokens,
271                            total_cost: result.total_cost,
272                            tool_count: Some(result.tool_count),
273                            turns: Some(result.turns),
274                            failure_summary: None,
275                        });
276                    }
277                    done += 1;
278                    successful_ids.push(result.id.clone());
279                } else {
280                    if args.json_stream {
281                        stream::emit(&StreamEvent::BeanDone {
282                            id: result.id.clone(),
283                            success: false,
284                            duration_secs: result.duration.as_secs(),
285                            error: result.error.clone(),
286                            total_tokens: result.total_tokens,
287                            total_cost: result.total_cost,
288                            tool_count: Some(result.tool_count),
289                            turns: Some(result.turns),
290                            failure_summary: result.failure_summary.clone(),
291                        });
292                    }
293                    failed += 1;
294                }
295            }
296            total_done = done;
297            total_failed = failed;
298            any_failed = had_failure;
299        }
300
301        SpawnMode::Template { .. } => {
302            // Template mode: wave-based execution (legacy)
303            let mut done = 0u32;
304            let mut failed = 0u32;
305            let mut had_failure = false;
306
307            for (wave_idx, wave) in plan.waves.iter().enumerate() {
308                if args.json_stream {
309                    stream::emit(&StreamEvent::RoundStart {
310                        round: wave_idx + 1,
311                        total_rounds: total_waves,
312                        bean_count: wave.beans.len(),
313                    });
314                } else {
315                    eprintln!("Wave {}: {} bean(s)", wave_idx + 1, wave.beans.len());
316                }
317
318                let results = run_wave(beans_dir, &wave.beans, spawn_mode, &run_cfg, wave_idx + 1)?;
319
320                let mut wave_success = 0usize;
321                let mut wave_failed = 0usize;
322
323                for result in &results {
324                    let duration = format_duration(result.duration);
325                    if result.success {
326                        if args.json_stream {
327                            stream::emit(&StreamEvent::BeanDone {
328                                id: result.id.clone(),
329                                success: true,
330                                duration_secs: result.duration.as_secs(),
331                                error: None,
332                                total_tokens: result.total_tokens,
333                                total_cost: result.total_cost,
334                                tool_count: Some(result.tool_count),
335                                turns: Some(result.turns),
336                                failure_summary: None,
337                            });
338                        } else {
339                            eprintln!(
340                                "  ✓ {}  {}  {}  {}",
341                                result.id, result.title, result.action, duration
342                            );
343                        }
344                        done += 1;
345                        wave_success += 1;
346                        successful_ids.push(result.id.clone());
347                    } else {
348                        if args.json_stream {
349                            stream::emit(&StreamEvent::BeanDone {
350                                id: result.id.clone(),
351                                success: false,
352                                duration_secs: result.duration.as_secs(),
353                                error: result.error.clone(),
354                                total_tokens: result.total_tokens,
355                                total_cost: result.total_cost,
356                                tool_count: Some(result.tool_count),
357                                turns: Some(result.turns),
358                                failure_summary: result.failure_summary.clone(),
359                            });
360                        } else {
361                            eprintln!(
362                                "  ✗ {}  {}  {}  {} (failed)",
363                                result.id, result.title, result.action, duration
364                            );
365                        }
366                        failed += 1;
367                        wave_failed += 1;
368                        had_failure = true;
369                    }
370                }
371
372                if args.json_stream {
373                    stream::emit(&StreamEvent::RoundEnd {
374                        round: wave_idx + 1,
375                        success_count: wave_success,
376                        failed_count: wave_failed,
377                    });
378                }
379
380                if had_failure && !args.keep_going {
381                    break;
382                }
383            }
384
385            total_done = done;
386            total_failed = failed;
387            any_failed = had_failure;
388        }
389    }
390
391    // Trigger adversarial review for each successfully closed bean if --review is set.
392    // Review runs synchronously after all beans in this pass complete.
393    if args.review && !successful_ids.is_empty() {
394        for id in &successful_ids {
395            if !args.json_stream {
396                eprintln!("Review: checking {} ...", id);
397            }
398            if let Err(e) = cmd_review(
399                beans_dir,
400                ReviewArgs {
401                    id: id.clone(),
402                    model: None,
403                    diff_only: false,
404                },
405            ) {
406                eprintln!("Review: warning — review of {} failed: {}", id, e);
407            }
408        }
409    }
410
411    if args.json_stream {
412        stream::emit(&StreamEvent::RunEnd {
413            total_success: total_done as usize,
414            total_failed: total_failed as usize,
415            duration_secs: run_start.elapsed().as_secs(),
416        });
417    } else {
418        eprintln!();
419        eprintln!(
420            "Summary: {} done, {} failed, {} skipped",
421            total_done,
422            total_failed,
423            plan.skipped.len()
424        );
425    }
426
427    if any_failed && !args.keep_going {
428        anyhow::bail!("Some agents failed");
429    }
430
431    Ok(())
432}
433
434/// Loop mode: keep dispatching until no ready beans remain.
435fn run_loop(
436    beans_dir: &Path,
437    config: &Config,
438    args: &RunArgs,
439    _spawn_mode: &SpawnMode,
440) -> Result<()> {
441    let max_loops = if config.max_loops == 0 {
442        u32::MAX
443    } else {
444        config.max_loops
445    };
446
447    for iteration in 0..max_loops {
448        if iteration > 0 && !args.json_stream {
449            eprintln!("\n--- Loop iteration {} ---\n", iteration + 1);
450        }
451
452        let plan = plan_dispatch(beans_dir, config, args.id.as_deref(), args.auto_plan, false)?;
453
454        if plan.waves.is_empty() {
455            if !args.json_stream {
456                if iteration == 0 {
457                    eprintln!("No ready beans. Use `bn status` to see what's going on.");
458                } else {
459                    eprintln!("No more ready beans. Stopping.");
460                }
461            }
462            return Ok(());
463        }
464
465        // Run one pass (non-loop, non-dry-run)
466        let inner_args = RunArgs {
467            id: args.id.clone(),
468            jobs: args.jobs,
469            dry_run: false,
470            loop_mode: false,
471            auto_plan: args.auto_plan,
472            keep_going: args.keep_going,
473            timeout: args.timeout,
474            idle_timeout: args.idle_timeout,
475            json_stream: args.json_stream,
476            review: args.review,
477        };
478
479        // Reload config each iteration (agents may have changed beans)
480        let config = Config::load_with_extends(beans_dir)?;
481        let spawn_mode = determine_spawn_mode(&config);
482        match run_once(beans_dir, &config, &inner_args, &spawn_mode) {
483            Ok(()) => {}
484            Err(e) => {
485                if args.keep_going {
486                    eprintln!("Warning: {}", e);
487                } else {
488                    return Err(e);
489                }
490            }
491        }
492    }
493
494    eprintln!("Reached max_loops ({}). Stopping.", max_loops);
495    Ok(())
496}
497
498/// Format a duration as M:SS.
499pub(super) fn format_duration(d: Duration) -> String {
500    let secs = d.as_secs();
501    format!("{}:{:02}", secs / 60, secs % 60)
502}
503
504/// Find the bean file path. Public wrapper for use in other commands.
505pub fn find_bean_file(beans_dir: &Path, id: &str) -> Result<PathBuf> {
506    crate::discovery::find_bean_file(beans_dir, id)
507}
508
509#[cfg(test)]
510mod tests {
511    use super::*;
512    use std::fs;
513    use tempfile::TempDir;
514
515    fn make_beans_dir() -> (TempDir, std::path::PathBuf) {
516        let dir = TempDir::new().unwrap();
517        let beans_dir = dir.path().join(".beans");
518        fs::create_dir(&beans_dir).unwrap();
519        (dir, beans_dir)
520    }
521
522    fn write_config(beans_dir: &std::path::Path, run: Option<&str>) {
523        let run_line = match run {
524            Some(r) => format!("run: \"{}\"\n", r),
525            None => String::new(),
526        };
527        fs::write(
528            beans_dir.join("config.yaml"),
529            format!("project: test\nnext_id: 1\n{}", run_line),
530        )
531        .unwrap();
532    }
533
534    fn default_args() -> RunArgs {
535        RunArgs {
536            id: None,
537            jobs: 4,
538            dry_run: false,
539            loop_mode: false,
540            auto_plan: false,
541            keep_going: false,
542            timeout: 30,
543            idle_timeout: 5,
544            json_stream: false,
545            review: false,
546        }
547    }
548
549    #[test]
550    fn cmd_run_errors_when_no_run_template_and_no_pi() {
551        let (_dir, beans_dir) = make_beans_dir();
552        write_config(&beans_dir, None);
553
554        let args = default_args();
555
556        let result = cmd_run(&beans_dir, args);
557        // With no template and no pi on PATH, should error
558        // (The exact error depends on whether pi is installed)
559        // In CI/test without pi, it should bail
560        if !pi_available() {
561            assert!(result.is_err());
562            let err = result.unwrap_err().to_string();
563            assert!(
564                err.contains("No agent configured") || err.contains("not found"),
565                "Error should mention missing agent: {}",
566                err
567            );
568        }
569    }
570
571    #[test]
572    fn dry_run_does_not_spawn() {
573        let (_dir, beans_dir) = make_beans_dir();
574        write_config(&beans_dir, Some("echo {id}"));
575
576        // Create a ready bean
577        let mut bean = crate::bean::Bean::new("1", "Test bean");
578        bean.verify = Some("echo ok".to_string());
579        bean.to_file(beans_dir.join("1-test.md")).unwrap();
580
581        let args = RunArgs {
582            dry_run: true,
583            ..default_args()
584        };
585
586        // dry_run should succeed without spawning any processes
587        let result = cmd_run(&beans_dir, args);
588        assert!(result.is_ok());
589    }
590
591    #[test]
592    fn dry_run_with_json_stream() {
593        let (_dir, beans_dir) = make_beans_dir();
594        write_config(&beans_dir, Some("echo {id}"));
595
596        let mut bean = crate::bean::Bean::new("1", "Test bean");
597        bean.verify = Some("echo ok".to_string());
598        bean.to_file(beans_dir.join("1-test.md")).unwrap();
599
600        let args = RunArgs {
601            dry_run: true,
602            json_stream: true,
603            ..default_args()
604        };
605
606        // Should succeed and emit JSON events (captured to stdout)
607        let result = cmd_run(&beans_dir, args);
608        assert!(result.is_ok());
609    }
610
611    #[test]
612    fn format_duration_formats_correctly() {
613        assert_eq!(format_duration(Duration::from_secs(0)), "0:00");
614        assert_eq!(format_duration(Duration::from_secs(32)), "0:32");
615        assert_eq!(format_duration(Duration::from_secs(62)), "1:02");
616        assert_eq!(format_duration(Duration::from_secs(600)), "10:00");
617    }
618
619    #[test]
620    fn determine_spawn_mode_template_when_run_set() {
621        let config = Config {
622            project: "test".to_string(),
623            next_id: 1,
624            auto_close_parent: true,
625            run: Some("echo {id}".to_string()),
626            plan: Some("plan {id}".to_string()),
627            max_loops: 10,
628            max_concurrent: 4,
629            poll_interval: 30,
630            extends: vec![],
631            rules_file: None,
632            file_locking: false,
633            worktree: false,
634            on_close: None,
635            on_fail: None,
636            post_plan: None,
637            verify_timeout: None,
638            review: None,
639            user: None,
640            user_email: None,
641        };
642        let mode = determine_spawn_mode(&config);
643        assert_eq!(
644            mode,
645            SpawnMode::Template {
646                run_template: "echo {id}".to_string(),
647                plan_template: Some("plan {id}".to_string()),
648            }
649        );
650    }
651
652    #[test]
653    fn determine_spawn_mode_direct_when_no_run() {
654        let config = Config {
655            project: "test".to_string(),
656            next_id: 1,
657            auto_close_parent: true,
658            run: None,
659            plan: None,
660            max_loops: 10,
661            max_concurrent: 4,
662            poll_interval: 30,
663            extends: vec![],
664            rules_file: None,
665            file_locking: false,
666            worktree: false,
667            on_close: None,
668            on_fail: None,
669            post_plan: None,
670            verify_timeout: None,
671            review: None,
672            user: None,
673            user_email: None,
674        };
675        let mode = determine_spawn_mode(&config);
676        assert_eq!(mode, SpawnMode::Direct);
677    }
678
679    #[test]
680    fn agent_result_tracks_tokens_and_cost() {
681        let result = AgentResult {
682            id: "1".to_string(),
683            title: "Test".to_string(),
684            action: BeanAction::Implement,
685            success: true,
686            duration: Duration::from_secs(10),
687            total_tokens: Some(5000),
688            total_cost: Some(0.03),
689            error: None,
690            tool_count: 5,
691            turns: 2,
692            failure_summary: None,
693        };
694        assert_eq!(result.total_tokens, Some(5000));
695        assert_eq!(result.total_cost, Some(0.03));
696    }
697}