use anyhow::Result;
use serde::{Deserialize, Serialize};
use std::fs;
use std::path::PathBuf;
use std::process::Command;
use crate::backpressure::ValidationResult;
use crate::commands::spawn::monitor::{
AgentState, AgentStatus, AgentView, MonitorableSession, SpawnSession, StatusCounts,
WaveTaskState, WaveTaskView, WaveView,
};
/// Returns the commit hash that `HEAD` currently resolves to.
///
/// Runs `git rev-parse HEAD` without overriding the working directory, so
/// the command inherits the one of the current process.
///
/// Yields `None` when the command cannot be started or exits with a
/// non-success status. Otherwise yields the command's standard output,
/// lossily decoded as UTF-8 with surrounding whitespace removed.
pub fn get_current_commit() -> Option<String> {
    let output = Command::new("git")
        .args(["rev-parse", "HEAD"])
        .output()
        .ok()?;

    if !output.status.success() {
        return None;
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    Some(stdout.trim().to_string())
}
/// Summary of what a wave accomplished.
///
/// Rendered to human-readable text by `WaveSummary::to_text`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WaveSummary {
/// Number identifying the wave being summarized.
pub wave_number: usize,
/// Identifiers of the tasks listed as completed.
pub tasks_completed: Vec<String>,
/// Entries listed as changed files (presumably paths; nothing here validates them).
pub files_changed: Vec<String>,
}
impl WaveSummary {
    /// Renders the summary as newline-separated, human-readable text.
    ///
    /// The first line states the wave number and how many tasks completed,
    /// followed by one line per completed task id. When `files_changed` is
    /// non-empty a final `Files changed:` line is appended; it names at most
    /// five files and collapses the rest into an "and N more" suffix.
    pub fn to_text(&self) -> String {
        // Maximum number of file names spelled out before truncating.
        const MAX_LISTED_FILES: usize = 5;

        let header = format!(
            "Wave {} completed {} task(s):",
            self.wave_number,
            self.tasks_completed.len()
        );

        let files_summary = match self.files_changed.as_slice() {
            [] => None,
            files if files.len() <= MAX_LISTED_FILES => Some(files.join(", ")),
            files => Some(format!(
                "{} and {} more",
                files[..MAX_LISTED_FILES].join(", "),
                files.len() - MAX_LISTED_FILES
            )),
        };

        let mut lines = vec![header];
        lines.extend(
            self.tasks_completed
                .iter()
                .map(|task_id| format!(" - {}", task_id)),
        );
        lines.extend(files_summary.map(|summary| format!("Files changed: {}", summary)));
        lines.join("\n")
    }
}
/// State of one round (a batch of tasks executed together) within a wave.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundState {
/// Number of this round as passed to `RoundState::new`.
pub round_number: usize,
/// Ids of the tasks recorded for this round.
pub task_ids: Vec<String>,
/// Tag of each recorded task; read by index alongside `task_ids`.
pub tags: Vec<String>,
/// Ids of tasks recorded as failed; a task id found here is reported as failed/blocked by the monitor views.
pub failures: Vec<String>,
/// RFC 3339 timestamp (UTC) taken when the round state was created.
pub started_at: String,
/// RFC 3339 timestamp (UTC) set by `mark_complete`; `None` until then.
pub completed_at: Option<String>,
}
impl RoundState {
    /// Creates an empty round with the given number.
    ///
    /// `started_at` is stamped with the current UTC time in RFC 3339 form;
    /// the task, tag and failure lists start empty and `completed_at` is
    /// `None`.
    pub fn new(round_number: usize) -> Self {
        let started_at = chrono::Utc::now().to_rfc3339();
        Self {
            round_number,
            task_ids: vec![],
            tags: vec![],
            failures: vec![],
            started_at,
            completed_at: None,
        }
    }

    /// Stamps `completed_at` with the current UTC time (RFC 3339),
    /// overwriting any previous value.
    pub fn mark_complete(&mut self) {
        let finished = chrono::Utc::now();
        self.completed_at = Some(finished.to_rfc3339());
    }
}
/// Recorded outcome of a review, stored in `WaveState::review`.
///
/// NOTE(review): nothing in this file populates or reads these fields; the
/// descriptions below are inferred from the field names and should be
/// confirmed against the code that produces the review.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReviewState {
/// Presumably the ids of the tasks that were reviewed.
pub reviewed_tasks: Vec<String>,
/// Presumably whether every reviewed task passed.
pub all_passed: bool,
/// Presumably the ids of reviewed tasks flagged for improvement.
pub tasks_needing_improvement: Vec<String>,
/// Presumably a timestamp of when the review finished (format not established here).
pub completed_at: String,
}
/// Record of one repair attempt, stored in `WaveState::repairs`.
///
/// NOTE(review): nothing in this file populates or reads these fields; the
/// descriptions below are inferred from the field names and should be
/// confirmed against the repair logic.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairAttempt {
/// Presumably the ordinal of this attempt (base not established here).
pub attempt_number: usize,
/// Presumably the ids of tasks a failure was attributed to.
pub attributed_tasks: Vec<String>,
/// Presumably the ids of tasks cleared of responsibility.
pub cleared_tasks: Vec<String>,
/// Presumably a label for how confident the attribution is (free-form string).
pub attribution_confidence: String,
/// Presumably whether validation passed after this attempt.
pub validation_passed: bool,
/// Presumably a timestamp of when the attempt finished (format not established here).
pub completed_at: String,
}
/// Persisted state of one wave: the rounds it ran plus its validation,
/// summary, review and repair records.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WaveState {
/// Number identifying this wave.
pub wave_number: usize,
/// Rounds recorded for this wave, in the order they were appended.
pub rounds: Vec<RoundState>,
/// Validation result, if any. The monitor views report a wave's non-failed
/// tasks as completed/done only when this is present with `all_passed` set.
pub validation: Option<ValidationResult>,
/// Summary of the wave, if one has been recorded.
pub summary: Option<WaveSummary>,
/// Output of `get_current_commit()` captured when the state was created
/// with `WaveState::new`; `None` when no commit could be determined.
/// `#[serde(default)]` lets input lacking this field deserialize.
#[serde(default)]
pub start_commit: Option<String>,
/// Review outcome, if any. Defaults to `None` when absent from the input.
#[serde(default)]
pub review: Option<ReviewState>,
/// Repair attempts recorded for this wave. Defaults to empty when absent from the input.
#[serde(default)]
pub repairs: Vec<RepairAttempt>,
/// RFC 3339 timestamp (UTC) taken when the state was created.
pub started_at: String,
/// RFC 3339 timestamp (UTC) set by `mark_complete`; `None` until then.
pub completed_at: Option<String>,
}
impl WaveState {
pub fn new(wave_number: usize) -> Self {
Self {
wave_number,
rounds: Vec::new(),
validation: None,
summary: None,
start_commit: get_current_commit(),
review: None,
repairs: Vec::new(),
started_at: chrono::Utc::now().to_rfc3339(),
completed_at: None,
}
}
pub fn mark_complete(&mut self) {
self.completed_at = Some(chrono::Utc::now().to_rfc3339());
}
pub fn from_execution_result(wave_number: usize, result: WaveExecutionResult) -> Self {
let mut state = Self::new(wave_number);
state.rounds.push(result.round_state);
state
}
pub fn apply_execution_result(&mut self, result: WaveExecutionResult) {
self.rounds.push(result.round_state);
}
pub fn all_task_ids(&self) -> Vec<String> {
self.rounds
.iter()
.flat_map(|r| r.task_ids.clone())
.collect()
}
pub fn task_tags(&self) -> Vec<(String, String)> {
self.rounds
.iter()
.flat_map(|r| {
r.task_ids
.iter()
.zip(r.tags.iter())
.map(|(id, tag)| (id.clone(), tag.clone()))
})
.collect()
}
}
/// Persisted state of a whole swarm run: its identity plus every wave
/// recorded so far. Saved to and loaded from JSON by `save_session` /
/// `load_session`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmSession {
/// Name of the session; also the stem of its JSON file name.
pub session_name: String,
/// Tag associated with the session.
pub tag: String,
/// Terminal identifier, copied verbatim into the spawn-session view.
pub terminal: String,
/// Working directory of the session, stored as a string.
pub working_dir: String,
/// Configured round size (presumably the maximum number of tasks per round — not enforced here).
pub round_size: usize,
/// Waves recorded for this session, in insertion order.
pub waves: Vec<WaveState>,
/// RFC 3339 timestamp (UTC) taken when the session was created.
pub started_at: String,
/// RFC 3339 timestamp (UTC) set by `mark_complete`; `None` until then.
pub completed_at: Option<String>,
}
impl SwarmSession {
    /// Creates a session with no waves, stamped with the current UTC time
    /// (RFC 3339) as `started_at`.
    pub fn new(
        session_name: &str,
        tag: &str,
        terminal: &str,
        working_dir: &str,
        round_size: usize,
    ) -> Self {
        Self {
            session_name: session_name.to_owned(),
            tag: tag.to_owned(),
            terminal: terminal.to_owned(),
            working_dir: working_dir.to_owned(),
            round_size,
            waves: Vec::new(),
            started_at: chrono::Utc::now().to_rfc3339(),
            completed_at: None,
        }
    }

    /// Stamps `completed_at` with the current UTC time (RFC 3339).
    pub fn mark_complete(&mut self) {
        let finished = chrono::Utc::now();
        self.completed_at = Some(finished.to_rfc3339());
    }

    /// Counts the task ids recorded across every round of every wave.
    pub fn total_tasks(&self) -> usize {
        let mut total = 0;
        for wave in &self.waves {
            for round in &wave.rounds {
                total += round.task_ids.len();
            }
        }
        total
    }

    /// Counts the failure entries recorded across every round of every wave.
    pub fn total_failures(&self) -> usize {
        let mut total = 0;
        for wave in &self.waves {
            for round in &wave.rounds {
                total += round.failures.len();
            }
        }
        total
    }

    /// Returns the text summary of the most recently added wave, or `None`
    /// when there are no waves or the last wave has no summary.
    pub fn get_previous_summary(&self) -> Option<String> {
        let last_wave = self.waves.last()?;
        let summary = last_wave.summary.as_ref()?;
        Some(summary.to_text())
    }

    /// Converts this session into a `SpawnSession` with one agent entry per
    /// recorded task.
    ///
    /// An agent is `Failed` when its task id is listed in the round's
    /// failures, `Completed` when the wave's validation exists and reports
    /// `all_passed`, and `Running` otherwise. The task id doubles as the
    /// title, the window name is `task-<id>`, and a task without a tag at
    /// its index gets an empty tag.
    pub fn to_spawn_session(&self) -> SpawnSession {
        let mut agents = Vec::new();

        for wave in &self.waves {
            // Identical for every task of the wave, so computed once.
            let wave_passed = matches!(&wave.validation, Some(v) if v.all_passed);

            for round in &wave.rounds {
                agents.extend(round.task_ids.iter().enumerate().map(|(idx, task_id)| {
                    let status = if round.failures.contains(task_id) {
                        AgentStatus::Failed
                    } else if wave_passed {
                        AgentStatus::Completed
                    } else {
                        AgentStatus::Running
                    };

                    AgentState {
                        task_id: task_id.clone(),
                        task_title: task_id.clone(),
                        window_name: format!("task-{}", task_id),
                        status,
                        started_at: wave.started_at.clone(),
                        tag: round.tags.get(idx).cloned().unwrap_or_default(),
                    }
                }));
            }
        }

        SpawnSession {
            session_name: self.session_name.clone(),
            tag: self.tag.clone(),
            terminal: self.terminal.clone(),
            created_at: self.started_at.clone(),
            working_dir: self.working_dir.clone(),
            agents,
        }
    }
}
impl MonitorableSession for SwarmSession {
    /// Name of the session.
    fn session_name(&self) -> &str {
        self.session_name.as_str()
    }

    /// Tag of the session.
    fn tag(&self) -> &str {
        self.tag.as_str()
    }

    /// Working directory of the session.
    fn working_dir(&self) -> &str {
        self.working_dir.as_str()
    }

    /// Builds one `AgentView` per recorded task, in wave/round/task order.
    ///
    /// A task is `Failed` when its id is listed in the round's failures,
    /// `Completed` when the wave's validation exists and reports
    /// `all_passed`, and `Running` otherwise.
    fn agents(&self) -> Vec<AgentView> {
        let mut views = Vec::new();

        for wave in &self.waves {
            // Identical for every task of the wave, so computed once.
            let wave_passed = wave.validation.as_ref().map_or(false, |v| v.all_passed);

            for round in &wave.rounds {
                for (idx, task_id) in round.task_ids.iter().enumerate() {
                    let status = if round.failures.contains(task_id) {
                        AgentStatus::Failed
                    } else if wave_passed {
                        AgentStatus::Completed
                    } else {
                        AgentStatus::Running
                    };

                    views.push(AgentView {
                        task_id: task_id.clone(),
                        task_title: task_id.clone(),
                        window_name: format!("task-{}", task_id),
                        status,
                        tag: round.tags.get(idx).cloned().unwrap_or_default(),
                    });
                }
            }
        }

        views
    }

    /// Builds one `WaveView` per wave, flattening the tasks of all its
    /// rounds.
    ///
    /// A task is `Blocked` when its id is listed in the round's failures,
    /// `Done` when the wave's validation exists and reports `all_passed`,
    /// and `Running` otherwise. `complexity` is always `None`.
    fn waves(&self) -> Vec<WaveView> {
        let mut views = Vec::with_capacity(self.waves.len());

        for wave in &self.waves {
            let wave_passed = wave.validation.as_ref().map_or(false, |v| v.all_passed);
            let mut tasks: Vec<WaveTaskView> = Vec::new();

            for round in &wave.rounds {
                for id in &round.task_ids {
                    let state = if round.failures.contains(id) {
                        WaveTaskState::Blocked
                    } else if wave_passed {
                        WaveTaskState::Done
                    } else {
                        WaveTaskState::Running
                    };

                    tasks.push(WaveTaskView {
                        task_id: id.clone(),
                        task_title: id.clone(),
                        state,
                        complexity: None,
                    });
                }
            }

            views.push(WaveView {
                wave_number: wave.wave_number,
                tasks,
            });
        }

        views
    }

    /// Tallies the agents returned by `agents()` by status.
    fn status_counts(&self) -> StatusCounts {
        let (mut starting, mut running, mut completed, mut failed) = (0, 0, 0, 0);

        for agent in self.agents() {
            if matches!(agent.status, AgentStatus::Starting) {
                starting += 1;
            } else if matches!(agent.status, AgentStatus::Running) {
                running += 1;
            } else if matches!(agent.status, AgentStatus::Completed) {
                completed += 1;
            } else if matches!(agent.status, AgentStatus::Failed) {
                failed += 1;
            }
        }

        StatusCounts {
            starting,
            running,
            completed,
            failed,
        }
    }
}
/// Returns the directory holding swarm state: `<root>/.scud/swarm`.
///
/// `<root>` is `project_root` when given, otherwise the current working
/// directory (or an empty path if that cannot be determined). The directory
/// is not created.
pub fn swarm_dir(project_root: Option<&PathBuf>) -> PathBuf {
    let base = match project_root {
        Some(root) => root.clone(),
        None => std::env::current_dir().unwrap_or_default(),
    };
    base.join(".scud").join("swarm")
}
/// Returns the path of the lock file for `tag` inside the swarm directory.
///
/// The file is named `<tag>-<worktree id>.lock` when `get_worktree_id`
/// yields an id for the project root, and `<tag>.lock` otherwise. The root
/// falls back to the current working directory (or an empty path) when
/// `project_root` is `None`.
pub fn lock_file_path(project_root: Option<&PathBuf>, tag: &str) -> PathBuf {
    let root = match project_root {
        Some(given) => given.clone(),
        None => std::env::current_dir().unwrap_or_default(),
    };

    let lock_name = get_worktree_id(&root).map_or_else(
        || format!("{}.lock", tag),
        |wt_id| format!("{}-{}.lock", tag, wt_id),
    );

    swarm_dir(project_root).join(lock_name)
}
/// Derives a worktree identifier from the project root.
///
/// When `<project_root>/.git` exists as a regular file (rather than a
/// directory), the final component of `project_root` is returned as the id.
/// Returns `None` when `.git` is missing or is not a file, or when the root
/// has no final component or that component is not valid UTF-8.
fn get_worktree_id(project_root: &std::path::Path) -> Option<String> {
    if !project_root.join(".git").is_file() {
        return None;
    }
    let dir_name = project_root.file_name()?.to_str()?;
    Some(dir_name.to_string())
}
/// Guard representing a held session lock.
///
/// Keeps the lock file's handle open for as long as the guard lives and
/// deletes the lock file from disk when the guard is dropped.
pub struct SessionLock {
    /// Open handle to the lock file; never read, only kept alive.
    _file: fs::File,
    /// Location of the lock file on disk.
    path: PathBuf,
}

impl SessionLock {
    /// Path of the lock file backing this guard.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }
}

impl Drop for SessionLock {
    /// Removes the lock file. Removal is best-effort: a failure is ignored.
    fn drop(&mut self) {
        let _ignored = fs::remove_file(&self.path);
    }
}
/// Acquires the exclusive session lock for `tag`.
///
/// Ensures the swarm directory exists, opens (creating if needed) the lock
/// file returned by `lock_file_path`, and tries to take an exclusive lock on
/// it without blocking. On success the file is rewritten with the current
/// process id and a UTC start timestamp, and a `SessionLock` guard is
/// returned; dropping the guard removes the lock file.
///
/// # Errors
///
/// Returns an error when the swarm directory or lock file cannot be created
/// or opened, when the exclusive lock cannot be taken (reported as another
/// session already running for `tag`), or when writing the lock contents
/// fails.
pub fn acquire_session_lock(project_root: Option<&PathBuf>, tag: &str) -> Result<SessionLock> {
    use fs2::FileExt;
    use std::io::Write;

    let dir = swarm_dir(project_root);
    fs::create_dir_all(&dir)?;
    let lock_path = lock_file_path(project_root, tag);

    // Open WITHOUT truncating. Truncating at open time would wipe the
    // pid/started record of a session that legitimately holds the lock even
    // though this attempt is about to fail.
    let mut file = fs::OpenOptions::new()
        .write(true)
        .create(true)
        .truncate(false)
        .open(&lock_path)?;

    file.try_lock_exclusive().map_err(|_| {
        anyhow::anyhow!(
            "Another swarm session is already running for tag '{}'. \
             If this is incorrect, remove the lock file: {}",
            tag,
            lock_path.display()
        )
    })?;

    // The lock is ours now, so it is safe to discard whatever a previous
    // (finished or crashed) session left behind before recording our own
    // details. The cursor is still at offset 0 because nothing was written.
    file.set_len(0)?;
    writeln!(
        file,
        "pid={}\nstarted={}",
        std::process::id(),
        chrono::Utc::now().to_rfc3339()
    )?;

    Ok(SessionLock {
        _file: file,
        path: lock_path,
    })
}
/// Returns the path of the JSON file for `session_name`:
/// `<swarm dir>/<session_name>.json`. The file is not created or checked.
pub fn session_file(project_root: Option<&PathBuf>, session_name: &str) -> PathBuf {
    let mut path = swarm_dir(project_root);
    path.push(format!("{}.json", session_name));
    path
}
/// Writes `session` as pretty-printed JSON to its session file, creating
/// the swarm directory first if necessary. An existing file is overwritten.
///
/// # Errors
///
/// Returns an error when the directory cannot be created, the session
/// cannot be serialized, or the file cannot be written.
pub fn save_session(project_root: Option<&PathBuf>, session: &SwarmSession) -> Result<()> {
    fs::create_dir_all(swarm_dir(project_root))?;

    let target = session_file(project_root, &session.session_name);
    let serialized = serde_json::to_string_pretty(session)?;
    fs::write(target, serialized)?;

    Ok(())
}
/// Reads and deserializes the session stored under `session_name`.
///
/// # Errors
///
/// Returns an error when the session file cannot be read or its contents
/// are not valid JSON for a `SwarmSession`.
pub fn load_session(project_root: Option<&PathBuf>, session_name: &str) -> Result<SwarmSession> {
    let source = session_file(project_root, session_name);
    let raw = fs::read_to_string(&source)?;
    let session = serde_json::from_str::<SwarmSession>(&raw)?;
    Ok(session)
}
/// Lists the names of stored sessions.
///
/// Returns the file stem of every entry in the swarm directory whose
/// extension is `json`, in the order the directory is read. A missing
/// swarm directory yields an empty list.
///
/// # Errors
///
/// Returns an error when the directory or one of its entries cannot be read.
pub fn list_sessions(project_root: Option<&PathBuf>) -> Result<Vec<String>> {
    let dir = swarm_dir(project_root);
    if !dir.exists() {
        return Ok(Vec::new());
    }

    let mut names = Vec::new();
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();

        let is_json = path.extension().map_or(false, |ext| ext == "json");
        if !is_json {
            continue;
        }

        if let Some(stem) = path.file_stem() {
            names.push(stem.to_string_lossy().to_string());
        }
    }
    Ok(names)
}
use std::path::Path;
use tokio::sync::mpsc;
use crate::commands::spawn::agent::generate_prompt;
use crate::commands::spawn::terminal::Harness;
use crate::extensions::runner::{load_agent_config, AgentEvent, AgentRunner, SpawnConfig};
use crate::models::task::Task;
/// A task paired with the tag it is executed under; the unit of work handed
/// to the wave execution functions.
#[derive(Debug, Clone)]
pub struct WaveAgent {
/// The task to run.
pub task: Task,
/// Tag passed to prompt generation and recorded next to the task id.
pub tag: String,
}
impl WaveAgent {
    /// Pairs `task` with `tag`, converting the tag into an owned `String`.
    pub fn new(task: Task, tag: impl Into<String>) -> Self {
        let tag = tag.into();
        Self { task, tag }
    }

    /// Converts `(task, tag)` pairs into agents, preserving their order.
    pub fn from_task_pairs<I>(pairs: I) -> Vec<Self>
    where
        I: IntoIterator<Item = (Task, String)>,
    {
        let mut agents = Vec::new();
        for (task, tag) in pairs {
            agents.push(Self::new(task, tag));
        }
        agents
    }

    /// Id of the wrapped task.
    pub fn task_id(&self) -> &str {
        let task = &self.task;
        &task.id
    }
}
/// Outcome of executing one batch of agents.
#[derive(Debug, Clone)]
pub struct WaveExecutionResult {
/// Round bookkeeping for the batch: recorded task ids, tags, failures and timestamps.
pub round_state: RoundState,
/// Results collected from the agents that were awaited.
pub agent_results: Vec<crate::extensions::runner::AgentResult>,
}
impl WaveExecutionResult {
    /// `true` when no collected agent result reports failure (vacuously
    /// true when there are no results).
    pub fn all_succeeded(&self) -> bool {
        !self.agent_results.iter().any(|result| !result.success)
    }

    /// Task ids of the agent results that succeeded, in result order.
    pub fn successful_task_ids(&self) -> Vec<String> {
        let mut ids = Vec::new();
        for result in &self.agent_results {
            if result.success {
                ids.push(result.task_id.clone());
            }
        }
        ids
    }

    /// Task ids of the agent results that did not succeed, in result order.
    pub fn failed_task_ids(&self) -> Vec<String> {
        let mut ids = Vec::new();
        for result in &self.agent_results {
            if !result.success {
                ids.push(result.task_id.clone());
            }
        }
        ids
    }

    /// Longest single agent duration in milliseconds, or `0` when there are
    /// no results. This is the maximum, not the sum, of the durations.
    pub fn total_duration_ms(&self) -> u64 {
        self.agent_results
            .iter()
            .map(|result| result.duration_ms)
            .fold(0, u64::max)
    }
}
pub async fn execute_wave_async(
agents: &[WaveAgent],
working_dir: &Path,
round_number: usize,
default_harness: Harness,
) -> Result<WaveExecutionResult> {
let mut round_state = RoundState::new(round_number);
let mut runner = AgentRunner::new(agents.len() * 10);
for agent in agents {
let (harness, model) = load_agent_config(
agent.task.agent_type.as_deref(),
default_harness,
None,
working_dir,
);
let prompt = generate_prompt(&agent.task, &agent.tag);
let config = SpawnConfig {
task_id: agent.task.id.clone(),
prompt,
working_dir: working_dir.to_path_buf(),
harness,
model,
};
match runner.spawn(config).await {
Ok(()) => {
round_state.task_ids.push(agent.task.id.clone());
round_state.tags.push(agent.tag.clone());
}
Err(e) => {
round_state.failures.push(agent.task.id.clone());
eprintln!("Failed to spawn agent for {}: {}", agent.task.id, e);
}
}
}
let agent_results = runner.wait_all().await;
round_state.mark_complete();
Ok(WaveExecutionResult {
round_state,
agent_results,
})
}
/// Spawns one agent per entry of `agents`, forwarding their events to
/// `event_tx`, and waits for all of them.
///
/// For each agent the harness and model are resolved with
/// `load_agent_config` (falling back to `default_harness`), a prompt is
/// generated from the task and tag, and the agent is started with
/// `spawn_agent`. Successfully spawned tasks are recorded in the round's
/// `task_ids`/`tags`. A task whose spawn fails is recorded in `failures`
/// and an `AgentEvent::SpawnFailed` is sent (a send error is ignored).
///
/// Every spawned handle is then awaited. A handle that yields a result
/// contributes it to `agent_results`; a handle that fails to yield one has
/// its task id recorded in `failures`, so the task is not left looking as
/// if it were still running with no result.
///
/// As written this function always returns `Ok`; failures are reflected in
/// the round state rather than in the return value.
pub async fn execute_wave_with_events(
    agents: &[WaveAgent],
    working_dir: &Path,
    round_number: usize,
    default_harness: Harness,
    event_tx: mpsc::Sender<AgentEvent>,
) -> Result<WaveExecutionResult> {
    use crate::extensions::runner::spawn_agent;

    let mut round_state = RoundState::new(round_number);
    // Each handle is kept with its task id so a failed await can be
    // attributed to the right task.
    let mut pending = Vec::new();

    for agent in agents {
        let (harness, model) = load_agent_config(
            agent.task.agent_type.as_deref(),
            default_harness,
            None,
            working_dir,
        );
        let prompt = generate_prompt(&agent.task, &agent.tag);
        let config = SpawnConfig {
            task_id: agent.task.id.clone(),
            prompt,
            working_dir: working_dir.to_path_buf(),
            harness,
            model,
        };

        match spawn_agent(config, event_tx.clone()).await {
            Ok(handle) => {
                pending.push((agent.task.id.clone(), handle));
                round_state.task_ids.push(agent.task.id.clone());
                round_state.tags.push(agent.tag.clone());
            }
            Err(e) => {
                round_state.failures.push(agent.task.id.clone());
                // Best-effort notification: the receiver may already be gone.
                let _ = event_tx
                    .send(AgentEvent::SpawnFailed {
                        task_id: agent.task.id.clone(),
                        error: e.to_string(),
                    })
                    .await;
            }
        }
    }

    let mut agent_results = Vec::with_capacity(pending.len());
    for (task_id, handle) in pending {
        match handle.await {
            Ok(result) => agent_results.push(result),
            // The agent produced no result; record the task as failed
            // instead of silently dropping it.
            Err(_) => round_state.failures.push(task_id),
        }
    }

    round_state.mark_complete();

    Ok(WaveExecutionResult {
        round_state,
        agent_results,
    })
}
pub async fn execute_wave_with_tracking<I>(
wave_number: usize,
task_pairs: I,
working_dir: &Path,
default_harness: Harness,
) -> Result<(WaveState, Vec<crate::extensions::runner::AgentResult>)>
where
I: IntoIterator<Item = (Task, String)>,
{
let agents = WaveAgent::from_task_pairs(task_pairs);
let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
let wave_state = WaveState::from_execution_result(wave_number, result.clone());
Ok((wave_state, result.agent_results))
}
pub async fn execute_wave_in_rounds<I>(
wave_number: usize,
task_pairs: I,
working_dir: &Path,
round_size: usize,
default_harness: Harness,
) -> Result<WaveState>
where
I: IntoIterator<Item = (Task, String)>,
{
let agents: Vec<WaveAgent> = WaveAgent::from_task_pairs(task_pairs);
let mut wave_state = WaveState::new(wave_number);
for (round_idx, chunk) in agents.chunks(round_size).enumerate() {
let result = execute_wave_async(chunk, working_dir, round_idx, default_harness).await?;
wave_state.apply_execution_result(result);
}
wave_state.mark_complete();
Ok(wave_state)
}
pub async fn spawn_subagent(
task: &Task,
tag: &str,
working_dir: &Path,
default_harness: Harness,
) -> Result<crate::extensions::runner::AgentResult> {
let agents = vec![WaveAgent {
task: task.clone(),
tag: tag.to_string(),
}];
let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
result
.agent_results
.into_iter()
.next()
.ok_or_else(|| anyhow::anyhow!("No result from agent"))
}
// Unit tests for the session/wave state types, the session lock and the
// wave execution helpers defined in this file.
#[cfg(test)]
mod tests {
use super::*;
// A new round carries its number, has no tasks and is not yet completed.
#[test]
fn test_round_state_new() {
let round = RoundState::new(0);
assert_eq!(round.round_number, 0);
assert!(round.task_ids.is_empty());
assert!(round.completed_at.is_none());
}
// `all_task_ids` flattens the task ids of every round in the wave.
#[test]
fn test_wave_state_all_task_ids() {
let mut wave = WaveState::new(1);
let mut round1 = RoundState::new(0);
round1.task_ids = vec!["task:1".to_string(), "task:2".to_string()];
let mut round2 = RoundState::new(1);
round2.task_ids = vec!["task:3".to_string()];
wave.rounds.push(round1);
wave.rounds.push(round2);
let all_ids = wave.all_task_ids();
assert_eq!(all_ids.len(), 3);
assert!(all_ids.contains(&"task:1".to_string()));
assert!(all_ids.contains(&"task:2".to_string()));
assert!(all_ids.contains(&"task:3".to_string()));
}
// `total_tasks` counts task ids across the session's waves and rounds.
#[test]
fn test_swarm_session_total_tasks() {
let mut session = SwarmSession::new("test-session", "test-tag", "tmux", "/test/path", 5);
let mut wave = WaveState::new(1);
let mut round = RoundState::new(0);
round.task_ids = vec!["task:1".to_string(), "task:2".to_string()];
wave.rounds.push(round);
session.waves.push(wave);
assert_eq!(session.total_tasks(), 2);
}
// The rendered summary mentions the wave number, a task id and a changed file.
#[test]
fn test_wave_summary_to_text() {
let summary = WaveSummary {
wave_number: 1,
tasks_completed: vec!["task:1".to_string(), "task:2".to_string()],
files_changed: vec!["src/main.rs".to_string()],
};
let text = summary.to_text();
assert!(text.contains("Wave 1"));
assert!(text.contains("task:1"));
assert!(text.contains("src/main.rs"));
}
// No summary without waves; with a summarized wave the text is returned.
#[test]
fn test_get_previous_summary() {
let mut session = SwarmSession::new("test", "tag", "tmux", "/path", 5);
assert!(session.get_previous_summary().is_none());
let mut wave = WaveState::new(1);
wave.summary = Some(WaveSummary {
wave_number: 1,
tasks_completed: vec!["task:1".to_string()],
files_changed: vec![],
});
session.waves.push(wave);
let summary = session.get_previous_summary();
assert!(summary.is_some());
assert!(summary.unwrap().contains("task:1"));
}
// A second lock for the same tag must fail while the first guard is alive.
#[test]
fn test_session_lock_contention() {
use tempfile::TempDir;
let temp_dir = TempDir::new().unwrap();
let project_root = temp_dir.path().to_path_buf();
let _lock1 = acquire_session_lock(Some(&project_root), "test-tag")
.expect("First lock should succeed");
let result = acquire_session_lock(Some(&project_root), "test-tag");
match result {
Ok(_) => panic!("Second lock should fail"),
Err(e) => {
let error_msg = e.to_string();
assert!(
error_msg.contains("already running"),
"Error message should mention 'already running', got: {}",
error_msg
);
}
}
}
// NOTE: this test requires `git` on PATH and a working directory inside a
// repository with at least one commit; it expects a 40-character hex SHA.
#[test]
fn test_get_current_commit() {
let result = get_current_commit();
assert!(result.is_some(), "Expected Some(sha) in a git repository");
let sha = result.unwrap();
assert_eq!(
sha.len(),
40,
"Expected SHA to be 40 characters long, got {}",
sha.len()
);
assert!(
sha.chars().all(|c| c.is_ascii_hexdigit()),
"Expected SHA to contain only hex characters, got: {}",
sha
);
}
// `WaveAgent::new` keeps the task id and converts the tag.
#[test]
fn test_wave_agent_new() {
let task = Task::new(
"task:1".to_string(),
"Test task".to_string(),
"Description".to_string(),
);
let agent = WaveAgent::new(task.clone(), "test-tag");
assert_eq!(agent.task_id(), "task:1");
assert_eq!(agent.tag, "test-tag");
}
// `from_task_pairs` preserves both the order and the task/tag pairing.
#[test]
fn test_wave_agent_from_task_pairs() {
let task1 = Task::new(
"task:1".to_string(),
"Task 1".to_string(),
"Description".to_string(),
);
let task2 = Task::new(
"task:2".to_string(),
"Task 2".to_string(),
"Description".to_string(),
);
let pairs = vec![(task1, "tag-a".to_string()), (task2, "tag-b".to_string())];
let agents = WaveAgent::from_task_pairs(pairs);
assert_eq!(agents.len(), 2);
assert_eq!(agents[0].task_id(), "task:1");
assert_eq!(agents[0].tag, "tag-a");
assert_eq!(agents[1].task_id(), "task:2");
assert_eq!(agents[1].tag, "tag-b");
}
// Helper accessors split results by success; the duration is the maximum.
#[test]
fn test_wave_execution_result_helpers() {
use crate::extensions::runner::AgentResult;
let result = WaveExecutionResult {
round_state: RoundState::new(0),
agent_results: vec![
AgentResult {
task_id: "task:1".to_string(),
success: true,
exit_code: Some(0),
output: String::new(),
duration_ms: 1000,
},
AgentResult {
task_id: "task:2".to_string(),
success: false,
exit_code: Some(1),
output: String::new(),
duration_ms: 2000,
},
],
};
assert!(!result.all_succeeded());
assert_eq!(result.successful_task_ids(), vec!["task:1"]);
assert_eq!(result.failed_task_ids(), vec!["task:2"]);
assert_eq!(result.total_duration_ms(), 2000);
}
// A wave built from an execution result contains exactly that round.
#[test]
fn test_wave_state_from_execution_result() {
use crate::extensions::runner::AgentResult;
let mut round_state = RoundState::new(0);
round_state.task_ids = vec!["task:1".to_string()];
let result = WaveExecutionResult {
round_state,
agent_results: vec![AgentResult {
task_id: "task:1".to_string(),
success: true,
exit_code: Some(0),
output: String::new(),
duration_ms: 1000,
}],
};
let wave_state = WaveState::from_execution_result(1, result);
assert_eq!(wave_state.wave_number, 1);
assert_eq!(wave_state.rounds.len(), 1);
assert_eq!(wave_state.rounds[0].task_ids, vec!["task:1"]);
}
// Applying an execution result appends its round to an existing wave.
#[test]
fn test_wave_state_apply_execution_result() {
use crate::extensions::runner::AgentResult;
let mut wave_state = WaveState::new(1);
assert!(wave_state.rounds.is_empty());
let mut round_state = RoundState::new(0);
round_state.task_ids = vec!["task:1".to_string()];
let result = WaveExecutionResult {
round_state,
agent_results: vec![AgentResult {
task_id: "task:1".to_string(),
success: true,
exit_code: Some(0),
output: String::new(),
duration_ms: 1000,
}],
};
wave_state.apply_execution_result(result);
assert_eq!(wave_state.rounds.len(), 1);
assert_eq!(wave_state.all_task_ids(), vec!["task:1"]);
}
}