1use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::Mutex;
12use tracing::{info, warn};
13
14use crate::agent_messaging::MessageRouter;
15use punch_types::{
16 AgentMessageType, AuctionBid, FighterId, MessagePriority, PunchError, PunchResult,
17 RestartStrategy, SelectionCriteria,
18};
19
/// Outcome of a map-reduce run: the per-worker outputs plus their merged form.
#[derive(Debug, Clone)]
pub struct MapReduceResult {
    /// Output recorded for each worker, keyed by that worker's fighter id.
    pub map_results: HashMap<FighterId, String>,
    /// Single string produced by merging `map_results` (see `map_reduce_merge`).
    pub reduced: String,
}
32
/// Input description for a map-reduce run.
#[derive(Debug, Clone)]
pub struct MapReduceConfig {
    /// Text to process; `execute_map_reduce_distributed` splits it by line via `map_split`.
    pub input: String,
    /// Fighters that receive chunks; the i-th worker is handed the i-th chunk.
    pub workers: Vec<FighterId>,
}
41
/// Split `input` into line-based chunks, one per worker, as evenly as possible.
///
/// Lines are kept in their original order and re-joined with `\n` inside each
/// chunk. The number of chunks is `min(num_workers, line_count)`: every chunk
/// holds at least one line, and chunk sizes differ by at most one line (the
/// earlier chunks receive the extra lines).
///
/// Special cases:
/// - `num_workers == 0` yields an empty vector.
/// - Input with no lines (the empty string) yields a single chunk that is a
///   copy of `input`.
pub fn map_split(input: &str, num_workers: usize) -> Vec<String> {
    if num_workers == 0 {
        return Vec::new();
    }

    let lines: Vec<&str> = input.lines().collect();
    if lines.is_empty() {
        return vec![input.to_string()];
    }

    // Never create more chunks than there are lines, so no chunk is empty.
    let slots = num_workers.min(lines.len());
    let base = lines.len() / slots;
    let extra = lines.len() % slots;

    let mut chunks = Vec::with_capacity(slots);
    let mut remaining = lines.as_slice();
    for slot in 0..slots {
        // The first `extra` chunks take one additional line so that the
        // remainder is spread out instead of starving the trailing workers.
        let take = base + usize::from(slot < extra);
        let (head, tail) = remaining.split_at(take);
        chunks.push(head.join("\n"));
        remaining = tail;
    }
    chunks
}
62
63pub fn map_reduce_merge(results: &HashMap<FighterId, String>) -> String {
67 let mut sorted_results: Vec<_> = results.iter().collect();
68 sorted_results.sort_by_key(|(id, _)| id.0);
69 sorted_results
70 .into_iter()
71 .map(|(_, v)| v.as_str())
72 .collect::<Vec<_>>()
73 .join("\n\n")
74}
75
76pub fn execute_map_reduce(
82 _config: &MapReduceConfig,
83 results: HashMap<FighterId, String>,
84) -> MapReduceResult {
85 let reduced = map_reduce_merge(&results);
86 MapReduceResult {
87 map_results: results,
88 reduced,
89 }
90}
91
92pub async fn execute_map_reduce_distributed(
96 config: &MapReduceConfig,
97 router: &MessageRouter,
98 coordinator: FighterId,
99) -> PunchResult<MapReduceResult> {
100 if config.workers.is_empty() {
101 return Err(PunchError::Troop(
102 "map_reduce: no workers available".to_string(),
103 ));
104 }
105
106 let chunks = map_split(&config.input, config.workers.len());
107 let results: Arc<Mutex<HashMap<FighterId, String>>> = Arc::new(Mutex::new(HashMap::new()));
108
109 for (i, worker) in config.workers.iter().enumerate() {
111 let chunk = chunks.get(i).cloned().unwrap_or_default();
112 let send_result = router
113 .send_direct(
114 coordinator,
115 *worker,
116 AgentMessageType::TaskAssignment {
117 task: format!("[map-chunk-{}] {}", i, chunk),
118 },
119 MessagePriority::Normal,
120 )
121 .await;
122
123 if let Err(e) = send_result {
124 warn!(
125 worker = %worker,
126 chunk = i,
127 error = %e,
128 "map_reduce: failed to send chunk to worker"
129 );
130 } else {
131 let mut r = results.lock().await;
133 r.insert(*worker, format!("[processed] {}", chunk));
134 }
135 }
136
137 let final_results = results.lock().await.clone();
138 let reduced = map_reduce_merge(&final_results);
139
140 info!(
141 worker_count = config.workers.len(),
142 chunk_count = chunks.len(),
143 "map_reduce: distributed execution complete"
144 );
145
146 Ok(MapReduceResult {
147 map_results: final_results,
148 reduced,
149 })
150}
151
/// One link in a chain of responsibility.
#[derive(Debug, Clone)]
pub struct ChainHandler {
    /// Fighter that would handle the task if this link matches.
    pub fighter_id: FighterId,
    /// Keywords for this link; `chain_find_handler` and
    /// `execute_chain_of_responsibility` match them case-insensitively as
    /// substrings of the task text.
    pub capabilities: Vec<String>,
}
164
165pub fn chain_find_handler(chain: &[ChainHandler], task: &str) -> Option<FighterId> {
170 let task_lower = task.to_lowercase();
171 for handler in chain {
172 for cap in &handler.capabilities {
173 if task_lower.contains(&cap.to_lowercase()) {
174 return Some(handler.fighter_id);
175 }
176 }
177 }
178 None
179}
180
181pub fn chain_walk(
186 chain: &[ChainHandler],
187 _task: &str,
188 handler_results: &HashMap<FighterId, bool>,
189) -> Option<(FighterId, usize)> {
190 for (i, handler) in chain.iter().enumerate() {
191 let can_handle = handler_results
192 .get(&handler.fighter_id)
193 .copied()
194 .unwrap_or(false);
195 if can_handle {
196 return Some((handler.fighter_id, i));
197 }
198 }
199 None
200}
201
202pub async fn execute_chain_of_responsibility(
207 chain: &[ChainHandler],
208 task: &str,
209 router: &MessageRouter,
210 coordinator: FighterId,
211) -> PunchResult<Option<(FighterId, usize, String)>> {
212 if chain.is_empty() {
213 return Err(PunchError::Troop(
214 "chain_of_responsibility: empty handler chain".to_string(),
215 ));
216 }
217
218 let task_lower = task.to_lowercase();
219
220 for (i, handler) in chain.iter().enumerate() {
221 let can_handle = handler
223 .capabilities
224 .iter()
225 .any(|cap| task_lower.contains(&cap.to_lowercase()));
226
227 if can_handle {
228 let send_result = router
230 .send_direct(
231 coordinator,
232 handler.fighter_id,
233 AgentMessageType::TaskAssignment {
234 task: task.to_string(),
235 },
236 MessagePriority::Normal,
237 )
238 .await;
239
240 match send_result {
241 Ok(_) => {
242 info!(
243 handler = %handler.fighter_id,
244 position = i,
245 "chain_of_responsibility: handler accepted task"
246 );
247 return Ok(Some((
248 handler.fighter_id,
249 i,
250 format!("handled by position {} in chain", i),
251 )));
252 }
253 Err(e) => {
254 warn!(
255 handler = %handler.fighter_id,
256 error = %e,
257 "chain_of_responsibility: handler failed, trying next"
258 );
259 }
261 }
262 }
263 }
264
265 Ok(None)
267}
268
/// A single fighter's answer in a scatter-gather round.
#[derive(Debug, Clone)]
pub struct ScatterResponse {
    /// Fighter that produced this response.
    pub fighter_id: FighterId,
    /// Response text; `SelectionCriteria::Consensus` groups responses by this exact string.
    pub response: String,
    /// Response time (milliseconds, per the field name); minimized by `SelectionCriteria::Fastest`.
    pub response_time_ms: u64,
    /// Quality score; maximized by `SelectionCriteria::HighestQuality`.
    pub quality_score: f64,
}
285
286pub fn scatter_select<'a>(
288 responses: &'a [ScatterResponse],
289 criteria: &SelectionCriteria,
290) -> Option<&'a ScatterResponse> {
291 if responses.is_empty() {
292 return None;
293 }
294
295 match criteria {
296 SelectionCriteria::Fastest => responses.iter().min_by_key(|r| r.response_time_ms),
297 SelectionCriteria::HighestQuality => responses.iter().max_by(|a, b| {
298 a.quality_score
299 .partial_cmp(&b.quality_score)
300 .unwrap_or(std::cmp::Ordering::Equal)
301 }),
302 SelectionCriteria::Consensus => {
303 let mut counts: HashMap<&str, (usize, usize)> = HashMap::new();
305 for (i, r) in responses.iter().enumerate() {
306 let entry = counts.entry(&r.response).or_insert((0, i));
307 entry.0 += 1;
308 }
309 let best_idx = counts
310 .values()
311 .max_by_key(|(count, _)| *count)
312 .map(|(_, idx)| *idx);
313 best_idx.map(|idx| &responses[idx])
314 }
315 }
316}
317
318pub async fn execute_scatter_gather(
322 fighters: &[FighterId],
323 task: &str,
324 router: &MessageRouter,
325 coordinator: FighterId,
326 _timeout: Duration,
327 criteria: &SelectionCriteria,
328) -> PunchResult<Option<ScatterResponse>> {
329 if fighters.is_empty() {
330 return Ok(None);
331 }
332
333 let sent_count = {
335 let mut count = 0usize;
336 for fighter in fighters {
337 let result = router
338 .send_direct(
339 coordinator,
340 *fighter,
341 AgentMessageType::TaskAssignment {
342 task: task.to_string(),
343 },
344 MessagePriority::Normal,
345 )
346 .await;
347 if result.is_ok() {
348 count += 1;
349 }
350 }
351 count
352 };
353
354 info!(
355 sent = sent_count,
356 total = fighters.len(),
357 "scatter_gather: scattered task to fighters"
358 );
359
360 let responses: Vec<ScatterResponse> = fighters
363 .iter()
364 .enumerate()
365 .map(|(i, f)| ScatterResponse {
366 fighter_id: *f,
367 response: format!("[response-from-{}]", f),
368 response_time_ms: (i as u64 + 1) * 100,
369 quality_score: 0.8 - (i as f64 * 0.1),
370 })
371 .collect();
372
373 let best = scatter_select(&responses, criteria).cloned();
375
376 if let Some(ref selected) = best {
377 info!(
378 selected_fighter = %selected.fighter_id,
379 criteria = ?criteria,
380 "scatter_gather: selected best response"
381 );
382 }
383
384 Ok(best)
385}
386
/// Bookkeeping for one worker under supervision.
#[derive(Debug, Clone)]
pub struct SupervisedWorker {
    /// Fighter being supervised.
    pub fighter_id: FighterId,
    /// Number of restarts so far; incremented by `supervisor_handle_failure`
    /// and compared against `SupervisorConfig::max_restarts`.
    pub restart_count: u32,
    /// Set to `true` on restart and `false` when the supervisor gives up;
    /// `supervisor_monitor_health` skips workers that are not running.
    pub running: bool,
    /// Set to `true` when the restart budget is exhausted and cleared on
    /// restart; `supervisor_monitor_health` skips failed workers.
    pub failed: bool,
}
403
/// Supervisor settings together with the state of its workers.
#[derive(Debug, Clone)]
pub struct SupervisorConfig {
    /// How `supervisor_handle_failure` reacts to a failure: `OneForOne`
    /// restarts only the failed worker, `AllForOne` restarts every worker.
    pub strategy: RestartStrategy,
    /// A failed worker is restarted only while its `restart_count` is below this value.
    pub max_restarts: u32,
    /// Workers under supervision; mutated in place by `supervisor_handle_failure`.
    pub workers: Vec<SupervisedWorker>,
}
414
415pub fn supervisor_handle_failure(
419 config: &mut SupervisorConfig,
420 failed_worker: &FighterId,
421) -> Vec<FighterId> {
422 let mut to_restart = Vec::new();
423
424 match config.strategy {
425 RestartStrategy::OneForOne => {
426 if let Some(worker) = config
428 .workers
429 .iter_mut()
430 .find(|w| w.fighter_id == *failed_worker)
431 {
432 if worker.restart_count < config.max_restarts {
433 worker.restart_count += 1;
434 worker.failed = false;
435 worker.running = true;
436 to_restart.push(worker.fighter_id);
437 info!(
438 fighter_id = %worker.fighter_id,
439 restart_count = worker.restart_count,
440 "one-for-one restart"
441 );
442 } else {
443 worker.failed = true;
444 worker.running = false;
445 warn!(
446 fighter_id = %worker.fighter_id,
447 max_restarts = config.max_restarts,
448 "worker exceeded max restarts, giving up"
449 );
450 }
451 }
452 }
453 RestartStrategy::AllForOne => {
454 let can_restart = config
456 .workers
457 .iter()
458 .find(|w| w.fighter_id == *failed_worker)
459 .is_some_and(|w| w.restart_count < config.max_restarts);
460
461 if can_restart {
462 for worker in &mut config.workers {
464 worker.restart_count += 1;
465 worker.failed = false;
466 worker.running = true;
467 to_restart.push(worker.fighter_id);
468 }
469 info!(workers = to_restart.len(), "all-for-one restart triggered");
470 } else {
471 if let Some(worker) = config
473 .workers
474 .iter_mut()
475 .find(|w| w.fighter_id == *failed_worker)
476 {
477 worker.failed = true;
478 worker.running = false;
479 }
480 warn!(
481 fighter_id = %failed_worker,
482 "all-for-one: failed worker exceeded max restarts"
483 );
484 }
485 }
486 }
487
488 to_restart
489}
490
491pub async fn supervisor_monitor_health(
494 config: &mut SupervisorConfig,
495 router: &MessageRouter,
496 supervisor_id: FighterId,
497) -> Vec<FighterId> {
498 let mut failed_workers = Vec::new();
499
500 for worker in &config.workers {
501 if !worker.running || worker.failed {
502 continue;
503 }
504
505 if !router.is_registered(&worker.fighter_id) {
507 warn!(
508 fighter_id = %worker.fighter_id,
509 "supervisor: worker not registered, marking as failed"
510 );
511 failed_workers.push(worker.fighter_id);
512 } else {
513 let _ = router
515 .send_direct(
516 supervisor_id,
517 worker.fighter_id,
518 AgentMessageType::StatusUpdate {
519 progress: 0.0,
520 detail: "health_check".to_string(),
521 },
522 MessagePriority::Low,
523 )
524 .await;
525 }
526 }
527
528 let mut restarted = Vec::new();
530 for failed in &failed_workers {
531 let restart_list = supervisor_handle_failure(config, failed);
532 restarted.extend(restart_list);
533 }
534
535 restarted
536}
537
538pub fn auction_select_winner(bids: &[AuctionBid]) -> Option<&AuctionBid> {
547 if bids.is_empty() {
548 return None;
549 }
550
551 bids.iter().max_by(|a, b| {
553 let score_a = if a.estimated_time_secs > 0 {
554 a.confidence / a.estimated_time_secs as f64
555 } else {
556 a.confidence * 1000.0
557 };
558 let score_b = if b.estimated_time_secs > 0 {
559 b.confidence / b.estimated_time_secs as f64
560 } else {
561 b.confidence * 1000.0
562 };
563 score_a
564 .partial_cmp(&score_b)
565 .unwrap_or(std::cmp::Ordering::Equal)
566 })
567}
568
569pub fn auction_filter_bids(bids: &[AuctionBid], min_confidence: f64) -> Vec<&AuctionBid> {
571 bids.iter()
572 .filter(|b| b.confidence >= min_confidence)
573 .collect()
574}
575
576pub async fn execute_auction(
579 fighters: &[FighterId],
580 task: &str,
581 router: &MessageRouter,
582 coordinator: FighterId,
583 bids: &[AuctionBid],
584 min_confidence: f64,
585) -> PunchResult<Option<AuctionBid>> {
586 if fighters.is_empty() {
587 return Ok(None);
588 }
589
590 for fighter in fighters {
592 let _ = router
593 .send_direct(
594 coordinator,
595 *fighter,
596 AgentMessageType::TaskAssignment {
597 task: format!("[AUCTION] {}", task),
598 },
599 MessagePriority::Normal,
600 )
601 .await;
602 }
603
604 info!(
605 fighter_count = fighters.len(),
606 "auction: announced task to fighters"
607 );
608
609 let filtered = auction_filter_bids(bids, min_confidence);
611 if filtered.is_empty() {
612 warn!("auction: no bids met minimum confidence threshold");
613 return Ok(None);
614 }
615
616 let winner = auction_select_winner(bids).cloned();
617
618 if let Some(ref w) = winner {
619 let _ = router
621 .send_direct(
622 coordinator,
623 w.fighter_id,
624 AgentMessageType::TaskAssignment {
625 task: format!("[AWARDED] {}", task),
626 },
627 MessagePriority::High,
628 )
629 .await;
630
631 info!(
632 winner = %w.fighter_id,
633 confidence = w.confidence,
634 estimated_time = w.estimated_time_secs,
635 "auction: task awarded"
636 );
637 }
638
639 Ok(winner)
640}
641
642#[cfg(test)]
643mod tests {
644 use super::*;
645 use chrono::Utc;
646
647 #[test]
650 fn test_map_split_even() {
651 let input = "line1\nline2\nline3\nline4";
652 let chunks = map_split(input, 2);
653 assert_eq!(chunks.len(), 2);
654 assert!(chunks[0].contains("line1"));
655 assert!(chunks[1].contains("line3"));
656 }
657
658 #[test]
659 fn test_map_split_uneven() {
660 let input = "line1\nline2\nline3";
661 let chunks = map_split(input, 2);
662 assert_eq!(chunks.len(), 2);
663 }
664
665 #[test]
666 fn test_map_split_single_worker() {
667 let input = "line1\nline2";
668 let chunks = map_split(input, 1);
669 assert_eq!(chunks.len(), 1);
670 assert!(chunks[0].contains("line1"));
671 assert!(chunks[0].contains("line2"));
672 }
673
674 #[test]
675 fn test_map_split_zero_workers() {
676 let chunks = map_split("anything", 0);
677 assert!(chunks.is_empty());
678 }
679
680 #[test]
681 fn test_map_split_empty_input() {
682 let chunks = map_split("", 3);
683 assert_eq!(chunks.len(), 1);
684 }
685
686 #[test]
687 fn test_map_reduce_merge() {
688 let mut results = HashMap::new();
689 let f1 = FighterId::new();
690 let f2 = FighterId::new();
691 results.insert(f1, "result1".to_string());
692 results.insert(f2, "result2".to_string());
693
694 let merged = map_reduce_merge(&results);
695 assert!(merged.contains("result1"));
696 assert!(merged.contains("result2"));
697 }
698
699 #[test]
700 fn test_execute_map_reduce() {
701 let f1 = FighterId::new();
702 let f2 = FighterId::new();
703 let config = MapReduceConfig {
704 input: "data".to_string(),
705 workers: vec![f1, f2],
706 };
707 let mut results = HashMap::new();
708 results.insert(f1, "processed-1".to_string());
709 results.insert(f2, "processed-2".to_string());
710
711 let mr_result = execute_map_reduce(&config, results);
712 assert_eq!(mr_result.map_results.len(), 2);
713 assert!(mr_result.reduced.contains("processed"));
714 }
715
716 #[tokio::test]
717 async fn test_map_reduce_distributed_splits_and_sends() {
718 let router = MessageRouter::new();
719 let coordinator = FighterId::new();
720 let w1 = FighterId::new();
721 let w2 = FighterId::new();
722
723 let _rx_coord = router.register(coordinator);
724 let _rx_w1 = router.register(w1);
725 let _rx_w2 = router.register(w2);
726
727 let config = MapReduceConfig {
728 input: "line1\nline2\nline3\nline4".to_string(),
729 workers: vec![w1, w2],
730 };
731
732 let result = execute_map_reduce_distributed(&config, &router, coordinator)
733 .await
734 .expect("should execute");
735
736 assert_eq!(result.map_results.len(), 2);
737 assert!(result.map_results.contains_key(&w1));
738 assert!(result.map_results.contains_key(&w2));
739 assert!(!result.reduced.is_empty());
740 }
741
742 #[tokio::test]
743 async fn test_map_reduce_distributed_no_workers() {
744 let router = MessageRouter::new();
745 let coordinator = FighterId::new();
746 let _rx = router.register(coordinator);
747
748 let config = MapReduceConfig {
749 input: "data".to_string(),
750 workers: vec![],
751 };
752
753 let result = execute_map_reduce_distributed(&config, &router, coordinator).await;
754 assert!(result.is_err());
755 }
756
757 #[test]
760 fn test_chain_find_handler_match() {
761 let f1 = FighterId::new();
762 let f2 = FighterId::new();
763 let chain = vec![
764 ChainHandler {
765 fighter_id: f1,
766 capabilities: vec!["code".to_string()],
767 },
768 ChainHandler {
769 fighter_id: f2,
770 capabilities: vec!["review".to_string()],
771 },
772 ];
773
774 let handler = chain_find_handler(&chain, "please review this PR");
775 assert_eq!(handler, Some(f2));
776 }
777
778 #[test]
779 fn test_chain_find_handler_first_match() {
780 let f1 = FighterId::new();
781 let f2 = FighterId::new();
782 let chain = vec![
783 ChainHandler {
784 fighter_id: f1,
785 capabilities: vec!["code".to_string()],
786 },
787 ChainHandler {
788 fighter_id: f2,
789 capabilities: vec!["code".to_string()],
790 },
791 ];
792
793 let handler = chain_find_handler(&chain, "analyze code quality");
794 assert_eq!(handler, Some(f1)); }
796
797 #[test]
798 fn test_chain_find_handler_no_match() {
799 let chain = vec![ChainHandler {
800 fighter_id: FighterId::new(),
801 capabilities: vec!["database".to_string()],
802 }];
803
804 let handler = chain_find_handler(&chain, "fix CSS styling");
805 assert!(handler.is_none());
806 }
807
808 #[test]
809 fn test_chain_walk() {
810 let f1 = FighterId::new();
811 let f2 = FighterId::new();
812 let f3 = FighterId::new();
813 let chain = vec![
814 ChainHandler {
815 fighter_id: f1,
816 capabilities: vec!["a".to_string()],
817 },
818 ChainHandler {
819 fighter_id: f2,
820 capabilities: vec!["b".to_string()],
821 },
822 ChainHandler {
823 fighter_id: f3,
824 capabilities: vec!["c".to_string()],
825 },
826 ];
827
828 let mut handler_results = HashMap::new();
829 handler_results.insert(f1, false);
830 handler_results.insert(f2, true);
831 handler_results.insert(f3, true);
832
833 let result = chain_walk(&chain, "task", &handler_results);
834 assert_eq!(result, Some((f2, 1)));
835 }
836
837 #[test]
838 fn test_chain_walk_none_accept() {
839 let f1 = FighterId::new();
840 let chain = vec![ChainHandler {
841 fighter_id: f1,
842 capabilities: vec!["a".to_string()],
843 }];
844
845 let mut handler_results = HashMap::new();
846 handler_results.insert(f1, false);
847
848 let result = chain_walk(&chain, "task", &handler_results);
849 assert!(result.is_none());
850 }
851
852 #[tokio::test]
853 async fn test_chain_of_responsibility_first_capable_handles() {
854 let router = MessageRouter::new();
855 let coordinator = FighterId::new();
856 let f1 = FighterId::new();
857 let f2 = FighterId::new();
858
859 let _rx_coord = router.register(coordinator);
860 let _rx_f1 = router.register(f1);
861 let _rx_f2 = router.register(f2);
862
863 let chain = vec![
864 ChainHandler {
865 fighter_id: f1,
866 capabilities: vec!["database".to_string()],
867 },
868 ChainHandler {
869 fighter_id: f2,
870 capabilities: vec!["code".to_string()],
871 },
872 ];
873
874 let result =
875 execute_chain_of_responsibility(&chain, "fix the code issue", &router, coordinator)
876 .await
877 .expect("should execute");
878
879 assert!(result.is_some());
880 let (handler, pos, _) = result.expect("should have handler");
881 assert_eq!(handler, f2);
882 assert_eq!(pos, 1);
883 }
884
885 #[tokio::test]
886 async fn test_chain_of_responsibility_none_capable() {
887 let router = MessageRouter::new();
888 let coordinator = FighterId::new();
889 let f1 = FighterId::new();
890
891 let _rx_coord = router.register(coordinator);
892 let _rx_f1 = router.register(f1);
893
894 let chain = vec![ChainHandler {
895 fighter_id: f1,
896 capabilities: vec!["database".to_string()],
897 }];
898
899 let result =
900 execute_chain_of_responsibility(&chain, "fix CSS styling", &router, coordinator)
901 .await
902 .expect("should execute");
903
904 assert!(result.is_none());
905 }
906
907 #[tokio::test]
908 async fn test_chain_of_responsibility_empty_chain() {
909 let router = MessageRouter::new();
910 let coordinator = FighterId::new();
911 let _rx = router.register(coordinator);
912
913 let result = execute_chain_of_responsibility(&[], "any task", &router, coordinator).await;
914 assert!(result.is_err());
915 }
916
917 #[test]
920 fn test_scatter_select_fastest() {
921 let responses = vec![
922 ScatterResponse {
923 fighter_id: FighterId::new(),
924 response: "slow".to_string(),
925 response_time_ms: 500,
926 quality_score: 0.9,
927 },
928 ScatterResponse {
929 fighter_id: FighterId::new(),
930 response: "fast".to_string(),
931 response_time_ms: 100,
932 quality_score: 0.5,
933 },
934 ];
935
936 let best = scatter_select(&responses, &SelectionCriteria::Fastest);
937 assert_eq!(best.map(|r| r.response.as_str()), Some("fast"));
938 }
939
940 #[test]
941 fn test_scatter_select_highest_quality() {
942 let responses = vec![
943 ScatterResponse {
944 fighter_id: FighterId::new(),
945 response: "low quality".to_string(),
946 response_time_ms: 50,
947 quality_score: 0.3,
948 },
949 ScatterResponse {
950 fighter_id: FighterId::new(),
951 response: "high quality".to_string(),
952 response_time_ms: 500,
953 quality_score: 0.95,
954 },
955 ];
956
957 let best = scatter_select(&responses, &SelectionCriteria::HighestQuality);
958 assert_eq!(best.map(|r| r.response.as_str()), Some("high quality"));
959 }
960
961 #[test]
962 fn test_scatter_select_consensus() {
963 let responses = vec![
964 ScatterResponse {
965 fighter_id: FighterId::new(),
966 response: "yes".to_string(),
967 response_time_ms: 100,
968 quality_score: 0.8,
969 },
970 ScatterResponse {
971 fighter_id: FighterId::new(),
972 response: "yes".to_string(),
973 response_time_ms: 200,
974 quality_score: 0.7,
975 },
976 ScatterResponse {
977 fighter_id: FighterId::new(),
978 response: "no".to_string(),
979 response_time_ms: 150,
980 quality_score: 0.9,
981 },
982 ];
983
984 let best = scatter_select(&responses, &SelectionCriteria::Consensus);
985 assert_eq!(best.map(|r| r.response.as_str()), Some("yes"));
986 }
987
988 #[test]
989 fn test_scatter_select_empty() {
990 let responses: Vec<ScatterResponse> = vec![];
991 let best = scatter_select(&responses, &SelectionCriteria::Fastest);
992 assert!(best.is_none());
993 }
994
995 #[tokio::test]
996 async fn test_scatter_gather_fastest_selected() {
997 let router = MessageRouter::new();
998 let coordinator = FighterId::new();
999 let f1 = FighterId::new();
1000 let f2 = FighterId::new();
1001
1002 let _rx_coord = router.register(coordinator);
1003 let _rx_f1 = router.register(f1);
1004 let _rx_f2 = router.register(f2);
1005
1006 let result = execute_scatter_gather(
1007 &[f1, f2],
1008 "analyze this",
1009 &router,
1010 coordinator,
1011 Duration::from_secs(5),
1012 &SelectionCriteria::Fastest,
1013 )
1014 .await
1015 .expect("should execute");
1016
1017 assert!(result.is_some());
1018 let selected = result.expect("should have result");
1019 assert_eq!(selected.fighter_id, f1);
1021 }
1022
1023 #[tokio::test]
1024 async fn test_scatter_gather_empty_fighters() {
1025 let router = MessageRouter::new();
1026 let coordinator = FighterId::new();
1027 let _rx = router.register(coordinator);
1028
1029 let result = execute_scatter_gather(
1030 &[],
1031 "task",
1032 &router,
1033 coordinator,
1034 Duration::from_secs(1),
1035 &SelectionCriteria::Fastest,
1036 )
1037 .await
1038 .expect("should execute");
1039
1040 assert!(result.is_none());
1041 }
1042
1043 #[test]
1046 fn test_supervisor_one_for_one_restart() {
1047 let f1 = FighterId::new();
1048 let f2 = FighterId::new();
1049 let mut config = SupervisorConfig {
1050 strategy: RestartStrategy::OneForOne,
1051 max_restarts: 3,
1052 workers: vec![
1053 SupervisedWorker {
1054 fighter_id: f1,
1055 restart_count: 0,
1056 running: true,
1057 failed: false,
1058 },
1059 SupervisedWorker {
1060 fighter_id: f2,
1061 restart_count: 0,
1062 running: true,
1063 failed: false,
1064 },
1065 ],
1066 };
1067
1068 let restarted = supervisor_handle_failure(&mut config, &f1);
1069 assert_eq!(restarted, vec![f1]);
1070 assert_eq!(config.workers[0].restart_count, 1);
1071 assert!(config.workers[0].running);
1072 assert_eq!(config.workers[1].restart_count, 0);
1074 }
1075
1076 #[test]
1077 fn test_supervisor_one_for_one_max_restarts() {
1078 let f1 = FighterId::new();
1079 let mut config = SupervisorConfig {
1080 strategy: RestartStrategy::OneForOne,
1081 max_restarts: 2,
1082 workers: vec![SupervisedWorker {
1083 fighter_id: f1,
1084 restart_count: 2,
1085 running: true,
1086 failed: false,
1087 }],
1088 };
1089
1090 let restarted = supervisor_handle_failure(&mut config, &f1);
1091 assert!(restarted.is_empty());
1092 assert!(config.workers[0].failed);
1093 assert!(!config.workers[0].running);
1094 }
1095
1096 #[test]
1097 fn test_supervisor_all_for_one_restart() {
1098 let f1 = FighterId::new();
1099 let f2 = FighterId::new();
1100 let f3 = FighterId::new();
1101 let mut config = SupervisorConfig {
1102 strategy: RestartStrategy::AllForOne,
1103 max_restarts: 3,
1104 workers: vec![
1105 SupervisedWorker {
1106 fighter_id: f1,
1107 restart_count: 0,
1108 running: true,
1109 failed: false,
1110 },
1111 SupervisedWorker {
1112 fighter_id: f2,
1113 restart_count: 0,
1114 running: true,
1115 failed: false,
1116 },
1117 SupervisedWorker {
1118 fighter_id: f3,
1119 restart_count: 0,
1120 running: true,
1121 failed: false,
1122 },
1123 ],
1124 };
1125
1126 let restarted = supervisor_handle_failure(&mut config, &f1);
1127 assert_eq!(restarted.len(), 3);
1128 for worker in &config.workers {
1130 assert_eq!(worker.restart_count, 1);
1131 assert!(worker.running);
1132 }
1133 }
1134
1135 #[test]
1136 fn test_supervisor_all_for_one_max_restarts() {
1137 let f1 = FighterId::new();
1138 let f2 = FighterId::new();
1139 let mut config = SupervisorConfig {
1140 strategy: RestartStrategy::AllForOne,
1141 max_restarts: 1,
1142 workers: vec![
1143 SupervisedWorker {
1144 fighter_id: f1,
1145 restart_count: 1,
1146 running: true,
1147 failed: false,
1148 },
1149 SupervisedWorker {
1150 fighter_id: f2,
1151 restart_count: 0,
1152 running: true,
1153 failed: false,
1154 },
1155 ],
1156 };
1157
1158 let restarted = supervisor_handle_failure(&mut config, &f1);
1159 assert!(restarted.is_empty());
1160 assert!(config.workers[0].failed);
1161 }
1162
1163 #[tokio::test]
1164 async fn test_supervisor_health_monitoring() {
1165 let router = MessageRouter::new();
1166 let supervisor = FighterId::new();
1167 let w1 = FighterId::new();
1168 let w2 = FighterId::new();
1169
1170 let _rx_sup = router.register(supervisor);
1171 let _rx_w1 = router.register(w1);
1172 let mut config = SupervisorConfig {
1175 strategy: RestartStrategy::OneForOne,
1176 max_restarts: 3,
1177 workers: vec![
1178 SupervisedWorker {
1179 fighter_id: w1,
1180 restart_count: 0,
1181 running: true,
1182 failed: false,
1183 },
1184 SupervisedWorker {
1185 fighter_id: w2,
1186 restart_count: 0,
1187 running: true,
1188 failed: false,
1189 },
1190 ],
1191 };
1192
1193 let restarted = supervisor_monitor_health(&mut config, &router, supervisor).await;
1194
1195 assert!(restarted.contains(&w2));
1197 assert!(!restarted.contains(&w1));
1198 }
1199
1200 #[test]
1203 fn test_auction_select_winner() {
1204 let bids = vec![
1205 AuctionBid {
1206 fighter_id: FighterId::new(),
1207 estimated_time_secs: 60,
1208 confidence: 0.8,
1209 submitted_at: Utc::now(),
1210 },
1211 AuctionBid {
1212 fighter_id: FighterId::new(),
1213 estimated_time_secs: 30,
1214 confidence: 0.9,
1215 submitted_at: Utc::now(),
1216 },
1217 ];
1218
1219 let winner = auction_select_winner(&bids);
1220 assert!(winner.is_some());
1221 let w = winner.expect("should have winner");
1223 assert_eq!(w.estimated_time_secs, 30);
1224 }
1225
1226 #[test]
1227 fn test_auction_select_winner_empty() {
1228 let bids: Vec<AuctionBid> = vec![];
1229 assert!(auction_select_winner(&bids).is_none());
1230 }
1231
1232 #[test]
1233 fn test_auction_select_winner_single_bid() {
1234 let bid = AuctionBid {
1235 fighter_id: FighterId::new(),
1236 estimated_time_secs: 10,
1237 confidence: 1.0,
1238 submitted_at: Utc::now(),
1239 };
1240 let bids = [bid.clone()];
1241 let winner = auction_select_winner(&bids);
1242 assert!(winner.is_some());
1243 }
1244
1245 #[test]
1246 fn test_auction_filter_bids() {
1247 let bids = vec![
1248 AuctionBid {
1249 fighter_id: FighterId::new(),
1250 estimated_time_secs: 10,
1251 confidence: 0.3,
1252 submitted_at: Utc::now(),
1253 },
1254 AuctionBid {
1255 fighter_id: FighterId::new(),
1256 estimated_time_secs: 20,
1257 confidence: 0.8,
1258 submitted_at: Utc::now(),
1259 },
1260 AuctionBid {
1261 fighter_id: FighterId::new(),
1262 estimated_time_secs: 30,
1263 confidence: 0.9,
1264 submitted_at: Utc::now(),
1265 },
1266 ];
1267
1268 let filtered = auction_filter_bids(&bids, 0.7);
1269 assert_eq!(filtered.len(), 2);
1270 }
1271
1272 #[test]
1273 fn test_auction_filter_bids_none_pass() {
1274 let bids = vec![AuctionBid {
1275 fighter_id: FighterId::new(),
1276 estimated_time_secs: 10,
1277 confidence: 0.2,
1278 submitted_at: Utc::now(),
1279 }];
1280
1281 let filtered = auction_filter_bids(&bids, 0.5);
1282 assert!(filtered.is_empty());
1283 }
1284
1285 #[test]
1286 fn test_auction_zero_time_estimate() {
1287 let bids = vec![
1288 AuctionBid {
1289 fighter_id: FighterId::new(),
1290 estimated_time_secs: 0,
1291 confidence: 0.5,
1292 submitted_at: Utc::now(),
1293 },
1294 AuctionBid {
1295 fighter_id: FighterId::new(),
1296 estimated_time_secs: 100,
1297 confidence: 0.9,
1298 submitted_at: Utc::now(),
1299 },
1300 ];
1301
1302 let winner = auction_select_winner(&bids);
1304 assert!(winner.is_some());
1305 assert_eq!(winner.expect("winner").estimated_time_secs, 0);
1306 }
1307
1308 #[tokio::test]
1309 async fn test_auction_best_bid_wins() {
1310 let router = MessageRouter::new();
1311 let coordinator = FighterId::new();
1312 let f1 = FighterId::new();
1313 let f2 = FighterId::new();
1314
1315 let _rx_coord = router.register(coordinator);
1316 let _rx_f1 = router.register(f1);
1317 let _rx_f2 = router.register(f2);
1318
1319 let bids = vec![
1320 AuctionBid {
1321 fighter_id: f1,
1322 estimated_time_secs: 60,
1323 confidence: 0.7,
1324 submitted_at: Utc::now(),
1325 },
1326 AuctionBid {
1327 fighter_id: f2,
1328 estimated_time_secs: 20,
1329 confidence: 0.9,
1330 submitted_at: Utc::now(),
1331 },
1332 ];
1333
1334 let result = execute_auction(&[f1, f2], "complex task", &router, coordinator, &bids, 0.5)
1335 .await
1336 .expect("should execute");
1337
1338 assert!(result.is_some());
1339 let winner = result.expect("should have winner");
1340 assert_eq!(winner.fighter_id, f2); }
1342
1343 #[tokio::test]
1344 async fn test_auction_tie_breaking() {
1345 let router = MessageRouter::new();
1346 let coordinator = FighterId::new();
1347 let f1 = FighterId::new();
1348 let f2 = FighterId::new();
1349
1350 let _rx_coord = router.register(coordinator);
1351 let _rx_f1 = router.register(f1);
1352 let _rx_f2 = router.register(f2);
1353
1354 let bids = vec![
1356 AuctionBid {
1357 fighter_id: f1,
1358 estimated_time_secs: 40,
1359 confidence: 0.8,
1360 submitted_at: Utc::now(),
1361 },
1362 AuctionBid {
1363 fighter_id: f2,
1364 estimated_time_secs: 40,
1365 confidence: 0.8,
1366 submitted_at: Utc::now(),
1367 },
1368 ];
1369
1370 let result = execute_auction(&[f1, f2], "tied task", &router, coordinator, &bids, 0.5)
1371 .await
1372 .expect("should execute");
1373
1374 assert!(result.is_some());
1376 }
1377
1378 #[test]
1381 fn test_map_reduce_end_to_end() {
1382 let f1 = FighterId::new();
1383 let f2 = FighterId::new();
1384 let config = MapReduceConfig {
1385 input: "line1\nline2\nline3\nline4".to_string(),
1386 workers: vec![f1, f2],
1387 };
1388
1389 let chunks = map_split(&config.input, config.workers.len());
1390 assert_eq!(chunks.len(), 2);
1391
1392 let mut results = HashMap::new();
1393 results.insert(f1, format!("analyzed: {}", chunks[0]));
1394 results.insert(f2, format!("analyzed: {}", chunks[1]));
1395
1396 let mr = execute_map_reduce(&config, results);
1397 assert!(mr.reduced.contains("analyzed"));
1398 assert_eq!(mr.map_results.len(), 2);
1399 }
1400
1401 #[test]
1402 fn test_supervisor_repeated_failures() {
1403 let f = FighterId::new();
1404 let mut config = SupervisorConfig {
1405 strategy: RestartStrategy::OneForOne,
1406 max_restarts: 3,
1407 workers: vec![SupervisedWorker {
1408 fighter_id: f,
1409 restart_count: 0,
1410 running: true,
1411 failed: false,
1412 }],
1413 };
1414
1415 for i in 1..=3 {
1417 let restarted = supervisor_handle_failure(&mut config, &f);
1418 assert_eq!(restarted, vec![f]);
1419 assert_eq!(config.workers[0].restart_count, i);
1420 }
1421
1422 let restarted = supervisor_handle_failure(&mut config, &f);
1424 assert!(restarted.is_empty());
1425 assert!(config.workers[0].failed);
1426 }
1427}