1pub mod beads;
33pub mod events;
34pub mod session;
35pub mod transcript;
36
37pub use crate::backpressure;
42
43use anyhow::Result;
44use colored::Colorize;
45use std::collections::HashMap;
46use std::path::PathBuf;
47use std::thread;
48use std::time::Duration;
49
50use crate::commands::helpers::resolve_group_tag;
51use crate::commands::spawn::agent;
52use crate::commands::spawn::hooks;
53use crate::commands::spawn::monitor::{self, SpawnSession};
54use crate::commands::spawn::terminal::{self, Harness};
55use crate::models::phase::Phase;
56use crate::models::task::{Task, TaskStatus};
57use crate::storage::Storage;
58use std::path::Path;
59
60use self::session::{acquire_session_lock, RoundState, SwarmSession, WaveState, WaveSummary};
61use crate::agents::AgentDef;
62use crate::attribution::{attribute_failure, AttributionConfidence};
63use crate::backpressure::{BackpressureConfig, ValidationResult};
64
65pub use crate::SwarmMode;
67
/// Entry point for swarm execution.
///
/// Validates arguments and environment, prints a configuration banner, and then
/// either prints a dry-run plan, hands off to the continuous beads loop, or
/// drives the wave/round loop for the tmux, extensions and server modes.
///
/// # Parameters
/// * `project_root` - optional project directory; when `None` the current
///   working directory is used as the agents' working directory.
/// * `tag` - optional tag; resolved through `resolve_group_tag` unless
///   `all_tags` is set.
/// * `round_size` - maximum number of tasks spawned per round (must be >= 1).
/// * `all_tags` - operate on every tag instead of a single resolved one.
/// * `harness_arg` - harness name, parsed with `Harness::parse`.
/// * `swarm_mode` - execution backend (tmux, extensions, server or beads).
/// * `dry_run` - print the execution plan and return without spawning anything.
/// * `session_name` - optional session name; defaults to `swarm-<tag>`.
/// * `no_research` / `no_validate` / `no_repair` - disable the respective stage.
/// * `review` / `review_all` - enable the reviewer stage after each wave.
/// * `max_repair_attempts` - forwarded to `run_repair_loop`.
///
/// # Errors
/// Returns an error when `round_size` is zero, storage is not initialized,
/// a required tool check fails, the user aborts the orphan prompt, or any
/// `?`-propagated call (storage, spawning, validation, session saving) fails.
#[allow(clippy::too_many_arguments)]
pub fn run(
    project_root: Option<PathBuf>,
    tag: Option<&str>,
    round_size: usize,
    all_tags: bool,
    harness_arg: &str,
    swarm_mode: SwarmMode,
    dry_run: bool,
    session_name: Option<String>,
    no_research: bool,
    no_validate: bool,
    review: bool,
    review_all: bool,
    no_repair: bool,
    max_repair_attempts: usize,
) -> Result<()> {
    // Only used for the default session name below; the tag actually used for
    // task selection is `phase_tag`.
    let effective_tag = tag.unwrap_or("default");

    // A zero round size would make `chunks(round_size)` / `div_ceil` panic later.
    if round_size == 0 {
        anyhow::bail!("--round-size must be at least 1");
    }

    let storage = Storage::new(project_root.clone());

    if !storage.is_initialized() {
        anyhow::bail!("SCUD not initialized. Run: scud init");
    }

    if matches!(swarm_mode, SwarmMode::Tmux) {
        terminal::check_tmux_available()?;
    }

    let phase_tag = if all_tags {
        "all".to_string()
    } else {
        resolve_group_tag(&storage, tag, true)?
    };

    // The lock value is kept alive in `_session_lock` for the rest of the
    // function; dry runs do not take the lock.
    let _session_lock = if !dry_run {
        Some(acquire_session_lock(project_root.as_ref(), &phase_tag)?)
    } else {
        None
    };

    let harness = Harness::parse(harness_arg)?;
    // Called only for its error: fail early when the harness binary is missing.
    terminal::find_harness_binary(harness)?;

    // NOTE(review): the default name is built from `effective_tag` (the raw
    // argument or "default"), not from the resolved `phase_tag` — confirm this
    // is intended.
    let session_name = session_name.unwrap_or_else(|| format!("swarm-{}", effective_tag));

    let working_dir = project_root
        .clone()
        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());

    let bp_config = BackpressureConfig::load(project_root.as_ref())?;

    // ---- Configuration banner -------------------------------------------
    println!("{}", "SCUD Swarm Mode".cyan().bold());
    println!("{}", "═".repeat(50));
    println!("{:<20} {}", "Tag:".dimmed(), phase_tag.green());
    println!(
        "{:<20} {}",
        "Round size:".dimmed(),
        round_size.to_string().cyan()
    );
    println!(
        "{:<20} {}",
        "Research:".dimmed(),
        if no_research {
            "skip".yellow()
        } else {
            "enabled".green()
        }
    );
    println!(
        "{:<20} {}",
        "Validation:".dimmed(),
        if no_validate {
            "skip".yellow()
        } else {
            "enabled".green()
        }
    );
    println!(
        "{:<20} {}",
        "Mode:".dimmed(),
        match swarm_mode {
            SwarmMode::Tmux => "tmux (waves)".cyan(),
            SwarmMode::Extensions => "extensions (waves)".green(),
            SwarmMode::Server => "server (opencode)".magenta(),
            SwarmMode::Beads => "beads (continuous)".yellow(),
        }
    );
    println!("{:<20} {}", "Harness:".dimmed(), harness.name().cyan());
    println!(
        "{:<20} {}",
        "Review:".dimmed(),
        if review_all {
            "all tasks".green()
        } else if review {
            "sample (3 per wave)".green()
        } else {
            "disabled".yellow()
        }
    );
    println!(
        "{:<20} {}",
        "Repair:".dimmed(),
        if no_repair {
            "disabled".yellow()
        } else {
            format!("up to {} attempts", max_repair_attempts).green()
        }
    );

    // Backpressure commands are only listed when validation will actually run.
    if !bp_config.commands.is_empty() && !no_validate {
        println!(
            "{:<20} {}",
            "Backpressure:".dimmed(),
            bp_config.commands.join(", ").dimmed()
        );
    }
    println!();

    // Dry run: print the plan and return before any hooks, sessions or agents
    // are touched.
    if dry_run {
        return run_dry_run(project_root, &phase_tag, round_size, all_tags);
    }

    // Hook installation is best-effort: a failure is reported but not fatal.
    if !hooks::hooks_installed(&working_dir) {
        println!("{}", "Installing Claude Code hooks...".dimmed());
        if let Err(e) = hooks::install_hooks(&working_dir) {
            println!(
                " {} Hook installation: {}",
                "!".yellow(),
                e.to_string().dimmed()
            );
        } else {
            println!(" {} Hooks installed", "✓".green());
        }
    }

    let terminal_mode = match swarm_mode {
        SwarmMode::Tmux => "tmux",
        SwarmMode::Extensions => "extensions",
        SwarmMode::Beads => "beads",
        SwarmMode::Server => "server",
    };
    let mut swarm_session = SwarmSession::new(
        &session_name,
        &phase_tag,
        terminal_mode,
        &working_dir.to_string_lossy(),
        round_size,
    );

    // ---- Orphan detection (tmux mode only) --------------------------------
    // An orphan is an in-progress task without a matching tmux window in this
    // session (see `find_orphan_tasks`).
    let all_phases = storage.load_tasks()?;
    if matches!(swarm_mode, SwarmMode::Tmux) {
        let orphans = find_orphan_tasks(&all_phases, &phase_tag, all_tags, &session_name);

        if !orphans.is_empty() {
            println!();
            println!(
                "{}",
                "Detected orphan in-progress tasks (no tmux window):".yellow()
            );
            for (task_id, tag) in &orphans {
                println!(
                    " {} {} (tag: {})",
                    "*".yellow(),
                    task_id.cyan(),
                    tag.dimmed()
                );
            }
            println!();

            // The match arms below are keyed by index into this list.
            let choices = vec![
                "Reset to pending and re-run",
                "Kill existing windows (if any) and restart",
                "Skip and continue (leave as in-progress)",
                "Abort",
            ];

            let selection = dialoguer::Select::new()
                .with_prompt("How should orphan tasks be handled?")
                .items(&choices)
                .default(0)
                .interact()?;

            match selection {
                0 => {
                    // Reset each orphan to pending; groups that fail to load
                    // are skipped silently, write failures abort.
                    for (task_id, tag) in &orphans {
                        if let Ok(mut phase) = storage.load_group(tag) {
                            if let Some(task) = phase.get_task_mut(task_id) {
                                task.set_status(TaskStatus::Pending);
                                storage.update_group(tag, &phase)?;
                                println!(" {} {} -> pending", "v".green(), task_id);
                            }
                        }
                    }
                }
                1 => {
                    // Kill any leftover `task-<id>` windows first (errors
                    // ignored), then reset the tasks exactly as in option 0.
                    for (task_id, _) in &orphans {
                        let window_name = format!("task-{}", task_id);
                        let _ = terminal::kill_tmux_window(&session_name, &window_name);
                    }
                    for (task_id, tag) in &orphans {
                        if let Ok(mut phase) = storage.load_group(tag) {
                            if let Some(task) = phase.get_task_mut(task_id) {
                                task.set_status(TaskStatus::Pending);
                                storage.update_group(tag, &phase)?;
                                println!(" {} {} -> pending (will re-spawn)", "v".green(), task_id);
                            }
                        }
                    }
                }
                2 => {
                    println!("{}", "Leaving orphan tasks as in-progress.".dimmed());
                }
                3 => {
                    anyhow::bail!("Aborted by user");
                }
                _ => {}
            }
            println!();
        }
    }

    // ---- Beads mode: continuous loop, returns before the wave loop --------
    if matches!(swarm_mode, SwarmMode::Beads) {
        let beads_config = beads::BeadsConfig {
            max_concurrent: round_size,
            poll_interval: Duration::from_secs(3),
        };

        terminal::check_tmux_available()?;

        let result = beads::run_beads_loop(
            &storage,
            &phase_tag,
            all_tags,
            &working_dir,
            &session_name,
            harness,
            &beads_config,
            &mut swarm_session,
        )?;

        session::save_session(project_root.as_ref(), &swarm_session)?;

        println!();
        println!("{}", "Beads Session Summary".blue().bold());
        println!("{}", "═".repeat(40).blue());
        println!(
            " Tasks completed: {}",
            result.tasks_completed.to_string().green()
        );
        println!(
            " Tasks failed: {}",
            if result.tasks_failed > 0 {
                result.tasks_failed.to_string().red()
            } else {
                "0".to_string().green()
            }
        );
        println!(
            " Duration: {}",
            format!("{:.1}s", result.total_duration.as_secs_f64()).cyan()
        );

        return Ok(());
    }

    // ---- Wave loop (tmux / extensions / server) ---------------------------
    // Each iteration reloads tasks, recomputes the waves and executes only the
    // first one; later waves are picked up by subsequent iterations.
    let mut wave_number = 1;
    loop {
        let all_phases = storage.load_tasks()?;

        let waves = compute_waves_from_tasks(&all_phases, &phase_tag, all_tags)?;

        // NOTE(review): `compute_waves_from_tasks` also returns no waves when
        // every actionable task is blocked, so this message can appear while
        // tasks are still pending or in progress — confirm intended.
        if waves.is_empty() {
            println!();
            println!("{}", "All tasks complete!".green().bold());
            break;
        }

        let wave_tasks = &waves[0];

        if wave_tasks.is_empty() {
            println!();
            println!("{}", "No ready tasks in current wave.".yellow());

            let in_progress_count = count_in_progress(&all_phases, &phase_tag, all_tags);
            if in_progress_count > 0 {
                println!(
                    "Waiting for {} in-progress task(s) to complete...",
                    in_progress_count.to_string().cyan()
                );
                thread::sleep(Duration::from_secs(10));
                continue;
            } else {
                println!("Check for blocked tasks: scud list --status blocked");
                break;
            }
        }

        println!();
        println!(
            "{} {} - {} task(s)",
            "Wave".blue().bold(),
            wave_number.to_string().cyan(),
            wave_tasks.len()
        );
        println!("{}", "-".repeat(40).blue());

        let mut wave_state = WaveState::new(wave_number);

        // The research stage currently only prints status lines, and only
        // before the first wave.
        if !no_research && wave_number == 1 {
            println!();
            println!(" {} Analyzing tasks...", "Research:".magenta());
            println!(" {} Task analysis complete", "✓".green());
        }

        // Split the wave into rounds of at most `round_size` tasks.
        let num_rounds = wave_tasks.len().div_ceil(round_size);
        for (round_idx, round_tasks) in wave_tasks.chunks(round_size).enumerate() {
            println!();
            println!(
                " {} {}/{} - {} task(s)",
                "Round".yellow(),
                round_idx + 1,
                num_rounds,
                round_tasks.len()
            );

            let round_state = match swarm_mode {
                SwarmMode::Tmux => {
                    // Spawn, publish the round to the monitor session, then
                    // poll storage until no task of the round is pending or
                    // in progress.
                    let state = execute_round(
                        &storage,
                        round_tasks,
                        &working_dir,
                        &session_name,
                        round_idx,
                        harness,
                    )?;

                    let _proxy_path = create_and_update_spawn_proxy(
                        &storage,
                        project_root.as_ref(),
                        &session_name,
                        &phase_tag,
                        &working_dir,
                        &swarm_session,
                        Some(&state),
                    )?;

                    println!(" Waiting for round completion...");
                    wait_for_round_completion(&storage, round_tasks)?;

                    state
                }
                SwarmMode::Extensions => {
                    execute_round_extensions(
                        &storage,
                        round_tasks,
                        &working_dir,
                        round_idx,
                        harness,
                    )?
                }
                SwarmMode::Server => {
                    execute_round_server(
                        &storage,
                        round_tasks,
                        &working_dir,
                        round_idx,
                    )?
                }
                SwarmMode::Beads => {
                    // Beads mode returned from the function above.
                    unreachable!("Beads mode should exit before wave loop")
                }
            };

            wave_state.rounds.push(round_state.clone());
            println!(" {} Round {} complete", "✓".green(), round_idx + 1);
        }

        // ---- Validation (backpressure) ------------------------------------
        if !no_validate && !bp_config.commands.is_empty() {
            println!();
            println!(" {} Running backpressure checks...", "Validate:".magenta());

            let validation_result = backpressure::run_validation(&working_dir, &bp_config)?;

            if validation_result.all_passed {
                println!(" {} All checks passed", "✓".green());

                // Mark every task of the wave as done; load/write failures are
                // ignored here.
                for (task_id, tag) in wave_state.task_tags() {
                    if let Ok(mut phase) = storage.load_group(&tag) {
                        if let Some(task) = phase.get_task_mut(&task_id) {
                            task.set_status(TaskStatus::Done);
                            let _ = storage.update_group(&tag, &phase);
                        }
                    }
                }
            } else {
                println!(" {} Some checks failed:", "!".yellow());
                for failure in &validation_result.failures {
                    println!(" - {}", failure.red());
                }

                if no_repair {
                    // Without repair the whole wave is marked failed. The
                    // printed count is the number of tasks in the wave, not
                    // the number of successful writes.
                    let task_tags = wave_state.task_tags();
                    for (task_id, tag) in &task_tags {
                        if let Ok(mut phase) = storage.load_group(tag) {
                            if let Some(task) = phase.get_task_mut(task_id) {
                                task.set_status(TaskStatus::Failed);
                                let _ = storage.update_group(tag, &phase);
                            }
                        }
                    }
                    println!(
                        " {} Marked {} task(s) as failed",
                        "!".yellow(),
                        task_tags.len()
                    );
                } else {
                    let repaired = run_repair_loop(
                        &storage,
                        &working_dir,
                        &session_name,
                        &bp_config,
                        &wave_state,
                        &validation_result,
                        max_repair_attempts,
                    )?;

                    if !repaired {
                        println!(" {} Wave failed after repair attempts", "!".red());
                    }
                }
            }

            wave_state.validation = Some(validation_result);
        }

        // ---- Wave summary ---------------------------------------------------
        // A failure to collect changed files yields an empty list.
        let summary = WaveSummary {
            wave_number,
            tasks_completed: wave_state.all_task_ids(),
            files_changed: collect_changed_files(&working_dir, wave_state.start_commit.as_deref())
                .unwrap_or_default(),
        };
        wave_state.summary = Some(summary.clone());

        // ---- Review ---------------------------------------------------------
        // `dry_run` is always false here because dry runs returned earlier.
        if (review || review_all) && !dry_run {
            // (task id, title) pairs for every wave task that can still be
            // loaded from storage.
            let wave_tasks: Vec<(String, String)> = wave_state
                .task_tags()
                .iter()
                .filter_map(|(id, tag)| {
                    storage
                        .load_group(tag)
                        .ok()
                        .and_then(|phase| phase.get_task(id).map(|t| (id.clone(), t.title.clone())))
                })
                .collect();

            if !wave_tasks.is_empty() {
                let review_result = spawn_reviewer(
                    &working_dir,
                    &session_name,
                    &summary,
                    &wave_tasks,
                    review_all,
                )?;

                if !review_result.all_passed && !review_result.tasks_to_improve.is_empty() {
                    println!(
                        " {} Reviewer found issues in: {}",
                        "!".yellow(),
                        review_result.tasks_to_improve.join(", ")
                    );

                    // Spawn one improvement agent per flagged task. Nothing is
                    // spawned when the "builder" agent definition cannot be
                    // loaded, and this function does not wait for the agents.
                    for task_id in &review_result.tasks_to_improve {
                        if let Some((task, _tag)) =
                            find_task_with_tag(&storage, task_id, &wave_state.task_tags())
                        {
                            let prompt = format!(
                                "Improve SCUD task {}: {}\n\nThe reviewer flagged this task for improvements. \
                                 Review the implementation and make it better. When done: scud set-status {} done",
                                task.id, task.title, task.id
                            );

                            if let Some(agent_def) = AgentDef::try_load("builder", &working_dir) {
                                // Shadows the outer `harness` with the builder
                                // agent's own harness for this spawn only.
                                let harness = agent_def.harness()?;
                                let model = agent_def.model();

                                terminal::spawn_terminal_with_harness_and_model(
                                    &format!("improve-{}", task_id),
                                    &prompt,
                                    &working_dir,
                                    &session_name,
                                    harness,
                                    model,
                                )?;

                                println!(
                                    " {} Spawned improvement agent for {}",
                                    "✓".green(),
                                    task_id
                                );
                            }
                        }
                    }
                } else {
                    println!(" {} Review complete, all tasks approved", "✓".green());
                }
            }
        }

        // Persist the session after every wave.
        swarm_session.waves.push(wave_state);
        session::save_session(project_root.as_ref(), &swarm_session)?;

        wave_number += 1;
    }

    // Final monitor-session refresh covering every task of every wave.
    create_and_update_spawn_proxy(
        &storage,
        project_root.as_ref(),
        &session_name,
        &phase_tag,
        &working_dir,
        &swarm_session,
        None,
    )?;

    println!();
    println!("{}", "Swarm Session Summary".blue().bold());
    println!("{}", "═".repeat(40).blue());
    println!(
        " Waves completed: {}",
        swarm_session.waves.len().to_string().green()
    );

    // Counts the ids recorded in each round's `task_ids`.
    let total_tasks: usize = swarm_session
        .waves
        .iter()
        .flat_map(|w| &w.rounds)
        .map(|r| r.task_ids.len())
        .sum();
    println!(" Tasks executed: {}", total_tasks.to_string().green());

    println!(" {} Spawn proxy updated for monitor/TUI", "✓".green());

    Ok(())
}
672
673fn create_and_update_spawn_proxy(
674 storage: &Storage,
675 project_root: Option<&PathBuf>,
676 session_name: &str,
677 phase_tag: &str,
678 working_dir: &Path,
679 swarm_session: &SwarmSession,
680 latest_round: Option<&RoundState>,
681) -> Result<Option<PathBuf>> {
682 let all_phases = storage.load_tasks()?;
683
684 let mut spawn_session = match monitor::load_session(project_root, session_name) {
686 Ok(existing) => existing,
687 Err(_) => SpawnSession::new(
688 session_name,
689 phase_tag,
690 "tmux",
691 &working_dir.to_string_lossy(),
692 ),
693 };
694
695 let tasks_to_add: Vec<String> = match latest_round {
697 Some(round) => round.task_ids.clone(),
698 None => swarm_session
699 .waves
700 .iter()
701 .flat_map(|w| w.all_task_ids())
702 .collect(),
703 };
704
705 let existing_task_ids: std::collections::HashSet<String> = spawn_session
707 .agents
708 .iter()
709 .map(|a| a.task_id.clone())
710 .collect();
711
712 for task_id in &tasks_to_add {
713 if !existing_task_ids.contains(task_id) {
714 if let Some((title, tag)) = find_task_title_tag(&all_phases, task_id) {
715 spawn_session.add_agent(task_id, &title, &tag);
716 }
717 }
718 }
719
720 let session_file = monitor::save_session(project_root, &spawn_session)?;
721 Ok(Some(session_file))
722}
723
724fn find_task_title_tag<'a>(
725 phases: &'a HashMap<String, crate::models::phase::Phase>,
726 task_id: &str,
727) -> Option<(String, String)> {
728 for (tag, phase) in phases {
729 if let Some(task) = phase.get_task(task_id) {
730 return Some((task.title.clone(), tag.clone()));
731 }
732 }
733 None
734}
735
/// A task selected for execution, paired with the tag of the phase it was
/// found in.
#[derive(Clone)]
struct TaskInfo<'a> {
    /// Borrowed task definition.
    task: &'a Task,
    /// Tag (phase key) the task was read from.
    tag: String,
}
742
743fn compute_waves_from_tasks<'a>(
745 all_phases: &'a HashMap<String, Phase>,
746 phase_tag: &str,
747 all_tags: bool,
748) -> Result<Vec<Vec<TaskInfo<'a>>>> {
749 use std::collections::HashSet;
750
751 let mut actionable: Vec<TaskInfo<'a>> = Vec::new();
752
753 let phase_tags: Vec<&String> = if all_tags {
754 all_phases.keys().collect()
755 } else {
756 all_phases
757 .keys()
758 .filter(|t| t.as_str() == phase_tag)
759 .collect()
760 };
761
762 for tag in phase_tags {
763 if let Some(phase) = all_phases.get(tag) {
764 for task in &phase.tasks {
765 if is_task_actionable(task, phase) {
766 actionable.push(TaskInfo {
767 task,
768 tag: tag.clone(),
769 });
770 }
771 }
772 }
773 }
774
775 if actionable.is_empty() {
776 return Ok(Vec::new());
777 }
778
779 let task_ids: HashSet<String> = actionable.iter().map(|t| t.task.id.clone()).collect();
781 let mut in_degree: HashMap<String, usize> = HashMap::new();
782 let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
783
784 let in_progress_ids: HashSet<String> = {
786 let tags: Vec<&str> = if all_tags {
787 all_phases.keys().map(|s| s.as_str()).collect()
788 } else {
789 vec![phase_tag]
790 };
791
792 tags.iter()
793 .filter_map(|tag| all_phases.get(*tag))
794 .flat_map(|phase| &phase.tasks)
795 .filter(|t| t.status == TaskStatus::InProgress)
796 .map(|t| t.id.clone())
797 .collect()
798 };
799
800 for info in &actionable {
801 in_degree.entry(info.task.id.clone()).or_insert(0);
802 for dep in &info.task.dependencies {
803 if task_ids.contains(dep) {
804 *in_degree.entry(info.task.id.clone()).or_insert(0) += 1;
806 dependents
807 .entry(dep.clone())
808 .or_default()
809 .push(info.task.id.clone());
810 } else if in_progress_ids.contains(dep) {
811 *in_degree.entry(info.task.id.clone()).or_insert(0) += 1000;
814 }
815 }
817 }
818
819 let mut waves: Vec<Vec<TaskInfo<'a>>> = Vec::new();
820 let mut remaining = in_degree.clone();
821
822 while !remaining.is_empty() {
823 let ready: Vec<String> = remaining
824 .iter()
825 .filter(|(_, °)| deg == 0)
826 .map(|(id, _)| id.clone())
827 .collect();
828
829 if ready.is_empty() {
830 break; }
832
833 let wave: Vec<TaskInfo<'a>> = actionable
834 .iter()
835 .filter(|t| ready.contains(&t.task.id))
836 .cloned()
837 .collect();
838
839 for task_id in &ready {
840 remaining.remove(task_id);
841 if let Some(deps) = dependents.get(task_id) {
842 for dep_id in deps {
843 if let Some(deg) = remaining.get_mut(dep_id) {
844 *deg = deg.saturating_sub(1);
845 }
846 }
847 }
848 }
849
850 waves.push(wave);
851 }
852
853 Ok(waves)
854}
855
856fn is_task_actionable(task: &Task, phase: &Phase) -> bool {
857 if task.status != TaskStatus::Pending {
858 return false;
859 }
860 if task.is_expanded() {
861 return false;
862 }
863 if let Some(ref parent_id) = task.parent_id {
864 let parent_expanded = phase
865 .get_task(parent_id)
866 .map(|p| p.is_expanded())
867 .unwrap_or(false);
868 if !parent_expanded {
869 return false;
870 }
871 }
872 true
873}
874
875fn count_in_progress(
876 all_phases: &HashMap<String, Phase>,
877 phase_tag: &str,
878 all_tags: bool,
879) -> usize {
880 let tags: Vec<&String> = if all_tags {
881 all_phases.keys().collect()
882 } else {
883 all_phases
884 .keys()
885 .filter(|t| t.as_str() == phase_tag)
886 .collect()
887 };
888
889 tags.iter()
890 .filter_map(|tag| all_phases.get(*tag))
891 .flat_map(|phase| &phase.tasks)
892 .filter(|t| t.status == TaskStatus::InProgress)
893 .count()
894}
895
896fn tmux_window_exists_for_task(session_name: &str, task_id: &str) -> bool {
898 let window_name = format!("task-{}", task_id);
899 terminal::tmux_window_exists(session_name, &window_name)
900}
901
902fn find_orphan_tasks(
904 all_phases: &HashMap<String, Phase>,
905 phase_tag: &str,
906 all_tags: bool,
907 session_name: &str,
908) -> Vec<(String, String)> {
909 let tags: Vec<&str> = if all_tags {
911 all_phases.keys().map(|s| s.as_str()).collect()
912 } else {
913 vec![phase_tag]
914 };
915
916 let mut orphans = Vec::new();
917
918 for tag in tags {
919 if let Some(phase) = all_phases.get(tag) {
920 for task in &phase.tasks {
921 if task.status == TaskStatus::InProgress
922 && !tmux_window_exists_for_task(session_name, &task.id)
923 {
924 orphans.push((task.id.clone(), tag.to_string()));
925 }
926 }
927 }
928 }
929
930 orphans
931}
932
933fn execute_round(
934 storage: &Storage,
935 tasks: &[TaskInfo],
936 working_dir: &std::path::Path,
937 session_name: &str,
938 round_idx: usize,
939 default_harness: Harness,
940) -> Result<RoundState> {
941 let mut round_state = RoundState::new(round_idx);
942
943 for info in tasks.iter() {
944 let config =
946 agent::resolve_agent_config(info.task, &info.tag, default_harness, None, working_dir);
947
948 if info.task.agent_type.is_some() && !config.from_agent_def {
950 println!(
951 " {} Agent '{}' not found, using defaults",
952 "!".yellow(),
953 info.task.agent_type.as_deref().unwrap_or("unknown")
954 );
955 }
956
957 match terminal::spawn_terminal_with_harness_and_model(
958 &info.task.id,
959 &config.prompt,
960 working_dir,
961 session_name,
962 config.harness,
963 config.model.as_deref(),
964 ) {
965 Ok(window_index) => {
966 println!(
967 " {} Spawned: {} | {} [{}] {}:{}",
968 "✓".green(),
969 info.task.id.cyan(),
970 info.task.title.dimmed(),
971 config.display_info().dimmed(),
972 session_name.dimmed(),
973 window_index.dimmed()
974 );
975 round_state.task_ids.push(info.task.id.clone());
976 round_state.tags.push(info.tag.clone());
977
978 if let Ok(mut phase) = storage.load_group(&info.tag) {
979 if let Some(task) = phase.get_task_mut(&info.task.id) {
980 task.set_status(TaskStatus::InProgress);
981 let _ = storage.update_group(&info.tag, &phase);
982 }
983 }
984 }
985 Err(e) => {
986 println!(" {} Failed: {} - {}", "✗".red(), info.task.id.red(), e);
987 round_state.failures.push(info.task.id.clone());
988 }
989 }
990
991 thread::sleep(Duration::from_millis(500));
992 }
993
994 Ok(round_state)
995}
996
997fn execute_round_extensions(
999 storage: &Storage,
1000 tasks: &[TaskInfo],
1001 working_dir: &std::path::Path,
1002 round_idx: usize,
1003 default_harness: Harness,
1004) -> Result<RoundState> {
1005 let wave_agents: Vec<session::WaveAgent> = tasks
1007 .iter()
1008 .map(|info| session::WaveAgent::new(info.task.clone(), &info.tag))
1009 .collect();
1010
1011 for info in tasks {
1013 if let Ok(mut phase) = storage.load_group(&info.tag) {
1014 if let Some(task) = phase.get_task_mut(&info.task.id) {
1015 task.set_status(TaskStatus::InProgress);
1016 let _ = storage.update_group(&info.tag, &phase);
1017 }
1018 }
1019 }
1020
1021 let handle = tokio::runtime::Handle::current();
1023 let result = handle.block_on(async {
1024 session::execute_wave_async(&wave_agents, working_dir, round_idx, default_harness).await
1025 })?;
1026
1027 for agent_result in &result.agent_results {
1029 if agent_result.success {
1030 println!(
1031 " {} Completed: {} ({}ms)",
1032 "✓".green(),
1033 agent_result.task_id.cyan(),
1034 agent_result.duration_ms
1035 );
1036 } else {
1037 println!(
1038 " {} Failed: {} (exit code: {:?})",
1039 "✗".red(),
1040 agent_result.task_id.red(),
1041 agent_result.exit_code
1042 );
1043 }
1044 }
1045
1046 for agent_result in &result.agent_results {
1048 if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1050 if let Ok(mut phase) = storage.load_group(&info.tag) {
1051 if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1052 if !agent_result.success && agent_result.exit_code.is_none() {
1055 task.set_status(TaskStatus::Failed);
1056 let _ = storage.update_group(&info.tag, &phase);
1057 }
1058 }
1059 }
1060 }
1061 }
1062
1063 Ok(result.round_state)
1064}
1065
1066fn execute_round_server(
1068 storage: &Storage,
1069 tasks: &[TaskInfo],
1070 working_dir: &std::path::Path,
1071 round_idx: usize,
1072) -> Result<RoundState> {
1073 use crate::opencode::AgentOrchestrator;
1074 use tokio::sync::mpsc;
1075
1076 for info in tasks {
1078 if let Ok(mut phase) = storage.load_group(&info.tag) {
1079 if let Some(task) = phase.get_task_mut(&info.task.id) {
1080 task.set_status(TaskStatus::InProgress);
1081 let _ = storage.update_group(&info.tag, &phase);
1082 }
1083 }
1084 }
1085
1086 let handle = tokio::runtime::Handle::current();
1088 let result = handle.block_on(async {
1089 let (event_tx, _event_rx) = mpsc::channel(1000);
1091
1092 let mut orchestrator = AgentOrchestrator::new(event_tx.clone()).await?;
1094
1095 for info in tasks {
1097 let prompt = generate_server_prompt(info.task, &info.tag, working_dir);
1098
1099 let model = Some(("xai", "grok-3"));
1101
1102 match orchestrator.spawn_agent(info.task, &info.tag, &prompt, model).await {
1103 Ok(_) => {
1104 println!(
1105 " {} Spawned: {} | {} [server/grok-3]",
1106 "✓".green(),
1107 info.task.id.cyan(),
1108 info.task.title.dimmed(),
1109 );
1110 }
1111 Err(e) => {
1112 println!(
1113 " {} Failed to spawn {}: {}",
1114 "✗".red(),
1115 info.task.id,
1116 e
1117 );
1118 }
1119 }
1120 }
1121
1122 drop(event_tx);
1124
1125 let results = orchestrator.wait_all().await;
1127
1128 orchestrator.cleanup().await;
1130
1131 Ok::<_, anyhow::Error>(results)
1132 })?;
1133
1134 let mut round_state = RoundState::new(round_idx);
1136
1137 for agent_result in &result {
1138 if agent_result.success {
1139 println!(
1140 " {} Completed: {} ({}ms)",
1141 "✓".green(),
1142 agent_result.task_id.cyan(),
1143 agent_result.duration_ms
1144 );
1145 round_state.task_ids.push(agent_result.task_id.clone());
1146 } else {
1147 println!(
1148 " {} Failed: {} (exit code: {:?})",
1149 "✗".red(),
1150 agent_result.task_id.red(),
1151 agent_result.exit_code
1152 );
1153 round_state.failures.push(agent_result.task_id.clone());
1154 }
1155 }
1156
1157 for task_id in &round_state.task_ids {
1159 if let Some(info) = tasks.iter().find(|t| t.task.id == *task_id) {
1160 round_state.tags.push(info.tag.clone());
1161 }
1162 }
1163
1164 for agent_result in &result {
1166 if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1167 if let Ok(mut phase) = storage.load_group(&info.tag) {
1168 if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1169 if !agent_result.success && agent_result.exit_code.is_none() {
1171 task.set_status(TaskStatus::Failed);
1172 let _ = storage.update_group(&info.tag, &phase);
1173 }
1174 }
1175 }
1176 }
1177 }
1178
1179 Ok(round_state)
1180}
1181
1182fn generate_server_prompt(task: &Task, tag: &str, working_dir: &std::path::Path) -> String {
1184 let details = task
1185 .details
1186 .as_ref()
1187 .map(|d| format!("\n\n## Details\n\n{}", d))
1188 .unwrap_or_default();
1189
1190 let test_strategy = task
1191 .test_strategy
1192 .as_ref()
1193 .map(|t| format!("\n\n## Test Strategy\n\n{}", t))
1194 .unwrap_or_default();
1195
1196 format!(
1197 r#"You are working on task [{id}] in phase "{tag}".
1198
1199## Task: {title}
1200
1201{description}{details}{test_strategy}
1202
1203## Instructions
1204
12051. Implement the task requirements
12062. Test your changes
12073. When complete, run: `scud set-status {id} done --tag {tag}`
1208
1209Working directory: {working_dir}
1210"#,
1211 id = task.id,
1212 tag = tag,
1213 title = task.title,
1214 description = task.description,
1215 details = details,
1216 test_strategy = test_strategy,
1217 working_dir = working_dir.display(),
1218 )
1219}
1220
1221fn wait_for_round_completion(storage: &Storage, tasks: &[TaskInfo]) -> Result<()> {
1222 let task_ids: Vec<String> = tasks.iter().map(|t| t.task.id.clone()).collect();
1223 let task_tags: HashMap<String, String> = tasks
1224 .iter()
1225 .map(|t| (t.task.id.clone(), t.tag.clone()))
1226 .collect();
1227
1228 loop {
1229 let mut all_done = true;
1230
1231 for task_id in &task_ids {
1232 if let Some(tag) = task_tags.get(task_id) {
1233 if let Ok(phase) = storage.load_group(tag) {
1234 if let Some(task) = phase.get_task(task_id) {
1235 if task.status == TaskStatus::InProgress
1236 || task.status == TaskStatus::Pending
1237 {
1238 all_done = false;
1239 break;
1240 }
1241 }
1242 }
1243 }
1244 }
1245
1246 if all_done {
1247 break;
1248 }
1249
1250 thread::sleep(Duration::from_secs(5));
1251 }
1252
1253 Ok(())
1254}
1255
1256fn collect_changed_files(
1257 working_dir: &std::path::Path,
1258 start_commit: Option<&str>,
1259) -> Result<Vec<String>> {
1260 use std::process::Command;
1261
1262 let range = match start_commit {
1264 Some(commit) => format!("{}..HEAD", commit),
1265 None => "HEAD~1..HEAD".to_string(),
1266 };
1267
1268 let output = Command::new("git")
1269 .current_dir(working_dir)
1270 .args(["diff", "--name-only", &range])
1271 .output()?;
1272
1273 let files: Vec<String> = String::from_utf8_lossy(&output.stdout)
1274 .lines()
1275 .map(|s| s.to_string())
1276 .collect();
1277
1278 Ok(files)
1279}
1280
1281fn run_dry_run(
1282 project_root: Option<PathBuf>,
1283 phase_tag: &str,
1284 round_size: usize,
1285 all_tags: bool,
1286) -> Result<()> {
1287 let storage = Storage::new(project_root);
1288 let all_phases = storage.load_tasks()?;
1289
1290 let waves = compute_waves_from_tasks(&all_phases, phase_tag, all_tags)?;
1291
1292 println!("{}", "Execution Plan (dry-run)".yellow().bold());
1293 println!("{}", "═".repeat(50).yellow());
1294 println!();
1295
1296 let mut total_tasks = 0;
1297 let mut total_rounds = 0;
1298
1299 for (wave_idx, wave) in waves.iter().enumerate() {
1300 let rounds = wave.len().div_ceil(round_size);
1301 total_tasks += wave.len();
1302 total_rounds += rounds;
1303
1304 println!(
1305 "{} {} - {} task(s), {} round(s)",
1306 "Wave".blue().bold(),
1307 wave_idx + 1,
1308 wave.len(),
1309 rounds
1310 );
1311
1312 for (round_idx, chunk) in wave.chunks(round_size).enumerate() {
1313 println!(" {} {}:", "Round".yellow(), round_idx + 1);
1314 for info in chunk {
1315 println!(
1316 " {} {} | {}",
1317 "○".white(),
1318 info.task.id.cyan(),
1319 info.task.title
1320 );
1321 }
1322 }
1323 println!();
1324 }
1325
1326 println!("{}", "Summary".blue().bold());
1327 println!("{}", "-".repeat(30).blue());
1328 println!(" Total waves: {}", waves.len());
1329 println!(" Total tasks: {}", total_tasks);
1330 println!(" Total rounds: {}", total_rounds);
1331
1332 if total_rounds > 0 {
1333 let speedup = total_tasks as f64 / total_rounds as f64;
1334 println!(" Speedup: {}", format!("{:.1}x", speedup).green());
1335 }
1336
1337 println!();
1338 println!("{}", "No agents spawned (dry-run mode).".yellow());
1339
1340 Ok(())
1341}
1342
/// Outcome of a reviewer run for one wave.
#[derive(Debug)]
pub struct ReviewResult {
    /// `true` when the reviewer's marker file contained `ALL_PASS`, or when
    /// the review timed out.
    pub all_passed: bool,
    /// Task ids listed on the marker file's `IMPROVE_TASKS:` line (empty when
    /// there is no such line).
    pub tasks_to_improve: Vec<String>,
}
1355
1356#[allow(dead_code)]
1358pub fn spawn_reviewer(
1359 working_dir: &std::path::Path,
1360 session_name: &str,
1361 summary: &WaveSummary,
1362 wave_tasks: &[(String, String)], review_all: bool,
1364) -> Result<ReviewResult> {
1365 println!();
1366 println!(" {} Spawning reviewer agent...", "Review:".magenta());
1367
1368 let prompt = agent::generate_review_prompt(summary, wave_tasks, review_all);
1369
1370 let agent_def = AgentDef::try_load("reviewer", working_dir).unwrap_or_else(|| {
1372 AgentDef {
1374 agent: crate::agents::AgentMeta {
1375 name: "reviewer".to_string(),
1376 description: "Code reviewer".to_string(),
1377 },
1378 model: crate::agents::ModelConfig {
1379 harness: "claude".to_string(),
1380 model: Some("opus".to_string()),
1381 },
1382 prompt: Default::default(),
1383 }
1384 });
1385
1386 let harness = agent_def.harness()?;
1387 let model = agent_def.model();
1388
1389 terminal::spawn_terminal_with_harness_and_model(
1391 &format!("review-wave-{}", summary.wave_number),
1392 &prompt,
1393 working_dir,
1394 session_name,
1395 harness,
1396 model,
1397 )?;
1398
1399 println!(
1400 " {} Reviewer spawned, waiting for completion...",
1401 "✓".green()
1402 );
1403
1404 wait_for_review_completion(working_dir, summary.wave_number)
1406}
1407
1408fn wait_for_review_completion(
1410 working_dir: &std::path::Path,
1411 wave_number: usize,
1412) -> Result<ReviewResult> {
1413 let marker_path = working_dir
1414 .join(".scud")
1415 .join(format!("review-complete-{}", wave_number));
1416
1417 let timeout = Duration::from_secs(1800); let start = std::time::Instant::now();
1419
1420 loop {
1421 if start.elapsed() > timeout {
1422 println!(" {} Review timed out after 30 minutes", "!".yellow());
1423 return Ok(ReviewResult {
1424 all_passed: true, tasks_to_improve: vec![],
1426 });
1427 }
1428
1429 if marker_path.exists() {
1430 let content = std::fs::read_to_string(&marker_path)?;
1431 std::fs::remove_file(&marker_path)?; let all_passed = content.contains("ALL_PASS");
1434 let tasks_to_improve = if content.contains("IMPROVE_TASKS:") {
1435 content
1436 .lines()
1437 .find(|l| l.starts_with("IMPROVE_TASKS:"))
1438 .map(|l| {
1439 l.strip_prefix("IMPROVE_TASKS:")
1440 .unwrap_or("")
1441 .split(',')
1442 .map(|s| s.trim().to_string())
1443 .filter(|s| !s.is_empty())
1444 .collect()
1445 })
1446 .unwrap_or_default()
1447 } else {
1448 vec![]
1449 };
1450
1451 println!(" {} Review complete", "✓".green());
1452 if !all_passed {
1453 println!(
1454 " {} Tasks needing improvement: {}",
1455 "!".yellow(),
1456 tasks_to_improve.join(", ")
1457 );
1458 }
1459
1460 return Ok(ReviewResult {
1461 all_passed,
1462 tasks_to_improve,
1463 });
1464 }
1465
1466 thread::sleep(Duration::from_secs(5));
1467 }
1468}
1469
1470#[allow(dead_code)]
1476#[allow(clippy::too_many_arguments)]
1477pub fn run_repair_loop(
1478 storage: &Storage,
1479 working_dir: &std::path::Path,
1480 session_name: &str,
1481 bp_config: &BackpressureConfig,
1482 wave_state: &WaveState,
1483 validation_result: &ValidationResult,
1484 max_attempts: usize,
1485) -> Result<bool> {
1486 let wave_tasks = wave_state.all_task_ids();
1487 let task_tags = wave_state.task_tags();
1488
1489 println!();
1490 println!(" {} Analyzing failure attribution...", "Repair:".magenta());
1491
1492 let failed_cmd = validation_result.results.iter().find(|r| !r.passed);
1494 let failed_cmd = match failed_cmd {
1495 Some(cmd) => cmd,
1496 None => return Ok(true), };
1498
1499 let attribution = attribute_failure(
1501 working_dir,
1502 &failed_cmd.stderr,
1503 &failed_cmd.stdout,
1504 &wave_tasks,
1505 wave_state.start_commit.as_deref(),
1506 )?;
1507
1508 match attribution.confidence {
1509 AttributionConfidence::High => {
1510 println!(
1511 " {} High confidence: task {} responsible",
1512 "✓".green(),
1513 attribution.responsible_tasks.join(", ")
1514 );
1515 }
1516 AttributionConfidence::Medium => {
1517 println!(
1518 " {} Medium confidence: tasks {} may be responsible",
1519 "~".yellow(),
1520 attribution.responsible_tasks.join(", ")
1521 );
1522 }
1523 AttributionConfidence::Low => {
1524 println!(
1525 " {} Low confidence: cannot determine specific task",
1526 "!".red()
1527 );
1528 }
1529 }
1530
1531 for task_id in &attribution.cleared_tasks {
1533 if let Some(tag) = task_tags
1534 .iter()
1535 .find(|(id, _)| id == task_id)
1536 .map(|(_, t)| t)
1537 {
1538 if let Ok(mut phase) = storage.load_group(tag) {
1539 if let Some(task) = phase.get_task_mut(task_id) {
1540 task.set_status(TaskStatus::Done);
1541 let _ = storage.update_group(tag, &phase);
1542 println!(" {} Cleared: {} (not responsible)", "✓".green(), task_id);
1543 }
1544 }
1545 }
1546 }
1547
1548 for attempt in 1..=max_attempts {
1550 println!();
1551 println!(
1552 " {} Repair attempt {}/{}",
1553 "Repair:".magenta(),
1554 attempt,
1555 max_attempts
1556 );
1557
1558 let mut all_repaired = true;
1559
1560 for task_id in &attribution.responsible_tasks {
1561 let (task, _tag) = match find_task_with_tag(storage, task_id, &task_tags) {
1563 Some(t) => t,
1564 None => continue,
1565 };
1566
1567 let task_files = crate::attribution::get_task_changed_files(
1569 working_dir,
1570 task_id,
1571 wave_state.start_commit.as_deref(),
1572 )?;
1573
1574 let error_files: Vec<String> =
1576 crate::attribution::parse_error_locations(&failed_cmd.stderr, &failed_cmd.stdout)
1577 .into_iter()
1578 .map(|(f, _)| f)
1579 .collect();
1580
1581 let prompt = agent::generate_repair_prompt(
1583 task_id,
1584 &task.title,
1585 &failed_cmd.command,
1586 &format!("{}\n{}", failed_cmd.stderr, failed_cmd.stdout),
1587 &task_files.into_iter().collect::<Vec<_>>(),
1588 &error_files,
1589 );
1590
1591 spawn_repairer(working_dir, session_name, task_id, &prompt)?;
1593
1594 if !wait_for_repair_completion_task(working_dir, task_id)? {
1596 all_repaired = false;
1597 }
1598 }
1599
1600 if !all_repaired {
1601 println!(" {} Some repairs failed or blocked", "!".yellow());
1602 continue;
1603 }
1604
1605 println!();
1607 println!(" {} Re-running validation...", "Validate:".magenta());
1608 let new_result = crate::backpressure::run_validation(working_dir, bp_config)?;
1609
1610 if new_result.all_passed {
1611 println!(" {} Validation passed after repair!", "✓".green());
1612
1613 for task_id in &attribution.responsible_tasks {
1615 if let Some(tag) = task_tags
1616 .iter()
1617 .find(|(id, _)| id == task_id)
1618 .map(|(_, t)| t)
1619 {
1620 if let Ok(mut phase) = storage.load_group(tag) {
1621 if let Some(task) = phase.get_task_mut(task_id) {
1622 task.set_status(TaskStatus::Done);
1623 let _ = storage.update_group(tag, &phase);
1624 }
1625 }
1626 }
1627 }
1628
1629 return Ok(true);
1630 }
1631
1632 println!(
1633 " {} Validation still failing, will retry...",
1634 "!".yellow()
1635 );
1636 }
1637
1638 println!();
1640 println!(" {} Max repair attempts reached", "!".red());
1641
1642 for task_id in &attribution.responsible_tasks {
1643 if let Some(tag) = task_tags
1644 .iter()
1645 .find(|(id, _)| id == task_id)
1646 .map(|(_, t)| t)
1647 {
1648 if let Ok(mut phase) = storage.load_group(tag) {
1649 if let Some(task) = phase.get_task_mut(task_id) {
1650 task.set_status(TaskStatus::Failed);
1651 let _ = storage.update_group(tag, &phase);
1652 println!(" {} Marked failed: {}", "✗".red(), task_id);
1653 }
1654 }
1655 }
1656 }
1657
1658 Ok(false)
1659}
1660
1661fn spawn_repairer(
1663 working_dir: &std::path::Path,
1664 session_name: &str,
1665 task_id: &str,
1666 prompt: &str,
1667) -> Result<()> {
1668 let agent_def = AgentDef::try_load("repairer", working_dir).unwrap_or_else(|| AgentDef {
1670 agent: crate::agents::AgentMeta {
1671 name: "repairer".to_string(),
1672 description: "Repair agent".to_string(),
1673 },
1674 model: crate::agents::ModelConfig {
1675 harness: "claude".to_string(),
1676 model: Some("opus".to_string()),
1677 },
1678 prompt: Default::default(),
1679 });
1680
1681 let harness = agent_def.harness()?;
1682 let model = agent_def.model();
1683
1684 terminal::spawn_terminal_with_harness_and_model(
1685 &format!("repair-{}", task_id),
1686 prompt,
1687 working_dir,
1688 session_name,
1689 harness,
1690 model,
1691 )?;
1692
1693 println!(" {} Spawned repairer for {}", "✓".green(), task_id);
1694 Ok(())
1695}
1696
1697fn wait_for_repair_completion_task(working_dir: &std::path::Path, task_id: &str) -> Result<bool> {
1699 let marker_path = working_dir
1700 .join(".scud")
1701 .join(format!("repair-complete-{}", task_id));
1702
1703 let timeout = Duration::from_secs(1800); let start = std::time::Instant::now();
1705
1706 loop {
1707 if start.elapsed() > timeout {
1708 println!(" {} Repair timed out for {}", "!".yellow(), task_id);
1709 return Ok(false);
1710 }
1711
1712 if marker_path.exists() {
1713 let content = std::fs::read_to_string(&marker_path)?;
1714 std::fs::remove_file(&marker_path)?;
1715
1716 let success = content.contains("SUCCESS");
1717 if success {
1718 println!(" {} Repair completed for {}", "✓".green(), task_id);
1719 } else {
1720 println!(" {} Repair blocked for {}", "!".yellow(), task_id);
1721 }
1722
1723 return Ok(success);
1724 }
1725
1726 thread::sleep(Duration::from_secs(5));
1727 }
1728}
1729
1730fn find_task_with_tag(
1732 storage: &Storage,
1733 task_id: &str,
1734 task_tags: &[(String, String)],
1735) -> Option<(Task, String)> {
1736 let tag = task_tags.iter().find(|(id, _)| id == task_id)?.1.clone();
1737 let phase = storage.load_group(&tag).ok()?;
1738 let task = phase.get_task(task_id)?.clone();
1739 Some((task, tag))
1740}