Skip to main content

punch_kernel/
swarm.rs

1//! # Swarm Intelligence
2//!
3//! Higher-level coordination for emergent behavior across multiple fighters.
4//! The swarm coordinator manages complex tasks by decomposing them into subtasks,
5//! assigning them based on capabilities, and aggregating results.
6
7use std::sync::Arc;
8
9use chrono::Utc;
10use dashmap::DashMap;
11use tokio::sync::Mutex;
12use tracing::{info, warn};
13use uuid::Uuid;
14
15use punch_types::{FighterId, PunchError, PunchResult, SubtaskStatus, SwarmSubtask, SwarmTask};
16
/// Maximum silence, in seconds, tolerated from a fighter that still holds
/// active subtasks. Once its last heartbeat is older than this,
/// `SwarmCoordinator::detect_stale_fighters` marks the fighter unhealthy.
/// (5 minutes.)
const STALE_THRESHOLD_SECS: i64 = 5 * 60;
20
21/// A record of fighter load for balancing purposes.
22#[derive(Debug, Clone)]
23pub struct FighterLoad {
24    /// The fighter's ID.
25    pub fighter_id: FighterId,
26    /// Number of currently assigned subtasks.
27    pub active_tasks: usize,
28    /// Whether the fighter is considered healthy.
29    pub healthy: bool,
30    /// Fighter capabilities for capability-aware assignment.
31    pub capabilities: Vec<String>,
32    /// Timestamp of last health check (epoch seconds).
33    pub last_heartbeat: i64,
34}
35
36/// The swarm coordinator manages concurrent swarm tasks across available fighters.
37pub struct SwarmCoordinator {
38    /// Active swarm tasks keyed by their UUID.
39    tasks: DashMap<Uuid, Arc<Mutex<SwarmTask>>>,
40    /// Fighter load tracking for balancing.
41    fighter_loads: DashMap<FighterId, FighterLoad>,
42}
43
44impl SwarmCoordinator {
45    /// Create a new swarm coordinator.
46    pub fn new() -> Self {
47        Self {
48            tasks: DashMap::new(),
49            fighter_loads: DashMap::new(),
50        }
51    }
52
53    /// Register a fighter as available for swarm work.
54    pub fn register_fighter(&self, fighter_id: FighterId) {
55        self.fighter_loads.insert(
56            fighter_id,
57            FighterLoad {
58                fighter_id,
59                active_tasks: 0,
60                healthy: true,
61                capabilities: vec![],
62                last_heartbeat: Utc::now().timestamp(),
63            },
64        );
65    }
66
67    /// Register a fighter with specific capabilities.
68    pub fn register_fighter_with_capabilities(
69        &self,
70        fighter_id: FighterId,
71        capabilities: Vec<String>,
72    ) {
73        self.fighter_loads.insert(
74            fighter_id,
75            FighterLoad {
76                fighter_id,
77                active_tasks: 0,
78                healthy: true,
79                capabilities,
80                last_heartbeat: Utc::now().timestamp(),
81            },
82        );
83    }
84
85    /// Remove a fighter from the available pool.
86    pub fn unregister_fighter(&self, fighter_id: &FighterId) {
87        self.fighter_loads.remove(fighter_id);
88    }
89
90    /// Record a heartbeat from a fighter to track liveness.
91    pub fn record_heartbeat(&self, fighter_id: &FighterId) {
92        if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
93            load.last_heartbeat = Utc::now().timestamp();
94        }
95    }
96
97    /// Decompose a task description into subtasks.
98    ///
99    /// Uses intelligent splitting: tries paragraph breaks first, then
100    /// sentence boundaries, then falls back to line-based splitting.
101    pub fn decompose_task(&self, description: &str) -> Vec<SwarmSubtask> {
102        let trimmed = description.trim();
103        if trimmed.is_empty() {
104            return vec![SwarmSubtask {
105                id: Uuid::new_v4(),
106                description: description.to_string(),
107                assigned_to: None,
108                status: SubtaskStatus::Pending,
109                result: None,
110                depends_on: vec![],
111            }];
112        }
113
114        // Try splitting by paragraph breaks (double newlines) first.
115        let paragraphs: Vec<&str> = trimmed
116            .split("\n\n")
117            .map(|p| p.trim())
118            .filter(|p| !p.is_empty())
119            .collect();
120
121        if paragraphs.len() > 1 {
122            return paragraphs
123                .into_iter()
124                .map(|p| SwarmSubtask {
125                    id: Uuid::new_v4(),
126                    description: p.to_string(),
127                    assigned_to: None,
128                    status: SubtaskStatus::Pending,
129                    result: None,
130                    depends_on: vec![],
131                })
132                .collect();
133        }
134
135        // Try splitting by sentences (period followed by space or end).
136        let sentences: Vec<&str> = trimmed
137            .split(". ")
138            .map(|s| s.trim())
139            .filter(|s| !s.is_empty())
140            .collect();
141
142        if sentences.len() > 1 {
143            return sentences
144                .into_iter()
145                .map(|s| SwarmSubtask {
146                    id: Uuid::new_v4(),
147                    description: s.to_string(),
148                    assigned_to: None,
149                    status: SubtaskStatus::Pending,
150                    result: None,
151                    depends_on: vec![],
152                })
153                .collect();
154        }
155
156        // Fall back to line-based splitting.
157        let lines: Vec<&str> = trimmed
158            .lines()
159            .map(|l| l.trim())
160            .filter(|l| !l.is_empty())
161            .collect();
162
163        if lines.len() > 1 {
164            return lines
165                .into_iter()
166                .map(|line| SwarmSubtask {
167                    id: Uuid::new_v4(),
168                    description: line.to_string(),
169                    assigned_to: None,
170                    status: SubtaskStatus::Pending,
171                    result: None,
172                    depends_on: vec![],
173                })
174                .collect();
175        }
176
177        // Single subtask for atomic tasks.
178        vec![SwarmSubtask {
179            id: Uuid::new_v4(),
180            description: trimmed.to_string(),
181            assigned_to: None,
182            status: SubtaskStatus::Pending,
183            result: None,
184            depends_on: vec![],
185        }]
186    }
187
188    /// Create a new swarm task from a description.
189    ///
190    /// The task is automatically decomposed into subtasks.
191    pub fn create_task(&self, description: String) -> Uuid {
192        let subtasks = self.decompose_task(&description);
193        let task = SwarmTask {
194            id: Uuid::new_v4(),
195            description,
196            subtasks,
197            progress: 0.0,
198            created_at: Utc::now(),
199            aggregated_result: None,
200        };
201        let id = task.id;
202        self.tasks.insert(id, Arc::new(Mutex::new(task)));
203        info!(%id, "swarm task created");
204        id
205    }
206
207    /// Create a swarm task with explicit subtasks (for pipeline or dependent tasks).
208    pub fn create_task_with_subtasks(
209        &self,
210        description: String,
211        subtasks: Vec<SwarmSubtask>,
212    ) -> Uuid {
213        let task = SwarmTask {
214            id: Uuid::new_v4(),
215            description,
216            subtasks,
217            progress: 0.0,
218            created_at: Utc::now(),
219            aggregated_result: None,
220        };
221        let id = task.id;
222        self.tasks.insert(id, Arc::new(Mutex::new(task)));
223        info!(%id, "swarm task created with explicit subtasks");
224        id
225    }
226
227    /// Assign pending subtasks to available fighters using load balancing
228    /// and capability matching.
229    ///
230    /// Returns a list of (subtask_id, fighter_id) assignments.
231    pub async fn assign_subtasks(&self, task_id: &Uuid) -> PunchResult<Vec<(Uuid, FighterId)>> {
232        let task_ref = self
233            .tasks
234            .get(task_id)
235            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
236
237        let mut task = task_ref.value().lock().await;
238        let mut assignments = Vec::new();
239
240        // First pass: determine which subtasks are eligible for assignment.
241        let eligible_indices: Vec<usize> = {
242            let subtasks = &task.subtasks;
243            subtasks
244                .iter()
245                .enumerate()
246                .filter(|(_, s)| s.status == SubtaskStatus::Pending)
247                .filter(|(_, s)| {
248                    s.depends_on.iter().all(|dep_id| {
249                        subtasks
250                            .iter()
251                            .any(|d| d.id == *dep_id && d.status == SubtaskStatus::Completed)
252                    })
253                })
254                .map(|(i, _)| i)
255                .collect()
256        };
257
258        // Second pass: assign eligible subtasks considering capabilities.
259        for idx in eligible_indices {
260            let subtask_desc = &task.subtasks[idx].description;
261            let fighter = self.find_best_fighter_for_task(subtask_desc);
262
263            if let Some(fighter_id) = fighter {
264                let subtask = &mut task.subtasks[idx];
265                subtask.assigned_to = Some(fighter_id);
266                subtask.status = SubtaskStatus::Running;
267
268                if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
269                    load.active_tasks += 1;
270                }
271
272                assignments.push((subtask.id, fighter_id));
273                info!(
274                    task_id = %task_id,
275                    subtask_id = %subtask.id,
276                    fighter_id = %fighter_id,
277                    "subtask assigned"
278                );
279            }
280        }
281
282        Ok(assignments)
283    }
284
285    /// Find the best fighter for a given task, considering both load and
286    /// capability matching.
287    fn find_best_fighter_for_task(&self, task_description: &str) -> Option<FighterId> {
288        let task_lower = task_description.to_lowercase();
289
290        // First try to find a capable fighter with the lowest load.
291        let capable_fighter = self
292            .fighter_loads
293            .iter()
294            .filter(|entry| entry.value().healthy)
295            .filter(|entry| {
296                // Either has matching capability or has no capabilities (generalist).
297                entry.value().capabilities.is_empty()
298                    || entry
299                        .value()
300                        .capabilities
301                        .iter()
302                        .any(|cap| task_lower.contains(&cap.to_lowercase()))
303            })
304            .min_by_key(|entry| entry.value().active_tasks)
305            .map(|entry| entry.value().fighter_id);
306
307        if capable_fighter.is_some() {
308            return capable_fighter;
309        }
310
311        // Fall back to any healthy fighter with least load.
312        self.find_least_loaded_fighter()
313    }
314
315    /// Find the fighter with the lowest active task count.
316    fn find_least_loaded_fighter(&self) -> Option<FighterId> {
317        self.fighter_loads
318            .iter()
319            .filter(|entry| entry.value().healthy)
320            .min_by_key(|entry| entry.value().active_tasks)
321            .map(|entry| entry.value().fighter_id)
322    }
323
324    /// Report the completion of a subtask.
325    pub async fn complete_subtask(
326        &self,
327        task_id: &Uuid,
328        subtask_id: &Uuid,
329        result: String,
330    ) -> PunchResult<()> {
331        let task_ref = self
332            .tasks
333            .get(task_id)
334            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
335
336        let mut task = task_ref.value().lock().await;
337
338        let subtask = task
339            .subtasks
340            .iter_mut()
341            .find(|s| s.id == *subtask_id)
342            .ok_or_else(|| {
343                PunchError::Troop(format!(
344                    "subtask {} not found in task {}",
345                    subtask_id, task_id
346                ))
347            })?;
348
349        // Decrement load for the assigned fighter.
350        if let Some(fighter_id) = subtask.assigned_to
351            && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
352        {
353            load.active_tasks = load.active_tasks.saturating_sub(1);
354        }
355
356        subtask.status = SubtaskStatus::Completed;
357        subtask.result = Some(result);
358
359        // Update overall progress.
360        self.update_progress(&mut task);
361
362        // If all subtasks are done, aggregate results intelligently.
363        if task.progress >= 1.0 {
364            task.aggregated_result = Some(self.aggregate_results(&task.subtasks));
365            info!(%task_id, "swarm task fully completed");
366        }
367
368        Ok(())
369    }
370
371    /// Update the progress field of a task based on subtask completion.
372    fn update_progress(&self, task: &mut SwarmTask) {
373        let total = task.subtasks.len() as f64;
374        let completed = task
375            .subtasks
376            .iter()
377            .filter(|s| s.status == SubtaskStatus::Completed)
378            .count() as f64;
379        task.progress = if total > 0.0 { completed / total } else { 0.0 };
380    }
381
382    /// Aggregate results from completed subtasks intelligently.
383    ///
384    /// Merges results preserving order and removing redundancy.
385    fn aggregate_results(&self, subtasks: &[SwarmSubtask]) -> String {
386        let results: Vec<&str> = subtasks
387            .iter()
388            .filter_map(|s| s.result.as_deref())
389            .collect();
390
391        if results.is_empty() {
392            return String::new();
393        }
394
395        // If results are short, join with double newlines.
396        // If long, add headers for each section.
397        let total_len: usize = results.iter().map(|r| r.len()).sum();
398        if total_len < 500 || results.len() <= 2 {
399            results.join("\n\n")
400        } else {
401            results
402                .iter()
403                .enumerate()
404                .map(|(i, r)| format!("--- Section {} ---\n{}", i + 1, r))
405                .collect::<Vec<_>>()
406                .join("\n\n")
407        }
408    }
409
410    /// Report the failure of a subtask.
411    pub async fn fail_subtask(
412        &self,
413        task_id: &Uuid,
414        subtask_id: &Uuid,
415        error: String,
416    ) -> PunchResult<()> {
417        let task_ref = self
418            .tasks
419            .get(task_id)
420            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
421
422        let mut task = task_ref.value().lock().await;
423
424        let subtask = task
425            .subtasks
426            .iter_mut()
427            .find(|s| s.id == *subtask_id)
428            .ok_or_else(|| {
429                PunchError::Troop(format!(
430                    "subtask {} not found in task {}",
431                    subtask_id, task_id
432                ))
433            })?;
434
435        // Decrement load for the assigned fighter.
436        if let Some(fighter_id) = subtask.assigned_to
437            && let Some(mut load) = self.fighter_loads.get_mut(&fighter_id)
438        {
439            load.active_tasks = load.active_tasks.saturating_sub(1);
440        }
441
442        subtask.status = SubtaskStatus::Failed(error);
443        warn!(%task_id, %subtask_id, "subtask failed");
444
445        Ok(())
446    }
447
448    /// Reassign a failed subtask to a different fighter.
449    pub async fn reassign_failed_subtask(
450        &self,
451        task_id: &Uuid,
452        subtask_id: &Uuid,
453    ) -> PunchResult<Option<FighterId>> {
454        let task_ref = self
455            .tasks
456            .get(task_id)
457            .ok_or_else(|| PunchError::Troop(format!("swarm task {} not found", task_id)))?;
458
459        let mut task = task_ref.value().lock().await;
460
461        let subtask = task
462            .subtasks
463            .iter_mut()
464            .find(|s| s.id == *subtask_id)
465            .ok_or_else(|| {
466                PunchError::Troop(format!(
467                    "subtask {} not found in task {}",
468                    subtask_id, task_id
469                ))
470            })?;
471
472        // Only reassign if the subtask has failed.
473        if !matches!(subtask.status, SubtaskStatus::Failed(_)) {
474            return Err(PunchError::Troop(
475                "can only reassign failed subtasks".to_string(),
476            ));
477        }
478
479        let failed_fighter = subtask.assigned_to;
480
481        // Find a different healthy fighter.
482        let new_fighter = self
483            .fighter_loads
484            .iter()
485            .filter(|entry| entry.value().healthy)
486            .filter(|entry| Some(entry.value().fighter_id) != failed_fighter)
487            .min_by_key(|entry| entry.value().active_tasks)
488            .map(|entry| entry.value().fighter_id);
489
490        if let Some(fighter_id) = new_fighter {
491            subtask.assigned_to = Some(fighter_id);
492            subtask.status = SubtaskStatus::Running;
493
494            if let Some(mut load) = self.fighter_loads.get_mut(&fighter_id) {
495                load.active_tasks += 1;
496            }
497
498            info!(
499                %task_id,
500                %subtask_id,
501                new_fighter = %fighter_id,
502                "subtask reassigned after failure"
503            );
504        }
505
506        Ok(new_fighter)
507    }
508
509    /// Detect stale or failed fighters and reassign their work.
510    ///
511    /// Returns a list of fighter IDs that were detected as stale/failed.
512    pub fn detect_stale_fighters(&self) -> Vec<FighterId> {
513        let now = Utc::now().timestamp();
514        let mut stale = Vec::new();
515
516        for entry in self.fighter_loads.iter() {
517            let load = entry.value();
518            if load.healthy
519                && load.active_tasks > 0
520                && (now - load.last_heartbeat) > STALE_THRESHOLD_SECS
521            {
522                stale.push(load.fighter_id);
523            }
524        }
525
526        for fighter_id in &stale {
527            self.mark_unhealthy(fighter_id);
528            warn!(
529                %fighter_id,
530                "fighter detected as stale, marked unhealthy"
531            );
532        }
533
534        stale
535    }
536
537    /// Mark a fighter as unhealthy (will not receive new assignments).
538    pub fn mark_unhealthy(&self, fighter_id: &FighterId) {
539        if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
540            load.healthy = false;
541            warn!(%fighter_id, "fighter marked unhealthy");
542        }
543    }
544
545    /// Mark a fighter as healthy again.
546    pub fn mark_healthy(&self, fighter_id: &FighterId) {
547        if let Some(mut load) = self.fighter_loads.get_mut(fighter_id) {
548            load.healthy = true;
549            load.last_heartbeat = Utc::now().timestamp();
550            info!(%fighter_id, "fighter marked healthy");
551        }
552    }
553
554    /// Get the current state of a swarm task.
555    pub async fn get_task(&self, task_id: &Uuid) -> Option<SwarmTask> {
556        let task_ref = self.tasks.get(task_id)?;
557        let task = task_ref.value().lock().await;
558        Some(task.clone())
559    }
560
561    /// Get the progress of a swarm task (0.0 to 1.0).
562    pub async fn get_progress(&self, task_id: &Uuid) -> Option<f64> {
563        let task_ref = self.tasks.get(task_id)?;
564        let task = task_ref.value().lock().await;
565        Some(task.progress)
566    }
567
568    /// Get a detailed progress report for a swarm task.
569    pub async fn get_progress_report(&self, task_id: &Uuid) -> Option<ProgressReport> {
570        let task_ref = self.tasks.get(task_id)?;
571        let task = task_ref.value().lock().await;
572
573        let total = task.subtasks.len();
574        let completed = task
575            .subtasks
576            .iter()
577            .filter(|s| s.status == SubtaskStatus::Completed)
578            .count();
579        let running = task
580            .subtasks
581            .iter()
582            .filter(|s| s.status == SubtaskStatus::Running)
583            .count();
584        let failed = task
585            .subtasks
586            .iter()
587            .filter(|s| matches!(s.status, SubtaskStatus::Failed(_)))
588            .count();
589        let pending = task
590            .subtasks
591            .iter()
592            .filter(|s| s.status == SubtaskStatus::Pending)
593            .count();
594
595        Some(ProgressReport {
596            task_id: *task_id,
597            total_subtasks: total,
598            completed,
599            running,
600            failed,
601            pending,
602            percentage: if total > 0 {
603                (completed as f64 / total as f64) * 100.0
604            } else {
605                0.0
606            },
607        })
608    }
609
610    /// List all active swarm task IDs.
611    pub fn list_task_ids(&self) -> Vec<Uuid> {
612        self.tasks.iter().map(|entry| *entry.key()).collect()
613    }
614
615    /// Get the number of available healthy fighters.
616    pub fn available_fighter_count(&self) -> usize {
617        self.fighter_loads
618            .iter()
619            .filter(|entry| entry.value().healthy)
620            .count()
621    }
622
623    /// Get load information for all fighters.
624    pub fn get_fighter_loads(&self) -> Vec<FighterLoad> {
625        self.fighter_loads
626            .iter()
627            .map(|entry| entry.value().clone())
628            .collect()
629    }
630
631    /// Remove a completed or failed swarm task.
632    pub fn remove_task(&self, task_id: &Uuid) -> bool {
633        self.tasks.remove(task_id).is_some()
634    }
635}
636
637/// Detailed progress report for a swarm task.
638#[derive(Debug, Clone)]
639pub struct ProgressReport {
640    /// The task ID.
641    pub task_id: Uuid,
642    /// Total number of subtasks.
643    pub total_subtasks: usize,
644    /// Number of completed subtasks.
645    pub completed: usize,
646    /// Number of currently running subtasks.
647    pub running: usize,
648    /// Number of failed subtasks.
649    pub failed: usize,
650    /// Number of pending subtasks.
651    pub pending: usize,
652    /// Completion percentage (0.0 to 100.0).
653    pub percentage: f64,
654}
655
656impl Default for SwarmCoordinator {
657    fn default() -> Self {
658        Self::new()
659    }
660}
661
662#[cfg(test)]
663mod tests {
664    use super::*;
665
666    #[test]
667    fn test_decompose_single_task() {
668        let coord = SwarmCoordinator::new();
669        let subtasks = coord.decompose_task("single task");
670        assert_eq!(subtasks.len(), 1);
671        assert_eq!(subtasks[0].description, "single task");
672    }
673
674    #[test]
675    fn test_decompose_multi_line_task() {
676        let coord = SwarmCoordinator::new();
677        let subtasks = coord.decompose_task("step 1\nstep 2\nstep 3");
678        assert_eq!(subtasks.len(), 3);
679        assert_eq!(subtasks[0].description, "step 1");
680        assert_eq!(subtasks[1].description, "step 2");
681        assert_eq!(subtasks[2].description, "step 3");
682    }
683
684    #[test]
685    fn test_decompose_ignores_blank_lines() {
686        let coord = SwarmCoordinator::new();
687        let subtasks = coord.decompose_task("step 1\n\n\nstep 2\n");
688        // Double newline splits into paragraphs.
689        assert_eq!(subtasks.len(), 2);
690    }
691
692    #[test]
693    fn test_decompose_by_paragraphs() {
694        let coord = SwarmCoordinator::new();
695        let input = "First paragraph about setup.\n\nSecond paragraph about execution.\n\nThird paragraph about cleanup.";
696        let subtasks = coord.decompose_task(input);
697        assert_eq!(subtasks.len(), 3);
698        assert!(subtasks[0].description.contains("setup"));
699        assert!(subtasks[1].description.contains("execution"));
700        assert!(subtasks[2].description.contains("cleanup"));
701    }
702
703    #[test]
704    fn test_decompose_by_sentences() {
705        let coord = SwarmCoordinator::new();
706        let input = "Analyze the code. Fix bugs. Write tests";
707        let subtasks = coord.decompose_task(input);
708        assert_eq!(subtasks.len(), 3);
709    }
710
711    #[test]
712    fn test_create_task() {
713        let coord = SwarmCoordinator::new();
714        let id = coord.create_task("test task".to_string());
715        assert!(coord.tasks.contains_key(&id));
716    }
717
718    #[test]
719    fn test_create_task_with_subtasks() {
720        let coord = SwarmCoordinator::new();
721        let subtasks = vec![
722            SwarmSubtask {
723                id: Uuid::new_v4(),
724                description: "sub1".to_string(),
725                assigned_to: None,
726                status: SubtaskStatus::Pending,
727                result: None,
728                depends_on: vec![],
729            },
730            SwarmSubtask {
731                id: Uuid::new_v4(),
732                description: "sub2".to_string(),
733                assigned_to: None,
734                status: SubtaskStatus::Pending,
735                result: None,
736                depends_on: vec![],
737            },
738        ];
739        let id = coord.create_task_with_subtasks("parent".to_string(), subtasks);
740        assert!(coord.tasks.contains_key(&id));
741    }
742
743    #[test]
744    fn test_register_and_unregister_fighter() {
745        let coord = SwarmCoordinator::new();
746        let f = FighterId::new();
747        coord.register_fighter(f);
748        assert_eq!(coord.available_fighter_count(), 1);
749        coord.unregister_fighter(&f);
750        assert_eq!(coord.available_fighter_count(), 0);
751    }
752
753    #[test]
754    fn test_register_fighter_with_capabilities() {
755        let coord = SwarmCoordinator::new();
756        let f = FighterId::new();
757        coord.register_fighter_with_capabilities(f, vec!["code".to_string(), "review".to_string()]);
758
759        let load = coord.fighter_loads.get(&f).expect("should exist");
760        assert_eq!(load.capabilities.len(), 2);
761        assert!(load.capabilities.contains(&"code".to_string()));
762    }
763
764    #[test]
765    fn test_find_least_loaded_fighter() {
766        let coord = SwarmCoordinator::new();
767        let f1 = FighterId::new();
768        let f2 = FighterId::new();
769        coord.register_fighter(f1);
770        coord.register_fighter(f2);
771
772        // Give f1 some load.
773        if let Some(mut load) = coord.fighter_loads.get_mut(&f1) {
774            load.active_tasks = 5;
775        }
776
777        let least = coord.find_least_loaded_fighter();
778        assert_eq!(least, Some(f2));
779    }
780
781    #[test]
782    fn test_find_least_loaded_skips_unhealthy() {
783        let coord = SwarmCoordinator::new();
784        let f1 = FighterId::new();
785        let f2 = FighterId::new();
786        coord.register_fighter(f1);
787        coord.register_fighter(f2);
788
789        coord.mark_unhealthy(&f2);
790        let least = coord.find_least_loaded_fighter();
791        assert_eq!(least, Some(f1));
792    }
793
794    #[test]
795    fn test_mark_healthy_unhealthy() {
796        let coord = SwarmCoordinator::new();
797        let f = FighterId::new();
798        coord.register_fighter(f);
799        assert_eq!(coord.available_fighter_count(), 1);
800
801        coord.mark_unhealthy(&f);
802        assert_eq!(coord.available_fighter_count(), 0);
803
804        coord.mark_healthy(&f);
805        assert_eq!(coord.available_fighter_count(), 1);
806    }
807
808    #[tokio::test]
809    async fn test_assign_subtasks() {
810        let coord = SwarmCoordinator::new();
811        let f1 = FighterId::new();
812        let f2 = FighterId::new();
813        coord.register_fighter(f1);
814        coord.register_fighter(f2);
815
816        let task_id = coord.create_task("step 1\nstep 2".to_string());
817        let assignments = coord
818            .assign_subtasks(&task_id)
819            .await
820            .expect("should assign");
821        assert_eq!(assignments.len(), 2);
822    }
823
824    #[tokio::test]
825    async fn test_assign_subtasks_respects_dependencies() {
826        let coord = SwarmCoordinator::new();
827        let f = FighterId::new();
828        coord.register_fighter(f);
829
830        let dep_id = Uuid::new_v4();
831        let subtasks = vec![
832            SwarmSubtask {
833                id: dep_id,
834                description: "first".to_string(),
835                assigned_to: None,
836                status: SubtaskStatus::Pending,
837                result: None,
838                depends_on: vec![],
839            },
840            SwarmSubtask {
841                id: Uuid::new_v4(),
842                description: "second (depends on first)".to_string(),
843                assigned_to: None,
844                status: SubtaskStatus::Pending,
845                result: None,
846                depends_on: vec![dep_id],
847            },
848        ];
849        let task_id = coord.create_task_with_subtasks("pipeline".to_string(), subtasks);
850
851        let assignments = coord
852            .assign_subtasks(&task_id)
853            .await
854            .expect("should assign");
855        // Only the first subtask (no dependencies) should be assigned.
856        assert_eq!(assignments.len(), 1);
857        assert_eq!(assignments[0].0, dep_id);
858    }
859
860    #[tokio::test]
861    async fn test_complete_subtask() {
862        let coord = SwarmCoordinator::new();
863        let f = FighterId::new();
864        coord.register_fighter(f);
865
866        let task_id = coord.create_task("single task".to_string());
867        let assignments = coord
868            .assign_subtasks(&task_id)
869            .await
870            .expect("should assign");
871        assert_eq!(assignments.len(), 1);
872
873        let (subtask_id, _) = assignments[0];
874        coord
875            .complete_subtask(&task_id, &subtask_id, "done".to_string())
876            .await
877            .expect("should complete");
878
879        let task = coord.get_task(&task_id).await.expect("should exist");
880        assert!((task.progress - 1.0).abs() < f64::EPSILON);
881        assert!(task.aggregated_result.is_some());
882    }
883
884    #[tokio::test]
885    async fn test_fail_subtask() {
886        let coord = SwarmCoordinator::new();
887        let f = FighterId::new();
888        coord.register_fighter(f);
889
890        let task_id = coord.create_task("fail task".to_string());
891        let assignments = coord
892            .assign_subtasks(&task_id)
893            .await
894            .expect("should assign");
895        let (subtask_id, _) = assignments[0];
896
897        coord
898            .fail_subtask(&task_id, &subtask_id, "error occurred".to_string())
899            .await
900            .expect("should fail");
901
902        let task = coord.get_task(&task_id).await.expect("should exist");
903        assert!(matches!(task.subtasks[0].status, SubtaskStatus::Failed(_)));
904    }
905
906    #[tokio::test]
907    async fn test_get_progress() {
908        let coord = SwarmCoordinator::new();
909        let f = FighterId::new();
910        coord.register_fighter(f);
911
912        let task_id = coord.create_task("a\nb".to_string());
913        let assignments = coord
914            .assign_subtasks(&task_id)
915            .await
916            .expect("should assign");
917
918        // Complete first subtask.
919        coord
920            .complete_subtask(&task_id, &assignments[0].0, "result 1".to_string())
921            .await
922            .expect("should complete");
923
924        let progress = coord.get_progress(&task_id).await.expect("should exist");
925        assert!((progress - 0.5).abs() < f64::EPSILON);
926    }
927
928    #[tokio::test]
929    async fn test_progress_report() {
930        let coord = SwarmCoordinator::new();
931        let f = FighterId::new();
932        coord.register_fighter(f);
933
934        let task_id = coord.create_task("a\nb\nc".to_string());
935        let assignments = coord
936            .assign_subtasks(&task_id)
937            .await
938            .expect("should assign");
939
940        // Complete one subtask.
941        coord
942            .complete_subtask(&task_id, &assignments[0].0, "done".to_string())
943            .await
944            .expect("should complete");
945
946        let report = coord
947            .get_progress_report(&task_id)
948            .await
949            .expect("should exist");
950        assert_eq!(report.total_subtasks, 3);
951        assert_eq!(report.completed, 1);
952        assert_eq!(report.running, 2);
953        assert_eq!(report.pending, 0);
954        assert_eq!(report.failed, 0);
955        assert!((report.percentage - 33.33).abs() < 1.0);
956    }
957
958    #[tokio::test]
959    async fn test_load_balancing_distributes_evenly() {
960        let coord = SwarmCoordinator::new();
961        let f1 = FighterId::new();
962        let f2 = FighterId::new();
963        let f3 = FighterId::new();
964        coord.register_fighter(f1);
965        coord.register_fighter(f2);
966        coord.register_fighter(f3);
967
968        let task_id = coord.create_task("a\nb\nc\nd\ne\nf".to_string());
969        let assignments = coord
970            .assign_subtasks(&task_id)
971            .await
972            .expect("should assign");
973
974        assert_eq!(assignments.len(), 6);
975
976        // Count assignments per fighter.
977        let mut counts: std::collections::HashMap<FighterId, usize> =
978            std::collections::HashMap::new();
979        for (_, fighter) in &assignments {
980            *counts.entry(*fighter).or_insert(0) += 1;
981        }
982
983        // Each fighter should get 2 tasks.
984        for count in counts.values() {
985            assert_eq!(*count, 2);
986        }
987    }
988
989    #[tokio::test]
990    async fn test_reassign_failed_subtask() {
991        let coord = SwarmCoordinator::new();
992        let f1 = FighterId::new();
993        let f2 = FighterId::new();
994        coord.register_fighter(f1);
995        coord.register_fighter(f2);
996
997        let task_id = coord.create_task("single task".to_string());
998        let assignments = coord
999            .assign_subtasks(&task_id)
1000            .await
1001            .expect("should assign");
1002        let (subtask_id, original_fighter) = assignments[0];
1003
1004        // Fail the subtask.
1005        coord
1006            .fail_subtask(&task_id, &subtask_id, "crashed".to_string())
1007            .await
1008            .expect("should fail");
1009
1010        // Reassign.
1011        let new_fighter = coord
1012            .reassign_failed_subtask(&task_id, &subtask_id)
1013            .await
1014            .expect("should reassign");
1015
1016        assert!(new_fighter.is_some());
1017        let new_id = new_fighter.expect("should have new fighter");
1018        assert_ne!(new_id, original_fighter);
1019    }
1020
1021    #[tokio::test]
1022    async fn test_detect_stale_fighters() {
1023        let coord = SwarmCoordinator::new();
1024        let f1 = FighterId::new();
1025        let f2 = FighterId::new();
1026        coord.register_fighter(f1);
1027        coord.register_fighter(f2);
1028
1029        // Simulate f1 being stale by setting old heartbeat and active tasks.
1030        if let Some(mut load) = coord.fighter_loads.get_mut(&f1) {
1031            load.active_tasks = 1;
1032            load.last_heartbeat = Utc::now().timestamp() - STALE_THRESHOLD_SECS - 10;
1033        }
1034
1035        let stale = coord.detect_stale_fighters();
1036        assert!(stale.contains(&f1));
1037        assert!(!stale.contains(&f2));
1038
1039        // f1 should now be unhealthy.
1040        let load = coord.fighter_loads.get(&f1).expect("should exist");
1041        assert!(!load.healthy);
1042    }
1043
1044    #[test]
1045    fn test_record_heartbeat() {
1046        let coord = SwarmCoordinator::new();
1047        let f = FighterId::new();
1048        coord.register_fighter(f);
1049
1050        // Set old heartbeat.
1051        if let Some(mut load) = coord.fighter_loads.get_mut(&f) {
1052            load.last_heartbeat = 0;
1053        }
1054
1055        coord.record_heartbeat(&f);
1056
1057        let load = coord.fighter_loads.get(&f).expect("should exist");
1058        assert!(load.last_heartbeat > 0);
1059    }
1060
1061    #[test]
1062    fn test_list_task_ids() {
1063        let coord = SwarmCoordinator::new();
1064        let id1 = coord.create_task("task 1".to_string());
1065        let id2 = coord.create_task("task 2".to_string());
1066
1067        let ids = coord.list_task_ids();
1068        assert_eq!(ids.len(), 2);
1069        assert!(ids.contains(&id1));
1070        assert!(ids.contains(&id2));
1071    }
1072
1073    #[test]
1074    fn test_remove_task() {
1075        let coord = SwarmCoordinator::new();
1076        let id = coord.create_task("temp".to_string());
1077        assert!(coord.remove_task(&id));
1078        assert!(!coord.remove_task(&id)); // Already removed.
1079    }
1080
1081    #[tokio::test]
1082    async fn test_complete_subtask_decrements_load() {
1083        let coord = SwarmCoordinator::new();
1084        let f = FighterId::new();
1085        coord.register_fighter(f);
1086
1087        let task_id = coord.create_task("work".to_string());
1088        coord
1089            .assign_subtasks(&task_id)
1090            .await
1091            .expect("should assign");
1092
1093        // Verify load incremented.
1094        let load = coord.fighter_loads.get(&f).expect("should exist");
1095        assert_eq!(load.active_tasks, 1);
1096        drop(load);
1097
1098        // Get subtask id.
1099        let task = coord.get_task(&task_id).await.expect("should exist");
1100        let subtask_id = task.subtasks[0].id;
1101
1102        coord
1103            .complete_subtask(&task_id, &subtask_id, "done".to_string())
1104            .await
1105            .expect("should complete");
1106
1107        let load = coord.fighter_loads.get(&f).expect("should exist");
1108        assert_eq!(load.active_tasks, 0);
1109    }
1110
1111    #[tokio::test]
1112    async fn test_no_fighters_available() {
1113        let coord = SwarmCoordinator::new();
1114        let task_id = coord.create_task("lonely task".to_string());
1115        let assignments = coord
1116            .assign_subtasks(&task_id)
1117            .await
1118            .expect("should assign");
1119        assert!(assignments.is_empty());
1120    }
1121
1122    #[tokio::test]
1123    async fn test_get_nonexistent_task() {
1124        let coord = SwarmCoordinator::new();
1125        let result = coord.get_task(&Uuid::new_v4()).await;
1126        assert!(result.is_none());
1127    }
1128
1129    #[tokio::test]
1130    async fn test_assign_nonexistent_task() {
1131        let coord = SwarmCoordinator::new();
1132        let result = coord.assign_subtasks(&Uuid::new_v4()).await;
1133        assert!(result.is_err());
1134    }
1135
1136    #[test]
1137    fn test_default_impl() {
1138        let coord = SwarmCoordinator::default();
1139        assert_eq!(coord.available_fighter_count(), 0);
1140    }
1141
1142    #[tokio::test]
1143    async fn test_aggregated_result_joins_all() {
1144        let coord = SwarmCoordinator::new();
1145        let f = FighterId::new();
1146        coord.register_fighter(f);
1147
1148        let task_id = coord.create_task("a\nb\nc".to_string());
1149        let assignments = coord.assign_subtasks(&task_id).await.expect("assign");
1150
1151        for (subtask_id, _) in &assignments {
1152            coord
1153                .complete_subtask(&task_id, subtask_id, format!("result-{}", subtask_id))
1154                .await
1155                .expect("complete");
1156        }
1157
1158        let task = coord.get_task(&task_id).await.expect("should exist");
1159        let agg = task.aggregated_result.expect("should be aggregated");
1160        assert_eq!(agg.matches("result-").count(), 3);
1161    }
1162
1163    #[test]
1164    fn test_get_fighter_loads() {
1165        let coord = SwarmCoordinator::new();
1166        let f1 = FighterId::new();
1167        let f2 = FighterId::new();
1168        coord.register_fighter(f1);
1169        coord.register_fighter(f2);
1170
1171        let loads = coord.get_fighter_loads();
1172        assert_eq!(loads.len(), 2);
1173    }
1174
1175    #[tokio::test]
1176    async fn test_capability_aware_assignment() {
1177        let coord = SwarmCoordinator::new();
1178        let coder = FighterId::new();
1179        let reviewer = FighterId::new();
1180        coord.register_fighter_with_capabilities(coder, vec!["code".to_string()]);
1181        coord.register_fighter_with_capabilities(reviewer, vec!["review".to_string()]);
1182
1183        // Create a task about code.
1184        let subtasks = vec![SwarmSubtask {
1185            id: Uuid::new_v4(),
1186            description: "fix the code bug".to_string(),
1187            assigned_to: None,
1188            status: SubtaskStatus::Pending,
1189            result: None,
1190            depends_on: vec![],
1191        }];
1192        let task_id = coord.create_task_with_subtasks("code task".to_string(), subtasks);
1193        let assignments = coord
1194            .assign_subtasks(&task_id)
1195            .await
1196            .expect("should assign");
1197
1198        assert_eq!(assignments.len(), 1);
1199        assert_eq!(assignments[0].1, coder);
1200    }
1201}