Skip to main content

scud/commands/swarm/
session.rs

1//! Swarm session state tracking
2//!
3//! Tracks the state of a swarm execution session, including:
4//! - Waves executed
5//! - Rounds within waves
6//! - Task completion status
7//! - Validation results
8
9use anyhow::Result;
10use serde::{Deserialize, Serialize};
11use std::fs;
12use std::path::PathBuf;
13use std::process::Command;
14
15use crate::backpressure::ValidationResult;
16use crate::commands::spawn::monitor::{
17    AgentState, AgentStatus, AgentView, MonitorableSession, SpawnSession, StatusCounts,
18    WaveTaskState, WaveTaskView, WaveView,
19};
20
21/// Get the current git commit SHA
22pub fn get_current_commit() -> Option<String> {
23    Command::new("git")
24        .args(["rev-parse", "HEAD"])
25        .output()
26        .ok()
27        .and_then(|output| {
28            if output.status.success() {
29                Some(String::from_utf8_lossy(&output.stdout).trim().to_string())
30            } else {
31                None
32            }
33        })
34}
35
36/// Brief summary of what was done in a wave
37/// This is NOT accumulated context - just a simple summary for the next wave
38#[derive(Debug, Clone, Serialize, Deserialize)]
39pub struct WaveSummary {
40    /// Wave number
41    pub wave_number: usize,
42    /// Tasks that were completed
43    pub tasks_completed: Vec<String>,
44    /// Files that were changed
45    pub files_changed: Vec<String>,
46}
47
48impl WaveSummary {
49    /// Generate a brief text summary
50    pub fn to_text(&self) -> String {
51        let mut lines = Vec::new();
52
53        lines.push(format!(
54            "Wave {} completed {} task(s):",
55            self.wave_number,
56            self.tasks_completed.len()
57        ));
58
59        for task_id in &self.tasks_completed {
60            lines.push(format!("  - {}", task_id));
61        }
62
63        if !self.files_changed.is_empty() {
64            let file_summary = if self.files_changed.len() <= 5 {
65                self.files_changed.join(", ")
66            } else {
67                format!(
68                    "{} and {} more",
69                    self.files_changed[..5].join(", "),
70                    self.files_changed.len() - 5
71                )
72            };
73            lines.push(format!("Files changed: {}", file_summary));
74        }
75
76        lines.join("\n")
77    }
78}
79
80/// State of a single round within a wave
81#[derive(Debug, Clone, Serialize, Deserialize)]
82pub struct RoundState {
83    /// Round number (0-indexed)
84    pub round_number: usize,
85    /// Task IDs executed in this round
86    pub task_ids: Vec<String>,
87    /// Tags for each task
88    pub tags: Vec<String>,
89    /// Tasks that failed to spawn
90    pub failures: Vec<String>,
91    /// Start time
92    pub started_at: String,
93    /// End time (set when complete)
94    pub completed_at: Option<String>,
95}
96
97impl RoundState {
98    pub fn new(round_number: usize) -> Self {
99        Self {
100            round_number,
101            task_ids: Vec::new(),
102            tags: Vec::new(),
103            failures: Vec::new(),
104            started_at: chrono::Utc::now().to_rfc3339(),
105            completed_at: None,
106        }
107    }
108
109    pub fn mark_complete(&mut self) {
110        self.completed_at = Some(chrono::Utc::now().to_rfc3339());
111    }
112}
113
114/// State of a code review for a wave
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct ReviewState {
117    /// Tasks that were reviewed
118    pub reviewed_tasks: Vec<String>,
119    /// Whether all reviewed tasks passed
120    pub all_passed: bool,
121    /// Tasks flagged for improvement
122    pub tasks_needing_improvement: Vec<String>,
123    /// When the review completed
124    pub completed_at: String,
125}
126
127/// Record of a repair attempt
128#[derive(Debug, Clone, Serialize, Deserialize)]
129pub struct RepairAttempt {
130    /// Attempt number (1-indexed)
131    pub attempt_number: usize,
132    /// Tasks attributed as responsible for the failure
133    pub attributed_tasks: Vec<String>,
134    /// Tasks cleared as not responsible
135    pub cleared_tasks: Vec<String>,
136    /// Attribution confidence level: "high", "medium", "low"
137    pub attribution_confidence: String,
138    /// Whether validation passed after this repair
139    pub validation_passed: bool,
140    /// When the repair attempt completed
141    pub completed_at: String,
142}
143
144/// State of a single wave
145#[derive(Debug, Clone, Serialize, Deserialize)]
146pub struct WaveState {
147    /// Wave number (1-indexed)
148    pub wave_number: usize,
149    /// Rounds executed in this wave
150    pub rounds: Vec<RoundState>,
151    /// Validation result (if validation was run)
152    pub validation: Option<ValidationResult>,
153    /// Summary of what was done
154    pub summary: Option<WaveSummary>,
155    /// Git commit SHA at wave start (for tracking changes)
156    #[serde(default)]
157    pub start_commit: Option<String>,
158    /// Review result (if review was run)
159    #[serde(default)]
160    pub review: Option<ReviewState>,
161    /// Repair attempts (if validation failed)
162    #[serde(default)]
163    pub repairs: Vec<RepairAttempt>,
164    /// Start time
165    pub started_at: String,
166    /// End time (set when complete)
167    pub completed_at: Option<String>,
168}
169
170impl WaveState {
171    pub fn new(wave_number: usize) -> Self {
172        Self {
173            wave_number,
174            rounds: Vec::new(),
175            validation: None,
176            summary: None,
177            start_commit: get_current_commit(),
178            review: None,
179            repairs: Vec::new(),
180            started_at: chrono::Utc::now().to_rfc3339(),
181            completed_at: None,
182        }
183    }
184
185    pub fn mark_complete(&mut self) {
186        self.completed_at = Some(chrono::Utc::now().to_rfc3339());
187    }
188
189    /// Create a WaveState from an execution result
190    pub fn from_execution_result(wave_number: usize, result: WaveExecutionResult) -> Self {
191        let mut state = Self::new(wave_number);
192        state.rounds.push(result.round_state);
193        state
194    }
195
196    /// Apply an execution result to this wave
197    pub fn apply_execution_result(&mut self, result: WaveExecutionResult) {
198        self.rounds.push(result.round_state);
199    }
200
201    /// Get all task IDs from all rounds
202    pub fn all_task_ids(&self) -> Vec<String> {
203        self.rounds
204            .iter()
205            .flat_map(|r| r.task_ids.clone())
206            .collect()
207    }
208
209    /// Get task ID to tag mapping
210    pub fn task_tags(&self) -> Vec<(String, String)> {
211        self.rounds
212            .iter()
213            .flat_map(|r| {
214                r.task_ids
215                    .iter()
216                    .zip(r.tags.iter())
217                    .map(|(id, tag)| (id.clone(), tag.clone()))
218            })
219            .collect()
220    }
221}
222
223/// Full swarm session state
224#[derive(Debug, Clone, Serialize, Deserialize)]
225pub struct SwarmSession {
226    /// Session name
227    pub session_name: String,
228    /// Tag being executed
229    pub tag: String,
230    /// Terminal type
231    pub terminal: String,
232    /// Working directory
233    pub working_dir: String,
234    /// Round size (max tasks per round)
235    pub round_size: usize,
236    /// Waves executed
237    pub waves: Vec<WaveState>,
238    /// Session start time
239    pub started_at: String,
240    /// Session end time
241    pub completed_at: Option<String>,
242}
243
244impl SwarmSession {
245    pub fn new(
246        session_name: &str,
247        tag: &str,
248        terminal: &str,
249        working_dir: &str,
250        round_size: usize,
251    ) -> Self {
252        Self {
253            session_name: session_name.to_string(),
254            tag: tag.to_string(),
255            terminal: terminal.to_string(),
256            working_dir: working_dir.to_string(),
257            round_size,
258            waves: Vec::new(),
259            started_at: chrono::Utc::now().to_rfc3339(),
260            completed_at: None,
261        }
262    }
263
264    pub fn mark_complete(&mut self) {
265        self.completed_at = Some(chrono::Utc::now().to_rfc3339());
266    }
267
268    /// Get total tasks executed
269    pub fn total_tasks(&self) -> usize {
270        self.waves
271            .iter()
272            .flat_map(|w| &w.rounds)
273            .map(|r| r.task_ids.len())
274            .sum()
275    }
276
277    /// Get total failures
278    pub fn total_failures(&self) -> usize {
279        self.waves
280            .iter()
281            .flat_map(|w| &w.rounds)
282            .map(|r| r.failures.len())
283            .sum()
284    }
285
286    /// Get brief summary of the previous wave (if any)
287    /// This is just "what was done", not accumulated context
288    pub fn get_previous_summary(&self) -> Option<String> {
289        self.waves
290            .last()
291            .and_then(|w| w.summary.as_ref().map(|s| s.to_text()))
292    }
293
294    /// Convert to SpawnSession format for TUI compatibility
295    pub fn to_spawn_session(&self) -> SpawnSession {
296        let mut agents = Vec::new();
297
298        for wave in &self.waves {
299            for round in &wave.rounds {
300                for (idx, task_id) in round.task_ids.iter().enumerate() {
301                    let tag = round.tags.get(idx).cloned().unwrap_or_default();
302                    let failed = round.failures.contains(task_id);
303
304                    // Determine status based on wave/task state
305                    let status = if failed {
306                        AgentStatus::Failed
307                    } else if wave
308                        .validation
309                        .as_ref()
310                        .map(|v| v.all_passed)
311                        .unwrap_or(false)
312                    {
313                        AgentStatus::Completed
314                    } else {
315                        AgentStatus::Running
316                    };
317
318                    agents.push(AgentState {
319                        task_id: task_id.clone(),
320                        task_title: task_id.clone(), // Will be enriched by TUI
321                        window_name: format!("task-{}", task_id),
322                        status,
323                        started_at: wave.started_at.clone(),
324                        tag,
325                    });
326                }
327            }
328        }
329
330        SpawnSession {
331            session_name: self.session_name.clone(),
332            tag: self.tag.clone(),
333            terminal: self.terminal.clone(),
334            created_at: self.started_at.clone(),
335            working_dir: self.working_dir.clone(),
336            agents,
337        }
338    }
339}
340
341impl MonitorableSession for SwarmSession {
342    fn session_name(&self) -> &str {
343        &self.session_name
344    }
345
346    fn tag(&self) -> &str {
347        &self.tag
348    }
349
350    fn working_dir(&self) -> &str {
351        &self.working_dir
352    }
353
354    fn agents(&self) -> Vec<AgentView> {
355        let mut agents = Vec::new();
356
357        for wave in &self.waves {
358            for round in &wave.rounds {
359                for (idx, task_id) in round.task_ids.iter().enumerate() {
360                    let tag = round.tags.get(idx).cloned().unwrap_or_default();
361                    let failed = round.failures.contains(task_id);
362
363                    // Determine status based on wave/task state
364                    let status = if failed {
365                        AgentStatus::Failed
366                    } else if wave
367                        .validation
368                        .as_ref()
369                        .map(|v| v.all_passed)
370                        .unwrap_or(false)
371                    {
372                        AgentStatus::Completed
373                    } else {
374                        AgentStatus::Running
375                    };
376
377                    agents.push(AgentView {
378                        task_id: task_id.clone(),
379                        task_title: task_id.clone(), // Will be enriched by TUI
380                        window_name: format!("task-{}", task_id),
381                        status,
382                        tag,
383                    });
384                }
385            }
386        }
387
388        agents
389    }
390
391    fn waves(&self) -> Vec<WaveView> {
392        self.waves
393            .iter()
394            .map(|w| {
395                let tasks: Vec<WaveTaskView> = w
396                    .rounds
397                    .iter()
398                    .flat_map(|r| {
399                        r.task_ids.iter().map(|id| {
400                            let failed = r.failures.contains(id);
401                            let done = w
402                                .validation
403                                .as_ref()
404                                .map(|v| v.all_passed)
405                                .unwrap_or(false);
406
407                            WaveTaskView {
408                                task_id: id.clone(),
409                                task_title: id.clone(),
410                                state: if failed {
411                                    WaveTaskState::Blocked
412                                } else if done {
413                                    WaveTaskState::Done
414                                } else {
415                                    WaveTaskState::Running
416                                },
417                                complexity: None,
418                            }
419                        })
420                    })
421                    .collect();
422
423                WaveView {
424                    wave_number: w.wave_number,
425                    tasks,
426                }
427            })
428            .collect()
429    }
430
431    fn status_counts(&self) -> StatusCounts {
432        let agents = self.agents();
433        StatusCounts {
434            starting: agents
435                .iter()
436                .filter(|a| matches!(a.status, AgentStatus::Starting))
437                .count(),
438            running: agents
439                .iter()
440                .filter(|a| matches!(a.status, AgentStatus::Running))
441                .count(),
442            completed: agents
443                .iter()
444                .filter(|a| matches!(a.status, AgentStatus::Completed))
445                .count(),
446            failed: agents
447                .iter()
448                .filter(|a| matches!(a.status, AgentStatus::Failed))
449                .count(),
450        }
451    }
452}
453
454/// Get the swarm session directory
455pub fn swarm_dir(project_root: Option<&PathBuf>) -> PathBuf {
456    let root = project_root
457        .cloned()
458        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
459    root.join(".scud").join("swarm")
460}
461
462/// Get the path to the session lock file for a given tag.
463/// Lock names include worktree ID when running inside a git worktree,
464/// allowing parallel salvo swarms on different worktrees.
465pub fn lock_file_path(project_root: Option<&PathBuf>, tag: &str) -> PathBuf {
466    let root = project_root
467        .cloned()
468        .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
469    let worktree_id = get_worktree_id(&root);
470    let lock_name = match worktree_id {
471        Some(wt_id) => format!("{}-{}.lock", tag, wt_id),
472        None => format!("{}.lock", tag),
473    };
474    swarm_dir(project_root).join(lock_name)
475}
476
477/// Detect if we're in a git worktree (has .git file, not .git directory).
478/// Returns the worktree directory name as an identifier.
479fn get_worktree_id(project_root: &std::path::Path) -> Option<String> {
480    let git_path = project_root.join(".git");
481    if git_path.is_file() {
482        // In a worktree, .git is a file pointing to the main repo
483        project_root
484            .file_name()
485            .and_then(|n| n.to_str())
486            .map(|s| s.to_string())
487    } else {
488        None
489    }
490}
491
492/// A session lock that prevents concurrent swarm sessions on the same tag.
493/// The lock is automatically released when this struct is dropped.
494pub struct SessionLock {
495    _file: fs::File,
496    path: PathBuf,
497}
498
499impl SessionLock {
500    /// Get the path to the lock file
501    pub fn path(&self) -> &PathBuf {
502        &self.path
503    }
504}
505
506impl Drop for SessionLock {
507    fn drop(&mut self) {
508        // Lock is released automatically when file is dropped
509        // Optionally remove the lock file
510        let _ = fs::remove_file(&self.path);
511    }
512}
513
514/// Acquire an exclusive session lock for a tag.
515/// Returns a SessionLock that will be released when dropped.
516/// Returns an error if another session already holds the lock.
517pub fn acquire_session_lock(project_root: Option<&PathBuf>, tag: &str) -> Result<SessionLock> {
518    use fs2::FileExt;
519
520    let dir = swarm_dir(project_root);
521    fs::create_dir_all(&dir)?;
522
523    let lock_path = lock_file_path(project_root, tag);
524    let file = fs::OpenOptions::new()
525        .write(true)
526        .create(true)
527        .truncate(true)
528        .open(&lock_path)?;
529
530    // Try to acquire exclusive lock (non-blocking)
531    file.try_lock_exclusive().map_err(|_| {
532        anyhow::anyhow!(
533            "Another swarm session is already running for tag '{}'. \
534             If this is incorrect, remove the lock file: {}",
535            tag,
536            lock_path.display()
537        )
538    })?;
539
540    // Write PID and timestamp to lock file for debugging
541    use std::io::Write;
542    let mut file = file;
543    writeln!(
544        file,
545        "pid={}\nstarted={}",
546        std::process::id(),
547        chrono::Utc::now().to_rfc3339()
548    )?;
549
550    Ok(SessionLock {
551        _file: file,
552        path: lock_path,
553    })
554}
555
556/// Get the path to a session's state file
557pub fn session_file(project_root: Option<&PathBuf>, session_name: &str) -> PathBuf {
558    swarm_dir(project_root).join(format!("{}.json", session_name))
559}
560
561/// Save swarm session state
562pub fn save_session(project_root: Option<&PathBuf>, session: &SwarmSession) -> Result<()> {
563    let dir = swarm_dir(project_root);
564    fs::create_dir_all(&dir)?;
565
566    let file = session_file(project_root, &session.session_name);
567    let json = serde_json::to_string_pretty(session)?;
568    fs::write(file, json)?;
569
570    Ok(())
571}
572
573/// Load swarm session state
574pub fn load_session(project_root: Option<&PathBuf>, session_name: &str) -> Result<SwarmSession> {
575    let file = session_file(project_root, session_name);
576    let json = fs::read_to_string(&file)?;
577    let session: SwarmSession = serde_json::from_str(&json)?;
578    Ok(session)
579}
580
581/// List all swarm sessions
582pub fn list_sessions(project_root: Option<&PathBuf>) -> Result<Vec<String>> {
583    let dir = swarm_dir(project_root);
584    if !dir.exists() {
585        return Ok(Vec::new());
586    }
587
588    let mut sessions = Vec::new();
589    for entry in fs::read_dir(dir)? {
590        let entry = entry?;
591        let path = entry.path();
592        if path.extension().map(|e| e == "json").unwrap_or(false) {
593            if let Some(stem) = path.file_stem() {
594                sessions.push(stem.to_string_lossy().to_string());
595            }
596        }
597    }
598
599    Ok(sessions)
600}
601
602// ============================================================================
603// Extension-based Agent Spawning
604// ============================================================================
605
606use std::path::Path;
607use tokio::sync::mpsc;
608
609use crate::commands::spawn::agent::generate_prompt;
610use crate::commands::spawn::terminal::Harness;
611use crate::extensions::runner::{load_agent_config, AgentEvent, AgentRunner, SpawnConfig};
612use crate::models::task::Task;
613
614/// Agent info for wave execution
615#[derive(Debug, Clone)]
616pub struct WaveAgent {
617    /// Task being executed
618    pub task: Task,
619    /// Tag/phase the task belongs to
620    pub tag: String,
621}
622
623impl WaveAgent {
624    /// Create a WaveAgent from a task and tag
625    pub fn new(task: Task, tag: impl Into<String>) -> Self {
626        Self {
627            task,
628            tag: tag.into(),
629        }
630    }
631
632    /// Create WaveAgents from a collection of (task, tag) pairs
633    ///
634    /// This is the primary conversion point from graph computation output
635    /// to extension-based spawning input.
636    pub fn from_task_pairs<I>(pairs: I) -> Vec<Self>
637    where
638        I: IntoIterator<Item = (Task, String)>,
639    {
640        pairs
641            .into_iter()
642            .map(|(task, tag)| Self::new(task, tag))
643            .collect()
644    }
645
646    /// Get the task ID
647    pub fn task_id(&self) -> &str {
648        &self.task.id
649    }
650}
651
652/// Result from wave execution
653#[derive(Debug, Clone)]
654pub struct WaveExecutionResult {
655    /// Round state with execution info
656    pub round_state: RoundState,
657    /// Results from each agent
658    pub agent_results: Vec<crate::extensions::runner::AgentResult>,
659}
660
661impl WaveExecutionResult {
662    /// Check if all agents completed successfully
663    pub fn all_succeeded(&self) -> bool {
664        self.agent_results.iter().all(|r| r.success)
665    }
666
667    /// Get task IDs that completed successfully
668    pub fn successful_task_ids(&self) -> Vec<String> {
669        self.agent_results
670            .iter()
671            .filter(|r| r.success)
672            .map(|r| r.task_id.clone())
673            .collect()
674    }
675
676    /// Get task IDs that failed
677    pub fn failed_task_ids(&self) -> Vec<String> {
678        self.agent_results
679            .iter()
680            .filter(|r| !r.success)
681            .map(|r| r.task_id.clone())
682            .collect()
683    }
684
685    /// Get total execution duration in milliseconds
686    pub fn total_duration_ms(&self) -> u64 {
687        self.agent_results
688            .iter()
689            .map(|r| r.duration_ms)
690            .max()
691            .unwrap_or(0)
692    }
693}
694
695/// Execute a wave of agents using extension-based spawning (no tmux)
696///
697/// This function spawns agents as direct subprocesses and waits for them
698/// to complete, collecting their results.
699///
700/// # Arguments
701/// * `agents` - The agents to execute in this wave
702/// * `working_dir` - Working directory for agents
703/// * `round_number` - Round number for state tracking
704/// * `default_harness` - Default harness if agent doesn't specify one
705/// * `event_callback` - Optional callback for agent events
706///
707/// # Returns
708/// WaveExecutionResult with round state and agent results
709pub async fn execute_wave_async(
710    agents: &[WaveAgent],
711    working_dir: &Path,
712    round_number: usize,
713    default_harness: Harness,
714) -> Result<WaveExecutionResult> {
715    let mut round_state = RoundState::new(round_number);
716    let mut runner = AgentRunner::new(agents.len() * 10);
717
718    // Spawn all agents
719    for agent in agents {
720        // Resolve agent config (harness, model) from agent_type
721        let (harness, model) = load_agent_config(
722            agent.task.agent_type.as_deref(),
723            default_harness,
724            None,
725            working_dir,
726        );
727
728        // Generate prompt for the agent
729        let prompt = generate_prompt(&agent.task, &agent.tag);
730
731        let config = SpawnConfig {
732            task_id: agent.task.id.clone(),
733            prompt,
734            working_dir: working_dir.to_path_buf(),
735            harness,
736            model,
737        };
738
739        match runner.spawn(config).await {
740            Ok(()) => {
741                round_state.task_ids.push(agent.task.id.clone());
742                round_state.tags.push(agent.tag.clone());
743            }
744            Err(e) => {
745                round_state.failures.push(agent.task.id.clone());
746                eprintln!("Failed to spawn agent for {}: {}", agent.task.id, e);
747            }
748        }
749    }
750
751    // Wait for all agents to complete
752    let agent_results = runner.wait_all().await;
753
754    round_state.mark_complete();
755
756    Ok(WaveExecutionResult {
757        round_state,
758        agent_results,
759    })
760}
761
762/// Execute a wave of agents with event streaming
763///
764/// Similar to execute_wave_async but allows receiving events during execution.
765///
766/// # Arguments
767/// * `agents` - The agents to execute in this wave
768/// * `working_dir` - Working directory for agents
769/// * `round_number` - Round number for state tracking
770/// * `default_harness` - Default harness if agent doesn't specify one
771/// * `event_tx` - Channel to send events to
772///
773/// # Returns
774/// WaveExecutionResult with round state and agent results
775pub async fn execute_wave_with_events(
776    agents: &[WaveAgent],
777    working_dir: &Path,
778    round_number: usize,
779    default_harness: Harness,
780    event_tx: mpsc::Sender<AgentEvent>,
781) -> Result<WaveExecutionResult> {
782    use crate::extensions::runner::spawn_agent;
783
784    let mut round_state = RoundState::new(round_number);
785    let mut handles = Vec::new();
786
787    // Spawn all agents
788    for agent in agents {
789        // Resolve agent config (harness, model) from agent_type
790        let (harness, model) = load_agent_config(
791            agent.task.agent_type.as_deref(),
792            default_harness,
793            None,
794            working_dir,
795        );
796
797        // Generate prompt for the agent
798        let prompt = generate_prompt(&agent.task, &agent.tag);
799
800        let config = SpawnConfig {
801            task_id: agent.task.id.clone(),
802            prompt,
803            working_dir: working_dir.to_path_buf(),
804            harness,
805            model,
806        };
807
808        match spawn_agent(config, event_tx.clone()).await {
809            Ok(handle) => {
810                handles.push(handle);
811                round_state.task_ids.push(agent.task.id.clone());
812                round_state.tags.push(agent.tag.clone());
813            }
814            Err(e) => {
815                round_state.failures.push(agent.task.id.clone());
816                let _ = event_tx
817                    .send(AgentEvent::SpawnFailed {
818                        task_id: agent.task.id.clone(),
819                        error: e.to_string(),
820                    })
821                    .await;
822            }
823        }
824    }
825
826    // Wait for all agents to complete
827    let mut agent_results = Vec::new();
828    for handle in handles {
829        if let Ok(result) = handle.await {
830            agent_results.push(result);
831        }
832    }
833
834    round_state.mark_complete();
835
836    Ok(WaveExecutionResult {
837        round_state,
838        agent_results,
839    })
840}
841
842/// Execute a complete wave with state tracking
843///
844/// This is the high-level interface for executing a wave of agents using
845/// extension-based spawning. It handles:
846/// - Converting task info to WaveAgent format
847/// - Spawning and managing agents
848/// - Updating wave state with results
849///
850/// # Arguments
851/// * `wave_number` - The wave number for state tracking
852/// * `task_pairs` - Iterator of (Task, tag) pairs from graph computation
853/// * `working_dir` - Working directory for agents
854/// * `default_harness` - Default harness if agent doesn't specify one
855///
856/// # Returns
857/// A tuple of (WaveState, Vec<AgentResult>) with the tracked state and raw results
858pub async fn execute_wave_with_tracking<I>(
859    wave_number: usize,
860    task_pairs: I,
861    working_dir: &Path,
862    default_harness: Harness,
863) -> Result<(WaveState, Vec<crate::extensions::runner::AgentResult>)>
864where
865    I: IntoIterator<Item = (Task, String)>,
866{
867    let agents = WaveAgent::from_task_pairs(task_pairs);
868    let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
869
870    let wave_state = WaveState::from_execution_result(wave_number, result.clone());
871
872    Ok((wave_state, result.agent_results))
873}
874
875/// Execute multiple rounds within a wave with state tracking
876///
877/// This function handles chunking agents into rounds based on round_size
878/// and executes them sequentially, tracking state for each round.
879///
880/// # Arguments
881/// * `wave_number` - The wave number for state tracking
882/// * `task_pairs` - Iterator of (Task, tag) pairs from graph computation
883/// * `working_dir` - Working directory for agents
884/// * `round_size` - Maximum number of agents per round
885/// * `default_harness` - Default harness if agent doesn't specify one
886///
887/// # Returns
888/// WaveState with all rounds tracked
889pub async fn execute_wave_in_rounds<I>(
890    wave_number: usize,
891    task_pairs: I,
892    working_dir: &Path,
893    round_size: usize,
894    default_harness: Harness,
895) -> Result<WaveState>
896where
897    I: IntoIterator<Item = (Task, String)>,
898{
899    let agents: Vec<WaveAgent> = WaveAgent::from_task_pairs(task_pairs);
900    let mut wave_state = WaveState::new(wave_number);
901
902    for (round_idx, chunk) in agents.chunks(round_size).enumerate() {
903        let result = execute_wave_async(chunk, working_dir, round_idx, default_harness).await?;
904        wave_state.apply_execution_result(result);
905    }
906
907    wave_state.mark_complete();
908    Ok(wave_state)
909}
910
911/// Spawn a single agent using extension-based spawning (async, no tmux)
912///
913/// This is a convenience function for spawning a single agent.
914pub async fn spawn_subagent(
915    task: &Task,
916    tag: &str,
917    working_dir: &Path,
918    default_harness: Harness,
919) -> Result<crate::extensions::runner::AgentResult> {
920    let agents = vec![WaveAgent {
921        task: task.clone(),
922        tag: tag.to_string(),
923    }];
924
925    let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
926
927    result
928        .agent_results
929        .into_iter()
930        .next()
931        .ok_or_else(|| anyhow::anyhow!("No result from agent"))
932}
933
934#[cfg(test)]
935mod tests {
936    use super::*;
937
938    #[test]
939    fn test_round_state_new() {
940        let round = RoundState::new(0);
941        assert_eq!(round.round_number, 0);
942        assert!(round.task_ids.is_empty());
943        assert!(round.completed_at.is_none());
944    }
945
946    #[test]
947    fn test_wave_state_all_task_ids() {
948        let mut wave = WaveState::new(1);
949
950        let mut round1 = RoundState::new(0);
951        round1.task_ids = vec!["task:1".to_string(), "task:2".to_string()];
952
953        let mut round2 = RoundState::new(1);
954        round2.task_ids = vec!["task:3".to_string()];
955
956        wave.rounds.push(round1);
957        wave.rounds.push(round2);
958
959        let all_ids = wave.all_task_ids();
960        assert_eq!(all_ids.len(), 3);
961        assert!(all_ids.contains(&"task:1".to_string()));
962        assert!(all_ids.contains(&"task:2".to_string()));
963        assert!(all_ids.contains(&"task:3".to_string()));
964    }
965
966    #[test]
967    fn test_swarm_session_total_tasks() {
968        let mut session = SwarmSession::new("test-session", "test-tag", "tmux", "/test/path", 5);
969
970        let mut wave = WaveState::new(1);
971        let mut round = RoundState::new(0);
972        round.task_ids = vec!["task:1".to_string(), "task:2".to_string()];
973        wave.rounds.push(round);
974        session.waves.push(wave);
975
976        assert_eq!(session.total_tasks(), 2);
977    }
978
979    #[test]
980    fn test_wave_summary_to_text() {
981        let summary = WaveSummary {
982            wave_number: 1,
983            tasks_completed: vec!["task:1".to_string(), "task:2".to_string()],
984            files_changed: vec!["src/main.rs".to_string()],
985        };
986
987        let text = summary.to_text();
988        assert!(text.contains("Wave 1"));
989        assert!(text.contains("task:1"));
990        assert!(text.contains("src/main.rs"));
991    }
992
993    #[test]
994    fn test_get_previous_summary() {
995        let mut session = SwarmSession::new("test", "tag", "tmux", "/path", 5);
996
997        // No waves yet
998        assert!(session.get_previous_summary().is_none());
999
1000        // Add wave with summary
1001        let mut wave = WaveState::new(1);
1002        wave.summary = Some(WaveSummary {
1003            wave_number: 1,
1004            tasks_completed: vec!["task:1".to_string()],
1005            files_changed: vec![],
1006        });
1007        session.waves.push(wave);
1008
1009        let summary = session.get_previous_summary();
1010        assert!(summary.is_some());
1011        assert!(summary.unwrap().contains("task:1"));
1012    }
1013
1014    #[test]
1015    fn test_session_lock_contention() {
1016        use tempfile::TempDir;
1017
1018        // Create a temporary directory for testing
1019        let temp_dir = TempDir::new().unwrap();
1020        let project_root = temp_dir.path().to_path_buf();
1021
1022        // Acquire first lock
1023        let _lock1 = acquire_session_lock(Some(&project_root), "test-tag")
1024            .expect("First lock should succeed");
1025
1026        // Try to acquire second lock for same tag while first is held
1027        let result = acquire_session_lock(Some(&project_root), "test-tag");
1028
1029        // Verify the second attempt fails and error message mentions "already running"
1030        match result {
1031            Ok(_) => panic!("Second lock should fail"),
1032            Err(e) => {
1033                let error_msg = e.to_string();
1034                assert!(
1035                    error_msg.contains("already running"),
1036                    "Error message should mention 'already running', got: {}",
1037                    error_msg
1038                );
1039            }
1040        }
1041    }
1042
1043    #[test]
1044    fn test_get_current_commit() {
1045        let result = get_current_commit();
1046
1047        // Should return Some(sha) since we're in a git repo
1048        assert!(result.is_some(), "Expected Some(sha) in a git repository");
1049
1050        let sha = result.unwrap();
1051
1052        // Verify the SHA is 40 characters long (full SHA)
1053        assert_eq!(
1054            sha.len(),
1055            40,
1056            "Expected SHA to be 40 characters long, got {}",
1057            sha.len()
1058        );
1059
1060        // Verify the SHA contains only hex characters (0-9, a-f)
1061        assert!(
1062            sha.chars().all(|c| c.is_ascii_hexdigit()),
1063            "Expected SHA to contain only hex characters, got: {}",
1064            sha
1065        );
1066    }
1067
1068    #[test]
1069    fn test_wave_agent_new() {
1070        let task = Task::new(
1071            "task:1".to_string(),
1072            "Test task".to_string(),
1073            "Description".to_string(),
1074        );
1075        let agent = WaveAgent::new(task.clone(), "test-tag");
1076
1077        assert_eq!(agent.task_id(), "task:1");
1078        assert_eq!(agent.tag, "test-tag");
1079    }
1080
1081    #[test]
1082    fn test_wave_agent_from_task_pairs() {
1083        let task1 = Task::new(
1084            "task:1".to_string(),
1085            "Task 1".to_string(),
1086            "Description".to_string(),
1087        );
1088        let task2 = Task::new(
1089            "task:2".to_string(),
1090            "Task 2".to_string(),
1091            "Description".to_string(),
1092        );
1093
1094        let pairs = vec![(task1, "tag-a".to_string()), (task2, "tag-b".to_string())];
1095
1096        let agents = WaveAgent::from_task_pairs(pairs);
1097
1098        assert_eq!(agents.len(), 2);
1099        assert_eq!(agents[0].task_id(), "task:1");
1100        assert_eq!(agents[0].tag, "tag-a");
1101        assert_eq!(agents[1].task_id(), "task:2");
1102        assert_eq!(agents[1].tag, "tag-b");
1103    }
1104
1105    #[test]
1106    fn test_wave_execution_result_helpers() {
1107        use crate::extensions::runner::AgentResult;
1108
1109        let result = WaveExecutionResult {
1110            round_state: RoundState::new(0),
1111            agent_results: vec![
1112                AgentResult {
1113                    task_id: "task:1".to_string(),
1114                    success: true,
1115                    exit_code: Some(0),
1116                    output: String::new(),
1117                    duration_ms: 1000,
1118                },
1119                AgentResult {
1120                    task_id: "task:2".to_string(),
1121                    success: false,
1122                    exit_code: Some(1),
1123                    output: String::new(),
1124                    duration_ms: 2000,
1125                },
1126            ],
1127        };
1128
1129        assert!(!result.all_succeeded());
1130        assert_eq!(result.successful_task_ids(), vec!["task:1"]);
1131        assert_eq!(result.failed_task_ids(), vec!["task:2"]);
1132        assert_eq!(result.total_duration_ms(), 2000);
1133    }
1134
1135    #[test]
1136    fn test_wave_state_from_execution_result() {
1137        use crate::extensions::runner::AgentResult;
1138
1139        let mut round_state = RoundState::new(0);
1140        round_state.task_ids = vec!["task:1".to_string()];
1141
1142        let result = WaveExecutionResult {
1143            round_state,
1144            agent_results: vec![AgentResult {
1145                task_id: "task:1".to_string(),
1146                success: true,
1147                exit_code: Some(0),
1148                output: String::new(),
1149                duration_ms: 1000,
1150            }],
1151        };
1152
1153        let wave_state = WaveState::from_execution_result(1, result);
1154
1155        assert_eq!(wave_state.wave_number, 1);
1156        assert_eq!(wave_state.rounds.len(), 1);
1157        assert_eq!(wave_state.rounds[0].task_ids, vec!["task:1"]);
1158    }
1159
1160    #[test]
1161    fn test_wave_state_apply_execution_result() {
1162        use crate::extensions::runner::AgentResult;
1163
1164        let mut wave_state = WaveState::new(1);
1165        assert!(wave_state.rounds.is_empty());
1166
1167        let mut round_state = RoundState::new(0);
1168        round_state.task_ids = vec!["task:1".to_string()];
1169
1170        let result = WaveExecutionResult {
1171            round_state,
1172            agent_results: vec![AgentResult {
1173                task_id: "task:1".to_string(),
1174                success: true,
1175                exit_code: Some(0),
1176                output: String::new(),
1177                duration_ms: 1000,
1178            }],
1179        };
1180
1181        wave_state.apply_execution_result(result);
1182
1183        assert_eq!(wave_state.rounds.len(), 1);
1184        assert_eq!(wave_state.all_task_ids(), vec!["task:1"]);
1185    }
1186}