1use std::sync::Arc;
8
9use chrono::Utc;
10use dashmap::DashMap;
11use tokio::sync::Mutex;
12use tracing::{info, warn};
13use uuid::Uuid;
14
15use punch_types::{
16 FighterId, PunchError, PunchResult, SubtaskStatus, SwarmSubtask, SwarmTask,
17};
18
/// Seconds a busy fighter may stay silent (no heartbeat) before
/// `SwarmCoordinator::detect_stale_fighters` reports it as stale (five minutes).
const STALE_THRESHOLD_SECS: i64 = 5 * 60;
22
23#[derive(Debug, Clone)]
25pub struct FighterLoad {
26 pub fighter_id: FighterId,
28 pub active_tasks: usize,
30 pub healthy: bool,
32 pub capabilities: Vec<String>,
34 pub last_heartbeat: i64,
36}
37
38pub struct SwarmCoordinator {
40 tasks: DashMap<Uuid, Arc<Mutex<SwarmTask>>>,
42 fighter_loads: DashMap<FighterId, FighterLoad>,
44}
45
46impl SwarmCoordinator {
47 pub fn new() -> Self {
49 Self {
50 tasks: DashMap::new(),
51 fighter_loads: DashMap::new(),
52 }
53 }
54
55 pub fn register_fighter(&self, fighter_id: FighterId) {
57 self.fighter_loads.insert(
58 fighter_id,
59 FighterLoad {
60 fighter_id,
61 active_tasks: 0,
62 healthy: true,
63 capabilities: vec![],
64 last_heartbeat: Utc::now().timestamp(),
65 },
66 );
67 }
68
69 pub fn register_fighter_with_capabilities(
71 &self,
72 fighter_id: FighterId,
73 capabilities: Vec<String>,
74 ) {
75 self.fighter_loads.insert(
76 fighter_id,
77 FighterLoad {
78 fighter_id,
79 active_tasks: 0,
80 healthy: true,
81 capabilities,
82 last_heartbeat: Utc::now().timestamp(),
83 },
84 );
85 }
86
87 pub fn unregister_fighter(&self, fighter_id: &FighterId) {
89 self.fighter_loads.remove(fighter_id);
90 }
91
92 pub fn record_heartbeat(&self, fighter_id: &FighterId) {
94 if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
95 load.last_heartbeat = Utc::now().timestamp();
96 }
97 }
98
99 pub fn decompose_task(&self, description: &str) -> Vec<SwarmSubtask> {
104 let trimmed = description.trim();
105 if trimmed.is_empty() {
106 return vec![SwarmSubtask {
107 id: Uuid::new_v4(),
108 description: description.to_string(),
109 assigned_to: None,
110 status: SubtaskStatus::Pending,
111 result: None,
112 depends_on: vec![],
113 }];
114 }
115
116 let paragraphs: Vec<&str> = trimmed
118 .split("\n\n")
119 .map(|p| p.trim())
120 .filter(|p| !p.is_empty())
121 .collect();
122
123 if paragraphs.len() > 1 {
124 return paragraphs
125 .into_iter()
126 .map(|p| SwarmSubtask {
127 id: Uuid::new_v4(),
128 description: p.to_string(),
129 assigned_to: None,
130 status: SubtaskStatus::Pending,
131 result: None,
132 depends_on: vec![],
133 })
134 .collect();
135 }
136
137 let sentences: Vec<&str> = trimmed
139 .split(". ")
140 .map(|s| s.trim())
141 .filter(|s| !s.is_empty())
142 .collect();
143
144 if sentences.len() > 1 {
145 return sentences
146 .into_iter()
147 .map(|s| SwarmSubtask {
148 id: Uuid::new_v4(),
149 description: s.to_string(),
150 assigned_to: None,
151 status: SubtaskStatus::Pending,
152 result: None,
153 depends_on: vec![],
154 })
155 .collect();
156 }
157
158 let lines: Vec<&str> = trimmed
160 .lines()
161 .map(|l| l.trim())
162 .filter(|l| !l.is_empty())
163 .collect();
164
165 if lines.len() > 1 {
166 return lines
167 .into_iter()
168 .map(|line| SwarmSubtask {
169 id: Uuid::new_v4(),
170 description: line.to_string(),
171 assigned_to: None,
172 status: SubtaskStatus::Pending,
173 result: None,
174 depends_on: vec![],
175 })
176 .collect();
177 }
178
179 vec![SwarmSubtask {
181 id: Uuid::new_v4(),
182 description: trimmed.to_string(),
183 assigned_to: None,
184 status: SubtaskStatus::Pending,
185 result: None,
186 depends_on: vec![],
187 }]
188 }
189
190 pub fn create_task(&self, description: String) -> Uuid {
194 let subtasks = self.decompose_task(&description);
195 let task = SwarmTask {
196 id: Uuid::new_v4(),
197 description,
198 subtasks,
199 progress: 0.0,
200 created_at: Utc::now(),
201 aggregated_result: None,
202 };
203 let id = task.id;
204 self.tasks.insert(id, Arc::new(Mutex::new(task)));
205 info!(%id, "swarm task created");
206 id
207 }
208
209 pub fn create_task_with_subtasks(
211 &self,
212 description: String,
213 subtasks: Vec<SwarmSubtask>,
214 ) -> Uuid {
215 let task = SwarmTask {
216 id: Uuid::new_v4(),
217 description,
218 subtasks,
219 progress: 0.0,
220 created_at: Utc::now(),
221 aggregated_result: None,
222 };
223 let id = task.id;
224 self.tasks.insert(id, Arc::new(Mutex::new(task)));
225 info!(%id, "swarm task created with explicit subtasks");
226 id
227 }
228
229 pub async fn assign_subtasks(
234 &self,
235 task_id: &Uuid,
236 ) -> PunchResult<Vec<(Uuid, FighterId)>> {
237 let task_ref = self
238 .tasks
239 .get(task_id)
240 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
241
242 let mut task = task_ref.value().lock().await;
243 let mut assignments = Vec::new();
244
245 let eligible_indices: Vec<usize> = {
247 let subtasks = &task.subtasks;
248 subtasks
249 .iter()
250 .enumerate()
251 .filter(|(_, s)| s.status == SubtaskStatus::Pending)
252 .filter(|(_, s)| {
253 s.depends_on.iter().all(|dep_id| {
254 subtasks
255 .iter()
256 .any(|d| d.id == *dep_id && d.status == SubtaskStatus::Completed)
257 })
258 })
259 .map(|(i, _)| i)
260 .collect()
261 };
262
263 for idx in eligible_indices {
265 let subtask_desc = &task.subtasks[idx].description;
266 let fighter = self.find_best_fighter_for_task(subtask_desc);
267
268 if let Some(fighter_id) = fighter {
269 let subtask = &mut task.subtasks[idx];
270 subtask.assigned_to = Some(fighter_id);
271 subtask.status = SubtaskStatus::Running;
272
273 if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
274 load.active_tasks += 1;
275 }
276
277 assignments.push((subtask.id, fighter_id));
278 info!(
279 task_id = %task_id,
280 subtask_id = %subtask.id,
281 fighter_id = %fighter_id,
282 "subtask assigned"
283 );
284 }
285 }
286
287 Ok(assignments)
288 }
289
290 fn find_best_fighter_for_task(&self, task_description: &str) -> Option<FighterId> {
293 let task_lower = task_description.to_lowercase();
294
295 let capable_fighter = self
297 .fighter_loads
298 .iter()
299 .filter(|entry| entry.value().healthy)
300 .filter(|entry| {
301 entry.value().capabilities.is_empty()
303 || entry
304 .value()
305 .capabilities
306 .iter()
307 .any(|cap| task_lower.contains(&cap.to_lowercase()))
308 })
309 .min_by_key(|entry| entry.value().active_tasks)
310 .map(|entry| entry.value().fighter_id);
311
312 if capable_fighter.is_some() {
313 return capable_fighter;
314 }
315
316 self.find_least_loaded_fighter()
318 }
319
320 fn find_least_loaded_fighter(&self) -> Option<FighterId> {
322 self.fighter_loads
323 .iter()
324 .filter(|entry| entry.value().healthy)
325 .min_by_key(|entry| entry.value().active_tasks)
326 .map(|entry| entry.value().fighter_id)
327 }
328
329 pub async fn complete_subtask(
331 &self,
332 task_id: &Uuid,
333 subtask_id: &Uuid,
334 result: String,
335 ) -> PunchResult<()> {
336 let task_ref = self
337 .tasks
338 .get(task_id)
339 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
340
341 let mut task = task_ref.value().lock().await;
342
343 let subtask = task
344 .subtasks
345 .iter_mut()
346 .find(|s| s.id == *subtask_id)
347 .ok_or_else(|| {
348 PunchError::Troop(format!("subtask {} not found in task {}", subtask_id, task_id))
349 })?;
350
351 if let Some(fighter_id) = subtask.assigned_to
353 && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
354 {
355 load.active_tasks = load.active_tasks.saturating_sub(1);
356 }
357
358 subtask.status = SubtaskStatus::Completed;
359 subtask.result = Some(result);
360
361 self.update_progress(&mut task);
363
364 if task.progress >= 1.0 {
366 task.aggregated_result = Some(self.aggregate_results(&task.subtasks));
367 info!(%task_id, "swarm task fully completed");
368 }
369
370 Ok(())
371 }
372
373 fn update_progress(&self, task: &mut SwarmTask) {
375 let total = task.subtasks.len() as f64;
376 let completed = task
377 .subtasks
378 .iter()
379 .filter(|s| s.status == SubtaskStatus::Completed)
380 .count() as f64;
381 task.progress = if total > 0.0 { completed / total } else { 0.0 };
382 }
383
384 fn aggregate_results(&self, subtasks: &[SwarmSubtask]) -> String {
388 let results: Vec<&str> = subtasks
389 .iter()
390 .filter_map(|s| s.result.as_deref())
391 .collect();
392
393 if results.is_empty() {
394 return String::new();
395 }
396
397 let total_len: usize = results.iter().map(|r| r.len()).sum();
400 if total_len < 500 || results.len() <= 2 {
401 results.join("\n\n")
402 } else {
403 results
404 .iter()
405 .enumerate()
406 .map(|(i, r)| format!("--- Section {} ---\n{}", i + 1, r))
407 .collect::<Vec<_>>()
408 .join("\n\n")
409 }
410 }
411
412 pub async fn fail_subtask(
414 &self,
415 task_id: &Uuid,
416 subtask_id: &Uuid,
417 error: String,
418 ) -> PunchResult<()> {
419 let task_ref = self
420 .tasks
421 .get(task_id)
422 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
423
424 let mut task = task_ref.value().lock().await;
425
426 let subtask = task
427 .subtasks
428 .iter_mut()
429 .find(|s| s.id == *subtask_id)
430 .ok_or_else(|| {
431 PunchError::Troop(format!("subtask {} not found in task {}", subtask_id, task_id))
432 })?;
433
434 if let Some(fighter_id) = subtask.assigned_to
436 && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
437 {
438 load.active_tasks = load.active_tasks.saturating_sub(1);
439 }
440
441 subtask.status = SubtaskStatus::Failed(error);
442 warn!(%task_id, %subtask_id, "subtask failed");
443
444 Ok(())
445 }
446
447 pub async fn reassign_failed_subtask(
449 &self,
450 task_id: &Uuid,
451 subtask_id: &Uuid,
452 ) -> PunchResult<Option<FighterId>> {
453 let task_ref = self
454 .tasks
455 .get(task_id)
456 .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
457
458 let mut task = task_ref.value().lock().await;
459
460 let subtask = task
461 .subtasks
462 .iter_mut()
463 .find(|s| s.id == *subtask_id)
464 .ok_or_else(|| {
465 PunchError::Troop(format!("subtask {} not found in task {}", subtask_id, task_id))
466 })?;
467
468 if !matches!(subtask.status, SubtaskStatus::Failed(_)) {
470 return Err(PunchError::Troop(
471 "can only reassign failed subtasks".to_string(),
472 ));
473 }
474
475 let failed_fighter = subtask.assigned_to;
476
477 let new_fighter = self
479 .fighter_loads
480 .iter()
481 .filter(|entry| entry.value().healthy)
482 .filter(|entry| Some(entry.value().fighter_id) != failed_fighter)
483 .min_by_key(|entry| entry.value().active_tasks)
484 .map(|entry| entry.value().fighter_id);
485
486 if let Some(fighter_id) = new_fighter {
487 subtask.assigned_to = Some(fighter_id);
488 subtask.status = SubtaskStatus::Running;
489
490 if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
491 load.active_tasks += 1;
492 }
493
494 info!(
495 %task_id,
496 %subtask_id,
497 new_fighter = %fighter_id,
498 "subtask reassigned after failure"
499 );
500 }
501
502 Ok(new_fighter)
503 }
504
505 pub fn detect_stale_fighters(&self) -> Vec<FighterId> {
509 let now = Utc::now().timestamp();
510 let mut stale = Vec::new();
511
512 for entry in self.fighter_loads.iter() {
513 let load = entry.value();
514 if load.healthy
515 && load.active_tasks > 0
516 && (now - load.last_heartbeat) > STALE_THRESHOLD_SECS
517 {
518 stale.push(load.fighter_id);
519 }
520 }
521
522 for fighter_id in &stale {
523 self.mark_unhealthy(fighter_id);
524 warn!(
525 %fighter_id,
526 "fighter detected as stale, marked unhealthy"
527 );
528 }
529
530 stale
531 }
532
533 pub fn mark_unhealthy(&self, fighter_id: &FighterId) {
535 if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
536 load.healthy = false;
537 warn!(%fighter_id, "fighter marked unhealthy");
538 }
539 }
540
541 pub fn mark_healthy(&self, fighter_id: &FighterId) {
543 if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
544 load.healthy = true;
545 load.last_heartbeat = Utc::now().timestamp();
546 info!(%fighter_id, "fighter marked healthy");
547 }
548 }
549
550 pub async fn get_task(&self, task_id: &Uuid) -> Option<SwarmTask> {
552 let task_ref = self.tasks.get(task_id)?;
553 let task = task_ref.value().lock().await;
554 Some(task.clone())
555 }
556
557 pub async fn get_progress(&self, task_id: &Uuid) -> Option<f64> {
559 let task_ref = self.tasks.get(task_id)?;
560 let task = task_ref.value().lock().await;
561 Some(task.progress)
562 }
563
564 pub async fn get_progress_report(&self, task_id: &Uuid) -> Option<ProgressReport> {
566 let task_ref = self.tasks.get(task_id)?;
567 let task = task_ref.value().lock().await;
568
569 let total = task.subtasks.len();
570 let completed = task
571 .subtasks
572 .iter()
573 .filter(|s| s.status == SubtaskStatus::Completed)
574 .count();
575 let running = task
576 .subtasks
577 .iter()
578 .filter(|s| s.status == SubtaskStatus::Running)
579 .count();
580 let failed = task
581 .subtasks
582 .iter()
583 .filter(|s| matches!(s.status, SubtaskStatus::Failed(_)))
584 .count();
585 let pending = task
586 .subtasks
587 .iter()
588 .filter(|s| s.status == SubtaskStatus::Pending)
589 .count();
590
591 Some(ProgressReport {
592 task_id: *task_id,
593 total_subtasks: total,
594 completed,
595 running,
596 failed,
597 pending,
598 percentage: if total > 0 {
599 (completed as f64 / total as f64) * 100.0
600 } else {
601 0.0
602 },
603 })
604 }
605
606 pub fn list_task_ids(&self) -> Vec<Uuid> {
608 self.tasks.iter().map(|entry| *entry.key()).collect()
609 }
610
611 pub fn available_fighter_count(&self) -> usize {
613 self.fighter_loads
614 .iter()
615 .filter(|entry| entry.value().healthy)
616 .count()
617 }
618
619 pub fn get_fighter_loads(&self) -> Vec<FighterLoad> {
621 self.fighter_loads
622 .iter()
623 .map(|entry| entry.value().clone())
624 .collect()
625 }
626
627 pub fn remove_task(&self, task_id: &Uuid) -> bool {
629 self.tasks.remove(task_id).is_some()
630 }
631}
632
633#[derive(Debug, Clone)]
635pub struct ProgressReport {
636 pub task_id: Uuid,
638 pub total_subtasks: usize,
640 pub completed: usize,
642 pub running: usize,
644 pub failed: usize,
646 pub pending: usize,
648 pub percentage: f64,
650}
651
652impl Default for SwarmCoordinator {
653 fn default() -> Self {
654 Self::new()
655 }
656}
657
#[cfg(test)]
mod tests {
    use std::collections::HashMap;

    use super::*;

    /// Builds an unassigned, pending subtask with the given id, text and dependencies.
    fn pending_subtask(id: Uuid, description: &str, depends_on: Vec<Uuid>) -> SwarmSubtask {
        SwarmSubtask {
            id,
            description: description.to_string(),
            assigned_to: None,
            status: SubtaskStatus::Pending,
            result: None,
            depends_on,
        }
    }

    /// Creates a coordinator and registers `count` fresh capability-less fighters.
    fn coordinator_with_fighters(count: usize) -> (SwarmCoordinator, Vec<FighterId>) {
        let coord = SwarmCoordinator::new();
        let fighters: Vec<FighterId> = (0..count).map(|_| FighterId::new()).collect();
        for fighter in &fighters {
            coord.register_fighter(*fighter);
        }
        (coord, fighters)
    }

    /// Reads a fighter's current active-task count, releasing the map guard
    /// before returning.
    fn active_tasks_of(coord: &SwarmCoordinator, fighter: &FighterId) -> usize {
        coord
            .fighter_loads
            .get(fighter)
            .expect("should exist")
            .active_tasks
    }

    #[test]
    fn test_decompose_single_task() {
        let subtasks = SwarmCoordinator::new().decompose_task("single task");
        assert_eq!(subtasks.len(), 1);
        assert_eq!(subtasks[0].description, "single task");
    }

    #[test]
    fn test_decompose_multi_line_task() {
        let subtasks = SwarmCoordinator::new().decompose_task("step 1\nstep 2\nstep 3");
        assert_eq!(subtasks.len(), 3);
        for (subtask, expected) in subtasks.iter().zip(["step 1", "step 2", "step 3"]) {
            assert_eq!(subtask.description, expected);
        }
    }

    #[test]
    fn test_decompose_ignores_blank_lines() {
        let subtasks = SwarmCoordinator::new().decompose_task("step 1\n\n\nstep 2\n");
        assert_eq!(subtasks.len(), 2);
    }

    #[test]
    fn test_decompose_by_paragraphs() {
        let input = "First paragraph about setup.\n\nSecond paragraph about execution.\n\nThird paragraph about cleanup.";
        let subtasks = SwarmCoordinator::new().decompose_task(input);
        assert_eq!(subtasks.len(), 3);
        for (subtask, keyword) in subtasks.iter().zip(["setup", "execution", "cleanup"]) {
            assert!(subtask.description.contains(keyword));
        }
    }

    #[test]
    fn test_decompose_by_sentences() {
        let subtasks =
            SwarmCoordinator::new().decompose_task("Analyze the code. Fix bugs. Write tests");
        assert_eq!(subtasks.len(), 3);
    }

    #[test]
    fn test_create_task() {
        let coord = SwarmCoordinator::new();
        let id = coord.create_task("test task".to_string());
        assert!(coord.tasks.contains_key(&id));
    }

    #[test]
    fn test_create_task_with_subtasks() {
        let coord = SwarmCoordinator::new();
        let subtasks = vec![
            pending_subtask(Uuid::new_v4(), "sub1", vec![]),
            pending_subtask(Uuid::new_v4(), "sub2", vec![]),
        ];
        let id = coord.create_task_with_subtasks("parent".to_string(), subtasks);
        assert!(coord.tasks.contains_key(&id));
    }

    #[test]
    fn test_register_and_unregister_fighter() {
        let (coord, fighters) = coordinator_with_fighters(1);
        assert_eq!(coord.available_fighter_count(), 1);
        coord.unregister_fighter(&fighters[0]);
        assert_eq!(coord.available_fighter_count(), 0);
    }

    #[test]
    fn test_register_fighter_with_capabilities() {
        let coord = SwarmCoordinator::new();
        let f = FighterId::new();
        let caps = vec!["code".to_string(), "review".to_string()];
        coord.register_fighter_with_capabilities(f, caps);

        let load = coord.fighter_loads.get(&f).expect("should exist");
        assert_eq!(load.capabilities.len(), 2);
        assert!(load.capabilities.contains(&"code".to_string()));
    }

    #[test]
    fn test_find_least_loaded_fighter() {
        let (coord, fighters) = coordinator_with_fighters(2);
        let (f1, f2) = (fighters[0], fighters[1]);

        // Make the first fighter clearly busier than the second.
        coord
            .fighter_loads
            .get_mut(&f1)
            .expect("should exist")
            .active_tasks = 5;

        assert_eq!(coord.find_least_loaded_fighter(), Some(f2));
    }

    #[test]
    fn test_find_least_loaded_skips_unhealthy() {
        let (coord, fighters) = coordinator_with_fighters(2);
        let (f1, f2) = (fighters[0], fighters[1]);

        coord.mark_unhealthy(&f2);
        assert_eq!(coord.find_least_loaded_fighter(), Some(f1));
    }

    #[test]
    fn test_mark_healthy_unhealthy() {
        let (coord, fighters) = coordinator_with_fighters(1);
        let f = fighters[0];
        assert_eq!(coord.available_fighter_count(), 1);

        coord.mark_unhealthy(&f);
        assert_eq!(coord.available_fighter_count(), 0);

        coord.mark_healthy(&f);
        assert_eq!(coord.available_fighter_count(), 1);
    }

    #[tokio::test]
    async fn test_assign_subtasks() {
        let (coord, _) = coordinator_with_fighters(2);

        let task_id = coord.create_task("step 1\nstep 2".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
        assert_eq!(assignments.len(), 2);
    }

    #[tokio::test]
    async fn test_assign_subtasks_respects_dependencies() {
        let (coord, _) = coordinator_with_fighters(1);

        let dep_id = Uuid::new_v4();
        let subtasks = vec![
            pending_subtask(dep_id, "first", vec![]),
            pending_subtask(Uuid::new_v4(), "second (depends on first)", vec![dep_id]),
        ];
        let task_id = coord.create_task_with_subtasks("pipeline".to_string(), subtasks);

        // Only the dependency-free subtask may be handed out on the first round.
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
        assert_eq!(assignments.len(), 1);
        assert_eq!(assignments[0].0, dep_id);
    }

    #[tokio::test]
    async fn test_complete_subtask() {
        let (coord, _) = coordinator_with_fighters(1);

        let task_id = coord.create_task("single task".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
        assert_eq!(assignments.len(), 1);

        let (subtask_id, _) = assignments[0];
        coord
            .complete_subtask(&task_id, &subtask_id, "done".to_string())
            .await
            .expect("should complete");

        let task = coord.get_task(&task_id).await.expect("should exist");
        assert!((task.progress - 1.0).abs() < f64::EPSILON);
        assert!(task.aggregated_result.is_some());
    }

    #[tokio::test]
    async fn test_fail_subtask() {
        let (coord, _) = coordinator_with_fighters(1);

        let task_id = coord.create_task("fail task".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
        let (subtask_id, _) = assignments[0];

        coord
            .fail_subtask(&task_id, &subtask_id, "error occurred".to_string())
            .await
            .expect("should fail");

        let task = coord.get_task(&task_id).await.expect("should exist");
        assert!(matches!(task.subtasks[0].status, SubtaskStatus::Failed(_)));
    }

    #[tokio::test]
    async fn test_get_progress() {
        let (coord, _) = coordinator_with_fighters(1);

        let task_id = coord.create_task("a\nb".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");

        // Finish one of the two subtasks.
        coord
            .complete_subtask(&task_id, &assignments[0].0, "result 1".to_string())
            .await
            .expect("should complete");

        let progress = coord.get_progress(&task_id).await.expect("should exist");
        assert!((progress - 0.5).abs() < f64::EPSILON);
    }

    #[tokio::test]
    async fn test_progress_report() {
        let (coord, _) = coordinator_with_fighters(1);

        let task_id = coord.create_task("a\nb\nc".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");

        // Finish one of the three subtasks; the other two stay running.
        coord
            .complete_subtask(&task_id, &assignments[0].0, "done".to_string())
            .await
            .expect("should complete");

        let report = coord
            .get_progress_report(&task_id)
            .await
            .expect("should exist");
        assert_eq!(report.total_subtasks, 3);
        assert_eq!(report.completed, 1);
        assert_eq!(report.running, 2);
        assert_eq!(report.pending, 0);
        assert_eq!(report.failed, 0);
        assert!((report.percentage - 33.33).abs() < 1.0);
    }

    #[tokio::test]
    async fn test_load_balancing_distributes_evenly() {
        let (coord, _) = coordinator_with_fighters(3);

        let task_id = coord.create_task("a\nb\nc\nd\ne\nf".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");

        assert_eq!(assignments.len(), 6);

        // Tally how many subtasks each fighter received.
        let mut counts: HashMap<FighterId, usize> = HashMap::new();
        for (_, fighter) in &assignments {
            *counts.entry(*fighter).or_insert(0) += 1;
        }

        // Six subtasks over three fighters: two each.
        for count in counts.values() {
            assert_eq!(*count, 2);
        }
    }

    #[tokio::test]
    async fn test_reassign_failed_subtask() {
        let (coord, _) = coordinator_with_fighters(2);

        let task_id = coord.create_task("single task".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
        let (subtask_id, original_fighter) = assignments[0];

        coord
            .fail_subtask(&task_id, &subtask_id, "crashed".to_string())
            .await
            .expect("should fail");

        let new_fighter = coord
            .reassign_failed_subtask(&task_id, &subtask_id)
            .await
            .expect("should reassign");

        assert!(new_fighter.is_some());
        let new_id = new_fighter.expect("should have new fighter");
        assert_ne!(new_id, original_fighter);
    }

    #[tokio::test]
    async fn test_detect_stale_fighters() {
        let (coord, fighters) = coordinator_with_fighters(2);
        let (f1, f2) = (fighters[0], fighters[1]);

        // Give the first fighter work and an old heartbeat; scope the guard so
        // it is released before the coordinator touches the map again.
        {
            let mut load = coord.fighter_loads.get_mut(&f1).expect("should exist");
            load.active_tasks = 1;
            load.last_heartbeat = Utc::now().timestamp() - STALE_THRESHOLD_SECS - 10;
        }

        let stale = coord.detect_stale_fighters();
        assert!(stale.contains(&f1));
        assert!(!stale.contains(&f2));

        let load = coord.fighter_loads.get(&f1).expect("should exist");
        assert!(!load.healthy);
    }

    #[test]
    fn test_record_heartbeat() {
        let (coord, fighters) = coordinator_with_fighters(1);
        let f = fighters[0];

        // Reset the heartbeat so a refresh is observable.
        coord
            .fighter_loads
            .get_mut(&f)
            .expect("should exist")
            .last_heartbeat = 0;

        coord.record_heartbeat(&f);

        let load = coord.fighter_loads.get(&f).expect("should exist");
        assert!(load.last_heartbeat > 0);
    }

    #[test]
    fn test_list_task_ids() {
        let coord = SwarmCoordinator::new();
        let id1 = coord.create_task("task 1".to_string());
        let id2 = coord.create_task("task 2".to_string());

        let ids = coord.list_task_ids();
        assert_eq!(ids.len(), 2);
        assert!(ids.contains(&id1));
        assert!(ids.contains(&id2));
    }

    #[test]
    fn test_remove_task() {
        let coord = SwarmCoordinator::new();
        let id = coord.create_task("temp".to_string());
        assert!(coord.remove_task(&id));
        // A second removal finds nothing.
        assert!(!coord.remove_task(&id));
    }

    #[tokio::test]
    async fn test_complete_subtask_decrements_load() {
        let (coord, fighters) = coordinator_with_fighters(1);
        let f = fighters[0];

        let task_id = coord.create_task("work".to_string());
        coord.assign_subtasks(&task_id).await.expect("should assign");

        assert_eq!(active_tasks_of(&coord, &f), 1);

        let task = coord.get_task(&task_id).await.expect("should exist");
        let subtask_id = task.subtasks[0].id;

        coord
            .complete_subtask(&task_id, &subtask_id, "done".to_string())
            .await
            .expect("should complete");

        assert_eq!(active_tasks_of(&coord, &f), 0);
    }

    #[tokio::test]
    async fn test_no_fighters_available() {
        let coord = SwarmCoordinator::new();
        let task_id = coord.create_task("lonely task".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
        assert!(assignments.is_empty());
    }

    #[tokio::test]
    async fn test_get_nonexistent_task() {
        let coord = SwarmCoordinator::new();
        assert!(coord.get_task(&Uuid::new_v4()).await.is_none());
    }

    #[tokio::test]
    async fn test_assign_nonexistent_task() {
        let coord = SwarmCoordinator::new();
        assert!(coord.assign_subtasks(&Uuid::new_v4()).await.is_err());
    }

    #[test]
    fn test_default_impl() {
        let coord = SwarmCoordinator::default();
        assert_eq!(coord.available_fighter_count(), 0);
    }

    #[tokio::test]
    async fn test_aggregated_result_joins_all() {
        let (coord, _) = coordinator_with_fighters(1);

        let task_id = coord.create_task("a\nb\nc".to_string());
        let assignments = coord.assign_subtasks(&task_id).await.expect("assign");

        for (subtask_id, _) in &assignments {
            coord
                .complete_subtask(&task_id, subtask_id, format!("result-{}", subtask_id))
                .await
                .expect("complete");
        }

        let task = coord.get_task(&task_id).await.expect("should exist");
        let agg = task.aggregated_result.expect("should be aggregated");
        assert_eq!(agg.matches("result-").count(), 3);
    }

    #[test]
    fn test_get_fighter_loads() {
        let (coord, _) = coordinator_with_fighters(2);
        assert_eq!(coord.get_fighter_loads().len(), 2);
    }

    #[tokio::test]
    async fn test_capability_aware_assignment() {
        let coord = SwarmCoordinator::new();
        let coder = FighterId::new();
        let reviewer = FighterId::new();
        coord.register_fighter_with_capabilities(coder, vec!["code".to_string()]);
        coord.register_fighter_with_capabilities(reviewer, vec!["review".to_string()]);

        // The description mentions "code", so only the coder matches.
        let subtasks = vec![pending_subtask(Uuid::new_v4(), "fix the code bug", vec![])];
        let task_id = coord.create_task_with_subtasks("code task".to_string(), subtasks);
        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");

        assert_eq!(assignments.len(), 1);
        assert_eq!(assignments[0].1, coder);
    }
}