Skip to main content

scud/commands/swarm/
mod.rs

1//! Swarm mode - Parallel execution with multiple strategies
2//!
3//! Executes tasks using parallel agents with two main modes:
4//!
5//! ## Wave Mode (default)
6//! Executes tasks in dependency-order waves using parallel agents.
7//! After each wave, runs backpressure validation (build, lint, test).
8//!
9//! Flow:
10//! 1. [Optional] Research phase: Smart model analyzes tasks, may expand complex ones
11//! 2. Build phase: Fast models execute tasks in parallel rounds
12//! 3. Validate phase: Runs backpressure tests (compile, lint, test), smart model fixes issues
13//! 4. Repeat for next wave
14//!
15//! ## Beads Mode (`--swarm-mode beads`)
16//! Continuous ready-task polling inspired by Beads/Gas Town patterns.
17//! Tasks execute immediately when dependencies are met, no batch waiting.
18//!
19//! Flow:
20//! 1. Query for ready tasks (all dependencies Done)
21//! 2. Claim task (mark in-progress)
22//! 3. Spawn agent
23//! 4. Immediately loop back to step 1 (no waiting for batch)
24//!
25//! Usage:
26//!   scud swarm --tag <tag>                           # Wave mode with tmux (default)
27//!   scud swarm --tag <tag> --swarm-mode beads        # Beads continuous execution
28//!   scud swarm --tag <tag> --swarm-mode extensions   # Wave mode with async subprocesses
29//!   scud swarm --tag <tag> --no-research             # Skip research, use tasks as-is
30//!   scud swarm --tag <tag> --no-validate             # Skip backpressure validation
31
32pub mod beads;
33pub mod events;
34pub mod publisher;
35pub mod runtime;
36pub mod session;
37pub mod transcript;
38pub mod zmq_client;
39
40/// Re-export backpressure module for backward compatibility.
41///
42/// The canonical location is now [`crate::backpressure`], but this re-export
43/// maintains the old path `scud::commands::swarm::backpressure` for existing code.
44pub use crate::backpressure;
45
46use anyhow::Result;
47use colored::Colorize;
48use std::collections::HashMap;
49use std::path::PathBuf;
50use std::sync::atomic::{AtomicBool, Ordering};
51use std::sync::Arc;
52use std::thread;
53use std::time::Duration;
54
55use self::runtime::SwarmRuntime;
56use crate::commands::helpers::resolve_group_tag;
57use crate::commands::spawn::agent;
58use crate::commands::spawn::headless::{self, store::SessionStatus, StreamStore};
59use crate::commands::spawn::hooks;
60use crate::commands::spawn::monitor::{self, SpawnSession};
61use crate::commands::spawn::terminal::{self, Harness};
62use crate::commands::spawn::tui;
63use crate::models::phase::Phase;
64use crate::models::task::{Task, TaskStatus};
65use crate::storage::Storage;
66use std::path::Path;
67
68use self::session::{acquire_session_lock, RoundState, SwarmSession, WaveState, WaveSummary};
69use crate::agents::AgentDef;
70use crate::attribution::{attribute_failure, AttributionConfidence};
71use crate::backpressure::{BackpressureConfig, ValidationResult};
72use crate::commands::task_selection::{count_in_progress_tasks, is_actionable_pending_task};
73use crate::transcript_watcher::TranscriptWatcher;
74
75/// Swarm execution mode
76pub use crate::SwarmMode;
77
78/// Configuration for a swarm execution session.
79pub struct SwarmConfig {
80    pub project_root: Option<PathBuf>,
81    pub tag: Option<String>,
82    pub round_size: usize,
83    pub all_tags: bool,
84    pub harness_arg: String,
85    pub swarm_mode: SwarmMode,
86    pub dry_run: bool,
87    pub session_name: Option<String>,
88    pub no_research: bool,
89    pub no_validate: bool,
90    pub review: bool,
91    pub review_all: bool,
92    pub no_repair: bool,
93    pub max_repair_attempts: usize,
94    pub no_worktree: bool,
95    pub salvo_dir: Option<PathBuf>,
96    pub stale_timeout_minutes: Option<u64>,
97    pub idle_timeout_minutes: u64,
98    pub no_publish_events: bool,
99    pub pause_flag: Option<Arc<AtomicBool>>,
100    pub stop_flag: Option<Arc<AtomicBool>>,
101}
102
103/// Main entry point for the swarm command
104pub async fn run(config: SwarmConfig) -> Result<()> {
105    // Destructure config for use throughout the function
106    let SwarmConfig {
107        project_root,
108        tag,
109        round_size,
110        all_tags,
111        harness_arg,
112        swarm_mode,
113        dry_run,
114        session_name,
115        no_research,
116        no_validate,
117        review,
118        review_all,
119        no_repair,
120        max_repair_attempts,
121        no_worktree,
122        salvo_dir,
123        stale_timeout_minutes,
124        idle_timeout_minutes,
125        no_publish_events,
126        pause_flag,
127        stop_flag,
128    } = config;
129
130    let tag = tag.as_deref();
131    let harness_arg = &harness_arg;
132    let effective_tag = tag.unwrap_or("default");
133
134    if round_size == 0 {
135        anyhow::bail!("--round-size must be at least 1");
136    }
137
138    let storage = Storage::new(project_root.clone());
139
140    if !storage.is_initialized() {
141        anyhow::bail!("SCUD not initialized. Run: scud init");
142    }
143
144    let runtime = SwarmRuntime::from(swarm_mode);
145    runtime.ensure_requirements()?;
146
147    // Determine phase tag
148    let phase_tag = if all_tags {
149        "all".to_string()
150    } else {
151        resolve_group_tag(&storage, tag, true)?
152    };
153
154    // Acquire session lock to prevent concurrent swarm runs on same tag
155    // Lock is held for the duration of the function and released on drop
156    let _session_lock = if !dry_run {
157        Some(acquire_session_lock(project_root.as_ref(), &phase_tag)?)
158    } else {
159        None
160    };
161
162    // Parse harness and validate binary exists
163    let harness = Harness::parse(harness_arg)?;
164    terminal::find_harness_binary(harness)?;
165
166    // Generate session name
167    let session_name = session_name.unwrap_or_else(|| format!("swarm-{}", effective_tag));
168
169    // Get working directory
170    let original_working_dir = project_root
171        .clone()
172        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
173
174    // Determine actual working directory (may be a salvo worktree)
175    let (working_dir, is_salvo_worktree, main_project_root) = if !no_worktree && !all_tags {
176        if let Some(tag_name) = tag {
177            match crate::commands::salvo::ensure_worktree(
178                &original_working_dir,
179                tag_name,
180                salvo_dir.as_deref(),
181            ) {
182                Ok(wt_path) => (wt_path, true, Some(original_working_dir.clone())),
183                Err(e) => {
184                    eprintln!("Warning: Could not create salvo worktree: {}", e);
185                    eprintln!("Running in-place (use --no-worktree to suppress this warning)");
186                    (original_working_dir.clone(), false, None)
187                }
188            }
189        } else {
190            (original_working_dir.clone(), false, None)
191        }
192    } else {
193        (original_working_dir.clone(), false, None)
194    };
195
196    // Load backpressure configuration
197    let bp_config = BackpressureConfig::load(project_root.as_ref())?;
198
199    // Start transcript watcher in background thread
200    if !dry_run {
201        let watcher_session = session_name.clone();
202        let watcher_root = working_dir.clone();
203        let _watcher_handle = std::thread::spawn(move || {
204            let db = std::sync::Arc::new(crate::db::Database::new(&watcher_root));
205            if db.initialize().is_err() {
206                return;
207            }
208            let watcher = TranscriptWatcher::new(&watcher_root, db);
209            if let Err(e) = watcher.watch(&watcher_session) {
210                eprintln!("Transcript watcher error: {}", e);
211            }
212        });
213    }
214
215    // Display header
216    println!("{}", "SCUD Swarm Mode".cyan().bold());
217    println!("{}", "═".repeat(50));
218    println!("{:<20} {}", "Tag:".dimmed(), phase_tag.green());
219    println!(
220        "{:<20} {}",
221        "Round size:".dimmed(),
222        round_size.to_string().cyan()
223    );
224    println!(
225        "{:<20} {}",
226        "Research:".dimmed(),
227        if no_research {
228            "skip".yellow()
229        } else {
230            "enabled".green()
231        }
232    );
233    println!(
234        "{:<20} {}",
235        "Validation:".dimmed(),
236        if no_validate {
237            "skip".yellow()
238        } else {
239            "enabled".green()
240        }
241    );
242    let mode_label = match runtime {
243        SwarmRuntime::Tmux => runtime.display_label().cyan(),
244        SwarmRuntime::Extensions => runtime.display_label().green(),
245        SwarmRuntime::Server => runtime.display_label().magenta(),
246        SwarmRuntime::Headless => runtime.display_label().green(),
247        SwarmRuntime::Beads => runtime.display_label().yellow(),
248    };
249    println!("{:<20} {}", "Mode:".dimmed(), mode_label);
250    println!("{:<20} {}", "Harness:".dimmed(), harness.name().cyan());
251    println!(
252        "{:<20} {}",
253        "Review:".dimmed(),
254        if review_all {
255            "all tasks".green()
256        } else if review {
257            "sample (3 per wave)".green()
258        } else {
259            "disabled".yellow()
260        }
261    );
262    println!(
263        "{:<20} {}",
264        "Repair:".dimmed(),
265        if no_repair {
266            "disabled".yellow()
267        } else {
268            format!("up to {} attempts", max_repair_attempts).green()
269        }
270    );
271
272    if !bp_config.commands.is_empty() && !no_validate {
273        println!(
274            "{:<20} {}",
275            "Backpressure:".dimmed(),
276            bp_config.commands.join(", ").dimmed()
277        );
278    }
279    println!();
280
281    if dry_run {
282        return run_dry_run(project_root, &phase_tag, round_size, all_tags);
283    }
284
285    // Install hooks if needed
286    if !hooks::hooks_installed(&working_dir) {
287        println!("{}", "Installing Claude Code hooks...".dimmed());
288        if let Err(e) = hooks::install_hooks(&working_dir) {
289            println!(
290                "  {} Hook installation: {}",
291                "!".yellow(),
292                e.to_string().dimmed()
293            );
294        } else {
295            println!("  {} Hooks installed", "✓".green());
296        }
297    }
298
299    // Initialize swarm session
300    let terminal_mode = runtime.terminal_label();
301    let mut swarm_session = SwarmSession::new(
302        &session_name,
303        &phase_tag,
304        terminal_mode,
305        &working_dir.to_string_lossy(),
306        round_size,
307    );
308
309    // EventWriter will be created later in the function
310
311    // Compute stale timeout duration
312    let stale_timeout = stale_timeout_minutes.map(|m| Duration::from_secs(m * 60));
313
314    // Create EventWriter for SQLite event logging (Phase 1b)
315    // Include ZMQ publisher if not disabled
316    let event_writer =
317        events::EventWriter::new_with_zmq(&working_dir, &session_name, !no_publish_events).ok();
318
319    // Create status tracking for control commands
320    let status_state = Arc::new(std::sync::Mutex::new(
321        crate::commands::swarm::publisher::SwarmStatus {
322            state: "running".to_string(),
323            current_wave: 0,
324            total_waves: 0,
325            tasks_completed: 0,
326            tasks_total: 0,
327        },
328    ));
329
330    // Start heartbeat background task for connection liveness detection
331    let heartbeat_handle = if event_writer.is_some() {
332        let working_dir = working_dir.clone();
333        let session_name = session_name.clone();
334        let stop_flag = Arc::new(AtomicBool::new(false));
335        let stop_flag_clone = Arc::clone(&stop_flag);
336
337        let handle = thread::spawn(move || {
338            let writer = match events::EventWriter::new(&working_dir, &session_name) {
339                Ok(w) => w,
340                Err(e) => {
341                    eprintln!("Failed to create heartbeat EventWriter: {}", e);
342                    return;
343                }
344            };
345
346            while !stop_flag_clone.load(Ordering::Relaxed) {
347                if let Err(e) = writer.log_heartbeat() {
348                    eprintln!("Heartbeat logging error: {}", e);
349                }
350                thread::sleep(Duration::from_secs(5));
351            }
352        });
353
354        Some((handle, stop_flag))
355    } else {
356        None
357    };
358
359    // Detect orphan in-progress tasks (tasks with no running tmux window)
360    // Only applicable in tmux mode
361    let all_phases = storage.load_tasks()?;
362    if runtime.is_tmux() {
363        let orphans = find_orphan_tasks(&all_phases, &phase_tag, all_tags, &session_name);
364
365        if !orphans.is_empty() {
366            println!();
367            println!(
368                "{}",
369                "Detected orphan in-progress tasks (no tmux window):".yellow()
370            );
371            for (task_id, tag) in &orphans {
372                println!(
373                    "  {} {} (tag: {})",
374                    "*".yellow(),
375                    task_id.cyan(),
376                    tag.dimmed()
377                );
378            }
379            println!();
380
381            // Prompt user for action
382            let choices = vec![
383                "Reset to pending and re-run",
384                "Kill existing windows (if any) and restart",
385                "Skip and continue (leave as in-progress)",
386                "Abort",
387            ];
388
389            let selection = dialoguer::Select::new()
390                .with_prompt("How should orphan tasks be handled?")
391                .items(&choices)
392                .default(0)
393                .interact()?;
394
395            match selection {
396                0 => {
397                    // Reset to pending
398                    for (task_id, tag) in &orphans {
399                        if let Ok(mut phase) = storage.load_group(tag) {
400                            if let Some(task) = phase.get_task_mut(task_id) {
401                                task.set_status(TaskStatus::Pending);
402                                storage.update_group(tag, &phase)?;
403                                println!("  {} {} -> pending", "v".green(), task_id);
404                            }
405                        }
406                    }
407                }
408                1 => {
409                    // Kill and restart - first try to kill any matching windows
410                    for (task_id, _) in &orphans {
411                        let window_name = format!("task-{}", task_id);
412                        let _ = terminal::kill_tmux_window(&session_name, &window_name);
413                    }
414                    // Reset to pending so they'll be picked up
415                    for (task_id, tag) in &orphans {
416                        if let Ok(mut phase) = storage.load_group(tag) {
417                            if let Some(task) = phase.get_task_mut(task_id) {
418                                task.set_status(TaskStatus::Pending);
419                                storage.update_group(tag, &phase)?;
420                                println!(
421                                    "  {} {} -> pending (will re-spawn)",
422                                    "v".green(),
423                                    task_id
424                                );
425                            }
426                        }
427                    }
428                }
429                2 => {
430                    // Skip - do nothing, leave as in-progress
431                    println!("{}", "Leaving orphan tasks as in-progress.".dimmed());
432                }
433                3 => {
434                    // Abort
435                    anyhow::bail!("Aborted by user");
436                }
437                _ => {}
438            }
439            println!();
440        }
441    }
442
443    // === BEADS MODE: Continuous execution ===
444    // If beads mode is selected, run the continuous polling loop instead of waves
445    if runtime.is_beads() {
446        let beads_config = beads::BeadsConfig {
447            max_concurrent: round_size, // Reuse round_size as max concurrent
448            poll_interval: Duration::from_secs(3),
449        };
450
451        // Check tmux availability for beads mode (uses tmux by default)
452        let result = beads::run_beads_loop(
453            &storage,
454            &phase_tag,
455            all_tags,
456            &working_dir,
457            &session_name,
458            harness,
459            &beads_config,
460            &mut swarm_session,
461        )?;
462
463        // Save session state
464        session::save_session(project_root.as_ref(), &swarm_session)?;
465
466        // Final summary
467        println!();
468        println!("{}", "Beads Session Summary".blue().bold());
469        println!("{}", "═".repeat(40).blue());
470        println!(
471            "  Tasks completed: {}",
472            result.tasks_completed.to_string().green()
473        );
474        println!(
475            "  Tasks failed:    {}",
476            if result.tasks_failed > 0 {
477                result.tasks_failed.to_string().red()
478            } else {
479                "0".to_string().green()
480            }
481        );
482        println!(
483            "  Duration:        {}",
484            format!("{:.1}s", result.total_duration.as_secs_f64()).cyan()
485        );
486
487        // Stop heartbeat background task
488        if let Some((handle, stop_flag)) = heartbeat_handle {
489            stop_flag.store(true, Ordering::Relaxed);
490            let _ = handle.join();
491        }
492
493        return Ok(());
494    }
495
496    // === WAVE MODE: Batch execution ===
497    // Main loop: execute waves until all tasks done
498    let mut wave_number = 1;
499    loop {
500        // Check pause/stop flags from control commands
501        if let Some(ref pause_flag) = pause_flag {
502            while pause_flag.load(Ordering::SeqCst) {
503                // Handle any pending control requests while paused
504                #[cfg(feature = "zmq")]
505                if let Some(ref writer) = &event_writer {
506                    if let Some(zmq_publisher) = writer.zmq_publisher() {
507                        let _ = zmq_publisher.handle_control_request(
508                            pause_flag,
509                            stop_flag
510                                .as_ref()
511                                .unwrap_or(&Arc::new(AtomicBool::new(false))),
512                            &|| status_state.lock().unwrap().clone(),
513                        );
514                    }
515                }
516
517                std::thread::sleep(Duration::from_millis(100));
518                if let Some(ref stop_flag) = stop_flag {
519                    if stop_flag.load(Ordering::SeqCst) {
520                        println!();
521                        println!("{}", "Swarm stopped by control command".yellow());
522                        break;
523                    }
524                }
525            }
526        }
527
528        if let Some(ref stop_flag) = stop_flag {
529            if stop_flag.load(Ordering::SeqCst) {
530                println!();
531                println!("{}", "Swarm stopped by control command".yellow());
532
533                // Update status
534                {
535                    let mut status = status_state.lock().unwrap();
536                    status.state = "stopped".to_string();
537                }
538
539                break;
540            }
541        }
542
543        // Load fresh task state (must reload each iteration to see completed tasks)
544        let all_phases = storage.load_tasks()?;
545
546        // Compute waves from current state
547        let waves = compute_waves_from_tasks(&all_phases, &phase_tag, all_tags)?;
548
549        // Update status for control commands
550        {
551            let mut status = status_state.lock().unwrap();
552            status.current_wave = wave_number;
553            status.total_waves = waves.len();
554            status.tasks_total = waves.iter().map(|w| w.len()).sum();
555            status.tasks_completed = all_phases
556                .values()
557                .flat_map(|phase| &phase.tasks)
558                .filter(|task| matches!(task.status, TaskStatus::Done))
559                .count();
560            status.state = if pause_flag
561                .as_ref()
562                .is_some_and(|f| f.load(Ordering::SeqCst))
563            {
564                "paused".to_string()
565            } else {
566                "running".to_string()
567            };
568        }
569
570        if waves.is_empty() {
571            println!();
572            println!("{}", "All tasks complete!".green().bold());
573
574            // Update status
575            {
576                let mut status = status_state.lock().unwrap();
577                status.state = "completed".to_string();
578            }
579
580            // Publish swarm completed event
581            if let Some(ref writer) = &event_writer {
582                let _ =
583                    writer.publish_event(&publisher::ZmqEvent::SwarmCompleted { success: true });
584                let _ = writer.log_swarm_completed(true);
585            }
586
587            break;
588        }
589
590        // Get first wave (tasks with no pending dependencies)
591        let wave_tasks = &waves[0];
592
593        if wave_tasks.is_empty() {
594            println!();
595            println!("{}", "No ready tasks in current wave.".yellow());
596
597            let in_progress_count = count_in_progress_tasks(&all_phases, &phase_tag, all_tags);
598            if in_progress_count > 0 {
599                // Check for stale in-progress tasks whose tmux windows are gone (1d)
600                if runtime.is_tmux() {
601                    let orphans =
602                        find_orphan_tasks(&all_phases, &phase_tag, all_tags, &session_name);
603                    for (task_id, tag) in &orphans {
604                        println!(
605                            "  {} {} has no tmux window, resetting to pending",
606                            "⚠".yellow(),
607                            task_id.cyan()
608                        );
609                        if let Ok(mut phase) = storage.load_group(tag) {
610                            if let Some(task) = phase.get_task_mut(task_id) {
611                                task.set_status(TaskStatus::Pending);
612                                let _ = storage.update_group(tag, &phase);
613                            }
614                        }
615                    }
616                    if !orphans.is_empty() {
617                        continue; // Re-check waves with reset tasks
618                    }
619                }
620
621                println!(
622                    "Waiting for {} in-progress task(s) to complete...",
623                    in_progress_count.to_string().cyan()
624                );
625                thread::sleep(Duration::from_secs(10));
626                continue;
627            } else {
628                println!("Check for blocked tasks: scud list --status blocked");
629                break;
630            }
631        }
632
633        println!();
634        println!(
635            "{} {} - {} task(s)",
636            "Wave".blue().bold(),
637            wave_number.to_string().cyan(),
638            wave_tasks.len()
639        );
640        println!("{}", "-".repeat(40).blue());
641
642        // Track wave state
643        let mut wave_state = WaveState::new(wave_number);
644        let wave_start = std::time::Instant::now();
645
646        // Emit wave started event
647        if let Some(ref writer) = event_writer {
648            let _ = writer.log_wave_started(wave_number, wave_tasks.len());
649        }
650
651        // === PHASE 1: RESEARCH (optional, first wave only) ===
652        if !no_research && wave_number == 1 {
653            println!();
654            println!("  {} Analyzing tasks...", "Research:".magenta());
655            // TODO: Smart model could expand complex tasks here
656            println!("    {} Task analysis complete", "✓".green());
657        }
658
659        // === PHASE 2: BUILD ===
660        let num_rounds = wave_tasks.len().div_ceil(round_size);
661        for (round_idx, round_tasks) in wave_tasks.chunks(round_size).enumerate() {
662            println!();
663            println!(
664                "  {} {}/{} - {} task(s)",
665                "Round".yellow(),
666                round_idx + 1,
667                num_rounds,
668                round_tasks.len()
669            );
670
671            // Spawn agents for this round based on swarm mode
672            // Note: Agents self-orient using scud CLI commands (scud list, scud show, etc.)
673            let round_state = runtime
674                .run_round(
675                    &storage,
676                    round_tasks,
677                    &working_dir,
678                    &session_name,
679                    round_idx,
680                    harness,
681                    stale_timeout,
682                    idle_timeout_minutes,
683                    event_writer.as_ref(),
684                )
685                .await?;
686
687            // Create/update spawn proxy for monitor visibility (tmux mode only)
688            if runtime.is_tmux() {
689                let _proxy_path = create_and_update_spawn_proxy(
690                    &storage,
691                    project_root.as_ref(),
692                    &session_name,
693                    &phase_tag,
694                    &working_dir,
695                    &swarm_session,
696                    Some(&round_state),
697                )?;
698            }
699
700            wave_state.rounds.push(round_state.clone());
701            println!("    {} Round {} complete", "✓".green(), round_idx + 1);
702        }
703
704        // === PHASE 3: VALIDATE (optional) ===
705        if !no_validate && !bp_config.commands.is_empty() {
706            println!();
707            println!("  {} Running backpressure checks...", "Validate:".magenta());
708
709            let validation_result = backpressure::run_validation(&working_dir, &bp_config)?;
710
711            if validation_result.all_passed {
712                println!("    {} All checks passed", "✓".green());
713
714                // Emit validation passed event
715                if let Some(ref writer) = event_writer {
716                    let _ = writer.log_validation_passed();
717                }
718
719                // Mark all tasks as done
720                for (task_id, tag) in wave_state.task_tags() {
721                    if let Ok(mut phase) = storage.load_group(&tag) {
722                        if let Some(task) = phase.get_task_mut(&task_id) {
723                            task.set_status(TaskStatus::Done);
724                            let _ = storage.update_group(&tag, &phase);
725                        }
726                    }
727                }
728            } else {
729                println!("    {} Some checks failed:", "!".yellow());
730                for failure in &validation_result.failures {
731                    println!("      - {}", failure.red());
732                }
733
734                // Emit validation failed event
735                if let Some(ref writer) = event_writer {
736                    let _ = writer.log_validation_failed(&validation_result.failures);
737                }
738
739                if no_repair {
740                    // Old behavior: mark all tasks as failed
741                    let task_tags = wave_state.task_tags();
742                    for (task_id, tag) in &task_tags {
743                        if let Ok(mut phase) = storage.load_group(tag) {
744                            if let Some(task) = phase.get_task_mut(task_id) {
745                                task.set_status(TaskStatus::Failed);
746                                let _ = storage.update_group(tag, &phase);
747                            }
748                        }
749                    }
750                    println!(
751                        "    {} Marked {} task(s) as failed",
752                        "!".yellow(),
753                        task_tags.len()
754                    );
755                } else {
756                    // New behavior: run repair loop
757                    let repaired = run_repair_loop(
758                        &storage,
759                        &working_dir,
760                        &session_name,
761                        &bp_config,
762                        &wave_state,
763                        &validation_result,
764                        max_repair_attempts,
765                    )?;
766
767                    if !repaired {
768                        println!("    {} Wave failed after repair attempts", "!".red());
769                    }
770                }
771            }
772
773            wave_state.validation = Some(validation_result);
774        }
775
776        // Generate wave summary (just what was done - not context accumulation)
777        let summary = WaveSummary {
778            wave_number,
779            tasks_completed: wave_state.all_task_ids(),
780            files_changed: collect_changed_files(&working_dir, wave_state.start_commit.as_deref())
781                .unwrap_or_default(),
782        };
783        wave_state.summary = Some(summary.clone());
784
785        // === PHASE 4: REVIEW (optional) ===
786        if (review || review_all) && !dry_run {
787            // Build task list for review
788            let wave_tasks: Vec<(String, String)> = wave_state
789                .task_tags()
790                .iter()
791                .filter_map(|(id, tag)| {
792                    storage
793                        .load_group(tag)
794                        .ok()
795                        .and_then(|phase| phase.get_task(id).map(|t| (id.clone(), t.title.clone())))
796                })
797                .collect();
798
799            if !wave_tasks.is_empty() {
800                let review_result = spawn_reviewer(
801                    &working_dir,
802                    &session_name,
803                    &summary,
804                    &wave_tasks,
805                    review_all,
806                )?;
807
808                if !review_result.all_passed && !review_result.tasks_to_improve.is_empty() {
809                    println!(
810                        "    {} Reviewer found issues in: {}",
811                        "!".yellow(),
812                        review_result.tasks_to_improve.join(", ")
813                    );
814
815                    // Spawn improvement agents for flagged tasks
816                    for task_id in &review_result.tasks_to_improve {
817                        // Find task and spawn builder to improve
818                        if let Some((task, _tag)) =
819                            find_task_with_tag(&storage, task_id, &wave_state.task_tags())
820                        {
821                            let prompt = format!(
822                                "Improve SCUD task {}: {}\n\nThe reviewer flagged this task for improvements. \
823                                 Review the implementation and make it better. When done: scud set-status {} done",
824                                task.id, task.title, task.id
825                            );
826
827                            // Use builder agent for improvements
828                            if let Some(agent_def) = AgentDef::try_load("builder", &working_dir) {
829                                let harness = agent_def.harness()?;
830                                let model = agent_def.model();
831
832                                let spawn_config = terminal::SpawnConfig {
833                                    task_id: &format!("improve-{}", task_id),
834                                    prompt: &prompt,
835                                    working_dir: &working_dir,
836                                    session_name: &session_name,
837                                    harness,
838                                    model,
839                                    task_list_id: None,
840                                };
841                                terminal::spawn_tmux_agent(&spawn_config)?;
842
843                                println!(
844                                    "    {} Spawned improvement agent for {}",
845                                    "✓".green(),
846                                    task_id
847                                );
848                            }
849                        }
850                    }
851                } else {
852                    println!("    {} Review complete, all tasks approved", "✓".green());
853                }
854            }
855        }
856
857        // Emit wave completed event
858        if let Some(ref writer) = event_writer {
859            let _ = writer.log_wave_completed(wave_number, wave_start.elapsed().as_millis() as u64);
860        }
861
862        // Save session state
863        swarm_session.waves.push(wave_state);
864        session::save_session(project_root.as_ref(), &swarm_session)?;
865
866        // Also save spawn-format session for TUI refresh
867        {
868            let spawn_session = swarm_session.to_spawn_session();
869            monitor::save_session(project_root.as_ref(), &spawn_session)?;
870        }
871
872        wave_number += 1;
873    }
874
875    // Publish swarm completed event (successful completion)
876    if let Some(ref writer) = &event_writer {
877        let _ = writer.publish_event(&publisher::ZmqEvent::SwarmCompleted { success: true });
878        let _ = writer.log_swarm_completed(true);
879    }
880
881    // Final summary
882    // Final bridge update for spawn monitor/TUI compatibility
883    create_and_update_spawn_proxy(
884        &storage,
885        project_root.as_ref(),
886        &session_name,
887        &phase_tag,
888        &working_dir,
889        &swarm_session,
890        None, // Final update - include all rounds
891    )?;
892
893    println!();
894    println!("{}", "Swarm Session Summary".blue().bold());
895    println!("{}", "═".repeat(40).blue());
896    println!(
897        "  Waves completed: {}",
898        swarm_session.waves.len().to_string().green()
899    );
900
901    let total_tasks: usize = swarm_session
902        .waves
903        .iter()
904        .flat_map(|w| &w.rounds)
905        .map(|r| r.task_ids.len())
906        .sum();
907    println!("  Tasks executed: {}", total_tasks.to_string().green());
908
909    println!("  {} Spawn proxy updated for monitor/TUI", "✓".green());
910
911    // Auto-sync worktree results back to main
912    if is_salvo_worktree {
913        if let (Some(main_root), Some(tag_name)) = (&main_project_root, &tag) {
914            if let Err(e) = crate::commands::salvo::sync_to_main(main_root, &working_dir, tag_name)
915            {
916                eprintln!("Warning: Failed to sync salvo back to main: {}", e);
917                eprintln!("Run manually: scud salvo sync {}", tag_name);
918            }
919        }
920    }
921
922    // Stop heartbeat background task
923    if let Some((handle, stop_flag)) = heartbeat_handle {
924        stop_flag.store(true, Ordering::Relaxed);
925        if let Err(e) = handle.join() {
926            eprintln!("Heartbeat thread join error: {:?}", e);
927        }
928    }
929
930    Ok(())
931}
932
933fn create_and_update_spawn_proxy(
934    storage: &Storage,
935    project_root: Option<&PathBuf>,
936    session_name: &str,
937    phase_tag: &str,
938    working_dir: &Path,
939    swarm_session: &SwarmSession,
940    latest_round: Option<&RoundState>,
941) -> Result<Option<PathBuf>> {
942    let all_phases = storage.load_tasks()?;
943
944    // Try to load existing proxy session, or create new one
945    let mut spawn_session = match monitor::load_session(project_root, session_name) {
946        Ok(existing) => existing,
947        Err(_) => SpawnSession::new(
948            session_name,
949            phase_tag,
950            "tmux",
951            &working_dir.to_string_lossy(),
952        ),
953    };
954
955    // Get tasks to add (either from latest round or all tasks)
956    let tasks_to_add: Vec<String> = match latest_round {
957        Some(round) => round.task_ids.clone(),
958        None => swarm_session
959            .waves
960            .iter()
961            .flat_map(|w| w.all_task_ids())
962            .collect(),
963    };
964
965    // Add new agents (skip duplicates)
966    let existing_task_ids: std::collections::HashSet<String> = spawn_session
967        .agents
968        .iter()
969        .map(|a| a.task_id.clone())
970        .collect();
971
972    for task_id in &tasks_to_add {
973        if !existing_task_ids.contains(task_id) {
974            if let Some((title, tag)) = find_task_title_tag(&all_phases, task_id) {
975                spawn_session.add_agent(task_id, &title, &tag);
976            }
977        }
978    }
979
980    let session_file = monitor::save_session(project_root, &spawn_session)?;
981    Ok(Some(session_file))
982}
983
984fn find_task_title_tag(
985    phases: &HashMap<String, crate::models::phase::Phase>,
986    task_id: &str,
987) -> Option<(String, String)> {
988    for (tag, phase) in phases {
989        if let Some(task) = phase.get_task(task_id) {
990            return Some((task.title.clone(), tag.clone()));
991        }
992    }
993    None
994}
995
996/// Task info for wave computation
997#[derive(Clone)]
998struct TaskInfo<'a> {
999    task: &'a Task,
1000    tag: String,
1001}
1002
1003/// Compute execution waves from current task state
1004fn compute_waves_from_tasks<'a>(
1005    all_phases: &'a HashMap<String, Phase>,
1006    phase_tag: &str,
1007    all_tags: bool,
1008) -> Result<Vec<Vec<TaskInfo<'a>>>> {
1009    use std::collections::HashSet;
1010
1011    let mut actionable: Vec<TaskInfo<'a>> = Vec::new();
1012
1013    let phase_tags: Vec<&String> = if all_tags {
1014        all_phases.keys().collect()
1015    } else {
1016        all_phases
1017            .keys()
1018            .filter(|t| t.as_str() == phase_tag)
1019            .collect()
1020    };
1021
1022    for tag in phase_tags {
1023        if let Some(phase) = all_phases.get(tag) {
1024            for task in &phase.tasks {
1025                if is_actionable_pending_task(task, phase) {
1026                    actionable.push(TaskInfo {
1027                        task,
1028                        tag: tag.clone(),
1029                    });
1030                }
1031            }
1032        }
1033    }
1034
1035    if actionable.is_empty() {
1036        return Ok(Vec::new());
1037    }
1038
1039    // Kahn's algorithm for wave computation
1040    let task_ids: HashSet<String> = actionable.iter().map(|t| t.task.id.clone()).collect();
1041    let mut in_degree: HashMap<String, usize> = HashMap::new();
1042    let mut dependents: HashMap<String, Vec<String>> = HashMap::new();
1043
1044    // Collect in-progress task IDs for blocking check
1045    let in_progress_ids: HashSet<String> = {
1046        let tags: Vec<&str> = if all_tags {
1047            all_phases.keys().map(|s| s.as_str()).collect()
1048        } else {
1049            vec![phase_tag]
1050        };
1051
1052        tags.iter()
1053            .filter_map(|tag| all_phases.get(*tag))
1054            .flat_map(|phase| &phase.tasks)
1055            .filter(|t| t.status == TaskStatus::InProgress)
1056            .map(|t| t.id.clone())
1057            .collect()
1058    };
1059
1060    for info in &actionable {
1061        in_degree.entry(info.task.id.clone()).or_insert(0);
1062        for dep in &info.task.dependencies {
1063            if task_ids.contains(dep) {
1064                // Dependency is pending - will be in a wave
1065                *in_degree.entry(info.task.id.clone()).or_insert(0) += 1;
1066                dependents
1067                    .entry(dep.clone())
1068                    .or_default()
1069                    .push(info.task.id.clone());
1070            } else if in_progress_ids.contains(dep) {
1071                // Dependency is in-progress - block this task
1072                // Set very high in-degree so it never becomes ready
1073                *in_degree.entry(info.task.id.clone()).or_insert(0) += 1000;
1074            }
1075            // If dep is Done/Failed/etc, it's satisfied - do nothing
1076        }
1077    }
1078
1079    let mut waves: Vec<Vec<TaskInfo<'a>>> = Vec::new();
1080    let mut remaining = in_degree.clone();
1081
1082    while !remaining.is_empty() {
1083        let ready: Vec<String> = remaining
1084            .iter()
1085            .filter(|(_, &deg)| deg == 0)
1086            .map(|(id, _)| id.clone())
1087            .collect();
1088
1089        if ready.is_empty() {
1090            break; // Circular dependency
1091        }
1092
1093        let wave: Vec<TaskInfo<'a>> = actionable
1094            .iter()
1095            .filter(|t| ready.contains(&t.task.id))
1096            .cloned()
1097            .collect();
1098
1099        for task_id in &ready {
1100            remaining.remove(task_id);
1101            if let Some(deps) = dependents.get(task_id) {
1102                for dep_id in deps {
1103                    if let Some(deg) = remaining.get_mut(dep_id) {
1104                        *deg = deg.saturating_sub(1);
1105                    }
1106                }
1107            }
1108        }
1109
1110        waves.push(wave);
1111    }
1112
1113    Ok(waves)
1114}
1115
1116/// Check if a tmux window exists for a task
1117fn tmux_window_exists_for_task(session_name: &str, task_id: &str) -> bool {
1118    let window_name = format!("task-{}", task_id);
1119    terminal::tmux_window_exists(session_name, &window_name)
1120}
1121
1122/// Find in-progress tasks that have no running tmux window (orphans)
1123fn find_orphan_tasks(
1124    all_phases: &HashMap<String, Phase>,
1125    phase_tag: &str,
1126    all_tags: bool,
1127    session_name: &str,
1128) -> Vec<(String, String)> {
1129    // (task_id, tag) pairs
1130    let tags: Vec<&str> = if all_tags {
1131        all_phases.keys().map(|s| s.as_str()).collect()
1132    } else {
1133        vec![phase_tag]
1134    };
1135
1136    let mut orphans = Vec::new();
1137
1138    for tag in tags {
1139        if let Some(phase) = all_phases.get(tag) {
1140            for task in &phase.tasks {
1141                if task.status == TaskStatus::InProgress
1142                    && !tmux_window_exists_for_task(session_name, &task.id)
1143                {
1144                    orphans.push((task.id.clone(), tag.to_string()));
1145                }
1146            }
1147        }
1148    }
1149
1150    orphans
1151}
1152
1153/// Mark a batch of tasks as in-progress in storage.
1154fn mark_tasks_in_progress(storage: &Storage, tasks: &[TaskInfo]) {
1155    for info in tasks {
1156        if let Ok(mut phase) = storage.load_group(&info.tag) {
1157            if let Some(task) = phase.get_task_mut(&info.task.id) {
1158                task.set_status(TaskStatus::InProgress);
1159                let _ = storage.update_group(&info.tag, &phase);
1160            }
1161        }
1162    }
1163}
1164
1165fn execute_round(
1166    storage: &Storage,
1167    tasks: &[TaskInfo],
1168    working_dir: &std::path::Path,
1169    session_name: &str,
1170    round_idx: usize,
1171    default_harness: Harness,
1172    event_writer: Option<&events::EventWriter>,
1173) -> Result<RoundState> {
1174    let mut round_state = RoundState::new(round_idx);
1175
1176    for info in tasks.iter() {
1177        // Resolve agent config (harness, model, prompt) from task's agent_type
1178        let config =
1179            agent::resolve_agent_config(info.task, &info.tag, default_harness, None, working_dir);
1180
1181        // Warn if agent type was specified but definition not found
1182        if info.task.agent_type.is_some() && !config.from_agent_def {
1183            println!(
1184                "    {} Agent '{}' not found, using defaults",
1185                "!".yellow(),
1186                info.task.agent_type.as_deref().unwrap_or("unknown")
1187            );
1188        }
1189
1190        let spawn_config = terminal::SpawnConfig {
1191            task_id: &info.task.id,
1192            prompt: &config.prompt,
1193            working_dir,
1194            session_name,
1195            harness: config.harness,
1196            model: config.model.as_deref(),
1197            task_list_id: None,
1198        };
1199        match terminal::spawn_tmux_agent(&spawn_config) {
1200            Ok(window_index) => {
1201                println!(
1202                    "    {} Spawned: {} | {} [{}] {}:{}",
1203                    "✓".green(),
1204                    info.task.id.cyan(),
1205                    info.task.title.dimmed(),
1206                    config.display_info().dimmed(),
1207                    session_name.dimmed(),
1208                    window_index.dimmed()
1209                );
1210                round_state.task_ids.push(info.task.id.clone());
1211                round_state.tags.push(info.tag.clone());
1212
1213                // Emit spawn event
1214                if let Some(writer) = event_writer {
1215                    let _ = writer.log_spawned(&info.task.id);
1216                }
1217
1218                if let Ok(mut phase) = storage.load_group(&info.tag) {
1219                    if let Some(task) = phase.get_task_mut(&info.task.id) {
1220                        task.set_status(TaskStatus::InProgress);
1221                        let _ = storage.update_group(&info.tag, &phase);
1222                    }
1223                }
1224            }
1225            Err(e) => {
1226                println!("    {} Failed: {} - {}", "✗".red(), info.task.id.red(), e);
1227                round_state.failures.push(info.task.id.clone());
1228            }
1229        }
1230
1231        thread::sleep(Duration::from_millis(500));
1232    }
1233
1234    Ok(round_state)
1235}
1236
1237/// Execute a round using extension-based subprocesses (no tmux)
1238async fn execute_round_extensions<'a>(
1239    storage: &Storage,
1240    tasks: &[TaskInfo<'a>],
1241    working_dir: &std::path::Path,
1242    round_idx: usize,
1243    default_harness: Harness,
1244) -> Result<RoundState> {
1245    // Convert TaskInfo to WaveAgent format
1246    let wave_agents: Vec<session::WaveAgent> = tasks
1247        .iter()
1248        .map(|info| session::WaveAgent::new(info.task.clone(), &info.tag))
1249        .collect();
1250
1251    mark_tasks_in_progress(storage, tasks);
1252
1253    let result =
1254        session::execute_wave_async(&wave_agents, working_dir, round_idx, default_harness).await?;
1255
1256    // Print results
1257    for agent_result in &result.agent_results {
1258        if agent_result.success {
1259            println!(
1260                "    {} Completed: {} ({}ms)",
1261                "✓".green(),
1262                agent_result.task_id.cyan(),
1263                agent_result.duration_ms
1264            );
1265        } else {
1266            println!(
1267                "    {} Failed: {} (exit code: {:?})",
1268                "✗".red(),
1269                agent_result.task_id.red(),
1270                agent_result.exit_code
1271            );
1272        }
1273    }
1274
1275    // Update task statuses based on results
1276    for agent_result in &result.agent_results {
1277        // Find the task's tag
1278        if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1279            if let Ok(mut phase) = storage.load_group(&info.tag) {
1280                if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1281                    // The agent itself should update the status via scud set-status
1282                    // But if it failed to start, mark it as failed
1283                    if !agent_result.success && agent_result.exit_code.is_none() {
1284                        task.set_status(TaskStatus::Failed);
1285                        let _ = storage.update_group(&info.tag, &phase);
1286                    }
1287                }
1288            }
1289        }
1290    }
1291
1292    Ok(result.round_state)
1293}
1294
1295/// Execute a round using OpenCode Server mode
1296async fn execute_round_server<'a>(
1297    storage: &Storage,
1298    tasks: &[TaskInfo<'a>],
1299    working_dir: &std::path::Path,
1300    round_idx: usize,
1301) -> Result<RoundState> {
1302    use crate::opencode::AgentOrchestrator;
1303    use tokio::sync::mpsc;
1304
1305    // Mark tasks as in-progress before spawning
1306    for info in tasks {
1307        if let Ok(mut phase) = storage.load_group(&info.tag) {
1308            if let Some(task) = phase.get_task_mut(&info.task.id) {
1309                task.set_status(TaskStatus::InProgress);
1310                let _ = storage.update_group(&info.tag, &phase);
1311            }
1312        }
1313    }
1314    // Create event channel
1315    let (event_tx, _event_rx) = mpsc::channel(1000);
1316
1317    // Create orchestrator
1318    let mut orchestrator = AgentOrchestrator::new(event_tx.clone()).await?;
1319
1320    // Resolve model from config
1321    let config_path = working_dir.join(".scud").join("config.toml");
1322    let config = crate::config::Config::load(&config_path).unwrap_or_default();
1323    let model_str = config.swarm_model().to_string();
1324    let provider_str = config.swarm.direct_api_provider.clone();
1325
1326    // Spawn all agents
1327    for info in tasks {
1328        let prompt = generate_server_prompt(info.task, &info.tag, working_dir);
1329
1330        let model = Some((provider_str.as_str(), model_str.as_str()));
1331
1332        match orchestrator
1333            .spawn_agent(info.task, &info.tag, &prompt, model)
1334            .await
1335        {
1336            Ok(_) => {
1337                println!(
1338                    "    {} Spawned: {} | {} [server/{}/{}]",
1339                    "✓".green(),
1340                    info.task.id.cyan(),
1341                    info.task.title.dimmed(),
1342                    provider_str,
1343                    model_str,
1344                );
1345            }
1346            Err(e) => {
1347                println!("    {} Failed to spawn {}: {}", "✗".red(), info.task.id, e);
1348            }
1349        }
1350    }
1351
1352    // Drop our sender so we can detect when orchestrator is done
1353    drop(event_tx);
1354
1355    // Collect results
1356    let results = orchestrator.wait_all().await;
1357
1358    // Cleanup
1359    orchestrator.cleanup().await;
1360
1361    let result = results;
1362
1363    // Build round state from results
1364    let mut round_state = RoundState::new(round_idx);
1365
1366    for agent_result in &result {
1367        if agent_result.success {
1368            println!(
1369                "    {} Completed: {} ({}ms)",
1370                "✓".green(),
1371                agent_result.task_id.cyan(),
1372                agent_result.duration_ms
1373            );
1374            round_state.task_ids.push(agent_result.task_id.clone());
1375        } else {
1376            println!(
1377                "    {} Failed: {} (exit code: {:?})",
1378                "✗".red(),
1379                agent_result.task_id.red(),
1380                agent_result.exit_code
1381            );
1382            round_state.failures.push(agent_result.task_id.clone());
1383        }
1384    }
1385
1386    // Add tags for successful tasks
1387    for task_id in &round_state.task_ids {
1388        if let Some(info) = tasks.iter().find(|t| t.task.id == *task_id) {
1389            round_state.tags.push(info.tag.clone());
1390        }
1391    }
1392
1393    // Update task statuses based on results
1394    for agent_result in &result {
1395        if let Some(info) = tasks.iter().find(|t| t.task.id == agent_result.task_id) {
1396            if let Ok(mut phase) = storage.load_group(&info.tag) {
1397                if let Some(task) = phase.get_task_mut(&agent_result.task_id) {
1398                    // If failed without exit code, mark as failed
1399                    if !agent_result.success && agent_result.exit_code.is_none() {
1400                        task.set_status(TaskStatus::Failed);
1401                        let _ = storage.update_group(&info.tag, &phase);
1402                    }
1403                }
1404            }
1405        }
1406    }
1407
1408    Ok(round_state)
1409}
1410
1411/// Execute a round using spawn's headless streaming infrastructure (no tmux)
1412///
1413/// Uses the headless runner from `spawn::headless` to capture streaming JSON
1414/// events from Claude Code or OpenCode agents without requiring tmux.
1415async fn execute_round_headless(
1416    storage: &Storage,
1417    tasks: &[TaskInfo<'_>],
1418    working_dir: &std::path::Path,
1419    round_idx: usize,
1420    default_harness: Harness,
1421    event_writer: Option<&events::EventWriter>,
1422) -> Result<RoundState> {
1423    use crate::commands::attach::{save_session_metadata, SessionMetadata};
1424
1425    let mut round_state = RoundState::new(round_idx);
1426
1427    // Create stream store for this round
1428    let store = StreamStore::new();
1429
1430    // Mark tasks as in-progress and spawn agents
1431    for info in tasks {
1432        // Resolve agent config (harness, model, prompt) from task's agent_type
1433        let config =
1434            agent::resolve_agent_config(info.task, &info.tag, default_harness, None, working_dir);
1435
1436        // Create session in store
1437        store.create_session(&info.task.id, &info.tag);
1438
1439        // Create a runner for this task's specific harness
1440        let runner = match headless::create_runner(config.harness) {
1441            Ok(r) => r,
1442            Err(e) => {
1443                println!(
1444                    "    {} Failed to create runner for {}: {}",
1445                    "✗".red(),
1446                    info.task.id.red(),
1447                    e
1448                );
1449                round_state.failures.push(info.task.id.clone());
1450                continue;
1451            }
1452        };
1453
1454        // Spawn headless session
1455        let spawn_result = runner
1456            .start(
1457                &info.task.id,
1458                &config.prompt,
1459                working_dir,
1460                config.model.as_deref(),
1461            )
1462            .await;
1463
1464        match spawn_result {
1465            Ok(mut session_handle) => {
1466                // Set PID in store
1467                if let Some(pid) = session_handle.pid() {
1468                    store.set_pid(&info.task.id, pid);
1469                }
1470
1471                println!(
1472                    "    {} Spawned (headless): {} | {} [{}]",
1473                    "✓".green(),
1474                    info.task.id.cyan(),
1475                    info.task.title.dimmed(),
1476                    config.display_info().dimmed(),
1477                );
1478
1479                round_state.task_ids.push(info.task.id.clone());
1480                round_state.tags.push(info.tag.clone());
1481
1482                // Emit spawn event
1483                if let Some(writer) = event_writer {
1484                    let _ = writer.log_spawned(&info.task.id);
1485                }
1486
1487                // Mark task as in-progress
1488                if let Ok(mut phase) = storage.load_group(&info.tag) {
1489                    if let Some(task) = phase.get_task_mut(&info.task.id) {
1490                        task.set_status(TaskStatus::InProgress);
1491                        let _ = storage.update_group(&info.tag, &phase);
1492                    }
1493                }
1494
1495                // Spawn background task to collect events
1496                let store_clone = store.clone();
1497                let task_id = info.task.id.clone();
1498                let tag = info.tag.clone();
1499                let working_dir_clone = working_dir.to_path_buf();
1500                let harness_name = config.harness.name().to_string();
1501
1502                tokio::spawn(async move {
1503                    let mut saw_terminal_event = false;
1504                    while let Some(event) = session_handle.events.recv().await {
1505                        if matches!(
1506                            event.kind,
1507                            headless::StreamEventKind::Complete { .. }
1508                                | headless::StreamEventKind::Error { .. }
1509                        ) {
1510                            saw_terminal_event = true;
1511                        }
1512
1513                        // Check for session ID assignment
1514                        if let headless::StreamEventKind::SessionAssigned { ref session_id } =
1515                            event.kind
1516                        {
1517                            store_clone.set_session_id(&task_id, session_id);
1518
1519                            // Save session metadata for `scud attach` continuation
1520                            let metadata =
1521                                SessionMetadata::new(&task_id, session_id, &tag, &harness_name);
1522                            let _ = save_session_metadata(&working_dir_clone, &metadata);
1523                        }
1524
1525                        store_clone.push_event(&task_id, event);
1526                    }
1527
1528                    // Wait for process to complete. If the harness exited without a terminal
1529                    // stream event, synthesize one so rounds cannot get stuck indefinitely.
1530                    let wait_ok = session_handle.wait().await.unwrap_or(false);
1531                    if !saw_terminal_event {
1532                        if wait_ok {
1533                            store_clone.push_event(&task_id, headless::StreamEvent::complete(true));
1534                        } else {
1535                            store_clone.push_event(
1536                                &task_id,
1537                                headless::StreamEvent::error(
1538                                    "Agent process exited without completion event".to_string(),
1539                                ),
1540                            );
1541                        }
1542                    } else if !wait_ok {
1543                        // Even if we saw a "complete" stream event, trust the process exit code.
1544                        store_clone.push_event(
1545                            &task_id,
1546                            headless::StreamEvent::error(
1547                                "Agent process exited with non-zero status".to_string(),
1548                            ),
1549                        );
1550                    }
1551                });
1552            }
1553            Err(e) => {
1554                println!(
1555                    "    {} Failed (headless): {} - {}",
1556                    "✗".red(),
1557                    info.task.id.red(),
1558                    e
1559                );
1560                round_state.failures.push(info.task.id.clone());
1561
1562                // Record error in store
1563                store.push_event(&info.task.id, headless::StreamEvent::error(e.to_string()));
1564            }
1565        }
1566
1567        // Small delay between spawns to avoid overwhelming the system
1568        tokio::time::sleep(Duration::from_millis(200)).await;
1569    }
1570
1571    // Wait for all tasks to complete by polling the store
1572    let max_wait = Duration::from_secs(3600); // 1 hour max
1573    let start = std::time::Instant::now();
1574    let total_tasks = round_state.task_ids.len();
1575    let mut poll_count = 0u32;
1576    // Track how many display lines we printed last time (for ANSI overwrite)
1577    let mut prev_display_lines = 0usize;
1578
1579    loop {
1580        // Fast initial polling (2s for first 5 polls), then slow (5s)
1581        let poll_interval = if poll_count < 5 {
1582            Duration::from_secs(2)
1583        } else {
1584            Duration::from_secs(5)
1585        };
1586        poll_count += 1;
1587
1588        // Small initial delay to let agents start producing output
1589        if poll_count == 1 {
1590            tokio::time::sleep(Duration::from_secs(2)).await;
1591        }
1592
1593        let active_tasks = store.active_tasks();
1594        let active_count = active_tasks.len();
1595        if active_count == 0 {
1596            break;
1597        }
1598
1599        if start.elapsed() > max_wait {
1600            println!(
1601                "    {} Timeout waiting for {} tasks",
1602                "!".yellow(),
1603                active_count
1604            );
1605            break;
1606        }
1607
1608        // Move cursor up to overwrite previous display (if any)
1609        if prev_display_lines > 0 {
1610            // Move up N lines and clear each one
1611            for _ in 0..prev_display_lines {
1612                print!("\x1b[A\x1b[2K");
1613            }
1614        }
1615
1616        // Build display lines, then print them all
1617        let mut display = Vec::new();
1618        let completed = total_tasks - active_count;
1619        let elapsed = start.elapsed().as_secs();
1620
1621        display.push(format!(
1622            "\n    ─── {} {}/{} done ({} active) · {}s ───",
1623            "▶".blue(),
1624            completed,
1625            total_tasks,
1626            active_count,
1627            format_duration(elapsed),
1628        ));
1629
1630        // Show ALL tasks in this round with their current status
1631        for task_id in &round_state.task_ids {
1632            let status = store.get_status(task_id);
1633            let elapsed_task = store.get_elapsed_secs(task_id).unwrap_or(0);
1634            let stats = store
1635                .session_stats(task_id)
1636                .map(|(events, _)| format!("{}ev", events))
1637                .unwrap_or_default();
1638
1639            let (icon, status_str) = match &status {
1640                Some(SessionStatus::Completed) => ("✓".green(), "done".green()),
1641                Some(SessionStatus::Failed) => ("✗".red(), "fail".red()),
1642                Some(SessionStatus::Running) => ("⟳".blue(), "run".blue()),
1643                Some(SessionStatus::Starting) => ("…".yellow(), "init".yellow()),
1644                None => ("?".dimmed(), "?".dimmed()),
1645            };
1646
1647            // For active tasks, show last tool activity; for done tasks, just show stats
1648            let detail = match &status {
1649                Some(SessionStatus::Running) | Some(SessionStatus::Starting) => {
1650                    // Prefer tool line, fall back to last output line
1651                    let tool_line = store.get_last_tool_line(task_id);
1652                    let activity = tool_line
1653                        .or_else(|| store.get_output(task_id, 1).into_iter().next())
1654                        .unwrap_or_default();
1655                    let trimmed = if activity.len() > 60 {
1656                        format!("{}…", &activity[..59])
1657                    } else {
1658                        activity
1659                    };
1660                    if trimmed.is_empty() {
1661                        format!("{}s {}", format_duration(elapsed_task), stats)
1662                    } else {
1663                        format!("{}s {} {}", format_duration(elapsed_task), stats, trimmed)
1664                    }
1665                }
1666                _ => {
1667                    format!("{}s {}", format_duration(elapsed_task), stats)
1668                }
1669            };
1670
1671            display.push(format!(
1672                "      {} {:>5} [{}] {}",
1673                icon,
1674                task_id.cyan(),
1675                status_str,
1676                detail.dimmed(),
1677            ));
1678        }
1679
1680        // Print and track line count for next overwrite
1681        prev_display_lines = display.len();
1682        for line in &display {
1683            println!("{}", line);
1684        }
1685
1686        tokio::time::sleep(poll_interval).await;
1687    }
1688
1689    // Final summary (printed once, not overwritten)
1690    let elapsed = start.elapsed().as_secs();
1691    let successes = round_state
1692        .task_ids
1693        .iter()
1694        .filter(|id| matches!(store.get_status(id), Some(SessionStatus::Completed)))
1695        .count();
1696    let failures = round_state
1697        .task_ids
1698        .iter()
1699        .filter(|id| matches!(store.get_status(id), Some(SessionStatus::Failed)))
1700        .count();
1701    println!(
1702        "\n    ─── Round complete: {} ok, {} failed, {} total in {}s ───",
1703        format!("{}", successes).green(),
1704        format!("{}", failures).red(),
1705        total_tasks,
1706        format_duration(elapsed),
1707    );
1708    // Show details for failed tasks only
1709    for task_id in &round_state.task_ids {
1710        if matches!(store.get_status(task_id), Some(SessionStatus::Failed)) {
1711            println!("      {} {} — last output:", "✗".red(), task_id.red());
1712            let output = store.get_all_output(task_id);
1713            for line in output.iter().rev().take(5).rev() {
1714                let trimmed = if line.len() > 80 {
1715                    format!("{}…", &line[..79])
1716                } else {
1717                    line.clone()
1718                };
1719                if !trimmed.is_empty() {
1720                    println!("        {}", trimmed.dimmed());
1721                }
1722            }
1723        }
1724    }
1725
1726    // Emit completion events
1727    for task_id in &round_state.task_ids {
1728        if let Some(writer) = event_writer {
1729            let success = matches!(store.get_status(task_id), Some(SessionStatus::Completed));
1730            let _ = writer.log_completed(task_id, success, start.elapsed().as_millis() as u64);
1731        }
1732    }
1733
1734    Ok(round_state)
1735}
1736
1737/// Format seconds into compact human-readable duration (e.g., "45s", "2m30s", "1h05m")
1738fn format_duration(secs: u64) -> String {
1739    if secs < 60 {
1740        format!("{}", secs)
1741    } else if secs < 3600 {
1742        format!("{}m{:02}", secs / 60, secs % 60)
1743    } else {
1744        format!("{}h{:02}m", secs / 3600, (secs % 3600) / 60)
1745    }
1746}
1747
1748/// Generate prompt for server mode (similar to extensions but optimized for OpenCode)
1749fn generate_server_prompt(task: &Task, tag: &str, working_dir: &std::path::Path) -> String {
1750    let details = task
1751        .details
1752        .as_ref()
1753        .map(|d| format!("\n\n## Details\n\n{}", d))
1754        .unwrap_or_default();
1755
1756    let test_strategy = task
1757        .test_strategy
1758        .as_ref()
1759        .map(|t| format!("\n\n## Test Strategy\n\n{}", t))
1760        .unwrap_or_default();
1761
1762    format!(
1763        r#"You are working on task [{id}] in phase "{tag}".
1764
1765## Task: {title}
1766
1767{description}{details}{test_strategy}
1768
1769## Instructions
1770
17711. Implement the task requirements
17722. Test your changes
17733. When complete, run: `scud set-status {id} done --tag {tag}`
1774
1775Working directory: {working_dir}
1776"#,
1777        id = task.id,
1778        tag = tag,
1779        title = task.title,
1780        description = task.description,
1781        details = details,
1782        test_strategy = test_strategy,
1783        working_dir = working_dir.display(),
1784    )
1785}
1786
1787fn wait_for_round_completion(
1788    storage: &Storage,
1789    tasks: &[TaskInfo],
1790    session_name: &str,
1791    stale_timeout: Option<Duration>,
1792    idle_timeout_minutes: u64,
1793    event_writer: Option<&events::EventWriter>,
1794) -> Result<()> {
1795    use std::collections::HashSet;
1796    use std::io::Write;
1797    use std::time::Instant;
1798
1799    let task_ids: Vec<String> = tasks.iter().map(|t| t.task.id.clone()).collect();
1800    let task_tags: HashMap<String, String> = tasks
1801        .iter()
1802        .map(|t| (t.task.id.clone(), t.tag.clone()))
1803        .collect();
1804
1805    let round_start = Instant::now();
1806    let mut completed_tasks: HashSet<String> = HashSet::new();
1807    let spinner_chars = ['⠋', '⠙', '⠹', '⠸', '⠼', '⠴', '⠦', '⠧', '⠇', '⠏'];
1808    let mut spin_idx: usize = 0;
1809    let mut last_orphan_check = Instant::now();
1810
1811    // Track per-task content hashes for heartbeat (1c)
1812    let mut last_content_hashes: HashMap<String, u64> = HashMap::new();
1813    let mut last_activity: HashMap<String, Instant> = HashMap::new();
1814    for task_id in &task_ids {
1815        last_activity.insert(task_id.clone(), Instant::now());
1816    }
1817
1818    loop {
1819        let mut still_running: Vec<String> = Vec::new();
1820
1821        for task_id in &task_ids {
1822            if completed_tasks.contains(task_id) {
1823                continue;
1824            }
1825
1826            if let Some(tag) = task_tags.get(task_id) {
1827                if let Ok(phase) = storage.load_group(tag) {
1828                    if let Some(task) = phase.get_task(task_id) {
1829                        if task.status == TaskStatus::InProgress
1830                            || task.status == TaskStatus::Pending
1831                        {
1832                            still_running.push(task_id.clone());
1833                        } else {
1834                            // Task just completed
1835                            completed_tasks.insert(task_id.clone());
1836                            let elapsed = round_start.elapsed().as_secs();
1837                            let status_icon = if task.status == TaskStatus::Done {
1838                                "✓".green()
1839                            } else {
1840                                "✗".red()
1841                            };
1842                            // Clear the status line, then print completion
1843                            print!("\r{}\r", " ".repeat(80));
1844                            println!(
1845                                "    {} {} completed ({}s)",
1846                                status_icon,
1847                                task_id.cyan(),
1848                                elapsed
1849                            );
1850                            // Emit completion event (1b)
1851                            if let Some(writer) = event_writer {
1852                                let success = task.status == TaskStatus::Done;
1853                                let _ = writer.log_completed(
1854                                    task_id,
1855                                    success,
1856                                    round_start.elapsed().as_millis() as u64,
1857                                );
1858                            }
1859                        }
1860                    }
1861                }
1862            }
1863        }
1864
1865        if still_running.is_empty() {
1866            // Clear status line
1867            print!("\r{}\r", " ".repeat(80));
1868            let _ = std::io::stdout().flush();
1869            break;
1870        }
1871
1872        // Periodic orphan detection (1e): every 30s, check if tmux windows still exist
1873        if last_orphan_check.elapsed() >= Duration::from_secs(30) {
1874            last_orphan_check = Instant::now();
1875            for task_id in &still_running {
1876                if !tmux_window_exists_for_task(session_name, task_id) {
1877                    // Agent died - tmux window gone but task still InProgress
1878                    print!("\r{}\r", " ".repeat(80));
1879                    println!(
1880                        "    {} {} agent died (tmux window gone), marking failed",
1881                        "⚠".yellow(),
1882                        task_id.cyan()
1883                    );
1884                    // Mark as Failed
1885                    if let Some(tag) = task_tags.get(task_id) {
1886                        if let Ok(mut phase) = storage.load_group(tag) {
1887                            if let Some(task) = phase.get_task_mut(task_id) {
1888                                task.set_status(TaskStatus::Failed);
1889                                let _ = storage.update_group(tag, &phase);
1890                            }
1891                        }
1892                    }
1893                    completed_tasks.insert(task_id.clone());
1894                    // Emit failed event
1895                    if let Some(writer) = event_writer {
1896                        let event = events::AgentEvent::new(
1897                            writer.session_id(),
1898                            task_id,
1899                            events::EventKind::Failed {
1900                                reason: "agent window disappeared".to_string(),
1901                            },
1902                        );
1903                        let _ = writer.write(&event);
1904                    }
1905                }
1906            }
1907        }
1908
1909        // Stale task timeout (1d): check if any task exceeded the threshold
1910        if let Some(timeout) = stale_timeout {
1911            if round_start.elapsed() >= timeout {
1912                for task_id in &still_running {
1913                    // Cross-reference with tmux window existence
1914                    if !tmux_window_exists_for_task(session_name, task_id) {
1915                        print!("\r{}\r", " ".repeat(80));
1916                        println!(
1917                            "    {} {} stale (timeout + no tmux window), resetting to pending",
1918                            "⚠".yellow(),
1919                            task_id.cyan()
1920                        );
1921                        if let Some(tag) = task_tags.get(task_id) {
1922                            if let Ok(mut phase) = storage.load_group(tag) {
1923                                if let Some(task) = phase.get_task_mut(task_id) {
1924                                    task.set_status(TaskStatus::Pending);
1925                                    let _ = storage.update_group(tag, &phase);
1926                                }
1927                            }
1928                        }
1929                        completed_tasks.insert(task_id.clone());
1930                    }
1931                }
1932            }
1933        }
1934
1935        // Heartbeat check (1c): poll tmux panes for activity
1936        for task_id in &still_running {
1937            if completed_tasks.contains(task_id) {
1938                continue;
1939            }
1940            let window_name = format!("task-{}", task_id);
1941            let window_target = format!("{}:{}", session_name, window_name);
1942            if let Ok(output) = std::process::Command::new("tmux")
1943                .args(["capture-pane", "-t", &window_target, "-p", "-S", "-20"])
1944                .output()
1945            {
1946                if output.status.success() {
1947                    let content = String::from_utf8_lossy(&output.stdout);
1948                    let hash = {
1949                        use std::hash::{Hash, Hasher};
1950                        let mut hasher = std::collections::hash_map::DefaultHasher::new();
1951                        content.hash(&mut hasher);
1952                        hasher.finish()
1953                    };
1954                    let prev_hash = last_content_hashes.get(task_id).copied();
1955                    if prev_hash.is_none() || prev_hash != Some(hash) {
1956                        last_activity.insert(task_id.clone(), Instant::now());
1957                    }
1958                    last_content_hashes.insert(task_id.clone(), hash);
1959                }
1960            }
1961        }
1962
1963        // Idle timeout failure detection: if agent has been idle AND shows shell prompt
1964        let idle_timeout = Duration::from_secs(idle_timeout_minutes * 60);
1965        for task_id in &still_running {
1966            if completed_tasks.contains(task_id) {
1967                continue;
1968            }
1969
1970            // Check if this task has been idle long enough
1971            let is_idle_timeout = last_activity
1972                .get(task_id)
1973                .map(|t| t.elapsed() > idle_timeout)
1974                .unwrap_or(false);
1975
1976            if !is_idle_timeout {
1977                continue;
1978            }
1979
1980            // Check if the pane shows a shell prompt (process exited)
1981            let window_name = format!("task-{}", task_id);
1982            if terminal::tmux_pane_shows_prompt(session_name, &window_name) {
1983                print!("\r{}\r", " ".repeat(80));
1984                println!(
1985                    "    {} {} agent idle with shell prompt, marking failed",
1986                    "⚠".yellow(),
1987                    task_id.cyan()
1988                );
1989
1990                // Mark as Failed
1991                if let Some(tag) = task_tags.get(task_id) {
1992                    if let Ok(mut phase) = storage.load_group(tag) {
1993                        if let Some(task) = phase.get_task_mut(task_id) {
1994                            task.set_status(TaskStatus::Failed);
1995                            let _ = storage.update_group(tag, &phase);
1996                        }
1997                    }
1998                }
1999                completed_tasks.insert(task_id.clone());
2000
2001                // Emit failed event
2002                if let Some(writer) = event_writer {
2003                    let event = events::AgentEvent::new(
2004                        writer.session_id(),
2005                        task_id,
2006                        events::EventKind::Failed {
2007                            reason: "agent idle with shell prompt (process crashed)".to_string(),
2008                        },
2009                    );
2010                    let _ = writer.write(&event);
2011                }
2012            }
2013        }
2014
2015        // Print status line (1a)
2016        let elapsed = round_start.elapsed().as_secs();
2017        let spinner = spinner_chars[spin_idx % spinner_chars.len()];
2018        spin_idx += 1;
2019
2020        // Check for idle agents
2021        let idle_agents: Vec<&String> = still_running
2022            .iter()
2023            .filter(|id| {
2024                !completed_tasks.contains(*id)
2025                    && last_activity
2026                        .get(*id)
2027                        .map(|t| t.elapsed() > Duration::from_secs(60))
2028                        .unwrap_or(false)
2029            })
2030            .collect();
2031
2032        let running_count = still_running
2033            .iter()
2034            .filter(|id| !completed_tasks.contains(*id))
2035            .count();
2036
2037        let status = if running_count <= 2 {
2038            let names: Vec<&str> = still_running
2039                .iter()
2040                .filter(|id| !completed_tasks.contains(*id))
2041                .map(|s| s.as_str())
2042                .collect();
2043            format!("{} running: {}", running_count, names.join(", "))
2044        } else {
2045            format!("{} running", running_count)
2046        };
2047
2048        let idle_note = if !idle_agents.is_empty() {
2049            format!(" ({} idle >60s)", idle_agents.len())
2050        } else {
2051            String::new()
2052        };
2053
2054        print!(
2055            "\r    Waiting... [{}] {} {}s{}",
2056            status, spinner, elapsed, idle_note
2057        );
2058        let _ = std::io::stdout().flush();
2059
2060        thread::sleep(Duration::from_secs(5));
2061    }
2062
2063    Ok(())
2064}
2065
2066fn collect_changed_files(
2067    working_dir: &std::path::Path,
2068    start_commit: Option<&str>,
2069) -> Result<Vec<String>> {
2070    use std::process::Command;
2071
2072    // Construct the commit range: start_commit..HEAD or fallback to HEAD~1..HEAD
2073    let range = match start_commit {
2074        Some(commit) => format!("{}..HEAD", commit),
2075        None => "HEAD~1..HEAD".to_string(),
2076    };
2077
2078    let output = Command::new("git")
2079        .current_dir(working_dir)
2080        .args(["diff", "--name-only", &range])
2081        .output()?;
2082
2083    let files: Vec<String> = String::from_utf8_lossy(&output.stdout)
2084        .lines()
2085        .map(|s| s.to_string())
2086        .collect();
2087
2088    Ok(files)
2089}
2090
2091fn run_dry_run(
2092    project_root: Option<PathBuf>,
2093    phase_tag: &str,
2094    round_size: usize,
2095    all_tags: bool,
2096) -> Result<()> {
2097    let storage = Storage::new(project_root);
2098    let all_phases = storage.load_tasks()?;
2099
2100    let waves = compute_waves_from_tasks(&all_phases, phase_tag, all_tags)?;
2101
2102    println!("{}", "Execution Plan (dry-run)".yellow().bold());
2103    println!("{}", "═".repeat(50).yellow());
2104    println!();
2105
2106    let mut total_tasks = 0;
2107    let mut total_rounds = 0;
2108
2109    for (wave_idx, wave) in waves.iter().enumerate() {
2110        let rounds = wave.len().div_ceil(round_size);
2111        total_tasks += wave.len();
2112        total_rounds += rounds;
2113
2114        println!(
2115            "{} {} - {} task(s), {} round(s)",
2116            "Wave".blue().bold(),
2117            wave_idx + 1,
2118            wave.len(),
2119            rounds
2120        );
2121
2122        for (round_idx, chunk) in wave.chunks(round_size).enumerate() {
2123            println!("  {} {}:", "Round".yellow(), round_idx + 1);
2124            for info in chunk {
2125                println!(
2126                    "    {} {} | {}",
2127                    "○".white(),
2128                    info.task.id.cyan(),
2129                    info.task.title
2130                );
2131            }
2132        }
2133        println!();
2134    }
2135
2136    println!("{}", "Summary".blue().bold());
2137    println!("{}", "-".repeat(30).blue());
2138    println!("  Total waves:  {}", waves.len());
2139    println!("  Total tasks:  {}", total_tasks);
2140    println!("  Total rounds: {}", total_rounds);
2141
2142    if total_rounds > 0 {
2143        let speedup = total_tasks as f64 / total_rounds as f64;
2144        println!("  Speedup:      {}", format!("{:.1}x", speedup).green());
2145    }
2146
2147    println!();
2148    println!("{}", "No agents spawned (dry-run mode).".yellow());
2149
2150    Ok(())
2151}
2152
2153// ============================================================================
2154// Review Agent Support
2155// ============================================================================
2156
2157/// Result of a review operation
2158#[derive(Debug)]
2159pub struct ReviewResult {
2160    /// Whether all reviewed tasks passed
2161    pub all_passed: bool,
2162    /// Task IDs that need improvement
2163    pub tasks_to_improve: Vec<String>,
2164}
2165
2166/// Spawn a reviewer agent and wait for it to complete
2167#[allow(dead_code)]
2168pub fn spawn_reviewer(
2169    working_dir: &std::path::Path,
2170    session_name: &str,
2171    summary: &WaveSummary,
2172    wave_tasks: &[(String, String)], // (id, title)
2173    review_all: bool,
2174) -> Result<ReviewResult> {
2175    println!();
2176    println!("  {} Spawning reviewer agent...", "Review:".magenta());
2177
2178    let prompt = agent::generate_review_prompt(summary, wave_tasks, review_all);
2179
2180    // Load reviewer agent definition for harness/model
2181    let agent_def = AgentDef::try_load("reviewer", working_dir).unwrap_or_else(|| {
2182        // Fallback: claude/opus
2183        AgentDef {
2184            agent: crate::agents::AgentMeta {
2185                name: "reviewer".to_string(),
2186                description: "Code reviewer".to_string(),
2187            },
2188            model: crate::agents::ModelConfig {
2189                harness: "claude".to_string(),
2190                model: Some("opus".to_string()),
2191            },
2192            prompt: Default::default(),
2193        }
2194    });
2195
2196    let harness = agent_def.harness()?;
2197    let model = agent_def.model();
2198
2199    // Spawn reviewer
2200    let spawn_config = terminal::SpawnConfig {
2201        task_id: &format!("review-wave-{}", summary.wave_number),
2202        prompt: &prompt,
2203        working_dir,
2204        session_name,
2205        harness,
2206        model,
2207        task_list_id: None,
2208    };
2209    terminal::spawn_tmux_agent(&spawn_config)?;
2210
2211    println!(
2212        "    {} Reviewer spawned, waiting for completion...",
2213        "✓".green()
2214    );
2215
2216    // Wait for reviewer to complete by watching for output file
2217    wait_for_review_completion(working_dir, summary.wave_number)
2218}
2219
2220/// Wait for the review to complete by polling for marker file
2221fn wait_for_review_completion(
2222    working_dir: &std::path::Path,
2223    wave_number: usize,
2224) -> Result<ReviewResult> {
2225    let marker_path = working_dir
2226        .join(".scud")
2227        .join(format!("review-complete-{}", wave_number));
2228
2229    let timeout = Duration::from_secs(1800); // 30 minute timeout
2230    let start = std::time::Instant::now();
2231
2232    loop {
2233        if start.elapsed() > timeout {
2234            println!("    {} Review timed out after 30 minutes", "!".yellow());
2235            return Ok(ReviewResult {
2236                all_passed: true, // Assume pass on timeout
2237                tasks_to_improve: vec![],
2238            });
2239        }
2240
2241        if marker_path.exists() {
2242            let content = std::fs::read_to_string(&marker_path)?;
2243            std::fs::remove_file(&marker_path)?; // Clean up
2244
2245            let all_passed = content.contains("ALL_PASS");
2246            let tasks_to_improve = if content.contains("IMPROVE_TASKS:") {
2247                content
2248                    .lines()
2249                    .find(|l| l.starts_with("IMPROVE_TASKS:"))
2250                    .map(|l| {
2251                        l.strip_prefix("IMPROVE_TASKS:")
2252                            .unwrap_or("")
2253                            .split(',')
2254                            .map(|s| s.trim().to_string())
2255                            .filter(|s| !s.is_empty())
2256                            .collect()
2257                    })
2258                    .unwrap_or_default()
2259            } else {
2260                vec![]
2261            };
2262
2263            println!("    {} Review complete", "✓".green());
2264            if !all_passed {
2265                println!(
2266                    "    {} Tasks needing improvement: {}",
2267                    "!".yellow(),
2268                    tasks_to_improve.join(", ")
2269                );
2270            }
2271
2272            return Ok(ReviewResult {
2273                all_passed,
2274                tasks_to_improve,
2275            });
2276        }
2277
2278        thread::sleep(Duration::from_secs(5));
2279    }
2280}
2281
2282// ============================================================================
2283// Repair Loop Support
2284// ============================================================================
2285
2286/// Run repair loop for failed validation
2287#[allow(dead_code)]
2288#[allow(clippy::too_many_arguments)]
2289pub fn run_repair_loop(
2290    storage: &Storage,
2291    working_dir: &std::path::Path,
2292    session_name: &str,
2293    bp_config: &BackpressureConfig,
2294    wave_state: &WaveState,
2295    validation_result: &ValidationResult,
2296    max_attempts: usize,
2297) -> Result<bool> {
2298    let wave_tasks = wave_state.all_task_ids();
2299    let task_tags = wave_state.task_tags();
2300
2301    println!();
2302    println!("  {} Analyzing failure attribution...", "Repair:".magenta());
2303
2304    // Get the first failed command for attribution
2305    let failed_cmd = validation_result.results.iter().find(|r| !r.passed);
2306    let failed_cmd = match failed_cmd {
2307        Some(cmd) => cmd,
2308        None => return Ok(true), // No failures? Shouldn't happen
2309    };
2310
2311    // Attribute the failure
2312    let attribution = attribute_failure(
2313        working_dir,
2314        &failed_cmd.stderr,
2315        &failed_cmd.stdout,
2316        &wave_tasks,
2317        wave_state.start_commit.as_deref(),
2318    )?;
2319
2320    match attribution.confidence {
2321        AttributionConfidence::High => {
2322            println!(
2323                "    {} High confidence: task {} responsible",
2324                "✓".green(),
2325                attribution.responsible_tasks.join(", ")
2326            );
2327        }
2328        AttributionConfidence::Medium => {
2329            println!(
2330                "    {} Medium confidence: tasks {} may be responsible",
2331                "~".yellow(),
2332                attribution.responsible_tasks.join(", ")
2333            );
2334        }
2335        AttributionConfidence::Low => {
2336            println!(
2337                "    {} Low confidence: cannot determine specific task",
2338                "!".red()
2339            );
2340        }
2341    }
2342
2343    // Mark cleared tasks as done
2344    for task_id in &attribution.cleared_tasks {
2345        if let Some(tag) = task_tags
2346            .iter()
2347            .find(|(id, _)| id == task_id)
2348            .map(|(_, t)| t)
2349        {
2350            if let Ok(mut phase) = storage.load_group(tag) {
2351                if let Some(task) = phase.get_task_mut(task_id) {
2352                    task.set_status(TaskStatus::Done);
2353                    let _ = storage.update_group(tag, &phase);
2354                    println!("    {} Cleared: {} (not responsible)", "✓".green(), task_id);
2355                }
2356            }
2357        }
2358    }
2359
2360    // Collect task info for batch repair
2361    let mut task_infos: Vec<(String, String, Vec<String>)> = Vec::new();
2362    for task_id in &attribution.responsible_tasks {
2363        let (task, _tag) = match find_task_with_tag(storage, task_id, &task_tags) {
2364            Some(t) => t,
2365            None => continue,
2366        };
2367
2368        let task_files = crate::attribution::get_task_changed_files(
2369            working_dir,
2370            task_id,
2371            wave_state.start_commit.as_deref(),
2372        )
2373        .unwrap_or_default()
2374        .into_iter()
2375        .collect();
2376
2377        task_infos.push((task_id.clone(), task.title.clone(), task_files));
2378    }
2379
2380    // Parse error locations for the prompt
2381    let error_locations: Vec<(String, Option<u32>)> =
2382        crate::attribution::parse_error_locations(&failed_cmd.stderr, &failed_cmd.stdout);
2383
2384    // Attempt batch repairs
2385    for attempt in 1..=max_attempts {
2386        println!();
2387        println!(
2388            "  {} Batch repair attempt {}/{}",
2389            "Repair:".magenta(),
2390            attempt,
2391            max_attempts
2392        );
2393
2394        // Generate batch repair prompt
2395        let prompt = agent::generate_batch_repair_prompt(
2396            &task_infos,
2397            &failed_cmd.command,
2398            &format!("{}\n{}", failed_cmd.stderr, failed_cmd.stdout),
2399            &error_locations,
2400        );
2401
2402        // Spawn single batch repairer
2403        spawn_batch_repairer(working_dir, session_name, &prompt)?;
2404
2405        // Wait for batch repair completion
2406        let repair_result = wait_for_batch_repair_completion(working_dir)?;
2407
2408        match repair_result {
2409            BatchRepairResult::Success(fixed_tasks) => {
2410                // Re-run validation
2411                println!();
2412                println!("  {} Re-running validation...", "Validate:".magenta());
2413                let new_result = crate::backpressure::run_validation(working_dir, bp_config)?;
2414
2415                if new_result.all_passed {
2416                    println!("    {} Validation passed after batch repair!", "✓".green());
2417
2418                    // Mark all responsible tasks as done
2419                    for task_id in &attribution.responsible_tasks {
2420                        if let Some(tag) = task_tags
2421                            .iter()
2422                            .find(|(id, _)| id == task_id)
2423                            .map(|(_, t)| t)
2424                        {
2425                            if let Ok(mut phase) = storage.load_group(tag) {
2426                                if let Some(task) = phase.get_task_mut(task_id) {
2427                                    task.set_status(TaskStatus::Done);
2428                                    let _ = storage.update_group(tag, &phase);
2429                                }
2430                            }
2431                        }
2432                    }
2433
2434                    return Ok(true);
2435                }
2436
2437                println!(
2438                    "    {} Validation still failing (fixed: {}), will retry...",
2439                    "!".yellow(),
2440                    fixed_tasks.join(", ")
2441                );
2442            }
2443            BatchRepairResult::Partial(fixed, blocked) => {
2444                // Mark fixed tasks as done, blocked as blocked
2445                for task_id in &fixed {
2446                    if let Some(tag) = task_tags
2447                        .iter()
2448                        .find(|(id, _)| id == task_id)
2449                        .map(|(_, t)| t)
2450                    {
2451                        if let Ok(mut phase) = storage.load_group(tag) {
2452                            if let Some(task) = phase.get_task_mut(task_id) {
2453                                task.set_status(TaskStatus::Done);
2454                                let _ = storage.update_group(tag, &phase);
2455                                println!("    {} Fixed: {}", "✓".green(), task_id);
2456                            }
2457                        }
2458                    }
2459                }
2460                for task_id in &blocked {
2461                    if let Some(tag) = task_tags
2462                        .iter()
2463                        .find(|(id, _)| id == task_id)
2464                        .map(|(_, t)| t)
2465                    {
2466                        if let Ok(mut phase) = storage.load_group(tag) {
2467                            if let Some(task) = phase.get_task_mut(task_id) {
2468                                task.set_status(TaskStatus::Blocked);
2469                                let _ = storage.update_group(tag, &phase);
2470                                println!("    {} Blocked: {}", "!".yellow(), task_id);
2471                            }
2472                        }
2473                    }
2474                }
2475
2476                // Re-run validation
2477                let new_result = crate::backpressure::run_validation(working_dir, bp_config)?;
2478                if new_result.all_passed {
2479                    println!("    {} Validation passed!", "✓".green());
2480                    return Ok(true);
2481                }
2482            }
2483            BatchRepairResult::Blocked(reason) => {
2484                println!("    {} Batch repair blocked: {}", "!".red(), reason);
2485            }
2486            BatchRepairResult::Timeout => {
2487                println!("    {} Batch repair timed out", "!".yellow());
2488            }
2489        }
2490    }
2491
2492    // Max attempts reached - mark responsible tasks as failed
2493    println!();
2494    println!("  {} Max repair attempts reached", "!".red());
2495
2496    for task_id in &attribution.responsible_tasks {
2497        if let Some(tag) = task_tags
2498            .iter()
2499            .find(|(id, _)| id == task_id)
2500            .map(|(_, t)| t)
2501        {
2502            if let Ok(mut phase) = storage.load_group(tag) {
2503                if let Some(task) = phase.get_task_mut(task_id) {
2504                    task.set_status(TaskStatus::Failed);
2505                    let _ = storage.update_group(tag, &phase);
2506                    println!("    {} Marked failed: {}", "✗".red(), task_id);
2507                }
2508            }
2509        }
2510    }
2511
2512    Ok(false)
2513}
2514
2515/// Spawn a repairer agent for a specific task (kept as fallback for single-task scenarios)
2516#[allow(dead_code)]
2517fn spawn_repairer(
2518    working_dir: &std::path::Path,
2519    session_name: &str,
2520    task_id: &str,
2521    prompt: &str,
2522) -> Result<()> {
2523    // Load repairer agent definition
2524    let agent_def = AgentDef::try_load("repairer", working_dir).unwrap_or_else(|| AgentDef {
2525        agent: crate::agents::AgentMeta {
2526            name: "repairer".to_string(),
2527            description: "Repair agent".to_string(),
2528        },
2529        model: crate::agents::ModelConfig {
2530            harness: "claude".to_string(),
2531            model: Some("opus".to_string()),
2532        },
2533        prompt: Default::default(),
2534    });
2535
2536    let harness = agent_def.harness()?;
2537    let model = agent_def.model();
2538
2539    let spawn_config = terminal::SpawnConfig {
2540        task_id: &format!("repair-{}", task_id),
2541        prompt,
2542        working_dir,
2543        session_name,
2544        harness,
2545        model,
2546        task_list_id: None,
2547    };
2548    terminal::spawn_tmux_agent(&spawn_config)?;
2549
2550    println!("    {} Spawned repairer for {}", "✓".green(), task_id);
2551    Ok(())
2552}
2553
2554/// Wait for a repair to complete by polling for marker file (kept as fallback for single-task scenarios)
2555#[allow(dead_code)]
2556fn wait_for_repair_completion_task(working_dir: &std::path::Path, task_id: &str) -> Result<bool> {
2557    let marker_path = working_dir
2558        .join(".scud")
2559        .join(format!("repair-complete-{}", task_id));
2560
2561    let timeout = Duration::from_secs(1800); // 30 minute timeout
2562    let start = std::time::Instant::now();
2563
2564    loop {
2565        if start.elapsed() > timeout {
2566            println!("    {} Repair timed out for {}", "!".yellow(), task_id);
2567            return Ok(false);
2568        }
2569
2570        if marker_path.exists() {
2571            let content = std::fs::read_to_string(&marker_path)?;
2572            std::fs::remove_file(&marker_path)?;
2573
2574            let success = content.contains("SUCCESS");
2575            if success {
2576                println!("    {} Repair completed for {}", "✓".green(), task_id);
2577            } else {
2578                println!("    {} Repair blocked for {}", "!".yellow(), task_id);
2579            }
2580
2581            return Ok(success);
2582        }
2583
2584        thread::sleep(Duration::from_secs(5));
2585    }
2586}
2587
2588/// Result of a batch repair attempt
2589enum BatchRepairResult {
2590    Success(Vec<String>),              // All fixed, list of task IDs
2591    Partial(Vec<String>, Vec<String>), // Some fixed, some blocked
2592    Blocked(String),                   // Completely blocked with reason
2593    Timeout,                           // Timed out
2594}
2595
2596/// Spawn a batch repairer agent
2597fn spawn_batch_repairer(
2598    working_dir: &std::path::Path,
2599    session_name: &str,
2600    prompt: &str,
2601) -> Result<()> {
2602    // Load repairer agent definition
2603    let agent_def = AgentDef::try_load("repairer", working_dir).unwrap_or_else(|| AgentDef {
2604        agent: crate::agents::AgentMeta {
2605            name: "batch-repairer".to_string(),
2606            description: "Batch repair agent".to_string(),
2607        },
2608        model: crate::agents::ModelConfig {
2609            harness: "claude".to_string(),
2610            model: Some("opus".to_string()),
2611        },
2612        prompt: Default::default(),
2613    });
2614
2615    let harness = agent_def.harness()?;
2616    let model = agent_def.model();
2617
2618    let spawn_config = terminal::SpawnConfig {
2619        task_id: "batch-repair",
2620        prompt,
2621        working_dir,
2622        session_name,
2623        harness,
2624        model,
2625        task_list_id: None,
2626    };
2627    terminal::spawn_tmux_agent(&spawn_config)?;
2628
2629    println!("    {} Spawned batch repairer", "✓".green());
2630    Ok(())
2631}
2632
2633/// Wait for batch repair to complete by polling for marker file
2634fn wait_for_batch_repair_completion(working_dir: &std::path::Path) -> Result<BatchRepairResult> {
2635    let marker_path = working_dir.join(".scud").join("batch-repair-complete");
2636
2637    let timeout = Duration::from_secs(2700); // 45 minute timeout for batch
2638    let start = std::time::Instant::now();
2639
2640    loop {
2641        if start.elapsed() > timeout {
2642            return Ok(BatchRepairResult::Timeout);
2643        }
2644
2645        if marker_path.exists() {
2646            let content = std::fs::read_to_string(&marker_path)?;
2647            let _ = std::fs::remove_file(&marker_path); // Clean up
2648
2649            // Parse the marker file
2650            if content.contains("SUCCESS") {
2651                let fixed = parse_task_list(&content, "FIXED_TASKS:");
2652                return Ok(BatchRepairResult::Success(fixed));
2653            } else if content.contains("PARTIAL") {
2654                let fixed = parse_task_list(&content, "FIXED_TASKS:");
2655                let blocked = parse_task_list(&content, "BLOCKED_TASKS:");
2656                return Ok(BatchRepairResult::Partial(fixed, blocked));
2657            } else if content.contains("BLOCKED") {
2658                let reason = content
2659                    .lines()
2660                    .find(|l| l.starts_with("REASON:"))
2661                    .map(|l| l.trim_start_matches("REASON:").trim().to_string())
2662                    .unwrap_or_else(|| "Unknown reason".to_string());
2663                return Ok(BatchRepairResult::Blocked(reason));
2664            }
2665        }
2666
2667        thread::sleep(Duration::from_secs(5));
2668    }
2669}
2670
2671/// Parse comma-separated task list from marker file line
2672fn parse_task_list(content: &str, prefix: &str) -> Vec<String> {
2673    content
2674        .lines()
2675        .find(|l| l.starts_with(prefix))
2676        .map(|l| {
2677            l.trim_start_matches(prefix)
2678                .trim()
2679                .split(',')
2680                .map(|s| s.trim().to_string())
2681                .filter(|s| !s.is_empty())
2682                .collect()
2683        })
2684        .unwrap_or_default()
2685}
2686
2687/// Find a task by ID along with its tag
2688fn find_task_with_tag(
2689    storage: &Storage,
2690    task_id: &str,
2691    task_tags: &[(String, String)],
2692) -> Option<(Task, String)> {
2693    let tag = task_tags.iter().find(|(id, _)| id == task_id)?.1.clone();
2694    let phase = storage.load_group(&tag).ok()?;
2695    let task = phase.get_task(task_id)?.clone();
2696    Some((task, tag))
2697}