1mod plan;
19mod ready_queue;
20mod wave;
21
22pub use plan::{DispatchPlan, SizedBean};
23pub use wave::Wave;
24
25use std::fmt;
26use std::path::{Path, PathBuf};
27use std::process::{Command, Stdio};
28use std::time::{Duration, Instant};
29
30use anyhow::Result;
31
32use crate::commands::review::{cmd_review, ReviewArgs};
33use crate::config::Config;
34use crate::stream::{self, StreamEvent};
35
36use plan::{plan_dispatch, print_plan, print_plan_json};
37use ready_queue::run_ready_queue_direct;
38use wave::run_wave;
39
/// Execution settings for one dispatch pass.
///
/// Built once in `run_once` and handed by reference to the ready-queue and
/// wave runners.
pub(super) struct RunConfig {
    /// Job limit: the smaller of the requested `jobs` and the configured
    /// `max_concurrent`. Presumably the cap on concurrently running agents —
    /// confirm against the runners.
    pub max_jobs: usize,
    /// Copied from `RunArgs::timeout`. Presumably an overall per-agent time
    /// limit in minutes — confirm against the runners.
    pub timeout_minutes: u32,
    /// Copied from `RunArgs::idle_timeout`. Presumably a limit on agent
    /// inactivity in minutes — confirm against the runners.
    pub idle_timeout_minutes: u32,
    /// Copied from `RunArgs::json_stream`.
    pub json_stream: bool,
    /// Copied from the project config's `file_locking` setting.
    pub file_locking: bool,
}
48
/// Arguments accepted by [`cmd_run`].
///
/// Derives `Debug` and `Clone` so callers can log the arguments and build
/// per-iteration variants without copying every field by hand.
#[derive(Debug, Clone)]
pub struct RunArgs {
    /// Bean to run. When `None`, the run is reported under the parent id
    /// `"all"`.
    pub id: Option<String>,
    /// Requested job count; clamped to the configured `max_concurrent`.
    pub jobs: u32,
    /// Print the dispatch plan and return without running anything.
    pub dry_run: bool,
    /// Keep running dispatch passes until no ready beans remain or the
    /// configured `max_loops` is reached.
    pub loop_mode: bool,
    /// Forwarded to the dispatch planner.
    pub auto_plan: bool,
    /// Continue after failures instead of stopping, and do not turn agent
    /// failures into an error return.
    pub keep_going: bool,
    /// Forwarded to `RunConfig::timeout_minutes`.
    pub timeout: u32,
    /// Forwarded to `RunConfig::idle_timeout_minutes`.
    pub idle_timeout: u32,
    /// Emit structured stream events instead of human-readable stderr lines.
    pub json_stream: bool,
    /// Run a review for every bean that finished successfully.
    pub review: bool,
}
63
/// The kind of work an agent performs on a bean.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum BeanAction {
    /// Rendered as `implement`.
    Implement,
}

impl fmt::Display for BeanAction {
    /// Writes the lowercase label of the action.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let label = match *self {
            BeanAction::Implement => "implement",
        };
        f.write_str(label)
    }
}
77
#[derive(Debug)]
/// Outcome of one agent run for one bean.
///
/// `run_once` turns each result into a `BeanDone` stream event (JSON mode)
/// and counts it as done or failed according to `success`.
struct AgentResult {
    /// Bean id; used as the event id and collected for the review pass on
    /// success.
    id: String,
    /// Bean title, shown in the human-readable per-bean line.
    title: String,
    /// Action label shown in the human-readable per-bean line.
    action: BeanAction,
    /// Whether the run counts as done (`true`) or failed (`false`).
    success: bool,
    /// Elapsed time; reported in whole seconds in events and as `M:SS` on
    /// stderr.
    duration: Duration,
    /// Token total forwarded to the stream event, if any.
    total_tokens: Option<u64>,
    /// Cost forwarded to the stream event, if any. Units are not visible in
    /// this file.
    total_cost: Option<f64>,
    /// Error text; only forwarded to the stream event for failed results.
    error: Option<String>,
    /// Forwarded to the stream event as `tool_count`.
    tool_count: usize,
    /// Forwarded to the stream event as `turns`.
    turns: usize,
    /// Failure summary; only forwarded to the stream event for failed results.
    failure_summary: Option<String>,
}
93
#[derive(Debug, Clone, PartialEq, Eq)]
/// How agents get launched, derived from the project config by
/// `determine_spawn_mode`.
enum SpawnMode {
    /// The config has a `run` template; beans are executed wave by wave.
    Template {
        /// Copy of the config's `run` value.
        run_template: String,
        /// Copy of the config's `plan` value, if set.
        plan_template: Option<String>,
    },
    /// No `run` template is configured. `cmd_run` requires `pi` on PATH in
    /// this mode, and beans are executed through the ready queue.
    Direct,
}
105
106pub fn cmd_run(beans_dir: &Path, args: RunArgs) -> Result<()> {
108 let config = Config::load_with_extends(beans_dir)?;
110 let spawn_mode = determine_spawn_mode(&config);
111
112 if spawn_mode == SpawnMode::Direct && !pi_available() {
113 anyhow::bail!(
114 "No agent configured and `pi` not found on PATH.\n\n\
115 Either:\n \
116 1. Install pi: npm i -g @anthropic/pi\n \
117 2. Set a run template: bn config set run \"<command>\"\n\n\
118 The command template uses {{id}} as a placeholder for the bean ID.\n\n\
119 Examples:\n \
120 bn config set run \"pi @.beans/{{id}}-*.md 'implement and bn close {{id}}'\"\n \
121 bn config set run \"claude -p 'implement bean {{id}} and run bn close {{id}}'\""
122 );
123 }
124
125 if let SpawnMode::Template {
126 ref run_template, ..
127 } = spawn_mode
128 {
129 let _ = run_template;
131 }
132
133 if args.loop_mode {
134 run_loop(beans_dir, &config, &args, &spawn_mode)
135 } else {
136 run_once(beans_dir, &config, &args, &spawn_mode)
137 }
138}
139
140fn determine_spawn_mode(config: &Config) -> SpawnMode {
142 if let Some(ref run) = config.run {
143 SpawnMode::Template {
144 run_template: run.clone(),
145 plan_template: config.plan.clone(),
146 }
147 } else {
148 SpawnMode::Direct
149 }
150}
151
/// Reports whether `pi --version` can be spawned and exits successfully.
///
/// Output of the probe is discarded; a spawn error counts as "not available".
fn pi_available() -> bool {
    let mut probe = Command::new("pi");
    probe
        .arg("--version")
        .stdout(Stdio::null())
        .stderr(Stdio::null());
    matches!(probe.status(), Ok(status) if status.success())
}
162
163fn run_once(
165 beans_dir: &Path,
166 config: &Config,
167 args: &RunArgs,
168 spawn_mode: &SpawnMode,
169) -> Result<()> {
170 let plan = plan_dispatch(
171 beans_dir,
172 config,
173 args.id.as_deref(),
174 args.auto_plan,
175 args.dry_run,
176 )?;
177
178 if plan.waves.is_empty() && plan.skipped.is_empty() {
179 if args.json_stream {
180 stream::emit_error("No ready beans");
181 } else {
182 eprintln!("No ready beans. Use `bn status` to see what's going on.");
183 }
184 return Ok(());
185 }
186
187 if args.dry_run {
188 if args.json_stream {
189 print_plan_json(&plan, args.id.as_deref());
190 } else {
191 print_plan(&plan);
192 }
193 return Ok(());
194 }
195
196 if !plan.skipped.is_empty() && !args.json_stream {
198 eprintln!("{} bean(s) blocked:", plan.skipped.len());
199 for bb in &plan.skipped {
200 eprintln!(" ⚠ {} {} ({})", bb.id, bb.title, bb.reason);
201 }
202 eprintln!();
203 }
204
205 let total_beans: usize = plan.waves.iter().map(|w| w.beans.len()).sum();
206 let total_waves = plan.waves.len();
207 let parent_id = args.id.as_deref().unwrap_or("all");
208
209 if args.json_stream {
210 let beans_info: Vec<stream::BeanInfo> = plan
211 .waves
212 .iter()
213 .enumerate()
214 .flat_map(|(wave_idx, wave)| {
215 wave.beans.iter().map(move |b| stream::BeanInfo {
216 id: b.id.clone(),
217 title: b.title.clone(),
218 round: wave_idx + 1,
219 })
220 })
221 .collect();
222 stream::emit(&StreamEvent::RunStart {
223 parent_id: parent_id.to_string(),
224 total_beans,
225 total_rounds: total_waves,
226 beans: beans_info,
227 });
228 }
229
230 let run_cfg = RunConfig {
231 max_jobs: args.jobs.min(config.max_concurrent) as usize,
232 timeout_minutes: args.timeout,
233 idle_timeout_minutes: args.idle_timeout,
234 json_stream: args.json_stream,
235 file_locking: config.file_locking,
236 };
237 let run_start = Instant::now();
238 let total_done;
239 let total_failed;
240 let any_failed;
241 let mut successful_ids: Vec<String> = Vec::new();
243
244 match spawn_mode {
245 SpawnMode::Direct => {
246 if !args.json_stream {
247 eprintln!("Dispatching {} bean(s)...", total_beans);
248 }
249
250 let (results, had_failure) = run_ready_queue_direct(
253 beans_dir,
254 &plan.all_beans,
255 &plan.index,
256 &run_cfg,
257 args.keep_going,
258 )?;
259
260 let mut done = 0u32;
261 let mut failed = 0u32;
262 for result in &results {
263 if result.success {
264 if args.json_stream {
265 stream::emit(&StreamEvent::BeanDone {
266 id: result.id.clone(),
267 success: true,
268 duration_secs: result.duration.as_secs(),
269 error: None,
270 total_tokens: result.total_tokens,
271 total_cost: result.total_cost,
272 tool_count: Some(result.tool_count),
273 turns: Some(result.turns),
274 failure_summary: None,
275 });
276 }
277 done += 1;
278 successful_ids.push(result.id.clone());
279 } else {
280 if args.json_stream {
281 stream::emit(&StreamEvent::BeanDone {
282 id: result.id.clone(),
283 success: false,
284 duration_secs: result.duration.as_secs(),
285 error: result.error.clone(),
286 total_tokens: result.total_tokens,
287 total_cost: result.total_cost,
288 tool_count: Some(result.tool_count),
289 turns: Some(result.turns),
290 failure_summary: result.failure_summary.clone(),
291 });
292 }
293 failed += 1;
294 }
295 }
296 total_done = done;
297 total_failed = failed;
298 any_failed = had_failure;
299 }
300
301 SpawnMode::Template { .. } => {
302 let mut done = 0u32;
304 let mut failed = 0u32;
305 let mut had_failure = false;
306
307 for (wave_idx, wave) in plan.waves.iter().enumerate() {
308 if args.json_stream {
309 stream::emit(&StreamEvent::RoundStart {
310 round: wave_idx + 1,
311 total_rounds: total_waves,
312 bean_count: wave.beans.len(),
313 });
314 } else {
315 eprintln!("Wave {}: {} bean(s)", wave_idx + 1, wave.beans.len());
316 }
317
318 let results = run_wave(beans_dir, &wave.beans, spawn_mode, &run_cfg, wave_idx + 1)?;
319
320 let mut wave_success = 0usize;
321 let mut wave_failed = 0usize;
322
323 for result in &results {
324 let duration = format_duration(result.duration);
325 if result.success {
326 if args.json_stream {
327 stream::emit(&StreamEvent::BeanDone {
328 id: result.id.clone(),
329 success: true,
330 duration_secs: result.duration.as_secs(),
331 error: None,
332 total_tokens: result.total_tokens,
333 total_cost: result.total_cost,
334 tool_count: Some(result.tool_count),
335 turns: Some(result.turns),
336 failure_summary: None,
337 });
338 } else {
339 eprintln!(
340 " ✓ {} {} {} {}",
341 result.id, result.title, result.action, duration
342 );
343 }
344 done += 1;
345 wave_success += 1;
346 successful_ids.push(result.id.clone());
347 } else {
348 if args.json_stream {
349 stream::emit(&StreamEvent::BeanDone {
350 id: result.id.clone(),
351 success: false,
352 duration_secs: result.duration.as_secs(),
353 error: result.error.clone(),
354 total_tokens: result.total_tokens,
355 total_cost: result.total_cost,
356 tool_count: Some(result.tool_count),
357 turns: Some(result.turns),
358 failure_summary: result.failure_summary.clone(),
359 });
360 } else {
361 eprintln!(
362 " ✗ {} {} {} {} (failed)",
363 result.id, result.title, result.action, duration
364 );
365 }
366 failed += 1;
367 wave_failed += 1;
368 had_failure = true;
369 }
370 }
371
372 if args.json_stream {
373 stream::emit(&StreamEvent::RoundEnd {
374 round: wave_idx + 1,
375 success_count: wave_success,
376 failed_count: wave_failed,
377 });
378 }
379
380 if had_failure && !args.keep_going {
381 break;
382 }
383 }
384
385 total_done = done;
386 total_failed = failed;
387 any_failed = had_failure;
388 }
389 }
390
391 if args.review && !successful_ids.is_empty() {
394 for id in &successful_ids {
395 if !args.json_stream {
396 eprintln!("Review: checking {} ...", id);
397 }
398 if let Err(e) = cmd_review(
399 beans_dir,
400 ReviewArgs {
401 id: id.clone(),
402 model: None,
403 diff_only: false,
404 },
405 ) {
406 eprintln!("Review: warning — review of {} failed: {}", id, e);
407 }
408 }
409 }
410
411 if args.json_stream {
412 stream::emit(&StreamEvent::RunEnd {
413 total_success: total_done as usize,
414 total_failed: total_failed as usize,
415 duration_secs: run_start.elapsed().as_secs(),
416 });
417 } else {
418 eprintln!();
419 eprintln!(
420 "Summary: {} done, {} failed, {} skipped",
421 total_done,
422 total_failed,
423 plan.skipped.len()
424 );
425 }
426
427 if any_failed && !args.keep_going {
428 anyhow::bail!("Some agents failed");
429 }
430
431 Ok(())
432}
433
434fn run_loop(
436 beans_dir: &Path,
437 config: &Config,
438 args: &RunArgs,
439 _spawn_mode: &SpawnMode,
440) -> Result<()> {
441 let max_loops = if config.max_loops == 0 {
442 u32::MAX
443 } else {
444 config.max_loops
445 };
446
447 for iteration in 0..max_loops {
448 if iteration > 0 && !args.json_stream {
449 eprintln!("\n--- Loop iteration {} ---\n", iteration + 1);
450 }
451
452 let plan = plan_dispatch(beans_dir, config, args.id.as_deref(), args.auto_plan, false)?;
453
454 if plan.waves.is_empty() {
455 if !args.json_stream {
456 if iteration == 0 {
457 eprintln!("No ready beans. Use `bn status` to see what's going on.");
458 } else {
459 eprintln!("No more ready beans. Stopping.");
460 }
461 }
462 return Ok(());
463 }
464
465 let inner_args = RunArgs {
467 id: args.id.clone(),
468 jobs: args.jobs,
469 dry_run: false,
470 loop_mode: false,
471 auto_plan: args.auto_plan,
472 keep_going: args.keep_going,
473 timeout: args.timeout,
474 idle_timeout: args.idle_timeout,
475 json_stream: args.json_stream,
476 review: args.review,
477 };
478
479 let config = Config::load_with_extends(beans_dir)?;
481 let spawn_mode = determine_spawn_mode(&config);
482 match run_once(beans_dir, &config, &inner_args, &spawn_mode) {
483 Ok(()) => {}
484 Err(e) => {
485 if args.keep_going {
486 eprintln!("Warning: {}", e);
487 } else {
488 return Err(e);
489 }
490 }
491 }
492 }
493
494 eprintln!("Reached max_loops ({}). Stopping.", max_loops);
495 Ok(())
496}
497
498pub(super) fn format_duration(d: Duration) -> String {
500 let secs = d.as_secs();
501 format!("{}:{:02}", secs / 60, secs % 60)
502}
503
/// Public wrapper around `crate::discovery::find_bean_file`.
///
/// Forwards `beans_dir` and `id` unchanged and returns the callee's result,
/// including any error, as-is.
pub fn find_bean_file(beans_dir: &Path, id: &str) -> Result<PathBuf> {
    crate::discovery::find_bean_file(beans_dir, id)
}
508
#[cfg(test)]
mod tests {
    use super::*;
    use std::fs;
    use tempfile::TempDir;

    /// Creates a temp directory containing an empty `.beans` folder.
    ///
    /// The `TempDir` guard is returned so the directory outlives the test body.
    fn make_beans_dir() -> (TempDir, std::path::PathBuf) {
        let tmp = TempDir::new().unwrap();
        let beans_dir = tmp.path().join(".beans");
        fs::create_dir(&beans_dir).unwrap();
        (tmp, beans_dir)
    }

    /// Writes a minimal `config.yaml`, optionally with a `run` template.
    fn write_config(beans_dir: &std::path::Path, run: Option<&str>) {
        let run_line = run
            .map(|r| format!("run: \"{}\"\n", r))
            .unwrap_or_default();
        let contents = format!("project: test\nnext_id: 1\n{}", run_line);
        fs::write(beans_dir.join("config.yaml"), contents).unwrap();
    }

    /// Writes bean `1` with a trivial verify command into `beans_dir`.
    fn write_sample_bean(beans_dir: &std::path::Path) {
        let mut bean = crate::bean::Bean::new("1", "Test bean");
        bean.verify = Some("echo ok".to_string());
        bean.to_file(beans_dir.join("1-test.md")).unwrap();
    }

    /// Baseline arguments: one plain, non-looping, non-dry pass.
    fn default_args() -> RunArgs {
        RunArgs {
            id: None,
            jobs: 4,
            dry_run: false,
            loop_mode: false,
            auto_plan: false,
            keep_going: false,
            timeout: 30,
            idle_timeout: 5,
            json_stream: false,
            review: false,
        }
    }

    /// Builds an in-memory config that differs only in `run` and `plan`.
    fn sample_config(run: Option<&str>, plan: Option<&str>) -> Config {
        Config {
            project: "test".to_string(),
            next_id: 1,
            auto_close_parent: true,
            run: run.map(str::to_string),
            plan: plan.map(str::to_string),
            max_loops: 10,
            max_concurrent: 4,
            poll_interval: 30,
            extends: vec![],
            rules_file: None,
            file_locking: false,
            worktree: false,
            on_close: None,
            on_fail: None,
            post_plan: None,
            verify_timeout: None,
            review: None,
            user: None,
            user_email: None,
        }
    }

    #[test]
    fn cmd_run_errors_when_no_run_template_and_no_pi() {
        let (_dir, beans_dir) = make_beans_dir();
        write_config(&beans_dir, None);

        let result = cmd_run(&beans_dir, default_args());

        // The error path is only reachable on machines without `pi`.
        if pi_available() {
            return;
        }
        assert!(result.is_err());
        let err = result.unwrap_err().to_string();
        assert!(
            err.contains("No agent configured") || err.contains("not found"),
            "Error should mention missing agent: {}",
            err
        );
    }

    #[test]
    fn dry_run_does_not_spawn() {
        let (_dir, beans_dir) = make_beans_dir();
        write_config(&beans_dir, Some("echo {id}"));
        write_sample_bean(&beans_dir);

        let args = RunArgs {
            dry_run: true,
            ..default_args()
        };

        assert!(cmd_run(&beans_dir, args).is_ok());
    }

    #[test]
    fn dry_run_with_json_stream() {
        let (_dir, beans_dir) = make_beans_dir();
        write_config(&beans_dir, Some("echo {id}"));
        write_sample_bean(&beans_dir);

        let args = RunArgs {
            dry_run: true,
            json_stream: true,
            ..default_args()
        };

        assert!(cmd_run(&beans_dir, args).is_ok());
    }

    #[test]
    fn format_duration_formats_correctly() {
        let cases = [(0, "0:00"), (32, "0:32"), (62, "1:02"), (600, "10:00")];
        for (secs, expected) in cases {
            assert_eq!(format_duration(Duration::from_secs(secs)), expected);
        }
    }

    #[test]
    fn determine_spawn_mode_template_when_run_set() {
        let config = sample_config(Some("echo {id}"), Some("plan {id}"));
        let expected = SpawnMode::Template {
            run_template: "echo {id}".to_string(),
            plan_template: Some("plan {id}".to_string()),
        };
        assert_eq!(determine_spawn_mode(&config), expected);
    }

    #[test]
    fn determine_spawn_mode_direct_when_no_run() {
        let config = sample_config(None, None);
        assert_eq!(determine_spawn_mode(&config), SpawnMode::Direct);
    }

    #[test]
    fn agent_result_tracks_tokens_and_cost() {
        let outcome = AgentResult {
            id: "1".to_string(),
            title: "Test".to_string(),
            action: BeanAction::Implement,
            success: true,
            duration: Duration::from_secs(10),
            total_tokens: Some(5000),
            total_cost: Some(0.03),
            error: None,
            tool_count: 5,
            turns: 2,
            failure_summary: None,
        };
        assert_eq!(outcome.total_tokens, Some(5000));
        assert_eq!(outcome.total_cost, Some(0.03));
    }
}