1use chrono::{DateTime, Duration, Utc};
9use serde::{Deserialize, Serialize};
10use serde_json::Value;
11use std::collections::{HashMap, HashSet};
12use thiserror::Error;
13use uuid::Uuid;
14
15#[derive(Debug, Error, Clone, PartialEq)]
21pub enum CoordinatorError {
22 #[error("Agent not found: {0}")]
23 AgentNotFound(String),
24
25 #[error("No suitable agent available for task")]
26 NoSuitableAgent,
27
28 #[error("Task not found: {0}")]
29 TaskNotFound(String),
30
31 #[error("Task timeout: {0}")]
32 TaskTimeout(String),
33
34 #[error("Synchronization timeout")]
35 SyncTimeout,
36
37 #[error("Deadlock detected")]
38 DeadlockDetected,
39
40 #[error("Agent already registered: {0}")]
41 AgentAlreadyRegistered(String),
42
43 #[error("Invalid task state: {0}")]
44 InvalidTaskState(String),
45}
46
47pub type CoordinatorResult<T> = Result<T, CoordinatorError>;
48
49#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
55pub enum AgentStatus {
56 #[default]
57 Idle,
58 Busy,
59 Offline,
60}
61
62#[derive(Debug, Clone, Serialize, Deserialize)]
64pub struct AgentCapabilities {
65 pub agent_id: String,
67 pub agent_type: String,
69 pub capabilities: Vec<String>,
71 pub current_load: f64,
73 pub max_concurrent_tasks: usize,
75 pub current_tasks: usize,
77 pub status: AgentStatus,
79 pub last_heartbeat: DateTime<Utc>,
81}
82
83impl AgentCapabilities {
84 pub fn new(agent_id: impl Into<String>, agent_type: impl Into<String>) -> Self {
85 Self {
86 agent_id: agent_id.into(),
87 agent_type: agent_type.into(),
88 capabilities: Vec::new(),
89 current_load: 0.0,
90 max_concurrent_tasks: 1,
91 current_tasks: 0,
92 status: AgentStatus::Idle,
93 last_heartbeat: Utc::now(),
94 }
95 }
96
97 pub fn with_capabilities(mut self, capabilities: Vec<String>) -> Self {
98 self.capabilities = capabilities;
99 self
100 }
101
102 pub fn with_max_concurrent_tasks(mut self, max: usize) -> Self {
103 self.max_concurrent_tasks = max;
104 self
105 }
106
107 pub fn has_capability(&self, capability: &str) -> bool {
109 self.capabilities.iter().any(|c| c == capability)
110 }
111
112 pub fn has_all_capabilities(&self, required: &[String]) -> bool {
114 required.iter().all(|r| self.has_capability(r))
115 }
116
117 pub fn can_accept_task(&self) -> bool {
119 self.status != AgentStatus::Offline && self.current_tasks < self.max_concurrent_tasks
120 }
121
122 pub fn update_load(&mut self) {
124 self.current_load = if self.max_concurrent_tasks > 0 {
125 self.current_tasks as f64 / self.max_concurrent_tasks as f64
126 } else {
127 1.0
128 };
129
130 self.status = if self.current_tasks == 0 {
131 AgentStatus::Idle
132 } else {
133 AgentStatus::Busy
134 };
135 }
136}
137
138#[derive(Debug, Clone, Copy, PartialEq, Eq, Default, Serialize, Deserialize)]
140pub enum LoadBalanceStrategy {
141 #[default]
143 LeastBusy,
144 RoundRobin,
146 Random,
148 CapabilityMatch,
150}
151
152#[derive(Debug, Clone, Default, Serialize, Deserialize)]
154pub struct AssignmentCriteria {
155 pub required_agent_type: Option<String>,
157 pub required_capabilities: Vec<String>,
159 pub load_balance_strategy: LoadBalanceStrategy,
161 pub priority: u8,
163 pub timeout_ms: Option<u64>,
165}
166
167impl AssignmentCriteria {
168 pub fn new() -> Self {
169 Self::default()
170 }
171
172 pub fn with_agent_type(mut self, agent_type: impl Into<String>) -> Self {
173 self.required_agent_type = Some(agent_type.into());
174 self
175 }
176
177 pub fn with_capabilities(mut self, capabilities: Vec<String>) -> Self {
178 self.required_capabilities = capabilities;
179 self
180 }
181
182 pub fn with_strategy(mut self, strategy: LoadBalanceStrategy) -> Self {
183 self.load_balance_strategy = strategy;
184 self
185 }
186
187 pub fn with_priority(mut self, priority: u8) -> Self {
188 self.priority = priority.min(10);
189 self
190 }
191
192 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
193 self.timeout_ms = Some(timeout_ms);
194 self
195 }
196}
197
198#[derive(Debug, Clone, Serialize, Deserialize)]
200pub struct Task {
201 pub id: String,
203 pub task_type: String,
205 pub data: Value,
207 pub priority: u8,
209 pub created_at: DateTime<Utc>,
211 pub timeout_ms: Option<u64>,
213}
214
215impl Task {
216 pub fn new(task_type: impl Into<String>, data: Value) -> Self {
217 Self {
218 id: Uuid::new_v4().to_string(),
219 task_type: task_type.into(),
220 data,
221 priority: 5,
222 created_at: Utc::now(),
223 timeout_ms: None,
224 }
225 }
226
227 pub fn with_id(mut self, id: impl Into<String>) -> Self {
228 self.id = id.into();
229 self
230 }
231
232 pub fn with_priority(mut self, priority: u8) -> Self {
233 self.priority = priority.min(10);
234 self
235 }
236
237 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
238 self.timeout_ms = Some(timeout_ms);
239 self
240 }
241}
242
243#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
245pub enum TaskStatus {
246 Pending,
247 Assigned,
248 Running,
249 Completed,
250 Failed,
251 Timeout,
252}
253
254#[derive(Debug, Clone, Serialize, Deserialize)]
256pub struct TaskResult {
257 pub task_id: String,
259 pub agent_id: String,
261 pub success: bool,
263 pub result: Option<Value>,
265 pub error: Option<String>,
267 pub start_time: DateTime<Utc>,
269 pub end_time: DateTime<Utc>,
271 pub duration_ms: i64,
273}
274
275#[derive(Debug, Clone)]
277#[allow(dead_code)]
278struct TaskAssignment {
279 task: Task,
280 agent_id: String,
281 status: TaskStatus,
282 assigned_at: DateTime<Utc>,
283 started_at: Option<DateTime<Utc>>,
284 result: Option<TaskResult>,
285}
286
287#[derive(Debug, Clone, Serialize, Deserialize)]
289pub struct DeadlockInfo {
290 pub detected_at: DateTime<Utc>,
292 pub involved_agents: Vec<String>,
294 pub involved_resources: Vec<String>,
296 pub dependency_chain: Vec<DependencyLink>,
298}
299
300#[derive(Debug, Clone, Serialize, Deserialize)]
302pub struct DependencyLink {
303 pub agent: String,
305 pub waiting_for: String,
307 pub resource: String,
309}
310
311#[derive(Debug, Clone)]
313#[allow(dead_code)]
314struct SyncBarrier {
315 id: String,
316 agent_ids: HashSet<String>,
317 arrived: HashSet<String>,
318 created_at: DateTime<Utc>,
319}
320
321#[derive(Debug, Clone, Default, Serialize, Deserialize)]
323pub struct CoordinatorStats {
324 pub total_agents: usize,
325 pub active_agents: usize,
326 pub offline_agents: usize,
327 pub total_tasks: usize,
328 pub pending_tasks: usize,
329 pub running_tasks: usize,
330 pub completed_tasks: usize,
331 pub failed_tasks: usize,
332 pub average_load: f64,
333}
334
335#[derive(Debug, Clone)]
337pub enum CoordinatorEvent {
338 AgentRegistered(AgentCapabilities),
339 AgentUnregistered {
340 agent_id: String,
341 },
342 AgentStatusChanged {
343 agent_id: String,
344 status: AgentStatus,
345 },
346 AgentOffline {
347 agent_id: String,
348 },
349 TaskAssigned {
350 task_id: String,
351 agent_id: String,
352 },
353 TaskStarted {
354 task_id: String,
355 agent_id: String,
356 },
357 TaskCompleted(TaskResult),
358 TaskFailed {
359 task_id: String,
360 error: String,
361 },
362 DeadlockDetected(DeadlockInfo),
363 SyncBarrierReached {
364 barrier_id: String,
365 },
366}
367
368type EventCallback = Box<dyn Fn(&CoordinatorEvent) + Send + Sync>;
374
375pub struct AgentCoordinator {
380 agents: HashMap<String, AgentCapabilities>,
382 task_assignments: HashMap<String, TaskAssignment>,
384 resource_dependencies: HashMap<String, HashSet<String>>,
386 resource_holders: HashMap<String, String>,
388 sync_barriers: HashMap<String, SyncBarrier>,
390 round_robin_index: usize,
392 event_callbacks: Vec<EventCallback>,
394 heartbeat_timeout_secs: i64,
396}
397
398impl Default for AgentCoordinator {
399 fn default() -> Self {
400 Self::new()
401 }
402}
403
404impl AgentCoordinator {
405 pub fn new() -> Self {
407 Self {
408 agents: HashMap::new(),
409 task_assignments: HashMap::new(),
410 resource_dependencies: HashMap::new(),
411 resource_holders: HashMap::new(),
412 sync_barriers: HashMap::new(),
413 round_robin_index: 0,
414 event_callbacks: Vec::new(),
415 heartbeat_timeout_secs: 15,
416 }
417 }
418
419 pub fn with_heartbeat_timeout(mut self, secs: i64) -> Self {
421 self.heartbeat_timeout_secs = secs;
422 self
423 }
424
425 pub fn register_agent(&mut self, capabilities: AgentCapabilities) -> CoordinatorResult<()> {
431 if self.agents.contains_key(&capabilities.agent_id) {
432 return Err(CoordinatorError::AgentAlreadyRegistered(
433 capabilities.agent_id.clone(),
434 ));
435 }
436
437 let agent_id = capabilities.agent_id.clone();
438 self.agents.insert(agent_id.clone(), capabilities.clone());
439 self.emit_event(CoordinatorEvent::AgentRegistered(capabilities));
440
441 Ok(())
442 }
443
444 pub fn unregister_agent(&mut self, agent_id: &str) -> CoordinatorResult<()> {
446 if self.agents.remove(agent_id).is_none() {
447 return Err(CoordinatorError::AgentNotFound(agent_id.to_string()));
448 }
449
450 self.resource_dependencies.remove(agent_id);
452
453 self.resource_holders.retain(|_, holder| holder != agent_id);
455
456 self.emit_event(CoordinatorEvent::AgentUnregistered {
457 agent_id: agent_id.to_string(),
458 });
459
460 Ok(())
461 }
462
463 pub fn update_agent_status(
465 &mut self,
466 agent_id: &str,
467 status: AgentStatus,
468 ) -> CoordinatorResult<()> {
469 let agent = self
470 .agents
471 .get_mut(agent_id)
472 .ok_or_else(|| CoordinatorError::AgentNotFound(agent_id.to_string()))?;
473
474 agent.status = status;
475 agent.last_heartbeat = Utc::now();
476
477 self.emit_event(CoordinatorEvent::AgentStatusChanged {
478 agent_id: agent_id.to_string(),
479 status,
480 });
481
482 Ok(())
483 }
484
485 pub fn heartbeat(&mut self, agent_id: &str) -> CoordinatorResult<()> {
487 let agent = self
488 .agents
489 .get_mut(agent_id)
490 .ok_or_else(|| CoordinatorError::AgentNotFound(agent_id.to_string()))?;
491
492 agent.last_heartbeat = Utc::now();
493
494 if agent.status == AgentStatus::Offline {
496 agent.status = if agent.current_tasks == 0 {
497 AgentStatus::Idle
498 } else {
499 AgentStatus::Busy
500 };
501 }
502
503 Ok(())
504 }
505
506 pub fn get_agent(&self, agent_id: &str) -> Option<&AgentCapabilities> {
508 self.agents.get(agent_id)
509 }
510
511 pub fn get_agent_mut(&mut self, agent_id: &str) -> Option<&mut AgentCapabilities> {
513 self.agents.get_mut(agent_id)
514 }
515
516 pub fn get_agents(&self) -> Vec<&AgentCapabilities> {
518 self.agents.values().collect()
519 }
520
521 pub fn get_agents_by_type(&self, agent_type: &str) -> Vec<&AgentCapabilities> {
523 self.agents
524 .values()
525 .filter(|a| a.agent_type == agent_type)
526 .collect()
527 }
528
529 pub fn get_agents_with_capability(&self, capability: &str) -> Vec<&AgentCapabilities> {
531 self.agents
532 .values()
533 .filter(|a| a.has_capability(capability))
534 .collect()
535 }
536
537 pub fn check_agent_health(&mut self) {
539 let now = Utc::now();
540 let timeout = Duration::seconds(self.heartbeat_timeout_secs);
541
542 let offline_agents: Vec<String> = self
543 .agents
544 .iter()
545 .filter(|(_, agent)| {
546 agent.status != AgentStatus::Offline
547 && now.signed_duration_since(agent.last_heartbeat) > timeout
548 })
549 .map(|(id, _)| id.clone())
550 .collect();
551
552 for agent_id in offline_agents {
553 if let Some(agent) = self.agents.get_mut(&agent_id) {
554 agent.status = AgentStatus::Offline;
555 self.emit_event(CoordinatorEvent::AgentOffline {
556 agent_id: agent_id.clone(),
557 });
558 }
559 }
560 }
561
562 pub fn assign_task(
568 &mut self,
569 task: Task,
570 criteria: &AssignmentCriteria,
571 ) -> CoordinatorResult<String> {
572 let agent_id = self.select_agent(criteria)?;
574
575 if let Some(agent) = self.agents.get_mut(&agent_id) {
577 agent.current_tasks += 1;
578 agent.update_load();
579 }
580
581 let assignment = TaskAssignment {
583 task: task.clone(),
584 agent_id: agent_id.clone(),
585 status: TaskStatus::Assigned,
586 assigned_at: Utc::now(),
587 started_at: None,
588 result: None,
589 };
590
591 self.task_assignments.insert(task.id.clone(), assignment);
592
593 self.emit_event(CoordinatorEvent::TaskAssigned {
594 task_id: task.id.clone(),
595 agent_id: agent_id.clone(),
596 });
597
598 Ok(agent_id)
599 }
600
601 fn select_agent(&mut self, criteria: &AssignmentCriteria) -> CoordinatorResult<String> {
603 let mut candidates: Vec<&AgentCapabilities> = self
605 .agents
606 .values()
607 .filter(|agent| agent.can_accept_task())
608 .collect();
609
610 if let Some(ref agent_type) = criteria.required_agent_type {
612 candidates.retain(|agent| &agent.agent_type == agent_type);
613 }
614
615 if !criteria.required_capabilities.is_empty() {
617 candidates.retain(|agent| agent.has_all_capabilities(&criteria.required_capabilities));
618 }
619
620 if candidates.is_empty() {
621 return Err(CoordinatorError::NoSuitableAgent);
622 }
623
624 let selected = match criteria.load_balance_strategy {
626 LoadBalanceStrategy::LeastBusy => {
627 candidates.sort_by(|a, b| {
628 a.current_load
629 .partial_cmp(&b.current_load)
630 .unwrap_or(std::cmp::Ordering::Equal)
631 });
632 candidates[0]
633 }
634 LoadBalanceStrategy::RoundRobin => {
635 self.round_robin_index = (self.round_robin_index + 1) % candidates.len();
636 candidates[self.round_robin_index]
637 }
638 LoadBalanceStrategy::Random => {
639 use std::collections::hash_map::DefaultHasher;
640 use std::hash::{Hash, Hasher};
641
642 let mut hasher = DefaultHasher::new();
643 Utc::now().timestamp_nanos_opt().hash(&mut hasher);
644 let index = (hasher.finish() as usize) % candidates.len();
645 candidates[index]
646 }
647 LoadBalanceStrategy::CapabilityMatch => {
648 let required = &criteria.required_capabilities;
650 candidates.sort_by(|a, b| {
651 let a_match = a
652 .capabilities
653 .iter()
654 .filter(|c| required.contains(c))
655 .count();
656 let b_match = b
657 .capabilities
658 .iter()
659 .filter(|c| required.contains(c))
660 .count();
661 b_match.cmp(&a_match)
662 });
663 candidates[0]
664 }
665 };
666
667 Ok(selected.agent_id.clone())
668 }
669
670 pub fn start_task(&mut self, task_id: &str) -> CoordinatorResult<()> {
672 let assignment = self
673 .task_assignments
674 .get_mut(task_id)
675 .ok_or_else(|| CoordinatorError::TaskNotFound(task_id.to_string()))?;
676
677 if assignment.status != TaskStatus::Assigned {
678 return Err(CoordinatorError::InvalidTaskState(format!(
679 "Task {} is not in Assigned state",
680 task_id
681 )));
682 }
683
684 assignment.status = TaskStatus::Running;
685 assignment.started_at = Some(Utc::now());
686
687 let agent_id = assignment.agent_id.clone();
689
690 self.emit_event(CoordinatorEvent::TaskStarted {
691 task_id: task_id.to_string(),
692 agent_id,
693 });
694
695 Ok(())
696 }
697
698 pub fn complete_task(&mut self, task_id: &str, result: TaskResult) -> CoordinatorResult<()> {
700 let assignment = self
701 .task_assignments
702 .get_mut(task_id)
703 .ok_or_else(|| CoordinatorError::TaskNotFound(task_id.to_string()))?;
704
705 if let Some(agent) = self.agents.get_mut(&assignment.agent_id) {
707 agent.current_tasks = agent.current_tasks.saturating_sub(1);
708 agent.update_load();
709 }
710
711 assignment.status = if result.success {
712 TaskStatus::Completed
713 } else {
714 TaskStatus::Failed
715 };
716 assignment.result = Some(result.clone());
717
718 self.emit_event(CoordinatorEvent::TaskCompleted(result));
719
720 Ok(())
721 }
722
723 pub fn fail_task(&mut self, task_id: &str, error: String) -> CoordinatorResult<()> {
725 let assignment = self
726 .task_assignments
727 .get_mut(task_id)
728 .ok_or_else(|| CoordinatorError::TaskNotFound(task_id.to_string()))?;
729
730 if let Some(agent) = self.agents.get_mut(&assignment.agent_id) {
732 agent.current_tasks = agent.current_tasks.saturating_sub(1);
733 agent.update_load();
734 }
735
736 assignment.status = TaskStatus::Failed;
737
738 self.emit_event(CoordinatorEvent::TaskFailed {
739 task_id: task_id.to_string(),
740 error,
741 });
742
743 Ok(())
744 }
745
746 pub fn get_task(&self, task_id: &str) -> Option<(&Task, TaskStatus)> {
748 self.task_assignments
749 .get(task_id)
750 .map(|a| (&a.task, a.status))
751 }
752
753 pub fn get_task_result(&self, task_id: &str) -> Option<&TaskResult> {
755 self.task_assignments
756 .get(task_id)
757 .and_then(|a| a.result.as_ref())
758 }
759
760 pub fn get_agent_tasks(&self, agent_id: &str) -> Vec<&Task> {
762 self.task_assignments
763 .values()
764 .filter(|a| a.agent_id == agent_id)
765 .map(|a| &a.task)
766 .collect()
767 }
768
769 pub fn get_pending_tasks(&self) -> Vec<&Task> {
771 self.task_assignments
772 .values()
773 .filter(|a| a.status == TaskStatus::Pending || a.status == TaskStatus::Assigned)
774 .map(|a| &a.task)
775 .collect()
776 }
777
778 pub fn get_running_tasks(&self) -> Vec<&Task> {
780 self.task_assignments
781 .values()
782 .filter(|a| a.status == TaskStatus::Running)
783 .map(|a| &a.task)
784 .collect()
785 }
786
787 pub fn record_resource_dependency(&mut self, agent_id: &str, resource: &str) {
793 self.resource_dependencies
794 .entry(agent_id.to_string())
795 .or_default()
796 .insert(resource.to_string());
797 }
798
799 pub fn remove_resource_dependency(&mut self, agent_id: &str, resource: &str) {
801 if let Some(resources) = self.resource_dependencies.get_mut(agent_id) {
802 resources.remove(resource);
803 if resources.is_empty() {
804 self.resource_dependencies.remove(agent_id);
805 }
806 }
807 }
808
809 pub fn record_resource_holder(&mut self, resource: &str, agent_id: &str) {
811 self.resource_holders
812 .insert(resource.to_string(), agent_id.to_string());
813 }
814
815 pub fn remove_resource_holder(&mut self, resource: &str) {
817 self.resource_holders.remove(resource);
818 }
819
820 pub fn detect_deadlock(&self) -> Option<DeadlockInfo> {
822 let mut wait_for_graph: HashMap<String, HashSet<String>> = HashMap::new();
824
825 for (agent_id, resources) in &self.resource_dependencies {
826 let mut waiting_for = HashSet::new();
827
828 for resource in resources {
829 if let Some(holder) = self.resource_holders.get(resource) {
830 if holder != agent_id {
831 waiting_for.insert(holder.clone());
832 }
833 }
834 }
835
836 if !waiting_for.is_empty() {
837 wait_for_graph.insert(agent_id.clone(), waiting_for);
838 }
839 }
840
841 if let Some(cycle) = self.detect_cycle(&wait_for_graph) {
843 let mut involved_resources = HashSet::new();
845 let mut dependency_chain = Vec::new();
846
847 for i in 0..cycle.len() {
848 let agent = &cycle[i];
849 let next_agent = &cycle[(i + 1) % cycle.len()];
850
851 if let Some(resources) = self.resource_dependencies.get(agent) {
853 for resource in resources {
854 if let Some(holder) = self.resource_holders.get(resource) {
855 if holder == next_agent {
856 involved_resources.insert(resource.clone());
857 dependency_chain.push(DependencyLink {
858 agent: agent.clone(),
859 waiting_for: next_agent.clone(),
860 resource: resource.clone(),
861 });
862 break;
863 }
864 }
865 }
866 }
867 }
868
869 let deadlock_info = DeadlockInfo {
870 detected_at: Utc::now(),
871 involved_agents: cycle,
872 involved_resources: involved_resources.into_iter().collect(),
873 dependency_chain,
874 };
875
876 return Some(deadlock_info);
877 }
878
879 None
880 }
881
882 fn detect_cycle(&self, graph: &HashMap<String, HashSet<String>>) -> Option<Vec<String>> {
884 let mut visited = HashSet::new();
885 let mut rec_stack = HashSet::new();
886 let mut path = Vec::new();
887
888 for node in graph.keys() {
889 if !visited.contains(node) {
890 if let Some(cycle) =
891 self.dfs_cycle(node, graph, &mut visited, &mut rec_stack, &mut path)
892 {
893 return Some(cycle);
894 }
895 }
896 }
897
898 None
899 }
900
901 fn dfs_cycle(
903 &self,
904 node: &str,
905 graph: &HashMap<String, HashSet<String>>,
906 visited: &mut HashSet<String>,
907 rec_stack: &mut HashSet<String>,
908 path: &mut Vec<String>,
909 ) -> Option<Vec<String>> {
910 visited.insert(node.to_string());
911 rec_stack.insert(node.to_string());
912 path.push(node.to_string());
913
914 if let Some(neighbors) = graph.get(node) {
915 for neighbor in neighbors {
916 if !visited.contains(neighbor) {
917 if let Some(cycle) = self.dfs_cycle(neighbor, graph, visited, rec_stack, path) {
918 return Some(cycle);
919 }
920 } else if rec_stack.contains(neighbor) {
921 let cycle_start = path.iter().position(|n| n == neighbor).unwrap();
923 return Some(path[cycle_start..].to_vec());
924 }
925 }
926 }
927
928 rec_stack.remove(node);
929 path.pop();
930 None
931 }
932
933 pub fn create_sync_barrier(&mut self, agent_ids: Vec<String>) -> String {
939 let barrier_id = Uuid::new_v4().to_string();
940 let barrier = SyncBarrier {
941 id: barrier_id.clone(),
942 agent_ids: agent_ids.into_iter().collect(),
943 arrived: HashSet::new(),
944 created_at: Utc::now(),
945 };
946 self.sync_barriers.insert(barrier_id.clone(), barrier);
947 barrier_id
948 }
949
950 pub fn arrive_at_barrier(
952 &mut self,
953 barrier_id: &str,
954 agent_id: &str,
955 ) -> CoordinatorResult<bool> {
956 let barrier = self
957 .sync_barriers
958 .get_mut(barrier_id)
959 .ok_or_else(|| CoordinatorError::TaskNotFound(format!("Barrier {}", barrier_id)))?;
960
961 if !barrier.agent_ids.contains(agent_id) {
962 return Err(CoordinatorError::AgentNotFound(agent_id.to_string()));
963 }
964
965 barrier.arrived.insert(agent_id.to_string());
966
967 let all_arrived = barrier.arrived.len() == barrier.agent_ids.len();
969
970 if all_arrived {
971 self.emit_event(CoordinatorEvent::SyncBarrierReached {
972 barrier_id: barrier_id.to_string(),
973 });
974 }
975
976 Ok(all_arrived)
977 }
978
979 pub fn is_barrier_reached(&self, barrier_id: &str) -> bool {
981 self.sync_barriers
982 .get(barrier_id)
983 .map(|b| b.arrived.len() == b.agent_ids.len())
984 .unwrap_or(false)
985 }
986
987 pub fn remove_barrier(&mut self, barrier_id: &str) {
989 self.sync_barriers.remove(barrier_id);
990 }
991
992 pub fn get_pending_agents(&self, barrier_id: &str) -> Vec<String> {
994 self.sync_barriers
995 .get(barrier_id)
996 .map(|b| b.agent_ids.difference(&b.arrived).cloned().collect())
997 .unwrap_or_default()
998 }
999
1000 pub fn get_stats(&self) -> CoordinatorStats {
1006 let agents: Vec<&AgentCapabilities> = self.agents.values().collect();
1007 let active_agents = agents
1008 .iter()
1009 .filter(|a| a.status != AgentStatus::Offline)
1010 .count();
1011 let offline_agents = agents.len() - active_agents;
1012
1013 let total_load: f64 = agents.iter().map(|a| a.current_load).sum();
1014 let average_load = if agents.is_empty() {
1015 0.0
1016 } else {
1017 total_load / agents.len() as f64
1018 };
1019
1020 let mut pending_tasks = 0;
1021 let mut running_tasks = 0;
1022 let mut completed_tasks = 0;
1023 let mut failed_tasks = 0;
1024
1025 for assignment in self.task_assignments.values() {
1026 match assignment.status {
1027 TaskStatus::Pending | TaskStatus::Assigned => pending_tasks += 1,
1028 TaskStatus::Running => running_tasks += 1,
1029 TaskStatus::Completed => completed_tasks += 1,
1030 TaskStatus::Failed | TaskStatus::Timeout => failed_tasks += 1,
1031 }
1032 }
1033
1034 CoordinatorStats {
1035 total_agents: agents.len(),
1036 active_agents,
1037 offline_agents,
1038 total_tasks: self.task_assignments.len(),
1039 pending_tasks,
1040 running_tasks,
1041 completed_tasks,
1042 failed_tasks,
1043 average_load,
1044 }
1045 }
1046
1047 pub fn on_event<F>(&mut self, callback: F)
1049 where
1050 F: Fn(&CoordinatorEvent) + Send + Sync + 'static,
1051 {
1052 self.event_callbacks.push(Box::new(callback));
1053 }
1054
1055 fn emit_event(&self, event: CoordinatorEvent) {
1057 for callback in &self.event_callbacks {
1058 callback(&event);
1059 }
1060 }
1061
1062 pub fn clear_event_callbacks(&mut self) {
1064 self.event_callbacks.clear();
1065 }
1066}
1067
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;

    /// Converts string literals into the owned lists the API expects.
    fn strings(items: &[&str]) -> Vec<String> {
        items.iter().map(|item| item.to_string()).collect()
    }

    /// A `worker`-type agent allowed to hold `max_tasks` tasks at once.
    fn worker(id: &str, max_tasks: usize) -> AgentCapabilities {
        AgentCapabilities::new(id, "worker").with_max_concurrent_tasks(max_tasks)
    }

    /// A fresh coordinator with every agent in `agents` registered.
    fn coordinator_with(agents: Vec<AgentCapabilities>) -> AgentCoordinator {
        let mut coordinator = AgentCoordinator::new();
        for agent in agents {
            coordinator
                .register_agent(agent)
                .expect("test agents have unique ids");
        }
        coordinator
    }

    /// Assigns an empty-payload task and returns the chosen agent id.
    fn assign(
        coordinator: &mut AgentCoordinator,
        criteria: &AssignmentCriteria,
    ) -> CoordinatorResult<String> {
        coordinator.assign_task(Task::new("test", json!({})), criteria)
    }

    #[test]
    fn test_agent_registration() {
        let agent = AgentCapabilities::new("agent1", "explore")
            .with_capabilities(strings(&["search", "read"]));
        let mut coordinator = coordinator_with(vec![agent.clone()]);

        assert!(coordinator.get_agent("agent1").is_some());
        assert_eq!(coordinator.get_agents().len(), 1);

        // The same id cannot be registered twice.
        assert!(matches!(
            coordinator.register_agent(agent),
            Err(CoordinatorError::AgentAlreadyRegistered(_))
        ));
    }

    #[test]
    fn test_agent_unregistration() {
        let mut coordinator = coordinator_with(vec![AgentCapabilities::new("agent1", "explore")]);

        coordinator.unregister_agent("agent1").unwrap();
        assert!(coordinator.get_agent("agent1").is_none());

        // Removing an unknown agent is an error, not a no-op.
        assert!(matches!(
            coordinator.unregister_agent("agent1"),
            Err(CoordinatorError::AgentNotFound(_))
        ));
    }

    #[test]
    fn test_task_assignment_least_busy() {
        let mut half_loaded = worker("agent1", 10);
        half_loaded.current_tasks = 5;
        half_loaded.update_load();
        let mut coordinator = coordinator_with(vec![half_loaded, worker("agent2", 10)]);

        let criteria = AssignmentCriteria::new().with_strategy(LoadBalanceStrategy::LeastBusy);

        assert_eq!(assign(&mut coordinator, &criteria).unwrap(), "agent2");
    }

    #[test]
    fn test_task_assignment_by_type() {
        let mut coordinator = coordinator_with(vec![
            AgentCapabilities::new("agent1", "explore"),
            AgentCapabilities::new("agent2", "plan"),
        ]);

        let criteria = AssignmentCriteria::new().with_agent_type("plan");

        assert_eq!(assign(&mut coordinator, &criteria).unwrap(), "agent2");
    }

    #[test]
    fn test_task_assignment_by_capability() {
        let mut coordinator = coordinator_with(vec![
            AgentCapabilities::new("agent1", "worker").with_capabilities(strings(&["read"])),
            AgentCapabilities::new("agent2", "worker")
                .with_capabilities(strings(&["read", "write"])),
        ]);

        let criteria = AssignmentCriteria::new().with_capabilities(strings(&["read", "write"]));

        assert_eq!(assign(&mut coordinator, &criteria).unwrap(), "agent2");
    }

    #[test]
    fn test_no_suitable_agent() {
        let mut coordinator = coordinator_with(vec![AgentCapabilities::new("agent1", "explore")]);

        let criteria = AssignmentCriteria::new().with_agent_type("plan");

        assert!(matches!(
            assign(&mut coordinator, &criteria),
            Err(CoordinatorError::NoSuitableAgent)
        ));
    }

    #[test]
    fn test_task_lifecycle() {
        let mut coordinator = coordinator_with(vec![worker("agent1", 5)]);
        let status_of = |c: &AgentCoordinator| c.get_task("task1").map(|(_, status)| status);

        let task = Task::new("test", json!({})).with_id("task1");
        coordinator
            .assign_task(task, &AssignmentCriteria::new())
            .unwrap();
        assert_eq!(status_of(&coordinator), Some(TaskStatus::Assigned));

        coordinator.start_task("task1").unwrap();
        assert_eq!(status_of(&coordinator), Some(TaskStatus::Running));

        let finished_at = Utc::now();
        let outcome = TaskResult {
            task_id: "task1".to_string(),
            agent_id: "agent1".to_string(),
            success: true,
            result: Some(json!({"output": "done"})),
            error: None,
            start_time: finished_at,
            end_time: finished_at,
            duration_ms: 100,
        };
        coordinator.complete_task("task1", outcome).unwrap();

        assert_eq!(status_of(&coordinator), Some(TaskStatus::Completed));
        // Completing the task frees the agent's slot.
        assert_eq!(coordinator.get_agent("agent1").unwrap().current_tasks, 0);
    }

    #[test]
    fn test_deadlock_detection() {
        let mut coordinator = AgentCoordinator::new();

        // agent1 holds resource1 and wants resource2; agent2 holds resource2 and wants resource1.
        coordinator.record_resource_holder("resource1", "agent1");
        coordinator.record_resource_holder("resource2", "agent2");
        coordinator.record_resource_dependency("agent1", "resource2");
        coordinator.record_resource_dependency("agent2", "resource1");

        let info = coordinator
            .detect_deadlock()
            .expect("a two-agent cycle must be reported");

        assert_eq!(info.involved_agents.len(), 2);
        for agent in ["agent1", "agent2"] {
            assert!(info.involved_agents.iter().any(|a| a == agent));
        }
        assert_eq!(info.dependency_chain.len(), 2);
        assert_eq!(info.involved_resources.len(), 2);
    }

    #[test]
    fn test_no_deadlock() {
        let mut coordinator = AgentCoordinator::new();

        // A one-way wait is not a cycle.
        coordinator.record_resource_holder("resource1", "agent1");
        coordinator.record_resource_dependency("agent2", "resource1");

        assert!(coordinator.detect_deadlock().is_none());
    }

    #[test]
    fn test_sync_barrier() {
        let mut coordinator = AgentCoordinator::new();
        let barrier_id = coordinator.create_sync_barrier(strings(&["agent1", "agent2", "agent3"]));

        // Only the final arrival completes the barrier.
        for (agent, expect_complete) in [("agent1", false), ("agent2", false), ("agent3", true)] {
            let complete = coordinator.arrive_at_barrier(&barrier_id, agent).unwrap();
            assert_eq!(complete, expect_complete);
            assert_eq!(coordinator.is_barrier_reached(&barrier_id), expect_complete);
        }
    }

    #[test]
    fn test_get_pending_agents() {
        let mut coordinator = AgentCoordinator::new();
        let barrier_id = coordinator.create_sync_barrier(strings(&["agent1", "agent2"]));

        coordinator.arrive_at_barrier(&barrier_id, "agent1").unwrap();

        assert_eq!(
            coordinator.get_pending_agents(&barrier_id),
            strings(&["agent2"])
        );
    }

    #[test]
    fn test_coordinator_stats() {
        let mut offline = worker("agent2", 5);
        offline.status = AgentStatus::Offline;
        let coordinator = coordinator_with(vec![worker("agent1", 5), offline]);

        let stats = coordinator.get_stats();

        assert_eq!(
            (stats.total_agents, stats.active_agents, stats.offline_agents),
            (2, 1, 1)
        );
    }
}