Skip to main content

aster/agents/communication/
coordinator.rs

1//! Agent Coordinator
2//!
3//! Coordinates multiple agents with task assignment,
4//! load balancing, and deadlock detection.
5//!
6//! **Feature: agents-alignment**
7
8use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::{HashMap, HashSet};
12use thiserror::Error;
13use uuid::Uuid;
14
15// ============================================================================
16// Error Types
17// ============================================================================
18
/// Errors that can occur during agent coordination.
///
/// Returned (via [`CoordinatorResult`]) by the fallible [`AgentCoordinator`]
/// operations. The `#[error(...)]` strings are the `Display` output of each
/// variant.
#[derive(Debug, Error, Clone, PartialEq)]
pub enum CoordinatorError {
    /// No registered agent has this ID, or the agent is not a participant of
    /// the barrier it tried to arrive at.
    #[error("Agent not found: {0}")]
    AgentNotFound(String),

    /// No registered agent passed the availability/type/capability filters.
    #[error("No suitable agent available for task")]
    NoSuitableAgent,

    /// No task has this ID.
    /// NOTE(review): also used for unknown barrier IDs, with a payload of the
    /// form `"Barrier <id>"`.
    #[error("Task not found: {0}")]
    TaskNotFound(String),

    /// Presumably raised when a task exceeds its timeout; payload identifies it.
    #[error("Task timeout: {0}")]
    TaskTimeout(String),

    /// Presumably raised when a synchronization wait times out.
    #[error("Synchronization timeout")]
    SyncTimeout,

    /// Presumably raised when a cycle is found in the agent wait-for graph.
    #[error("Deadlock detected")]
    DeadlockDetected,

    /// An agent with this ID is already registered.
    #[error("Agent already registered: {0}")]
    AgentAlreadyRegistered(String),

    /// A task operation was attempted in a state that does not allow it; the
    /// payload is a human-readable explanation.
    #[error("Invalid task state: {0}")]
    InvalidTaskState(String),
}
46
/// Result alias used by every fallible coordinator operation.
pub type CoordinatorResult<T> = Result<T, CoordinatorError>;
48
49// ============================================================================
50// Types
51// ============================================================================
52
/// Agent status
///
/// Liveness/occupancy state of a registered agent.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum AgentStatus {
    /// No tasks in progress (the default for a new agent).
    #[default]
    Idle,
    /// At least one task in progress. The agent may still accept more tasks
    /// while `current_tasks < max_concurrent_tasks`.
    Busy,
    /// Missed its heartbeat deadline or was explicitly marked offline; never
    /// selected for new tasks.
    Offline,
}
61
/// Agent capabilities and state
///
/// Registration record for one agent: its identity and advertised
/// capabilities, plus the task-count, load and liveness fields the
/// coordinator maintains.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentCapabilities {
    /// Agent ID (the coordinator rejects registering the same ID twice)
    pub agent_id: String,
    /// Agent type (e.g., "explore", "plan", "code")
    pub agent_type: String,
    /// List of capabilities, matched by exact string equality
    pub capabilities: Vec<String>,
    /// Current load (0.0 - 1.0): `current_tasks / max_concurrent_tasks`, or
    /// 1.0 when the maximum is 0; recomputed by `update_load`
    pub current_load: f64,
    /// Maximum concurrent tasks
    pub max_concurrent_tasks: usize,
    /// Current number of tasks assigned and not yet completed/failed
    pub current_tasks: usize,
    /// Agent status
    pub status: AgentStatus,
    /// Last heartbeat time, compared against the coordinator's heartbeat
    /// timeout when checking agent health
    pub last_heartbeat: DateTime<Utc>,
}
82
83impl AgentCapabilities {
84    pub fn new(agent_id: impl Into<String>, agent_type: impl Into<String>) -> Self {
85        Self {
86            agent_id: agent_id.into(),
87            agent_type: agent_type.into(),
88            capabilities: Vec::new(),
89            current_load: 0.0,
90            max_concurrent_tasks: 1,
91            current_tasks: 0,
92            status: AgentStatus::Idle,
93            last_heartbeat: Utc::now(),
94        }
95    }
96
97    pub fn with_capabilities(mut self, capabilities: Vec<String>) -> Self {
98        self.capabilities = capabilities;
99        self
100    }
101
102    pub fn with_max_concurrent_tasks(mut self, max: usize) -> Self {
103        self.max_concurrent_tasks = max;
104        self
105    }
106
107    /// Check if agent has a specific capability
108    pub fn has_capability(&self, capability: &str) -> bool {
109        self.capabilities.iter().any(|c| c == capability)
110    }
111
112    /// Check if agent has all required capabilities
113    pub fn has_all_capabilities(&self, required: &[String]) -> bool {
114        required.iter().all(|r| self.has_capability(r))
115    }
116
117    /// Check if agent can accept more tasks
118    pub fn can_accept_task(&self) -> bool {
119        self.status != AgentStatus::Offline && self.current_tasks < self.max_concurrent_tasks
120    }
121
122    /// Update load based on current tasks
123    pub fn update_load(&mut self) {
124        self.current_load = if self.max_concurrent_tasks > 0 {
125            self.current_tasks as f64 / self.max_concurrent_tasks as f64
126        } else {
127            1.0
128        };
129
130        self.status = if self.current_tasks == 0 {
131            AgentStatus::Idle
132        } else {
133            AgentStatus::Busy
134        };
135    }
136}
137
/// Load balancing strategy
///
/// How the coordinator chooses among agents that pass the availability, type
/// and capability filters.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
pub enum LoadBalanceStrategy {
    /// Select agent with lowest load (`current_load`)
    #[default]
    LeastBusy,
    /// Cycle through eligible agents using a coordinator-wide cursor
    RoundRobin,
    /// Pseudo-random choice derived from the current time (not cryptographic)
    Random,
    /// Prefer the agent with the most capabilities from the required set
    CapabilityMatch,
}
151
/// Task assignment criteria
///
/// Filters and strategy used when picking an agent for a task.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct AssignmentCriteria {
    /// Required agent type (`None` accepts any type)
    pub required_agent_type: Option<String>,
    /// Required capabilities; an agent must have all of them
    pub required_capabilities: Vec<String>,
    /// Load balancing strategy
    pub load_balance_strategy: LoadBalanceStrategy,
    /// Task priority (0-10); `with_priority` clamps values above 10.
    /// NOTE(review): not read by agent selection in this file — confirm use.
    pub priority: u8,
    /// Timeout in milliseconds.
    /// NOTE(review): not read by agent selection in this file — confirm use.
    pub timeout_ms: Option<u64>,
}
166
167impl AssignmentCriteria {
168    pub fn new() -> Self {
169        Self::default()
170    }
171
172    pub fn with_agent_type(mut self, agent_type: impl Into<String>) -> Self {
173        self.required_agent_type = Some(agent_type.into());
174        self
175    }
176
177    pub fn with_capabilities(mut self, capabilities: Vec<String>) -> Self {
178        self.required_capabilities = capabilities;
179        self
180    }
181
182    pub fn with_strategy(mut self, strategy: LoadBalanceStrategy) -> Self {
183        self.load_balance_strategy = strategy;
184        self
185    }
186
187    pub fn with_priority(mut self, priority: u8) -> Self {
188        self.priority = priority.min(10);
189        self
190    }
191
192    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
193        self.timeout_ms = Some(timeout_ms);
194        self
195    }
196}
197
/// Task definition
///
/// A unit of work submitted to the coordinator for assignment to an agent.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Task {
    /// Task ID (a fresh UUIDv4 string unless overridden with `with_id`)
    pub id: String,
    /// Task type
    pub task_type: String,
    /// Task data (arbitrary JSON payload)
    pub data: Value,
    /// Priority (0-10); defaults to 5, and `with_priority` clamps to 10
    pub priority: u8,
    /// Created time
    pub created_at: DateTime<Utc>,
    /// Timeout in milliseconds (`None` unless set with `with_timeout`)
    pub timeout_ms: Option<u64>,
}
214
215impl Task {
216    pub fn new(task_type: impl Into<String>, data: Value) -> Self {
217        Self {
218            id: Uuid::new_v4().to_string(),
219            task_type: task_type.into(),
220            data,
221            priority: 5,
222            created_at: Utc::now(),
223            timeout_ms: None,
224        }
225    }
226
227    pub fn with_id(mut self, id: impl Into<String>) -> Self {
228        self.id = id.into();
229        self
230    }
231
232    pub fn with_priority(mut self, priority: u8) -> Self {
233        self.priority = priority.min(10);
234        self
235    }
236
237    pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
238        self.timeout_ms = Some(timeout_ms);
239        self
240    }
241}
242
/// Task status
///
/// Lifecycle state of a task tracked by the coordinator.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum TaskStatus {
    /// Not yet assigned. NOTE(review): `assign_task` records new tasks
    /// directly as `Assigned`.
    Pending,
    /// Recorded against an agent but not started; `start_task` moves it to
    /// `Running`.
    Assigned,
    /// Being executed by its agent.
    Running,
    /// Finished via `complete_task` with a successful result.
    Completed,
    /// Finished unsuccessfully: `complete_task` with an unsuccessful result,
    /// or `fail_task`.
    Failed,
    /// Exceeded its timeout.
    Timeout,
}
253
/// Task result
///
/// Outcome reported by the caller when a task finishes. The coordinator
/// stores it as given; it does not cross-check the timestamps or duration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Task ID
    pub task_id: String,
    /// Executing agent ID
    pub agent_id: String,
    /// Whether successful; decides `Completed` vs `Failed`
    pub success: bool,
    /// Result data
    pub result: Option<Value>,
    /// Error message
    pub error: Option<String>,
    /// Start time
    pub start_time: DateTime<Utc>,
    /// End time
    pub end_time: DateTime<Utc>,
    /// Duration in milliseconds
    pub duration_ms: i64,
}
274
/// Task assignment record
///
/// Internal bookkeeping for one task: which agent owns it, its lifecycle
/// state, timestamps, and the stored result once it completes.
#[derive(Debug, Clone)]
// Presumably silences warnings for fields that are recorded but never read
// (e.g. `assigned_at`, `started_at`) — confirm before removing.
#[allow(dead_code)]
struct TaskAssignment {
    task: Task,
    agent_id: String,
    status: TaskStatus,
    assigned_at: DateTime<Utc>,
    started_at: Option<DateTime<Utc>>,
    result: Option<TaskResult>,
}
286
/// Deadlock information
///
/// Describes one cycle found in the agent wait-for graph.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeadlockInfo {
    /// Detection time
    pub detected_at: DateTime<Utc>,
    /// Involved agents, in cycle order: each waits on the next, and the last
    /// waits on the first
    pub involved_agents: Vec<String>,
    /// Involved resources (unordered)
    pub involved_resources: Vec<String>,
    /// Dependency chain: one link per consecutive pair of `involved_agents`
    pub dependency_chain: Vec<DependencyLink>,
}
299
/// A link in the dependency chain
///
/// `agent` waits on `resource`, which is currently held by `waiting_for`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DependencyLink {
    /// Agent waiting
    pub agent: String,
    /// Agent being waited for (the holder of `resource`)
    pub waiting_for: String,
    /// Resource being waited for
    pub resource: String,
}
310
/// Synchronization barrier
///
/// The barrier is reached once every agent in `agent_ids` has arrived.
#[derive(Debug, Clone)]
// Presumably silences warnings for `id`/`created_at`, which are stored but
// not read in this module — confirm before removing.
#[allow(dead_code)]
struct SyncBarrier {
    id: String,
    // Participants (duplicates collapse because this is a set).
    agent_ids: HashSet<String>,
    // Participants that have arrived; always a subset of `agent_ids`.
    arrived: HashSet<String>,
    created_at: DateTime<Utc>,
}
320
/// Coordinator statistics
///
/// Snapshot of agent and task counts produced by `get_stats`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct CoordinatorStats {
    pub total_agents: usize,
    /// Agents whose status is not `Offline`
    pub active_agents: usize,
    /// `total_agents - active_agents`
    pub offline_agents: usize,
    pub total_tasks: usize,
    pub pending_tasks: usize,
    pub running_tasks: usize,
    pub completed_tasks: usize,
    pub failed_tasks: usize,
    /// Presumably the mean `current_load` across agents — confirm in `get_stats`
    pub average_load: f64,
}
334
/// Coordinator event
///
/// Notifications emitted by the coordinator as agents and tasks change state.
#[derive(Debug, Clone)]
pub enum CoordinatorEvent {
    /// An agent was registered; carries a copy of its record.
    AgentRegistered(AgentCapabilities),
    /// An agent was removed.
    AgentUnregistered {
        agent_id: String,
    },
    /// An agent's status changed (e.g. via `update_agent_status`).
    AgentStatusChanged {
        agent_id: String,
        status: AgentStatus,
    },
    /// An agent was marked offline after missing its heartbeat deadline.
    AgentOffline {
        agent_id: String,
    },
    /// A task was assigned to an agent.
    TaskAssigned {
        task_id: String,
        agent_id: String,
    },
    /// A task moved from `Assigned` to `Running`.
    TaskStarted {
        task_id: String,
        agent_id: String,
    },
    /// `complete_task` recorded a result (successful or not).
    TaskCompleted(TaskResult),
    /// `fail_task` marked a task as failed.
    TaskFailed {
        task_id: String,
        error: String,
    },
    /// A deadlock was found. NOTE(review): `detect_deadlock` returns its info
    /// rather than emitting this event; confirm where it is emitted.
    DeadlockDetected(DeadlockInfo),
    /// Every participant of the barrier has arrived.
    SyncBarrierReached {
        barrier_id: String,
    },
}
367
368// ============================================================================
369// Agent Coordinator
370// ============================================================================
371
/// Type alias for event callback functions
///
/// A boxed, thread-shareable (`Send + Sync`) listener invoked with a
/// reference to each emitted [`CoordinatorEvent`].
type EventCallback = Box<dyn Fn(&CoordinatorEvent) + Send + Sync>;
374
/// Agent Coordinator
///
/// Coordinates multiple agents with task assignment,
/// load balancing, and deadlock detection.
///
/// All state lives in in-memory maps owned by this struct.
pub struct AgentCoordinator {
    /// Registered agents, keyed by `agent_id`
    agents: HashMap<String, AgentCapabilities>,
    /// Task assignments (task_id -> assignment). Completion and failure update
    /// entries in place rather than removing them.
    task_assignments: HashMap<String, TaskAssignment>,
    /// Resource dependencies (agent_id -> resources waiting for); together
    /// with `resource_holders` these form the wait-for graph
    resource_dependencies: HashMap<String, HashSet<String>>,
    /// Resource holders (resource -> agent_id holding it; one holder per resource)
    resource_holders: HashMap<String, String>,
    /// Synchronization barriers, keyed by barrier ID
    sync_barriers: HashMap<String, SyncBarrier>,
    /// Round-robin cursor for load balancing
    round_robin_index: usize,
    /// Event callbacks; presumably invoked by `emit_event` (defined outside
    /// this view) — confirm
    event_callbacks: Vec<EventCallback>,
    /// Heartbeat timeout in seconds (15 unless changed by `with_heartbeat_timeout`)
    heartbeat_timeout_secs: i64,
}
397
398impl Default for AgentCoordinator {
399    fn default() -> Self {
400        Self::new()
401    }
402}
403
404impl AgentCoordinator {
405    /// Create a new coordinator
406    pub fn new() -> Self {
407        Self {
408            agents: HashMap::new(),
409            task_assignments: HashMap::new(),
410            resource_dependencies: HashMap::new(),
411            resource_holders: HashMap::new(),
412            sync_barriers: HashMap::new(),
413            round_robin_index: 0,
414            event_callbacks: Vec::new(),
415            heartbeat_timeout_secs: 15,
416        }
417    }
418
419    /// Set heartbeat timeout
420    pub fn with_heartbeat_timeout(mut self, secs: i64) -> Self {
421        self.heartbeat_timeout_secs = secs;
422        self
423    }
424
425    // ========================================================================
426    // Agent Management
427    // ========================================================================
428
429    /// Register an agent
430    pub fn register_agent(&mut self, capabilities: AgentCapabilities) -> CoordinatorResult<()> {
431        if self.agents.contains_key(&capabilities.agent_id) {
432            return Err(CoordinatorError::AgentAlreadyRegistered(
433                capabilities.agent_id.clone(),
434            ));
435        }
436
437        let agent_id = capabilities.agent_id.clone();
438        self.agents.insert(agent_id.clone(), capabilities.clone());
439        self.emit_event(CoordinatorEvent::AgentRegistered(capabilities));
440
441        Ok(())
442    }
443
444    /// Unregister an agent
445    pub fn unregister_agent(&mut self, agent_id: &str) -> CoordinatorResult<()> {
446        if self.agents.remove(agent_id).is_none() {
447            return Err(CoordinatorError::AgentNotFound(agent_id.to_string()));
448        }
449
450        // Clean up resource dependencies
451        self.resource_dependencies.remove(agent_id);
452
453        // Clean up resource holders
454        self.resource_holders.retain(|_, holder| holder != agent_id);
455
456        self.emit_event(CoordinatorEvent::AgentUnregistered {
457            agent_id: agent_id.to_string(),
458        });
459
460        Ok(())
461    }
462
463    /// Update agent status
464    pub fn update_agent_status(
465        &mut self,
466        agent_id: &str,
467        status: AgentStatus,
468    ) -> CoordinatorResult<()> {
469        let agent = self
470            .agents
471            .get_mut(agent_id)
472            .ok_or_else(|| CoordinatorError::AgentNotFound(agent_id.to_string()))?;
473
474        agent.status = status;
475        agent.last_heartbeat = Utc::now();
476
477        self.emit_event(CoordinatorEvent::AgentStatusChanged {
478            agent_id: agent_id.to_string(),
479            status,
480        });
481
482        Ok(())
483    }
484
485    /// Update agent heartbeat
486    pub fn heartbeat(&mut self, agent_id: &str) -> CoordinatorResult<()> {
487        let agent = self
488            .agents
489            .get_mut(agent_id)
490            .ok_or_else(|| CoordinatorError::AgentNotFound(agent_id.to_string()))?;
491
492        agent.last_heartbeat = Utc::now();
493
494        // If agent was offline, bring it back
495        if agent.status == AgentStatus::Offline {
496            agent.status = if agent.current_tasks == 0 {
497                AgentStatus::Idle
498            } else {
499                AgentStatus::Busy
500            };
501        }
502
503        Ok(())
504    }
505
506    /// Get agent by ID
507    pub fn get_agent(&self, agent_id: &str) -> Option<&AgentCapabilities> {
508        self.agents.get(agent_id)
509    }
510
511    /// Get mutable agent by ID
512    pub fn get_agent_mut(&mut self, agent_id: &str) -> Option<&mut AgentCapabilities> {
513        self.agents.get_mut(agent_id)
514    }
515
516    /// Get all agents
517    pub fn get_agents(&self) -> Vec<&AgentCapabilities> {
518        self.agents.values().collect()
519    }
520
521    /// Get agents by type
522    pub fn get_agents_by_type(&self, agent_type: &str) -> Vec<&AgentCapabilities> {
523        self.agents
524            .values()
525            .filter(|a| a.agent_type == agent_type)
526            .collect()
527    }
528
529    /// Get agents with capability
530    pub fn get_agents_with_capability(&self, capability: &str) -> Vec<&AgentCapabilities> {
531        self.agents
532            .values()
533            .filter(|a| a.has_capability(capability))
534            .collect()
535    }
536
537    /// Check agent health and mark offline if heartbeat timeout
538    pub fn check_agent_health(&mut self) {
539        let now = Utc::now();
540        let timeout = Duration::seconds(self.heartbeat_timeout_secs);
541
542        let offline_agents: Vec<String> = self
543            .agents
544            .iter()
545            .filter(|(_, agent)| {
546                agent.status != AgentStatus::Offline
547                    && now.signed_duration_since(agent.last_heartbeat) > timeout
548            })
549            .map(|(id, _)| id.clone())
550            .collect();
551
552        for agent_id in offline_agents {
553            if let Some(agent) = self.agents.get_mut(&agent_id) {
554                agent.status = AgentStatus::Offline;
555                self.emit_event(CoordinatorEvent::AgentOffline {
556                    agent_id: agent_id.clone(),
557                });
558            }
559        }
560    }
561
562    // ========================================================================
563    // Task Assignment
564    // ========================================================================
565
566    /// Assign a task to an agent based on criteria
567    pub fn assign_task(
568        &mut self,
569        task: Task,
570        criteria: &AssignmentCriteria,
571    ) -> CoordinatorResult<String> {
572        // Select an agent
573        let agent_id = self.select_agent(criteria)?;
574
575        // Update agent load
576        if let Some(agent) = self.agents.get_mut(&agent_id) {
577            agent.current_tasks += 1;
578            agent.update_load();
579        }
580
581        // Create assignment
582        let assignment = TaskAssignment {
583            task: task.clone(),
584            agent_id: agent_id.clone(),
585            status: TaskStatus::Assigned,
586            assigned_at: Utc::now(),
587            started_at: None,
588            result: None,
589        };
590
591        self.task_assignments.insert(task.id.clone(), assignment);
592
593        self.emit_event(CoordinatorEvent::TaskAssigned {
594            task_id: task.id.clone(),
595            agent_id: agent_id.clone(),
596        });
597
598        Ok(agent_id)
599    }
600
601    /// Select an agent based on criteria
602    fn select_agent(&mut self, criteria: &AssignmentCriteria) -> CoordinatorResult<String> {
603        // Filter candidates
604        let mut candidates: Vec<&AgentCapabilities> = self
605            .agents
606            .values()
607            .filter(|agent| agent.can_accept_task())
608            .collect();
609
610        // Filter by agent type
611        if let Some(ref agent_type) = criteria.required_agent_type {
612            candidates.retain(|agent| &agent.agent_type == agent_type);
613        }
614
615        // Filter by capabilities
616        if !criteria.required_capabilities.is_empty() {
617            candidates.retain(|agent| agent.has_all_capabilities(&criteria.required_capabilities));
618        }
619
620        if candidates.is_empty() {
621            return Err(CoordinatorError::NoSuitableAgent);
622        }
623
624        // Apply load balancing strategy
625        let selected = match criteria.load_balance_strategy {
626            LoadBalanceStrategy::LeastBusy => {
627                candidates.sort_by(|a, b| {
628                    a.current_load
629                        .partial_cmp(&b.current_load)
630                        .unwrap_or(std::cmp::Ordering::Equal)
631                });
632                candidates[0]
633            }
634            LoadBalanceStrategy::RoundRobin => {
635                self.round_robin_index = (self.round_robin_index + 1) % candidates.len();
636                candidates[self.round_robin_index]
637            }
638            LoadBalanceStrategy::Random => {
639                use std::collections::hash_map::DefaultHasher;
640                use std::hash::{Hash, Hasher};
641
642                let mut hasher = DefaultHasher::new();
643                Utc::now().timestamp_nanos_opt().hash(&mut hasher);
644                let index = (hasher.finish() as usize) % candidates.len();
645                candidates[index]
646            }
647            LoadBalanceStrategy::CapabilityMatch => {
648                // Sort by number of matching capabilities (descending)
649                let required = &criteria.required_capabilities;
650                candidates.sort_by(|a, b| {
651                    let a_match = a
652                        .capabilities
653                        .iter()
654                        .filter(|c| required.contains(c))
655                        .count();
656                    let b_match = b
657                        .capabilities
658                        .iter()
659                        .filter(|c| required.contains(c))
660                        .count();
661                    b_match.cmp(&a_match)
662                });
663                candidates[0]
664            }
665        };
666
667        Ok(selected.agent_id.clone())
668    }
669
670    /// Mark a task as started
671    pub fn start_task(&mut self, task_id: &str) -> CoordinatorResult<()> {
672        let assignment = self
673            .task_assignments
674            .get_mut(task_id)
675            .ok_or_else(|| CoordinatorError::TaskNotFound(task_id.to_string()))?;
676
677        if assignment.status != TaskStatus::Assigned {
678            return Err(CoordinatorError::InvalidTaskState(format!(
679                "Task {} is not in Assigned state",
680                task_id
681            )));
682        }
683
684        assignment.status = TaskStatus::Running;
685        assignment.started_at = Some(Utc::now());
686
687        // Clone agent_id before emitting event to avoid borrow issues
688        let agent_id = assignment.agent_id.clone();
689
690        self.emit_event(CoordinatorEvent::TaskStarted {
691            task_id: task_id.to_string(),
692            agent_id,
693        });
694
695        Ok(())
696    }
697
698    /// Complete a task
699    pub fn complete_task(&mut self, task_id: &str, result: TaskResult) -> CoordinatorResult<()> {
700        let assignment = self
701            .task_assignments
702            .get_mut(task_id)
703            .ok_or_else(|| CoordinatorError::TaskNotFound(task_id.to_string()))?;
704
705        // Update agent load
706        if let Some(agent) = self.agents.get_mut(&assignment.agent_id) {
707            agent.current_tasks = agent.current_tasks.saturating_sub(1);
708            agent.update_load();
709        }
710
711        assignment.status = if result.success {
712            TaskStatus::Completed
713        } else {
714            TaskStatus::Failed
715        };
716        assignment.result = Some(result.clone());
717
718        self.emit_event(CoordinatorEvent::TaskCompleted(result));
719
720        Ok(())
721    }
722
723    /// Fail a task
724    pub fn fail_task(&mut self, task_id: &str, error: String) -> CoordinatorResult<()> {
725        let assignment = self
726            .task_assignments
727            .get_mut(task_id)
728            .ok_or_else(|| CoordinatorError::TaskNotFound(task_id.to_string()))?;
729
730        // Update agent load
731        if let Some(agent) = self.agents.get_mut(&assignment.agent_id) {
732            agent.current_tasks = agent.current_tasks.saturating_sub(1);
733            agent.update_load();
734        }
735
736        assignment.status = TaskStatus::Failed;
737
738        self.emit_event(CoordinatorEvent::TaskFailed {
739            task_id: task_id.to_string(),
740            error,
741        });
742
743        Ok(())
744    }
745
746    /// Get task assignment
747    pub fn get_task(&self, task_id: &str) -> Option<(&Task, TaskStatus)> {
748        self.task_assignments
749            .get(task_id)
750            .map(|a| (&a.task, a.status))
751    }
752
753    /// Get task result
754    pub fn get_task_result(&self, task_id: &str) -> Option<&TaskResult> {
755        self.task_assignments
756            .get(task_id)
757            .and_then(|a| a.result.as_ref())
758    }
759
760    /// Get tasks assigned to an agent
761    pub fn get_agent_tasks(&self, agent_id: &str) -> Vec<&Task> {
762        self.task_assignments
763            .values()
764            .filter(|a| a.agent_id == agent_id)
765            .map(|a| &a.task)
766            .collect()
767    }
768
769    /// Get pending tasks
770    pub fn get_pending_tasks(&self) -> Vec<&Task> {
771        self.task_assignments
772            .values()
773            .filter(|a| a.status == TaskStatus::Pending || a.status == TaskStatus::Assigned)
774            .map(|a| &a.task)
775            .collect()
776    }
777
778    /// Get running tasks
779    pub fn get_running_tasks(&self) -> Vec<&Task> {
780        self.task_assignments
781            .values()
782            .filter(|a| a.status == TaskStatus::Running)
783            .map(|a| &a.task)
784            .collect()
785    }
786
787    // ========================================================================
788    // Resource Dependencies and Deadlock Detection
789    // ========================================================================
790
791    /// Record that an agent is waiting for a resource
792    pub fn record_resource_dependency(&mut self, agent_id: &str, resource: &str) {
793        self.resource_dependencies
794            .entry(agent_id.to_string())
795            .or_default()
796            .insert(resource.to_string());
797    }
798
799    /// Remove a resource dependency
800    pub fn remove_resource_dependency(&mut self, agent_id: &str, resource: &str) {
801        if let Some(resources) = self.resource_dependencies.get_mut(agent_id) {
802            resources.remove(resource);
803            if resources.is_empty() {
804                self.resource_dependencies.remove(agent_id);
805            }
806        }
807    }
808
809    /// Record that an agent holds a resource
810    pub fn record_resource_holder(&mut self, resource: &str, agent_id: &str) {
811        self.resource_holders
812            .insert(resource.to_string(), agent_id.to_string());
813    }
814
815    /// Remove a resource holder
816    pub fn remove_resource_holder(&mut self, resource: &str) {
817        self.resource_holders.remove(resource);
818    }
819
820    /// Detect deadlock using cycle detection in the wait-for graph
821    pub fn detect_deadlock(&self) -> Option<DeadlockInfo> {
822        // Build wait-for graph: agent -> agents it's waiting for
823        let mut wait_for_graph: HashMap<String, HashSet<String>> = HashMap::new();
824
825        for (agent_id, resources) in &self.resource_dependencies {
826            let mut waiting_for = HashSet::new();
827
828            for resource in resources {
829                if let Some(holder) = self.resource_holders.get(resource) {
830                    if holder != agent_id {
831                        waiting_for.insert(holder.clone());
832                    }
833                }
834            }
835
836            if !waiting_for.is_empty() {
837                wait_for_graph.insert(agent_id.clone(), waiting_for);
838            }
839        }
840
841        // Detect cycle using DFS
842        if let Some(cycle) = self.detect_cycle(&wait_for_graph) {
843            // Build deadlock info
844            let mut involved_resources = HashSet::new();
845            let mut dependency_chain = Vec::new();
846
847            for i in 0..cycle.len() {
848                let agent = &cycle[i];
849                let next_agent = &cycle[(i + 1) % cycle.len()];
850
851                // Find the resource this agent is waiting for from next_agent
852                if let Some(resources) = self.resource_dependencies.get(agent) {
853                    for resource in resources {
854                        if let Some(holder) = self.resource_holders.get(resource) {
855                            if holder == next_agent {
856                                involved_resources.insert(resource.clone());
857                                dependency_chain.push(DependencyLink {
858                                    agent: agent.clone(),
859                                    waiting_for: next_agent.clone(),
860                                    resource: resource.clone(),
861                                });
862                                break;
863                            }
864                        }
865                    }
866                }
867            }
868
869            let deadlock_info = DeadlockInfo {
870                detected_at: Utc::now(),
871                involved_agents: cycle,
872                involved_resources: involved_resources.into_iter().collect(),
873                dependency_chain,
874            };
875
876            return Some(deadlock_info);
877        }
878
879        None
880    }
881
882    /// Detect cycle in the wait-for graph using DFS
883    fn detect_cycle(&self, graph: &HashMap<String, HashSet<String>>) -> Option<Vec<String>> {
884        let mut visited = HashSet::new();
885        let mut rec_stack = HashSet::new();
886        let mut path = Vec::new();
887
888        for node in graph.keys() {
889            if !visited.contains(node) {
890                if let Some(cycle) =
891                    self.dfs_cycle(node, graph, &mut visited, &mut rec_stack, &mut path)
892                {
893                    return Some(cycle);
894                }
895            }
896        }
897
898        None
899    }
900
    /// DFS helper for cycle detection.
    ///
    /// Visits `node`, then recursively each neighbour listed for it in `graph`.
    /// The mutable arguments hold the search state:
    /// - `visited` accumulates every node ever entered, across all calls
    ///   made from `detect_cycle`;
    /// - `rec_stack` holds the nodes on the current DFS path;
    /// - `path` holds the same nodes in visit order.
    ///
    /// Returns `Some(cycle)` as soon as an edge to a node still on the current
    /// path is found. `cycle` is the slice of `path` from that node to `node`,
    /// so consecutive entries (wrapping around) are wait-for edges.
    ///
    /// On that early return, `rec_stack` and `path` are left partially filled;
    /// `detect_cycle` stops searching at that point. Returns `None` when no
    /// cycle is reachable from `node`.
    fn dfs_cycle(
        &self,
        node: &str,
        graph: &HashMap<String, HashSet<String>>,
        visited: &mut HashSet<String>,
        rec_stack: &mut HashSet<String>,
        path: &mut Vec<String>,
    ) -> Option<Vec<String>> {
        visited.insert(node.to_string());
        rec_stack.insert(node.to_string());
        path.push(node.to_string());

        if let Some(neighbors) = graph.get(node) {
            for neighbor in neighbors {
                if !visited.contains(neighbor) {
                    if let Some(cycle) = self.dfs_cycle(neighbor, graph, visited, rec_stack, path) {
                        return Some(cycle);
                    }
                } else if rec_stack.contains(neighbor) {
                    // Found a cycle - extract it from path.
                    // rec_stack and path always hold the same nodes, so the
                    // position lookup cannot fail.
                    let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
                    return Some(path[cycle_start..].to_vec());
                }
                // Otherwise the neighbour was already fully explored; skip it.
            }
        }

        // Backtrack: `node` is no longer on the current DFS path.
        rec_stack.remove(node);
        path.pop();
        None
    }
932
933    // ========================================================================
934    // Synchronization
935    // ========================================================================
936
937    /// Create a synchronization barrier for agents
938    pub fn create_sync_barrier(&mut self, agent_ids: Vec<String>) -> String {
939        let barrier_id = Uuid::new_v4().to_string();
940        let barrier = SyncBarrier {
941            id: barrier_id.clone(),
942            agent_ids: agent_ids.into_iter().collect(),
943            arrived: HashSet::new(),
944            created_at: Utc::now(),
945        };
946        self.sync_barriers.insert(barrier_id.clone(), barrier);
947        barrier_id
948    }
949
950    /// Agent arrives at a barrier
951    pub fn arrive_at_barrier(
952        &mut self,
953        barrier_id: &str,
954        agent_id: &str,
955    ) -> CoordinatorResult<bool> {
956        let barrier = self
957            .sync_barriers
958            .get_mut(barrier_id)
959            .ok_or_else(|| CoordinatorError::TaskNotFound(format!("Barrier {}", barrier_id)))?;
960
961        if !barrier.agent_ids.contains(agent_id) {
962            return Err(CoordinatorError::AgentNotFound(agent_id.to_string()));
963        }
964
965        barrier.arrived.insert(agent_id.to_string());
966
967        // Check if all agents have arrived
968        let all_arrived = barrier.arrived.len() == barrier.agent_ids.len();
969
970        if all_arrived {
971            self.emit_event(CoordinatorEvent::SyncBarrierReached {
972                barrier_id: barrier_id.to_string(),
973            });
974        }
975
976        Ok(all_arrived)
977    }
978
979    /// Check if all agents have arrived at a barrier
980    pub fn is_barrier_reached(&self, barrier_id: &str) -> bool {
981        self.sync_barriers
982            .get(barrier_id)
983            .map(|b| b.arrived.len() == b.agent_ids.len())
984            .unwrap_or(false)
985    }
986
987    /// Remove a barrier
988    pub fn remove_barrier(&mut self, barrier_id: &str) {
989        self.sync_barriers.remove(barrier_id);
990    }
991
992    /// Get agents that haven't arrived at a barrier
993    pub fn get_pending_agents(&self, barrier_id: &str) -> Vec<String> {
994        self.sync_barriers
995            .get(barrier_id)
996            .map(|b| b.agent_ids.difference(&b.arrived).cloned().collect())
997            .unwrap_or_default()
998    }
999
1000    // ========================================================================
1001    // Statistics and Events
1002    // ========================================================================
1003
1004    /// Get coordinator statistics
1005    pub fn get_stats(&self) -> CoordinatorStats {
1006        let agents: Vec<&AgentCapabilities> = self.agents.values().collect();
1007        let active_agents = agents
1008            .iter()
1009            .filter(|a| a.status != AgentStatus::Offline)
1010            .count();
1011        let offline_agents = agents.len() - active_agents;
1012
1013        let total_load: f64 = agents.iter().map(|a| a.current_load).sum();
1014        let average_load = if agents.is_empty() {
1015            0.0
1016        } else {
1017            total_load / agents.len() as f64
1018        };
1019
1020        let mut pending_tasks = 0;
1021        let mut running_tasks = 0;
1022        let mut completed_tasks = 0;
1023        let mut failed_tasks = 0;
1024
1025        for assignment in self.task_assignments.values() {
1026            match assignment.status {
1027                TaskStatus::Pending | TaskStatus::Assigned => pending_tasks += 1,
1028                TaskStatus::Running => running_tasks += 1,
1029                TaskStatus::Completed => completed_tasks += 1,
1030                TaskStatus::Failed | TaskStatus::Timeout => failed_tasks += 1,
1031            }
1032        }
1033
1034        CoordinatorStats {
1035            total_agents: agents.len(),
1036            active_agents,
1037            offline_agents,
1038            total_tasks: self.task_assignments.len(),
1039            pending_tasks,
1040            running_tasks,
1041            completed_tasks,
1042            failed_tasks,
1043            average_load,
1044        }
1045    }
1046
1047    /// Register an event callback
1048    pub fn on_event<F>(&mut self, callback: F)
1049    where
1050        F: Fn(&CoordinatorEvent) + Send + Sync + 'static,
1051    {
1052        self.event_callbacks.push(Box::new(callback));
1053    }
1054
1055    /// Emit an event to all callbacks
1056    fn emit_event(&self, event: CoordinatorEvent) {
1057        for callback in &self.event_callbacks {
1058            callback(&event);
1059        }
1060    }
1061
1062    /// Clear all event callbacks
1063    pub fn clear_event_callbacks(&mut self) {
1064        self.event_callbacks.clear();
1065    }
1066}
1067
1068// ============================================================================
1069// Unit Tests
1070// ============================================================================
1071
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Owned `String`s from string slices, for APIs that take `Vec<String>`.
    fn strings(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// A fresh coordinator with `agents` registered in the given order.
    fn coordinator_with(agents: Vec<AgentCapabilities>) -> AgentCoordinator {
        let mut coordinator = AgentCoordinator::new();
        for agent in agents {
            coordinator
                .register_agent(agent)
                .expect("fixture agent ids are unique");
        }
        coordinator
    }

    #[test]
    fn test_agent_registration() {
        let mut coordinator = AgentCoordinator::new();
        let explorer = AgentCapabilities::new("agent1", "explore")
            .with_capabilities(strings(&["search", "read"]));

        coordinator.register_agent(explorer.clone()).unwrap();
        assert!(coordinator.get_agent("agent1").is_some());
        assert_eq!(coordinator.get_agents().len(), 1);

        // Registering the same id a second time must be rejected.
        let duplicate = coordinator.register_agent(explorer);
        assert!(duplicate.is_err());
    }

    #[test]
    fn test_agent_unregistration() {
        let mut coordinator = coordinator_with(vec![AgentCapabilities::new("agent1", "explore")]);

        coordinator.unregister_agent("agent1").unwrap();
        assert!(coordinator.get_agent("agent1").is_none());

        // The id is gone now, so a second unregistration has nothing to remove.
        assert!(coordinator.unregister_agent("agent1").is_err());
    }

    #[test]
    fn test_task_assignment_least_busy() {
        let mut busy = AgentCapabilities::new("agent1", "worker").with_max_concurrent_tasks(10);
        busy.current_tasks = 5;
        busy.update_load();
        let idle = AgentCapabilities::new("agent2", "worker").with_max_concurrent_tasks(10);
        let mut coordinator = coordinator_with(vec![busy, idle]);

        let criteria = AssignmentCriteria::new().with_strategy(LoadBalanceStrategy::LeastBusy);
        let chosen = coordinator
            .assign_task(Task::new("test", json!({})), &criteria)
            .unwrap();

        // agent2 carries no load, so LeastBusy must prefer it over agent1.
        assert_eq!(chosen, "agent2");
    }

    #[test]
    fn test_task_assignment_by_type() {
        let mut coordinator = coordinator_with(vec![
            AgentCapabilities::new("agent1", "explore"),
            AgentCapabilities::new("agent2", "plan"),
        ]);

        let criteria = AssignmentCriteria::new().with_agent_type("plan");
        let chosen = coordinator
            .assign_task(Task::new("test", json!({})), &criteria)
            .unwrap();

        assert_eq!(chosen, "agent2");
    }

    #[test]
    fn test_task_assignment_by_capability() {
        let mut coordinator = coordinator_with(vec![
            AgentCapabilities::new("agent1", "worker").with_capabilities(strings(&["read"])),
            AgentCapabilities::new("agent2", "worker")
                .with_capabilities(strings(&["read", "write"])),
        ]);

        // Only agent2 offers both required capabilities.
        let criteria = AssignmentCriteria::new().with_capabilities(strings(&["read", "write"]));
        let chosen = coordinator
            .assign_task(Task::new("test", json!({})), &criteria)
            .unwrap();

        assert_eq!(chosen, "agent2");
    }

    #[test]
    fn test_no_suitable_agent() {
        let mut coordinator = coordinator_with(vec![AgentCapabilities::new("agent1", "explore")]);

        let criteria = AssignmentCriteria::new().with_agent_type("plan");
        let outcome = coordinator.assign_task(Task::new("test", json!({})), &criteria);

        assert!(matches!(outcome, Err(CoordinatorError::NoSuitableAgent)));
    }

    #[test]
    fn test_task_lifecycle() {
        let mut coordinator = coordinator_with(vec![
            AgentCapabilities::new("agent1", "worker").with_max_concurrent_tasks(5),
        ]);

        let task = Task::new("test", json!({})).with_id("task1");
        coordinator
            .assign_task(task, &AssignmentCriteria::new())
            .unwrap();

        // Assigned -> Running -> Completed.
        let (_, after_assign) = coordinator.get_task("task1").unwrap();
        assert_eq!(after_assign, TaskStatus::Assigned);

        coordinator.start_task("task1").unwrap();
        let (_, after_start) = coordinator.get_task("task1").unwrap();
        assert_eq!(after_start, TaskStatus::Running);

        let outcome = TaskResult {
            task_id: "task1".to_string(),
            agent_id: "agent1".to_string(),
            success: true,
            result: Some(json!({"output": "done"})),
            error: None,
            start_time: Utc::now(),
            end_time: Utc::now(),
            duration_ms: 100,
        };
        coordinator.complete_task("task1", outcome).unwrap();

        let (_, after_complete) = coordinator.get_task("task1").unwrap();
        assert_eq!(after_complete, TaskStatus::Completed);
    }

    #[test]
    fn test_deadlock_detection() {
        let mut coordinator = AgentCoordinator::new();

        // Circular wait: each agent holds one resource and waits on the
        // resource held by the other agent.
        for &(resource, holder) in &[("resource1", "agent1"), ("resource2", "agent2")] {
            coordinator.record_resource_holder(resource, holder);
        }
        for &(waiter, resource) in &[("agent1", "resource2"), ("agent2", "resource1")] {
            coordinator.record_resource_dependency(waiter, resource);
        }

        let info = coordinator
            .detect_deadlock()
            .expect("a two-agent circular wait must be reported");
        assert_eq!(info.involved_agents.len(), 2);
        for &agent in &["agent1", "agent2"] {
            assert!(info.involved_agents.iter().any(|a| a == agent));
        }
    }

    #[test]
    fn test_no_deadlock() {
        let mut coordinator = AgentCoordinator::new();

        // agent2 waits on agent1, but agent1 waits on nothing: no cycle.
        coordinator.record_resource_holder("resource1", "agent1");
        coordinator.record_resource_dependency("agent2", "resource1");

        assert!(coordinator.detect_deadlock().is_none());
    }

    #[test]
    fn test_sync_barrier() {
        let mut coordinator = AgentCoordinator::new();
        let barrier_id =
            coordinator.create_sync_barrier(strings(&["agent1", "agent2", "agent3"]));

        // The first two arrivals leave the barrier open.
        assert!(!coordinator.arrive_at_barrier(&barrier_id, "agent1").unwrap());
        assert!(!coordinator.is_barrier_reached(&barrier_id));
        assert!(!coordinator.arrive_at_barrier(&barrier_id, "agent2").unwrap());

        // The last participant closes it.
        assert!(coordinator.arrive_at_barrier(&barrier_id, "agent3").unwrap());
        assert!(coordinator.is_barrier_reached(&barrier_id));
    }

    #[test]
    fn test_get_pending_agents() {
        let mut coordinator = AgentCoordinator::new();
        let barrier_id = coordinator.create_sync_barrier(strings(&["agent1", "agent2"]));

        coordinator.arrive_at_barrier(&barrier_id, "agent1").unwrap();

        // Only agent2 is still outstanding.
        let pending = coordinator.get_pending_agents(&barrier_id);
        assert_eq!(pending, vec!["agent2".to_string()]);
    }

    #[test]
    fn test_coordinator_stats() {
        let online = AgentCapabilities::new("agent1", "worker").with_max_concurrent_tasks(5);
        let mut offline = AgentCapabilities::new("agent2", "worker").with_max_concurrent_tasks(5);
        offline.status = AgentStatus::Offline;

        let stats = coordinator_with(vec![online, offline]).get_stats();

        assert_eq!(stats.total_agents, 2);
        assert_eq!(stats.active_agents, 1);
        assert_eq!(stats.offline_agents, 1);
    }
}