Skip to main content

punch_kernel/
swarm.rs

1//! # Swarm Intelligence
2//!
3//! Higher-level coordination for emergent behavior across multiple fighters.
4//! The swarm coordinator manages complex tasks by decomposing them into subtasks,
5//! assigning them based on capabilities, and aggregating results.
6
7use std::sync::Arc;
8
9use chrono::Utc;
10use dashmap::DashMap;
11use tokio::sync::Mutex;
12use tracing::{info, warn};
13use uuid::Uuid;
14
15use punch_types::{
16    FighterId, PunchError, PunchResult, SubtaskStatus, SwarmSubtask, SwarmTask,
17};
18
/// Maximum age, in seconds, of a fighter's most recent heartbeat before
/// `SwarmCoordinator::detect_stale_fighters` considers a busy fighter stale
/// (five minutes).
const STALE_THRESHOLD_SECS: i64 = 5 * 60;
22
23/// A record of fighter load for balancing purposes.
24#[derive(Debug, Clone)]
25pub struct FighterLoad {
26    /// The fighter's ID.
27    pub fighter_id: FighterId,
28    /// Number of currently assigned subtasks.
29    pub active_tasks: usize,
30    /// Whether the fighter is considered healthy.
31    pub healthy: bool,
32    /// Fighter capabilities for capability-aware assignment.
33    pub capabilities: Vec<String>,
34    /// Timestamp of last health check (epoch seconds).
35    pub last_heartbeat: i64,
36}
37
38/// The swarm coordinator manages concurrent swarm tasks across available fighters.
39pub struct SwarmCoordinator {
40    /// Active swarm tasks keyed by their UUID.
41    tasks: DashMap<Uuid, Arc<Mutex<SwarmTask>>>,
42    /// Fighter load tracking for balancing.
43    fighter_loads: DashMap<FighterId, FighterLoad>,
44}
45
46impl SwarmCoordinator {
47    /// Create a new swarm coordinator.
48    pub fn new() -> Self {
49        Self {
50            tasks: DashMap::new(),
51            fighter_loads: DashMap::new(),
52        }
53    }
54
55    /// Register a fighter as available for swarm work.
56    pub fn register_fighter(&self, fighter_id: FighterId) {
57        self.fighter_loads.insert(
58            fighter_id,
59            FighterLoad {
60                fighter_id,
61                active_tasks: 0,
62                healthy: true,
63                capabilities: vec![],
64                last_heartbeat: Utc::now().timestamp(),
65            },
66        );
67    }
68
69    /// Register a fighter with specific capabilities.
70    pub fn register_fighter_with_capabilities(
71        &self,
72        fighter_id: FighterId,
73        capabilities: Vec<String>,
74    ) {
75        self.fighter_loads.insert(
76            fighter_id,
77            FighterLoad {
78                fighter_id,
79                active_tasks: 0,
80                healthy: true,
81                capabilities,
82                last_heartbeat: Utc::now().timestamp(),
83            },
84        );
85    }
86
87    /// Remove a fighter from the available pool.
88    pub fn unregister_fighter(&self, fighter_id: &FighterId) {
89        self.fighter_loads.remove(fighter_id);
90    }
91
92    /// Record a heartbeat from a fighter to track liveness.
93    pub fn record_heartbeat(&self, fighter_id: &FighterId) {
94        if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
95            load.last_heartbeat = Utc::now().timestamp();
96        }
97    }
98
99    /// Decompose a task description into subtasks.
100    ///
101    /// Uses intelligent splitting: tries paragraph breaks first, then
102    /// sentence boundaries, then falls back to line-based splitting.
103    pub fn decompose_task(&self, description: &str) -> Vec<SwarmSubtask> {
104        let trimmed = description.trim();
105        if trimmed.is_empty() {
106            return vec![SwarmSubtask {
107                id: Uuid::new_v4(),
108                description: description.to_string(),
109                assigned_to: None,
110                status: SubtaskStatus::Pending,
111                result: None,
112                depends_on: vec![],
113            }];
114        }
115
116        // Try splitting by paragraph breaks (double newlines) first.
117        let paragraphs: Vec<&str> = trimmed
118            .split("\n\n")
119            .map(|p| p.trim())
120            .filter(|p| !p.is_empty())
121            .collect();
122
123        if paragraphs.len() > 1 {
124            return paragraphs
125                .into_iter()
126                .map(|p| SwarmSubtask {
127                    id: Uuid::new_v4(),
128                    description: p.to_string(),
129                    assigned_to: None,
130                    status: SubtaskStatus::Pending,
131                    result: None,
132                    depends_on: vec![],
133                })
134                .collect();
135        }
136
137        // Try splitting by sentences (period followed by space or end).
138        let sentences: Vec<&str> = trimmed
139            .split(". ")
140            .map(|s| s.trim())
141            .filter(|s| !s.is_empty())
142            .collect();
143
144        if sentences.len() > 1 {
145            return sentences
146                .into_iter()
147                .map(|s| SwarmSubtask {
148                    id: Uuid::new_v4(),
149                    description: s.to_string(),
150                    assigned_to: None,
151                    status: SubtaskStatus::Pending,
152                    result: None,
153                    depends_on: vec![],
154                })
155                .collect();
156        }
157
158        // Fall back to line-based splitting.
159        let lines: Vec<&str> = trimmed
160            .lines()
161            .map(|l| l.trim())
162            .filter(|l| !l.is_empty())
163            .collect();
164
165        if lines.len() > 1 {
166            return lines
167                .into_iter()
168                .map(|line| SwarmSubtask {
169                    id: Uuid::new_v4(),
170                    description: line.to_string(),
171                    assigned_to: None,
172                    status: SubtaskStatus::Pending,
173                    result: None,
174                    depends_on: vec![],
175                })
176                .collect();
177        }
178
179        // Single subtask for atomic tasks.
180        vec![SwarmSubtask {
181            id: Uuid::new_v4(),
182            description: trimmed.to_string(),
183            assigned_to: None,
184            status: SubtaskStatus::Pending,
185            result: None,
186            depends_on: vec![],
187        }]
188    }
189
190    /// Create a new swarm task from a description.
191    ///
192    /// The task is automatically decomposed into subtasks.
193    pub fn create_task(&self, description: String) -> Uuid {
194        let subtasks = self.decompose_task(&description);
195        let task = SwarmTask {
196            id: Uuid::new_v4(),
197            description,
198            subtasks,
199            progress: 0.0,
200            created_at: Utc::now(),
201            aggregated_result: None,
202        };
203        let id = task.id;
204        self.tasks.insert(id, Arc::new(Mutex::new(task)));
205        info!(%id, "swarm task created");
206        id
207    }
208
209    /// Create a swarm task with explicit subtasks (for pipeline or dependent tasks).
210    pub fn create_task_with_subtasks(
211        &self,
212        description: String,
213        subtasks: Vec<SwarmSubtask>,
214    ) -> Uuid {
215        let task = SwarmTask {
216            id: Uuid::new_v4(),
217            description,
218            subtasks,
219            progress: 0.0,
220            created_at: Utc::now(),
221            aggregated_result: None,
222        };
223        let id = task.id;
224        self.tasks.insert(id, Arc::new(Mutex::new(task)));
225        info!(%id, "swarm task created with explicit subtasks");
226        id
227    }
228
229    /// Assign pending subtasks to available fighters using load balancing
230    /// and capability matching.
231    ///
232    /// Returns a list of (subtask_id, fighter_id) assignments.
233    pub async fn assign_subtasks(
234        &self,
235        task_id: &Uuid,
236    ) -> PunchResult<Vec<(Uuid, FighterId)>> {
237        let task_ref = self
238            .tasks
239            .get(task_id)
240            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
241
242        let mut task = task_ref.value().lock().await;
243        let mut assignments = Vec::new();
244
245        // First pass: determine which subtasks are eligible for assignment.
246        let eligible_indices: Vec<usize> = {
247            let subtasks = &task.subtasks;
248            subtasks
249                .iter()
250                .enumerate()
251                .filter(|(_, s)| s.status == SubtaskStatus::Pending)
252                .filter(|(_, s)| {
253                    s.depends_on.iter().all(|dep_id| {
254                        subtasks
255                            .iter()
256                            .any(|d| d.id == *dep_id && d.status == SubtaskStatus::Completed)
257                    })
258                })
259                .map(|(i, _)| i)
260                .collect()
261        };
262
263        // Second pass: assign eligible subtasks considering capabilities.
264        for idx in eligible_indices {
265            let subtask_desc = &task.subtasks[idx].description;
266            let fighter = self.find_best_fighter_for_task(subtask_desc);
267
268            if let Some(fighter_id) = fighter {
269                let subtask = &mut task.subtasks[idx];
270                subtask.assigned_to = Some(fighter_id);
271                subtask.status = SubtaskStatus::Running;
272
273                if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
274                    load.active_tasks += 1;
275                }
276
277                assignments.push((subtask.id, fighter_id));
278                info!(
279                    task_id = %task_id,
280                    subtask_id = %subtask.id,
281                    fighter_id = %fighter_id,
282                    "subtask assigned"
283                );
284            }
285        }
286
287        Ok(assignments)
288    }
289
290    /// Find the best fighter for a given task, considering both load and
291    /// capability matching.
292    fn find_best_fighter_for_task(&self, task_description: &str) -> Option<FighterId> {
293        let task_lower = task_description.to_lowercase();
294
295        // First try to find a capable fighter with the lowest load.
296        let capable_fighter = self
297            .fighter_loads
298            .iter()
299            .filter(|entry| entry.value().healthy)
300            .filter(|entry| {
301                // Either has matching capability or has no capabilities (generalist).
302                entry.value().capabilities.is_empty()
303                    || entry
304                        .value()
305                        .capabilities
306                        .iter()
307                        .any(|cap| task_lower.contains(&cap.to_lowercase()))
308            })
309            .min_by_key(|entry| entry.value().active_tasks)
310            .map(|entry| entry.value().fighter_id);
311
312        if capable_fighter.is_some() {
313            return capable_fighter;
314        }
315
316        // Fall back to any healthy fighter with least load.
317        self.find_least_loaded_fighter()
318    }
319
320    /// Find the fighter with the lowest active task count.
321    fn find_least_loaded_fighter(&self) -> Option<FighterId> {
322        self.fighter_loads
323            .iter()
324            .filter(|entry| entry.value().healthy)
325            .min_by_key(|entry| entry.value().active_tasks)
326            .map(|entry| entry.value().fighter_id)
327    }
328
329    /// Report the completion of a subtask.
330    pub async fn complete_subtask(
331        &self,
332        task_id: &Uuid,
333        subtask_id: &Uuid,
334        result: String,
335    ) -> PunchResult<()> {
336        let task_ref = self
337            .tasks
338            .get(task_id)
339            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
340
341        let mut task = task_ref.value().lock().await;
342
343        let subtask = task
344            .subtasks
345            .iter_mut()
346            .find(|s| s.id == *subtask_id)
347            .ok_or_else(|| {
348                PunchError::Troop(format!("subtask {} not found in task {}", subtask_id, task_id))
349            })?;
350
351        // Decrement load for the assigned fighter.
352        if let Some(fighter_id) = subtask.assigned_to
353            && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
354        {
355            load.active_tasks = load.active_tasks.saturating_sub(1);
356        }
357
358        subtask.status = SubtaskStatus::Completed;
359        subtask.result = Some(result);
360
361        // Update overall progress.
362        self.update_progress(&mut task);
363
364        // If all subtasks are done, aggregate results intelligently.
365        if task.progress >= 1.0 {
366            task.aggregated_result = Some(self.aggregate_results(&task.subtasks));
367            info!(%task_id, "swarm task fully completed");
368        }
369
370        Ok(())
371    }
372
373    /// Update the progress field of a task based on subtask completion.
374    fn update_progress(&self, task: &mut SwarmTask) {
375        let total = task.subtasks.len() as f64;
376        let completed = task
377            .subtasks
378            .iter()
379            .filter(|s| s.status == SubtaskStatus::Completed)
380            .count() as f64;
381        task.progress = if total > 0.0 { completed / total } else { 0.0 };
382    }
383
384    /// Aggregate results from completed subtasks intelligently.
385    ///
386    /// Merges results preserving order and removing redundancy.
387    fn aggregate_results(&self, subtasks: &[SwarmSubtask]) -> String {
388        let results: Vec<&str> = subtasks
389            .iter()
390            .filter_map(|s| s.result.as_deref())
391            .collect();
392
393        if results.is_empty() {
394            return String::new();
395        }
396
397        // If results are short, join with double newlines.
398        // If long, add headers for each section.
399        let total_len: usize = results.iter().map(|r| r.len()).sum();
400        if total_len < 500 || results.len() <= 2 {
401            results.join("\n\n")
402        } else {
403            results
404                .iter()
405                .enumerate()
406                .map(|(i, r)| format!("--- Section {} ---\n{}", i + 1, r))
407                .collect::<Vec<_>>()
408                .join("\n\n")
409        }
410    }
411
412    /// Report the failure of a subtask.
413    pub async fn fail_subtask(
414        &self,
415        task_id: &Uuid,
416        subtask_id: &Uuid,
417        error: String,
418    ) -> PunchResult<()> {
419        let task_ref = self
420            .tasks
421            .get(task_id)
422            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
423
424        let mut task = task_ref.value().lock().await;
425
426        let subtask = task
427            .subtasks
428            .iter_mut()
429            .find(|s| s.id == *subtask_id)
430            .ok_or_else(|| {
431                PunchError::Troop(format!("subtask {} not found in task {}", subtask_id, task_id))
432            })?;
433
434        // Decrement load for the assigned fighter.
435        if let Some(fighter_id) = subtask.assigned_to
436            && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
437        {
438            load.active_tasks = load.active_tasks.saturating_sub(1);
439        }
440
441        subtask.status = SubtaskStatus::Failed(error);
442        warn!(%task_id, %subtask_id, "subtask failed");
443
444        Ok(())
445    }
446
447    /// Reassign a failed subtask to a different fighter.
448    pub async fn reassign_failed_subtask(
449        &self,
450        task_id: &Uuid,
451        subtask_id: &Uuid,
452    ) -> PunchResult<Option<FighterId>> {
453        let task_ref = self
454            .tasks
455            .get(task_id)
456            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
457
458        let mut task = task_ref.value().lock().await;
459
460        let subtask = task
461            .subtasks
462            .iter_mut()
463            .find(|s| s.id == *subtask_id)
464            .ok_or_else(|| {
465                PunchError::Troop(format!("subtask {} not found in task {}", subtask_id, task_id))
466            })?;
467
468        // Only reassign if the subtask has failed.
469        if !matches!(subtask.status, SubtaskStatus::Failed(_)) {
470            return Err(PunchError::Troop(
471                "can only reassign failed subtasks".to_string(),
472            ));
473        }
474
475        let failed_fighter = subtask.assigned_to;
476
477        // Find a different healthy fighter.
478        let new_fighter = self
479            .fighter_loads
480            .iter()
481            .filter(|entry| entry.value().healthy)
482            .filter(|entry| Some(entry.value().fighter_id) != failed_fighter)
483            .min_by_key(|entry| entry.value().active_tasks)
484            .map(|entry| entry.value().fighter_id);
485
486        if let Some(fighter_id) = new_fighter {
487            subtask.assigned_to = Some(fighter_id);
488            subtask.status = SubtaskStatus::Running;
489
490            if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
491                load.active_tasks += 1;
492            }
493
494            info!(
495                %task_id,
496                %subtask_id,
497                new_fighter = %fighter_id,
498                "subtask reassigned after failure"
499            );
500        }
501
502        Ok(new_fighter)
503    }
504
505    /// Detect stale or failed fighters and reassign their work.
506    ///
507    /// Returns a list of fighter IDs that were detected as stale/failed.
508    pub fn detect_stale_fighters(&self) -> Vec<FighterId> {
509        let now = Utc::now().timestamp();
510        let mut stale = Vec::new();
511
512        for entry in self.fighter_loads.iter() {
513            let load = entry.value();
514            if load.healthy
515                && load.active_tasks > 0
516                && (now - load.last_heartbeat) > STALE_THRESHOLD_SECS
517            {
518                stale.push(load.fighter_id);
519            }
520        }
521
522        for fighter_id in &stale {
523            self.mark_unhealthy(fighter_id);
524            warn!(
525                %fighter_id,
526                "fighter detected as stale, marked unhealthy"
527            );
528        }
529
530        stale
531    }
532
533    /// Mark a fighter as unhealthy (will not receive new assignments).
534    pub fn mark_unhealthy(&self, fighter_id: &FighterId) {
535        if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
536            load.healthy = false;
537            warn!(%fighter_id, "fighter marked unhealthy");
538        }
539    }
540
541    /// Mark a fighter as healthy again.
542    pub fn mark_healthy(&self, fighter_id: &FighterId) {
543        if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
544            load.healthy = true;
545            load.last_heartbeat = Utc::now().timestamp();
546            info!(%fighter_id, "fighter marked healthy");
547        }
548    }
549
550    /// Get the current state of a swarm task.
551    pub async fn get_task(&self, task_id: &Uuid) -> Option<SwarmTask> {
552        let task_ref = self.tasks.get(task_id)?;
553        let task = task_ref.value().lock().await;
554        Some(task.clone())
555    }
556
557    /// Get the progress of a swarm task (0.0 to 1.0).
558    pub async fn get_progress(&self, task_id: &Uuid) -> Option<f64> {
559        let task_ref = self.tasks.get(task_id)?;
560        let task = task_ref.value().lock().await;
561        Some(task.progress)
562    }
563
564    /// Get a detailed progress report for a swarm task.
565    pub async fn get_progress_report(&self, task_id: &Uuid) -> Option<ProgressReport> {
566        let task_ref = self.tasks.get(task_id)?;
567        let task = task_ref.value().lock().await;
568
569        let total = task.subtasks.len();
570        let completed = task
571            .subtasks
572            .iter()
573            .filter(|s| s.status == SubtaskStatus::Completed)
574            .count();
575        let running = task
576            .subtasks
577            .iter()
578            .filter(|s| s.status == SubtaskStatus::Running)
579            .count();
580        let failed = task
581            .subtasks
582            .iter()
583            .filter(|s| matches!(s.status, SubtaskStatus::Failed(_)))
584            .count();
585        let pending = task
586            .subtasks
587            .iter()
588            .filter(|s| s.status == SubtaskStatus::Pending)
589            .count();
590
591        Some(ProgressReport {
592            task_id: *task_id,
593            total_subtasks: total,
594            completed,
595            running,
596            failed,
597            pending,
598            percentage: if total > 0 {
599                (completed as f64 / total as f64) * 100.0
600            } else {
601                0.0
602            },
603        })
604    }
605
606    /// List all active swarm task IDs.
607    pub fn list_task_ids(&self) -> Vec<Uuid> {
608        self.tasks.iter().map(|entry| *entry.key()).collect()
609    }
610
611    /// Get the number of available healthy fighters.
612    pub fn available_fighter_count(&self) -> usize {
613        self.fighter_loads
614            .iter()
615            .filter(|entry| entry.value().healthy)
616            .count()
617    }
618
619    /// Get load information for all fighters.
620    pub fn get_fighter_loads(&self) -> Vec<FighterLoad> {
621        self.fighter_loads
622            .iter()
623            .map(|entry| entry.value().clone())
624            .collect()
625    }
626
627    /// Remove a completed or failed swarm task.
628    pub fn remove_task(&self, task_id: &Uuid) -> bool {
629        self.tasks.remove(task_id).is_some()
630    }
631}
632
633/// Detailed progress report for a swarm task.
634#[derive(Debug, Clone)]
635pub struct ProgressReport {
636    /// The task ID.
637    pub task_id: Uuid,
638    /// Total number of subtasks.
639    pub total_subtasks: usize,
640    /// Number of completed subtasks.
641    pub completed: usize,
642    /// Number of currently running subtasks.
643    pub running: usize,
644    /// Number of failed subtasks.
645    pub failed: usize,
646    /// Number of pending subtasks.
647    pub pending: usize,
648    /// Completion percentage (0.0 to 100.0).
649    pub percentage: f64,
650}
651
652impl Default for SwarmCoordinator {
653    fn default() -> Self {
654        Self::new()
655    }
656}
657
658#[cfg(test)]
659mod tests {
660    use super::*;
661
662    #[test]
663    fn test_decompose_single_task() {
664        let coord = SwarmCoordinator::new();
665        let subtasks = coord.decompose_task("single task");
666        assert_eq!(subtasks.len(), 1);
667        assert_eq!(subtasks[0].description, "single task");
668    }
669
670    #[test]
671    fn test_decompose_multi_line_task() {
672        let coord = SwarmCoordinator::new();
673        let subtasks = coord.decompose_task("step 1\nstep 2\nstep 3");
674        assert_eq!(subtasks.len(), 3);
675        assert_eq!(subtasks[0].description, "step 1");
676        assert_eq!(subtasks[1].description, "step 2");
677        assert_eq!(subtasks[2].description, "step 3");
678    }
679
680    #[test]
681    fn test_decompose_ignores_blank_lines() {
682        let coord = SwarmCoordinator::new();
683        let subtasks = coord.decompose_task("step 1\n\n\nstep 2\n");
684        // Double newline splits into paragraphs.
685        assert_eq!(subtasks.len(), 2);
686    }
687
688    #[test]
689    fn test_decompose_by_paragraphs() {
690        let coord = SwarmCoordinator::new();
691        let input = "First paragraph about setup.\n\nSecond paragraph about execution.\n\nThird paragraph about cleanup.";
692        let subtasks = coord.decompose_task(input);
693        assert_eq!(subtasks.len(), 3);
694        assert!(subtasks[0].description.contains("setup"));
695        assert!(subtasks[1].description.contains("execution"));
696        assert!(subtasks[2].description.contains("cleanup"));
697    }
698
699    #[test]
700    fn test_decompose_by_sentences() {
701        let coord = SwarmCoordinator::new();
702        let input = "Analyze the code. Fix bugs. Write tests";
703        let subtasks = coord.decompose_task(input);
704        assert_eq!(subtasks.len(), 3);
705    }
706
707    #[test]
708    fn test_create_task() {
709        let coord = SwarmCoordinator::new();
710        let id = coord.create_task("test task".to_string());
711        assert!(coord.tasks.contains_key(&id));
712    }
713
714    #[test]
715    fn test_create_task_with_subtasks() {
716        let coord = SwarmCoordinator::new();
717        let subtasks = vec![
718            SwarmSubtask {
719                id: Uuid::new_v4(),
720                description: "sub1".to_string(),
721                assigned_to: None,
722                status: SubtaskStatus::Pending,
723                result: None,
724                depends_on: vec![],
725            },
726            SwarmSubtask {
727                id: Uuid::new_v4(),
728                description: "sub2".to_string(),
729                assigned_to: None,
730                status: SubtaskStatus::Pending,
731                result: None,
732                depends_on: vec![],
733            },
734        ];
735        let id = coord.create_task_with_subtasks("parent".to_string(), subtasks);
736        assert!(coord.tasks.contains_key(&id));
737    }
738
739    #[test]
740    fn test_register_and_unregister_fighter() {
741        let coord = SwarmCoordinator::new();
742        let f = FighterId::new();
743        coord.register_fighter(f);
744        assert_eq!(coord.available_fighter_count(), 1);
745        coord.unregister_fighter(&f);
746        assert_eq!(coord.available_fighter_count(), 0);
747    }
748
749    #[test]
750    fn test_register_fighter_with_capabilities() {
751        let coord = SwarmCoordinator::new();
752        let f = FighterId::new();
753        coord.register_fighter_with_capabilities(
754            f,
755            vec!["code".to_string(), "review".to_string()],
756        );
757
758        let load = coord.fighter_loads.get(&f).expect("should exist");
759        assert_eq!(load.capabilities.len(), 2);
760        assert!(load.capabilities.contains(&"code".to_string()));
761    }
762
763    #[test]
764    fn test_find_least_loaded_fighter() {
765        let coord = SwarmCoordinator::new();
766        let f1 = FighterId::new();
767        let f2 = FighterId::new();
768        coord.register_fighter(f1);
769        coord.register_fighter(f2);
770
771        // Give f1 some load.
772        if let Some(mut load) = coord.fighter_loads.get_mut(&f1) {
773            load.active_tasks = 5;
774        }
775
776        let least = coord.find_least_loaded_fighter();
777        assert_eq!(least, Some(f2));
778    }
779
780    #[test]
781    fn test_find_least_loaded_skips_unhealthy() {
782        let coord = SwarmCoordinator::new();
783        let f1 = FighterId::new();
784        let f2 = FighterId::new();
785        coord.register_fighter(f1);
786        coord.register_fighter(f2);
787
788        coord.mark_unhealthy(&f2);
789        let least = coord.find_least_loaded_fighter();
790        assert_eq!(least, Some(f1));
791    }
792
793    #[test]
794    fn test_mark_healthy_unhealthy() {
795        let coord = SwarmCoordinator::new();
796        let f = FighterId::new();
797        coord.register_fighter(f);
798        assert_eq!(coord.available_fighter_count(), 1);
799
800        coord.mark_unhealthy(&f);
801        assert_eq!(coord.available_fighter_count(), 0);
802
803        coord.mark_healthy(&f);
804        assert_eq!(coord.available_fighter_count(), 1);
805    }
806
807    #[tokio::test]
808    async fn test_assign_subtasks() {
809        let coord = SwarmCoordinator::new();
810        let f1 = FighterId::new();
811        let f2 = FighterId::new();
812        coord.register_fighter(f1);
813        coord.register_fighter(f2);
814
815        let task_id = coord.create_task("step 1\nstep 2".to_string());
816        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
817        assert_eq!(assignments.len(), 2);
818    }
819
820    #[tokio::test]
821    async fn test_assign_subtasks_respects_dependencies() {
822        let coord = SwarmCoordinator::new();
823        let f = FighterId::new();
824        coord.register_fighter(f);
825
826        let dep_id = Uuid::new_v4();
827        let subtasks = vec![
828            SwarmSubtask {
829                id: dep_id,
830                description: "first".to_string(),
831                assigned_to: None,
832                status: SubtaskStatus::Pending,
833                result: None,
834                depends_on: vec![],
835            },
836            SwarmSubtask {
837                id: Uuid::new_v4(),
838                description: "second (depends on first)".to_string(),
839                assigned_to: None,
840                status: SubtaskStatus::Pending,
841                result: None,
842                depends_on: vec![dep_id],
843            },
844        ];
845        let task_id = coord.create_task_with_subtasks("pipeline".to_string(), subtasks);
846
847        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
848        // Only the first subtask (no dependencies) should be assigned.
849        assert_eq!(assignments.len(), 1);
850        assert_eq!(assignments[0].0, dep_id);
851    }
852
853    #[tokio::test]
854    async fn test_complete_subtask() {
855        let coord = SwarmCoordinator::new();
856        let f = FighterId::new();
857        coord.register_fighter(f);
858
859        let task_id = coord.create_task("single task".to_string());
860        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
861        assert_eq!(assignments.len(), 1);
862
863        let (subtask_id, _) = assignments[0];
864        coord
865            .complete_subtask(&task_id, &subtask_id, "done".to_string())
866            .await
867            .expect("should complete");
868
869        let task = coord.get_task(&task_id).await.expect("should exist");
870        assert!((task.progress - 1.0).abs() < f64::EPSILON);
871        assert!(task.aggregated_result.is_some());
872    }
873
874    #[tokio::test]
875    async fn test_fail_subtask() {
876        let coord = SwarmCoordinator::new();
877        let f = FighterId::new();
878        coord.register_fighter(f);
879
880        let task_id = coord.create_task("fail task".to_string());
881        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
882        let (subtask_id, _) = assignments[0];
883
884        coord
885            .fail_subtask(&task_id, &subtask_id, "error occurred".to_string())
886            .await
887            .expect("should fail");
888
889        let task = coord.get_task(&task_id).await.expect("should exist");
890        assert!(matches!(task.subtasks[0].status, SubtaskStatus::Failed(_)));
891    }
892
893    #[tokio::test]
894    async fn test_get_progress() {
895        let coord = SwarmCoordinator::new();
896        let f = FighterId::new();
897        coord.register_fighter(f);
898
899        let task_id = coord.create_task("a\nb".to_string());
900        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
901
902        // Complete first subtask.
903        coord
904            .complete_subtask(&task_id, &assignments[0].0, "result 1".to_string())
905            .await
906            .expect("should complete");
907
908        let progress = coord.get_progress(&task_id).await.expect("should exist");
909        assert!((progress - 0.5).abs() < f64::EPSILON);
910    }
911
912    #[tokio::test]
913    async fn test_progress_report() {
914        let coord = SwarmCoordinator::new();
915        let f = FighterId::new();
916        coord.register_fighter(f);
917
918        let task_id = coord.create_task("a\nb\nc".to_string());
919        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
920
921        // Complete one subtask.
922        coord
923            .complete_subtask(&task_id, &assignments[0].0, "done".to_string())
924            .await
925            .expect("should complete");
926
927        let report = coord
928            .get_progress_report(&task_id)
929            .await
930            .expect("should exist");
931        assert_eq!(report.total_subtasks, 3);
932        assert_eq!(report.completed, 1);
933        assert_eq!(report.running, 2);
934        assert_eq!(report.pending, 0);
935        assert_eq!(report.failed, 0);
936        assert!((report.percentage - 33.33).abs() < 1.0);
937    }
938
939    #[tokio::test]
940    async fn test_load_balancing_distributes_evenly() {
941        let coord = SwarmCoordinator::new();
942        let f1 = FighterId::new();
943        let f2 = FighterId::new();
944        let f3 = FighterId::new();
945        coord.register_fighter(f1);
946        coord.register_fighter(f2);
947        coord.register_fighter(f3);
948
949        let task_id = coord.create_task("a\nb\nc\nd\ne\nf".to_string());
950        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
951
952        assert_eq!(assignments.len(), 6);
953
954        // Count assignments per fighter.
955        let mut counts: std::collections::HashMap<FighterId, usize> =
956            std::collections::HashMap::new();
957        for (_, fighter) in &assignments {
958            *counts.entry(*fighter).or_insert(0) += 1;
959        }
960
961        // Each fighter should get 2 tasks.
962        for count in counts.values() {
963            assert_eq!(*count, 2);
964        }
965    }
966
967    #[tokio::test]
968    async fn test_reassign_failed_subtask() {
969        let coord = SwarmCoordinator::new();
970        let f1 = FighterId::new();
971        let f2 = FighterId::new();
972        coord.register_fighter(f1);
973        coord.register_fighter(f2);
974
975        let task_id = coord.create_task("single task".to_string());
976        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
977        let (subtask_id, original_fighter) = assignments[0];
978
979        // Fail the subtask.
980        coord
981            .fail_subtask(&task_id, &subtask_id, "crashed".to_string())
982            .await
983            .expect("should fail");
984
985        // Reassign.
986        let new_fighter = coord
987            .reassign_failed_subtask(&task_id, &subtask_id)
988            .await
989            .expect("should reassign");
990
991        assert!(new_fighter.is_some());
992        let new_id = new_fighter.expect("should have new fighter");
993        assert_ne!(new_id, original_fighter);
994    }
995
996    #[tokio::test]
997    async fn test_detect_stale_fighters() {
998        let coord = SwarmCoordinator::new();
999        let f1 = FighterId::new();
1000        let f2 = FighterId::new();
1001        coord.register_fighter(f1);
1002        coord.register_fighter(f2);
1003
1004        // Simulate f1 being stale by setting old heartbeat and active tasks.
1005        if let Some(mut load) = coord.fighter_loads.get_mut(&f1) {
1006            load.active_tasks = 1;
1007            load.last_heartbeat = Utc::now().timestamp() - STALE_THRESHOLD_SECS - 10;
1008        }
1009
1010        let stale = coord.detect_stale_fighters();
1011        assert!(stale.contains(&f1));
1012        assert!(!stale.contains(&f2));
1013
1014        // f1 should now be unhealthy.
1015        let load = coord.fighter_loads.get(&f1).expect("should exist");
1016        assert!(!load.healthy);
1017    }
1018
1019    #[test]
1020    fn test_record_heartbeat() {
1021        let coord = SwarmCoordinator::new();
1022        let f = FighterId::new();
1023        coord.register_fighter(f);
1024
1025        // Set old heartbeat.
1026        if let Some(mut load) = coord.fighter_loads.get_mut(&f) {
1027            load.last_heartbeat = 0;
1028        }
1029
1030        coord.record_heartbeat(&f);
1031
1032        let load = coord.fighter_loads.get(&f).expect("should exist");
1033        assert!(load.last_heartbeat > 0);
1034    }
1035
1036    #[test]
1037    fn test_list_task_ids() {
1038        let coord = SwarmCoordinator::new();
1039        let id1 = coord.create_task("task 1".to_string());
1040        let id2 = coord.create_task("task 2".to_string());
1041
1042        let ids = coord.list_task_ids();
1043        assert_eq!(ids.len(), 2);
1044        assert!(ids.contains(&id1));
1045        assert!(ids.contains(&id2));
1046    }
1047
1048    #[test]
1049    fn test_remove_task() {
1050        let coord = SwarmCoordinator::new();
1051        let id = coord.create_task("temp".to_string());
1052        assert!(coord.remove_task(&id));
1053        assert!(!coord.remove_task(&id)); // Already removed.
1054    }
1055
1056    #[tokio::test]
1057    async fn test_complete_subtask_decrements_load() {
1058        let coord = SwarmCoordinator::new();
1059        let f = FighterId::new();
1060        coord.register_fighter(f);
1061
1062        let task_id = coord.create_task("work".to_string());
1063        coord.assign_subtasks(&task_id).await.expect("should assign");
1064
1065        // Verify load incremented.
1066        let load = coord.fighter_loads.get(&f).expect("should exist");
1067        assert_eq!(load.active_tasks, 1);
1068        drop(load);
1069
1070        // Get subtask id.
1071        let task = coord.get_task(&task_id).await.expect("should exist");
1072        let subtask_id = task.subtasks[0].id;
1073
1074        coord
1075            .complete_subtask(&task_id, &subtask_id, "done".to_string())
1076            .await
1077            .expect("should complete");
1078
1079        let load = coord.fighter_loads.get(&f).expect("should exist");
1080        assert_eq!(load.active_tasks, 0);
1081    }
1082
1083    #[tokio::test]
1084    async fn test_no_fighters_available() {
1085        let coord = SwarmCoordinator::new();
1086        let task_id = coord.create_task("lonely task".to_string());
1087        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
1088        assert!(assignments.is_empty());
1089    }
1090
1091    #[tokio::test]
1092    async fn test_get_nonexistent_task() {
1093        let coord = SwarmCoordinator::new();
1094        let result = coord.get_task(&Uuid::new_v4()).await;
1095        assert!(result.is_none());
1096    }
1097
1098    #[tokio::test]
1099    async fn test_assign_nonexistent_task() {
1100        let coord = SwarmCoordinator::new();
1101        let result = coord.assign_subtasks(&Uuid::new_v4()).await;
1102        assert!(result.is_err());
1103    }
1104
1105    #[test]
1106    fn test_default_impl() {
1107        let coord = SwarmCoordinator::default();
1108        assert_eq!(coord.available_fighter_count(), 0);
1109    }
1110
1111    #[tokio::test]
1112    async fn test_aggregated_result_joins_all() {
1113        let coord = SwarmCoordinator::new();
1114        let f = FighterId::new();
1115        coord.register_fighter(f);
1116
1117        let task_id = coord.create_task("a\nb\nc".to_string());
1118        let assignments = coord.assign_subtasks(&task_id).await.expect("assign");
1119
1120        for (subtask_id, _) in &assignments {
1121            coord
1122                .complete_subtask(&task_id, subtask_id, format!("result-{}", subtask_id))
1123                .await
1124                .expect("complete");
1125        }
1126
1127        let task = coord.get_task(&task_id).await.expect("should exist");
1128        let agg = task.aggregated_result.expect("should be aggregated");
1129        assert_eq!(agg.matches("result-").count(), 3);
1130    }
1131
1132    #[test]
1133    fn test_get_fighter_loads() {
1134        let coord = SwarmCoordinator::new();
1135        let f1 = FighterId::new();
1136        let f2 = FighterId::new();
1137        coord.register_fighter(f1);
1138        coord.register_fighter(f2);
1139
1140        let loads = coord.get_fighter_loads();
1141        assert_eq!(loads.len(), 2);
1142    }
1143
1144    #[tokio::test]
1145    async fn test_capability_aware_assignment() {
1146        let coord = SwarmCoordinator::new();
1147        let coder = FighterId::new();
1148        let reviewer = FighterId::new();
1149        coord.register_fighter_with_capabilities(coder, vec!["code".to_string()]);
1150        coord.register_fighter_with_capabilities(reviewer, vec!["review".to_string()]);
1151
1152        // Create a task about code.
1153        let subtasks = vec![SwarmSubtask {
1154            id: Uuid::new_v4(),
1155            description: "fix the code bug".to_string(),
1156            assigned_to: None,
1157            status: SubtaskStatus::Pending,
1158            result: None,
1159            depends_on: vec![],
1160        }];
1161        let task_id = coord.create_task_with_subtasks("code task".to_string(), subtasks);
1162        let assignments = coord.assign_subtasks(&task_id).await.expect("should assign");
1163
1164        assert_eq!(assignments.len(), 1);
1165        assert_eq!(assignments[0].1, coder);
1166    }
1167}