1use anyhow::Result;
10use serde::{Deserialize, Serialize};
11use std::fs;
12use std::path::PathBuf;
13use std::process::Command;
14
15use crate::backpressure::ValidationResult;
16
/// Returns the SHA of the current `HEAD` commit, if it can be determined.
///
/// Runs `git rev-parse HEAD` and returns its standard output with surrounding
/// whitespace trimmed. Yields `None` when the command cannot be launched or
/// exits with a non-zero status.
pub fn get_current_commit() -> Option<String> {
    let output = Command::new("git")
        .args(["rev-parse", "HEAD"])
        .output()
        .ok()?;

    if !output.status.success() {
        return None;
    }

    // Lossy decoding keeps this infallible even if git emits unexpected bytes.
    let stdout = String::from_utf8_lossy(&output.stdout);
    Some(stdout.trim().to_string())
}
31
/// Summary of what a single wave accomplished.
///
/// Rendered as human-readable text by [`WaveSummary::to_text`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WaveSummary {
    /// Number of the wave this summary describes.
    pub wave_number: usize,
    /// IDs of the tasks reported as completed in the wave.
    pub tasks_completed: Vec<String>,
    /// Files reported as changed during the wave.
    pub files_changed: Vec<String>,
}
43
44impl WaveSummary {
45 pub fn to_text(&self) -> String {
47 let mut lines = Vec::new();
48
49 lines.push(format!(
50 "Wave {} completed {} task(s):",
51 self.wave_number,
52 self.tasks_completed.len()
53 ));
54
55 for task_id in &self.tasks_completed {
56 lines.push(format!(" - {}", task_id));
57 }
58
59 if !self.files_changed.is_empty() {
60 let file_summary = if self.files_changed.len() <= 5 {
61 self.files_changed.join(", ")
62 } else {
63 format!(
64 "{} and {} more",
65 self.files_changed[..5].join(", "),
66 self.files_changed.len() - 5
67 )
68 };
69 lines.push(format!("Files changed: {}", file_summary));
70 }
71
72 lines.join("\n")
73 }
74}
75
/// State of one round of agent execution within a wave.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RoundState {
    /// Index of this round as supplied by the caller.
    pub round_number: usize,
    /// IDs of the tasks whose agents were spawned successfully in this round.
    pub task_ids: Vec<String>,
    /// Tags for the spawned tasks; entries line up by position with `task_ids`.
    pub tags: Vec<String>,
    /// IDs of the tasks whose agent could not be spawned.
    pub failures: Vec<String>,
    /// RFC 3339 UTC timestamp taken when the round was created.
    pub started_at: String,
    /// RFC 3339 UTC timestamp set by [`RoundState::mark_complete`]; `None` until then.
    pub completed_at: Option<String>,
}
92
93impl RoundState {
94 pub fn new(round_number: usize) -> Self {
95 Self {
96 round_number,
97 task_ids: Vec::new(),
98 tags: Vec::new(),
99 failures: Vec::new(),
100 started_at: chrono::Utc::now().to_rfc3339(),
101 completed_at: None,
102 }
103 }
104
105 pub fn mark_complete(&mut self) {
106 self.completed_at = Some(chrono::Utc::now().to_rfc3339());
107 }
108}
109
/// Serializable record of a review pass, stored on [`WaveState::review`].
///
/// NOTE(review): nothing in the visible part of this file reads or writes
/// these fields, so the descriptions below are inferred from their names and
/// should be confirmed against the code that produces them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReviewState {
    /// Presumably the IDs of the tasks that were reviewed.
    pub reviewed_tasks: Vec<String>,
    /// Presumably `true` when every reviewed task passed.
    pub all_passed: bool,
    /// Presumably the IDs of the tasks the review flagged for further work.
    pub tasks_needing_improvement: Vec<String>,
    /// Completion timestamp; presumably RFC 3339 like the other timestamps
    /// in this file (TODO confirm).
    pub completed_at: String,
}
122
/// Serializable record of one repair attempt, stored in [`WaveState::repairs`].
///
/// NOTE(review): nothing in the visible part of this file reads or writes
/// these fields, so the descriptions below are inferred from their names and
/// should be confirmed against the code that produces them.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RepairAttempt {
    /// Ordinal of this attempt (TODO confirm whether counting starts at 0 or 1).
    pub attempt_number: usize,
    /// Presumably the IDs of the tasks a failure was attributed to.
    pub attributed_tasks: Vec<String>,
    /// Presumably the IDs of the tasks cleared of responsibility.
    pub cleared_tasks: Vec<String>,
    /// Free-form confidence label for the attribution (valid values not
    /// visible here).
    pub attribution_confidence: String,
    /// Presumably whether validation passed after this attempt.
    pub validation_passed: bool,
    /// Completion timestamp; presumably RFC 3339 like the other timestamps
    /// in this file (TODO confirm).
    pub completed_at: String,
}
139
/// State of one wave: its rounds plus any validation, summary, review and
/// repair records attached to it.
///
/// Fields marked `#[serde(default)]` may be absent from serialized data; they
/// then deserialize to `None` or an empty list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WaveState {
    /// Number identifying this wave.
    pub wave_number: usize,
    /// Rounds executed in this wave, in the order they were recorded.
    pub rounds: Vec<RoundState>,
    /// Validation outcome for the wave, if one has been recorded.
    pub validation: Option<ValidationResult>,
    /// Summary of the wave, if one has been recorded.
    pub summary: Option<WaveSummary>,
    /// Commit SHA captured by [`get_current_commit`] when the state was
    /// created; `None` if it could not be determined.
    #[serde(default)]
    pub start_commit: Option<String>,
    /// Review record for the wave, if any.
    #[serde(default)]
    pub review: Option<ReviewState>,
    /// Repair attempts recorded for the wave.
    #[serde(default)]
    pub repairs: Vec<RepairAttempt>,
    /// RFC 3339 UTC timestamp taken when the state was created.
    pub started_at: String,
    /// RFC 3339 UTC timestamp set by [`WaveState::mark_complete`]; `None` until then.
    pub completed_at: Option<String>,
}
165
166impl WaveState {
167 pub fn new(wave_number: usize) -> Self {
168 Self {
169 wave_number,
170 rounds: Vec::new(),
171 validation: None,
172 summary: None,
173 start_commit: get_current_commit(),
174 review: None,
175 repairs: Vec::new(),
176 started_at: chrono::Utc::now().to_rfc3339(),
177 completed_at: None,
178 }
179 }
180
181 pub fn mark_complete(&mut self) {
182 self.completed_at = Some(chrono::Utc::now().to_rfc3339());
183 }
184
185 pub fn all_task_ids(&self) -> Vec<String> {
187 self.rounds
188 .iter()
189 .flat_map(|r| r.task_ids.clone())
190 .collect()
191 }
192
193 pub fn task_tags(&self) -> Vec<(String, String)> {
195 self.rounds
196 .iter()
197 .flat_map(|r| {
198 r.task_ids
199 .iter()
200 .zip(r.tags.iter())
201 .map(|(id, tag)| (id.clone(), tag.clone()))
202 })
203 .collect()
204 }
205}
206
/// Persistent record of a swarm session and the waves it has run.
///
/// Saved to and loaded from `<session_name>.json` in the swarm directory by
/// [`save_session`] and [`load_session`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SwarmSession {
    /// Name of the session; also the stem of its JSON file on disk.
    pub session_name: String,
    /// Tag the session was started with.
    pub tag: String,
    /// Terminal identifier the session was started with (the tests in this
    /// file use `"tmux"`).
    pub terminal: String,
    /// Working directory the session was started with, stored as a string.
    pub working_dir: String,
    /// Round size the session was started with (presumably the maximum
    /// number of agents per round — confirm against the caller).
    pub round_size: usize,
    /// Waves recorded so far, oldest first.
    pub waves: Vec<WaveState>,
    /// RFC 3339 UTC timestamp taken when the session was created.
    pub started_at: String,
    /// RFC 3339 UTC timestamp set by [`SwarmSession::mark_complete`]; `None` until then.
    pub completed_at: Option<String>,
}
227
228impl SwarmSession {
229 pub fn new(
230 session_name: &str,
231 tag: &str,
232 terminal: &str,
233 working_dir: &str,
234 round_size: usize,
235 ) -> Self {
236 Self {
237 session_name: session_name.to_string(),
238 tag: tag.to_string(),
239 terminal: terminal.to_string(),
240 working_dir: working_dir.to_string(),
241 round_size,
242 waves: Vec::new(),
243 started_at: chrono::Utc::now().to_rfc3339(),
244 completed_at: None,
245 }
246 }
247
248 pub fn mark_complete(&mut self) {
249 self.completed_at = Some(chrono::Utc::now().to_rfc3339());
250 }
251
252 pub fn total_tasks(&self) -> usize {
254 self.waves
255 .iter()
256 .flat_map(|w| &w.rounds)
257 .map(|r| r.task_ids.len())
258 .sum()
259 }
260
261 pub fn total_failures(&self) -> usize {
263 self.waves
264 .iter()
265 .flat_map(|w| &w.rounds)
266 .map(|r| r.failures.len())
267 .sum()
268 }
269
270 pub fn get_previous_summary(&self) -> Option<String> {
273 self.waves
274 .last()
275 .and_then(|w| w.summary.as_ref().map(|s| s.to_text()))
276 }
277}
278
/// Returns the swarm state directory, `<root>/.scud/swarm`.
///
/// `<root>` is `project_root` when given, otherwise the process's current
/// directory; if that cannot be determined an empty path is used, which makes
/// the result relative.
pub fn swarm_dir(project_root: Option<&PathBuf>) -> PathBuf {
    let mut dir = match project_root {
        Some(root) => root.clone(),
        None => std::env::current_dir().unwrap_or_default(),
    };
    dir.push(".scud");
    dir.push("swarm");
    dir
}
286
287pub fn lock_file_path(project_root: Option<&PathBuf>, tag: &str) -> PathBuf {
289 swarm_dir(project_root).join(format!("{}.lock", tag))
290}
291
/// Guard representing a held session lock.
///
/// Created by [`acquire_session_lock`]. Dropping the guard removes the lock
/// file and then closes the underlying file handle.
pub struct SessionLock {
    /// Open handle to the lock file on which the exclusive lock was taken.
    /// It is never read; it is stored only so the handle stays open for as
    /// long as the guard lives.
    _file: fs::File,
    /// Location of the lock file on disk.
    path: PathBuf,
}

impl SessionLock {
    /// Returns the path of the lock file backing this guard.
    pub fn path(&self) -> &PathBuf {
        &self.path
    }
}

impl Drop for SessionLock {
    /// Removes the lock file on a best-effort basis.
    fn drop(&mut self) {
        // Errors are deliberately ignored: `drop` cannot report them, and the
        // file handle in `_file` is closed right after this body returns.
        let _ = fs::remove_file(&self.path);
    }
}
313
314pub fn acquire_session_lock(project_root: Option<&PathBuf>, tag: &str) -> Result<SessionLock> {
318 use fs2::FileExt;
319
320 let dir = swarm_dir(project_root);
321 fs::create_dir_all(&dir)?;
322
323 let lock_path = lock_file_path(project_root, tag);
324 let file = fs::OpenOptions::new()
325 .write(true)
326 .create(true)
327 .truncate(true)
328 .open(&lock_path)?;
329
330 file.try_lock_exclusive().map_err(|_| {
332 anyhow::anyhow!(
333 "Another swarm session is already running for tag '{}'. \
334 If this is incorrect, remove the lock file: {}",
335 tag,
336 lock_path.display()
337 )
338 })?;
339
340 use std::io::Write;
342 let mut file = file;
343 writeln!(
344 file,
345 "pid={}\nstarted={}",
346 std::process::id(),
347 chrono::Utc::now().to_rfc3339()
348 )?;
349
350 Ok(SessionLock {
351 _file: file,
352 path: lock_path,
353 })
354}
355
356pub fn session_file(project_root: Option<&PathBuf>, session_name: &str) -> PathBuf {
358 swarm_dir(project_root).join(format!("{}.json", session_name))
359}
360
361pub fn save_session(project_root: Option<&PathBuf>, session: &SwarmSession) -> Result<()> {
363 let dir = swarm_dir(project_root);
364 fs::create_dir_all(&dir)?;
365
366 let file = session_file(project_root, &session.session_name);
367 let json = serde_json::to_string_pretty(session)?;
368 fs::write(file, json)?;
369
370 Ok(())
371}
372
373pub fn load_session(project_root: Option<&PathBuf>, session_name: &str) -> Result<SwarmSession> {
375 let file = session_file(project_root, session_name);
376 let json = fs::read_to_string(&file)?;
377 let session: SwarmSession = serde_json::from_str(&json)?;
378 Ok(session)
379}
380
381pub fn list_sessions(project_root: Option<&PathBuf>) -> Result<Vec<String>> {
383 let dir = swarm_dir(project_root);
384 if !dir.exists() {
385 return Ok(Vec::new());
386 }
387
388 let mut sessions = Vec::new();
389 for entry in fs::read_dir(dir)? {
390 let entry = entry?;
391 let path = entry.path();
392 if path.extension().map(|e| e == "json").unwrap_or(false) {
393 if let Some(stem) = path.file_stem() {
394 sessions.push(stem.to_string_lossy().to_string());
395 }
396 }
397 }
398
399 Ok(sessions)
400}
401
402use std::path::Path;
407use tokio::sync::mpsc;
408
409use crate::commands::spawn::agent::generate_prompt;
410use crate::commands::spawn::terminal::Harness;
411use crate::extensions::runner::{load_agent_config, AgentEvent, AgentRunner, SpawnConfig};
412use crate::models::task::Task;
413
/// A task paired with the tag it is executed under.
#[derive(Debug, Clone)]
pub struct WaveAgent {
    /// Task the agent is spawned for.
    pub task: Task,
    /// Tag passed along with the task when the agent's prompt is generated.
    pub tag: String,
}
422
423impl WaveAgent {
424 pub fn new(task: Task, tag: impl Into<String>) -> Self {
426 Self {
427 task,
428 tag: tag.into(),
429 }
430 }
431
432 pub fn from_task_pairs<I>(pairs: I) -> Vec<Self>
437 where
438 I: IntoIterator<Item = (Task, String)>,
439 {
440 pairs.into_iter().map(|(task, tag)| Self::new(task, tag)).collect()
441 }
442
443 pub fn task_id(&self) -> &str {
445 &self.task.id
446 }
447}
448
/// Outcome of executing one batch of agents.
#[derive(Debug, Clone)]
pub struct WaveExecutionResult {
    /// Round bookkeeping: which tasks were spawned, which failed to spawn,
    /// and the start/completion timestamps.
    pub round_state: RoundState,
    /// Results collected from the agents that were spawned.
    pub agent_results: Vec<crate::extensions::runner::AgentResult>,
}
457
458impl WaveExecutionResult {
459 pub fn all_succeeded(&self) -> bool {
461 self.agent_results.iter().all(|r| r.success)
462 }
463
464 pub fn successful_task_ids(&self) -> Vec<String> {
466 self.agent_results
467 .iter()
468 .filter(|r| r.success)
469 .map(|r| r.task_id.clone())
470 .collect()
471 }
472
473 pub fn failed_task_ids(&self) -> Vec<String> {
475 self.agent_results
476 .iter()
477 .filter(|r| !r.success)
478 .map(|r| r.task_id.clone())
479 .collect()
480 }
481
482 pub fn total_duration_ms(&self) -> u64 {
484 self.agent_results.iter().map(|r| r.duration_ms).max().unwrap_or(0)
485 }
486}
487
488impl WaveState {
489 pub fn apply_execution_result(&mut self, result: WaveExecutionResult) {
494 self.rounds.push(result.round_state);
495 }
496
497 pub fn from_execution_result(wave_number: usize, result: WaveExecutionResult) -> Self {
499 let mut state = Self::new(wave_number);
500 state.apply_execution_result(result);
501 state
502 }
503}
504
505pub async fn execute_wave_async(
520 agents: &[WaveAgent],
521 working_dir: &Path,
522 round_number: usize,
523 default_harness: Harness,
524) -> Result<WaveExecutionResult> {
525 let mut round_state = RoundState::new(round_number);
526 let mut runner = AgentRunner::new(agents.len() * 10);
527
528 for agent in agents {
530 let (harness, model) = load_agent_config(
532 agent.task.agent_type.as_deref(),
533 default_harness,
534 None,
535 working_dir,
536 );
537
538 let prompt = generate_prompt(&agent.task, &agent.tag);
540
541 let config = SpawnConfig {
542 task_id: agent.task.id.clone(),
543 prompt,
544 working_dir: working_dir.to_path_buf(),
545 harness,
546 model,
547 };
548
549 match runner.spawn(config).await {
550 Ok(()) => {
551 round_state.task_ids.push(agent.task.id.clone());
552 round_state.tags.push(agent.tag.clone());
553 }
554 Err(e) => {
555 round_state.failures.push(agent.task.id.clone());
556 eprintln!("Failed to spawn agent for {}: {}", agent.task.id, e);
557 }
558 }
559 }
560
561 let agent_results = runner.wait_all().await;
563
564 round_state.mark_complete();
565
566 Ok(WaveExecutionResult {
567 round_state,
568 agent_results,
569 })
570}
571
572pub async fn execute_wave_with_events(
586 agents: &[WaveAgent],
587 working_dir: &Path,
588 round_number: usize,
589 default_harness: Harness,
590 event_tx: mpsc::Sender<AgentEvent>,
591) -> Result<WaveExecutionResult> {
592 use crate::extensions::runner::spawn_agent;
593
594 let mut round_state = RoundState::new(round_number);
595 let mut handles = Vec::new();
596
597 for agent in agents {
599 let (harness, model) = load_agent_config(
601 agent.task.agent_type.as_deref(),
602 default_harness,
603 None,
604 working_dir,
605 );
606
607 let prompt = generate_prompt(&agent.task, &agent.tag);
609
610 let config = SpawnConfig {
611 task_id: agent.task.id.clone(),
612 prompt,
613 working_dir: working_dir.to_path_buf(),
614 harness,
615 model,
616 };
617
618 match spawn_agent(config, event_tx.clone()).await {
619 Ok(handle) => {
620 handles.push(handle);
621 round_state.task_ids.push(agent.task.id.clone());
622 round_state.tags.push(agent.tag.clone());
623 }
624 Err(e) => {
625 round_state.failures.push(agent.task.id.clone());
626 let _ = event_tx
627 .send(AgentEvent::SpawnFailed {
628 task_id: agent.task.id.clone(),
629 error: e.to_string(),
630 })
631 .await;
632 }
633 }
634 }
635
636 let mut agent_results = Vec::new();
638 for handle in handles {
639 if let Ok(result) = handle.await {
640 agent_results.push(result);
641 }
642 }
643
644 round_state.mark_complete();
645
646 Ok(WaveExecutionResult {
647 round_state,
648 agent_results,
649 })
650}
651
652pub async fn execute_wave_with_tracking<I>(
669 wave_number: usize,
670 task_pairs: I,
671 working_dir: &Path,
672 default_harness: Harness,
673) -> Result<(WaveState, Vec<crate::extensions::runner::AgentResult>)>
674where
675 I: IntoIterator<Item = (Task, String)>,
676{
677 let agents = WaveAgent::from_task_pairs(task_pairs);
678 let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
679
680 let wave_state = WaveState::from_execution_result(wave_number, result.clone());
681
682 Ok((wave_state, result.agent_results))
683}
684
685pub async fn execute_wave_in_rounds<I>(
700 wave_number: usize,
701 task_pairs: I,
702 working_dir: &Path,
703 round_size: usize,
704 default_harness: Harness,
705) -> Result<WaveState>
706where
707 I: IntoIterator<Item = (Task, String)>,
708{
709 let agents: Vec<WaveAgent> = WaveAgent::from_task_pairs(task_pairs);
710 let mut wave_state = WaveState::new(wave_number);
711
712 for (round_idx, chunk) in agents.chunks(round_size).enumerate() {
713 let result = execute_wave_async(chunk, working_dir, round_idx, default_harness).await?;
714 wave_state.apply_execution_result(result);
715 }
716
717 wave_state.mark_complete();
718 Ok(wave_state)
719}
720
721pub async fn spawn_subagent(
725 task: &Task,
726 tag: &str,
727 working_dir: &Path,
728 default_harness: Harness,
729) -> Result<crate::extensions::runner::AgentResult> {
730 let agents = vec![WaveAgent {
731 task: task.clone(),
732 tag: tag.to_string(),
733 }];
734
735 let result = execute_wave_async(&agents, working_dir, 0, default_harness).await?;
736
737 result
738 .agent_results
739 .into_iter()
740 .next()
741 .ok_or_else(|| anyhow::anyhow!("No result from agent"))
742}
743
/// Unit tests for session state, session locking and the wave execution helpers.
#[cfg(test)]
mod tests {
    use super::*;

    /// A freshly created round has no tasks and no completion time.
    #[test]
    fn test_round_state_new() {
        let round = RoundState::new(0);
        assert_eq!(round.round_number, 0);
        assert!(round.task_ids.is_empty());
        assert!(round.completed_at.is_none());
    }

    /// `all_task_ids` gathers the task IDs of every round in the wave.
    #[test]
    fn test_wave_state_all_task_ids() {
        let mut wave = WaveState::new(1);

        let mut round1 = RoundState::new(0);
        round1.task_ids = vec!["task:1".to_string(), "task:2".to_string()];

        let mut round2 = RoundState::new(1);
        round2.task_ids = vec!["task:3".to_string()];

        wave.rounds.push(round1);
        wave.rounds.push(round2);

        let all_ids = wave.all_task_ids();
        assert_eq!(all_ids.len(), 3);
        assert!(all_ids.contains(&"task:1".to_string()));
        assert!(all_ids.contains(&"task:2".to_string()));
        assert!(all_ids.contains(&"task:3".to_string()));
    }

    /// `total_tasks` counts the task IDs across all waves and rounds.
    #[test]
    fn test_swarm_session_total_tasks() {
        let mut session = SwarmSession::new("test-session", "test-tag", "tmux", "/test/path", 5);

        let mut wave = WaveState::new(1);
        let mut round = RoundState::new(0);
        round.task_ids = vec!["task:1".to_string(), "task:2".to_string()];
        wave.rounds.push(round);
        session.waves.push(wave);

        assert_eq!(session.total_tasks(), 2);
    }

    /// The text summary mentions the wave number, the task IDs and the changed files.
    #[test]
    fn test_wave_summary_to_text() {
        let summary = WaveSummary {
            wave_number: 1,
            tasks_completed: vec!["task:1".to_string(), "task:2".to_string()],
            files_changed: vec!["src/main.rs".to_string()],
        };

        let text = summary.to_text();
        assert!(text.contains("Wave 1"));
        assert!(text.contains("task:1"));
        assert!(text.contains("src/main.rs"));
    }

    /// `get_previous_summary` is `None` without waves and returns the last wave's summary text otherwise.
    #[test]
    fn test_get_previous_summary() {
        let mut session = SwarmSession::new("test", "tag", "tmux", "/path", 5);

        // No waves yet, so there is nothing to summarize.
        assert!(session.get_previous_summary().is_none());

        let mut wave = WaveState::new(1);
        wave.summary = Some(WaveSummary {
            wave_number: 1,
            tasks_completed: vec!["task:1".to_string()],
            files_changed: vec![],
        });
        session.waves.push(wave);

        let summary = session.get_previous_summary();
        assert!(summary.is_some());
        assert!(summary.unwrap().contains("task:1"));
    }

    /// A second lock on the same tag fails while the first guard is still alive.
    #[test]
    fn test_session_lock_contention() {
        use tempfile::TempDir;

        let temp_dir = TempDir::new().unwrap();
        let project_root = temp_dir.path().to_path_buf();

        // Bound to a named variable so the guard lives until the end of the test.
        let _lock1 = acquire_session_lock(Some(&project_root), "test-tag")
            .expect("First lock should succeed");

        let result = acquire_session_lock(Some(&project_root), "test-tag");

        match result {
            Ok(_) => panic!("Second lock should fail"),
            Err(e) => {
                let error_msg = e.to_string();
                assert!(
                    error_msg.contains("already running"),
                    "Error message should mention 'already running', got: {}",
                    error_msg
                );
            }
        }
    }

    /// `get_current_commit` returns a 40-character hexadecimal SHA.
    ///
    /// This test only passes when it runs inside a git repository.
    #[test]
    fn test_get_current_commit() {
        let result = get_current_commit();

        assert!(result.is_some(), "Expected Some(sha) in a git repository");

        let sha = result.unwrap();

        assert_eq!(
            sha.len(),
            40,
            "Expected SHA to be 40 characters long, got {}",
            sha.len()
        );

        assert!(
            sha.chars().all(|c| c.is_ascii_hexdigit()),
            "Expected SHA to contain only hex characters, got: {}",
            sha
        );
    }

    /// `WaveAgent::new` stores the task and converts the tag into a `String`.
    #[test]
    fn test_wave_agent_new() {
        let task = Task::new(
            "task:1".to_string(),
            "Test task".to_string(),
            "Description".to_string(),
        );
        let agent = WaveAgent::new(task.clone(), "test-tag");

        assert_eq!(agent.task_id(), "task:1");
        assert_eq!(agent.tag, "test-tag");
    }

    /// `from_task_pairs` builds one agent per pair and keeps the input order.
    #[test]
    fn test_wave_agent_from_task_pairs() {
        let task1 = Task::new(
            "task:1".to_string(),
            "Task 1".to_string(),
            "Description".to_string(),
        );
        let task2 = Task::new(
            "task:2".to_string(),
            "Task 2".to_string(),
            "Description".to_string(),
        );

        let pairs = vec![
            (task1, "tag-a".to_string()),
            (task2, "tag-b".to_string()),
        ];

        let agents = WaveAgent::from_task_pairs(pairs);

        assert_eq!(agents.len(), 2);
        assert_eq!(agents[0].task_id(), "task:1");
        assert_eq!(agents[0].tag, "tag-a");
        assert_eq!(agents[1].task_id(), "task:2");
        assert_eq!(agents[1].tag, "tag-b");
    }

    /// The result helpers split successes from failures and report the longest duration.
    #[test]
    fn test_wave_execution_result_helpers() {
        use crate::extensions::runner::AgentResult;

        let result = WaveExecutionResult {
            round_state: RoundState::new(0),
            agent_results: vec![
                AgentResult {
                    task_id: "task:1".to_string(),
                    success: true,
                    exit_code: Some(0),
                    output: String::new(),
                    duration_ms: 1000,
                },
                AgentResult {
                    task_id: "task:2".to_string(),
                    success: false,
                    exit_code: Some(1),
                    output: String::new(),
                    duration_ms: 2000,
                },
            ],
        };

        assert!(!result.all_succeeded());
        assert_eq!(result.successful_task_ids(), vec!["task:1"]);
        assert_eq!(result.failed_task_ids(), vec!["task:2"]);
        // The "total" is the maximum of the individual durations, not their sum.
        assert_eq!(result.total_duration_ms(), 2000);
    }

    /// `from_execution_result` creates a wave holding the result's round.
    #[test]
    fn test_wave_state_from_execution_result() {
        use crate::extensions::runner::AgentResult;

        let mut round_state = RoundState::new(0);
        round_state.task_ids = vec!["task:1".to_string()];

        let result = WaveExecutionResult {
            round_state,
            agent_results: vec![AgentResult {
                task_id: "task:1".to_string(),
                success: true,
                exit_code: Some(0),
                output: String::new(),
                duration_ms: 1000,
            }],
        };

        let wave_state = WaveState::from_execution_result(1, result);

        assert_eq!(wave_state.wave_number, 1);
        assert_eq!(wave_state.rounds.len(), 1);
        assert_eq!(wave_state.rounds[0].task_ids, vec!["task:1"]);
    }

    /// `apply_execution_result` appends the result's round to an existing wave.
    #[test]
    fn test_wave_state_apply_execution_result() {
        use crate::extensions::runner::AgentResult;

        let mut wave_state = WaveState::new(1);
        assert!(wave_state.rounds.is_empty());

        let mut round_state = RoundState::new(0);
        round_state.task_ids = vec!["task:1".to_string()];

        let result = WaveExecutionResult {
            round_state,
            agent_results: vec![AgentResult {
                task_id: "task:1".to_string(),
                success: true,
                exit_code: Some(0),
                output: String::new(),
                duration_ms: 1000,
            }],
        };

        wave_state.apply_execution_result(result);

        assert_eq!(wave_state.rounds.len(), 1);
        assert_eq!(wave_state.all_task_ids(), vec!["task:1"]);
    }
}