1mod plan;
19mod ready_queue;
20mod wave;
21
22pub use plan::{DispatchPlan, SizedBean};
23pub use wave::Wave;
24
25use std::fmt;
26use std::path::{Path, PathBuf};
27use std::process::{Command, Stdio};
28use std::time::{Duration, Instant};
29
30use anyhow::Result;
31
32use crate::commands::review::{cmd_review, ReviewArgs};
33use crate::config::Config;
34use crate::stream::{self, StreamEvent};
35
36use plan::{plan_dispatch, print_plan, print_plan_json};
37use ready_queue::run_ready_queue_direct;
38use wave::run_wave;
39
/// Execution settings for a single dispatch pass.
///
/// Built by `run_once` and passed by reference to `run_ready_queue_direct`
/// and `run_wave`.
pub(super) struct RunConfig {
    /// Job limit: the smaller of `RunArgs::jobs` and `Config::max_concurrent`.
    pub max_jobs: usize,
    /// Copied from `RunArgs::timeout` (minutes, going by the field name).
    pub timeout_minutes: u32,
    /// Copied from `RunArgs::idle_timeout` (minutes, going by the field name).
    pub idle_timeout_minutes: u32,
    /// Mirrors `RunArgs::json_stream`.
    pub json_stream: bool,
    /// Mirrors `Config::file_locking`.
    pub file_locking: bool,
}
48
/// Caller-supplied options for [`cmd_run`].
pub struct RunArgs {
    /// Optional bean ID forwarded to `plan_dispatch`; when `None`, the
    /// `RunStart` stream event reports the parent as `"all"`.
    pub id: Option<String>,
    /// Requested job count; `run_once` caps it at `Config::max_concurrent`.
    pub jobs: u32,
    /// Print the dispatch plan and return without running anything.
    pub dry_run: bool,
    /// Run through `run_loop` instead of a single `run_once` pass.
    pub loop_mode: bool,
    /// Forwarded unchanged to `plan_dispatch`.
    pub auto_plan: bool,
    /// Keep running after failures and do not turn agent failures into an error.
    pub keep_going: bool,
    /// Becomes `RunConfig::timeout_minutes`.
    pub timeout: u32,
    /// Becomes `RunConfig::idle_timeout_minutes`.
    pub idle_timeout: u32,
    /// Emit `StreamEvent`s through `stream::emit` instead of human-readable
    /// progress lines on stderr.
    pub json_stream: bool,
    /// After the run, call `cmd_review` for every bean that succeeded.
    pub review: bool,
}
63
/// The kind of work recorded for a bean in an `AgentResult`.
///
/// The `Display` impl renders the variants as `implement` and `plan`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BeanAction {
    /// Displayed as `implement`.
    Implement,
    /// Displayed as `plan`.
    Plan,
}
70
71impl fmt::Display for BeanAction {
72 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
73 match self {
74 BeanAction::Implement => write!(f, "implement"),
75 BeanAction::Plan => write!(f, "plan"),
76 }
77 }
78}
79
/// Outcome of one agent working on one bean.
///
/// NOTE(review): the results that `run_once` reads from `run_wave` and
/// `run_ready_queue_direct` expose exactly these fields, so this is presumably
/// their element type; confirm against those modules.
#[derive(Debug)]
struct AgentResult {
    /// Bean ID.
    id: String,
    /// Bean title, shown in the text progress line.
    title: String,
    /// Action recorded for the bean, shown in the text progress line.
    action: BeanAction,
    /// Whether the agent succeeded.
    success: bool,
    /// Elapsed time; printed as `m:ss` and reported in whole seconds.
    duration: Duration,
    /// Token count, if known; forwarded into `BeanDone` events.
    total_tokens: Option<u64>,
    /// Cost, if known; forwarded into `BeanDone` events (unit not shown here).
    total_cost: Option<f64>,
    /// Failure description, if any; forwarded into `BeanDone` events for failures.
    error: Option<String>,
}
92
/// How agents are launched, as chosen by `determine_spawn_mode`.
#[derive(Debug, Clone, PartialEq, Eq)]
enum SpawnMode {
    /// A `run` command is configured.
    Template {
        /// Copy of `Config::run`.
        run_template: String,
        /// Copy of `Config::plan`, if set.
        plan_template: Option<String>,
    },
    /// No `run` command is configured; `cmd_run` then requires `pi` to be
    /// runnable from `PATH`.
    Direct,
}
104
105pub fn cmd_run(beans_dir: &Path, args: RunArgs) -> Result<()> {
107 let config = Config::load_with_extends(beans_dir)?;
109 let spawn_mode = determine_spawn_mode(&config);
110
111 if spawn_mode == SpawnMode::Direct && !pi_available() {
112 anyhow::bail!(
113 "No agent configured and `pi` not found on PATH.\n\n\
114 Either:\n \
115 1. Install pi: npm i -g @anthropic/pi\n \
116 2. Set a run template: bn config set run \"<command>\"\n\n\
117 The command template uses {{id}} as a placeholder for the bean ID.\n\n\
118 Examples:\n \
119 bn config set run \"pi @.beans/{{id}}-*.md 'implement and bn close {{id}}'\"\n \
120 bn config set run \"claude -p 'implement bean {{id}} and run bn close {{id}}'\""
121 );
122 }
123
124 if let SpawnMode::Template {
125 ref run_template, ..
126 } = spawn_mode
127 {
128 let _ = run_template;
130 }
131
132 if args.loop_mode {
133 run_loop(beans_dir, &config, &args, &spawn_mode)
134 } else {
135 run_once(beans_dir, &config, &args, &spawn_mode)
136 }
137}
138
139fn determine_spawn_mode(config: &Config) -> SpawnMode {
141 if let Some(ref run) = config.run {
142 SpawnMode::Template {
143 run_template: run.clone(),
144 plan_template: config.plan.clone(),
145 }
146 } else {
147 SpawnMode::Direct
148 }
149}
150
/// Reports whether `pi --version` can be run and exits successfully.
///
/// The probe's stdout and stderr are discarded. A failure to spawn the process
/// is treated the same as a non-zero exit: `false`.
fn pi_available() -> bool {
    let probe = Command::new("pi")
        .arg("--version")
        .stdout(Stdio::null())
        .stderr(Stdio::null())
        .status();

    match probe {
        Ok(exit) => exit.success(),
        Err(_) => false,
    }
}
161
162fn run_once(
164 beans_dir: &Path,
165 config: &Config,
166 args: &RunArgs,
167 spawn_mode: &SpawnMode,
168) -> Result<()> {
169 let plan = plan_dispatch(
170 beans_dir,
171 config,
172 args.id.as_deref(),
173 args.auto_plan,
174 args.dry_run,
175 )?;
176
177 if plan.waves.is_empty() && plan.skipped.is_empty() {
178 if args.json_stream {
179 stream::emit_error("No ready beans");
180 } else {
181 eprintln!("No ready beans. Use `bn status` to see what's going on.");
182 }
183 return Ok(());
184 }
185
186 if args.dry_run {
187 if args.json_stream {
188 print_plan_json(&plan, args.id.as_deref());
189 } else {
190 print_plan(&plan);
191 }
192 return Ok(());
193 }
194
195 if !plan.skipped.is_empty() && !args.json_stream {
197 eprintln!(
198 "{} bean(s) need planning, run `bn plan`:",
199 plan.skipped.len()
200 );
201 for sb in &plan.skipped {
202 eprintln!(
203 " ⚠ {} {} ({}k tokens)",
204 sb.id,
205 sb.title,
206 sb.tokens / 1000
207 );
208 }
209 eprintln!();
210 }
211
212 let total_beans: usize = plan.waves.iter().map(|w| w.beans.len()).sum();
213 let total_waves = plan.waves.len();
214 let parent_id = args.id.as_deref().unwrap_or("all");
215
216 if args.json_stream {
217 let beans_info: Vec<stream::BeanInfo> = plan
218 .waves
219 .iter()
220 .enumerate()
221 .flat_map(|(wave_idx, wave)| {
222 wave.beans.iter().map(move |b| stream::BeanInfo {
223 id: b.id.clone(),
224 title: b.title.clone(),
225 round: wave_idx + 1,
226 })
227 })
228 .collect();
229 stream::emit(&StreamEvent::RunStart {
230 parent_id: parent_id.to_string(),
231 total_beans,
232 total_rounds: total_waves,
233 beans: beans_info,
234 });
235 }
236
237 let run_cfg = RunConfig {
238 max_jobs: args.jobs.min(config.max_concurrent) as usize,
239 timeout_minutes: args.timeout,
240 idle_timeout_minutes: args.idle_timeout,
241 json_stream: args.json_stream,
242 file_locking: config.file_locking,
243 };
244 let run_start = Instant::now();
245 let total_done;
246 let total_failed;
247 let any_failed;
248 let mut successful_ids: Vec<String> = Vec::new();
250
251 match spawn_mode {
252 SpawnMode::Direct => {
253 let (results, had_failure) = run_ready_queue_direct(
255 beans_dir,
256 &plan.all_beans,
257 &plan.index,
258 &run_cfg,
259 args.keep_going,
260 )?;
261
262 let mut done = 0u32;
263 let mut failed = 0u32;
264 for result in &results {
265 let duration = format_duration(result.duration);
266 if result.success {
267 if args.json_stream {
268 stream::emit(&StreamEvent::BeanDone {
269 id: result.id.clone(),
270 success: true,
271 duration_secs: result.duration.as_secs(),
272 error: None,
273 total_tokens: result.total_tokens,
274 total_cost: result.total_cost,
275 });
276 } else {
277 eprintln!(
278 " ✓ {} {} {} {}",
279 result.id, result.title, result.action, duration
280 );
281 }
282 done += 1;
283 successful_ids.push(result.id.clone());
284 } else {
285 if args.json_stream {
286 stream::emit(&StreamEvent::BeanDone {
287 id: result.id.clone(),
288 success: false,
289 duration_secs: result.duration.as_secs(),
290 error: result.error.clone(),
291 total_tokens: result.total_tokens,
292 total_cost: result.total_cost,
293 });
294 } else {
295 eprintln!(
296 " ✗ {} {} {} {} (failed)",
297 result.id, result.title, result.action, duration
298 );
299 }
300 failed += 1;
301 }
302 }
303 total_done = done;
304 total_failed = failed;
305 any_failed = had_failure;
306 }
307
308 SpawnMode::Template { .. } => {
309 let mut done = 0u32;
311 let mut failed = 0u32;
312 let mut had_failure = false;
313
314 for (wave_idx, wave) in plan.waves.iter().enumerate() {
315 if args.json_stream {
316 stream::emit(&StreamEvent::RoundStart {
317 round: wave_idx + 1,
318 total_rounds: total_waves,
319 bean_count: wave.beans.len(),
320 });
321 } else {
322 eprintln!("Wave {}: {} bean(s)", wave_idx + 1, wave.beans.len());
323 }
324
325 let results = run_wave(beans_dir, &wave.beans, spawn_mode, &run_cfg, wave_idx + 1)?;
326
327 let mut wave_success = 0usize;
328 let mut wave_failed = 0usize;
329
330 for result in &results {
331 let duration = format_duration(result.duration);
332 if result.success {
333 if args.json_stream {
334 stream::emit(&StreamEvent::BeanDone {
335 id: result.id.clone(),
336 success: true,
337 duration_secs: result.duration.as_secs(),
338 error: None,
339 total_tokens: result.total_tokens,
340 total_cost: result.total_cost,
341 });
342 } else {
343 eprintln!(
344 " ✓ {} {} {} {}",
345 result.id, result.title, result.action, duration
346 );
347 }
348 done += 1;
349 wave_success += 1;
350 successful_ids.push(result.id.clone());
351 } else {
352 if args.json_stream {
353 stream::emit(&StreamEvent::BeanDone {
354 id: result.id.clone(),
355 success: false,
356 duration_secs: result.duration.as_secs(),
357 error: result.error.clone(),
358 total_tokens: result.total_tokens,
359 total_cost: result.total_cost,
360 });
361 } else {
362 eprintln!(
363 " ✗ {} {} {} {} (failed)",
364 result.id, result.title, result.action, duration
365 );
366 }
367 failed += 1;
368 wave_failed += 1;
369 had_failure = true;
370 }
371 }
372
373 if args.json_stream {
374 stream::emit(&StreamEvent::RoundEnd {
375 round: wave_idx + 1,
376 success_count: wave_success,
377 failed_count: wave_failed,
378 });
379 }
380
381 if had_failure && !args.keep_going {
382 break;
383 }
384 }
385
386 total_done = done;
387 total_failed = failed;
388 any_failed = had_failure;
389 }
390 }
391
392 if args.review && !successful_ids.is_empty() {
395 for id in &successful_ids {
396 if !args.json_stream {
397 eprintln!("Review: checking {} ...", id);
398 }
399 if let Err(e) = cmd_review(
400 beans_dir,
401 ReviewArgs {
402 id: id.clone(),
403 model: None,
404 diff_only: false,
405 },
406 ) {
407 eprintln!("Review: warning — review of {} failed: {}", id, e);
408 }
409 }
410 }
411
412 if args.json_stream {
413 stream::emit(&StreamEvent::RunEnd {
414 total_success: total_done as usize,
415 total_failed: total_failed as usize,
416 duration_secs: run_start.elapsed().as_secs(),
417 });
418 } else {
419 eprintln!();
420 eprintln!(
421 "Summary: {} done, {} failed, {} skipped",
422 total_done,
423 total_failed,
424 plan.skipped.len()
425 );
426 }
427
428 if any_failed && !args.keep_going {
429 anyhow::bail!("Some agents failed");
430 }
431
432 Ok(())
433}
434
435fn run_loop(
437 beans_dir: &Path,
438 config: &Config,
439 args: &RunArgs,
440 _spawn_mode: &SpawnMode,
441) -> Result<()> {
442 let max_loops = if config.max_loops == 0 {
443 u32::MAX
444 } else {
445 config.max_loops
446 };
447
448 for iteration in 0..max_loops {
449 if iteration > 0 && !args.json_stream {
450 eprintln!("\n--- Loop iteration {} ---\n", iteration + 1);
451 }
452
453 let plan = plan_dispatch(beans_dir, config, args.id.as_deref(), args.auto_plan, false)?;
454
455 if plan.waves.is_empty() {
456 if !args.json_stream {
457 if iteration == 0 {
458 eprintln!("No ready beans. Use `bn status` to see what's going on.");
459 } else {
460 eprintln!("No more ready beans. Stopping.");
461 }
462 }
463 return Ok(());
464 }
465
466 let inner_args = RunArgs {
468 id: args.id.clone(),
469 jobs: args.jobs,
470 dry_run: false,
471 loop_mode: false,
472 auto_plan: args.auto_plan,
473 keep_going: args.keep_going,
474 timeout: args.timeout,
475 idle_timeout: args.idle_timeout,
476 json_stream: args.json_stream,
477 review: args.review,
478 };
479
480 let config = Config::load_with_extends(beans_dir)?;
482 let spawn_mode = determine_spawn_mode(&config);
483 match run_once(beans_dir, &config, &inner_args, &spawn_mode) {
484 Ok(()) => {}
485 Err(e) => {
486 if args.keep_going {
487 eprintln!("Warning: {}", e);
488 } else {
489 return Err(e);
490 }
491 }
492 }
493 }
494
495 eprintln!("Reached max_loops ({}). Stopping.", max_loops);
496 Ok(())
497}
498
/// Formats a duration as `M:SS`: whole minutes (unbounded) and zero-padded
/// seconds. Sub-second precision is dropped.
fn format_duration(d: Duration) -> String {
    let total_secs = d.as_secs();
    let minutes = total_secs / 60;
    let seconds = total_secs % 60;
    format!("{}:{:02}", minutes, seconds)
}
504
/// Locates the file for bean `id` under `beans_dir`.
///
/// Thin wrapper: delegates to `crate::discovery::find_bean_file` and returns
/// its result, including any error, unchanged.
pub fn find_bean_file(beans_dir: &Path, id: &str) -> Result<PathBuf> {
    crate::discovery::find_bean_file(beans_dir, id)
}
509
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Creates a temporary directory containing an empty `.beans` directory.
    /// The `TempDir` guard is returned so the directory outlives the test body.
    fn make_beans_dir() -> (TempDir, std::path::PathBuf) {
        let tmp = TempDir::new().unwrap();
        let beans_dir = tmp.path().join(".beans");
        fs::create_dir(&beans_dir).unwrap();
        (tmp, beans_dir)
    }

    /// Writes a minimal `config.yaml`, with a `run:` entry only when given.
    fn write_config(beans_dir: &std::path::Path, run: Option<&str>) {
        let run_line = run
            .map(|r| format!("run: \"{}\"\n", r))
            .unwrap_or_default();
        let contents = format!("project: test\nnext_id: 1\n{}", run_line);
        fs::write(beans_dir.join("config.yaml"), contents).unwrap();
    }

    /// Baseline arguments; tests override individual fields via struct update.
    fn default_args() -> RunArgs {
        RunArgs {
            id: None,
            jobs: 4,
            dry_run: false,
            loop_mode: false,
            auto_plan: false,
            keep_going: false,
            timeout: 30,
            idle_timeout: 5,
            json_stream: false,
            review: false,
        }
    }

    /// Writes bean `1` with a verify command into `beans_dir`.
    fn write_test_bean(beans_dir: &std::path::Path) {
        let mut bean = crate::bean::Bean::new("1", "Test bean");
        bean.verify = Some("echo ok".to_string());
        bean.to_file(beans_dir.join("1-test.md")).unwrap();
    }

    /// Builds a `Config` that differs between tests only in `run` and `plan`.
    fn config_with(run: Option<&str>, plan: Option<&str>) -> Config {
        Config {
            project: "test".to_string(),
            next_id: 1,
            auto_close_parent: true,
            max_tokens: 30000,
            run: run.map(str::to_owned),
            plan: plan.map(str::to_owned),
            max_loops: 10,
            max_concurrent: 4,
            poll_interval: 30,
            extends: vec![],
            rules_file: None,
            file_locking: false,
            on_close: None,
            on_fail: None,
            post_plan: None,
            verify_timeout: None,
            review: None,
        }
    }

    #[test]
    fn cmd_run_errors_when_no_run_template_and_no_pi() {
        let (_dir, beans_dir) = make_beans_dir();
        write_config(&beans_dir, None);

        let result = cmd_run(&beans_dir, default_args());

        // The error is only expected on machines where `pi` is not installed.
        if pi_available() {
            return;
        }
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("No agent configured") || err.contains("not found"),
            "Error should mention missing agent: {}",
            err
        );
    }

    #[test]
    fn dry_run_does_not_spawn() {
        let (_dir, beans_dir) = make_beans_dir();
        write_config(&beans_dir, Some("echo {id}"));
        write_test_bean(&beans_dir);

        let args = RunArgs {
            dry_run: true,
            ..default_args()
        };

        assert!(cmd_run(&beans_dir, args).is_ok());
    }

    #[test]
    fn dry_run_with_json_stream() {
        let (_dir, beans_dir) = make_beans_dir();
        write_config(&beans_dir, Some("echo {id}"));
        write_test_bean(&beans_dir);

        let args = RunArgs {
            dry_run: true,
            json_stream: true,
            ..default_args()
        };

        assert!(cmd_run(&beans_dir, args).is_ok());
    }

    #[test]
    fn format_duration_formats_correctly() {
        let cases = [(0, "0:00"), (32, "0:32"), (62, "1:02"), (600, "10:00")];
        for &(secs, expected) in &cases {
            assert_eq!(format_duration(Duration::from_secs(secs)), expected);
        }
    }

    #[test]
    fn determine_spawn_mode_template_when_run_set() {
        let config = config_with(Some("echo {id}"), Some("plan {id}"));
        let expected = SpawnMode::Template {
            run_template: "echo {id}".to_string(),
            plan_template: Some("plan {id}".to_string()),
        };
        assert_eq!(determine_spawn_mode(&config), expected);
    }

    #[test]
    fn determine_spawn_mode_direct_when_no_run() {
        let config = config_with(None, None);
        assert_eq!(determine_spawn_mode(&config), SpawnMode::Direct);
    }

    #[test]
    fn agent_result_tracks_tokens_and_cost() {
        let result = AgentResult {
            id: "1".to_string(),
            title: "Test".to_string(),
            action: BeanAction::Implement,
            success: true,
            duration: Duration::from_secs(10),
            total_tokens: Some(5000),
            total_cost: Some(0.03),
            error: None,
        };
        assert_eq!(result.total_tokens, Some(5000));
        assert_eq!(result.total_cost, Some(0.03));
    }
}