Skip to main content

scud/commands/swarm/
mod.rs

1//! Swarm mode - Parallel execution with multiple strategies
2//!
3//! Executes tasks using parallel agents with two main modes:
4//!
5//! ## Wave Mode (default)
6//! Executes tasks in dependency-order waves using parallel agents.
7//! After each wave, runs backpressure validation (build, lint, test).
8//!
9//! Flow:
10//! 1. [Optional] Research phase: Smart model analyzes tasks, may expand complex ones
11//! 2. Build phase: Fast models execute tasks in parallel rounds
12//! 3. Validate phase: Runs backpressure tests (compile, lint, test), smart model fixes issues
13//! 4. Repeat for next wave
14//!
15//! ## Beads Mode (`--swarm-mode beads`)
16//! Continuous ready-task polling inspired by Beads/Gas Town patterns.
17//! Tasks execute immediately when dependencies are met, no batch waiting.
18//!
19//! Flow:
20//! 1. Query for ready tasks (all dependencies Done)
21//! 2. Claim task (mark in-progress)
22//! 3. Spawn agent
23//! 4. Immediately loop back to step 1 (no waiting for batch)
24//!
25//! Usage:
26//!   scud swarm --tag <tag>                           # Wave mode with tmux (default)
27//!   scud swarm --tag <tag> --swarm-mode beads        # Beads continuous execution
28//!   scud swarm --tag <tag> --swarm-mode extensions   # Wave mode with async subprocesses
29//!   scud swarm --tag <tag> --no-research             # Skip research, use tasks as-is
30//!   scud swarm --tag <tag> --no-validate             # Skip backpressure validation
31
32pub mod beads;
33pub mod events;
34pub mod session;
35pub mod transcript;
36
37/// Re-export backpressure module for backward compatibility.
38///
39/// The canonical location is now [`crate::backpressure`], but this re-export
40/// maintains the old path `scud::commands::swarm::backpressure` for existing code.
41pub use crate::backpressure;
42
43use anyhow::Result;
44use colored::Colorize;
45use std::collections::HashMap;
46use std::path::PathBuf;
47use std::thread;
48use std::time::Duration;
49
50use crate::commands::helpers::resolve_group_tag;
51use crate::commands::spawn::agent;
52use crate::commands::spawn::hooks;
53use crate::commands::spawn::monitor::{self, SpawnSession};
54use crate::commands::spawn::terminal::{self, Harness};
55use crate::models::phase::Phase;
56use crate::models::task::{Task, TaskStatus};
57use crate::storage::Storage;
58use std::path::Path;
59
60use self::session::{acquire_session_lock, RoundState, SwarmSession, WaveState, WaveSummary};
61use crate::agents::AgentDef;
62use crate::attribution::{attribute_failure, AttributionConfidence};
63use crate::backpressure::{BackpressureConfig, ValidationResult};
64
65/// Swarm execution mode
66pub use crate::SwarmMode;
67
68/// Main entry point for the swarm command
69#[allow(clippy::too_many_arguments)]
70pub fn run(
71    project_root: Option<PathBuf>,
72    tag: Option<&str>,
73    round_size: usize,
74    all_tags: bool,
75    harness_arg: &str,
76    swarm_mode: SwarmMode,
77    dry_run: bool,
78    session_name: Option<String>,
79    no_research: bool,
80    no_validate: bool,
81    review: bool,
82    review_all: bool,
83    no_repair: bool,
84    max_repair_attempts: usize,
85) -> Result<()> {
86    let effective_tag = tag.unwrap_or("default");
87
88    if round_size == 0 {
89        anyhow::bail!("--round-size must be at least 1");
90    }
91
92    let storage = Storage::new(project_root.clone());
93
94    if !storage.is_initialized() {
95        anyhow::bail!("SCUD not initialized. Run: scud init");
96    }
97
98    // Check tmux is available (only in tmux mode)
99    if matches!(swarm_mode, SwarmMode::Tmux) {
100        terminal::check_tmux_available()?;
101    }
102
103    // Determine phase tag
104    let phase_tag = if all_tags {
105        "all".to_string()
106    } else {
107        resolve_group_tag(&storage, tag, true)?
108    };
109
110    // Acquire session lock to prevent concurrent swarm runs on same tag
111    // Lock is held for the duration of the function and released on drop
112    let _session_lock = if !dry_run {
113        Some(acquire_session_lock(project_root.as_ref(), &phase_tag)?)
114    } else {
115        None
116    };
117
118    // Parse harness and validate binary exists
119    let harness = Harness::parse(harness_arg)?;
120    terminal::find_harness_binary(harness)?;
121
122    // Generate session name
123    let session_name = session_name.unwrap_or_else(|| format!("swarm-{}", effective_tag));
124
125    // Get working directory
126    let working_dir = project_root
127        .clone()
128        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
129
130    // Load backpressure configuration
131    let bp_config = BackpressureConfig::load(project_root.as_ref())?;
132
133    // Display header
134    println!("{}", "SCUD Swarm Mode".cyan().bold());
135    println!("{}", "═".repeat(50));
136    println!("{:<20} {}", "Tag:".dimmed(), phase_tag.green());
137    println!(
138        "{:<20} {}",
139        "Round size:".dimmed(),
140        round_size.to_string().cyan()
141    );
142    println!(
143        "{:<20} {}",
144        "Research:".dimmed(),
145        if no_research {
146            "skip".yellow()
147        } else {
148            "enabled".green()
149        }
150    );
151    println!(
152        "{:<20} {}",
153        "Validation:".dimmed(),
154        if no_validate {
155            "skip".yellow()
156        } else {
157            "enabled".green()
158        }
159    );
160    println!(
161        "{:<20} {}",
162        "Mode:".dimmed(),
163        match swarm_mode {
164            SwarmMode::Tmux => "tmux (waves)".cyan(),
165            SwarmMode::Extensions => "extensions (waves)".green(),
166            SwarmMode::Server => "server (opencode)".magenta(),
167            SwarmMode::Beads => "beads (continuous)".yellow(),
168        }
169    );
170    println!("{:<20} {}", "Harness:".dimmed(), harness.name().cyan());
171    println!(
172        "{:<20} {}",
173        "Review:".dimmed(),
174        if review_all {
175            "all tasks".green()
176        } else if review {
177            "sample (3 per wave)".green()
178        } else {
179            "disabled".yellow()
180        }
181    );
182    println!(
183        "{:<20} {}",
184        "Repair:".dimmed(),
185        if no_repair {
186            "disabled".yellow()
187        } else {
188            format!("up to {} attempts", max_repair_attempts).green()
189        }
190    );
191
192    if !bp_config.commands.is_empty() && !no_validate {
193        println!(
194            "{:<20} {}",
195            "Backpressure:".dimmed(),
196            bp_config.commands.join(", ").dimmed()
197        );
198    }
199    println!();
200
201    if dry_run {
202        return run_dry_run(project_root, &phase_tag, round_size, all_tags);
203    }
204
205    // Install hooks if needed
206    if !hooks::hooks_installed(&working_dir) {
207        println!("{}", "Installing Claude Code hooks...".dimmed());
208        if let Err(e) = hooks::install_hooks(&working_dir) {
209            println!(
210                "  {} Hook installation: {}",
211                "!".yellow(),
212                e.to_string().dimmed()
213            );
214        } else {
215            println!("  {} Hooks installed", "✓".green());
216        }
217    }
218
219    // Initialize swarm session
220    let terminal_mode = match swarm_mode {
221        SwarmMode::Tmux => "tmux",
222        SwarmMode::Extensions => "extensions",
223        SwarmMode::Beads => "beads",
224        SwarmMode::Server => "server",
225    };
226    let mut swarm_session = SwarmSession::new(
227        &session_name,
228        &phase_tag,
229        terminal_mode,
230        &working_dir.to_string_lossy(),
231        round_size,
232    );
233
234    // Detect orphan in-progress tasks (tasks with no running tmux window)
235    // Only applicable in tmux mode
236    let all_phases = storage.load_tasks()?;
237    if matches!(swarm_mode, SwarmMode::Tmux) {
238        let orphans = find_orphan_tasks(&all_phases, &phase_tag, all_tags, &session_name);
239
240        if !orphans.is_empty() {
241            println!();
242            println!(
243                "{}",
244                "Detected orphan in-progress tasks (no tmux window):".yellow()
245            );
246            for (task_id, tag) in &orphans {
247                println!(
248                    "  {} {} (tag: {})",
249                    "*".yellow(),
250                    task_id.cyan(),
251                    tag.dimmed()
252                );
253            }
254            println!();
255
256            // Prompt user for action
257            let choices = vec![
258                "Reset to pending and re-run",
259                "Kill existing windows (if any) and restart",
260                "Skip and continue (leave as in-progress)",
261                "Abort",
262            ];
263
264            let selection = dialoguer::Select::new()
265                .with_prompt("How should orphan tasks be handled?")
266                .items(&choices)
267                .default(0)
268                .interact()?;
269
270            match selection {
271                0 => {
272                    // Reset to pending
273                    for (task_id, tag) in &orphans {
274                        if let Ok(mut phase) = storage.load_group(tag) {
275                            if let Some(task) = phase.get_task_mut(task_id) {
276                                task.set_status(TaskStatus::Pending);
277                                storage.update_group(tag, &phase)?;
278                                println!("  {} {} -> pending", "v".green(), task_id);
279                            }
280                        }
281                    }
282                }
283                1 => {
284                    // Kill and restart - first try to kill any matching windows
285                    for (task_id, _) in &orphans {
286                        let window_name = format!("task-{}", task_id);
287                        let _ = terminal::kill_tmux_window(&session_name, &window_name);
288                    }
289                    // Reset to pending so they'll be picked up
290                    for (task_id, tag) in &orphans {
291                        if let Ok(mut phase) = storage.load_group(tag) {
292                            if let Some(task) = phase.get_task_mut(task_id) {
293                                task.set_status(TaskStatus::Pending);
294                                storage.update_group(tag, &phase)?;
295                                println!("  {} {} -> pending (will re-spawn)", "v".green(), task_id);
296                            }
297                        }
298                    }
299                }
300                2 => {
301                    // Skip - do nothing, leave as in-progress
302                    println!("{}", "Leaving orphan tasks as in-progress.".dimmed());
303                }
304                3 => {
305                    // Abort
306                    anyhow::bail!("Aborted by user");
307                }
308            _ => {}
309            }
310            println!();
311        }
312    }
313
314    // === BEADS MODE: Continuous execution ===
315    // If beads mode is selected, run the continuous polling loop instead of waves
316    if matches!(swarm_mode, SwarmMode::Beads) {
317        let beads_config = beads::BeadsConfig {
318            max_concurrent: round_size, // Reuse round_size as max concurrent
319            poll_interval: Duration::from_secs(3),
320        };
321
322        // Check tmux availability for beads mode (uses tmux by default)
323        terminal::check_tmux_available()?;
324
325        let result = beads::run_beads_loop(
326            &storage,
327            &phase_tag,
328            all_tags,
329            &working_dir,
330            &session_name,
331            harness,
332            &beads_config,
333            &mut swarm_session,
334        )?;
335
336        // Save session state
337        session::save_session(project_root.as_ref(), &swarm_session)?;
338
339        // Final summary
340        println!();
341        println!("{}", "Beads Session Summary".blue().bold());
342        println!("{}", "═".repeat(40).blue());
343        println!(
344            "  Tasks completed: {}",
345            result.tasks_completed.to_string().green()
346        );
347        println!(
348            "  Tasks failed:    {}",
349            if result.tasks_failed > 0 {
350                result.tasks_failed.to_string().red()
351            } else {
352                "0".to_string().green()
353            }
354        );
355        println!(
356            "  Duration:        {}",
357            format!("{:.1}s", result.total_duration.as_secs_f64()).cyan()
358        );
359
360        return Ok(());
361    }
362
363    // === WAVE MODE: Batch execution ===
364    // Main loop: execute waves until all tasks done
365    let mut wave_number = 1;
366    loop {
367        // Load fresh task state (must reload each iteration to see completed tasks)
368        let all_phases = storage.load_tasks()?;
369
370        // Compute waves from current state
371        let waves = compute_waves_from_tasks(&all_phases, &phase_tag, all_tags)?;
372
373        if waves.is_empty() {
374            println!();
375            println!("{}", "All tasks complete!".green().bold());
376            break;
377        }
378
379        // Get first wave (tasks with no pending dependencies)
380        let wave_tasks = &waves[0];
381
382        if wave_tasks.is_empty() {
383            println!();
384            println!("{}", "No ready tasks in current wave.".yellow());
385
386            let in_progress_count = count_in_progress(&all_phases, &phase_tag, all_tags);
387            if in_progress_count > 0 {
388                println!(
389                    "Waiting for {} in-progress task(s) to complete...",
390                    in_progress_count.to_string().cyan()
391                );
392                thread::sleep(Duration::from_secs(10));
393                continue;
394            } else {
395                println!("Check for blocked tasks: scud list --status blocked");
396                break;
397            }
398        }
399
400        println!();
401        println!(
402            "{} {} - {} task(s)",
403            "Wave".blue().bold(),
404            wave_number.to_string().cyan(),
405            wave_tasks.len()
406        );
407        println!("{}", "-".repeat(40).blue());
408
409        // Track wave state
410        let mut wave_state = WaveState::new(wave_number);
411
412        // === PHASE 1: RESEARCH (optional, first wave only) ===
413        if !no_research && wave_number == 1 {
414            println!();
415            println!("  {} Analyzing tasks...", "Research:".magenta());
416            // TODO: Smart model could expand complex tasks here
417            println!("    {} Task analysis complete", "✓".green());
418        }
419
420        // === PHASE 2: BUILD ===
421        let num_rounds = wave_tasks.len().div_ceil(round_size);
422        for (round_idx, round_tasks) in wave_tasks.chunks(round_size).enumerate() {
423            println!();
424            println!(
425                "  {} {}/{} - {} task(s)",
426                "Round".yellow(),
427                round_idx + 1,
428                num_rounds,
429                round_tasks.len()
430            );
431
432            // Spawn agents for this round based on swarm mode
433            // Note: Agents self-orient using scud CLI commands (scud list, scud show, etc.)
434            let round_state = match swarm_mode {
435                SwarmMode::Tmux => {
436                    let state = execute_round(
437                        &storage,
438                        round_tasks,
439                        &working_dir,
440                        &session_name,
441                        round_idx,
442                        harness,
443                    )?;
444
445                    // Create/update spawn proxy for monitor visibility (tmux mode only)
446                    let _proxy_path = create_and_update_spawn_proxy(
447                        &storage,
448                        project_root.as_ref(),
449                        &session_name,
450                        &phase_tag,
451                        &working_dir,
452                        &swarm_session,
453                        Some(&state),
454                    )?;
455
456                    // Wait for round completion (tmux mode - poll for status changes)
457                    println!("    Waiting for round completion...");
458                    wait_for_round_completion(&storage, round_tasks)?;
459
460                    state
461                }
462                SwarmMode::Extensions => {
463                    // Extensions mode: synchronous execution with async subprocess runner
464                    execute_round_extensions(
465                        &storage,
466                        round_tasks,
467                        &working_dir,
468                        round_idx,
469                        harness,
470                    )?
471                }
472                SwarmMode::Server => {
473                    // Server mode: use OpenCode Server for agent orchestration
474                    execute_round_server(
475                        &storage,
476                        round_tasks,
477                        &working_dir,
478                        round_idx,
479                    )?
480                }
481                SwarmMode::Beads => {
482                    // Beads mode is handled earlier with early return
483                    // This branch should never be reached
484                    unreachable!("Beads mode should exit before wave loop")
485                }
486            };
487
488            wave_state.rounds.push(round_state.clone());
489            println!("    {} Round {} complete", "✓".green(), round_idx + 1);
490        }
491
492        // === PHASE 3: VALIDATE (optional) ===
493        if !no_validate && !bp_config.commands.is_empty() {
494            println!();
495            println!("  {} Running backpressure checks...", "Validate:".magenta());
496
497            let validation_result = backpressure::run_validation(&working_dir, &bp_config)?;
498
499            if validation_result.all_passed {
500                println!("    {} All checks passed", "✓".green());
501
502                // Mark all tasks as done
503                for (task_id, tag) in wave_state.task_tags() {
504                    if let Ok(mut phase) = storage.load_group(&tag) {
505                        if let Some(task) = phase.get_task_mut(&task_id) {
506                            task.set_status(TaskStatus::Done);
507                            let _ = storage.update_group(&tag, &phase);
508                        }
509                    }
510                }
511            } else {
512                println!("    {} Some checks failed:", "!".yellow());
513                for failure in &validation_result.failures {
514                    println!("      - {}", failure.red());
515                }
516
517                if no_repair {
518                    // Old behavior: mark all tasks as failed
519                    let task_tags = wave_state.task_tags();
520                    for (task_id, tag) in &task_tags {
521                        if let Ok(mut phase) = storage.load_group(tag) {
522                            if let Some(task) = phase.get_task_mut(task_id) {
523                                task.set_status(TaskStatus::Failed);
524                                let _ = storage.update_group(tag, &phase);
525                            }
526                        }
527                    }
528                    println!(
529                        "    {} Marked {} task(s) as failed",
530                        "!".yellow(),
531                        task_tags.len()
532                    );
533                } else {
534                    // New behavior: run repair loop
535                    let repaired = run_repair_loop(
536                        &storage,
537                        &working_dir,
538                        &session_name,
539                        &bp_config,
540                        &wave_state,
541                        &validation_result,
542                        max_repair_attempts,
543                    )?;
544
545                    if !repaired {
546                        println!("    {} Wave failed after repair attempts", "!".red());
547                    }
548                }
549            }
550
551            wave_state.validation = Some(validation_result);
552        }
553
554        // Generate wave summary (just what was done - not context accumulation)
555        let summary = WaveSummary {
556            wave_number,
557            tasks_completed: wave_state.all_task_ids(),
558            files_changed: collect_changed_files(&working_dir, wave_state.start_commit.as_deref())
559                .unwrap_or_default(),
560        };
561        wave_state.summary = Some(summary.clone());
562
563        // === PHASE 4: REVIEW (optional) ===
564        if (review || review_all) && !dry_run {
565            // Build task list for review
566            let wave_tasks: Vec<(String, String)> = wave_state
567                .task_tags()
568                .iter()
569                .filter_map(|(id, tag)| {
570                    storage
571                        .load_group(tag)
572                        .ok()
573                        .and_then(|phase| phase.get_task(id).map(|t| (id.clone(), t.title.clone())))
574                })
575                .collect();
576
577            if !wave_tasks.is_empty() {
578                let review_result = spawn_reviewer(
579                    &working_dir,
580                    &session_name,
581                    &summary,
582                    &wave_tasks,
583                    review_all,
584                )?;
585
586                if !review_result.all_passed && !review_result.tasks_to_improve.is_empty() {
587                    println!(
588                        "    {} Reviewer found issues in: {}",
589                        "!".yellow(),
590                        review_result.tasks_to_improve.join(", ")
591                    );
592
593                    // Spawn improvement agents for flagged tasks
594                    for task_id in &review_result.tasks_to_improve {
595                        // Find task and spawn builder to improve
596                        if let Some((task, _tag)) =
597                            find_task_with_tag(&storage, task_id, &wave_state.task_tags())
598                        {
599                            let prompt = format!(
600                                "Improve SCUD task {}: {}\n\nThe reviewer flagged this task for improvements. \
601                                 Review the implementation and make it better. When done: scud set-status {} done",
602                                task.id, task.title, task.id
603                            );
604
605                            // Use builder agent for improvements
606                            if let Some(agent_def) = AgentDef::try_load("builder", &working_dir) {
607                                let harness = agent_def.harness()?;
608                                let model = agent_def.model();
609
610                                terminal::spawn_terminal_with_harness_and_model(
611                                    &format!("improve-{}", task_id),
612                                    &prompt,
613                                    &working_dir,
614                                    &session_name,
615                                    harness,
616                                    model,
617                                )?;
618
619                                println!(
620                                    "    {} Spawned improvement agent for {}",
621                                    "✓".green(),
622                                    task_id
623                                );
624                            }
625                        }
626                    }
627                } else {
628                    println!("    {} Review complete, all tasks approved", "✓".green());
629                }
630            }
631        }
632
633        // Save session state
634        swarm_session.waves.push(wave_state);
635        session::save_session(project_root.as_ref(), &swarm_session)?;
636
637        wave_number += 1;
638    }
639
640    // Final summary
641    // Final bridge update for spawn monitor/TUI compatibility
642    create_and_update_spawn_proxy(
643        &storage,
644        project_root.as_ref(),
645        &session_name,
646        &phase_tag,
647        &working_dir,
648        &swarm_session,
649        None, // Final update - include all rounds
650    )?;
651
652    println!();
653    println!("{}", "Swarm Session Summary".blue().bold());
654    println!("{}", "═".repeat(40).blue());
655    println!(
656        "  Waves completed: {}",
657        swarm_session.waves.len().to_string().green()
658    );
659
660    let total_tasks: usize = swarm_session
661        .waves
662        .iter()
663        .flat_map(|w| &w.rounds)
664        .map(|r| r.task_ids.len())
665        .sum();
666    println!("  Tasks executed: {}", total_tasks.to_string().green());
667
668    println!("  {} Spawn proxy updated for monitor/TUI", "✓".green());
669
670    Ok(())
671}
672
673fn create_and_update_spawn_proxy(
674    storage: &Storage,
675    project_root: Option<&PathBuf>,
676    session_name: &str,
677    phase_tag: &str,
678    working_dir: &Path,
679    swarm_session: &SwarmSession,
680    latest_round: Option<&RoundState>,
681) -> Result<Option<PathBuf>> {
682    let all_phases = storage.load_tasks()?;
683
684    // Try to load existing proxy session, or create new one
685    let mut spawn_session = match monitor::load_session(project_root, session_name) {
686        Ok(existing) => existing,
687        Err(_) => SpawnSession::new(
688            session_name,
689            phase_tag,
690            "tmux",
691            &working_dir.to_string_lossy(),
692        ),
693    };
694
695    // Get tasks to add (either from latest round or all tasks)
696    let tasks_to_add: Vec<String> = match latest_round {
697        Some(round) => round.task_ids.clone(),
698        None => swarm_session
699            .waves
700            .iter()
701            .flat_map(|w| w.all_task_ids())
702            .collect(),
703    };
704
705    // Add new agents (skip duplicates)
706    let existing_task_ids: std::collections::HashSet<String> = spawn_session
707        .agents
708        .iter()
709        .map(|a| a.task_id.clone())
710        .collect();
711
712    for task_id in &tasks_to_add {
713        if !existing_task_ids.contains(task_id) {
714            if let Some((title, tag)) = find_task_title_tag(&all_phases, task_id) {
715                spawn_session.add_agent(task_id, &title, &tag);
716            }
717        }
718    }
719
720    let session_file = monitor::save_session(project_root, &spawn_session)?;
721    Ok(Some(session_file))
722}
723
724fn find_task_title_tag<'a>(
725    phases: &'a HashMap<String, crate::models::phase::Phase>,
726    task_id: &str,
727) -> Option<(String, String)> {
728    for (tag, phase) in phases {
729        if let Some(task) = phase.get_task(task_id) {
730            return Some((task.title.clone(), tag.clone()));
731        }
732    }
733    None
734}
735
736/// Task info for wave computation
737#[derive(Clone)]
738struct TaskInfo<'a> {
739    task: &'a Task,
740    tag: String,
741}
742
743/// Compute execution waves from current task state
744fn compute_waves_from_tasks<'a>(
745    all_phases: &'a HashMap<String, Phase>,
746    phase_tag: &str,
747    all_tags: bool,
748) -> Result<Vec<Vec<TaskInfo<'a>>>> {
749    use std::collections::HashSet;
750
751    let mut actionable: Vec<TaskInfo<'a>> = Vec::new();
752
753    let phase_tags: Vec<&String> = if all_tags {
754        all_phases.keys().collect()
755    } else {
756        all_phases
757            .keys()
758            .filter(|t| t.as_str() == phase_tag)
759            .collect()
760    };
761
762    for tag in phase_tags {
763        if let Some(phase) = all_phases.get(tag) {
764            for task in &phase.tasks {
765                if is_task_actionable(task, phase) {
766                    actionable.push(TaskInfo {
767                        task,
768                        tag: tag.clone(),
769                    });
770                }
771            }
772        }
773    }
774
775    if actionable.is_empty() {
776        return Ok(Vec::new());
777    }
778
779    // Kahn's algorithm for wave computation
780    let task_ids: HashSet<String> = actionable.iter().map(|t| t.task.id.clone()).collect();
781    let mut in_degree: HashMap<String, usize> = HashMap::new();
782    let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
783
784    // Collect in-progress task IDs for blocking check
785    let in_progress_ids: HashSet<String> = {
786        let tags: Vec<&str> = if all_tags {
787            all_phases.keys().map(|s| s.as_str()).collect()
788        } else {
789            vec![phase_tag]
790        };
791
792        tags.iter()
793            .filter_map(|tag| all_phases.get(*tag))
794            .flat_map(|phase| &phase.tasks)
795            .filter(|t| t.status == TaskStatus::InProgress)
796            .map(|t| t.id.clone())
797            .collect()
798    };
799
800    for info in &actionable {
801        in_degree.entry(info.task.id.clone()).or_insert(0);
802        for dep in &info.task.dependencies {
803            if task_ids.contains(dep) {
804                // Dependency is pending - will be in a wave
805                *in_degree.entry(info.task.id.clone()).or_insert(0) += 1;
806                dependents
807                    .entry(dep.clone())
808                    .or_default()
809                    .push(info.task.id.clone());
810            } else if in_progress_ids.contains(dep) {
811                // Dependency is in-progress - block this task
812                // Set very high in-degree so it never becomes ready
813                *in_degree.entry(info.task.id.clone()).or_insert(0) += 1000;
814            }
815            // If dep is Done/Failed/etc, it's satisfied - do nothing
816        }
817    }
818
819    let mut waves: Vec<Vec<TaskInfo<'a>>> = Vec::new();
820    let mut remaining = in_degree.clone();
821
822    while !remaining.is_empty() {
823        let ready: Vec<String> = remaining
824            .iter()
825            .filter(|(_, &deg)| deg == 0)
826            .map(|(id, _)| id.clone())
827            .collect();
828
829        if ready.is_empty() {
830            break; // Circular dependency
831        }
832
833        let wave: Vec<TaskInfo<'a>> = actionable
834            .iter()
835            .filter(|t| ready.contains(&t.task.id))
836            .cloned()
837            .collect();
838
839        for task_id in &ready {
840            remaining.remove(task_id);
841            if let Some(deps) = dependents.get(task_id) {
842                for dep_id in deps {
843                    if let Some(deg) = remaining.get_mut(dep_id) {
844                        *deg = deg.saturating_sub(1);
845                    }
846                }
847            }
848        }
849
850        waves.push(wave);
851    }
852
853    Ok(waves)
854}
855
856fn is_task_actionable(task: &Task, phase: &Phase) -> bool {
857    if task.status != TaskStatus::Pending {
858        return false;
859    }
860    if task.is_expanded() {
861        return false;
862    }
863    if let Some(ref parent_id) = task.parent_id {
864        let parent_expanded = phase
865            .get_task(parent_id)
866            .map(|p| p.is_expanded())
867            .unwrap_or(false);
868        if !parent_expanded {
869            return false;
870        }
871    }
872    true
873}
874
875fn count_in_progress(
876    all_phases: &HashMap<String, Phase>,
877    phase_tag: &str,
878    all_tags: bool,
879) -> usize {
880    let tags: Vec<&String> = if all_tags {
881        all_phases.keys().collect()
882    } else {
883        all_phases
884            .keys()
885            .filter(|t| t.as_str() == phase_tag)
886            .collect()
887    };
888
889    tags.iter()
890        .filter_map(|tag| all_phases.get(*tag))
891        .flat_map(|phase| &phase.tasks)
892        .filter(|t| t.status == TaskStatus::InProgress)
893        .count()
894}
895
896/// Check if a tmux window exists for a task
897fn tmux_window_exists_for_task(session_name: &str, task_id: &str) -> bool {
898    let window_name = format!("task-{}", task_id);
899    terminal::tmux_window_exists(session_name, &window_name)
900}
901
902/// Find in-progress tasks that have no running tmux window (orphans)
903fn find_orphan_tasks(
904    all_phases: &HashMap<String, Phase>,
905    phase_tag: &str,
906    all_tags: bool,
907    session_name: &str,
908) -> Vec<(String, String)> {
909    // (task_id, tag) pairs
910    let tags: Vec<&str> = if all_tags {
911        all_phases.keys().map(|s| s.as_str()).collect()
912    } else {
913        vec![phase_tag]
914    };
915
916    let mut orphans = Vec::new();
917
918    for tag in tags {
919        if let Some(phase) = all_phases.get(tag) {
920            for task in &phase.tasks {
921                if task.status == TaskStatus::InProgress
922                    && !tmux_window_exists_for_task(session_name, &task.id)
923                {
924                    orphans.push((task.id.clone(), tag.to_string()));
925                }
926            }
927        }
928    }
929
930    orphans
931}
932
933fn execute_round(
934    storage: &Storage,
935    tasks: &[TaskInfo],
936    working_dir: &std::path::Path,
937    session_name: &str,
938    round_idx: usize,
939    default_harness: Harness,
940) -> Result<RoundState> {
941    let mut round_state = RoundState::new(round_idx);
942
943    for info in tasks.iter() {
944        // Resolve agent config (harness, model, prompt) from task's agent_type
945        let config =
946            agent::resolve_agent_config(info.task, &info.tag, default_harness, None, working_dir);
947
948        // Warn if agent type was specified but definition not found
949        if info.task.agent_type.is_some() && !config.from_agent_def {
950            println!(
951                "    {} Agent '{}' not found, using defaults",
952                "!".yellow(),
953                info.task.agent_type.as_deref().unwrap_or("unknown")
954            );
955        }
956
957        match terminal::spawn_terminal_with_harness_and_model(
958            &info.task.id,
959            &config.prompt,
960            working_dir,
961            session_name,
962            config.harness,
963            config.model.as_deref(),
964        ) {
965            Ok(window_index) => {
966                println!(
967                    "    {} Spawned: {} | {} [{}] {}:{}",
968                    "✓".green(),
969                    info.task.id.cyan(),
970                    info.task.title.dimmed(),
971                    config.display_info().dimmed(),
972                    session_name.dimmed(),
973                    window_index.dimmed()
974                );
975                round_state.task_ids.push(info.task.id.clone());
976                round_state.tags.push(info.tag.clone());
977
978                if let Ok(mut phase) = storage.load_group(&info.tag) {
979                    if let Some(task) = phase.get_task_mut(&info.task.id) {
980                        task.set_status(TaskStatus::InProgress);
981                        let _ = storage.update_group(&info.tag, &phase);
982                    }
983                }
984            }
985            Err(e) => {
986                println!("    {} Failed: {} - {}", "✗".red(), info.task.id.red(), e);
987                round_state.failures.push(info.task.id.clone());
988            }
989        }
990
991        thread::sleep(Duration::from_millis(500));
992    }
993
994    Ok(round_state)
995}
996
997/// Execute a round using extension-based subprocesses (no tmux)
998fn execute_round_extensions(
999    storage: &Storage,
1000    tasks: &[TaskInfo],
1001    working_dir: &std::path::Path,
1002    round_idx: usize,
1003    default_harness: Harness,
1004) -> Result<RoundState> {
1005    // Convert TaskInfo to WaveAgent format
1006    let wave_agents: Vec<session::WaveAgent> = tasks
1007        .iter()
1008        .map(|info| session::WaveAgent::new(info.task.clone(), &info.tag))
1009        .collect();
1010
1011    // Mark tasks as in-progress before spawning
1012    for info in tasks {
1013        if let Ok(mut phase) = storage.load_group(&info.tag) {
1014            if let Some(task) = phase.get_task_mut(&info.task.id) {
1015                task.set_status(TaskStatus::InProgress);
1016                let _ = storage.update_group(&info.tag, &phase);
1017            }
1018        }
1019    }
1020
1021    // Run async execution using tokio runtime
1022    let handle = tokio::runtime::Handle::current();
1023    let result = handle.block_on(async {
1024        session::execute_wave_async(&wave_agents, working_dir, round_idx, default_harness).await
1025    })?;
1026
1027    // Print results
1028    for agent_result in &result.agent_results {
1029        if agent_result.success {
1030            println!(
1031                "    {} Completed: {} ({}ms)",
1032                "✓".green(),
1033                agent_result.task_id.cyan(),
1034                agent_result.duration_ms
1035            );
1036        } else {
1037            println!(
1038                "    {} Failed: {} (exit code: {:?})",
1039                "✗".red(),
1040                agent_result.task_id.red(),
1041                agent_result.exit_code
1042            );
1043        }
1044    }
1045
1046    // Update task statuses based on results
1047    for agent_result in &result.agent_results {
1048        // Find the task's tag
1049        if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1050            if let Ok(mut phase) = storage.load_group(&info.tag) {
1051                if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1052                    // The agent itself should update the status via scud set-status
1053                    // But if it failed to start, mark it as failed
1054                    if !agent_result.success && agent_result.exit_code.is_none() {
1055                        task.set_status(TaskStatus::Failed);
1056                        let _ = storage.update_group(&info.tag, &phase);
1057                    }
1058                }
1059            }
1060        }
1061    }
1062
1063    Ok(result.round_state)
1064}
1065
1066/// Execute a round using OpenCode Server mode
1067fn execute_round_server(
1068    storage: &Storage,
1069    tasks: &[TaskInfo],
1070    working_dir: &std::path::Path,
1071    round_idx: usize,
1072) -> Result<RoundState> {
1073    use crate::opencode::AgentOrchestrator;
1074    use tokio::sync::mpsc;
1075
1076    // Mark tasks as in-progress before spawning
1077    for info in tasks {
1078        if let Ok(mut phase) = storage.load_group(&info.tag) {
1079            if let Some(task) = phase.get_task_mut(&info.task.id) {
1080                task.set_status(TaskStatus::InProgress);
1081                let _ = storage.update_group(&info.tag, &phase);
1082            }
1083        }
1084    }
1085
1086    // Run async execution using tokio runtime
1087    let handle = tokio::runtime::Handle::current();
1088    let result = handle.block_on(async {
1089        // Create event channel
1090        let (event_tx, _event_rx) = mpsc::channel(1000);
1091
1092        // Create orchestrator
1093        let mut orchestrator = AgentOrchestrator::new(event_tx.clone()).await?;
1094
1095        // Spawn all agents
1096        for info in tasks {
1097            let prompt = generate_server_prompt(info.task, &info.tag, working_dir);
1098
1099            // Use xAI/Grok as default for server mode
1100            let model = Some(("xai", "grok-3"));
1101
1102            match orchestrator.spawn_agent(info.task, &info.tag, &prompt, model).await {
1103                Ok(_) => {
1104                    println!(
1105                        "    {} Spawned: {} | {} [server/grok-3]",
1106                        "✓".green(),
1107                        info.task.id.cyan(),
1108                        info.task.title.dimmed(),
1109                    );
1110                }
1111                Err(e) => {
1112                    println!(
1113                        "    {} Failed to spawn {}: {}",
1114                        "✗".red(),
1115                        info.task.id,
1116                        e
1117                    );
1118                }
1119            }
1120        }
1121
1122        // Drop our sender so we can detect when orchestrator is done
1123        drop(event_tx);
1124
1125        // Collect results
1126        let results = orchestrator.wait_all().await;
1127
1128        // Cleanup
1129        orchestrator.cleanup().await;
1130
1131        Ok::<_, anyhow::Error>(results)
1132    })?;
1133
1134    // Build round state from results
1135    let mut round_state = RoundState::new(round_idx);
1136
1137    for agent_result in &result {
1138        if agent_result.success {
1139            println!(
1140                "    {} Completed: {} ({}ms)",
1141                "✓".green(),
1142                agent_result.task_id.cyan(),
1143                agent_result.duration_ms
1144            );
1145            round_state.task_ids.push(agent_result.task_id.clone());
1146        } else {
1147            println!(
1148                "    {} Failed: {} (exit code: {:?})",
1149                "✗".red(),
1150                agent_result.task_id.red(),
1151                agent_result.exit_code
1152            );
1153            round_state.failures.push(agent_result.task_id.clone());
1154        }
1155    }
1156
1157    // Add tags for successful tasks
1158    for task_id in &round_state.task_ids {
1159        if let Some(info) = tasks.iter().find(|t| t.task.id == *task_id) {
1160            round_state.tags.push(info.tag.clone());
1161        }
1162    }
1163
1164    // Update task statuses based on results
1165    for agent_result in &result {
1166        if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1167            if let Ok(mut phase) = storage.load_group(&info.tag) {
1168                if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1169                    // If failed without exit code, mark as failed
1170                    if !agent_result.success && agent_result.exit_code.is_none() {
1171                        task.set_status(TaskStatus::Failed);
1172                        let _ = storage.update_group(&info.tag, &phase);
1173                    }
1174                }
1175            }
1176        }
1177    }
1178
1179    Ok(round_state)
1180}
1181
1182/// Generate prompt for server mode (similar to extensions but optimized for OpenCode)
1183fn generate_server_prompt(task: &Task, tag: &str, working_dir: &std::path::Path) -> String {
1184    let details = task
1185        .details
1186        .as_ref()
1187        .map(|d| format!("\n\n## Details\n\n{}", d))
1188        .unwrap_or_default();
1189
1190    let test_strategy = task
1191        .test_strategy
1192        .as_ref()
1193        .map(|t| format!("\n\n## Test Strategy\n\n{}", t))
1194        .unwrap_or_default();
1195
1196    format!(
1197        r#"You are working on task [{id}] in phase "{tag}".
1198
1199## Task: {title}
1200
1201{description}{details}{test_strategy}
1202
1203## Instructions
1204
12051. Implement the task requirements
12062. Test your changes
12073. When complete, run: `scud set-status {id} done --tag {tag}`
1208
1209Working directory: {working_dir}
1210"#,
1211        id = task.id,
1212        tag = tag,
1213        title = task.title,
1214        description = task.description,
1215        details = details,
1216        test_strategy = test_strategy,
1217        working_dir = working_dir.display(),
1218    )
1219}
1220
1221fn wait_for_round_completion(storage: &Storage, tasks: &[TaskInfo]) -> Result<()> {
1222    let task_ids: Vec<String> = tasks.iter().map(|t| t.task.id.clone()).collect();
1223    let task_tags: HashMap<String, String> = tasks
1224        .iter()
1225        .map(|t| (t.task.id.clone(), t.tag.clone()))
1226        .collect();
1227
1228    loop {
1229        let mut all_done = true;
1230
1231        for task_id in &task_ids {
1232            if let Some(tag) = task_tags.get(task_id) {
1233                if let Ok(phase) = storage.load_group(tag) {
1234                    if let Some(task) = phase.get_task(task_id) {
1235                        if task.status == TaskStatus::InProgress
1236                            || task.status == TaskStatus::Pending
1237                        {
1238                            all_done = false;
1239                            break;
1240                        }
1241                    }
1242                }
1243            }
1244        }
1245
1246        if all_done {
1247            break;
1248        }
1249
1250        thread::sleep(Duration::from_secs(5));
1251    }
1252
1253    Ok(())
1254}
1255
1256fn collect_changed_files(
1257    working_dir: &std::path::Path,
1258    start_commit: Option<&str>,
1259) -> Result<Vec<String>> {
1260    use std::process::Command;
1261
1262    // Construct the commit range: start_commit..HEAD or fallback to HEAD~1..HEAD
1263    let range = match start_commit {
1264        Some(commit) => format!("{}..HEAD", commit),
1265        None => "HEAD~1..HEAD".to_string(),
1266    };
1267
1268    let output = Command::new("git")
1269        .current_dir(working_dir)
1270        .args(["diff", "--name-only", &range])
1271        .output()?;
1272
1273    let files: Vec<String> = String::from_utf8_lossy(&output.stdout)
1274        .lines()
1275        .map(|s| s.to_string())
1276        .collect();
1277
1278    Ok(files)
1279}
1280
1281fn run_dry_run(
1282    project_root: Option<PathBuf>,
1283    phase_tag: &str,
1284    round_size: usize,
1285    all_tags: bool,
1286) -> Result<()> {
1287    let storage = Storage::new(project_root);
1288    let all_phases = storage.load_tasks()?;
1289
1290    let waves = compute_waves_from_tasks(&all_phases, phase_tag, all_tags)?;
1291
1292    println!("{}", "Execution Plan (dry-run)".yellow().bold());
1293    println!("{}", "═".repeat(50).yellow());
1294    println!();
1295
1296    let mut total_tasks = 0;
1297    let mut total_rounds = 0;
1298
1299    for (wave_idx, wave) in waves.iter().enumerate() {
1300        let rounds = wave.len().div_ceil(round_size);
1301        total_tasks += wave.len();
1302        total_rounds += rounds;
1303
1304        println!(
1305            "{} {} - {} task(s), {} round(s)",
1306            "Wave".blue().bold(),
1307            wave_idx + 1,
1308            wave.len(),
1309            rounds
1310        );
1311
1312        for (round_idx, chunk) in wave.chunks(round_size).enumerate() {
1313            println!("  {} {}:", "Round".yellow(), round_idx + 1);
1314            for info in chunk {
1315                println!(
1316                    "    {} {} | {}",
1317                    "○".white(),
1318                    info.task.id.cyan(),
1319                    info.task.title
1320                );
1321            }
1322        }
1323        println!();
1324    }
1325
1326    println!("{}", "Summary".blue().bold());
1327    println!("{}", "-".repeat(30).blue());
1328    println!("  Total waves:  {}", waves.len());
1329    println!("  Total tasks:  {}", total_tasks);
1330    println!("  Total rounds: {}", total_rounds);
1331
1332    if total_rounds > 0 {
1333        let speedup = total_tasks as f64 / total_rounds as f64;
1334        println!("  Speedup:      {}", format!("{:.1}x", speedup).green());
1335    }
1336
1337    println!();
1338    println!("{}", "No agents spawned (dry-run mode).".yellow());
1339
1340    Ok(())
1341}
1342
1343// ============================================================================
1344// Review Agent Support
1345// ============================================================================
1346
1347/// Result of a review operation
1348#[derive(Debug)]
1349pub struct ReviewResult {
1350    /// Whether all reviewed tasks passed
1351    pub all_passed: bool,
1352    /// Task IDs that need improvement
1353    pub tasks_to_improve: Vec<String>,
1354}
1355
1356/// Spawn a reviewer agent and wait for it to complete
1357#[allow(dead_code)]
1358pub fn spawn_reviewer(
1359    working_dir: &std::path::Path,
1360    session_name: &str,
1361    summary: &WaveSummary,
1362    wave_tasks: &[(String, String)], // (id, title)
1363    review_all: bool,
1364) -> Result<ReviewResult> {
1365    println!();
1366    println!("  {} Spawning reviewer agent...", "Review:".magenta());
1367
1368    let prompt = agent::generate_review_prompt(summary, wave_tasks, review_all);
1369
1370    // Load reviewer agent definition for harness/model
1371    let agent_def = AgentDef::try_load("reviewer", working_dir).unwrap_or_else(|| {
1372        // Fallback: claude/opus
1373        AgentDef {
1374            agent: crate::agents::AgentMeta {
1375                name: "reviewer".to_string(),
1376                description: "Code reviewer".to_string(),
1377            },
1378            model: crate::agents::ModelConfig {
1379                harness: "claude".to_string(),
1380                model: Some("opus".to_string()),
1381            },
1382            prompt: Default::default(),
1383        }
1384    });
1385
1386    let harness = agent_def.harness()?;
1387    let model = agent_def.model();
1388
1389    // Spawn reviewer
1390    terminal::spawn_terminal_with_harness_and_model(
1391        &format!("review-wave-{}", summary.wave_number),
1392        &prompt,
1393        working_dir,
1394        session_name,
1395        harness,
1396        model,
1397    )?;
1398
1399    println!(
1400        "    {} Reviewer spawned, waiting for completion...",
1401        "✓".green()
1402    );
1403
1404    // Wait for reviewer to complete by watching for output file
1405    wait_for_review_completion(working_dir, summary.wave_number)
1406}
1407
1408/// Wait for the review to complete by polling for marker file
1409fn wait_for_review_completion(
1410    working_dir: &std::path::Path,
1411    wave_number: usize,
1412) -> Result<ReviewResult> {
1413    let marker_path = working_dir
1414        .join(".scud")
1415        .join(format!("review-complete-{}", wave_number));
1416
1417    let timeout = Duration::from_secs(1800); // 30 minute timeout
1418    let start = std::time::Instant::now();
1419
1420    loop {
1421        if start.elapsed() > timeout {
1422            println!("    {} Review timed out after 30 minutes", "!".yellow());
1423            return Ok(ReviewResult {
1424                all_passed: true, // Assume pass on timeout
1425                tasks_to_improve: vec![],
1426            });
1427        }
1428
1429        if marker_path.exists() {
1430            let content = std::fs::read_to_string(&marker_path)?;
1431            std::fs::remove_file(&marker_path)?; // Clean up
1432
1433            let all_passed = content.contains("ALL_PASS");
1434            let tasks_to_improve = if content.contains("IMPROVE_TASKS:") {
1435                content
1436                    .lines()
1437                    .find(|l| l.starts_with("IMPROVE_TASKS:"))
1438                    .map(|l| {
1439                        l.strip_prefix("IMPROVE_TASKS:")
1440                            .unwrap_or("")
1441                            .split(',')
1442                            .map(|s| s.trim().to_string())
1443                            .filter(|s| !s.is_empty())
1444                            .collect()
1445                    })
1446                    .unwrap_or_default()
1447            } else {
1448                vec![]
1449            };
1450
1451            println!("    {} Review complete", "✓".green());
1452            if !all_passed {
1453                println!(
1454                    "    {} Tasks needing improvement: {}",
1455                    "!".yellow(),
1456                    tasks_to_improve.join(", ")
1457                );
1458            }
1459
1460            return Ok(ReviewResult {
1461                all_passed,
1462                tasks_to_improve,
1463            });
1464        }
1465
1466        thread::sleep(Duration::from_secs(5));
1467    }
1468}
1469
1470// ============================================================================
1471// Repair Loop Support
1472// ============================================================================
1473
1474/// Run repair loop for failed validation
1475#[allow(dead_code)]
1476#[allow(clippy::too_many_arguments)]
1477pub fn run_repair_loop(
1478    storage: &Storage,
1479    working_dir: &std::path::Path,
1480    session_name: &str,
1481    bp_config: &BackpressureConfig,
1482    wave_state: &WaveState,
1483    validation_result: &ValidationResult,
1484    max_attempts: usize,
1485) -> Result<bool> {
1486    let wave_tasks = wave_state.all_task_ids();
1487    let task_tags = wave_state.task_tags();
1488
1489    println!();
1490    println!("  {} Analyzing failure attribution...", "Repair:".magenta());
1491
1492    // Get the first failed command for attribution
1493    let failed_cmd = validation_result.results.iter().find(|r| !r.passed);
1494    let failed_cmd = match failed_cmd {
1495        Some(cmd) => cmd,
1496        None => return Ok(true), // No failures? Shouldn't happen
1497    };
1498
1499    // Attribute the failure
1500    let attribution = attribute_failure(
1501        working_dir,
1502        &failed_cmd.stderr,
1503        &failed_cmd.stdout,
1504        &wave_tasks,
1505        wave_state.start_commit.as_deref(),
1506    )?;
1507
1508    match attribution.confidence {
1509        AttributionConfidence::High => {
1510            println!(
1511                "    {} High confidence: task {} responsible",
1512                "✓".green(),
1513                attribution.responsible_tasks.join(", ")
1514            );
1515        }
1516        AttributionConfidence::Medium => {
1517            println!(
1518                "    {} Medium confidence: tasks {} may be responsible",
1519                "~".yellow(),
1520                attribution.responsible_tasks.join(", ")
1521            );
1522        }
1523        AttributionConfidence::Low => {
1524            println!(
1525                "    {} Low confidence: cannot determine specific task",
1526                "!".red()
1527            );
1528        }
1529    }
1530
1531    // Mark cleared tasks as done
1532    for task_id in &attribution.cleared_tasks {
1533        if let Some(tag) = task_tags
1534            .iter()
1535            .find(|(id, _)| id == task_id)
1536            .map(|(_, t)| t)
1537        {
1538            if let Ok(mut phase) = storage.load_group(tag) {
1539                if let Some(task) = phase.get_task_mut(task_id) {
1540                    task.set_status(TaskStatus::Done);
1541                    let _ = storage.update_group(tag, &phase);
1542                    println!("    {} Cleared: {} (not responsible)", "✓".green(), task_id);
1543                }
1544            }
1545        }
1546    }
1547
1548    // Attempt repairs on responsible tasks
1549    for attempt in 1..=max_attempts {
1550        println!();
1551        println!(
1552            "  {} Repair attempt {}/{}",
1553            "Repair:".magenta(),
1554            attempt,
1555            max_attempts
1556        );
1557
1558        let mut all_repaired = true;
1559
1560        for task_id in &attribution.responsible_tasks {
1561            // Find task details
1562            let (task, _tag) = match find_task_with_tag(storage, task_id, &task_tags) {
1563                Some(t) => t,
1564                None => continue,
1565            };
1566
1567            // Get files changed by this task
1568            let task_files = crate::attribution::get_task_changed_files(
1569                working_dir,
1570                task_id,
1571                wave_state.start_commit.as_deref(),
1572            )?;
1573
1574            // Parse error files
1575            let error_files: Vec<String> =
1576                crate::attribution::parse_error_locations(&failed_cmd.stderr, &failed_cmd.stdout)
1577                    .into_iter()
1578                    .map(|(f, _)| f)
1579                    .collect();
1580
1581            // Generate repair prompt
1582            let prompt = agent::generate_repair_prompt(
1583                task_id,
1584                &task.title,
1585                &failed_cmd.command,
1586                &format!("{}\n{}", failed_cmd.stderr, failed_cmd.stdout),
1587                &task_files.into_iter().collect::<Vec<_>>(),
1588                &error_files,
1589            );
1590
1591            // Spawn repairer
1592            spawn_repairer(working_dir, session_name, task_id, &prompt)?;
1593
1594            // Wait for repair completion
1595            if !wait_for_repair_completion_task(working_dir, task_id)? {
1596                all_repaired = false;
1597            }
1598        }
1599
1600        if !all_repaired {
1601            println!("    {} Some repairs failed or blocked", "!".yellow());
1602            continue;
1603        }
1604
1605        // Re-run validation
1606        println!();
1607        println!("  {} Re-running validation...", "Validate:".magenta());
1608        let new_result = crate::backpressure::run_validation(working_dir, bp_config)?;
1609
1610        if new_result.all_passed {
1611            println!("    {} Validation passed after repair!", "✓".green());
1612
1613            // Mark all responsible tasks as done
1614            for task_id in &attribution.responsible_tasks {
1615                if let Some(tag) = task_tags
1616                    .iter()
1617                    .find(|(id, _)| id == task_id)
1618                    .map(|(_, t)| t)
1619                {
1620                    if let Ok(mut phase) = storage.load_group(tag) {
1621                        if let Some(task) = phase.get_task_mut(task_id) {
1622                            task.set_status(TaskStatus::Done);
1623                            let _ = storage.update_group(tag, &phase);
1624                        }
1625                    }
1626                }
1627            }
1628
1629            return Ok(true);
1630        }
1631
1632        println!(
1633            "    {} Validation still failing, will retry...",
1634            "!".yellow()
1635        );
1636    }
1637
1638    // Max attempts reached - mark responsible tasks as failed
1639    println!();
1640    println!("  {} Max repair attempts reached", "!".red());
1641
1642    for task_id in &attribution.responsible_tasks {
1643        if let Some(tag) = task_tags
1644            .iter()
1645            .find(|(id, _)| id == task_id)
1646            .map(|(_, t)| t)
1647        {
1648            if let Ok(mut phase) = storage.load_group(tag) {
1649                if let Some(task) = phase.get_task_mut(task_id) {
1650                    task.set_status(TaskStatus::Failed);
1651                    let _ = storage.update_group(tag, &phase);
1652                    println!("    {} Marked failed: {}", "✗".red(), task_id);
1653                }
1654            }
1655        }
1656    }
1657
1658    Ok(false)
1659}
1660
1661/// Spawn a repairer agent for a specific task
1662fn spawn_repairer(
1663    working_dir: &std::path::Path,
1664    session_name: &str,
1665    task_id: &str,
1666    prompt: &str,
1667) -> Result<()> {
1668    // Load repairer agent definition
1669    let agent_def = AgentDef::try_load("repairer", working_dir).unwrap_or_else(|| AgentDef {
1670        agent: crate::agents::AgentMeta {
1671            name: "repairer".to_string(),
1672            description: "Repair agent".to_string(),
1673        },
1674        model: crate::agents::ModelConfig {
1675            harness: "claude".to_string(),
1676            model: Some("opus".to_string()),
1677        },
1678        prompt: Default::default(),
1679    });
1680
1681    let harness = agent_def.harness()?;
1682    let model = agent_def.model();
1683
1684    terminal::spawn_terminal_with_harness_and_model(
1685        &format!("repair-{}", task_id),
1686        prompt,
1687        working_dir,
1688        session_name,
1689        harness,
1690        model,
1691    )?;
1692
1693    println!("    {} Spawned repairer for {}", "✓".green(), task_id);
1694    Ok(())
1695}
1696
1697/// Wait for a repair to complete by polling for marker file
1698fn wait_for_repair_completion_task(working_dir: &std::path::Path, task_id: &str) -> Result<bool> {
1699    let marker_path = working_dir
1700        .join(".scud")
1701        .join(format!("repair-complete-{}", task_id));
1702
1703    let timeout = Duration::from_secs(1800); // 30 minute timeout
1704    let start = std::time::Instant::now();
1705
1706    loop {
1707        if start.elapsed() > timeout {
1708            println!("    {} Repair timed out for {}", "!".yellow(), task_id);
1709            return Ok(false);
1710        }
1711
1712        if marker_path.exists() {
1713            let content = std::fs::read_to_string(&marker_path)?;
1714            std::fs::remove_file(&marker_path)?;
1715
1716            let success = content.contains("SUCCESS");
1717            if success {
1718                println!("    {} Repair completed for {}", "✓".green(), task_id);
1719            } else {
1720                println!("    {} Repair blocked for {}", "!".yellow(), task_id);
1721            }
1722
1723            return Ok(success);
1724        }
1725
1726        thread::sleep(Duration::from_secs(5));
1727    }
1728}
1729
1730/// Find a task by ID along with its tag
1731fn find_task_with_tag(
1732    storage: &Storage,
1733    task_id: &str,
1734    task_tags: &[(String, String)],
1735) -> Option<(Task, String)> {
1736    let tag = task_tags.iter().find(|(id, _)| id == task_id)?.1.clone();
1737    let phase = storage.load_group(&tag).ok()?;
1738    let task = phase.get_task(task_id)?.clone();
1739    Some((task, tag))
1740}