1pub mod beads;
33pub mod events;
34pub mod publisher;
35pub mod runtime;
36pub mod session;
37pub mod transcript;
38pub mod zmq_client;
39
40pub use crate::backpressure;
45
46use anyhow::Result;
47use colored::Colorize;
48use std::collections::HashMap;
49use std::path::PathBuf;
50use std::sync::atomic::{AtomicBool, Ordering};
51use std::sync::Arc;
52use std::thread;
53use std::time::Duration;
54
55use self::runtime::SwarmRuntime;
56use crate::commands::helpers::resolve_group_tag;
57use crate::commands::spawn::agent;
58use crate::commands::spawn::headless::{self, store::SessionStatus, StreamStore};
59use crate::commands::spawn::hooks;
60use crate::commands::spawn::monitor::{self, SpawnSession};
61use crate::commands::spawn::terminal::{self, Harness};
62use crate::commands::spawn::tui;
63use crate::models::phase::Phase;
64use crate::models::task::{Task, TaskStatus};
65use crate::storage::Storage;
66use std::path::Path;
67
68use self::session::{acquire_session_lock, RoundState, SwarmSession, WaveState, WaveSummary};
69use crate::agents::AgentDef;
70use crate::attribution::{attribute_failure, AttributionConfidence};
71use crate::backpressure::{BackpressureConfig, ValidationResult};
72use crate::commands::task_selection::{count_in_progress_tasks, is_actionable_pending_task};
73use crate::transcript_watcher::TranscriptWatcher;
74
75pub use crate::SwarmMode;
77
/// Options consumed by [`run`] to drive a swarm session.
pub struct SwarmConfig {
    /// Project root handed to `Storage::new`; when `None`, the current
    /// directory is used as the working directory.
    pub project_root: Option<PathBuf>,
    /// Tag to operate on; also used to derive the default session name
    /// (`swarm-<tag>`, or `swarm-default` when absent).
    pub tag: Option<String>,
    /// Maximum number of tasks per round; `run` rejects `0`.
    pub round_size: usize,
    /// Operate on every tag instead of a single resolved one.
    pub all_tags: bool,
    /// Harness name, parsed with `Harness::parse`.
    pub harness_arg: String,
    /// Execution mode, converted into a `SwarmRuntime`.
    pub swarm_mode: SwarmMode,
    /// Print the banner, delegate to `run_dry_run`, and return.
    pub dry_run: bool,
    /// Explicit session name overriding the derived default.
    pub session_name: Option<String>,
    /// Skip the research step reported on the first wave.
    pub no_research: bool,
    /// Skip backpressure validation after each wave.
    pub no_validate: bool,
    /// Enable the post-wave review (shown in the banner as a sample review).
    pub review: bool,
    /// Enable the post-wave review for all tasks.
    pub review_all: bool,
    /// On validation failure, mark the wave's tasks failed instead of
    /// running the repair loop.
    pub no_repair: bool,
    /// Attempt limit passed to the repair loop.
    pub max_repair_attempts: usize,
    /// Run in place instead of inside a salvo worktree.
    pub no_worktree: bool,
    /// Optional directory passed to `salvo::ensure_worktree`.
    pub salvo_dir: Option<PathBuf>,
    /// Optional stale timeout in minutes, forwarded to each round as a `Duration`.
    pub stale_timeout_minutes: Option<u64>,
    /// Idle timeout in minutes, forwarded to each round.
    pub idle_timeout_minutes: u64,
    /// Disable event publishing when creating the event writer.
    pub no_publish_events: bool,
    /// While this flag is set, the wave loop waits before starting the next wave.
    pub pause_flag: Option<Arc<AtomicBool>>,
    /// When this flag is set, the wave loop exits.
    pub stop_flag: Option<Arc<AtomicBool>>,
}
102
103pub async fn run(config: SwarmConfig) -> Result<()> {
105 let SwarmConfig {
107 project_root,
108 tag,
109 round_size,
110 all_tags,
111 harness_arg,
112 swarm_mode,
113 dry_run,
114 session_name,
115 no_research,
116 no_validate,
117 review,
118 review_all,
119 no_repair,
120 max_repair_attempts,
121 no_worktree,
122 salvo_dir,
123 stale_timeout_minutes,
124 idle_timeout_minutes,
125 no_publish_events,
126 pause_flag,
127 stop_flag,
128 } = config;
129
130 let tag = tag.as_deref();
131 let harness_arg = &harness_arg;
132 let effective_tag = tag.unwrap_or("default");
133
134 if round_size == 0 {
135 anyhow::bail!("--round-size must be at least 1");
136 }
137
138 let storage = Storage::new(project_root.clone());
139
140 if !storage.is_initialized() {
141 anyhow::bail!("SCUD not initialized. Run: scud init");
142 }
143
144 let runtime = SwarmRuntime::from(swarm_mode);
145 runtime.ensure_requirements()?;
146
147 let phase_tag = if all_tags {
149 "all".to_string()
150 } else {
151 resolve_group_tag(&storage, tag, true)?
152 };
153
154 let _session_lock = if !dry_run {
157 Some(acquire_session_lock(project_root.as_ref(), &phase_tag)?)
158 } else {
159 None
160 };
161
162 let harness = Harness::parse(harness_arg)?;
164 terminal::find_harness_binary(harness)?;
165
166 let session_name = session_name.unwrap_or_else(|| format!("swarm-{}", effective_tag));
168
169 let original_working_dir = project_root
171 .clone()
172 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
173
174 let (working_dir, is_salvo_worktree, main_project_root) = if !no_worktree && !all_tags {
176 if let Some(tag_name) = tag {
177 match crate::commands::salvo::ensure_worktree(
178 &original_working_dir,
179 tag_name,
180 salvo_dir.as_deref(),
181 ) {
182 Ok(wt_path) => (wt_path, true, Some(original_working_dir.clone())),
183 Err(e) => {
184 eprintln!("Warning: Could not create salvo worktree: {}", e);
185 eprintln!("Running in-place (use --no-worktree to suppress this warning)");
186 (original_working_dir.clone(), false, None)
187 }
188 }
189 } else {
190 (original_working_dir.clone(), false, None)
191 }
192 } else {
193 (original_working_dir.clone(), false, None)
194 };
195
196 let bp_config = BackpressureConfig::load(project_root.as_ref())?;
198
199 if !dry_run {
201 let watcher_session = session_name.clone();
202 let watcher_root = working_dir.clone();
203 let _watcher_handle = std::thread::spawn(move || {
204 let db = std::sync::Arc::new(crate::db::Database::new(&watcher_root));
205 if db.initialize().is_err() {
206 return;
207 }
208 let watcher = TranscriptWatcher::new(&watcher_root, db);
209 if let Err(e) = watcher.watch(&watcher_session) {
210 eprintln!("Transcript watcher error: {}", e);
211 }
212 });
213 }
214
215 println!("{}", "SCUD Swarm Mode".cyan().bold());
217 println!("{}", "═".repeat(50));
218 println!("{:<20} {}", "Tag:".dimmed(), phase_tag.green());
219 println!(
220 "{:<20} {}",
221 "Round size:".dimmed(),
222 round_size.to_string().cyan()
223 );
224 println!(
225 "{:<20} {}",
226 "Research:".dimmed(),
227 if no_research {
228 "skip".yellow()
229 } else {
230 "enabled".green()
231 }
232 );
233 println!(
234 "{:<20} {}",
235 "Validation:".dimmed(),
236 if no_validate {
237 "skip".yellow()
238 } else {
239 "enabled".green()
240 }
241 );
242 let mode_label = match runtime {
243 SwarmRuntime::Tmux => runtime.display_label().cyan(),
244 SwarmRuntime::Extensions => runtime.display_label().green(),
245 SwarmRuntime::Server => runtime.display_label().magenta(),
246 SwarmRuntime::Headless => runtime.display_label().green(),
247 SwarmRuntime::Beads => runtime.display_label().yellow(),
248 };
249 println!("{:<20} {}", "Mode:".dimmed(), mode_label);
250 println!("{:<20} {}", "Harness:".dimmed(), harness.name().cyan());
251 println!(
252 "{:<20} {}",
253 "Review:".dimmed(),
254 if review_all {
255 "all tasks".green()
256 } else if review {
257 "sample (3 per wave)".green()
258 } else {
259 "disabled".yellow()
260 }
261 );
262 println!(
263 "{:<20} {}",
264 "Repair:".dimmed(),
265 if no_repair {
266 "disabled".yellow()
267 } else {
268 format!("up to {} attempts", max_repair_attempts).green()
269 }
270 );
271
272 if !bp_config.commands.is_empty() && !no_validate {
273 println!(
274 "{:<20} {}",
275 "Backpressure:".dimmed(),
276 bp_config.commands.join(", ").dimmed()
277 );
278 }
279 println!();
280
281 if dry_run {
282 return run_dry_run(project_root, &phase_tag, round_size, all_tags);
283 }
284
285 if !hooks::hooks_installed(&working_dir) {
287 println!("{}", "Installing Claude Code hooks...".dimmed());
288 if let Err(e) = hooks::install_hooks(&working_dir) {
289 println!(
290 " {} Hook installation: {}",
291 "!".yellow(),
292 e.to_string().dimmed()
293 );
294 } else {
295 println!(" {} Hooks installed", "✓".green());
296 }
297 }
298
299 let terminal_mode = runtime.terminal_label();
301 let mut swarm_session = SwarmSession::new(
302 &session_name,
303 &phase_tag,
304 terminal_mode,
305 &working_dir.to_string_lossy(),
306 round_size,
307 );
308
309 let stale_timeout = stale_timeout_minutes.map(|m| Duration::from_secs(m * 60));
313
314 let event_writer =
317 events::EventWriter::new_with_zmq(&working_dir, &session_name, !no_publish_events).ok();
318
319 let status_state = Arc::new(std::sync::Mutex::new(
321 crate::commands::swarm::publisher::SwarmStatus {
322 state: "running".to_string(),
323 current_wave: 0,
324 total_waves: 0,
325 tasks_completed: 0,
326 tasks_total: 0,
327 },
328 ));
329
330 let heartbeat_handle = if event_writer.is_some() {
332 let working_dir = working_dir.clone();
333 let session_name = session_name.clone();
334 let stop_flag = Arc::new(AtomicBool::new(false));
335 let stop_flag_clone = Arc::clone(&stop_flag);
336
337 let handle = thread::spawn(move || {
338 let writer = match events::EventWriter::new(&working_dir, &session_name) {
339 Ok(w) => w,
340 Err(e) => {
341 eprintln!("Failed to create heartbeat EventWriter: {}", e);
342 return;
343 }
344 };
345
346 while !stop_flag_clone.load(Ordering::Relaxed) {
347 if let Err(e) = writer.log_heartbeat() {
348 eprintln!("Heartbeat logging error: {}", e);
349 }
350 thread::sleep(Duration::from_secs(5));
351 }
352 });
353
354 Some((handle, stop_flag))
355 } else {
356 None
357 };
358
359 let all_phases = storage.load_tasks()?;
362 if runtime.is_tmux() {
363 let orphans = find_orphan_tasks(&all_phases, &phase_tag, all_tags, &session_name);
364
365 if !orphans.is_empty() {
366 println!();
367 println!(
368 "{}",
369 "Detected orphan in-progress tasks (no tmux window):".yellow()
370 );
371 for (task_id, tag) in &orphans {
372 println!(
373 " {} {} (tag: {})",
374 "*".yellow(),
375 task_id.cyan(),
376 tag.dimmed()
377 );
378 }
379 println!();
380
381 let choices = vec![
383 "Reset to pending and re-run",
384 "Kill existing windows (if any) and restart",
385 "Skip and continue (leave as in-progress)",
386 "Abort",
387 ];
388
389 let selection = dialoguer::Select::new()
390 .with_prompt("How should orphan tasks be handled?")
391 .items(&choices)
392 .default(0)
393 .interact()?;
394
395 match selection {
396 0 => {
397 for (task_id, tag) in &orphans {
399 if let Ok(mut phase) = storage.load_group(tag) {
400 if let Some(task) = phase.get_task_mut(task_id) {
401 task.set_status(TaskStatus::Pending);
402 storage.update_group(tag, &phase)?;
403 println!(" {} {} -> pending", "v".green(), task_id);
404 }
405 }
406 }
407 }
408 1 => {
409 for (task_id, _) in &orphans {
411 let window_name = format!("task-{}", task_id);
412 let _ = terminal::kill_tmux_window(&session_name, &window_name);
413 }
414 for (task_id, tag) in &orphans {
416 if let Ok(mut phase) = storage.load_group(tag) {
417 if let Some(task) = phase.get_task_mut(task_id) {
418 task.set_status(TaskStatus::Pending);
419 storage.update_group(tag, &phase)?;
420 println!(
421 " {} {} -> pending (will re-spawn)",
422 "v".green(),
423 task_id
424 );
425 }
426 }
427 }
428 }
429 2 => {
430 println!("{}", "Leaving orphan tasks as in-progress.".dimmed());
432 }
433 3 => {
434 anyhow::bail!("Aborted by user");
436 }
437 _ => {}
438 }
439 println!();
440 }
441 }
442
443 if runtime.is_beads() {
446 let beads_config = beads::BeadsConfig {
447 max_concurrent: round_size, poll_interval: Duration::from_secs(3),
449 };
450
451 let result = beads::run_beads_loop(
453 &storage,
454 &phase_tag,
455 all_tags,
456 &working_dir,
457 &session_name,
458 harness,
459 &beads_config,
460 &mut swarm_session,
461 )?;
462
463 session::save_session(project_root.as_ref(), &swarm_session)?;
465
466 println!();
468 println!("{}", "Beads Session Summary".blue().bold());
469 println!("{}", "═".repeat(40).blue());
470 println!(
471 " Tasks completed: {}",
472 result.tasks_completed.to_string().green()
473 );
474 println!(
475 " Tasks failed: {}",
476 if result.tasks_failed > 0 {
477 result.tasks_failed.to_string().red()
478 } else {
479 "0".to_string().green()
480 }
481 );
482 println!(
483 " Duration: {}",
484 format!("{:.1}s", result.total_duration.as_secs_f64()).cyan()
485 );
486
487 if let Some((handle, stop_flag)) = heartbeat_handle {
489 stop_flag.store(true, Ordering::Relaxed);
490 let _ = handle.join();
491 }
492
493 return Ok(());
494 }
495
496 let mut wave_number = 1;
499 loop {
500 if let Some(ref pause_flag) = pause_flag {
502 while pause_flag.load(Ordering::SeqCst) {
503 #[cfg(feature = "zmq")]
505 if let Some(ref writer) = &event_writer {
506 if let Some(zmq_publisher) = writer.zmq_publisher() {
507 let _ = zmq_publisher.handle_control_request(
508 pause_flag,
509 stop_flag
510 .as_ref()
511 .unwrap_or(&Arc::new(AtomicBool::new(false))),
512 &|| status_state.lock().unwrap().clone(),
513 );
514 }
515 }
516
517 std::thread::sleep(Duration::from_millis(100));
518 if let Some(ref stop_flag) = stop_flag {
519 if stop_flag.load(Ordering::SeqCst) {
520 println!();
521 println!("{}", "Swarm stopped by control command".yellow());
522 break;
523 }
524 }
525 }
526 }
527
528 if let Some(ref stop_flag) = stop_flag {
529 if stop_flag.load(Ordering::SeqCst) {
530 println!();
531 println!("{}", "Swarm stopped by control command".yellow());
532
533 {
535 let mut status = status_state.lock().unwrap();
536 status.state = "stopped".to_string();
537 }
538
539 break;
540 }
541 }
542
543 let all_phases = storage.load_tasks()?;
545
546 let waves = compute_waves_from_tasks(&all_phases, &phase_tag, all_tags)?;
548
549 {
551 let mut status = status_state.lock().unwrap();
552 status.current_wave = wave_number;
553 status.total_waves = waves.len();
554 status.tasks_total = waves.iter().map(|w| w.len()).sum();
555 status.tasks_completed = all_phases
556 .values()
557 .flat_map(|phase| &phase.tasks)
558 .filter(|task| matches!(task.status, TaskStatus::Done))
559 .count();
560 status.state = if pause_flag
561 .as_ref()
562 .is_some_and(|f| f.load(Ordering::SeqCst))
563 {
564 "paused".to_string()
565 } else {
566 "running".to_string()
567 };
568 }
569
570 if waves.is_empty() {
571 println!();
572 println!("{}", "All tasks complete!".green().bold());
573
574 {
576 let mut status = status_state.lock().unwrap();
577 status.state = "completed".to_string();
578 }
579
580 if let Some(ref writer) = &event_writer {
582 let _ =
583 writer.publish_event(&publisher::ZmqEvent::SwarmCompleted { success: true });
584 let _ = writer.log_swarm_completed(true);
585 }
586
587 break;
588 }
589
590 let wave_tasks = &waves[0];
592
593 if wave_tasks.is_empty() {
594 println!();
595 println!("{}", "No ready tasks in current wave.".yellow());
596
597 let in_progress_count = count_in_progress_tasks(&all_phases, &phase_tag, all_tags);
598 if in_progress_count > 0 {
599 if runtime.is_tmux() {
601 let orphans =
602 find_orphan_tasks(&all_phases, &phase_tag, all_tags, &session_name);
603 for (task_id, tag) in &orphans {
604 println!(
605 " {} {} has no tmux window, resetting to pending",
606 "⚠".yellow(),
607 task_id.cyan()
608 );
609 if let Ok(mut phase) = storage.load_group(tag) {
610 if let Some(task) = phase.get_task_mut(task_id) {
611 task.set_status(TaskStatus::Pending);
612 let _ = storage.update_group(tag, &phase);
613 }
614 }
615 }
616 if !orphans.is_empty() {
617 continue; }
619 }
620
621 println!(
622 "Waiting for {} in-progress task(s) to complete...",
623 in_progress_count.to_string().cyan()
624 );
625 thread::sleep(Duration::from_secs(10));
626 continue;
627 } else {
628 println!("Check for blocked tasks: scud list --status blocked");
629 break;
630 }
631 }
632
633 println!();
634 println!(
635 "{} {} - {} task(s)",
636 "Wave".blue().bold(),
637 wave_number.to_string().cyan(),
638 wave_tasks.len()
639 );
640 println!("{}", "-".repeat(40).blue());
641
642 let mut wave_state = WaveState::new(wave_number);
644 let wave_start = std::time::Instant::now();
645
646 if let Some(ref writer) = event_writer {
648 let _ = writer.log_wave_started(wave_number, wave_tasks.len());
649 }
650
651 if !no_research && wave_number == 1 {
653 println!();
654 println!(" {} Analyzing tasks...", "Research:".magenta());
655 println!(" {} Task analysis complete", "✓".green());
657 }
658
659 let num_rounds = wave_tasks.len().div_ceil(round_size);
661 for (round_idx, round_tasks) in wave_tasks.chunks(round_size).enumerate() {
662 println!();
663 println!(
664 " {} {}/{} - {} task(s)",
665 "Round".yellow(),
666 round_idx + 1,
667 num_rounds,
668 round_tasks.len()
669 );
670
671 let round_state = runtime
674 .run_round(
675 &storage,
676 round_tasks,
677 &working_dir,
678 &session_name,
679 round_idx,
680 harness,
681 stale_timeout,
682 idle_timeout_minutes,
683 event_writer.as_ref(),
684 )
685 .await?;
686
687 if runtime.is_tmux() {
689 let _proxy_path = create_and_update_spawn_proxy(
690 &storage,
691 project_root.as_ref(),
692 &session_name,
693 &phase_tag,
694 &working_dir,
695 &swarm_session,
696 Some(&round_state),
697 )?;
698 }
699
700 wave_state.rounds.push(round_state.clone());
701 println!(" {} Round {} complete", "✓".green(), round_idx + 1);
702 }
703
704 if !no_validate && !bp_config.commands.is_empty() {
706 println!();
707 println!(" {} Running backpressure checks...", "Validate:".magenta());
708
709 let validation_result = backpressure::run_validation(&working_dir, &bp_config)?;
710
711 if validation_result.all_passed {
712 println!(" {} All checks passed", "✓".green());
713
714 if let Some(ref writer) = event_writer {
716 let _ = writer.log_validation_passed();
717 }
718
719 for (task_id, tag) in wave_state.task_tags() {
721 if let Ok(mut phase) = storage.load_group(&tag) {
722 if let Some(task) = phase.get_task_mut(&task_id) {
723 task.set_status(TaskStatus::Done);
724 let _ = storage.update_group(&tag, &phase);
725 }
726 }
727 }
728 } else {
729 println!(" {} Some checks failed:", "!".yellow());
730 for failure in &validation_result.failures {
731 println!(" - {}", failure.red());
732 }
733
734 if let Some(ref writer) = event_writer {
736 let _ = writer.log_validation_failed(&validation_result.failures);
737 }
738
739 if no_repair {
740 let task_tags = wave_state.task_tags();
742 for (task_id, tag) in &task_tags {
743 if let Ok(mut phase) = storage.load_group(tag) {
744 if let Some(task) = phase.get_task_mut(task_id) {
745 task.set_status(TaskStatus::Failed);
746 let _ = storage.update_group(tag, &phase);
747 }
748 }
749 }
750 println!(
751 " {} Marked {} task(s) as failed",
752 "!".yellow(),
753 task_tags.len()
754 );
755 } else {
756 let repaired = run_repair_loop(
758 &storage,
759 &working_dir,
760 &session_name,
761 &bp_config,
762 &wave_state,
763 &validation_result,
764 max_repair_attempts,
765 )?;
766
767 if !repaired {
768 println!(" {} Wave failed after repair attempts", "!".red());
769 }
770 }
771 }
772
773 wave_state.validation = Some(validation_result);
774 }
775
776 let summary = WaveSummary {
778 wave_number,
779 tasks_completed: wave_state.all_task_ids(),
780 files_changed: collect_changed_files(&working_dir, wave_state.start_commit.as_deref())
781 .unwrap_or_default(),
782 };
783 wave_state.summary = Some(summary.clone());
784
785 if (review || review_all) && !dry_run {
787 let wave_tasks: Vec<(String, String)> = wave_state
789 .task_tags()
790 .iter()
791 .filter_map(|(id, tag)| {
792 storage
793 .load_group(tag)
794 .ok()
795 .and_then(|phase| phase.get_task(id).map(|t| (id.clone(), t.title.clone())))
796 })
797 .collect();
798
799 if !wave_tasks.is_empty() {
800 let review_result = spawn_reviewer(
801 &working_dir,
802 &session_name,
803 &summary,
804 &wave_tasks,
805 review_all,
806 )?;
807
808 if !review_result.all_passed && !review_result.tasks_to_improve.is_empty() {
809 println!(
810 " {} Reviewer found issues in: {}",
811 "!".yellow(),
812 review_result.tasks_to_improve.join(", ")
813 );
814
815 for task_id in &review_result.tasks_to_improve {
817 if let Some((task, _tag)) =
819 find_task_with_tag(&storage, task_id, &wave_state.task_tags())
820 {
821 let prompt = format!(
822 "Improve SCUD task {}: {}\n\nThe reviewer flagged this task for improvements. \
823 Review the implementation and make it better. When done: scud set-status {} done",
824 task.id, task.title, task.id
825 );
826
827 if let Some(agent_def) = AgentDef::try_load("builder", &working_dir) {
829 let harness = agent_def.harness()?;
830 let model = agent_def.model();
831
832 let spawn_config = terminal::SpawnConfig {
833 task_id: &format!("improve-{}", task_id),
834 prompt: &prompt,
835 working_dir: &working_dir,
836 session_name: &session_name,
837 harness,
838 model,
839 task_list_id: None,
840 };
841 terminal::spawn_tmux_agent(&spawn_config)?;
842
843 println!(
844 " {} Spawned improvement agent for {}",
845 "✓".green(),
846 task_id
847 );
848 }
849 }
850 }
851 } else {
852 println!(" {} Review complete, all tasks approved", "✓".green());
853 }
854 }
855 }
856
857 if let Some(ref writer) = event_writer {
859 let _ = writer.log_wave_completed(wave_number, wave_start.elapsed().as_millis() as u64);
860 }
861
862 swarm_session.waves.push(wave_state);
864 session::save_session(project_root.as_ref(), &swarm_session)?;
865
866 {
868 let spawn_session = swarm_session.to_spawn_session();
869 monitor::save_session(project_root.as_ref(), &spawn_session)?;
870 }
871
872 wave_number += 1;
873 }
874
875 if let Some(ref writer) = &event_writer {
877 let _ = writer.publish_event(&publisher::ZmqEvent::SwarmCompleted { success: true });
878 let _ = writer.log_swarm_completed(true);
879 }
880
881 create_and_update_spawn_proxy(
884 &storage,
885 project_root.as_ref(),
886 &session_name,
887 &phase_tag,
888 &working_dir,
889 &swarm_session,
890 None, )?;
892
893 println!();
894 println!("{}", "Swarm Session Summary".blue().bold());
895 println!("{}", "═".repeat(40).blue());
896 println!(
897 " Waves completed: {}",
898 swarm_session.waves.len().to_string().green()
899 );
900
901 let total_tasks: usize = swarm_session
902 .waves
903 .iter()
904 .flat_map(|w| &w.rounds)
905 .map(|r| r.task_ids.len())
906 .sum();
907 println!(" Tasks executed: {}", total_tasks.to_string().green());
908
909 println!(" {} Spawn proxy updated for monitor/TUI", "✓".green());
910
911 if is_salvo_worktree {
913 if let (Some(main_root), Some(tag_name)) = (&main_project_root, &tag) {
914 if let Err(e) = crate::commands::salvo::sync_to_main(main_root, &working_dir, tag_name)
915 {
916 eprintln!("Warning: Failed to sync salvo back to main: {}", e);
917 eprintln!("Run manually: scud salvo sync {}", tag_name);
918 }
919 }
920 }
921
922 if let Some((handle, stop_flag)) = heartbeat_handle {
924 stop_flag.store(true, Ordering::Relaxed);
925 if let Err(e) = handle.join() {
926 eprintln!("Heartbeat thread join error: {:?}", e);
927 }
928 }
929
930 Ok(())
931}
932
933fn create_and_update_spawn_proxy(
934 storage: &Storage,
935 project_root: Option<&PathBuf>,
936 session_name: &str,
937 phase_tag: &str,
938 working_dir: &Path,
939 swarm_session: &SwarmSession,
940 latest_round: Option<&RoundState>,
941) -> Result<Option<PathBuf>> {
942 let all_phases = storage.load_tasks()?;
943
944 let mut spawn_session = match monitor::load_session(project_root, session_name) {
946 Ok(existing) => existing,
947 Err(_) => SpawnSession::new(
948 session_name,
949 phase_tag,
950 "tmux",
951 &working_dir.to_string_lossy(),
952 ),
953 };
954
955 let tasks_to_add: Vec<String> = match latest_round {
957 Some(round) => round.task_ids.clone(),
958 None => swarm_session
959 .waves
960 .iter()
961 .flat_map(|w| w.all_task_ids())
962 .collect(),
963 };
964
965 let existing_task_ids: std::collections::HashSet<String> = spawn_session
967 .agents
968 .iter()
969 .map(|a| a.task_id.clone())
970 .collect();
971
972 for task_id in &tasks_to_add {
973 if !existing_task_ids.contains(task_id) {
974 if let Some((title, tag)) = find_task_title_tag(&all_phases, task_id) {
975 spawn_session.add_agent(task_id, &title, &tag);
976 }
977 }
978 }
979
980 let session_file = monitor::save_session(project_root, &spawn_session)?;
981 Ok(Some(session_file))
982}
983
984fn find_task_title_tag(
985 phases: &HashMap<String, crate::models::phase::Phase>,
986 task_id: &str,
987) -> Option<(String, String)> {
988 for (tag, phase) in phases {
989 if let Some(task) = phase.get_task(task_id) {
990 return Some((task.title.clone(), tag.clone()));
991 }
992 }
993 None
994}
995
/// A task borrowed from the loaded phases, paired with the tag of the phase
/// it was found in.
#[derive(Clone)]
struct TaskInfo<'a> {
    /// Borrowed task definition.
    task: &'a Task,
    /// Tag of the phase that contains `task`.
    tag: String,
}
1002
1003fn compute_waves_from_tasks<'a>(
1005 all_phases: &'a HashMap<String, Phase>,
1006 phase_tag: &str,
1007 all_tags: bool,
1008) -> Result<Vec<Vec<TaskInfo<'a>>>> {
1009 use std::collections::HashSet;
1010
1011 let mut actionable: Vec<TaskInfo<'a>> = Vec::new();
1012
1013 let phase_tags: Vec<&String> = if all_tags {
1014 all_phases.keys().collect()
1015 } else {
1016 all_phases
1017 .keys()
1018 .filter(|t| t.as_str() == phase_tag)
1019 .collect()
1020 };
1021
1022 for tag in phase_tags {
1023 if let Some(phase) = all_phases.get(tag) {
1024 for task in &phase.tasks {
1025 if is_actionable_pending_task(task, phase) {
1026 actionable.push(TaskInfo {
1027 task,
1028 tag: tag.clone(),
1029 });
1030 }
1031 }
1032 }
1033 }
1034
1035 if actionable.is_empty() {
1036 return Ok(Vec::new());
1037 }
1038
1039 let task_ids: HashSet<String> = actionable.iter().map(|t| t.task.id.clone()).collect();
1041 let mut in_degree: HashMap<String, usize> = HashMap::new();
1042 let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
1043
1044 let in_progress_ids: HashSet<String> = {
1046 let tags: Vec<&str> = if all_tags {
1047 all_phases.keys().map(|s| s.as_str()).collect()
1048 } else {
1049 vec![phase_tag]
1050 };
1051
1052 tags.iter()
1053 .filter_map(|tag| all_phases.get(*tag))
1054 .flat_map(|phase| &phase.tasks)
1055 .filter(|t| t.status == TaskStatus::InProgress)
1056 .map(|t| t.id.clone())
1057 .collect()
1058 };
1059
1060 for info in &actionable {
1061 in_degree.entry(info.task.id.clone()).or_insert(0);
1062 for dep in &info.task.dependencies {
1063 if task_ids.contains(dep) {
1064 *in_degree.entry(info.task.id.clone()).or_insert(0) += 1;
1066 dependents
1067 .entry(dep.clone())
1068 .or_default()
1069 .push(info.task.id.clone());
1070 } else if in_progress_ids.contains(dep) {
1071 *in_degree.entry(info.task.id.clone()).or_insert(0) += 1000;
1074 }
1075 }
1077 }
1078
1079 let mut waves: Vec<Vec<TaskInfo<'a>>> = Vec::new();
1080 let mut remaining = in_degree.clone();
1081
1082 while !remaining.is_empty() {
1083 let ready: Vec<String> = remaining
1084 .iter()
1085 .filter(|(_, °)| deg == 0)
1086 .map(|(id, _)| id.clone())
1087 .collect();
1088
1089 if ready.is_empty() {
1090 break; }
1092
1093 let wave: Vec<TaskInfo<'a>> = actionable
1094 .iter()
1095 .filter(|t| ready.contains(&t.task.id))
1096 .cloned()
1097 .collect();
1098
1099 for task_id in &ready {
1100 remaining.remove(task_id);
1101 if let Some(deps) = dependents.get(task_id) {
1102 for dep_id in deps {
1103 if let Some(deg) = remaining.get_mut(dep_id) {
1104 *deg = deg.saturating_sub(1);
1105 }
1106 }
1107 }
1108 }
1109
1110 waves.push(wave);
1111 }
1112
1113 Ok(waves)
1114}
1115
1116fn tmux_window_exists_for_task(session_name: &str, task_id: &str) -> bool {
1118 let window_name = format!("task-{}", task_id);
1119 terminal::tmux_window_exists(session_name, &window_name)
1120}
1121
1122fn find_orphan_tasks(
1124 all_phases: &HashMap<String, Phase>,
1125 phase_tag: &str,
1126 all_tags: bool,
1127 session_name: &str,
1128) -> Vec<(String, String)> {
1129 let tags: Vec<&str> = if all_tags {
1131 all_phases.keys().map(|s| s.as_str()).collect()
1132 } else {
1133 vec![phase_tag]
1134 };
1135
1136 let mut orphans = Vec::new();
1137
1138 for tag in tags {
1139 if let Some(phase) = all_phases.get(tag) {
1140 for task in &phase.tasks {
1141 if task.status == TaskStatus::InProgress
1142 && !tmux_window_exists_for_task(session_name, &task.id)
1143 {
1144 orphans.push((task.id.clone(), tag.to_string()));
1145 }
1146 }
1147 }
1148 }
1149
1150 orphans
1151}
1152
1153fn mark_tasks_in_progress(storage: &Storage, tasks: &[TaskInfo]) {
1155 for info in tasks {
1156 if let Ok(mut phase) = storage.load_group(&info.tag) {
1157 if let Some(task) = phase.get_task_mut(&info.task.id) {
1158 task.set_status(TaskStatus::InProgress);
1159 let _ = storage.update_group(&info.tag, &phase);
1160 }
1161 }
1162 }
1163}
1164
1165fn execute_round(
1166 storage: &Storage,
1167 tasks: &[TaskInfo],
1168 working_dir: &std::path::Path,
1169 session_name: &str,
1170 round_idx: usize,
1171 default_harness: Harness,
1172 event_writer: Option<&events::EventWriter>,
1173) -> Result<RoundState> {
1174 let mut round_state = RoundState::new(round_idx);
1175
1176 for info in tasks.iter() {
1177 let config =
1179 agent::resolve_agent_config(info.task, &info.tag, default_harness, None, working_dir);
1180
1181 if info.task.agent_type.is_some() && !config.from_agent_def {
1183 println!(
1184 " {} Agent '{}' not found, using defaults",
1185 "!".yellow(),
1186 info.task.agent_type.as_deref().unwrap_or("unknown")
1187 );
1188 }
1189
1190 let spawn_config = terminal::SpawnConfig {
1191 task_id: &info.task.id,
1192 prompt: &config.prompt,
1193 working_dir,
1194 session_name,
1195 harness: config.harness,
1196 model: config.model.as_deref(),
1197 task_list_id: None,
1198 };
1199 match terminal::spawn_tmux_agent(&spawn_config) {
1200 Ok(window_index) => {
1201 println!(
1202 " {} Spawned: {} | {} [{}] {}:{}",
1203 "✓".green(),
1204 info.task.id.cyan(),
1205 info.task.title.dimmed(),
1206 config.display_info().dimmed(),
1207 session_name.dimmed(),
1208 window_index.dimmed()
1209 );
1210 round_state.task_ids.push(info.task.id.clone());
1211 round_state.tags.push(info.tag.clone());
1212
1213 if let Some(writer) = event_writer {
1215 let _ = writer.log_spawned(&info.task.id);
1216 }
1217
1218 if let Ok(mut phase) = storage.load_group(&info.tag) {
1219 if let Some(task) = phase.get_task_mut(&info.task.id) {
1220 task.set_status(TaskStatus::InProgress);
1221 let _ = storage.update_group(&info.tag, &phase);
1222 }
1223 }
1224 }
1225 Err(e) => {
1226 println!(" {} Failed: {} - {}", "✗".red(), info.task.id.red(), e);
1227 round_state.failures.push(info.task.id.clone());
1228 }
1229 }
1230
1231 thread::sleep(Duration::from_millis(500));
1232 }
1233
1234 Ok(round_state)
1235}
1236
1237async fn execute_round_extensions<'a>(
1239 storage: &Storage,
1240 tasks: &[TaskInfo<'a>],
1241 working_dir: &std::path::Path,
1242 round_idx: usize,
1243 default_harness: Harness,
1244) -> Result<RoundState> {
1245 let wave_agents: Vec<session::WaveAgent> = tasks
1247 .iter()
1248 .map(|info| session::WaveAgent::new(info.task.clone(), &info.tag))
1249 .collect();
1250
1251 mark_tasks_in_progress(storage, tasks);
1252
1253 let result =
1254 session::execute_wave_async(&wave_agents, working_dir, round_idx, default_harness).await?;
1255
1256 for agent_result in &result.agent_results {
1258 if agent_result.success {
1259 println!(
1260 " {} Completed: {} ({}ms)",
1261 "✓".green(),
1262 agent_result.task_id.cyan(),
1263 agent_result.duration_ms
1264 );
1265 } else {
1266 println!(
1267 " {} Failed: {} (exit code: {:?})",
1268 "✗".red(),
1269 agent_result.task_id.red(),
1270 agent_result.exit_code
1271 );
1272 }
1273 }
1274
1275 for agent_result in &result.agent_results {
1277 if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1279 if let Ok(mut phase) = storage.load_group(&info.tag) {
1280 if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1281 if !agent_result.success && agent_result.exit_code.is_none() {
1284 task.set_status(TaskStatus::Failed);
1285 let _ = storage.update_group(&info.tag, &phase);
1286 }
1287 }
1288 }
1289 }
1290 }
1291
1292 Ok(result.round_state)
1293}
1294
1295async fn execute_round_server<'a>(
1297 storage: &Storage,
1298 tasks: &[TaskInfo<'a>],
1299 working_dir: &std::path::Path,
1300 round_idx: usize,
1301) -> Result<RoundState> {
1302 use crate::opencode::AgentOrchestrator;
1303 use tokio::sync::mpsc;
1304
1305 for info in tasks {
1307 if let Ok(mut phase) = storage.load_group(&info.tag) {
1308 if let Some(task) = phase.get_task_mut(&info.task.id) {
1309 task.set_status(TaskStatus::InProgress);
1310 let _ = storage.update_group(&info.tag, &phase);
1311 }
1312 }
1313 }
1314 let (event_tx, _event_rx) = mpsc::channel(1000);
1316
1317 let mut orchestrator = AgentOrchestrator::new(event_tx.clone()).await?;
1319
1320 let config_path = working_dir.join(".scud").join("config.toml");
1322 let config = crate::config::Config::load(&config_path).unwrap_or_default();
1323 let model_str = config.swarm_model().to_string();
1324 let provider_str = config.swarm.direct_api_provider.clone();
1325
1326 for info in tasks {
1328 let prompt = generate_server_prompt(info.task, &info.tag, working_dir);
1329
1330 let model = Some((provider_str.as_str(), model_str.as_str()));
1331
1332 match orchestrator
1333 .spawn_agent(info.task, &info.tag, &prompt, model)
1334 .await
1335 {
1336 Ok(_) => {
1337 println!(
1338 " {} Spawned: {} | {} [server/{}/{}]",
1339 "✓".green(),
1340 info.task.id.cyan(),
1341 info.task.title.dimmed(),
1342 provider_str,
1343 model_str,
1344 );
1345 }
1346 Err(e) => {
1347 println!(" {} Failed to spawn {}: {}", "✗".red(), info.task.id, e);
1348 }
1349 }
1350 }
1351
1352 drop(event_tx);
1354
1355 let results = orchestrator.wait_all().await;
1357
1358 orchestrator.cleanup().await;
1360
1361 let result = results;
1362
1363 let mut round_state = RoundState::new(round_idx);
1365
1366 for agent_result in &result {
1367 if agent_result.success {
1368 println!(
1369 " {} Completed: {} ({}ms)",
1370 "✓".green(),
1371 agent_result.task_id.cyan(),
1372 agent_result.duration_ms
1373 );
1374 round_state.task_ids.push(agent_result.task_id.clone());
1375 } else {
1376 println!(
1377 " {} Failed: {} (exit code: {:?})",
1378 "✗".red(),
1379 agent_result.task_id.red(),
1380 agent_result.exit_code
1381 );
1382 round_state.failures.push(agent_result.task_id.clone());
1383 }
1384 }
1385
1386 for task_id in &round_state.task_ids {
1388 if let Some(info) = tasks.iter().find(|t| t.task.id == *task_id) {
1389 round_state.tags.push(info.tag.clone());
1390 }
1391 }
1392
1393 for agent_result in &result {
1395 if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1396 if let Ok(mut phase) = storage.load_group(&info.tag) {
1397 if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1398 if !agent_result.success && agent_result.exit_code.is_none() {
1400 task.set_status(TaskStatus::Failed);
1401 let _ = storage.update_group(&info.tag, &phase);
1402 }
1403 }
1404 }
1405 }
1406 }
1407
1408 Ok(round_state)
1409}
1410
1411async fn execute_round_headless(
1416 storage: &Storage,
1417 tasks: &[TaskInfo<'_>],
1418 working_dir: &std::path::Path,
1419 round_idx: usize,
1420 default_harness: Harness,
1421 event_writer: Option<&events::EventWriter>,
1422) -> Result<RoundState> {
1423 use crate::commands::attach::{save_session_metadata, SessionMetadata};
1424
1425 let mut round_state = RoundState::new(round_idx);
1426
1427 let store = StreamStore::new();
1429
1430 for info in tasks {
1432 let config =
1434 agent::resolve_agent_config(info.task, &info.tag, default_harness, None, working_dir);
1435
1436 store.create_session(&info.task.id, &info.tag);
1438
1439 let runner = match headless::create_runner(config.harness) {
1441 Ok(r) => r,
1442 Err(e) => {
1443 println!(
1444 " {} Failed to create runner for {}: {}",
1445 "✗".red(),
1446 info.task.id.red(),
1447 e
1448 );
1449 round_state.failures.push(info.task.id.clone());
1450 continue;
1451 }
1452 };
1453
1454 let spawn_result = runner
1456 .start(
1457 &info.task.id,
1458 &config.prompt,
1459 working_dir,
1460 config.model.as_deref(),
1461 )
1462 .await;
1463
1464 match spawn_result {
1465 Ok(mut session_handle) => {
1466 if let Some(pid) = session_handle.pid() {
1468 store.set_pid(&info.task.id, pid);
1469 }
1470
1471 println!(
1472 " {} Spawned (headless): {} | {} [{}]",
1473 "✓".green(),
1474 info.task.id.cyan(),
1475 info.task.title.dimmed(),
1476 config.display_info().dimmed(),
1477 );
1478
1479 round_state.task_ids.push(info.task.id.clone());
1480 round_state.tags.push(info.tag.clone());
1481
1482 if let Some(writer) = event_writer {
1484 let _ = writer.log_spawned(&info.task.id);
1485 }
1486
1487 if let Ok(mut phase) = storage.load_group(&info.tag) {
1489 if let Some(task) = phase.get_task_mut(&info.task.id) {
1490 task.set_status(TaskStatus::InProgress);
1491 let _ = storage.update_group(&info.tag, &phase);
1492 }
1493 }
1494
1495 let store_clone = store.clone();
1497 let task_id = info.task.id.clone();
1498 let tag = info.tag.clone();
1499 let working_dir_clone = working_dir.to_path_buf();
1500 let harness_name = config.harness.name().to_string();
1501
1502 tokio::spawn(async move {
1503 let mut saw_terminal_event = false;
1504 while let Some(event) = session_handle.events.recv().await {
1505 if matches!(
1506 event.kind,
1507 headless::StreamEventKind::Complete { .. }
1508 | headless::StreamEventKind::Error { .. }
1509 ) {
1510 saw_terminal_event = true;
1511 }
1512
1513 if let headless::StreamEventKind::SessionAssigned { ref session_id } =
1515 event.kind
1516 {
1517 store_clone.set_session_id(&task_id, session_id);
1518
1519 let metadata =
1521 SessionMetadata::new(&task_id, session_id, &tag, &harness_name);
1522 let _ = save_session_metadata(&working_dir_clone, &metadata);
1523 }
1524
1525 store_clone.push_event(&task_id, event);
1526 }
1527
1528 let wait_ok = session_handle.wait().await.unwrap_or(false);
1531 if !saw_terminal_event {
1532 if wait_ok {
1533 store_clone.push_event(&task_id, headless::StreamEvent::complete(true));
1534 } else {
1535 store_clone.push_event(
1536 &task_id,
1537 headless::StreamEvent::error(
1538 "Agent process exited without completion event".to_string(),
1539 ),
1540 );
1541 }
1542 } else if !wait_ok {
1543 store_clone.push_event(
1545 &task_id,
1546 headless::StreamEvent::error(
1547 "Agent process exited with non-zero status".to_string(),
1548 ),
1549 );
1550 }
1551 });
1552 }
1553 Err(e) => {
1554 println!(
1555 " {} Failed (headless): {} - {}",
1556 "✗".red(),
1557 info.task.id.red(),
1558 e
1559 );
1560 round_state.failures.push(info.task.id.clone());
1561
1562 store.push_event(&info.task.id, headless::StreamEvent::error(e.to_string()));
1564 }
1565 }
1566
1567 tokio::time::sleep(Duration::from_millis(200)).await;
1569 }
1570
1571 let max_wait = Duration::from_secs(3600); let start = std::time::Instant::now();
1574 let total_tasks = round_state.task_ids.len();
1575 let mut poll_count = 0u32;
1576 let mut prev_display_lines = 0usize;
1578
1579 loop {
1580 let poll_interval = if poll_count < 5 {
1582 Duration::from_secs(2)
1583 } else {
1584 Duration::from_secs(5)
1585 };
1586 poll_count += 1;
1587
1588 if poll_count == 1 {
1590 tokio::time::sleep(Duration::from_secs(2)).await;
1591 }
1592
1593 let active_tasks = store.active_tasks();
1594 let active_count = active_tasks.len();
1595 if active_count == 0 {
1596 break;
1597 }
1598
1599 if start.elapsed() > max_wait {
1600 println!(
1601 " {} Timeout waiting for {} tasks",
1602 "!".yellow(),
1603 active_count
1604 );
1605 break;
1606 }
1607
1608 if prev_display_lines > 0 {
1610 for _ in 0..prev_display_lines {
1612 print!("\x1b[A\x1b[2K");
1613 }
1614 }
1615
1616 let mut display = Vec::new();
1618 let completed = total_tasks - active_count;
1619 let elapsed = start.elapsed().as_secs();
1620
1621 display.push(format!(
1622 "\n ─── {} {}/{} done ({} active) · {}s ───",
1623 "▶".blue(),
1624 completed,
1625 total_tasks,
1626 active_count,
1627 format_duration(elapsed),
1628 ));
1629
1630 for task_id in &round_state.task_ids {
1632 let status = store.get_status(task_id);
1633 let elapsed_task = store.get_elapsed_secs(task_id).unwrap_or(0);
1634 let stats = store
1635 .session_stats(task_id)
1636 .map(|(events, _)| format!("{}ev", events))
1637 .unwrap_or_default();
1638
1639 let (icon, status_str) = match &status {
1640 Some(SessionStatus::Completed) => ("✓".green(), "done".green()),
1641 Some(SessionStatus::Failed) => ("✗".red(), "fail".red()),
1642 Some(SessionStatus::Running) => ("⟳".blue(), "run".blue()),
1643 Some(SessionStatus::Starting) => ("…".yellow(), "init".yellow()),
1644 None => ("?".dimmed(), "?".dimmed()),
1645 };
1646
1647 let detail = match &status {
1649 Some(SessionStatus::Running) | Some(SessionStatus::Starting) => {
1650 let tool_line = store.get_last_tool_line(task_id);
1652 let activity = tool_line
1653 .or_else(|| store.get_output(task_id, 1).into_iter().next())
1654 .unwrap_or_default();
1655 let trimmed = if activity.len() > 60 {
1656 format!("{}…", &activity[..59])
1657 } else {
1658 activity
1659 };
1660 if trimmed.is_empty() {
1661 format!("{}s {}", format_duration(elapsed_task), stats)
1662 } else {
1663 format!("{}s {} {}", format_duration(elapsed_task), stats, trimmed)
1664 }
1665 }
1666 _ => {
1667 format!("{}s {}", format_duration(elapsed_task), stats)
1668 }
1669 };
1670
1671 display.push(format!(
1672 " {} {:>5} [{}] {}",
1673 icon,
1674 task_id.cyan(),
1675 status_str,
1676 detail.dimmed(),
1677 ));
1678 }
1679
1680 prev_display_lines = display.len();
1682 for line in &display {
1683 println!("{}", line);
1684 }
1685
1686 tokio::time::sleep(poll_interval).await;
1687 }
1688
1689 let elapsed = start.elapsed().as_secs();
1691 let successes = round_state
1692 .task_ids
1693 .iter()
1694 .filter(|id| matches!(store.get_status(id), Some(SessionStatus::Completed)))
1695 .count();
1696 let failures = round_state
1697 .task_ids
1698 .iter()
1699 .filter(|id| matches!(store.get_status(id), Some(SessionStatus::Failed)))
1700 .count();
1701 println!(
1702 "\n ─── Round complete: {} ok, {} failed, {} total in {}s ───",
1703 format!("{}", successes).green(),
1704 format!("{}", failures).red(),
1705 total_tasks,
1706 format_duration(elapsed),
1707 );
1708 for task_id in &round_state.task_ids {
1710 if matches!(store.get_status(task_id), Some(SessionStatus::Failed)) {
1711 println!(" {} {} — last output:", "✗".red(), task_id.red());
1712 let output = store.get_all_output(task_id);
1713 for line in output.iter().rev().take(5).rev() {
1714 let trimmed = if line.len() > 80 {
1715 format!("{}…", &line[..79])
1716 } else {
1717 line.clone()
1718 };
1719 if !trimmed.is_empty() {
1720 println!(" {}", trimmed.dimmed());
1721 }
1722 }
1723 }
1724 }
1725
1726 for task_id in &round_state.task_ids {
1728 if let Some(writer) = event_writer {
1729 let success = matches!(store.get_status(task_id), Some(SessionStatus::Completed));
1730 let _ = writer.log_completed(task_id, success, start.elapsed().as_millis() as u64);
1731 }
1732 }
1733
1734 Ok(round_state)
1735}
1736
/// Formats a duration given in whole seconds as a compact string **without**
/// the trailing seconds unit, so that callers can append `s` themselves
/// (e.g. `format!("{}s", format_duration(n))`).
///
/// * under a minute: `"45"`
/// * under an hour: `"1m05"`
/// * an hour or more: `"1h02m05"`
///
/// The hours form ends in the seconds field like the other two forms; ending it
/// in `m` made `"{}s"` call sites render as `1h02ms`.
fn format_duration(secs: u64) -> String {
    let hours = secs / 3600;
    let minutes = (secs % 3600) / 60;
    let seconds = secs % 60;

    if hours > 0 {
        format!("{}h{:02}m{:02}", hours, minutes, seconds)
    } else if minutes > 0 {
        format!("{}m{:02}", minutes, seconds)
    } else {
        seconds.to_string()
    }
}
1747
1748fn generate_server_prompt(task: &Task, tag: &str, working_dir: &std::path::Path) -> String {
1750 let details = task
1751 .details
1752 .as_ref()
1753 .map(|d| format!("\n\n## Details\n\n{}", d))
1754 .unwrap_or_default();
1755
1756 let test_strategy = task
1757 .test_strategy
1758 .as_ref()
1759 .map(|t| format!("\n\n## Test Strategy\n\n{}", t))
1760 .unwrap_or_default();
1761
1762 format!(
1763 r#"You are working on task [{id}] in phase "{tag}".
1764
1765## Task: {title}
1766
1767{description}{details}{test_strategy}
1768
1769## Instructions
1770
17711. Implement the task requirements
17722. Test your changes
17733. When complete, run: `scud set-status {id} done --tag {tag}`
1774
1775Working directory: {working_dir}
1776"#,
1777 id = task.id,
1778 tag = tag,
1779 title = task.title,
1780 description = task.description,
1781 details = details,
1782 test_strategy = test_strategy,
1783 working_dir = working_dir.display(),
1784 )
1785}
1786
1787fn wait_for_round_completion(
1788 storage: &Storage,
1789 tasks: &[TaskInfo],
1790 session_name: &str,
1791 stale_timeout: Option<Duration>,
1792 idle_timeout_minutes: u64,
1793 event_writer: Option<&events::EventWriter>,
1794) -> Result<()> {
1795 use std::collections::HashSet;
1796 use std::io::Write;
1797 use std::time::Instant;
1798
1799 let task_ids: Vec<String> = tasks.iter().map(|t| t.task.id.clone()).collect();
1800 let task_tags: HashMap<String, String> = tasks
1801 .iter()
1802 .map(|t| (t.task.id.clone(), t.tag.clone()))
1803 .collect();
1804
1805 let round_start = Instant::now();
1806 let mut completed_tasks: HashSet<String> = HashSet::new();
1807 let spinner_chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
1808 let mut spin_idx: usize = 0;
1809 let mut last_orphan_check = Instant::now();
1810
1811 let mut last_content_hashes: HashMap<String, u64> = HashMap::new();
1813 let mut last_activity: HashMap<String, Instant> = HashMap::new();
1814 for task_id in &task_ids {
1815 last_activity.insert(task_id.clone(), Instant::now());
1816 }
1817
1818 loop {
1819 let mut still_running: Vec<String> = Vec::new();
1820
1821 for task_id in &task_ids {
1822 if completed_tasks.contains(task_id) {
1823 continue;
1824 }
1825
1826 if let Some(tag) = task_tags.get(task_id) {
1827 if let Ok(phase) = storage.load_group(tag) {
1828 if let Some(task) = phase.get_task(task_id) {
1829 if task.status == TaskStatus::InProgress
1830 || task.status == TaskStatus::Pending
1831 {
1832 still_running.push(task_id.clone());
1833 } else {
1834 completed_tasks.insert(task_id.clone());
1836 let elapsed = round_start.elapsed().as_secs();
1837 let status_icon = if task.status == TaskStatus::Done {
1838 "✓".green()
1839 } else {
1840 "✗".red()
1841 };
1842 print!("\r{}\r", " ".repeat(80));
1844 println!(
1845 " {} {} completed ({}s)",
1846 status_icon,
1847 task_id.cyan(),
1848 elapsed
1849 );
1850 if let Some(writer) = event_writer {
1852 let success = task.status == TaskStatus::Done;
1853 let _ = writer.log_completed(
1854 task_id,
1855 success,
1856 round_start.elapsed().as_millis() as u64,
1857 );
1858 }
1859 }
1860 }
1861 }
1862 }
1863 }
1864
1865 if still_running.is_empty() {
1866 print!("\r{}\r", " ".repeat(80));
1868 let _ = std::io::stdout().flush();
1869 break;
1870 }
1871
1872 if last_orphan_check.elapsed() >= Duration::from_secs(30) {
1874 last_orphan_check = Instant::now();
1875 for task_id in &still_running {
1876 if !tmux_window_exists_for_task(session_name, task_id) {
1877 print!("\r{}\r", " ".repeat(80));
1879 println!(
1880 " {} {} agent died (tmux window gone), marking failed",
1881 "⚠".yellow(),
1882 task_id.cyan()
1883 );
1884 if let Some(tag) = task_tags.get(task_id) {
1886 if let Ok(mut phase) = storage.load_group(tag) {
1887 if let Some(task) = phase.get_task_mut(task_id) {
1888 task.set_status(TaskStatus::Failed);
1889 let _ = storage.update_group(tag, &phase);
1890 }
1891 }
1892 }
1893 completed_tasks.insert(task_id.clone());
1894 if let Some(writer) = event_writer {
1896 let event = events::AgentEvent::new(
1897 writer.session_id(),
1898 task_id,
1899 events::EventKind::Failed {
1900 reason: "agent window disappeared".to_string(),
1901 },
1902 );
1903 let _ = writer.write(&event);
1904 }
1905 }
1906 }
1907 }
1908
1909 if let Some(timeout) = stale_timeout {
1911 if round_start.elapsed() >= timeout {
1912 for task_id in &still_running {
1913 if !tmux_window_exists_for_task(session_name, task_id) {
1915 print!("\r{}\r", " ".repeat(80));
1916 println!(
1917 " {} {} stale (timeout + no tmux window), resetting to pending",
1918 "⚠".yellow(),
1919 task_id.cyan()
1920 );
1921 if let Some(tag) = task_tags.get(task_id) {
1922 if let Ok(mut phase) = storage.load_group(tag) {
1923 if let Some(task) = phase.get_task_mut(task_id) {
1924 task.set_status(TaskStatus::Pending);
1925 let _ = storage.update_group(tag, &phase);
1926 }
1927 }
1928 }
1929 completed_tasks.insert(task_id.clone());
1930 }
1931 }
1932 }
1933 }
1934
1935 for task_id in &still_running {
1937 if completed_tasks.contains(task_id) {
1938 continue;
1939 }
1940 let window_name = format!("task-{}", task_id);
1941 let window_target = format!("{}:{}", session_name, window_name);
1942 if let Ok(output) = std::process::Command::new("tmux")
1943 .args(["capture-pane", "-t", &window_target, "-p", "-S", "-20"])
1944 .output()
1945 {
1946 if output.status.success() {
1947 let content = String::from_utf8_lossy(&output.stdout);
1948 let hash = {
1949 use std::hash::{Hash, Hasher};
1950 let mut hasher = std::collections::hash_map::DefaultHasher::new();
1951 content.hash(&mut hasher);
1952 hasher.finish()
1953 };
1954 let prev_hash = last_content_hashes.get(task_id).copied();
1955 if prev_hash.is_none() || prev_hash != Some(hash) {
1956 last_activity.insert(task_id.clone(), Instant::now());
1957 }
1958 last_content_hashes.insert(task_id.clone(), hash);
1959 }
1960 }
1961 }
1962
1963 let idle_timeout = Duration::from_secs(idle_timeout_minutes * 60);
1965 for task_id in &still_running {
1966 if completed_tasks.contains(task_id) {
1967 continue;
1968 }
1969
1970 let is_idle_timeout = last_activity
1972 .get(task_id)
1973 .map(|t| t.elapsed() > idle_timeout)
1974 .unwrap_or(false);
1975
1976 if !is_idle_timeout {
1977 continue;
1978 }
1979
1980 let window_name = format!("task-{}", task_id);
1982 if terminal::tmux_pane_shows_prompt(session_name, &window_name) {
1983 print!("\r{}\r", " ".repeat(80));
1984 println!(
1985 " {} {} agent idle with shell prompt, marking failed",
1986 "⚠".yellow(),
1987 task_id.cyan()
1988 );
1989
1990 if let Some(tag) = task_tags.get(task_id) {
1992 if let Ok(mut phase) = storage.load_group(tag) {
1993 if let Some(task) = phase.get_task_mut(task_id) {
1994 task.set_status(TaskStatus::Failed);
1995 let _ = storage.update_group(tag, &phase);
1996 }
1997 }
1998 }
1999 completed_tasks.insert(task_id.clone());
2000
2001 if let Some(writer) = event_writer {
2003 let event = events::AgentEvent::new(
2004 writer.session_id(),
2005 task_id,
2006 events::EventKind::Failed {
2007 reason: "agent idle with shell prompt (process crashed)".to_string(),
2008 },
2009 );
2010 let _ = writer.write(&event);
2011 }
2012 }
2013 }
2014
2015 let elapsed = round_start.elapsed().as_secs();
2017 let spinner = spinner_chars[spin_idx % spinner_chars.len()];
2018 spin_idx += 1;
2019
2020 let idle_agents: Vec<&String> = still_running
2022 .iter()
2023 .filter(|id| {
2024 !completed_tasks.contains(*id)
2025 && last_activity
2026 .get(*id)
2027 .map(|t| t.elapsed() > Duration::from_secs(60))
2028 .unwrap_or(false)
2029 })
2030 .collect();
2031
2032 let running_count = still_running
2033 .iter()
2034 .filter(|id| !completed_tasks.contains(*id))
2035 .count();
2036
2037 let status = if running_count <= 2 {
2038 let names: Vec<&str> = still_running
2039 .iter()
2040 .filter(|id| !completed_tasks.contains(*id))
2041 .map(|s| s.as_str())
2042 .collect();
2043 format!("{} running: {}", running_count, names.join(", "))
2044 } else {
2045 format!("{} running", running_count)
2046 };
2047
2048 let idle_note = if !idle_agents.is_empty() {
2049 format!(" ({} idle >60s)", idle_agents.len())
2050 } else {
2051 String::new()
2052 };
2053
2054 print!(
2055 "\r Waiting... [{}] {} {}s{}",
2056 status, spinner, elapsed, idle_note
2057 );
2058 let _ = std::io::stdout().flush();
2059
2060 thread::sleep(Duration::from_secs(5));
2061 }
2062
2063 Ok(())
2064}
2065
2066fn collect_changed_files(
2067 working_dir: &std::path::Path,
2068 start_commit: Option<&str>,
2069) -> Result<Vec<String>> {
2070 use std::process::Command;
2071
2072 let range = match start_commit {
2074 Some(commit) => format!("{}..HEAD", commit),
2075 None => "HEAD~1..HEAD".to_string(),
2076 };
2077
2078 let output = Command::new("git")
2079 .current_dir(working_dir)
2080 .args(["diff", "--name-only", &range])
2081 .output()?;
2082
2083 let files: Vec<String> = String::from_utf8_lossy(&output.stdout)
2084 .lines()
2085 .map(|s| s.to_string())
2086 .collect();
2087
2088 Ok(files)
2089}
2090
2091fn run_dry_run(
2092 project_root: Option<PathBuf>,
2093 phase_tag: &str,
2094 round_size: usize,
2095 all_tags: bool,
2096) -> Result<()> {
2097 let storage = Storage::new(project_root);
2098 let all_phases = storage.load_tasks()?;
2099
2100 let waves = compute_waves_from_tasks(&all_phases, phase_tag, all_tags)?;
2101
2102 println!("{}", "Execution Plan (dry-run)".yellow().bold());
2103 println!("{}", "═".repeat(50).yellow());
2104 println!();
2105
2106 let mut total_tasks = 0;
2107 let mut total_rounds = 0;
2108
2109 for (wave_idx, wave) in waves.iter().enumerate() {
2110 let rounds = wave.len().div_ceil(round_size);
2111 total_tasks += wave.len();
2112 total_rounds += rounds;
2113
2114 println!(
2115 "{} {} - {} task(s), {} round(s)",
2116 "Wave".blue().bold(),
2117 wave_idx + 1,
2118 wave.len(),
2119 rounds
2120 );
2121
2122 for (round_idx, chunk) in wave.chunks(round_size).enumerate() {
2123 println!(" {} {}:", "Round".yellow(), round_idx + 1);
2124 for info in chunk {
2125 println!(
2126 " {} {} | {}",
2127 "○".white(),
2128 info.task.id.cyan(),
2129 info.task.title
2130 );
2131 }
2132 }
2133 println!();
2134 }
2135
2136 println!("{}", "Summary".blue().bold());
2137 println!("{}", "-".repeat(30).blue());
2138 println!(" Total waves: {}", waves.len());
2139 println!(" Total tasks: {}", total_tasks);
2140 println!(" Total rounds: {}", total_rounds);
2141
2142 if total_rounds > 0 {
2143 let speedup = total_tasks as f64 / total_rounds as f64;
2144 println!(" Speedup: {}", format!("{:.1}x", speedup).green());
2145 }
2146
2147 println!();
2148 println!("{}", "No agents spawned (dry-run mode).".yellow());
2149
2150 Ok(())
2151}
2152
/// Outcome of a reviewer run over one wave.
///
/// Derives `Clone`, `PartialEq` and `Eq` in addition to `Debug` so results can
/// be copied and compared by callers and in tests.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReviewResult {
    /// `true` when the review reported that everything passed.
    pub all_passed: bool,
    /// Ids of the tasks the review asked to be improved.
    pub tasks_to_improve: Vec<String>,
}
2165
2166#[allow(dead_code)]
2168pub fn spawn_reviewer(
2169 working_dir: &std::path::Path,
2170 session_name: &str,
2171 summary: &WaveSummary,
2172 wave_tasks: &[(String, String)], review_all: bool,
2174) -> Result<ReviewResult> {
2175 println!();
2176 println!(" {} Spawning reviewer agent...", "Review:".magenta());
2177
2178 let prompt = agent::generate_review_prompt(summary, wave_tasks, review_all);
2179
2180 let agent_def = AgentDef::try_load("reviewer", working_dir).unwrap_or_else(|| {
2182 AgentDef {
2184 agent: crate::agents::AgentMeta {
2185 name: "reviewer".to_string(),
2186 description: "Code reviewer".to_string(),
2187 },
2188 model: crate::agents::ModelConfig {
2189 harness: "claude".to_string(),
2190 model: Some("opus".to_string()),
2191 },
2192 prompt: Default::default(),
2193 }
2194 });
2195
2196 let harness = agent_def.harness()?;
2197 let model = agent_def.model();
2198
2199 let spawn_config = terminal::SpawnConfig {
2201 task_id: &format!("review-wave-{}", summary.wave_number),
2202 prompt: &prompt,
2203 working_dir,
2204 session_name,
2205 harness,
2206 model,
2207 task_list_id: None,
2208 };
2209 terminal::spawn_tmux_agent(&spawn_config)?;
2210
2211 println!(
2212 " {} Reviewer spawned, waiting for completion...",
2213 "✓".green()
2214 );
2215
2216 wait_for_review_completion(working_dir, summary.wave_number)
2218}
2219
2220fn wait_for_review_completion(
2222 working_dir: &std::path::Path,
2223 wave_number: usize,
2224) -> Result<ReviewResult> {
2225 let marker_path = working_dir
2226 .join(".scud")
2227 .join(format!("review-complete-{}", wave_number));
2228
2229 let timeout = Duration::from_secs(1800); let start = std::time::Instant::now();
2231
2232 loop {
2233 if start.elapsed() > timeout {
2234 println!(" {} Review timed out after 30 minutes", "!".yellow());
2235 return Ok(ReviewResult {
2236 all_passed: true, tasks_to_improve: vec![],
2238 });
2239 }
2240
2241 if marker_path.exists() {
2242 let content = std::fs::read_to_string(&marker_path)?;
2243 std::fs::remove_file(&marker_path)?; let all_passed = content.contains("ALL_PASS");
2246 let tasks_to_improve = if content.contains("IMPROVE_TASKS:") {
2247 content
2248 .lines()
2249 .find(|l| l.starts_with("IMPROVE_TASKS:"))
2250 .map(|l| {
2251 l.strip_prefix("IMPROVE_TASKS:")
2252 .unwrap_or("")
2253 .split(',')
2254 .map(|s| s.trim().to_string())
2255 .filter(|s| !s.is_empty())
2256 .collect()
2257 })
2258 .unwrap_or_default()
2259 } else {
2260 vec![]
2261 };
2262
2263 println!(" {} Review complete", "✓".green());
2264 if !all_passed {
2265 println!(
2266 " {} Tasks needing improvement: {}",
2267 "!".yellow(),
2268 tasks_to_improve.join(", ")
2269 );
2270 }
2271
2272 return Ok(ReviewResult {
2273 all_passed,
2274 tasks_to_improve,
2275 });
2276 }
2277
2278 thread::sleep(Duration::from_secs(5));
2279 }
2280}
2281
2282#[allow(dead_code)]
2288#[allow(clippy::too_many_arguments)]
2289pub fn run_repair_loop(
2290 storage: &Storage,
2291 working_dir: &std::path::Path,
2292 session_name: &str,
2293 bp_config: &BackpressureConfig,
2294 wave_state: &WaveState,
2295 validation_result: &ValidationResult,
2296 max_attempts: usize,
2297) -> Result<bool> {
2298 let wave_tasks = wave_state.all_task_ids();
2299 let task_tags = wave_state.task_tags();
2300
2301 println!();
2302 println!(" {} Analyzing failure attribution...", "Repair:".magenta());
2303
2304 let failed_cmd = validation_result.results.iter().find(|r| !r.passed);
2306 let failed_cmd = match failed_cmd {
2307 Some(cmd) => cmd,
2308 None => return Ok(true), };
2310
2311 let attribution = attribute_failure(
2313 working_dir,
2314 &failed_cmd.stderr,
2315 &failed_cmd.stdout,
2316 &wave_tasks,
2317 wave_state.start_commit.as_deref(),
2318 )?;
2319
2320 match attribution.confidence {
2321 AttributionConfidence::High => {
2322 println!(
2323 " {} High confidence: task {} responsible",
2324 "✓".green(),
2325 attribution.responsible_tasks.join(", ")
2326 );
2327 }
2328 AttributionConfidence::Medium => {
2329 println!(
2330 " {} Medium confidence: tasks {} may be responsible",
2331 "~".yellow(),
2332 attribution.responsible_tasks.join(", ")
2333 );
2334 }
2335 AttributionConfidence::Low => {
2336 println!(
2337 " {} Low confidence: cannot determine specific task",
2338 "!".red()
2339 );
2340 }
2341 }
2342
2343 for task_id in &attribution.cleared_tasks {
2345 if let Some(tag) = task_tags
2346 .iter()
2347 .find(|(id, _)| id == task_id)
2348 .map(|(_, t)| t)
2349 {
2350 if let Ok(mut phase) = storage.load_group(tag) {
2351 if let Some(task) = phase.get_task_mut(task_id) {
2352 task.set_status(TaskStatus::Done);
2353 let _ = storage.update_group(tag, &phase);
2354 println!(" {} Cleared: {} (not responsible)", "✓".green(), task_id);
2355 }
2356 }
2357 }
2358 }
2359
2360 let mut task_infos: Vec<(String, String, Vec<String>)> = Vec::new();
2362 for task_id in &attribution.responsible_tasks {
2363 let (task, _tag) = match find_task_with_tag(storage, task_id, &task_tags) {
2364 Some(t) => t,
2365 None => continue,
2366 };
2367
2368 let task_files = crate::attribution::get_task_changed_files(
2369 working_dir,
2370 task_id,
2371 wave_state.start_commit.as_deref(),
2372 )
2373 .unwrap_or_default()
2374 .into_iter()
2375 .collect();
2376
2377 task_infos.push((task_id.clone(), task.title.clone(), task_files));
2378 }
2379
2380 let error_locations: Vec<(String, Option<u32>)> =
2382 crate::attribution::parse_error_locations(&failed_cmd.stderr, &failed_cmd.stdout);
2383
2384 for attempt in 1..=max_attempts {
2386 println!();
2387 println!(
2388 " {} Batch repair attempt {}/{}",
2389 "Repair:".magenta(),
2390 attempt,
2391 max_attempts
2392 );
2393
2394 let prompt = agent::generate_batch_repair_prompt(
2396 &task_infos,
2397 &failed_cmd.command,
2398 &format!("{}\n{}", failed_cmd.stderr, failed_cmd.stdout),
2399 &error_locations,
2400 );
2401
2402 spawn_batch_repairer(working_dir, session_name, &prompt)?;
2404
2405 let repair_result = wait_for_batch_repair_completion(working_dir)?;
2407
2408 match repair_result {
2409 BatchRepairResult::Success(fixed_tasks) => {
2410 println!();
2412 println!(" {} Re-running validation...", "Validate:".magenta());
2413 let new_result = crate::backpressure::run_validation(working_dir, bp_config)?;
2414
2415 if new_result.all_passed {
2416 println!(" {} Validation passed after batch repair!", "✓".green());
2417
2418 for task_id in &attribution.responsible_tasks {
2420 if let Some(tag) = task_tags
2421 .iter()
2422 .find(|(id, _)| id == task_id)
2423 .map(|(_, t)| t)
2424 {
2425 if let Ok(mut phase) = storage.load_group(tag) {
2426 if let Some(task) = phase.get_task_mut(task_id) {
2427 task.set_status(TaskStatus::Done);
2428 let _ = storage.update_group(tag, &phase);
2429 }
2430 }
2431 }
2432 }
2433
2434 return Ok(true);
2435 }
2436
2437 println!(
2438 " {} Validation still failing (fixed: {}), will retry...",
2439 "!".yellow(),
2440 fixed_tasks.join(", ")
2441 );
2442 }
2443 BatchRepairResult::Partial(fixed, blocked) => {
2444 for task_id in &fixed {
2446 if let Some(tag) = task_tags
2447 .iter()
2448 .find(|(id, _)| id == task_id)
2449 .map(|(_, t)| t)
2450 {
2451 if let Ok(mut phase) = storage.load_group(tag) {
2452 if let Some(task) = phase.get_task_mut(task_id) {
2453 task.set_status(TaskStatus::Done);
2454 let _ = storage.update_group(tag, &phase);
2455 println!(" {} Fixed: {}", "✓".green(), task_id);
2456 }
2457 }
2458 }
2459 }
2460 for task_id in &blocked {
2461 if let Some(tag) = task_tags
2462 .iter()
2463 .find(|(id, _)| id == task_id)
2464 .map(|(_, t)| t)
2465 {
2466 if let Ok(mut phase) = storage.load_group(tag) {
2467 if let Some(task) = phase.get_task_mut(task_id) {
2468 task.set_status(TaskStatus::Blocked);
2469 let _ = storage.update_group(tag, &phase);
2470 println!(" {} Blocked: {}", "!".yellow(), task_id);
2471 }
2472 }
2473 }
2474 }
2475
2476 let new_result = crate::backpressure::run_validation(working_dir, bp_config)?;
2478 if new_result.all_passed {
2479 println!(" {} Validation passed!", "✓".green());
2480 return Ok(true);
2481 }
2482 }
2483 BatchRepairResult::Blocked(reason) => {
2484 println!(" {} Batch repair blocked: {}", "!".red(), reason);
2485 }
2486 BatchRepairResult::Timeout => {
2487 println!(" {} Batch repair timed out", "!".yellow());
2488 }
2489 }
2490 }
2491
2492 println!();
2494 println!(" {} Max repair attempts reached", "!".red());
2495
2496 for task_id in &attribution.responsible_tasks {
2497 if let Some(tag) = task_tags
2498 .iter()
2499 .find(|(id, _)| id == task_id)
2500 .map(|(_, t)| t)
2501 {
2502 if let Ok(mut phase) = storage.load_group(tag) {
2503 if let Some(task) = phase.get_task_mut(task_id) {
2504 task.set_status(TaskStatus::Failed);
2505 let _ = storage.update_group(tag, &phase);
2506 println!(" {} Marked failed: {}", "✗".red(), task_id);
2507 }
2508 }
2509 }
2510 }
2511
2512 Ok(false)
2513}
2514
2515#[allow(dead_code)]
2517fn spawn_repairer(
2518 working_dir: &std::path::Path,
2519 session_name: &str,
2520 task_id: &str,
2521 prompt: &str,
2522) -> Result<()> {
2523 let agent_def = AgentDef::try_load("repairer", working_dir).unwrap_or_else(|| AgentDef {
2525 agent: crate::agents::AgentMeta {
2526 name: "repairer".to_string(),
2527 description: "Repair agent".to_string(),
2528 },
2529 model: crate::agents::ModelConfig {
2530 harness: "claude".to_string(),
2531 model: Some("opus".to_string()),
2532 },
2533 prompt: Default::default(),
2534 });
2535
2536 let harness = agent_def.harness()?;
2537 let model = agent_def.model();
2538
2539 let spawn_config = terminal::SpawnConfig {
2540 task_id: &format!("repair-{}", task_id),
2541 prompt,
2542 working_dir,
2543 session_name,
2544 harness,
2545 model,
2546 task_list_id: None,
2547 };
2548 terminal::spawn_tmux_agent(&spawn_config)?;
2549
2550 println!(" {} Spawned repairer for {}", "✓".green(), task_id);
2551 Ok(())
2552}
2553
2554#[allow(dead_code)]
2556fn wait_for_repair_completion_task(working_dir: &std::path::Path, task_id: &str) -> Result<bool> {
2557 let marker_path = working_dir
2558 .join(".scud")
2559 .join(format!("repair-complete-{}", task_id));
2560
2561 let timeout = Duration::from_secs(1800); let start = std::time::Instant::now();
2563
2564 loop {
2565 if start.elapsed() > timeout {
2566 println!(" {} Repair timed out for {}", "!".yellow(), task_id);
2567 return Ok(false);
2568 }
2569
2570 if marker_path.exists() {
2571 let content = std::fs::read_to_string(&marker_path)?;
2572 std::fs::remove_file(&marker_path)?;
2573
2574 let success = content.contains("SUCCESS");
2575 if success {
2576 println!(" {} Repair completed for {}", "✓".green(), task_id);
2577 } else {
2578 println!(" {} Repair blocked for {}", "!".yellow(), task_id);
2579 }
2580
2581 return Ok(success);
2582 }
2583
2584 thread::sleep(Duration::from_secs(5));
2585 }
2586}
2587
/// Outcome reported by a batch repair agent.
#[derive(Debug, Clone, PartialEq, Eq)]
enum BatchRepairResult {
    /// The repair reported success; carries the ids of the fixed tasks.
    Success(Vec<String>),
    /// The repair was only partly successful; carries the ids of the fixed
    /// tasks followed by the ids of the blocked tasks.
    Partial(Vec<String>, Vec<String>),
    /// The repair could not proceed; carries the reported reason.
    Blocked(String),
    /// No result was produced before the wait timed out.
    Timeout,
}
2595
2596fn spawn_batch_repairer(
2598 working_dir: &std::path::Path,
2599 session_name: &str,
2600 prompt: &str,
2601) -> Result<()> {
2602 let agent_def = AgentDef::try_load("repairer", working_dir).unwrap_or_else(|| AgentDef {
2604 agent: crate::agents::AgentMeta {
2605 name: "batch-repairer".to_string(),
2606 description: "Batch repair agent".to_string(),
2607 },
2608 model: crate::agents::ModelConfig {
2609 harness: "claude".to_string(),
2610 model: Some("opus".to_string()),
2611 },
2612 prompt: Default::default(),
2613 });
2614
2615 let harness = agent_def.harness()?;
2616 let model = agent_def.model();
2617
2618 let spawn_config = terminal::SpawnConfig {
2619 task_id: "batch-repair",
2620 prompt,
2621 working_dir,
2622 session_name,
2623 harness,
2624 model,
2625 task_list_id: None,
2626 };
2627 terminal::spawn_tmux_agent(&spawn_config)?;
2628
2629 println!(" {} Spawned batch repairer", "✓".green());
2630 Ok(())
2631}
2632
2633fn wait_for_batch_repair_completion(working_dir: &std::path::Path) -> Result<BatchRepairResult> {
2635 let marker_path = working_dir.join(".scud").join("batch-repair-complete");
2636
2637 let timeout = Duration::from_secs(2700); let start = std::time::Instant::now();
2639
2640 loop {
2641 if start.elapsed() > timeout {
2642 return Ok(BatchRepairResult::Timeout);
2643 }
2644
2645 if marker_path.exists() {
2646 let content = std::fs::read_to_string(&marker_path)?;
2647 let _ = std::fs::remove_file(&marker_path); if content.contains("SUCCESS") {
2651 let fixed = parse_task_list(&content, "FIXED_TASKS:");
2652 return Ok(BatchRepairResult::Success(fixed));
2653 } else if content.contains("PARTIAL") {
2654 let fixed = parse_task_list(&content, "FIXED_TASKS:");
2655 let blocked = parse_task_list(&content, "BLOCKED_TASKS:");
2656 return Ok(BatchRepairResult::Partial(fixed, blocked));
2657 } else if content.contains("BLOCKED") {
2658 let reason = content
2659 .lines()
2660 .find(|l| l.starts_with("REASON:"))
2661 .map(|l| l.trim_start_matches("REASON:").trim().to_string())
2662 .unwrap_or_else(|| "Unknown reason".to_string());
2663 return Ok(BatchRepairResult::Blocked(reason));
2664 }
2665 }
2666
2667 thread::sleep(Duration::from_secs(5));
2668 }
2669}
2670
/// Extracts a comma-separated list of task IDs from the first line of
/// `content` that starts with `prefix`.
///
/// The prefix is stripped exactly once, so an ID that itself begins with the
/// prefix text is kept intact. Each entry is trimmed and empty entries are
/// dropped.
///
/// # Returns
///
/// The IDs in the order they appear, or an empty vector when no line starts
/// with `prefix`.
fn parse_task_list(content: &str, prefix: &str) -> Vec<String> {
    content
        .lines()
        // `strip_prefix` both tests for and removes the label in one step,
        // and never removes more than one occurrence of it.
        .find_map(|line| line.strip_prefix(prefix))
        .map(|ids| {
            ids.split(',')
                .map(str::trim)
                .filter(|id| !id.is_empty())
                .map(str::to_string)
                .collect()
        })
        .unwrap_or_default()
}
2686
2687fn find_task_with_tag(
2689 storage: &Storage,
2690 task_id: &str,
2691 task_tags: &[(String, String)],
2692) -> Option<(Task, String)> {
2693 let tag = task_tags.iter().find(|(id, _)| id == task_id)?.1.clone();
2694 let phase = storage.load_group(&tag).ok()?;
2695 let task = phase.get_task(task_id)?.clone();
2696 Some((task, tag))
2697}