1use std::sync::Arc;
8
9use chrono::Utc;
10use dashmap::DashMap;
11use tokio::sync::Mutex;
12use tracing::{info, warn};
13use uuid::Uuid;
14
15use punch_types::{FighterId, PunchError, PunchResult, SubtaskStatus, SwarmSubtask, SwarmTask};
16
/// Maximum heartbeat age, in seconds, before `detect_stale_fighters` flags a
/// healthy fighter that still has active tasks.
const STALE_THRESHOLD_SECS: i64 = 300;
20
/// Per-fighter scheduling record kept by the coordinator.
#[derive(Debug, Clone)]
pub struct FighterLoad {
    /// Identifier of the fighter this record describes.
    pub fighter_id: FighterId,
    /// Number of subtasks currently counted against this fighter.
    pub active_tasks: usize,
    /// Whether the fighter is considered when picking assignees.
    pub healthy: bool,
    /// Keywords matched case-insensitively as substrings of a subtask
    /// description; an empty list means the fighter matches any subtask.
    pub capabilities: Vec<String>,
    /// Unix timestamp (seconds) of the last recorded heartbeat.
    pub last_heartbeat: i64,
}
35
/// Tracks swarm tasks and the load of the fighters that execute their subtasks.
pub struct SwarmCoordinator {
    /// Tasks by id; each task sits behind its own async mutex.
    tasks: DashMap<Uuid, Arc<Mutex<SwarmTask>>>,
    /// Load and health record for every registered fighter.
    fighter_loads: DashMap<FighterId, FighterLoad>,
}
43
44impl SwarmCoordinator {
45 pub fn new() -> Self {
47 Self {
48 tasks: DashMap::new(),
49 fighter_loads: DashMap::new(),
50 }
51 }
52
53 pub fn register_fighter(&self, fighter_id: FighterId) {
55 self.fighter_loads.insert(
56 fighter_id,
57 FighterLoad {
58 fighter_id,
59 active_tasks: 0,
60 healthy: true,
61 capabilities: vec![],
62 last_heartbeat: Utc::now().timestamp(),
63 },
64 );
65 }
66
67 pub fn register_fighter_with_capabilities(
69 &self,
70 fighter_id: FighterId,
71 capabilities: Vec<String>,
72 ) {
73 self.fighter_loads.insert(
74 fighter_id,
75 FighterLoad {
76 fighter_id,
77 active_tasks: 0,
78 healthy: true,
79 capabilities,
80 last_heartbeat: Utc::now().timestamp(),
81 },
82 );
83 }
84
85 pub fn unregister_fighter(&self, fighter_id: &FighterId) {
87 self.fighter_loads.remove(fighter_id);
88 }
89
90 pub fn record_heartbeat(&self, fighter_id: &FighterId) {
92 if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
93 load.last_heartbeat = Utc::now().timestamp();
94 }
95 }
96
97 pub fn decompose_task(&self, description: &str) -> Vec<SwarmSubtask> {
102 let trimmed = description.trim();
103 if trimmed.is_empty() {
104 return vec![SwarmSubtask {
105 id: Uuid::new_v4(),
106 description: description.to_string(),
107 assigned_to: None,
108 status: SubtaskStatus::Pending,
109 result: None,
110 depends_on: vec![],
111 }];
112 }
113
114 let paragraphs: Vec<&str> = trimmed
116 .split("\n\n")
117 .map(|p| p.trim())
118 .filter(|p| !p.is_empty())
119 .collect();
120
121 if paragraphs.len() > 1 {
122 return paragraphs
123 .into_iter()
124 .map(|p| SwarmSubtask {
125 id: Uuid::new_v4(),
126 description: p.to_string(),
127 assigned_to: None,
128 status: SubtaskStatus::Pending,
129 result: None,
130 depends_on: vec![],
131 })
132 .collect();
133 }
134
135 let sentences: Vec<&str> = trimmed
137 .split(". ")
138 .map(|s| s.trim())
139 .filter(|s| !s.is_empty())
140 .collect();
141
142 if sentences.len() > 1 {
143 return sentences
144 .into_iter()
145 .map(|s| SwarmSubtask {
146 id: Uuid::new_v4(),
147 description: s.to_string(),
148 assigned_to: None,
149 status: SubtaskStatus::Pending,
150 result: None,
151 depends_on: vec![],
152 })
153 .collect();
154 }
155
156 let lines: Vec<&str> = trimmed
158 .lines()
159 .map(|l| l.trim())
160 .filter(|l| !l.is_empty())
161 .collect();
162
163 if lines.len() > 1 {
164 return lines
165 .into_iter()
166 .map(|line| SwarmSubtask {
167 id: Uuid::new_v4(),
168 description: line.to_string(),
169 assigned_to: None,
170 status: SubtaskStatus::Pending,
171 result: None,
172 depends_on: vec![],
173 })
174 .collect();
175 }
176
177 vec![SwarmSubtask {
179 id: Uuid::new_v4(),
180 description: trimmed.to_string(),
181 assigned_to: None,
182 status: SubtaskStatus::Pending,
183 result: None,
184 depends_on: vec![],
185 }]
186 }
187
188 pub fn create_task(&self, description: String) -> Uuid {
192 let subtasks = self.decompose_task(&description);
193 let task = SwarmTask {
194 id: Uuid::new_v4(),
195 description,
196 subtasks,
197 progress: 0.0,
198 created_at: Utc::now(),
199 aggregated_result: None,
200 };
201 let id = task.id;
202 self.tasks.insert(id, Arc::new(Mutex::new(task)));
203 info!(%id, "swarm task created");
204 id
205 }
206
207 pub fn create_task_with_subtasks(
209 &self,
210 description: String,
211 subtasks: Vec<SwarmSubtask>,
212 ) -> Uuid {
213 let task = SwarmTask {
214 id: Uuid::new_v4(),
215 description,
216 subtasks,
217 progress: 0.0,
218 created_at: Utc::now(),
219 aggregated_result: None,
220 };
221 let id = task.id;
222 self.tasks.insert(id, Arc::new(Mutex::new(task)));
223 info!(%id, "swarm task created with explicit subtasks");
224 id
225 }
226
227 pub async fn assign_subtasks(&self, task_id: &Uuid) -> PunchResult<Vec<(Uuid, FighterId)>> {
232 let task_ref = self
233 .tasks
234 .get(task_id)
235 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
236
237 let mut task = task_ref.value().lock().await;
238 let mut assignments = Vec::new();
239
240 let eligible_indices: Vec<usize> = {
242 let subtasks = &task.subtasks;
243 subtasks
244 .iter()
245 .enumerate()
246 .filter(|(_, s)| s.status == SubtaskStatus::Pending)
247 .filter(|(_, s)| {
248 s.depends_on.iter().all(|dep_id| {
249 subtasks
250 .iter()
251 .any(|d| d.id == *dep_id && d.status == SubtaskStatus::Completed)
252 })
253 })
254 .map(|(i, _)| i)
255 .collect()
256 };
257
258 for idx in eligible_indices {
260 let subtask_desc = &task.subtasks[idx].description;
261 let fighter = self.find_best_fighter_for_task(subtask_desc);
262
263 if let Some(fighter_id) = fighter {
264 let subtask = &mut task.subtasks[idx];
265 subtask.assigned_to = Some(fighter_id);
266 subtask.status = SubtaskStatus::Running;
267
268 if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
269 load.active_tasks += 1;
270 }
271
272 assignments.push((subtask.id, fighter_id));
273 info!(
274 task_id = %task_id,
275 subtask_id = %subtask.id,
276 fighter_id = %fighter_id,
277 "subtask assigned"
278 );
279 }
280 }
281
282 Ok(assignments)
283 }
284
285 fn find_best_fighter_for_task(&self, task_description: &str) -> Option<FighterId> {
288 let task_lower = task_description.to_lowercase();
289
290 let capable_fighter = self
292 .fighter_loads
293 .iter()
294 .filter(|entry| entry.value().healthy)
295 .filter(|entry| {
296 entry.value().capabilities.is_empty()
298 || entry
299 .value()
300 .capabilities
301 .iter()
302 .any(|cap| task_lower.contains(&cap.to_lowercase()))
303 })
304 .min_by_key(|entry| entry.value().active_tasks)
305 .map(|entry| entry.value().fighter_id);
306
307 if capable_fighter.is_some() {
308 return capable_fighter;
309 }
310
311 self.find_least_loaded_fighter()
313 }
314
315 fn find_least_loaded_fighter(&self) -> Option<FighterId> {
317 self.fighter_loads
318 .iter()
319 .filter(|entry| entry.value().healthy)
320 .min_by_key(|entry| entry.value().active_tasks)
321 .map(|entry| entry.value().fighter_id)
322 }
323
324 pub async fn complete_subtask(
326 &self,
327 task_id: &Uuid,
328 subtask_id: &Uuid,
329 result: String,
330 ) -> PunchResult<()> {
331 let task_ref = self
332 .tasks
333 .get(task_id)
334 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
335
336 let mut task = task_ref.value().lock().await;
337
338 let subtask = task
339 .subtasks
340 .iter_mut()
341 .find(|s| s.id == *subtask_id)
342 .ok_or_else(|| {
343 PunchError::Troop(format!(
344 "subtask {} not found in task {}",
345 subtask_id, task_id
346 ))
347 })?;
348
349 if let Some(fighter_id) = subtask.assigned_to
351 && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
352 {
353 load.active_tasks = load.active_tasks.saturating_sub(1);
354 }
355
356 subtask.status = SubtaskStatus::Completed;
357 subtask.result = Some(result);
358
359 self.update_progress(&mut task);
361
362 if task.progress >= 1.0 {
364 task.aggregated_result = Some(self.aggregate_results(&task.subtasks));
365 info!(%task_id, "swarm task fully completed");
366 }
367
368 Ok(())
369 }
370
371 fn update_progress(&self, task: &mut SwarmTask) {
373 let total = task.subtasks.len() as f64;
374 let completed = task
375 .subtasks
376 .iter()
377 .filter(|s| s.status == SubtaskStatus::Completed)
378 .count() as f64;
379 task.progress = if total > 0.0 { completed / total } else { 0.0 };
380 }
381
382 fn aggregate_results(&self, subtasks: &[SwarmSubtask]) -> String {
386 let results: Vec<&str> = subtasks
387 .iter()
388 .filter_map(|s| s.result.as_deref())
389 .collect();
390
391 if results.is_empty() {
392 return String::new();
393 }
394
395 let total_len: usize = results.iter().map(|r| r.len()).sum();
398 if total_len < 500 || results.len() <= 2 {
399 results.join("\n\n")
400 } else {
401 results
402 .iter()
403 .enumerate()
404 .map(|(i, r)| format!("--- Section {} ---\n{}", i + 1, r))
405 .collect::<Vec<_>>()
406 .join("\n\n")
407 }
408 }
409
410 pub async fn fail_subtask(
412 &self,
413 task_id: &Uuid,
414 subtask_id: &Uuid,
415 error: String,
416 ) -> PunchResult<()> {
417 let task_ref = self
418 .tasks
419 .get(task_id)
420 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
421
422 let mut task = task_ref.value().lock().await;
423
424 let subtask = task
425 .subtasks
426 .iter_mut()
427 .find(|s| s.id == *subtask_id)
428 .ok_or_else(|| {
429 PunchError::Troop(format!(
430 "subtask {} not found in task {}",
431 subtask_id, task_id
432 ))
433 })?;
434
435 if let Some(fighter_id) = subtask.assigned_to
437 && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
438 {
439 load.active_tasks = load.active_tasks.saturating_sub(1);
440 }
441
442 subtask.status = SubtaskStatus::Failed(error);
443 warn!(%task_id, %subtask_id, "subtask failed");
444
445 Ok(())
446 }
447
448 pub async fn reassign_failed_subtask(
450 &self,
451 task_id: &Uuid,
452 subtask_id: &Uuid,
453 ) -> PunchResult<Option<FighterId>> {
454 let task_ref = self
455 .tasks
456 .get(task_id)
457 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
458
459 let mut task = task_ref.value().lock().await;
460
461 let subtask = task
462 .subtasks
463 .iter_mut()
464 .find(|s| s.id == *subtask_id)
465 .ok_or_else(|| {
466 PunchError::Troop(format!(
467 "subtask {} not found in task {}",
468 subtask_id, task_id
469 ))
470 })?;
471
472 if !matches!(subtask.status, SubtaskStatus::Failed(_)) {
474 return Err(PunchError::Troop(
475 "can only reassign failed subtasks".to_string(),
476 ));
477 }
478
479 let failed_fighter = subtask.assigned_to;
480
481 let new_fighter = self
483 .fighter_loads
484 .iter()
485 .filter(|entry| entry.value().healthy)
486 .filter(|entry| Some(entry.value().fighter_id) != failed_fighter)
487 .min_by_key(|entry| entry.value().active_tasks)
488 .map(|entry| entry.value().fighter_id);
489
490 if let Some(fighter_id) = new_fighter {
491 subtask.assigned_to = Some(fighter_id);
492 subtask.status = SubtaskStatus::Running;
493
494 if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
495 load.active_tasks += 1;
496 }
497
498 info!(
499 %task_id,
500 %subtask_id,
501 new_fighter = %fighter_id,
502 "subtask reassigned after failure"
503 );
504 }
505
506 Ok(new_fighter)
507 }
508
509 pub fn detect_stale_fighters(&self) -> Vec<FighterId> {
513 let now = Utc::now().timestamp();
514 let mut stale = Vec::new();
515
516 for entry in self.fighter_loads.iter() {
517 let load = entry.value();
518 if load.healthy
519 && load.active_tasks > 0
520 && (now - load.last_heartbeat) > STALE_THRESHOLD_SECS
521 {
522 stale.push(load.fighter_id);
523 }
524 }
525
526 for fighter_id in &stale {
527 self.mark_unhealthy(fighter_id);
528 warn!(
529 %fighter_id,
530 "fighter detected as stale, marked unhealthy"
531 );
532 }
533
534 stale
535 }
536
537 pub fn mark_unhealthy(&self, fighter_id: &FighterId) {
539 if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
540 load.healthy = false;
541 warn!(%fighter_id, "fighter marked unhealthy");
542 }
543 }
544
545 pub fn mark_healthy(&self, fighter_id: &FighterId) {
547 if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
548 load.healthy = true;
549 load.last_heartbeat = Utc::now().timestamp();
550 info!(%fighter_id, "fighter marked healthy");
551 }
552 }
553
554 pub async fn get_task(&self, task_id: &Uuid) -> Option<SwarmTask> {
556 let task_ref = self.tasks.get(task_id)?;
557 let task = task_ref.value().lock().await;
558 Some(task.clone())
559 }
560
561 pub async fn get_progress(&self, task_id: &Uuid) -> Option<f64> {
563 let task_ref = self.tasks.get(task_id)?;
564 let task = task_ref.value().lock().await;
565 Some(task.progress)
566 }
567
568 pub async fn get_progress_report(&self, task_id: &Uuid) -> Option<ProgressReport> {
570 let task_ref = self.tasks.get(task_id)?;
571 let task = task_ref.value().lock().await;
572
573 let total = task.subtasks.len();
574 let completed = task
575 .subtasks
576 .iter()
577 .filter(|s| s.status == SubtaskStatus::Completed)
578 .count();
579 let running = task
580 .subtasks
581 .iter()
582 .filter(|s| s.status == SubtaskStatus::Running)
583 .count();
584 let failed = task
585 .subtasks
586 .iter()
587 .filter(|s| matches!(s.status, SubtaskStatus::Failed(_)))
588 .count();
589 let pending = task
590 .subtasks
591 .iter()
592 .filter(|s| s.status == SubtaskStatus::Pending)
593 .count();
594
595 Some(ProgressReport {
596 task_id: *task_id,
597 total_subtasks: total,
598 completed,
599 running,
600 failed,
601 pending,
602 percentage: if total > 0 {
603 (completed as f64 / total as f64) * 100.0
604 } else {
605 0.0
606 },
607 })
608 }
609
610 pub fn list_task_ids(&self) -> Vec<Uuid> {
612 self.tasks.iter().map(|entry| *entry.key()).collect()
613 }
614
615 pub fn available_fighter_count(&self) -> usize {
617 self.fighter_loads
618 .iter()
619 .filter(|entry| entry.value().healthy)
620 .count()
621 }
622
623 pub fn get_fighter_loads(&self) -> Vec<FighterLoad> {
625 self.fighter_loads
626 .iter()
627 .map(|entry| entry.value().clone())
628 .collect()
629 }
630
631 pub fn remove_task(&self, task_id: &Uuid) -> bool {
633 self.tasks.remove(task_id).is_some()
634 }
635}
636
/// Per-status subtask counts for one task, as produced by
/// `SwarmCoordinator::get_progress_report`.
#[derive(Debug, Clone)]
pub struct ProgressReport {
    /// Id of the task the report describes.
    pub task_id: Uuid,
    /// Total number of subtasks in the task.
    pub total_subtasks: usize,
    /// Subtasks in the `Completed` state.
    pub completed: usize,
    /// Subtasks in the `Running` state.
    pub running: usize,
    /// Subtasks in the `Failed` state.
    pub failed: usize,
    /// Subtasks in the `Pending` state.
    pub pending: usize,
    /// Completed share of all subtasks, on a 0–100 scale.
    pub percentage: f64,
}
655
656impl Default for SwarmCoordinator {
657 fn default() -> Self {
658 Self::new()
659 }
660}
661
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a pending, unassigned subtask with the given id, text and
    /// dependencies.
    fn pending_subtask(id: Uuid, description: &str, depends_on: Vec<Uuid>) -> SwarmSubtask {
        SwarmSubtask {
            id,
            description: description.to_string(),
            assigned_to: None,
            status: SubtaskStatus::Pending,
            result: None,
            depends_on,
        }
    }

    /// Creates a coordinator with `N` freshly registered fighters that
    /// declare no capabilities.
    fn coordinator_with_fighters<const N: usize>() -> (SwarmCoordinator, [FighterId; N]) {
        let coord = SwarmCoordinator::new();
        let fighters: [FighterId; N] = std::array::from_fn(|_| FighterId::new());
        for fighter in &fighters {
            coord.register_fighter(*fighter);
        }
        (coord, fighters)
    }

    /// Reads the current `active_tasks` counter of a registered fighter.
    fn active_tasks(coord: &SwarmCoordinator, fighter: &FighterId) -> usize {
        let load = coord.fighter_loads.get(fighter).expect("should exist");
        load.active_tasks
    }

    #[test]
    fn test_decompose_single_task() {
        let subtasks = SwarmCoordinator::new().decompose_task("single task");
        assert_eq!(subtasks.len(), 1);
        assert_eq!(subtasks[0].description, "single task");
    }

    #[test]
    fn test_decompose_multi_line_task() {
        let subtasks = SwarmCoordinator::new().decompose_task("step 1\nstep 2\nstep 3");
        let descriptions: Vec<&str> = subtasks.iter().map(|s| s.description.as_str()).collect();
        assert_eq!(descriptions, ["step 1", "step 2", "step 3"]);
    }

    #[test]
    fn test_decompose_ignores_blank_lines() {
        let subtasks = SwarmCoordinator::new().decompose_task("step 1\n\n\nstep 2\n");
        assert_eq!(subtasks.len(), 2);
    }

    #[test]
    fn test_decompose_by_paragraphs() {
        let input = "First paragraph about setup.\n\nSecond paragraph about execution.\n\nThird paragraph about cleanup.";
        let subtasks = SwarmCoordinator::new().decompose_task(input);
        assert_eq!(subtasks.len(), 3);
        for (subtask, keyword) in subtasks.iter().zip(["setup", "execution", "cleanup"]) {
            assert!(subtask.description.contains(keyword));
        }
    }

    #[test]
    fn test_decompose_by_sentences() {
        let subtasks =
            SwarmCoordinator::new().decompose_task("Analyze the code. Fix bugs. Write tests");
        assert_eq!(subtasks.len(), 3);
    }

    #[test]
    fn test_create_task() {
        let coord = SwarmCoordinator::new();
        let id = coord.create_task("test task".to_string());
        assert!(coord.tasks.contains_key(&id));
    }

    #[test]
    fn test_create_task_with_subtasks() {
        let coord = SwarmCoordinator::new();
        let subtasks = vec![
            pending_subtask(Uuid::new_v4(), "sub1", vec![]),
            pending_subtask(Uuid::new_v4(), "sub2", vec![]),
        ];
        let id = coord.create_task_with_subtasks("parent".to_string(), subtasks);
        assert!(coord.tasks.contains_key(&id));
    }

    #[test]
    fn test_register_and_unregister_fighter() {
        let (coord, [f]) = coordinator_with_fighters();
        assert_eq!(coord.available_fighter_count(), 1);
        coord.unregister_fighter(&f);
        assert_eq!(coord.available_fighter_count(), 0);
    }

    #[test]
    fn test_register_fighter_with_capabilities() {
        let coord = SwarmCoordinator::new();
        let f = FighterId::new();
        coord.register_fighter_with_capabilities(f, vec!["code".to_string(), "review".to_string()]);

        let load = coord.fighter_loads.get(&f).expect("should exist");
        assert_eq!(load.capabilities.len(), 2);
        assert!(load.capabilities.contains(&"code".to_string()));
    }

    #[test]
    fn test_find_least_loaded_fighter() {
        let (coord, [f1, f2]) = coordinator_with_fighters();

        // Make f1 clearly busier than f2.
        if let Some(mut load) = coord.fighter_loads.get_mut(&f1) {
            load.active_tasks = 5;
        }

        assert_eq!(coord.find_least_loaded_fighter(), Some(f2));
    }

    #[test]
    fn test_find_least_loaded_skips_unhealthy() {
        let (coord, [f1, f2]) = coordinator_with_fighters();

        coord.mark_unhealthy(&f2);
        assert_eq!(coord.find_least_loaded_fighter(), Some(f1));
    }

    #[test]
    fn test_mark_healthy_unhealthy() {
        let (coord, [f]) = coordinator_with_fighters();
        assert_eq!(coord.available_fighter_count(), 1);

        coord.mark_unhealthy(&f);
        assert_eq!(coord.available_fighter_count(), 0);

        coord.mark_healthy(&f);
        assert_eq!(coord.available_fighter_count(), 1);
    }

    #[tokio::test]
    async fn test_assign_subtasks() {
        let (coord, [_f1, _f2]) = coordinator_with_fighters();

        let task_id = coord.create_task("step 1\nstep 2".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");
        assert_eq!(assignments.len(), 2);
    }

    #[tokio::test]
    async fn test_assign_subtasks_respects_dependencies() {
        let (coord, [_f]) = coordinator_with_fighters();

        let dep_id = Uuid::new_v4();
        let subtasks = vec![
            pending_subtask(dep_id, "first", vec![]),
            pending_subtask(Uuid::new_v4(), "second (depends on first)", vec![dep_id]),
        ];
        let task_id = coord.create_task_with_subtasks("pipeline".to_string(), subtasks);

        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");
        // Only the dependency-free subtask may be handed out in the first round.
        assert_eq!(assignments.len(), 1);
        assert_eq!(assignments[0].0, dep_id);
    }

    #[tokio::test]
    async fn test_complete_subtask() {
        let (coord, [_f]) = coordinator_with_fighters();

        let task_id = coord.create_task("single task".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");
        assert_eq!(assignments.len(), 1);

        let (subtask_id, _) = assignments[0];
        coord
            .complete_subtask(&task_id, &subtask_id, "done".to_string())
            .await
            .expect("should complete");

        let task = coord.get_task(&task_id).await.expect("should exist");
        assert!((task.progress - 1.0).abs() < f64::EPSILON);
        assert!(task.aggregated_result.is_some());
    }

    #[tokio::test]
    async fn test_fail_subtask() {
        let (coord, [_f]) = coordinator_with_fighters();

        let task_id = coord.create_task("fail task".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");
        let (subtask_id, _) = assignments[0];

        coord
            .fail_subtask(&task_id, &subtask_id, "error occurred".to_string())
            .await
            .expect("should fail");

        let task = coord.get_task(&task_id).await.expect("should exist");
        assert!(matches!(task.subtasks[0].status, SubtaskStatus::Failed(_)));
    }

    #[tokio::test]
    async fn test_get_progress() {
        let (coord, [_f]) = coordinator_with_fighters();

        let task_id = coord.create_task("a\nb".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");

        // Finish one of the two subtasks.
        coord
            .complete_subtask(&task_id, &assignments[0].0, "result 1".to_string())
            .await
            .expect("should complete");

        let progress = coord.get_progress(&task_id).await.expect("should exist");
        assert!((progress - 0.5).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_progress_report() {
        let (coord, [_f]) = coordinator_with_fighters();

        let task_id = coord.create_task("a\nb\nc".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");

        // Finish one of the three subtasks.
        coord
            .complete_subtask(&task_id, &assignments[0].0, "done".to_string())
            .await
            .expect("should complete");

        let report = coord
            .get_progress_report(&task_id)
            .await
            .expect("should exist");
        assert_eq!(report.total_subtasks, 3);
        assert_eq!(report.completed, 1);
        assert_eq!(report.running, 2);
        assert_eq!(report.pending, 0);
        assert_eq!(report.failed, 0);
        assert!((report.percentage - 33.33).abs() < 1.0);
    }

    #[tokio::test]
    async fn test_load_balancing_distributes_evenly() {
        let (coord, [_f1, _f2, _f3]) = coordinator_with_fighters();

        let task_id = coord.create_task("a\nb\nc\nd\ne\nf".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");

        assert_eq!(assignments.len(), 6);

        // Tally how many subtasks each fighter received.
        let mut counts: std::collections::HashMap<FighterId, usize> =
            std::collections::HashMap::new();
        for (_, fighter) in &assignments {
            *counts.entry(*fighter).or_insert(0) += 1;
        }

        // Six subtasks across three fighters: two each.
        for count in counts.values() {
            assert_eq!(*count, 2);
        }
    }

    #[tokio::test]
    async fn test_reassign_failed_subtask() {
        let (coord, [_f1, _f2]) = coordinator_with_fighters();

        let task_id = coord.create_task("single task".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");
        let (subtask_id, original_fighter) = assignments[0];

        // Fail it on the original fighter.
        coord
            .fail_subtask(&task_id, &subtask_id, "crashed".to_string())
            .await
            .expect("should fail");

        // It must move to the other fighter.
        let new_fighter = coord
            .reassign_failed_subtask(&task_id, &subtask_id)
            .await
            .expect("should reassign");

        assert!(new_fighter.is_some());
        let new_id = new_fighter.expect("should have new fighter");
        assert_ne!(new_id, original_fighter);
    }

    #[tokio::test]
    async fn test_detect_stale_fighters() {
        let (coord, [f1, f2]) = coordinator_with_fighters();

        // Give f1 work and a heartbeat older than the stale threshold.
        if let Some(mut load) = coord.fighter_loads.get_mut(&f1) {
            load.active_tasks = 1;
            load.last_heartbeat = Utc::now().timestamp() - STALE_THRESHOLD_SECS - 10;
        }

        let stale = coord.detect_stale_fighters();
        assert!(stale.contains(&f1));
        assert!(!stale.contains(&f2));

        // Detection also flips the health flag.
        let load = coord.fighter_loads.get(&f1).expect("should exist");
        assert!(!load.healthy);
    }

    #[test]
    fn test_record_heartbeat() {
        let (coord, [f]) = coordinator_with_fighters();

        // Reset the heartbeat so the refresh is observable.
        if let Some(mut load) = coord.fighter_loads.get_mut(&f) {
            load.last_heartbeat = 0;
        }

        coord.record_heartbeat(&f);

        let load = coord.fighter_loads.get(&f).expect("should exist");
        assert!(load.last_heartbeat > 0);
    }

    #[test]
    fn test_list_task_ids() {
        let coord = SwarmCoordinator::new();
        let id1 = coord.create_task("task 1".to_string());
        let id2 = coord.create_task("task 2".to_string());

        let ids = coord.list_task_ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
    }

    #[test]
    fn test_remove_task() {
        let coord = SwarmCoordinator::new();
        let id = coord.create_task("temp".to_string());
        assert!(coord.remove_task(&id));
        // A second removal finds nothing.
        assert!(!coord.remove_task(&id));
    }

    #[tokio::test]
    async fn test_complete_subtask_decrements_load() {
        let (coord, [f]) = coordinator_with_fighters();

        let task_id = coord.create_task("work".to_string());
        coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");

        // The map guard is released inside the helper, before any await below.
        assert_eq!(active_tasks(&coord, &f), 1);

        let task = coord.get_task(&task_id).await.expect("should exist");
        let subtask_id = task.subtasks[0].id;

        coord
            .complete_subtask(&task_id, &subtask_id, "done".to_string())
            .await
            .expect("should complete");

        assert_eq!(active_tasks(&coord, &f), 0);
    }

    #[tokio::test]
    async fn test_no_fighters_available() {
        let coord = SwarmCoordinator::new();
        let task_id = coord.create_task("lonely task".to_string());
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");
        assert!(assignments.is_empty());
    }

    #[tokio::test]
    async fn test_get_nonexistent_task() {
        let coord = SwarmCoordinator::new();
        assert!(coord.get_task(&Uuid::new_v4()).await.is_none());
    }

    #[tokio::test]
    async fn test_assign_nonexistent_task() {
        let coord = SwarmCoordinator::new();
        assert!(coord.assign_subtasks(&Uuid::new_v4()).await.is_err());
    }

    #[test]
    fn test_default_impl() {
        let coord = SwarmCoordinator::default();
        assert_eq!(coord.available_fighter_count(), 0);
    }

    #[tokio::test]
    async fn test_aggregated_result_joins_all() {
        let (coord, [_f]) = coordinator_with_fighters();

        let task_id = coord.create_task("a\nb\nc".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("assign");

        for (subtask_id, _) in &assignments {
            coord
                .complete_subtask(&task_id, subtask_id, format!("result-{}", subtask_id))
                .await
                .expect("complete");
        }

        let task = coord.get_task(&task_id).await.expect("should exist");
        let agg = task.aggregated_result.expect("should be aggregated");
        assert_eq!(agg.matches("result-").count(), 3);
    }

    #[test]
    fn test_get_fighter_loads() {
        let (coord, [_f1, _f2]) = coordinator_with_fighters();
        assert_eq!(coord.get_fighter_loads().len(), 2);
    }

    #[tokio::test]
    async fn test_capability_aware_assignment() {
        let coord = SwarmCoordinator::new();
        let coder = FighterId::new();
        let reviewer = FighterId::new();
        coord.register_fighter_with_capabilities(coder, vec!["code".to_string()]);
        coord.register_fighter_with_capabilities(reviewer, vec!["review".to_string()]);

        // The description mentions "code", so only the coder matches.
        let subtasks = vec![pending_subtask(Uuid::new_v4(), "fix the code bug", vec![])];
        let task_id = coord.create_task_with_subtasks("code task".to_string(), subtasks);
        let assignments = coord
            .assign_subtasks(&task_id)
            .await
            .expect("should assign");

        assert_eq!(assignments.len(), 1);
        assert_eq!(assignments[0].1, coder);
    }
}