1use std::collections::HashMap;
8use std::sync::Arc;
9use std::sync::atomic::{AtomicUsize, Ordering};
10use std::time::Duration;
11
12use chrono::Utc;
13use dashmap::DashMap;
14use futures::future::join_all;
15use tracing::{info, warn};
16
17use crate::agent_messaging::MessageRouter;
18use punch_types::{
19 AgentMessageType, CoordinationStrategy, FighterId, PunchError, PunchResult, Troop, TroopId,
20 TroopStatus,
21};
22
23const DEFAULT_TASK_TIMEOUT: Duration = Duration::from_secs(60);
25
26#[derive(Debug, Clone)]
28pub struct TaskAssignmentResult {
29 pub assigned_to: Vec<FighterId>,
31 pub routing_decision: String,
33 pub results: Vec<(FighterId, String)>,
35}
36
37pub struct TroopManager {
39 troops: DashMap<TroopId, Troop>,
41 round_robin_counter: AtomicUsize,
43 router: Arc<MessageRouter>,
45 fighter_capabilities: DashMap<FighterId, Vec<String>>,
47 task_timeout: Duration,
49}
50
51impl TroopManager {
52 pub fn new() -> Self {
54 Self {
55 troops: DashMap::new(),
56 round_robin_counter: AtomicUsize::new(0),
57 router: Arc::new(MessageRouter::new()),
58 fighter_capabilities: DashMap::new(),
59 task_timeout: DEFAULT_TASK_TIMEOUT,
60 }
61 }
62
63 pub fn with_router(router: Arc<MessageRouter>) -> Self {
65 Self {
66 troops: DashMap::new(),
67 round_robin_counter: AtomicUsize::new(0),
68 router,
69 fighter_capabilities: DashMap::new(),
70 task_timeout: DEFAULT_TASK_TIMEOUT,
71 }
72 }
73
74 pub fn set_task_timeout(&mut self, timeout: Duration) {
76 self.task_timeout = timeout;
77 }
78
79 pub fn task_timeout(&self) -> Duration {
81 self.task_timeout
82 }
83
84 pub fn router(&self) -> &Arc<MessageRouter> {
86 &self.router
87 }
88
89 pub fn register_capabilities(&self, fighter_id: FighterId, capabilities: Vec<String>) {
91 self.fighter_capabilities.insert(fighter_id, capabilities);
92 }
93
94 pub fn form_troop(
98 &self,
99 name: String,
100 leader: FighterId,
101 mut members: Vec<FighterId>,
102 strategy: CoordinationStrategy,
103 ) -> TroopId {
104 let id = TroopId::new();
105
106 if !members.contains(&leader) {
108 members.insert(0, leader);
109 }
110
111 let troop = Troop {
112 id,
113 name: name.clone(),
114 leader,
115 members,
116 strategy,
117 status: TroopStatus::Active,
118 created_at: Utc::now(),
119 };
120
121 let member_count = troop.members.len();
122 self.troops.insert(id, troop);
123 info!(%id, name, member_count, "troop formed");
124 id
125 }
126
127 pub fn recruit(&self, troop_id: &TroopId, fighter_id: FighterId) -> PunchResult<()> {
129 let mut troop = self
130 .troops
131 .get_mut(troop_id)
132 .ok_or_else(|| PunchError::Troop(format!("troop {} not found", troop_id)))?;
133
134 if troop.status == TroopStatus::Disbanded {
135 return Err(PunchError::Troop(
136 "cannot recruit to a disbanded troop".to_string(),
137 ));
138 }
139
140 if troop.members.contains(&fighter_id) {
141 return Err(PunchError::Troop(format!(
142 "fighter {} is already a member of troop {}",
143 fighter_id, troop_id
144 )));
145 }
146
147 troop.members.push(fighter_id);
148 info!(%troop_id, %fighter_id, "fighter recruited to troop");
149 Ok(())
150 }
151
152 pub fn dismiss(&self, troop_id: &TroopId, fighter_id: &FighterId) -> PunchResult<()> {
158 let mut troop = self
159 .troops
160 .get_mut(troop_id)
161 .ok_or_else(|| PunchError::Troop(format!("troop {} not found", troop_id)))?;
162
163 if troop.status == TroopStatus::Disbanded {
164 return Err(PunchError::Troop(
165 "cannot dismiss from a disbanded troop".to_string(),
166 ));
167 }
168
169 let pos = troop
170 .members
171 .iter()
172 .position(|id| id == fighter_id)
173 .ok_or_else(|| {
174 PunchError::Troop(format!(
175 "fighter {} is not a member of troop {}",
176 fighter_id, troop_id
177 ))
178 })?;
179
180 if troop.members.len() <= 1 {
182 return Err(PunchError::Troop(
183 "cannot dismiss the last member; disband the troop instead".to_string(),
184 ));
185 }
186
187 troop.members.remove(pos);
188
189 if troop.leader == *fighter_id
191 && let Some(new_leader) = troop.members.first()
192 {
193 let new_leader = *new_leader;
194 info!(
195 %troop_id,
196 old_leader = %fighter_id,
197 new_leader = %new_leader,
198 "troop leader changed due to dismissal"
199 );
200 troop.leader = new_leader;
201 }
202
203 info!(%troop_id, %fighter_id, "fighter dismissed from troop");
204 Ok(())
205 }
206
207 pub fn disband_troop(&self, troop_id: &TroopId) -> PunchResult<String> {
209 let mut troop = self
210 .troops
211 .get_mut(troop_id)
212 .ok_or_else(|| PunchError::Troop(format!("troop {} not found", troop_id)))?;
213
214 if troop.status == TroopStatus::Disbanded {
215 return Err(PunchError::Troop("troop is already disbanded".to_string()));
216 }
217
218 troop.status = TroopStatus::Disbanded;
219 troop.members.clear();
220 let name = troop.name.clone();
221 info!(%troop_id, name, "troop disbanded");
222 Ok(name)
223 }
224
225 pub fn get_troop(&self, troop_id: &TroopId) -> Option<Troop> {
227 self.troops.get(troop_id).map(|t| t.value().clone())
228 }
229
230 pub fn list_troops(&self) -> Vec<Troop> {
232 self.troops.iter().map(|t| t.value().clone()).collect()
233 }
234
235 pub fn assign_task(
241 &self,
242 troop_id: &TroopId,
243 task_description: &str,
244 ) -> PunchResult<Vec<FighterId>> {
245 let troop = self
246 .troops
247 .get(troop_id)
248 .ok_or_else(|| PunchError::Troop(format!("troop {} not found", troop_id)))?;
249
250 if troop.status != TroopStatus::Active {
251 return Err(PunchError::Troop(format!(
252 "troop {} is not active (status: {})",
253 troop_id, troop.status
254 )));
255 }
256
257 if troop.members.is_empty() {
258 return Err(PunchError::Troop("troop has no members".to_string()));
259 }
260
261 let assigned = match &troop.strategy {
262 CoordinationStrategy::LeaderWorker => {
263 self.assign_leader_worker(&troop, task_description)
264 }
265 CoordinationStrategy::RoundRobin => self.assign_round_robin(&troop, task_description),
266 CoordinationStrategy::Broadcast => self.assign_broadcast(&troop, task_description),
267 CoordinationStrategy::Pipeline => self.assign_pipeline(&troop, task_description),
268 CoordinationStrategy::Consensus => self.assign_consensus(&troop, task_description),
269 CoordinationStrategy::Specialist => self.assign_specialist(&troop, task_description),
270 };
271
272 Ok(assigned)
273 }
274
275 pub async fn assign_task_async(
278 &self,
279 troop_id: &TroopId,
280 task_description: &str,
281 ) -> PunchResult<TaskAssignmentResult> {
282 let troop = self
283 .troops
284 .get(troop_id)
285 .ok_or_else(|| PunchError::Troop(format!("troop {} not found", troop_id)))?
286 .clone();
287
288 if troop.status != TroopStatus::Active {
289 return Err(PunchError::Troop(format!(
290 "troop {} is not active (status: {})",
291 troop_id, troop.status
292 )));
293 }
294
295 if troop.members.is_empty() {
296 return Err(PunchError::Troop("troop has no members".to_string()));
297 }
298
299 match &troop.strategy {
300 CoordinationStrategy::LeaderWorker => {
301 self.dispatch_leader_worker(&troop, task_description).await
302 }
303 CoordinationStrategy::RoundRobin => {
304 self.dispatch_round_robin(&troop, task_description).await
305 }
306 CoordinationStrategy::Broadcast => {
307 self.dispatch_broadcast(&troop, task_description).await
308 }
309 CoordinationStrategy::Pipeline => {
310 self.dispatch_pipeline(&troop, task_description).await
311 }
312 CoordinationStrategy::Consensus => {
313 self.dispatch_consensus(&troop, task_description).await
314 }
315 CoordinationStrategy::Specialist => {
316 self.dispatch_specialist(&troop, task_description).await
317 }
318 }
319 }
320
321 fn assign_leader_worker(&self, troop: &Troop, _task: &str) -> Vec<FighterId> {
326 let workers: Vec<FighterId> = troop
327 .members
328 .iter()
329 .filter(|id| **id != troop.leader)
330 .copied()
331 .collect();
332 if workers.is_empty() {
333 vec![troop.leader]
334 } else {
335 workers
336 }
337 }
338
339 fn assign_round_robin(&self, troop: &Troop, _task: &str) -> Vec<FighterId> {
340 let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % troop.members.len();
341 vec![troop.members[idx]]
342 }
343
344 fn assign_broadcast(&self, troop: &Troop, _task: &str) -> Vec<FighterId> {
345 troop.members.clone()
346 }
347
348 fn assign_pipeline(&self, troop: &Troop, _task: &str) -> Vec<FighterId> {
349 troop.members.clone()
350 }
351
352 fn assign_consensus(&self, troop: &Troop, _task: &str) -> Vec<FighterId> {
353 troop.members.clone()
354 }
355
356 fn assign_specialist(&self, troop: &Troop, task: &str) -> Vec<FighterId> {
357 let task_lower = task.to_lowercase();
358
359 let mut best_match: Option<(FighterId, usize)> = None;
361
362 for member in &troop.members {
363 if let Some(caps) = self.fighter_capabilities.get(member) {
364 let match_count = caps
365 .iter()
366 .filter(|cap| task_lower.contains(&cap.to_lowercase()))
367 .count();
368 if match_count > 0 {
369 if let Some((_, best_count)) = best_match {
370 if match_count > best_count {
371 best_match = Some((*member, match_count));
372 }
373 } else {
374 best_match = Some((*member, match_count));
375 }
376 }
377 }
378 }
379
380 match best_match {
381 Some((fighter_id, _)) => {
382 info!(
383 %fighter_id,
384 task = task,
385 "specialist routing: matched fighter by capability"
386 );
387 vec![fighter_id]
388 }
389 None => {
390 info!(
392 leader = %troop.leader,
393 "specialist routing: no capability match, defaulting to leader"
394 );
395 vec![troop.leader]
396 }
397 }
398 }
399
400 async fn dispatch_leader_worker(
407 &self,
408 troop: &Troop,
409 task: &str,
410 ) -> PunchResult<TaskAssignmentResult> {
411 let timeout = self.task_timeout;
412 let workers: Vec<FighterId> = troop
413 .members
414 .iter()
415 .filter(|id| **id != troop.leader)
416 .copied()
417 .collect();
418
419 if workers.is_empty() {
420 let response = self
422 .router
423 .request(
424 troop.leader,
425 troop.leader,
426 AgentMessageType::TaskAssignment {
427 task: task.to_string(),
428 },
429 timeout,
430 )
431 .await;
432
433 let results = match response {
434 Ok(msg) => vec![(troop.leader, extract_result_content(&msg.content))],
435 Err(e) => {
436 warn!(leader = %troop.leader, error = %e, "leader_worker: solo leader failed");
437 vec![]
438 }
439 };
440
441 return Ok(TaskAssignmentResult {
442 assigned_to: vec![troop.leader],
443 routing_decision: "leader_worker: solo leader handles task".to_string(),
444 results,
445 });
446 }
447
448 let leader_response = self
450 .router
451 .request(
452 troop.leader,
453 troop.leader,
454 AgentMessageType::TaskAssignment {
455 task: format!("DECOMPOSE AND COORDINATE: {}", task),
456 },
457 timeout,
458 )
459 .await;
460
461 let subtasks = match &leader_response {
463 Ok(msg) => {
464 let content = extract_result_content(&msg.content);
465 let parsed: Vec<String> = content
466 .split('\n')
467 .map(|s| s.trim().to_string())
468 .filter(|s| !s.is_empty())
469 .collect();
470 if parsed.len() >= workers.len() {
471 parsed
472 } else {
473 decompose_task(task, workers.len())
474 }
475 }
476 Err(_) => decompose_task(task, workers.len()),
477 };
478
479 let mut futures = Vec::new();
481 for (i, worker) in workers.iter().enumerate() {
482 let subtask = subtasks.get(i).cloned().unwrap_or_else(|| task.to_string());
483 let router = self.router.clone();
484 let leader = troop.leader;
485 let worker_id = *worker;
486 futures.push(async move {
487 let resp = router
488 .request(
489 leader,
490 worker_id,
491 AgentMessageType::TaskAssignment { task: subtask },
492 timeout,
493 )
494 .await;
495 (worker_id, resp)
496 });
497 }
498
499 let worker_results = join_all(futures).await;
500
501 let mut results = Vec::new();
502 for (worker_id, resp) in worker_results {
503 match resp {
504 Ok(msg) => results.push((worker_id, extract_result_content(&msg.content))),
505 Err(e) => {
506 warn!(
507 worker = %worker_id,
508 error = %e,
509 "leader_worker: worker failed to respond"
510 );
511 }
512 }
513 }
514
515 info!(
516 leader = %troop.leader,
517 worker_count = workers.len(),
518 result_count = results.len(),
519 "leader_worker: dispatched subtasks to workers"
520 );
521
522 Ok(TaskAssignmentResult {
523 assigned_to: workers,
524 routing_decision: format!(
525 "leader_worker: leader {} delegated to {} workers",
526 troop.leader,
527 troop.members.len() - 1
528 ),
529 results,
530 })
531 }
532
533 async fn dispatch_round_robin(
536 &self,
537 troop: &Troop,
538 task: &str,
539 ) -> PunchResult<TaskAssignmentResult> {
540 let timeout = self.task_timeout;
541 let idx = self.round_robin_counter.fetch_add(1, Ordering::Relaxed) % troop.members.len();
542 let assigned = troop.members[idx];
543
544 let response = self
545 .router
546 .request(
547 troop.leader,
548 assigned,
549 AgentMessageType::TaskAssignment {
550 task: task.to_string(),
551 },
552 timeout,
553 )
554 .await;
555
556 let results = match response {
557 Ok(msg) => vec![(assigned, extract_result_content(&msg.content))],
558 Err(e) => {
559 warn!(
560 %assigned,
561 error = %e,
562 "round_robin: fighter failed to respond"
563 );
564 vec![]
565 }
566 };
567
568 info!(
569 %assigned,
570 index = idx,
571 result_count = results.len(),
572 "round_robin: assigned task to fighter"
573 );
574
575 Ok(TaskAssignmentResult {
576 assigned_to: vec![assigned],
577 routing_decision: format!(
578 "round_robin: assigned to member at index {} (fighter {})",
579 idx, assigned
580 ),
581 results,
582 })
583 }
584
585 async fn dispatch_broadcast(
588 &self,
589 troop: &Troop,
590 task: &str,
591 ) -> PunchResult<TaskAssignmentResult> {
592 let timeout = self.task_timeout;
593
594 let mut futures = Vec::new();
596 for member in &troop.members {
597 let router = self.router.clone();
598 let leader = troop.leader;
599 let member_id = *member;
600 let task_str = task.to_string();
601 futures.push(async move {
602 let resp = router
603 .request(
604 leader,
605 member_id,
606 AgentMessageType::TaskAssignment { task: task_str },
607 timeout,
608 )
609 .await;
610 (member_id, resp)
611 });
612 }
613
614 let all_results = join_all(futures).await;
615
616 let mut results = Vec::new();
617 for (member_id, resp) in all_results {
618 match resp {
619 Ok(msg) => results.push((member_id, extract_result_content(&msg.content))),
620 Err(e) => {
621 warn!(
622 member = %member_id,
623 error = %e,
624 "broadcast: member failed to respond"
625 );
626 }
627 }
628 }
629
630 info!(
631 member_count = troop.members.len(),
632 result_count = results.len(),
633 "broadcast: sent task to all members"
634 );
635
636 Ok(TaskAssignmentResult {
637 assigned_to: troop.members.clone(),
638 routing_decision: format!("broadcast: sent to all {} members", troop.members.len()),
639 results,
640 })
641 }
642
643 async fn dispatch_pipeline(
647 &self,
648 troop: &Troop,
649 task: &str,
650 ) -> PunchResult<TaskAssignmentResult> {
651 let timeout = self.task_timeout;
652 let mut current_input = task.to_string();
653 let mut results = Vec::new();
654
655 for (i, member) in troop.members.iter().enumerate() {
656 let response = self
657 .router
658 .request(
659 troop.leader,
660 *member,
661 AgentMessageType::TaskAssignment {
662 task: current_input.clone(),
663 },
664 timeout,
665 )
666 .await;
667
668 match response {
669 Ok(msg) => {
670 let stage_output = extract_result_content(&msg.content);
671 results.push((*member, stage_output.clone()));
672 current_input = stage_output;
673 }
674 Err(e) => {
675 warn!(
676 stage = i,
677 fighter = %member,
678 error = %e,
679 "pipeline: stage failed to respond"
680 );
681 return Err(PunchError::Troop(format!(
682 "pipeline stage {} ({}) failed: {}",
683 i, member, e
684 )));
685 }
686 }
687 }
688
689 let pipeline_desc: Vec<String> = troop.members.iter().map(|m| m.to_string()).collect();
690
691 info!(
692 pipeline = ?pipeline_desc,
693 stage_count = results.len(),
694 "pipeline: completed all stages"
695 );
696
697 Ok(TaskAssignmentResult {
698 assigned_to: troop.members.clone(),
699 routing_decision: format!(
700 "pipeline: completed {} stages: [{}]",
701 troop.members.len(),
702 pipeline_desc.join(" -> ")
703 ),
704 results,
705 })
706 }
707
708 async fn dispatch_consensus(
711 &self,
712 troop: &Troop,
713 task: &str,
714 ) -> PunchResult<TaskAssignmentResult> {
715 let timeout = self.task_timeout;
716
717 let mut futures = Vec::new();
719 for member in &troop.members {
720 let router = self.router.clone();
721 let leader = troop.leader;
722 let member_id = *member;
723 let proposal = task.to_string();
724 futures.push(async move {
725 let resp = router
726 .request(
727 leader,
728 member_id,
729 AgentMessageType::VoteRequest {
730 proposal,
731 options: vec!["approve".to_string(), "reject".to_string()],
732 },
733 timeout,
734 )
735 .await;
736 (member_id, resp)
737 });
738 }
739
740 let all_results = join_all(futures).await;
741
742 let mut votes = Vec::new();
743 let mut results = Vec::new();
744 for (member_id, resp) in all_results {
745 match resp {
746 Ok(msg) => {
747 let vote_str = extract_vote_content(&msg.content);
748 votes.push((member_id, vote_str.clone()));
749 results.push((member_id, vote_str));
750 }
751 Err(e) => {
752 warn!(
753 member = %member_id,
754 error = %e,
755 "consensus: member failed to respond"
756 );
757 }
758 }
759 }
760
761 let consensus = self.tally_votes(&votes);
763 let decision = match &consensus {
764 Some(winner) => format!(
765 "consensus: {} members voted, result: {} ({}/{} responded)",
766 troop.members.len(),
767 winner,
768 results.len(),
769 troop.members.len()
770 ),
771 None => format!(
772 "consensus: {} members voted, no consensus reached ({}/{} responded)",
773 troop.members.len(),
774 results.len(),
775 troop.members.len()
776 ),
777 };
778
779 info!(
780 member_count = troop.members.len(),
781 vote_count = votes.len(),
782 consensus = ?consensus,
783 "consensus: vote collection complete"
784 );
785
786 Ok(TaskAssignmentResult {
787 assigned_to: troop.members.clone(),
788 routing_decision: decision,
789 results,
790 })
791 }
792
793 pub fn tally_votes(&self, votes: &[(FighterId, String)]) -> Option<String> {
795 if votes.is_empty() {
796 return None;
797 }
798
799 let mut counts: HashMap<&str, usize> = HashMap::new();
800 for (_, vote) in votes {
801 *counts.entry(vote.as_str()).or_insert(0) += 1;
802 }
803
804 counts
805 .into_iter()
806 .max_by_key(|(_, count)| *count)
807 .map(|(vote, _)| vote.to_string())
808 }
809
810 async fn dispatch_specialist(
813 &self,
814 troop: &Troop,
815 task: &str,
816 ) -> PunchResult<TaskAssignmentResult> {
817 let timeout = self.task_timeout;
818 let assigned = self.assign_specialist(troop, task);
819 let target = assigned[0];
820
821 let response = self
822 .router
823 .request(
824 troop.leader,
825 target,
826 AgentMessageType::TaskAssignment {
827 task: task.to_string(),
828 },
829 timeout,
830 )
831 .await;
832
833 let results = match response {
834 Ok(msg) => vec![(target, extract_result_content(&msg.content))],
835 Err(e) => {
836 warn!(
837 %target,
838 error = %e,
839 "specialist: fighter failed to respond"
840 );
841 vec![]
842 }
843 };
844
845 let has_capability_match = self
846 .fighter_capabilities
847 .get(&target)
848 .map(|caps| {
849 let task_lower = task.to_lowercase();
850 caps.iter().any(|c| task_lower.contains(&c.to_lowercase()))
851 })
852 .unwrap_or(false);
853
854 let decision = if has_capability_match {
855 format!("specialist: routed to {} based on capability match", target)
856 } else {
857 format!(
858 "specialist: no capability match, defaulted to leader {}",
859 target
860 )
861 };
862
863 Ok(TaskAssignmentResult {
864 assigned_to: assigned,
865 routing_decision: decision,
866 results,
867 })
868 }
869
870 pub fn is_in_troop(&self, fighter_id: &FighterId) -> bool {
872 self.troops.iter().any(|t| {
873 t.value().status != TroopStatus::Disbanded && t.value().members.contains(fighter_id)
874 })
875 }
876
877 pub fn get_fighter_troops(&self, fighter_id: &FighterId) -> Vec<TroopId> {
879 self.troops
880 .iter()
881 .filter(|t| {
882 t.value().status != TroopStatus::Disbanded && t.value().members.contains(fighter_id)
883 })
884 .map(|t| *t.key())
885 .collect()
886 }
887
888 pub fn pause_troop(&self, troop_id: &TroopId) -> PunchResult<()> {
890 let mut troop = self
891 .troops
892 .get_mut(troop_id)
893 .ok_or_else(|| PunchError::Troop(format!("troop {} not found", troop_id)))?;
894
895 if troop.status == TroopStatus::Disbanded {
896 return Err(PunchError::Troop(
897 "cannot pause a disbanded troop".to_string(),
898 ));
899 }
900
901 troop.status = TroopStatus::Paused;
902 info!(%troop_id, "troop paused");
903 Ok(())
904 }
905
906 pub fn resume_troop(&self, troop_id: &TroopId) -> PunchResult<()> {
908 let mut troop = self
909 .troops
910 .get_mut(troop_id)
911 .ok_or_else(|| PunchError::Troop(format!("troop {} not found", troop_id)))?;
912
913 if troop.status != TroopStatus::Paused {
914 return Err(PunchError::Troop(format!(
915 "troop {} is not paused (status: {})",
916 troop_id, troop.status
917 )));
918 }
919
920 troop.status = TroopStatus::Active;
921 info!(%troop_id, "troop resumed");
922 Ok(())
923 }
924}
925
926impl Default for TroopManager {
927 fn default() -> Self {
928 Self::new()
929 }
930}
931
932fn extract_result_content(content: &AgentMessageType) -> String {
934 match content {
935 AgentMessageType::TaskResult { result, .. } => result.clone(),
936 AgentMessageType::StatusUpdate { detail, .. } => detail.clone(),
937 AgentMessageType::DataShare { value, .. } => value.to_string(),
938 AgentMessageType::VoteResponse { vote, .. } => vote.clone(),
939 AgentMessageType::TaskAssignment { task } => task.clone(),
940 AgentMessageType::VoteRequest { proposal, .. } => proposal.clone(),
941 AgentMessageType::Escalation { reason, .. } => reason.clone(),
942 }
943}
944
945fn extract_vote_content(content: &AgentMessageType) -> String {
947 match content {
948 AgentMessageType::VoteResponse { vote, .. } => vote.clone(),
949 AgentMessageType::TaskResult { result, .. } => result.clone(),
950 other => extract_result_content(other),
951 }
952}
953
954fn decompose_task(task: &str, num_parts: usize) -> Vec<String> {
958 if num_parts == 0 || task.is_empty() {
959 return vec![task.to_string()];
960 }
961
962 let sentences: Vec<&str> = task
964 .split(['.', '\n'])
965 .map(|s| s.trim())
966 .filter(|s| !s.is_empty())
967 .collect();
968
969 if sentences.len() >= num_parts {
970 let chunk_size = sentences.len().div_ceil(num_parts);
971 return sentences
972 .chunks(chunk_size)
973 .map(|chunk| chunk.join(". "))
974 .collect();
975 }
976
977 (0..num_parts)
979 .map(|i| format!("[part {}/{}] {}", i + 1, num_parts, task))
980 .collect()
981}
982
983#[cfg(test)]
984mod tests {
985 use super::*;
986 use punch_types::{AgentMessage, MessageChannel, MessagePriority};
987 use uuid::Uuid;
988
989 fn make_manager() -> TroopManager {
990 TroopManager::new()
991 }
992
993 fn make_manager_with_router() -> (TroopManager, Arc<MessageRouter>) {
994 let router = Arc::new(MessageRouter::new());
995 let mut mgr = TroopManager::with_router(router.clone());
996 mgr.set_task_timeout(Duration::from_secs(5));
998 (mgr, router)
999 }
1000
1001 fn spawn_task_responder(
1004 router: &Arc<MessageRouter>,
1005 fighter_id: FighterId,
1006 mut rx: tokio::sync::mpsc::Receiver<AgentMessage>,
1007 ) {
1008 let router = router.clone();
1009 tokio::spawn(async move {
1010 while let Some(msg) = rx.recv().await {
1011 let response = AgentMessage {
1012 id: Uuid::new_v4(),
1013 from: fighter_id,
1014 to: msg.from,
1015 channel: MessageChannel::Direct,
1016 content: AgentMessageType::TaskResult {
1017 result: format!("result-from-{}", fighter_id),
1018 success: true,
1019 },
1020 priority: MessagePriority::Normal,
1021 timestamp: Utc::now(),
1022 delivered: false,
1023 };
1024 let _ = router.respond(&msg.id, response);
1025 }
1026 });
1027 }
1028
1029 fn spawn_vote_responder(
1031 router: &Arc<MessageRouter>,
1032 fighter_id: FighterId,
1033 mut rx: tokio::sync::mpsc::Receiver<AgentMessage>,
1034 vote: String,
1035 ) {
1036 let router = router.clone();
1037 tokio::spawn(async move {
1038 while let Some(msg) = rx.recv().await {
1039 let response = AgentMessage {
1040 id: Uuid::new_v4(),
1041 from: fighter_id,
1042 to: msg.from,
1043 channel: MessageChannel::Direct,
1044 content: AgentMessageType::VoteResponse {
1045 proposal: "task".to_string(),
1046 vote: vote.clone(),
1047 },
1048 priority: MessagePriority::Normal,
1049 timestamp: Utc::now(),
1050 delivered: false,
1051 };
1052 let _ = router.respond(&msg.id, response);
1053 }
1054 });
1055 }
1056
1057 fn spawn_pipeline_responder(
1059 router: &Arc<MessageRouter>,
1060 fighter_id: FighterId,
1061 mut rx: tokio::sync::mpsc::Receiver<AgentMessage>,
1062 stage_tag: String,
1063 ) {
1064 let router = router.clone();
1065 tokio::spawn(async move {
1066 while let Some(msg) = rx.recv().await {
1067 let input = extract_result_content(&msg.content);
1068 let output = format!("{}+{}", input, stage_tag);
1069 let response = AgentMessage {
1070 id: Uuid::new_v4(),
1071 from: fighter_id,
1072 to: msg.from,
1073 channel: MessageChannel::Direct,
1074 content: AgentMessageType::TaskResult {
1075 result: output,
1076 success: true,
1077 },
1078 priority: MessagePriority::Normal,
1079 timestamp: Utc::now(),
1080 delivered: false,
1081 };
1082 let _ = router.respond(&msg.id, response);
1083 }
1084 });
1085 }
1086
1087 #[test]
1092 fn test_form_troop() {
1093 let mgr = make_manager();
1094 let leader = FighterId::new();
1095 let member1 = FighterId::new();
1096 let member2 = FighterId::new();
1097
1098 let troop_id = mgr.form_troop(
1099 "Alpha".to_string(),
1100 leader,
1101 vec![leader, member1, member2],
1102 CoordinationStrategy::LeaderWorker,
1103 );
1104
1105 let troop = mgr.get_troop(&troop_id).expect("troop should exist");
1106 assert_eq!(troop.name, "Alpha");
1107 assert_eq!(troop.leader, leader);
1108 assert_eq!(troop.members.len(), 3);
1109 assert_eq!(troop.status, TroopStatus::Active);
1110 }
1111
1112 #[test]
1113 fn test_form_troop_leader_auto_added() {
1114 let mgr = make_manager();
1115 let leader = FighterId::new();
1116 let member = FighterId::new();
1117
1118 let troop_id = mgr.form_troop(
1119 "Beta".to_string(),
1120 leader,
1121 vec![member],
1122 CoordinationStrategy::RoundRobin,
1123 );
1124
1125 let troop = mgr.get_troop(&troop_id).expect("troop should exist");
1126 assert!(troop.members.contains(&leader));
1127 assert!(troop.members.contains(&member));
1128 assert_eq!(troop.members.len(), 2);
1129 }
1130
1131 #[test]
1132 fn test_recruit() {
1133 let mgr = make_manager();
1134 let leader = FighterId::new();
1135 let troop_id = mgr.form_troop(
1136 "Gamma".to_string(),
1137 leader,
1138 vec![],
1139 CoordinationStrategy::Broadcast,
1140 );
1141
1142 let new_member = FighterId::new();
1143 mgr.recruit(&troop_id, new_member).expect("should recruit");
1144
1145 let troop = mgr.get_troop(&troop_id).expect("troop should exist");
1146 assert!(troop.members.contains(&new_member));
1147 }
1148
1149 #[test]
1150 fn test_recruit_duplicate() {
1151 let mgr = make_manager();
1152 let leader = FighterId::new();
1153 let troop_id = mgr.form_troop(
1154 "Delta".to_string(),
1155 leader,
1156 vec![],
1157 CoordinationStrategy::Pipeline,
1158 );
1159
1160 let result = mgr.recruit(&troop_id, leader);
1161 assert!(result.is_err());
1162 }
1163
1164 #[test]
1165 fn test_recruit_disbanded() {
1166 let mgr = make_manager();
1167 let leader = FighterId::new();
1168 let troop_id = mgr.form_troop(
1169 "Echo".to_string(),
1170 leader,
1171 vec![],
1172 CoordinationStrategy::Pipeline,
1173 );
1174 mgr.disband_troop(&troop_id).expect("should disband");
1175
1176 let result = mgr.recruit(&troop_id, FighterId::new());
1177 assert!(result.is_err());
1178 }
1179
1180 #[test]
1181 fn test_dismiss() {
1182 let mgr = make_manager();
1183 let leader = FighterId::new();
1184 let member = FighterId::new();
1185 let troop_id = mgr.form_troop(
1186 "Foxtrot".to_string(),
1187 leader,
1188 vec![member],
1189 CoordinationStrategy::LeaderWorker,
1190 );
1191
1192 mgr.dismiss(&troop_id, &member).expect("should dismiss");
1193 let troop = mgr.get_troop(&troop_id).expect("troop should exist");
1194 assert!(!troop.members.contains(&member));
1195 }
1196
1197 #[test]
1198 fn test_dismiss_leader_promotes_next() {
1199 let mgr = make_manager();
1200 let leader = FighterId::new();
1201 let member = FighterId::new();
1202 let troop_id = mgr.form_troop(
1203 "Golf".to_string(),
1204 leader,
1205 vec![member],
1206 CoordinationStrategy::LeaderWorker,
1207 );
1208
1209 mgr.dismiss(&troop_id, &leader)
1210 .expect("should dismiss leader");
1211 let troop = mgr.get_troop(&troop_id).expect("troop should exist");
1212 assert_eq!(troop.leader, member);
1213 assert!(!troop.members.contains(&leader));
1214 }
1215
1216 #[test]
1217 fn test_dismiss_last_member_fails() {
1218 let mgr = make_manager();
1219 let leader = FighterId::new();
1220 let troop_id = mgr.form_troop(
1221 "Hotel".to_string(),
1222 leader,
1223 vec![],
1224 CoordinationStrategy::Broadcast,
1225 );
1226
1227 let result = mgr.dismiss(&troop_id, &leader);
1228 assert!(result.is_err());
1229 }
1230
1231 #[test]
1232 fn test_dismiss_nonmember() {
1233 let mgr = make_manager();
1234 let leader = FighterId::new();
1235 let troop_id = mgr.form_troop(
1236 "India".to_string(),
1237 leader,
1238 vec![],
1239 CoordinationStrategy::Broadcast,
1240 );
1241
1242 let stranger = FighterId::new();
1243 let result = mgr.dismiss(&troop_id, &stranger);
1244 assert!(result.is_err());
1245 }
1246
1247 #[test]
1248 fn test_disband_troop() {
1249 let mgr = make_manager();
1250 let leader = FighterId::new();
1251 let troop_id = mgr.form_troop(
1252 "Juliet".to_string(),
1253 leader,
1254 vec![FighterId::new()],
1255 CoordinationStrategy::Consensus,
1256 );
1257
1258 let name = mgr.disband_troop(&troop_id).expect("should disband");
1259 assert_eq!(name, "Juliet");
1260
1261 let troop = mgr.get_troop(&troop_id).expect("troop should still exist");
1262 assert_eq!(troop.status, TroopStatus::Disbanded);
1263 assert!(troop.members.is_empty());
1264 }
1265
1266 #[test]
1267 fn test_disband_already_disbanded() {
1268 let mgr = make_manager();
1269 let leader = FighterId::new();
1270 let troop_id = mgr.form_troop(
1271 "Kilo".to_string(),
1272 leader,
1273 vec![],
1274 CoordinationStrategy::Broadcast,
1275 );
1276
1277 mgr.disband_troop(&troop_id).expect("should disband");
1278 let result = mgr.disband_troop(&troop_id);
1279 assert!(result.is_err());
1280 }
1281
1282 #[test]
1283 fn test_list_troops() {
1284 let mgr = make_manager();
1285 let leader = FighterId::new();
1286 mgr.form_troop(
1287 "A".to_string(),
1288 leader,
1289 vec![],
1290 CoordinationStrategy::Broadcast,
1291 );
1292 mgr.form_troop(
1293 "B".to_string(),
1294 leader,
1295 vec![],
1296 CoordinationStrategy::Pipeline,
1297 );
1298
1299 let troops = mgr.list_troops();
1300 assert_eq!(troops.len(), 2);
1301 }
1302
1303 #[test]
1304 fn test_assign_task_leader_worker() {
1305 let mgr = make_manager();
1306 let leader = FighterId::new();
1307 let w1 = FighterId::new();
1308 let w2 = FighterId::new();
1309 let troop_id = mgr.form_troop(
1310 "LW".to_string(),
1311 leader,
1312 vec![w1, w2],
1313 CoordinationStrategy::LeaderWorker,
1314 );
1315
1316 let assigned = mgr
1317 .assign_task(&troop_id, "do work")
1318 .expect("should assign");
1319 assert!(!assigned.contains(&leader));
1321 assert!(assigned.contains(&w1));
1322 assert!(assigned.contains(&w2));
1323 }
1324
1325 #[test]
1326 fn test_assign_task_leader_worker_solo() {
1327 let mgr = make_manager();
1328 let leader = FighterId::new();
1329 let troop_id = mgr.form_troop(
1330 "Solo".to_string(),
1331 leader,
1332 vec![],
1333 CoordinationStrategy::LeaderWorker,
1334 );
1335
1336 let assigned = mgr
1337 .assign_task(&troop_id, "solo task")
1338 .expect("should assign");
1339 assert_eq!(assigned, vec![leader]);
1340 }
1341
1342 #[test]
1343 fn test_assign_task_round_robin() {
1344 let mgr = make_manager();
1345 let m1 = FighterId::new();
1346 let m2 = FighterId::new();
1347 let m3 = FighterId::new();
1348 let troop_id = mgr.form_troop(
1349 "RR".to_string(),
1350 m1,
1351 vec![m2, m3],
1352 CoordinationStrategy::RoundRobin,
1353 );
1354
1355 let a1 = mgr.assign_task(&troop_id, "task 1").expect("should assign");
1356 let a2 = mgr.assign_task(&troop_id, "task 2").expect("should assign");
1357 let a3 = mgr.assign_task(&troop_id, "task 3").expect("should assign");
1358
1359 assert_eq!(a1.len(), 1);
1361 assert_eq!(a2.len(), 1);
1362 assert_eq!(a3.len(), 1);
1363 let a4 = mgr.assign_task(&troop_id, "task 4").expect("should assign");
1365 assert_eq!(a4[0], a1[0]);
1366 }
1367
1368 #[test]
1369 fn test_assign_task_broadcast() {
1370 let mgr = make_manager();
1371 let m1 = FighterId::new();
1372 let m2 = FighterId::new();
1373 let troop_id = mgr.form_troop(
1374 "BC".to_string(),
1375 m1,
1376 vec![m2],
1377 CoordinationStrategy::Broadcast,
1378 );
1379
1380 let assigned = mgr
1381 .assign_task(&troop_id, "broadcast task")
1382 .expect("should assign");
1383 assert_eq!(assigned.len(), 2);
1384 assert!(assigned.contains(&m1));
1385 assert!(assigned.contains(&m2));
1386 }
1387
1388 #[test]
1389 fn test_assign_task_pipeline() {
1390 let mgr = make_manager();
1391 let m1 = FighterId::new();
1392 let m2 = FighterId::new();
1393 let m3 = FighterId::new();
1394 let troop_id = mgr.form_troop(
1395 "PL".to_string(),
1396 m1,
1397 vec![m2, m3],
1398 CoordinationStrategy::Pipeline,
1399 );
1400
1401 let assigned = mgr
1402 .assign_task(&troop_id, "pipeline task")
1403 .expect("should assign");
1404 assert_eq!(assigned.len(), 3);
1405 }
1406
1407 #[test]
1408 fn test_assign_task_consensus() {
1409 let mgr = make_manager();
1410 let m1 = FighterId::new();
1411 let m2 = FighterId::new();
1412 let m3 = FighterId::new();
1413 let troop_id = mgr.form_troop(
1414 "CN".to_string(),
1415 m1,
1416 vec![m2, m3],
1417 CoordinationStrategy::Consensus,
1418 );
1419
1420 let assigned = mgr
1421 .assign_task(&troop_id, "vote task")
1422 .expect("should assign");
1423 assert_eq!(assigned.len(), 3);
1424 }
1425
1426 #[test]
1427 fn test_assign_task_specialist() {
1428 let mgr = make_manager();
1429 let leader = FighterId::new();
1430 let troop_id = mgr.form_troop(
1431 "SP".to_string(),
1432 leader,
1433 vec![FighterId::new()],
1434 CoordinationStrategy::Specialist,
1435 );
1436
1437 let assigned = mgr
1438 .assign_task(&troop_id, "specialist task")
1439 .expect("should assign");
1440 assert_eq!(assigned, vec![leader]);
1441 }
1442
1443 #[test]
1444 fn test_assign_task_inactive_troop() {
1445 let mgr = make_manager();
1446 let leader = FighterId::new();
1447 let troop_id = mgr.form_troop(
1448 "Paused".to_string(),
1449 leader,
1450 vec![],
1451 CoordinationStrategy::Broadcast,
1452 );
1453 mgr.pause_troop(&troop_id).expect("should pause");
1454
1455 let result = mgr.assign_task(&troop_id, "task");
1456 assert!(result.is_err());
1457 }
1458
1459 #[test]
1460 fn test_is_in_troop() {
1461 let mgr = make_manager();
1462 let leader = FighterId::new();
1463 let member = FighterId::new();
1464 let outsider = FighterId::new();
1465
1466 mgr.form_troop(
1467 "Check".to_string(),
1468 leader,
1469 vec![member],
1470 CoordinationStrategy::Broadcast,
1471 );
1472
1473 assert!(mgr.is_in_troop(&leader));
1474 assert!(mgr.is_in_troop(&member));
1475 assert!(!mgr.is_in_troop(&outsider));
1476 }
1477
1478 #[test]
1479 fn test_get_fighter_troops() {
1480 let mgr = make_manager();
1481 let fighter = FighterId::new();
1482
1483 let t1 = mgr.form_troop(
1484 "T1".to_string(),
1485 fighter,
1486 vec![],
1487 CoordinationStrategy::Broadcast,
1488 );
1489 let t2 = mgr.form_troop(
1490 "T2".to_string(),
1491 FighterId::new(),
1492 vec![fighter],
1493 CoordinationStrategy::Pipeline,
1494 );
1495
1496 let troops = mgr.get_fighter_troops(&fighter);
1497 assert_eq!(troops.len(), 2);
1498 assert!(troops.contains(&t1));
1499 assert!(troops.contains(&t2));
1500 }
1501
1502 #[test]
1503 fn test_pause_and_resume_troop() {
1504 let mgr = make_manager();
1505 let leader = FighterId::new();
1506 let troop_id = mgr.form_troop(
1507 "PR".to_string(),
1508 leader,
1509 vec![],
1510 CoordinationStrategy::Broadcast,
1511 );
1512
1513 mgr.pause_troop(&troop_id).expect("should pause");
1514 let troop = mgr.get_troop(&troop_id).expect("troop should exist");
1515 assert_eq!(troop.status, TroopStatus::Paused);
1516
1517 mgr.resume_troop(&troop_id).expect("should resume");
1518 let troop = mgr.get_troop(&troop_id).expect("troop should exist");
1519 assert_eq!(troop.status, TroopStatus::Active);
1520 }
1521
1522 #[test]
1523 fn test_resume_non_paused_fails() {
1524 let mgr = make_manager();
1525 let leader = FighterId::new();
1526 let troop_id = mgr.form_troop(
1527 "NP".to_string(),
1528 leader,
1529 vec![],
1530 CoordinationStrategy::Broadcast,
1531 );
1532
1533 let result = mgr.resume_troop(&troop_id);
1534 assert!(result.is_err());
1535 }
1536
1537 #[test]
1538 fn test_get_nonexistent_troop() {
1539 let mgr = make_manager();
1540 let result = mgr.get_troop(&TroopId::new());
1541 assert!(result.is_none());
1542 }
1543
1544 #[test]
1545 fn test_assign_task_nonexistent_troop() {
1546 let mgr = make_manager();
1547 let result = mgr.assign_task(&TroopId::new(), "task");
1548 assert!(result.is_err());
1549 }
1550
1551 #[test]
1552 fn test_empty_troop_list() {
1553 let mgr = make_manager();
1554 assert!(mgr.list_troops().is_empty());
1555 }
1556
1557 #[test]
1558 fn test_default_impl() {
1559 let mgr = TroopManager::default();
1560 assert!(mgr.list_troops().is_empty());
1561 }
1562
1563 #[test]
1564 fn test_disbanded_troop_not_in_troop() {
1565 let mgr = make_manager();
1566 let leader = FighterId::new();
1567 let troop_id = mgr.form_troop(
1568 "Gone".to_string(),
1569 leader,
1570 vec![],
1571 CoordinationStrategy::Broadcast,
1572 );
1573 mgr.disband_troop(&troop_id).expect("should disband");
1574 assert!(!mgr.is_in_troop(&leader));
1575 }
1576
1577 #[test]
1578 fn test_decompose_task_by_sentences() {
1579 let task = "Analyze the code. Fix any bugs. Write tests. Deploy to staging.";
1580 let parts = decompose_task(task, 2);
1581 assert_eq!(parts.len(), 2);
1582 }
1583
1584 #[test]
1585 fn test_decompose_task_duplicates_when_not_enough() {
1586 let task = "simple task";
1587 let parts = decompose_task(task, 3);
1588 assert_eq!(parts.len(), 3);
1589 assert!(parts[0].contains("simple task"));
1590 }
1591
1592 #[test]
1593 fn test_decompose_task_empty() {
1594 let parts = decompose_task("", 3);
1595 assert_eq!(parts.len(), 1);
1596 }
1597
1598 #[test]
1599 fn test_with_router_constructor() {
1600 let router = Arc::new(MessageRouter::new());
1601 let mgr = TroopManager::with_router(router.clone());
1602 assert!(mgr.list_troops().is_empty());
1603 assert!(Arc::ptr_eq(mgr.router(), &router));
1604 }
1605
1606 #[test]
1607 fn test_register_capabilities() {
1608 let mgr = make_manager();
1609 let fighter = FighterId::new();
1610 mgr.register_capabilities(fighter, vec!["code".to_string(), "test".to_string()]);
1611
1612 assert!(mgr.fighter_capabilities.contains_key(&fighter));
1613 let caps = mgr
1614 .fighter_capabilities
1615 .get(&fighter)
1616 .expect("should exist");
1617 assert_eq!(caps.len(), 2);
1618 }
1619
1620 #[test]
1621 fn test_task_timeout_getter_setter() {
1622 let mut mgr = TroopManager::new();
1623 assert_eq!(mgr.task_timeout(), DEFAULT_TASK_TIMEOUT);
1624 mgr.set_task_timeout(Duration::from_secs(30));
1625 assert_eq!(mgr.task_timeout(), Duration::from_secs(30));
1626 }
1627
1628 #[test]
1629 fn test_extract_result_content_variants() {
1630 assert_eq!(
1631 extract_result_content(&AgentMessageType::TaskResult {
1632 result: "done".to_string(),
1633 success: true,
1634 }),
1635 "done"
1636 );
1637 assert_eq!(
1638 extract_result_content(&AgentMessageType::StatusUpdate {
1639 progress: 1.0,
1640 detail: "finished".to_string(),
1641 }),
1642 "finished"
1643 );
1644 assert_eq!(
1645 extract_result_content(&AgentMessageType::VoteResponse {
1646 proposal: "p".to_string(),
1647 vote: "approve".to_string(),
1648 }),
1649 "approve"
1650 );
1651 assert_eq!(
1652 extract_result_content(&AgentMessageType::TaskAssignment {
1653 task: "work".to_string(),
1654 }),
1655 "work"
1656 );
1657 }
1658
1659 #[test]
1660 fn test_extract_vote_content_variants() {
1661 assert_eq!(
1662 extract_vote_content(&AgentMessageType::VoteResponse {
1663 proposal: "p".to_string(),
1664 vote: "reject".to_string(),
1665 }),
1666 "reject"
1667 );
1668 assert_eq!(
1669 extract_vote_content(&AgentMessageType::TaskResult {
1670 result: "approve".to_string(),
1671 success: true,
1672 }),
1673 "approve"
1674 );
1675 }
1676
1677 #[tokio::test]
1682 async fn test_round_robin_collects_result() {
1683 let (mgr, router) = make_manager_with_router();
1684 let m1 = FighterId::new();
1685 let m2 = FighterId::new();
1686
1687 let rx1 = router.register(m1);
1688 let rx2 = router.register(m2);
1689 spawn_task_responder(&router, m1, rx1);
1690 spawn_task_responder(&router, m2, rx2);
1691
1692 let troop_id = mgr.form_troop(
1693 "RR_Result".to_string(),
1694 m1,
1695 vec![m2],
1696 CoordinationStrategy::RoundRobin,
1697 );
1698
1699 let result = mgr
1700 .assign_task_async(&troop_id, "do work")
1701 .await
1702 .expect("should assign");
1703
1704 assert_eq!(result.assigned_to.len(), 1);
1705 assert_eq!(result.results.len(), 1);
1706 let (fighter_id, response) = &result.results[0];
1707 assert_eq!(*fighter_id, result.assigned_to[0]);
1708 assert!(response.starts_with("result-from-"));
1709 }
1710
1711 #[tokio::test]
1712 async fn test_broadcast_collects_all_results() {
1713 let (mgr, router) = make_manager_with_router();
1714 let m1 = FighterId::new();
1715 let m2 = FighterId::new();
1716 let m3 = FighterId::new();
1717
1718 let rx1 = router.register(m1);
1719 let rx2 = router.register(m2);
1720 let rx3 = router.register(m3);
1721 spawn_task_responder(&router, m1, rx1);
1722 spawn_task_responder(&router, m2, rx2);
1723 spawn_task_responder(&router, m3, rx3);
1724
1725 let troop_id = mgr.form_troop(
1726 "BC_Result".to_string(),
1727 m1,
1728 vec![m2, m3],
1729 CoordinationStrategy::Broadcast,
1730 );
1731
1732 let result = mgr
1733 .assign_task_async(&troop_id, "broadcast task")
1734 .await
1735 .expect("should assign");
1736
1737 assert_eq!(result.assigned_to.len(), 3);
1738 assert_eq!(result.results.len(), 3);
1739
1740 let result_ids: Vec<FighterId> = result.results.iter().map(|(id, _)| *id).collect();
1741 assert!(result_ids.contains(&m1));
1742 assert!(result_ids.contains(&m2));
1743 assert!(result_ids.contains(&m3));
1744 }
1745
1746 #[tokio::test]
1747 async fn test_pipeline_chains_output_to_input() {
1748 let (mgr, router) = make_manager_with_router();
1749 let m1 = FighterId::new();
1750 let m2 = FighterId::new();
1751 let m3 = FighterId::new();
1752
1753 let rx1 = router.register(m1);
1754 let rx2 = router.register(m2);
1755 let rx3 = router.register(m3);
1756 spawn_pipeline_responder(&router, m1, rx1, "stage1".to_string());
1757 spawn_pipeline_responder(&router, m2, rx2, "stage2".to_string());
1758 spawn_pipeline_responder(&router, m3, rx3, "stage3".to_string());
1759
1760 let troop_id = mgr.form_troop(
1761 "PL_Result".to_string(),
1762 m1,
1763 vec![m2, m3],
1764 CoordinationStrategy::Pipeline,
1765 );
1766
1767 let result = mgr
1768 .assign_task_async(&troop_id, "initial")
1769 .await
1770 .expect("should complete pipeline");
1771
1772 assert_eq!(result.assigned_to.len(), 3);
1773 assert_eq!(result.results.len(), 3);
1774
1775 let (_, r1) = &result.results[0];
1777 let (_, r2) = &result.results[1];
1778 let (_, r3) = &result.results[2];
1779 assert_eq!(r1, "initial+stage1");
1780 assert_eq!(r2, "initial+stage1+stage2");
1781 assert_eq!(r3, "initial+stage1+stage2+stage3");
1782 }
1783
1784 #[tokio::test]
1785 async fn test_consensus_tallies_votes() {
1786 let (mgr, router) = make_manager_with_router();
1787 let m1 = FighterId::new();
1788 let m2 = FighterId::new();
1789 let m3 = FighterId::new();
1790
1791 let rx1 = router.register(m1);
1792 let rx2 = router.register(m2);
1793 let rx3 = router.register(m3);
1794 spawn_vote_responder(&router, m1, rx1, "approve".to_string());
1795 spawn_vote_responder(&router, m2, rx2, "approve".to_string());
1796 spawn_vote_responder(&router, m3, rx3, "reject".to_string());
1797
1798 let troop_id = mgr.form_troop(
1799 "CN_Result".to_string(),
1800 m1,
1801 vec![m2, m3],
1802 CoordinationStrategy::Consensus,
1803 );
1804
1805 let result = mgr
1806 .assign_task_async(&troop_id, "should we merge?")
1807 .await
1808 .expect("should assign");
1809
1810 assert_eq!(result.assigned_to.len(), 3);
1811 assert_eq!(result.results.len(), 3);
1812 assert!(result.routing_decision.contains("approve"));
1813
1814 let approve_count = result
1815 .results
1816 .iter()
1817 .filter(|(_, v)| v == "approve")
1818 .count();
1819 let reject_count = result.results.iter().filter(|(_, v)| v == "reject").count();
1820 assert_eq!(approve_count, 2);
1821 assert_eq!(reject_count, 1);
1822 }
1823
1824 #[tokio::test]
1825 async fn test_consensus_majority_wins() {
1826 let mgr = make_manager();
1827
1828 let m1 = FighterId::new();
1829 let m2 = FighterId::new();
1830 let m3 = FighterId::new();
1831
1832 let votes = vec![
1833 (m1, "approve".to_string()),
1834 (m2, "approve".to_string()),
1835 (m3, "reject".to_string()),
1836 ];
1837
1838 let winner = mgr.tally_votes(&votes);
1839 assert_eq!(winner, Some("approve".to_string()));
1840 }
1841
1842 #[tokio::test]
1843 async fn test_consensus_empty_votes() {
1844 let mgr = make_manager();
1845 let winner = mgr.tally_votes(&[]);
1846 assert!(winner.is_none());
1847 }
1848
1849 #[tokio::test]
1850 async fn test_specialist_routes_and_collects_result() {
1851 let (mgr, router) = make_manager_with_router();
1852 let leader = FighterId::new();
1853 let coder = FighterId::new();
1854 let reviewer = FighterId::new();
1855
1856 let rx_leader = router.register(leader);
1857 let rx_coder = router.register(coder);
1858 let rx_reviewer = router.register(reviewer);
1859 spawn_task_responder(&router, leader, rx_leader);
1860 spawn_task_responder(&router, coder, rx_coder);
1861 spawn_task_responder(&router, reviewer, rx_reviewer);
1862
1863 mgr.register_capabilities(coder, vec!["code".to_string(), "rust".to_string()]);
1864 mgr.register_capabilities(reviewer, vec!["review".to_string(), "testing".to_string()]);
1865
1866 let troop_id = mgr.form_troop(
1867 "SP_Result".to_string(),
1868 leader,
1869 vec![coder, reviewer],
1870 CoordinationStrategy::Specialist,
1871 );
1872
1873 let result = mgr
1875 .assign_task_async(&troop_id, "write some rust code")
1876 .await
1877 .expect("should assign");
1878 assert_eq!(result.assigned_to, vec![coder]);
1879 assert_eq!(result.results.len(), 1);
1880 assert_eq!(result.results[0].0, coder);
1881 assert!(result.routing_decision.contains("capability match"));
1882
1883 let result = mgr
1885 .assign_task_async(&troop_id, "please review this PR")
1886 .await
1887 .expect("should assign");
1888 assert_eq!(result.assigned_to, vec![reviewer]);
1889 assert_eq!(result.results.len(), 1);
1890 assert_eq!(result.results[0].0, reviewer);
1891 }
1892
1893 #[tokio::test]
1894 async fn test_specialist_defaults_to_leader_no_match() {
1895 let (mgr, router) = make_manager_with_router();
1896 let leader = FighterId::new();
1897 let specialist = FighterId::new();
1898
1899 let rx1 = router.register(leader);
1900 let rx2 = router.register(specialist);
1901 spawn_task_responder(&router, leader, rx1);
1902 spawn_task_responder(&router, specialist, rx2);
1903
1904 mgr.register_capabilities(specialist, vec!["database".to_string()]);
1905
1906 let troop_id = mgr.form_troop(
1907 "SP_Default".to_string(),
1908 leader,
1909 vec![specialist],
1910 CoordinationStrategy::Specialist,
1911 );
1912
1913 let result = mgr
1914 .assign_task_async(&troop_id, "fix CSS styling")
1915 .await
1916 .expect("should assign");
1917 assert_eq!(result.assigned_to, vec![leader]);
1918 assert_eq!(result.results.len(), 1);
1919 assert!(result.routing_decision.contains("defaulted to leader"));
1920 }
1921
1922 #[tokio::test]
1923 async fn test_leader_worker_collects_results() {
1924 let (mgr, router) = make_manager_with_router();
1925 let leader = FighterId::new();
1926 let w1 = FighterId::new();
1927 let w2 = FighterId::new();
1928
1929 let rx_leader = router.register(leader);
1930 let rx_w1 = router.register(w1);
1931 let rx_w2 = router.register(w2);
1932 spawn_task_responder(&router, leader, rx_leader);
1933 spawn_task_responder(&router, w1, rx_w1);
1934 spawn_task_responder(&router, w2, rx_w2);
1935
1936 let troop_id = mgr.form_troop(
1937 "LW_Result".to_string(),
1938 leader,
1939 vec![w1, w2],
1940 CoordinationStrategy::LeaderWorker,
1941 );
1942
1943 let result = mgr
1944 .assign_task_async(&troop_id, "analyze this code")
1945 .await
1946 .expect("should assign");
1947
1948 assert!(result.assigned_to.contains(&w1));
1949 assert!(result.assigned_to.contains(&w2));
1950 assert!(!result.assigned_to.contains(&leader));
1951 assert_eq!(result.results.len(), 2);
1952 assert!(result.routing_decision.contains("leader_worker"));
1953
1954 let result_ids: Vec<FighterId> = result.results.iter().map(|(id, _)| *id).collect();
1955 assert!(result_ids.contains(&w1));
1956 assert!(result_ids.contains(&w2));
1957 }
1958
1959 #[tokio::test]
1960 async fn test_leader_worker_solo_collects_result() {
1961 let (mgr, router) = make_manager_with_router();
1962 let leader = FighterId::new();
1963 let rx = router.register(leader);
1964 spawn_task_responder(&router, leader, rx);
1965
1966 let troop_id = mgr.form_troop(
1967 "Solo_LW_Result".to_string(),
1968 leader,
1969 vec![],
1970 CoordinationStrategy::LeaderWorker,
1971 );
1972
1973 let result = mgr
1974 .assign_task_async(&troop_id, "solo work")
1975 .await
1976 .expect("should assign");
1977 assert_eq!(result.assigned_to, vec![leader]);
1978 assert_eq!(result.results.len(), 1);
1979 assert_eq!(result.results[0].0, leader);
1980 assert!(result.routing_decision.contains("solo"));
1981 }
1982
1983 #[tokio::test]
1984 async fn test_timeout_when_fighter_does_not_respond() {
1985 let (mut mgr, router) = make_manager_with_router();
1986 mgr.set_task_timeout(Duration::from_millis(100));
1987 let m1 = FighterId::new();
1988
1989 let _rx = router.register(m1);
1991
1992 let troop_id = mgr.form_troop(
1993 "Timeout_Test".to_string(),
1994 m1,
1995 vec![],
1996 CoordinationStrategy::RoundRobin,
1997 );
1998
1999 let result = mgr
2000 .assign_task_async(&troop_id, "this will timeout")
2001 .await
2002 .expect("should still return a result, just with empty results");
2003
2004 assert_eq!(result.assigned_to.len(), 1);
2006 assert_eq!(result.results.len(), 0);
2007 }
2008
2009 #[tokio::test]
2010 async fn test_pipeline_timeout_returns_error() {
2011 let (mut mgr, router) = make_manager_with_router();
2012 mgr.set_task_timeout(Duration::from_millis(100));
2013
2014 let m1 = FighterId::new();
2015 let m2 = FighterId::new();
2016
2017 let rx1 = router.register(m1);
2019 let _rx2 = router.register(m2);
2020 spawn_pipeline_responder(&router, m1, rx1, "stage1".to_string());
2021
2022 let troop_id = mgr.form_troop(
2023 "PL_Timeout".to_string(),
2024 m1,
2025 vec![m2],
2026 CoordinationStrategy::Pipeline,
2027 );
2028
2029 let result = mgr.assign_task_async(&troop_id, "input").await;
2030
2031 assert!(result.is_err());
2033 let err = result.unwrap_err().to_string();
2034 assert!(err.contains("pipeline stage"));
2035 }
2036
2037 #[tokio::test]
2038 async fn test_broadcast_partial_timeout() {
2039 let (mut mgr, router) = make_manager_with_router();
2040 mgr.set_task_timeout(Duration::from_millis(100));
2041
2042 let m1 = FighterId::new();
2043 let m2 = FighterId::new();
2044
2045 let rx1 = router.register(m1);
2046 let _rx2 = router.register(m2); spawn_task_responder(&router, m1, rx1);
2048
2049 let troop_id = mgr.form_troop(
2050 "BC_Partial".to_string(),
2051 m1,
2052 vec![m2],
2053 CoordinationStrategy::Broadcast,
2054 );
2055
2056 let result = mgr
2057 .assign_task_async(&troop_id, "broadcast partial")
2058 .await
2059 .expect("should succeed with partial results");
2060
2061 assert_eq!(result.assigned_to.len(), 2);
2062 assert_eq!(result.results.len(), 1);
2064 assert_eq!(result.results[0].0, m1);
2065 }
2066
2067 #[tokio::test]
2068 async fn test_round_robin_distributes_evenly() {
2069 let (mgr, router) = make_manager_with_router();
2070 let m1 = FighterId::new();
2071 let m2 = FighterId::new();
2072 let m3 = FighterId::new();
2073 let rx1 = router.register(m1);
2074 let rx2 = router.register(m2);
2075 let rx3 = router.register(m3);
2076 spawn_task_responder(&router, m1, rx1);
2077 spawn_task_responder(&router, m2, rx2);
2078 spawn_task_responder(&router, m3, rx3);
2079
2080 let troop_id = mgr.form_troop(
2081 "RR_Dispatch".to_string(),
2082 m1,
2083 vec![m2, m3],
2084 CoordinationStrategy::RoundRobin,
2085 );
2086
2087 let mut assignment_counts: HashMap<FighterId, usize> = HashMap::new();
2088
2089 for i in 0..9 {
2090 let result = mgr
2091 .assign_task_async(&troop_id, &format!("task {}", i))
2092 .await
2093 .expect("should assign");
2094 assert_eq!(result.assigned_to.len(), 1);
2095 assert_eq!(result.results.len(), 1);
2096 *assignment_counts.entry(result.assigned_to[0]).or_insert(0) += 1;
2097 }
2098
2099 for count in assignment_counts.values() {
2100 assert_eq!(*count, 3);
2101 }
2102 }
2103
2104 #[tokio::test]
2105 async fn test_empty_troop_assign_fails() {
2106 let mgr = make_manager();
2107 let leader = FighterId::new();
2108 let troop_id = mgr.form_troop(
2109 "EmptyTest".to_string(),
2110 leader,
2111 vec![],
2112 CoordinationStrategy::Broadcast,
2113 );
2114 mgr.disband_troop(&troop_id).expect("should disband");
2115
2116 let result = mgr.assign_task_async(&troop_id, "task").await;
2117 assert!(result.is_err());
2118 }
2119
2120 #[tokio::test]
2121 async fn test_leader_worker_decomposition_fan_out() {
2122 let (mgr, router) = make_manager_with_router();
2123 let leader = FighterId::new();
2124 let w1 = FighterId::new();
2125 let w2 = FighterId::new();
2126 let w3 = FighterId::new();
2127
2128 let rx_leader = router.register(leader);
2129 let rx_w1 = router.register(w1);
2130 let rx_w2 = router.register(w2);
2131 let rx_w3 = router.register(w3);
2132 spawn_task_responder(&router, leader, rx_leader);
2133 spawn_task_responder(&router, w1, rx_w1);
2134 spawn_task_responder(&router, w2, rx_w2);
2135 spawn_task_responder(&router, w3, rx_w3);
2136
2137 let troop_id = mgr.form_troop(
2138 "LW_Fanout".to_string(),
2139 leader,
2140 vec![w1, w2, w3],
2141 CoordinationStrategy::LeaderWorker,
2142 );
2143
2144 let result = mgr
2145 .assign_task_async(&troop_id, "Step one. Step two. Step three. Step four.")
2146 .await
2147 .expect("should assign");
2148
2149 assert_eq!(result.assigned_to.len(), 3);
2151 assert_eq!(result.results.len(), 3);
2152 }
2153}