1use anyhow::Result;
10use serde::{Deserialize, Serialize};
11use std::fs;
12use std::path::PathBuf;
13use std::process::Command;
14
15use crate::backpressure::ValidationResult;
16
/// Returns the commit hash that `HEAD` currently resolves to.
///
/// Runs `git rev-parse HEAD` as a child process. Yields `None` when `git`
/// cannot be launched or exits with a non-zero status; otherwise returns the
/// command's stdout, lossily decoded as UTF-8 and trimmed of surrounding
/// whitespace.
pub fn get_current_commit() -> Option<String> {
    let output = Command::new("git")
        .args(["rev-parse", "HEAD"])
        .output()
        .ok()?;

    if !output.status.success() {
        return None;
    }

    let stdout = String::from_utf8_lossy(&output.stdout);
    Some(stdout.trim().to_owned())
}
31
32#[derive(Debug, Clone, Serialize, Deserialize)]
35pub struct WaveSummary {
36 pub wave_number: usize,
38 pub tasks_completed: Vec<String>,
40 pub files_changed: Vec<String>,
42}
43
44impl WaveSummary {
45 pub fn to_text(&self) -> String {
47 let mut lines = Vec::new();
48
49 lines.push(format!(
50 "Wave {} completed {} task(s):",
51 self.wave_number,
52 self.tasks_completed.len()
53 ));
54
55 for task_id in &self.tasks_completed {
56 lines.push(format!(" - {}", task_id));
57 }
58
59 if !self.files_changed.is_empty() {
60 let file_summary = if self.files_changed.len() <= 5 {
61 self.files_changed.join(", ")
62 } else {
63 format!(
64 "{} and {} more",
65 self.files_changed[..5].join(", "),
66 self.files_changed.len() - 5
67 )
68 };
69 lines.push(format!("Files changed: {}", file_summary));
70 }
71
72 lines.join("\n")
73 }
74}
75
76#[derive(Debug, Clone, Serialize, Deserialize)]
78pub struct RoundState {
79 pub round_number: usize,
81 pub task_ids: Vec<String>,
83 pub tags: Vec<String>,
85 pub failures: Vec<String>,
87 pub started_at: String,
89 pub completed_at: Option<String>,
91}
92
93impl RoundState {
94 pub fn new(round_number: usize) -> Self {
95 Self {
96 round_number,
97 task_ids: Vec::new(),
98 tags: Vec::new(),
99 failures: Vec::new(),
100 started_at: chrono::Utc::now().to_rfc3339(),
101 completed_at: None,
102 }
103 }
104
105 pub fn mark_complete(&mut self) {
106 self.completed_at = Some(chrono::Utc::now().to_rfc3339());
107 }
108}
109
110#[derive(Debug, Clone, Serialize, Deserialize)]
112pub struct ReviewState {
113 pub reviewed_tasks: Vec<String>,
115 pub all_passed: bool,
117 pub tasks_needing_improvement: Vec<String>,
119 pub completed_at: String,
121}
122
123#[derive(Debug, Clone, Serialize, Deserialize)]
125pub struct RepairAttempt {
126 pub attempt_number: usize,
128 pub attributed_tasks: Vec<String>,
130 pub cleared_tasks: Vec<String>,
132 pub attribution_confidence: String,
134 pub validation_passed: bool,
136 pub completed_at: String,
138}
139
140#[derive(Debug, Clone, Serialize, Deserialize)]
142pub struct WaveState {
143 pub wave_number: usize,
145 pub rounds: Vec<RoundState>,
147 pub validation: Option<ValidationResult>,
149 pub summary: Option<WaveSummary>,
151 #[serde(default)]
153 pub start_commit: Option<String>,
154 #[serde(default)]
156 pub review: Option<ReviewState>,
157 #[serde(default)]
159 pub repairs: Vec<RepairAttempt>,
160 pub started_at: String,
162 pub completed_at: Option<String>,
164}
165
166impl WaveState {
167 pub fn new(wave_number: usize) -> Self {
168 Self {
169 wave_number,
170 rounds: Vec::new(),
171 validation: None,
172 summary: None,
173 start_commit: get_current_commit(),
174 review: None,
175 repairs: Vec::new(),
176 started_at: chrono::Utc::now().to_rfc3339(),
177 completed_at: None,
178 }
179 }
180
181 pub fn mark_complete(&mut self) {
182 self.completed_at = Some(chrono::Utc::now().to_rfc3339());
183 }
184
185 pub fn all_task_ids(&self) -> Vec<String> {
187 self.rounds
188 .iter()
189 .flat_map(|r| r.task_ids.clone())
190 .collect()
191 }
192
193 pub fn task_tags(&self) -> Vec<(String, String)> {
195 self.rounds
196 .iter()
197 .flat_map(|r| {
198 r.task_ids
199 .iter()
200 .zip(r.tags.iter())
201 .map(|(id, tag)| (id.clone(), tag.clone()))
202 })
203 .collect()
204 }
205}
206
207#[derive(Debug, Clone, Serialize, Deserialize)]
209pub struct SwarmSession {
210 pub session_name: String,
212 pub tag: String,
214 pub terminal: String,
216 pub working_dir: String,
218 pub round_size: usize,
220 pub waves: Vec<WaveState>,
222 pub started_at: String,
224 pub completed_at: Option<String>,
226}
227
228impl SwarmSession {
229 pub fn new(
230 session_name: &str,
231 tag: &str,
232 terminal: &str,
233 working_dir: &str,
234 round_size: usize,
235 ) -> Self {
236 Self {
237 session_name: session_name.to_string(),
238 tag: tag.to_string(),
239 terminal: terminal.to_string(),
240 working_dir: working_dir.to_string(),
241 round_size,
242 waves: Vec::new(),
243 started_at: chrono::Utc::now().to_rfc3339(),
244 completed_at: None,
245 }
246 }
247
248 pub fn mark_complete(&mut self) {
249 self.completed_at = Some(chrono::Utc::now().to_rfc3339());
250 }
251
252 pub fn total_tasks(&self) -> usize {
254 self.waves
255 .iter()
256 .flat_map(|w| &w.rounds)
257 .map(|r| r.task_ids.len())
258 .sum()
259 }
260
261 pub fn total_failures(&self) -> usize {
263 self.waves
264 .iter()
265 .flat_map(|w| &w.rounds)
266 .map(|r| r.failures.len())
267 .sum()
268 }
269
270 pub fn get_previous_summary(&self) -> Option<String> {
273 self.waves
274 .last()
275 .and_then(|w| w.summary.as_ref().map(|s| s.to_text()))
276 }
277}
278
/// Returns the directory holding swarm state: `<root>/.scud/swarm`.
///
/// `<root>` is `project_root` when given, otherwise the process's current
/// directory (or an empty path if that cannot be determined). The directory
/// is not created here.
pub fn swarm_dir(project_root: Option<&PathBuf>) -> PathBuf {
    let mut dir = match project_root {
        Some(root) => root.clone(),
        None => std::env::current_dir().unwrap_or_default(),
    };
    dir.push(".scud");
    dir.push("swarm");
    dir
}
286
287pub fn lock_file_path(project_root: Option<&PathBuf>, tag: &str) -> PathBuf {
291 let root = project_root
292 .cloned()
293 .unwrap_or_else(|| std::env::current_dir().unwrap_or_default());
294 let worktree_id = get_worktree_id(&root);
295 let lock_name = match worktree_id {
296 Some(wt_id) => format!("{}-{}.lock", tag, wt_id),
297 None => format!("{}.lock", tag),
298 };
299 swarm_dir(project_root).join(lock_name)
300}
301
/// Derives a worktree identifier from the project root, if it looks like one.
///
/// When `<project_root>/.git` is a regular file rather than a directory (as
/// git does for linked worktrees), the identifier is the final component of
/// `project_root`. Returns `None` when `.git` is missing or is a directory,
/// or when that final component is absent or not valid UTF-8.
fn get_worktree_id(project_root: &std::path::Path) -> Option<String> {
    if !project_root.join(".git").is_file() {
        return None;
    }

    let dir_name = project_root.file_name()?;
    dir_name.to_str().map(str::to_owned)
}
316
/// Guard for a session lock file.
///
/// Keeps the lock file's handle open for as long as the guard lives and
/// deletes the file when the guard is dropped.
pub struct SessionLock {
    /// Open handle to the lock file; held only to keep it open.
    _file: fs::File,
    /// Location of the lock file on disk.
    path: PathBuf,
}

impl SessionLock {
    /// Returns the location of the lock file.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }
}

impl Drop for SessionLock {
    /// Deletes the lock file, ignoring any error from the removal.
    fn drop(&mut self) {
        // Best effort: a failed removal must not panic inside `drop`.
        fs::remove_file(&self.path).ok();
    }
}
338
339pub fn acquire_session_lock(project_root: Option<&PathBuf>, tag: &str) -> Result<SessionLock> {
343 use fs2::FileExt;
344
345 let dir = swarm_dir(project_root);
346 fs::create_dir_all(&dir)?;
347
348 let lock_path = lock_file_path(project_root, tag);
349 let file = fs::OpenOptions::new()
350 .write(true)
351 .create(true)
352 .truncate(true)
353 .open(&lock_path)?;
354
355 file.try_lock_exclusive().map_err(|_| {
357 anyhow::anyhow!(
358 "Another swarm session is already running for tag '{}'. \
359 If this is incorrect, remove the lock file: {}",
360 tag,
361 lock_path.display()
362 )
363 })?;
364
365 use std::io::Write;
367 let mut file = file;
368 writeln!(
369 file,
370 "pid={}\nstarted={}",
371 std::process::id(),
372 chrono::Utc::now().to_rfc3339()
373 )?;
374
375 Ok(SessionLock {
376 _file: file,
377 path: lock_path,
378 })
379}
380
381pub fn session_file(project_root: Option<&PathBuf>, session_name: &str) -> PathBuf {
383 swarm_dir(project_root).join(format!("{}.json", session_name))
384}
385
386pub fn save_session(project_root: Option<&PathBuf>, session: &SwarmSession) -> Result<()> {
388 let dir = swarm_dir(project_root);
389 fs::create_dir_all(&dir)?;
390
391 let file = session_file(project_root, &session.session_name);
392 let json = serde_json::to_string_pretty(session)?;
393 fs::write(file, json)?;
394
395 Ok(())
396}
397
398pub fn load_session(project_root: Option<&PathBuf>, session_name: &str) -> Result<SwarmSession> {
400 let file = session_file(project_root, session_name);
401 let json = fs::read_to_string(&file)?;
402 let session: SwarmSession = serde_json::from_str(&json)?;
403 Ok(session)
404}
405
406pub fn list_sessions(project_root: Option<&PathBuf>) -> Result<Vec<String>> {
408 let dir = swarm_dir(project_root);
409 if !dir.exists() {
410 return Ok(Vec::new());
411 }
412
413 let mut sessions = Vec::new();
414 for entry in fs::read_dir(dir)? {
415 let entry = entry?;
416 let path = entry.path();
417 if path.extension().map(|e| e == "json").unwrap_or(false) {
418 if let Some(stem) = path.file_stem() {
419 sessions.push(stem.to_string_lossy().to_string());
420 }
421 }
422 }
423
424 Ok(sessions)
425}
426
427use std::path::Path;
432use tokio::sync::mpsc;
433
434use crate::commands::spawn::agent::generate_prompt;
435use crate::commands::spawn::terminal::Harness;
436use crate::extensions::runner::{load_agent_config, AgentEvent, AgentRunner, SpawnConfig};
437use crate::models::task::Task;
438
439#[derive(Debug, Clone)]
441pub struct WaveAgent {
442 pub task: Task,
444 pub tag: String,
446}
447
448impl WaveAgent {
449 pub fn new(task: Task, tag: impl Into<String>) -> Self {
451 Self {
452 task,
453 tag: tag.into(),
454 }
455 }
456
457 pub fn from_task_pairs<I>(pairs: I) -> Vec<Self>
462 where
463 I: IntoIterator<Item = (Task, String)>,
464 {
465 pairs.into_iter().map(|(task, tag)| Self::new(task, tag)).collect()
466 }
467
468 pub fn task_id(&self) -> &str {
470 &self.task.id
471 }
472}
473
474#[derive(Debug, Clone)]
476pub struct WaveExecutionResult {
477 pub round_state: RoundState,
479 pub agent_results: Vec<crate::extensions::runner::AgentResult>,
481}
482
483impl WaveExecutionResult {
484 pub fn all_succeeded(&self) -> bool {
486 self.agent_results.iter().all(|r| r.success)
487 }
488
489 pub fn successful_task_ids(&self) -> Vec<String> {
491 self.agent_results
492 .iter()
493 .filter(|r| r.success)
494 .map(|r| r.task_id.clone())
495 .collect()
496 }
497
498 pub fn failed_task_ids(&self) -> Vec<String> {
500 self.agent_results
501 .iter()
502 .filter(|r| !r.success)
503 .map(|r| r.task_id.clone())
504 .collect()
505 }
506
507 pub fn total_duration_ms(&self) -> u64 {
509 self.agent_results.iter().map(|r| r.duration_ms).max().unwrap_or(0)
510 }
511}
512
513impl WaveState {
514 pub fn apply_execution_result(&mut self, result: WaveExecutionResult) {
519 self.rounds.push(result.round_state);
520 }
521
522 pub fn from_execution_result(wave_number: usize, result: WaveExecutionResult) -> Self {
524 let mut state = Self::new(wave_number);
525 state.apply_execution_result(result);
526 state
527 }
528}
529
530pub async fn execute_wave_async(
545 agents: &[WaveAgent],
546 working_dir: &Path,
547 round_number: usize,
548 default_harness: Harness,
549) -> Result<WaveExecutionResult> {
550 let mut round_state = RoundState::new(round_number);
551 let mut runner = AgentRunner::new(agents.len() * 10);
552
553 for agent in agents {
555 let (harness, model) = load_agent_config(
557 agent.task.agent_type.as_deref(),
558 default_harness,
559 None,
560 working_dir,
561 );
562
563 let prompt = generate_prompt(&agent.task, &agent.tag);
565
566 let config = SpawnConfig {
567 task_id: agent.task.id.clone(),
568 prompt,
569 working_dir: working_dir.to_path_buf(),
570 harness,
571 model,
572 };
573
574 match runner.spawn(config).await {
575 Ok(()) => {
576 round_state.task_ids.push(agent.task.id.clone());
577 round_state.tags.push(agent.tag.clone());
578 }
579 Err(e) => {
580 round_state.failures.push(agent.task.id.clone());
581 eprintln!("Failed to spawn agent for {}: {}", agent.task.id, e);
582 }
583 }
584 }
585
586 let agent_results = runner.wait_all().await;
588
589 round_state.mark_complete();
590
591 Ok(WaveExecutionResult {
592 round_state,
593 agent_results,
594 })
595}
596
597pub async fn execute_wave_with_events(
611 agents: &[WaveAgent],
612 working_dir: &Path,
613 round_number: usize,
614 default_harness: Harness,
615 event_tx: mpsc::Sender<AgentEvent>,
616) -> Result<WaveExecutionResult> {
617 use crate::extensions::runner::spawn_agent;
618
619 let mut round_state = RoundState::new(round_number);
620 let mut handles = Vec::new();
621
622 for agent in agents {
624 let (harness, model) = load_agent_config(
626 agent.task.agent_type.as_deref(),
627 default_harness,
628 None,
629 working_dir,
630 );
631
632 let prompt = generate_prompt(&agent.task, &agent.tag);
634
635 let config = SpawnConfig {
636 task_id: agent.task.id.clone(),
637 prompt,
638 working_dir: working_dir.to_path_buf(),
639 harness,
640 model,
641 };
642
643 match spawn_agent(config, event_tx.clone()).await {
644 Ok(handle) => {
645 handles.push(handle);
646 round_state.task_ids.push(agent.task.id.clone());
647 round_state.tags.push(agent.tag.clone());
648 }
649 Err(e) => {
650 round_state.failures.push(agent.task.id.clone());
651 let _ = event_tx
652 .send(AgentEvent::SpawnFailed {
653 task_id: agent.task.id.clone(),
654 error: e.to_string(),
655 })
656 .await;
657 }
658 }
659 }
660
661 let mut agent_results = Vec::new();
663 for handle in handles {
664 if let Ok(result) = handle.await {
665 agent_results.push(result);
666 }
667 }
668
669 round_state.mark_complete();
670
671 Ok(WaveExecutionResult {
672 round_state,
673 agent_results,
674 })
675}
676
677pub async fn execute_wave_with_tracking<I>(
694 wave_number: usize,
695 task_pairs: I,
696 working_dir: &Path,
697 default_harness: Harness,
698) -> Result<(WaveState, Vec<crate::extensions::runner::AgentResult>)>
699where
700 I: IntoIterator<Item = (Task, String)>,
701{
702 let agents = WaveAgent::from_task_pairs(task_pairs);
703 let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
704
705 let wave_state = WaveState::from_execution_result(wave_number, result.clone());
706
707 Ok((wave_state, result.agent_results))
708}
709
710pub async fn execute_wave_in_rounds<I>(
725 wave_number: usize,
726 task_pairs: I,
727 working_dir: &Path,
728 round_size: usize,
729 default_harness: Harness,
730) -> Result<WaveState>
731where
732 I: IntoIterator<Item = (Task, String)>,
733{
734 let agents: Vec<WaveAgent> = WaveAgent::from_task_pairs(task_pairs);
735 let mut wave_state = WaveState::new(wave_number);
736
737 for (round_idx, chunk) in agents.chunks(round_size).enumerate() {
738 let result = execute_wave_async(chunk, working_dir, round_idx, default_harness).await?;
739 wave_state.apply_execution_result(result);
740 }
741
742 wave_state.mark_complete();
743 Ok(wave_state)
744}
745
746pub async fn spawn_subagent(
750 task: &Task,
751 tag: &str,
752 working_dir: &Path,
753 default_harness: Harness,
754) -> Result<crate::extensions::runner::AgentResult> {
755 let agents = vec![WaveAgent {
756 task: task.clone(),
757 tag: tag.to_string(),
758 }];
759
760 let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
761
762 result
763 .agent_results
764 .into_iter()
765 .next()
766 .ok_or_else(|| anyhow::anyhow!("No result from agent"))
767}
768
/// Unit tests for session state, lock handling and the wave execution
/// helper types.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::extensions::runner::AgentResult;

    /// Builds a task fixture with a fixed description.
    fn sample_task(id: &str, title: &str) -> Task {
        Task::new(id.to_string(), title.to_string(), "Description".to_string())
    }

    /// Builds an agent result fixture; the exit code is 0 on success, 1 otherwise.
    fn agent_result(task_id: &str, success: bool, duration_ms: u64) -> AgentResult {
        AgentResult {
            task_id: task_id.to_string(),
            success,
            exit_code: Some(if success { 0 } else { 1 }),
            output: String::new(),
            duration_ms,
        }
    }

    #[test]
    fn test_round_state_new() {
        let round = RoundState::new(0);
        assert_eq!(round.round_number, 0);
        assert!(round.task_ids.is_empty());
        assert!(round.completed_at.is_none());
    }

    #[test]
    fn test_wave_state_all_task_ids() {
        let mut wave = WaveState::new(1);

        let mut round1 = RoundState::new(0);
        round1.task_ids = vec!["task:1".to_string(), "task:2".to_string()];

        let mut round2 = RoundState::new(1);
        round2.task_ids = vec!["task:3".to_string()];

        wave.rounds.push(round1);
        wave.rounds.push(round2);

        let all_ids = wave.all_task_ids();
        assert_eq!(all_ids.len(), 3);
        assert!(all_ids.contains(&"task:1".to_string()));
        assert!(all_ids.contains(&"task:2".to_string()));
        assert!(all_ids.contains(&"task:3".to_string()));
    }

    #[test]
    fn test_swarm_session_total_tasks() {
        let mut session = SwarmSession::new("test-session", "test-tag", "tmux", "/test/path", 5);

        let mut wave = WaveState::new(1);
        let mut round = RoundState::new(0);
        round.task_ids = vec!["task:1".to_string(), "task:2".to_string()];
        wave.rounds.push(round);
        session.waves.push(wave);

        assert_eq!(session.total_tasks(), 2);
    }

    #[test]
    fn test_wave_summary_to_text() {
        let summary = WaveSummary {
            wave_number: 1,
            tasks_completed: vec!["task:1".to_string(), "task:2".to_string()],
            files_changed: vec!["src/main.rs".to_string()],
        };

        let text = summary.to_text();
        assert!(text.contains("Wave 1"));
        assert!(text.contains("task:1"));
        assert!(text.contains("src/main.rs"));
    }

    #[test]
    fn test_get_previous_summary() {
        let mut session = SwarmSession::new("test", "tag", "tmux", "/path", 5);

        // No waves yet, so there is nothing to summarize.
        assert!(session.get_previous_summary().is_none());

        let mut wave = WaveState::new(1);
        wave.summary = Some(WaveSummary {
            wave_number: 1,
            tasks_completed: vec!["task:1".to_string()],
            files_changed: vec![],
        });
        session.waves.push(wave);

        let summary = session.get_previous_summary();
        assert!(summary.is_some());
        assert!(summary.unwrap().contains("task:1"));
    }

    #[test]
    fn test_session_lock_contention() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let project_root = temp_dir.path().to_path_buf();

        // Held for the rest of the test so the second attempt must contend.
        let _lock1 = acquire_session_lock(Some(&project_root), "test-tag")
            .expect("First lock should succeed");

        let result = acquire_session_lock(Some(&project_root), "test-tag");

        match result {
            Ok(_) => panic!("Second lock should fail"),
            Err(e) => {
                let error_msg = e.to_string();
                assert!(
                    error_msg.contains("already running"),
                    "Error message should mention 'already running', got: {}",
                    error_msg
                );
            }
        }
    }

    #[test]
    fn test_get_current_commit() {
        // Ask git directly whether HEAD resolves, so the test does not assume
        // it runs inside a checkout (it may run from an unpacked source
        // archive, or on a machine without git).
        let head_resolves = std::process::Command::new("git")
            .args(["rev-parse", "--verify", "HEAD"])
            .output()
            .map(|out| out.status.success())
            .unwrap_or(false);

        let result = get_current_commit();

        if !head_resolves {
            assert!(
                result.is_none(),
                "Expected None when HEAD cannot be resolved, got: {:?}",
                result
            );
            return;
        }

        let sha = result.expect("Expected Some(sha) in a git repository");

        // 40 hex digits for SHA-1 repositories, 64 for SHA-256 ones.
        assert!(
            matches!(sha.len(), 40 | 64),
            "Expected SHA to be 40 or 64 characters long, got {}",
            sha.len()
        );

        assert!(
            sha.chars().all(|c| c.is_ascii_hexdigit()),
            "Expected SHA to contain only hex characters, got: {}",
            sha
        );
    }

    #[test]
    fn test_wave_agent_new() {
        let agent = WaveAgent::new(sample_task("task:1", "Test task"), "test-tag");

        assert_eq!(agent.task_id(), "task:1");
        assert_eq!(agent.tag, "test-tag");
    }

    #[test]
    fn test_wave_agent_from_task_pairs() {
        let pairs = vec![
            (sample_task("task:1", "Task 1"), "tag-a".to_string()),
            (sample_task("task:2", "Task 2"), "tag-b".to_string()),
        ];

        let agents = WaveAgent::from_task_pairs(pairs);

        assert_eq!(agents.len(), 2);
        assert_eq!(agents[0].task_id(), "task:1");
        assert_eq!(agents[0].tag, "tag-a");
        assert_eq!(agents[1].task_id(), "task:2");
        assert_eq!(agents[1].tag, "tag-b");
    }

    #[test]
    fn test_wave_execution_result_helpers() {
        let result = WaveExecutionResult {
            round_state: RoundState::new(0),
            agent_results: vec![
                agent_result("task:1", true, 1000),
                agent_result("task:2", false, 2000),
            ],
        };

        assert!(!result.all_succeeded());
        assert_eq!(result.successful_task_ids(), vec!["task:1"]);
        assert_eq!(result.failed_task_ids(), vec!["task:2"]);
        // The "total" is the longest single agent duration, not the sum.
        assert_eq!(result.total_duration_ms(), 2000);
    }

    #[test]
    fn test_wave_state_from_execution_result() {
        let mut round_state = RoundState::new(0);
        round_state.task_ids = vec!["task:1".to_string()];

        let result = WaveExecutionResult {
            round_state,
            agent_results: vec![agent_result("task:1", true, 1000)],
        };

        let wave_state = WaveState::from_execution_result(1, result);

        assert_eq!(wave_state.wave_number, 1);
        assert_eq!(wave_state.rounds.len(), 1);
        assert_eq!(wave_state.rounds[0].task_ids, vec!["task:1"]);
    }

    #[test]
    fn test_wave_state_apply_execution_result() {
        let mut wave_state = WaveState::new(1);
        assert!(wave_state.rounds.is_empty());

        let mut round_state = RoundState::new(0);
        round_state.task_ids = vec!["task:1".to_string()];

        let result = WaveExecutionResult {
            round_state,
            agent_results: vec![agent_result("task:1", true, 1000)],
        };

        wave_state.apply_execution_result(result);

        assert_eq!(wave_state.rounds.len(), 1);
        assert_eq!(wave_state.all_task_ids(), vec!["task:1"]);
    }
}