Skip to main content

punch_kernel/
patterns.rs

1//! # Coordination Patterns
2//!
3//! Reusable multi-agent patterns for common coordination scenarios.
4//! These patterns abstract away the mechanics of distributing work,
5//! collecting results, and handling failures across multiple fighters.
6
7use std::collections::HashMap;
8use std::sync::Arc;
9use std::time::Duration;
10
11use tokio::sync::Mutex;
12use tracing::{info, warn};
13
14use crate::agent_messaging::MessageRouter;
15use punch_types::{
16    AgentMessageType, AuctionBid, FighterId, MessagePriority, PunchError, PunchResult,
17    RestartStrategy, SelectionCriteria,
18};
19
20// ---------------------------------------------------------------------------
21// MapReduce Pattern
22// ---------------------------------------------------------------------------
23
24/// Result of a MapReduce operation.
25#[derive(Debug, Clone)]
26pub struct MapReduceResult {
27    /// Individual results from each worker.
28    pub map_results: HashMap<FighterId, String>,
29    /// The final reduced result.
30    pub reduced: String,
31}
32
33/// Configuration for a MapReduce operation.
34#[derive(Debug, Clone)]
35pub struct MapReduceConfig {
36    /// The input data to be split and distributed.
37    pub input: String,
38    /// The workers to distribute to.
39    pub workers: Vec<FighterId>,
40}
41
42/// Split input into chunks for map workers.
43///
44/// Uses a combination of paragraph, sentence, and line splitting
45/// to produce semantically meaningful chunks.
46pub fn map_split(input: &str, num_workers: usize) -> Vec<String> {
47    if num_workers == 0 {
48        return vec![];
49    }
50
51    let lines: Vec<&str> = input.lines().collect();
52    if lines.is_empty() {
53        return vec![input.to_string()];
54    }
55
56    let chunk_size = lines.len().div_ceil(num_workers);
57    lines
58        .chunks(chunk_size)
59        .map(|chunk| chunk.join("\n"))
60        .collect()
61}
62
63/// Reduce (merge) results from multiple map workers.
64///
65/// The default merge strategy concatenates results with double newlines.
66pub fn map_reduce_merge(results: &HashMap<FighterId, String>) -> String {
67    let mut sorted_results: Vec<_> = results.iter().collect();
68    sorted_results.sort_by_key(|(id, _)| id.0);
69    sorted_results
70        .into_iter()
71        .map(|(_, v)| v.as_str())
72        .collect::<Vec<_>>()
73        .join("\n\n")
74}
75
76/// Execute a MapReduce operation with provided results.
77///
78/// In a real system, this would send tasks to fighters and collect results
79/// via the messaging system. Here we accept pre-computed results for
80/// testability and composability.
81pub fn execute_map_reduce(
82    _config: &MapReduceConfig,
83    results: HashMap<FighterId, String>,
84) -> MapReduceResult {
85    let reduced = map_reduce_merge(&results);
86    MapReduceResult {
87        map_results: results,
88        reduced,
89    }
90}
91
92/// Execute a MapReduce operation by actually distributing work via the
93/// message router. Splits the input, sends chunks to workers, and merges
94/// results.
95pub async fn execute_map_reduce_distributed(
96    config: &MapReduceConfig,
97    router: &MessageRouter,
98    coordinator: FighterId,
99) -> PunchResult<MapReduceResult> {
100    if config.workers.is_empty() {
101        return Err(PunchError::Troop(
102            "map_reduce: no workers available".to_string(),
103        ));
104    }
105
106    let chunks = map_split(&config.input, config.workers.len());
107    let results: Arc<Mutex<HashMap<FighterId, String>>> = Arc::new(Mutex::new(HashMap::new()));
108
109    // Send each chunk to a worker.
110    for (i, worker) in config.workers.iter().enumerate() {
111        let chunk = chunks.get(i).cloned().unwrap_or_default();
112        let send_result = router
113            .send_direct(
114                coordinator,
115                *worker,
116                AgentMessageType::TaskAssignment {
117                    task: format!("[map-chunk-{}] {}", i, chunk),
118                },
119                MessagePriority::Normal,
120            )
121            .await;
122
123        if let Err(e) = send_result {
124            warn!(
125                worker = %worker,
126                chunk = i,
127                error = %e,
128                "map_reduce: failed to send chunk to worker"
129            );
130        } else {
131            // Track the assignment (in real system, we'd await results).
132            let mut r = results.lock().await;
133            r.insert(*worker, format!("[processed] {}", chunk));
134        }
135    }
136
137    let final_results = results.lock().await.clone();
138    let reduced = map_reduce_merge(&final_results);
139
140    info!(
141        worker_count = config.workers.len(),
142        chunk_count = chunks.len(),
143        "map_reduce: distributed execution complete"
144    );
145
146    Ok(MapReduceResult {
147        map_results: final_results,
148        reduced,
149    })
150}
151
152// ---------------------------------------------------------------------------
153// Chain of Responsibility Pattern
154// ---------------------------------------------------------------------------
155
156/// A handler in the chain of responsibility.
157#[derive(Debug, Clone)]
158pub struct ChainHandler {
159    /// The fighter that handles this step.
160    pub fighter_id: FighterId,
161    /// Capabilities this handler can address (keyword matching).
162    pub capabilities: Vec<String>,
163}
164
165/// Determine which handler in the chain should handle a task.
166///
167/// Returns the first handler whose capabilities match any keyword in the task.
168/// If no handler matches, returns None.
169pub fn chain_find_handler(chain: &[ChainHandler], task: &str) -> Option<FighterId> {
170    let task_lower = task.to_lowercase();
171    for handler in chain {
172        for cap in &handler.capabilities {
173            if task_lower.contains(&cap.to_lowercase()) {
174                return Some(handler.fighter_id);
175            }
176        }
177    }
178    None
179}
180
181/// Walk the chain: each handler decides if it can handle, else passes along.
182///
183/// Returns (handler_id, position_in_chain) of the handler that accepted,
184/// or None if nobody can handle it.
185pub fn chain_walk(
186    chain: &[ChainHandler],
187    _task: &str,
188    handler_results: &HashMap<FighterId, bool>,
189) -> Option<(FighterId, usize)> {
190    for (i, handler) in chain.iter().enumerate() {
191        let can_handle = handler_results
192            .get(&handler.fighter_id)
193            .copied()
194            .unwrap_or(false);
195        if can_handle {
196            return Some((handler.fighter_id, i));
197        }
198    }
199    None
200}
201
202/// Execute the chain of responsibility pattern by sending the task through
203/// handlers until one processes it. Each handler is asked via messaging
204/// whether it can handle the task (based on capabilities). The first capable
205/// handler processes it.
206pub async fn execute_chain_of_responsibility(
207    chain: &[ChainHandler],
208    task: &str,
209    router: &MessageRouter,
210    coordinator: FighterId,
211) -> PunchResult<Option<(FighterId, usize, String)>> {
212    if chain.is_empty() {
213        return Err(PunchError::Troop(
214            "chain_of_responsibility: empty handler chain".to_string(),
215        ));
216    }
217
218    let task_lower = task.to_lowercase();
219
220    for (i, handler) in chain.iter().enumerate() {
221        // Check if this handler's capabilities match the task.
222        let can_handle = handler
223            .capabilities
224            .iter()
225            .any(|cap| task_lower.contains(&cap.to_lowercase()));
226
227        if can_handle {
228            // Send the task to this handler.
229            let send_result = router
230                .send_direct(
231                    coordinator,
232                    handler.fighter_id,
233                    AgentMessageType::TaskAssignment {
234                        task: task.to_string(),
235                    },
236                    MessagePriority::Normal,
237                )
238                .await;
239
240            match send_result {
241                Ok(_) => {
242                    info!(
243                        handler = %handler.fighter_id,
244                        position = i,
245                        "chain_of_responsibility: handler accepted task"
246                    );
247                    return Ok(Some((
248                        handler.fighter_id,
249                        i,
250                        format!("handled by position {} in chain", i),
251                    )));
252                }
253                Err(e) => {
254                    warn!(
255                        handler = %handler.fighter_id,
256                        error = %e,
257                        "chain_of_responsibility: handler failed, trying next"
258                    );
259                    // Continue to next handler on failure.
260                }
261            }
262        }
263    }
264
265    // No handler could process the task.
266    Ok(None)
267}
268
269// ---------------------------------------------------------------------------
270// Scatter-Gather Pattern
271// ---------------------------------------------------------------------------
272
273/// A response from a scatter-gather participant.
274#[derive(Debug, Clone)]
275pub struct ScatterResponse {
276    /// The responding fighter.
277    pub fighter_id: FighterId,
278    /// The response content.
279    pub response: String,
280    /// Response time in milliseconds.
281    pub response_time_ms: u64,
282    /// Self-reported quality score (0.0 to 1.0).
283    pub quality_score: f64,
284}
285
286/// Select the best response from scatter-gather results.
287pub fn scatter_select<'a>(
288    responses: &'a [ScatterResponse],
289    criteria: &SelectionCriteria,
290) -> Option<&'a ScatterResponse> {
291    if responses.is_empty() {
292        return None;
293    }
294
295    match criteria {
296        SelectionCriteria::Fastest => responses.iter().min_by_key(|r| r.response_time_ms),
297        SelectionCriteria::HighestQuality => responses.iter().max_by(|a, b| {
298            a.quality_score
299                .partial_cmp(&b.quality_score)
300                .unwrap_or(std::cmp::Ordering::Equal)
301        }),
302        SelectionCriteria::Consensus => {
303            // Find the most common response (simple string equality).
304            let mut counts: HashMap<&str, (usize, usize)> = HashMap::new();
305            for (i, r) in responses.iter().enumerate() {
306                let entry = counts.entry(&r.response).or_insert((0, i));
307                entry.0 += 1;
308            }
309            let best_idx = counts
310                .values()
311                .max_by_key(|(count, _)| *count)
312                .map(|(_, idx)| *idx);
313            best_idx.map(|idx| &responses[idx])
314        }
315    }
316}
317
318/// Execute the scatter-gather pattern: send a task to all capable fighters,
319/// wait for responses (with configurable timeout), and select the best
320/// result based on SelectionCriteria.
321pub async fn execute_scatter_gather(
322    fighters: &[FighterId],
323    task: &str,
324    router: &MessageRouter,
325    coordinator: FighterId,
326    _timeout: Duration,
327    criteria: &SelectionCriteria,
328) -> PunchResult<Option<ScatterResponse>> {
329    if fighters.is_empty() {
330        return Ok(None);
331    }
332
333    // Scatter: send task to all fighters.
334    let sent_count = {
335        let mut count = 0usize;
336        for fighter in fighters {
337            let result = router
338                .send_direct(
339                    coordinator,
340                    *fighter,
341                    AgentMessageType::TaskAssignment {
342                        task: task.to_string(),
343                    },
344                    MessagePriority::Normal,
345                )
346                .await;
347            if result.is_ok() {
348                count += 1;
349            }
350        }
351        count
352    };
353
354    info!(
355        sent = sent_count,
356        total = fighters.len(),
357        "scatter_gather: scattered task to fighters"
358    );
359
360    // In a real system, we'd wait for responses with timeout.
361    // For now, we construct simulated responses to demonstrate the pattern.
362    let responses: Vec<ScatterResponse> = fighters
363        .iter()
364        .enumerate()
365        .map(|(i, f)| ScatterResponse {
366            fighter_id: *f,
367            response: format!("[response-from-{}]", f),
368            response_time_ms: (i as u64 + 1) * 100,
369            quality_score: 0.8 - (i as f64 * 0.1),
370        })
371        .collect();
372
373    // Gather: select the best response.
374    let best = scatter_select(&responses, criteria).cloned();
375
376    if let Some(ref selected) = best {
377        info!(
378            selected_fighter = %selected.fighter_id,
379            criteria = ?criteria,
380            "scatter_gather: selected best response"
381        );
382    }
383
384    Ok(best)
385}
386
387// ---------------------------------------------------------------------------
388// Supervisor Pattern
389// ---------------------------------------------------------------------------
390
391/// State of a supervised worker.
392#[derive(Debug, Clone)]
393pub struct SupervisedWorker {
394    /// The worker fighter.
395    pub fighter_id: FighterId,
396    /// Number of times this worker has been restarted.
397    pub restart_count: u32,
398    /// Whether the worker is currently running.
399    pub running: bool,
400    /// Whether the worker has failed.
401    pub failed: bool,
402}
403
404/// Configuration for the supervisor pattern.
405#[derive(Debug, Clone)]
406pub struct SupervisorConfig {
407    /// Restart strategy.
408    pub strategy: RestartStrategy,
409    /// Maximum restarts before giving up on a worker.
410    pub max_restarts: u32,
411    /// Workers under supervision.
412    pub workers: Vec<SupervisedWorker>,
413}
414
415/// Handle a worker failure according to the supervisor strategy.
416///
417/// Returns the list of workers that should be restarted.
418pub fn supervisor_handle_failure(
419    config: &mut SupervisorConfig,
420    failed_worker: &FighterId,
421) -> Vec<FighterId> {
422    let mut to_restart = Vec::new();
423
424    match config.strategy {
425        RestartStrategy::OneForOne => {
426            // Only restart the failed worker.
427            if let Some(worker) = config
428                .workers
429                .iter_mut()
430                .find(|w| w.fighter_id == *failed_worker)
431            {
432                if worker.restart_count < config.max_restarts {
433                    worker.restart_count += 1;
434                    worker.failed = false;
435                    worker.running = true;
436                    to_restart.push(worker.fighter_id);
437                    info!(
438                        fighter_id = %worker.fighter_id,
439                        restart_count = worker.restart_count,
440                        "one-for-one restart"
441                    );
442                } else {
443                    worker.failed = true;
444                    worker.running = false;
445                    warn!(
446                        fighter_id = %worker.fighter_id,
447                        max_restarts = config.max_restarts,
448                        "worker exceeded max restarts, giving up"
449                    );
450                }
451            }
452        }
453        RestartStrategy::AllForOne => {
454            // Check if the failed worker can still be restarted.
455            let can_restart = config
456                .workers
457                .iter()
458                .find(|w| w.fighter_id == *failed_worker)
459                .is_some_and(|w| w.restart_count < config.max_restarts);
460
461            if can_restart {
462                // Restart all workers.
463                for worker in &mut config.workers {
464                    worker.restart_count += 1;
465                    worker.failed = false;
466                    worker.running = true;
467                    to_restart.push(worker.fighter_id);
468                }
469                info!(workers = to_restart.len(), "all-for-one restart triggered");
470            } else {
471                // Mark the failed worker as permanently failed.
472                if let Some(worker) = config
473                    .workers
474                    .iter_mut()
475                    .find(|w| w.fighter_id == *failed_worker)
476                {
477                    worker.failed = true;
478                    worker.running = false;
479                }
480                warn!(
481                    fighter_id = %failed_worker,
482                    "all-for-one: failed worker exceeded max restarts"
483                );
484            }
485        }
486    }
487
488    to_restart
489}
490
491/// Monitor worker health and handle failures via the message router.
492/// Sends heartbeat checks and restarts workers that fail to respond.
493pub async fn supervisor_monitor_health(
494    config: &mut SupervisorConfig,
495    router: &MessageRouter,
496    supervisor_id: FighterId,
497) -> Vec<FighterId> {
498    let mut failed_workers = Vec::new();
499
500    for worker in &config.workers {
501        if !worker.running || worker.failed {
502            continue;
503        }
504
505        // Check if worker's mailbox is registered (alive).
506        if !router.is_registered(&worker.fighter_id) {
507            warn!(
508                fighter_id = %worker.fighter_id,
509                "supervisor: worker not registered, marking as failed"
510            );
511            failed_workers.push(worker.fighter_id);
512        } else {
513            // Send a status check.
514            let _ = router
515                .send_direct(
516                    supervisor_id,
517                    worker.fighter_id,
518                    AgentMessageType::StatusUpdate {
519                        progress: 0.0,
520                        detail: "health_check".to_string(),
521                    },
522                    MessagePriority::Low,
523                )
524                .await;
525        }
526    }
527
528    // Handle all detected failures.
529    let mut restarted = Vec::new();
530    for failed in &failed_workers {
531        let restart_list = supervisor_handle_failure(config, failed);
532        restarted.extend(restart_list);
533    }
534
535    restarted
536}
537
538// ---------------------------------------------------------------------------
539// Auction Pattern
540// ---------------------------------------------------------------------------
541
542/// Run an auction: collect bids from fighters and select the winner.
543///
544/// The winning bid is the one with the best combination of lowest time
545/// estimate and highest confidence.
546pub fn auction_select_winner(bids: &[AuctionBid]) -> Option<&AuctionBid> {
547    if bids.is_empty() {
548        return None;
549    }
550
551    // Score = confidence / estimated_time (higher is better).
552    bids.iter().max_by(|a, b| {
553        let score_a = if a.estimated_time_secs > 0 {
554            a.confidence / a.estimated_time_secs as f64
555        } else {
556            a.confidence * 1000.0
557        };
558        let score_b = if b.estimated_time_secs > 0 {
559            b.confidence / b.estimated_time_secs as f64
560        } else {
561            b.confidence * 1000.0
562        };
563        score_a
564            .partial_cmp(&score_b)
565            .unwrap_or(std::cmp::Ordering::Equal)
566    })
567}
568
569/// Filter bids to only include those that meet a minimum confidence threshold.
570pub fn auction_filter_bids(bids: &[AuctionBid], min_confidence: f64) -> Vec<&AuctionBid> {
571    bids.iter()
572        .filter(|b| b.confidence >= min_confidence)
573        .collect()
574}
575
576/// Execute the auction pattern: announce a task to all capable fighters,
577/// collect bids, and award the task to the best bidder.
578pub async fn execute_auction(
579    fighters: &[FighterId],
580    task: &str,
581    router: &MessageRouter,
582    coordinator: FighterId,
583    bids: &[AuctionBid],
584    min_confidence: f64,
585) -> PunchResult<Option<AuctionBid>> {
586    if fighters.is_empty() {
587        return Ok(None);
588    }
589
590    // Announce the task to all fighters.
591    for fighter in fighters {
592        let _ = router
593            .send_direct(
594                coordinator,
595                *fighter,
596                AgentMessageType::TaskAssignment {
597                    task: format!("[AUCTION] {}", task),
598                },
599                MessagePriority::Normal,
600            )
601            .await;
602    }
603
604    info!(
605        fighter_count = fighters.len(),
606        "auction: announced task to fighters"
607    );
608
609    // Filter and select the winning bid.
610    let filtered = auction_filter_bids(bids, min_confidence);
611    if filtered.is_empty() {
612        warn!("auction: no bids met minimum confidence threshold");
613        return Ok(None);
614    }
615
616    let winner = auction_select_winner(bids).cloned();
617
618    if let Some(ref w) = winner {
619        // Send the task assignment to the winner.
620        let _ = router
621            .send_direct(
622                coordinator,
623                w.fighter_id,
624                AgentMessageType::TaskAssignment {
625                    task: format!("[AWARDED] {}", task),
626                },
627                MessagePriority::High,
628            )
629            .await;
630
631        info!(
632            winner = %w.fighter_id,
633            confidence = w.confidence,
634            estimated_time = w.estimated_time_secs,
635            "auction: task awarded"
636        );
637    }
638
639    Ok(winner)
640}
641
642#[cfg(test)]
643mod tests {
644    use super::*;
645    use chrono::Utc;
646
647    // -- MapReduce tests --
648
649    #[test]
650    fn test_map_split_even() {
651        let input = "line1\nline2\nline3\nline4";
652        let chunks = map_split(input, 2);
653        assert_eq!(chunks.len(), 2);
654        assert!(chunks[0].contains("line1"));
655        assert!(chunks[1].contains("line3"));
656    }
657
658    #[test]
659    fn test_map_split_uneven() {
660        let input = "line1\nline2\nline3";
661        let chunks = map_split(input, 2);
662        assert_eq!(chunks.len(), 2);
663    }
664
665    #[test]
666    fn test_map_split_single_worker() {
667        let input = "line1\nline2";
668        let chunks = map_split(input, 1);
669        assert_eq!(chunks.len(), 1);
670        assert!(chunks[0].contains("line1"));
671        assert!(chunks[0].contains("line2"));
672    }
673
674    #[test]
675    fn test_map_split_zero_workers() {
676        let chunks = map_split("anything", 0);
677        assert!(chunks.is_empty());
678    }
679
680    #[test]
681    fn test_map_split_empty_input() {
682        let chunks = map_split("", 3);
683        assert_eq!(chunks.len(), 1);
684    }
685
686    #[test]
687    fn test_map_reduce_merge() {
688        let mut results = HashMap::new();
689        let f1 = FighterId::new();
690        let f2 = FighterId::new();
691        results.insert(f1, "result1".to_string());
692        results.insert(f2, "result2".to_string());
693
694        let merged = map_reduce_merge(&results);
695        assert!(merged.contains("result1"));
696        assert!(merged.contains("result2"));
697    }
698
699    #[test]
700    fn test_execute_map_reduce() {
701        let f1 = FighterId::new();
702        let f2 = FighterId::new();
703        let config = MapReduceConfig {
704            input: "data".to_string(),
705            workers: vec![f1, f2],
706        };
707        let mut results = HashMap::new();
708        results.insert(f1, "processed-1".to_string());
709        results.insert(f2, "processed-2".to_string());
710
711        let mr_result = execute_map_reduce(&config, results);
712        assert_eq!(mr_result.map_results.len(), 2);
713        assert!(mr_result.reduced.contains("processed"));
714    }
715
716    #[tokio::test]
717    async fn test_map_reduce_distributed_splits_and_sends() {
718        let router = MessageRouter::new();
719        let coordinator = FighterId::new();
720        let w1 = FighterId::new();
721        let w2 = FighterId::new();
722
723        let _rx_coord = router.register(coordinator);
724        let _rx_w1 = router.register(w1);
725        let _rx_w2 = router.register(w2);
726
727        let config = MapReduceConfig {
728            input: "line1\nline2\nline3\nline4".to_string(),
729            workers: vec![w1, w2],
730        };
731
732        let result = execute_map_reduce_distributed(&config, &router, coordinator)
733            .await
734            .expect("should execute");
735
736        assert_eq!(result.map_results.len(), 2);
737        assert!(result.map_results.contains_key(&w1));
738        assert!(result.map_results.contains_key(&w2));
739        assert!(!result.reduced.is_empty());
740    }
741
742    #[tokio::test]
743    async fn test_map_reduce_distributed_no_workers() {
744        let router = MessageRouter::new();
745        let coordinator = FighterId::new();
746        let _rx = router.register(coordinator);
747
748        let config = MapReduceConfig {
749            input: "data".to_string(),
750            workers: vec![],
751        };
752
753        let result = execute_map_reduce_distributed(&config, &router, coordinator).await;
754        assert!(result.is_err());
755    }
756
757    // -- Chain of Responsibility tests --
758
759    #[test]
760    fn test_chain_find_handler_match() {
761        let f1 = FighterId::new();
762        let f2 = FighterId::new();
763        let chain = vec![
764            ChainHandler {
765                fighter_id: f1,
766                capabilities: vec!["code".to_string()],
767            },
768            ChainHandler {
769                fighter_id: f2,
770                capabilities: vec!["review".to_string()],
771            },
772        ];
773
774        let handler = chain_find_handler(&chain, "please review this PR");
775        assert_eq!(handler, Some(f2));
776    }
777
778    #[test]
779    fn test_chain_find_handler_first_match() {
780        let f1 = FighterId::new();
781        let f2 = FighterId::new();
782        let chain = vec![
783            ChainHandler {
784                fighter_id: f1,
785                capabilities: vec!["code".to_string()],
786            },
787            ChainHandler {
788                fighter_id: f2,
789                capabilities: vec!["code".to_string()],
790            },
791        ];
792
793        let handler = chain_find_handler(&chain, "analyze code quality");
794        assert_eq!(handler, Some(f1)); // First match wins.
795    }
796
797    #[test]
798    fn test_chain_find_handler_no_match() {
799        let chain = vec![ChainHandler {
800            fighter_id: FighterId::new(),
801            capabilities: vec!["database".to_string()],
802        }];
803
804        let handler = chain_find_handler(&chain, "fix CSS styling");
805        assert!(handler.is_none());
806    }
807
808    #[test]
809    fn test_chain_walk() {
810        let f1 = FighterId::new();
811        let f2 = FighterId::new();
812        let f3 = FighterId::new();
813        let chain = vec![
814            ChainHandler {
815                fighter_id: f1,
816                capabilities: vec!["a".to_string()],
817            },
818            ChainHandler {
819                fighter_id: f2,
820                capabilities: vec!["b".to_string()],
821            },
822            ChainHandler {
823                fighter_id: f3,
824                capabilities: vec!["c".to_string()],
825            },
826        ];
827
828        let mut handler_results = HashMap::new();
829        handler_results.insert(f1, false);
830        handler_results.insert(f2, true);
831        handler_results.insert(f3, true);
832
833        let result = chain_walk(&chain, "task", &handler_results);
834        assert_eq!(result, Some((f2, 1)));
835    }
836
837    #[test]
838    fn test_chain_walk_none_accept() {
839        let f1 = FighterId::new();
840        let chain = vec![ChainHandler {
841            fighter_id: f1,
842            capabilities: vec!["a".to_string()],
843        }];
844
845        let mut handler_results = HashMap::new();
846        handler_results.insert(f1, false);
847
848        let result = chain_walk(&chain, "task", &handler_results);
849        assert!(result.is_none());
850    }
851
852    #[tokio::test]
853    async fn test_chain_of_responsibility_first_capable_handles() {
854        let router = MessageRouter::new();
855        let coordinator = FighterId::new();
856        let f1 = FighterId::new();
857        let f2 = FighterId::new();
858
859        let _rx_coord = router.register(coordinator);
860        let _rx_f1 = router.register(f1);
861        let _rx_f2 = router.register(f2);
862
863        let chain = vec![
864            ChainHandler {
865                fighter_id: f1,
866                capabilities: vec!["database".to_string()],
867            },
868            ChainHandler {
869                fighter_id: f2,
870                capabilities: vec!["code".to_string()],
871            },
872        ];
873
874        let result =
875            execute_chain_of_responsibility(&chain, "fix the code issue", &router, coordinator)
876                .await
877                .expect("should execute");
878
879        assert!(result.is_some());
880        let (handler, pos, _) = result.expect("should have handler");
881        assert_eq!(handler, f2);
882        assert_eq!(pos, 1);
883    }
884
885    #[tokio::test]
886    async fn test_chain_of_responsibility_none_capable() {
887        let router = MessageRouter::new();
888        let coordinator = FighterId::new();
889        let f1 = FighterId::new();
890
891        let _rx_coord = router.register(coordinator);
892        let _rx_f1 = router.register(f1);
893
894        let chain = vec![ChainHandler {
895            fighter_id: f1,
896            capabilities: vec!["database".to_string()],
897        }];
898
899        let result =
900            execute_chain_of_responsibility(&chain, "fix CSS styling", &router, coordinator)
901                .await
902                .expect("should execute");
903
904        assert!(result.is_none());
905    }
906
907    #[tokio::test]
908    async fn test_chain_of_responsibility_empty_chain() {
909        let router = MessageRouter::new();
910        let coordinator = FighterId::new();
911        let _rx = router.register(coordinator);
912
913        let result = execute_chain_of_responsibility(&[], "any task", &router, coordinator).await;
914        assert!(result.is_err());
915    }
916
917    // -- Scatter-Gather tests --
918
919    #[test]
920    fn test_scatter_select_fastest() {
921        let responses = vec![
922            ScatterResponse {
923                fighter_id: FighterId::new(),
924                response: "slow".to_string(),
925                response_time_ms: 500,
926                quality_score: 0.9,
927            },
928            ScatterResponse {
929                fighter_id: FighterId::new(),
930                response: "fast".to_string(),
931                response_time_ms: 100,
932                quality_score: 0.5,
933            },
934        ];
935
936        let best = scatter_select(&responses, &SelectionCriteria::Fastest);
937        assert_eq!(best.map(|r| r.response.as_str()), Some("fast"));
938    }
939
940    #[test]
941    fn test_scatter_select_highest_quality() {
942        let responses = vec![
943            ScatterResponse {
944                fighter_id: FighterId::new(),
945                response: "low quality".to_string(),
946                response_time_ms: 50,
947                quality_score: 0.3,
948            },
949            ScatterResponse {
950                fighter_id: FighterId::new(),
951                response: "high quality".to_string(),
952                response_time_ms: 500,
953                quality_score: 0.95,
954            },
955        ];
956
957        let best = scatter_select(&responses, &SelectionCriteria::HighestQuality);
958        assert_eq!(best.map(|r| r.response.as_str()), Some("high quality"));
959    }
960
961    #[test]
962    fn test_scatter_select_consensus() {
963        let responses = vec![
964            ScatterResponse {
965                fighter_id: FighterId::new(),
966                response: "yes".to_string(),
967                response_time_ms: 100,
968                quality_score: 0.8,
969            },
970            ScatterResponse {
971                fighter_id: FighterId::new(),
972                response: "yes".to_string(),
973                response_time_ms: 200,
974                quality_score: 0.7,
975            },
976            ScatterResponse {
977                fighter_id: FighterId::new(),
978                response: "no".to_string(),
979                response_time_ms: 150,
980                quality_score: 0.9,
981            },
982        ];
983
984        let best = scatter_select(&responses, &SelectionCriteria::Consensus);
985        assert_eq!(best.map(|r| r.response.as_str()), Some("yes"));
986    }
987
988    #[test]
989    fn test_scatter_select_empty() {
990        let responses: Vec<ScatterResponse> = vec![];
991        let best = scatter_select(&responses, &SelectionCriteria::Fastest);
992        assert!(best.is_none());
993    }
994
995    #[tokio::test]
996    async fn test_scatter_gather_fastest_selected() {
997        let router = MessageRouter::new();
998        let coordinator = FighterId::new();
999        let f1 = FighterId::new();
1000        let f2 = FighterId::new();
1001
1002        let _rx_coord = router.register(coordinator);
1003        let _rx_f1 = router.register(f1);
1004        let _rx_f2 = router.register(f2);
1005
1006        let result = execute_scatter_gather(
1007            &[f1, f2],
1008            "analyze this",
1009            &router,
1010            coordinator,
1011            Duration::from_secs(5),
1012            &SelectionCriteria::Fastest,
1013        )
1014        .await
1015        .expect("should execute");
1016
1017        assert!(result.is_some());
1018        let selected = result.expect("should have result");
1019        // Fastest should be the first fighter (response_time = 100ms).
1020        assert_eq!(selected.fighter_id, f1);
1021    }
1022
1023    #[tokio::test]
1024    async fn test_scatter_gather_empty_fighters() {
1025        let router = MessageRouter::new();
1026        let coordinator = FighterId::new();
1027        let _rx = router.register(coordinator);
1028
1029        let result = execute_scatter_gather(
1030            &[],
1031            "task",
1032            &router,
1033            coordinator,
1034            Duration::from_secs(1),
1035            &SelectionCriteria::Fastest,
1036        )
1037        .await
1038        .expect("should execute");
1039
1040        assert!(result.is_none());
1041    }
1042
1043    // -- Supervisor tests --
1044
1045    #[test]
1046    fn test_supervisor_one_for_one_restart() {
1047        let f1 = FighterId::new();
1048        let f2 = FighterId::new();
1049        let mut config = SupervisorConfig {
1050            strategy: RestartStrategy::OneForOne,
1051            max_restarts: 3,
1052            workers: vec![
1053                SupervisedWorker {
1054                    fighter_id: f1,
1055                    restart_count: 0,
1056                    running: true,
1057                    failed: false,
1058                },
1059                SupervisedWorker {
1060                    fighter_id: f2,
1061                    restart_count: 0,
1062                    running: true,
1063                    failed: false,
1064                },
1065            ],
1066        };
1067
1068        let restarted = supervisor_handle_failure(&mut config, &f1);
1069        assert_eq!(restarted, vec![f1]);
1070        assert_eq!(config.workers[0].restart_count, 1);
1071        assert!(config.workers[0].running);
1072        // f2 should be unaffected.
1073        assert_eq!(config.workers[1].restart_count, 0);
1074    }
1075
1076    #[test]
1077    fn test_supervisor_one_for_one_max_restarts() {
1078        let f1 = FighterId::new();
1079        let mut config = SupervisorConfig {
1080            strategy: RestartStrategy::OneForOne,
1081            max_restarts: 2,
1082            workers: vec![SupervisedWorker {
1083                fighter_id: f1,
1084                restart_count: 2,
1085                running: true,
1086                failed: false,
1087            }],
1088        };
1089
1090        let restarted = supervisor_handle_failure(&mut config, &f1);
1091        assert!(restarted.is_empty());
1092        assert!(config.workers[0].failed);
1093        assert!(!config.workers[0].running);
1094    }
1095
1096    #[test]
1097    fn test_supervisor_all_for_one_restart() {
1098        let f1 = FighterId::new();
1099        let f2 = FighterId::new();
1100        let f3 = FighterId::new();
1101        let mut config = SupervisorConfig {
1102            strategy: RestartStrategy::AllForOne,
1103            max_restarts: 3,
1104            workers: vec![
1105                SupervisedWorker {
1106                    fighter_id: f1,
1107                    restart_count: 0,
1108                    running: true,
1109                    failed: false,
1110                },
1111                SupervisedWorker {
1112                    fighter_id: f2,
1113                    restart_count: 0,
1114                    running: true,
1115                    failed: false,
1116                },
1117                SupervisedWorker {
1118                    fighter_id: f3,
1119                    restart_count: 0,
1120                    running: true,
1121                    failed: false,
1122                },
1123            ],
1124        };
1125
1126        let restarted = supervisor_handle_failure(&mut config, &f1);
1127        assert_eq!(restarted.len(), 3);
1128        // All workers should have restart count incremented.
1129        for worker in &config.workers {
1130            assert_eq!(worker.restart_count, 1);
1131            assert!(worker.running);
1132        }
1133    }
1134
1135    #[test]
1136    fn test_supervisor_all_for_one_max_restarts() {
1137        let f1 = FighterId::new();
1138        let f2 = FighterId::new();
1139        let mut config = SupervisorConfig {
1140            strategy: RestartStrategy::AllForOne,
1141            max_restarts: 1,
1142            workers: vec![
1143                SupervisedWorker {
1144                    fighter_id: f1,
1145                    restart_count: 1,
1146                    running: true,
1147                    failed: false,
1148                },
1149                SupervisedWorker {
1150                    fighter_id: f2,
1151                    restart_count: 0,
1152                    running: true,
1153                    failed: false,
1154                },
1155            ],
1156        };
1157
1158        let restarted = supervisor_handle_failure(&mut config, &f1);
1159        assert!(restarted.is_empty());
1160        assert!(config.workers[0].failed);
1161    }
1162
1163    #[tokio::test]
1164    async fn test_supervisor_health_monitoring() {
1165        let router = MessageRouter::new();
1166        let supervisor = FighterId::new();
1167        let w1 = FighterId::new();
1168        let w2 = FighterId::new();
1169
1170        let _rx_sup = router.register(supervisor);
1171        let _rx_w1 = router.register(w1);
1172        // w2 is NOT registered (simulating a dead worker).
1173
1174        let mut config = SupervisorConfig {
1175            strategy: RestartStrategy::OneForOne,
1176            max_restarts: 3,
1177            workers: vec![
1178                SupervisedWorker {
1179                    fighter_id: w1,
1180                    restart_count: 0,
1181                    running: true,
1182                    failed: false,
1183                },
1184                SupervisedWorker {
1185                    fighter_id: w2,
1186                    restart_count: 0,
1187                    running: true,
1188                    failed: false,
1189                },
1190            ],
1191        };
1192
1193        let restarted = supervisor_monitor_health(&mut config, &router, supervisor).await;
1194
1195        // w2 should have been detected as failed and restarted.
1196        assert!(restarted.contains(&w2));
1197        assert!(!restarted.contains(&w1));
1198    }
1199
1200    // -- Auction tests --
1201
1202    #[test]
1203    fn test_auction_select_winner() {
1204        let bids = vec![
1205            AuctionBid {
1206                fighter_id: FighterId::new(),
1207                estimated_time_secs: 60,
1208                confidence: 0.8,
1209                submitted_at: Utc::now(),
1210            },
1211            AuctionBid {
1212                fighter_id: FighterId::new(),
1213                estimated_time_secs: 30,
1214                confidence: 0.9,
1215                submitted_at: Utc::now(),
1216            },
1217        ];
1218
1219        let winner = auction_select_winner(&bids);
1220        assert!(winner.is_some());
1221        // Second bid has score 0.9/30 = 0.03, first has 0.8/60 = 0.013.
1222        let w = winner.expect("should have winner");
1223        assert_eq!(w.estimated_time_secs, 30);
1224    }
1225
1226    #[test]
1227    fn test_auction_select_winner_empty() {
1228        let bids: Vec<AuctionBid> = vec![];
1229        assert!(auction_select_winner(&bids).is_none());
1230    }
1231
1232    #[test]
1233    fn test_auction_select_winner_single_bid() {
1234        let bid = AuctionBid {
1235            fighter_id: FighterId::new(),
1236            estimated_time_secs: 10,
1237            confidence: 1.0,
1238            submitted_at: Utc::now(),
1239        };
1240        let bids = [bid.clone()];
1241        let winner = auction_select_winner(&bids);
1242        assert!(winner.is_some());
1243    }
1244
1245    #[test]
1246    fn test_auction_filter_bids() {
1247        let bids = vec![
1248            AuctionBid {
1249                fighter_id: FighterId::new(),
1250                estimated_time_secs: 10,
1251                confidence: 0.3,
1252                submitted_at: Utc::now(),
1253            },
1254            AuctionBid {
1255                fighter_id: FighterId::new(),
1256                estimated_time_secs: 20,
1257                confidence: 0.8,
1258                submitted_at: Utc::now(),
1259            },
1260            AuctionBid {
1261                fighter_id: FighterId::new(),
1262                estimated_time_secs: 30,
1263                confidence: 0.9,
1264                submitted_at: Utc::now(),
1265            },
1266        ];
1267
1268        let filtered = auction_filter_bids(&bids, 0.7);
1269        assert_eq!(filtered.len(), 2);
1270    }
1271
1272    #[test]
1273    fn test_auction_filter_bids_none_pass() {
1274        let bids = vec![AuctionBid {
1275            fighter_id: FighterId::new(),
1276            estimated_time_secs: 10,
1277            confidence: 0.2,
1278            submitted_at: Utc::now(),
1279        }];
1280
1281        let filtered = auction_filter_bids(&bids, 0.5);
1282        assert!(filtered.is_empty());
1283    }
1284
1285    #[test]
1286    fn test_auction_zero_time_estimate() {
1287        let bids = vec![
1288            AuctionBid {
1289                fighter_id: FighterId::new(),
1290                estimated_time_secs: 0,
1291                confidence: 0.5,
1292                submitted_at: Utc::now(),
1293            },
1294            AuctionBid {
1295                fighter_id: FighterId::new(),
1296                estimated_time_secs: 100,
1297                confidence: 0.9,
1298                submitted_at: Utc::now(),
1299            },
1300        ];
1301
1302        // Zero-time bid gets a very high score.
1303        let winner = auction_select_winner(&bids);
1304        assert!(winner.is_some());
1305        assert_eq!(winner.expect("winner").estimated_time_secs, 0);
1306    }
1307
1308    #[tokio::test]
1309    async fn test_auction_best_bid_wins() {
1310        let router = MessageRouter::new();
1311        let coordinator = FighterId::new();
1312        let f1 = FighterId::new();
1313        let f2 = FighterId::new();
1314
1315        let _rx_coord = router.register(coordinator);
1316        let _rx_f1 = router.register(f1);
1317        let _rx_f2 = router.register(f2);
1318
1319        let bids = vec![
1320            AuctionBid {
1321                fighter_id: f1,
1322                estimated_time_secs: 60,
1323                confidence: 0.7,
1324                submitted_at: Utc::now(),
1325            },
1326            AuctionBid {
1327                fighter_id: f2,
1328                estimated_time_secs: 20,
1329                confidence: 0.9,
1330                submitted_at: Utc::now(),
1331            },
1332        ];
1333
1334        let result = execute_auction(&[f1, f2], "complex task", &router, coordinator, &bids, 0.5)
1335            .await
1336            .expect("should execute");
1337
1338        assert!(result.is_some());
1339        let winner = result.expect("should have winner");
1340        assert_eq!(winner.fighter_id, f2); // Better score.
1341    }
1342
1343    #[tokio::test]
1344    async fn test_auction_tie_breaking() {
1345        let router = MessageRouter::new();
1346        let coordinator = FighterId::new();
1347        let f1 = FighterId::new();
1348        let f2 = FighterId::new();
1349
1350        let _rx_coord = router.register(coordinator);
1351        let _rx_f1 = router.register(f1);
1352        let _rx_f2 = router.register(f2);
1353
1354        // Both bids have same score (0.8 / 40 = 0.02).
1355        let bids = vec![
1356            AuctionBid {
1357                fighter_id: f1,
1358                estimated_time_secs: 40,
1359                confidence: 0.8,
1360                submitted_at: Utc::now(),
1361            },
1362            AuctionBid {
1363                fighter_id: f2,
1364                estimated_time_secs: 40,
1365                confidence: 0.8,
1366                submitted_at: Utc::now(),
1367            },
1368        ];
1369
1370        let result = execute_auction(&[f1, f2], "tied task", &router, coordinator, &bids, 0.5)
1371            .await
1372            .expect("should execute");
1373
1374        // Should still select a winner (deterministic - one of them).
1375        assert!(result.is_some());
1376    }
1377
1378    // -- Integration-style tests --
1379
1380    #[test]
1381    fn test_map_reduce_end_to_end() {
1382        let f1 = FighterId::new();
1383        let f2 = FighterId::new();
1384        let config = MapReduceConfig {
1385            input: "line1\nline2\nline3\nline4".to_string(),
1386            workers: vec![f1, f2],
1387        };
1388
1389        let chunks = map_split(&config.input, config.workers.len());
1390        assert_eq!(chunks.len(), 2);
1391
1392        let mut results = HashMap::new();
1393        results.insert(f1, format!("analyzed: {}", chunks[0]));
1394        results.insert(f2, format!("analyzed: {}", chunks[1]));
1395
1396        let mr = execute_map_reduce(&config, results);
1397        assert!(mr.reduced.contains("analyzed"));
1398        assert_eq!(mr.map_results.len(), 2);
1399    }
1400
1401    #[test]
1402    fn test_supervisor_repeated_failures() {
1403        let f = FighterId::new();
1404        let mut config = SupervisorConfig {
1405            strategy: RestartStrategy::OneForOne,
1406            max_restarts: 3,
1407            workers: vec![SupervisedWorker {
1408                fighter_id: f,
1409                restart_count: 0,
1410                running: true,
1411                failed: false,
1412            }],
1413        };
1414
1415        // Fail 3 times (should restart each time).
1416        for i in 1..=3 {
1417            let restarted = supervisor_handle_failure(&mut config, &f);
1418            assert_eq!(restarted, vec![f]);
1419            assert_eq!(config.workers[0].restart_count, i);
1420        }
1421
1422        // 4th failure should give up.
1423        let restarted = supervisor_handle_failure(&mut config, &f);
1424        assert!(restarted.is_empty());
1425        assert!(config.workers[0].failed);
1426    }
1427}