use crate::agents::{
AgentContext, AgentResult, CoordinationMessage,
MessageType, Priority, SpecializedAgent,
};
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use serde_json::{json, Value};
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
pub struct AgentCoordinator {
agents: Arc<RwLock<HashMap<String, Arc<dyn SpecializedAgent>>>>,
message_queue: Arc<RwLock<VecDeque<CoordinationMessage>>>,
}
impl AgentCoordinator {
pub fn new() -> Self {
Self {
agents: Arc::new(RwLock::new(HashMap::new())),
message_queue: Arc::new(RwLock::new(VecDeque::new())),
consensus_engine: ConsensusEngine::new(),
task_scheduler: TaskScheduler::new(),
state_manager: StateManager::new(),
}
}
pub async fn register_agent(&self, agent: Arc<dyn SpecializedAgent>) {
let mut agents = self.agents.write().await;
agents.insert(agent.id().to_string(), agent);
}
pub async fn coordinate_task(&self, task: CoordinationTask) -> CoordinationResult {
let start_time = std::time::Instant::now();
let selected_agents = self.select_agents_for_task(&task).await;
let coordination_plan = self.create_coordination_plan(&task, &selected_agents).await;
let execution_result = self.execute_coordination_plan(coordination_plan).await;
let final_result = self.aggregate_results(&execution_result).await;
CoordinationResult {
success: final_result.success,
execution_time_ms: start_time.elapsed().as_millis() as u64,
participating_agents: selected_agents.len(),
consensus_achieved: execution_result.consensus_achieved,
fault_tolerance_level: self.calculate_fault_tolerance(&selected_agents),
}
}
async fn route_message(&self, message: CoordinationMessage) -> RoutingResult {
let target_agents = self.determine_message_targets(&message).await;
let mut delivery_results = Vec::new();
for agent_id in &target_agents {
let delivery_result = self.deliver_message_to_agent(agent_id, &message).await;
delivery_results.push((agent_id.clone(), delivery_result));
}
let successful_deliveries = delivery_results.iter()
.filter(|(_, result)| result.success)
.count();
RoutingResult {
message_id: message.id.clone(),
target_agents,
successful_deliveries,
total_attempts: delivery_results.len(),
delivery_success_rate: successful_deliveries as f64 / delivery_results.len() as f64,
}
}
async fn select_agents_for_task(&self, task: &CoordinationTask) -> Vec<String> {
let agents = self.agents.read().await;
agents.values()
.filter(|agent| self.agent_matches_task_requirements(agent, task))
.map(|agent| agent.id().to_string())
.collect()
}
fn agent_matches_task_requirements(&self, agent: &Arc<dyn SpecializedAgent>, task: &CoordinationTask) -> bool {
match task.required_specialization {
Some(specialization) => agent.specialization() == specialization,
None => true,
}
}
async fn create_coordination_plan(&self, task: &CoordinationTask, agents: &[String]) -> CoordinationPlan {
CoordinationPlan {
task_id: task.id.clone(),
participating_agents: agents.to_vec(),
execution_order: self.determine_execution_order(agents).await,
consensus_required: task.requires_consensus,
fault_tolerance_level: self.calculate_fault_tolerance(agents),
timeout_ms: task.timeout_ms,
}
}
async fn determine_execution_order(&self, agents: &[String]) -> ExecutionOrder {
ExecutionOrder::Parallel }
async fn execute_coordination_plan(&self, plan: CoordinationPlan) -> PlanExecutionResult {
let mut agent_results = Vec::new();
for agent_id in &plan.participating_agents {
let agent = {
let agents = self.agents.read().await;
agents.get(agent_id).cloned()
};
if let Some(agent) = agent {
let context = self.create_agent_context(&plan).await;
let result = agent.execute(&context).await;
agent_results.push((agent_id.clone(), result));
}
}
let consensus_achieved = if plan.consensus_required {
self.consensus_engine.check_consensus(&agent_results).await
} else {
true };
PlanExecutionResult {
plan_id: plan.task_id,
agent_results,
consensus_achieved,
execution_time_ms: 0, }
}
async fn aggregate_results(&self, execution_result: &PlanExecutionResult) -> AggregatedResult {
let successful_results: Vec<_> = execution_result.agent_results.iter()
.filter(|(_, result)| result.success)
.collect();
let aggregated_output = if successful_results.is_empty() {
json!({"error": "No successful agent executions"})
} else {
let mut merged = json!({});
for (_, result) in successful_results {
if let Some(output) = result.output.as_object() {
for (key, value) in output {
merged[key] = value.clone();
}
}
}
merged
};
AggregatedResult {
success: !successful_results.is_empty(),
aggregated_output,
participating_agents: execution_result.agent_results.len(),
successful_agents: successful_results.len(),
failed_agents: execution_result.agent_results.len() - successful_results.len(),
}
}
async fn determine_message_targets(&self, message: &CoordinationMessage) -> Vec<String> {
let agents = self.agents.read().await;
if !message.to_agent.is_empty() {
vec![message.to_agent.clone()]
} else {
match message.message_type {
MessageType::StatusUpdate => {
agents.keys().cloned().collect()
}
MessageType::KnowledgeShare => {
self.select_agents_for_knowledge_sharing(&message).await
}
_ => {
agents.keys().cloned().collect()
}
}
}
}
async fn select_agents_for_knowledge_sharing(&self, message: &CoordinationMessage) -> Vec<String> {
let agents = self.agents.read().await;
agents.keys()
.filter(|agent_id| {
if let Some(domain) = message.payload.get("domain").and_then(|v| v.as_str()) {
self.is_agent_interested_in_domain(agent_id, domain)
} else {
true
}
})
.cloned()
.collect()
}
fn is_agent_interested_in_domain(&self, _agent_id: &str, _domain: &str) -> bool {
true
}
async fn deliver_message_to_agent(&self, agent_id: &str, message: &CoordinationMessage) -> DeliveryResult {
let agents = self.agents.read().await;
if let Some(agent) = agents.get(agent_id) {
match agent.coordinate(message.clone()).await.success {
true => DeliveryResult {
success: true,
response_time_ms: 50, error_message: None,
},
false => DeliveryResult {
success: false,
response_time_ms: 25,
error_message: Some("Agent rejected message".to_string()),
},
}
} else {
DeliveryResult {
success: false,
response_time_ms: 0,
error_message: Some("Agent not found".to_string()),
}
}
}
fn calculate_fault_tolerance(&self, agents: &[String]) -> usize {
(agents.len() - 1) / 2
}
async fn create_agent_context(&self, _plan: &CoordinationPlan) -> AgentContext {
AgentContext {
request_id: Uuid::new_v4(),
project_context: ProjectContext {
project_id: "ggen-marketplace-innovations".to_string(),
project_type: ProjectType::Library,
technology_stack: vec!["rust".to_string(), "async".to_string()],
dependencies: vec![],
current_phase: ProjectPhase::Development,
risk_level: RiskLevel::Low,
},
execution_environment: ExecutionEnvironment {
available_resources: ResourceAllocation {
cpu_cores: 8,
memory_mb: 8192,
disk_gb: 100,
},
network_connectivity: NetworkStatus::Online,
cache_status: CacheStatus::Fresh,
external_services: vec![],
},
knowledge_graph: KnowledgeGraph {
triples: vec![],
ontologies: vec!["ggen-marketplace".to_string()],
inference_rules: vec![],
last_updated: chrono::Utc::now(),
},
performance_metrics: PerformanceMetrics {
response_times: vec![100.0, 150.0, 120.0],
error_rates: 0.02,
throughput: 1000,
resource_utilization: ResourceUtilization {
cpu_percent: 45.0,
memory_percent: 60.0,
disk_percent: 30.0,
},
},
user_preferences: UserPreferences {
preferred_languages: vec!["rust".to_string()],
interaction_style: InteractionStyle::Conversational,
automation_level: AutomationLevel::SemiAutomated,
notification_preferences: NotificationSettings {
email: false,
push: true,
in_app: true,
frequency: NotificationFrequency::RealTime,
},
},
}
}
}
use crate::agents::{AgentKnowledge, AgentStatus, CoordinationTask, TaskType, CoordinationPlan,
ExecutionOrder, PlanExecutionResult, AggregatedResult, CoordinationResult, RoutingResult, DeliveryResult};
use crate::agents::{ProjectContext, ProjectType, ProjectPhase, RiskLevel, Dependency,
ExecutionEnvironment, ResourceAllocation, NetworkStatus, CacheStatus, ServiceStatus, ServiceHealth,
KnowledgeGraph, RdfTriple, PerformanceMetrics, ResourceUtilization, UserPreferences,
InteractionStyle, AutomationLevel, NotificationSettings, NotificationFrequency};
use crate::agents::Priority;
use serde_json::Value;