1use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, mpsc};
8use uuid::Uuid;
9
10use crate::agent::{AgentTask, AgentType, TaskType};
11
12pub struct AgentCoordinator {
14 agents: Arc<RwLock<HashMap<String, AgentInfo>>>,
16 task_queue: Arc<RwLock<TaskQueue>>,
18 message_bus: mpsc::Sender<CoordinationMessage>,
20 _session_manager: Arc<crate::SessionManager>,
22}
23
24#[derive(Debug, Clone, Serialize, Deserialize)]
26pub struct AgentInfo {
27 pub id: String,
29 pub name: String,
31 pub agent_type: AgentType,
33 pub session_id: String,
35 pub server_url: String,
37 pub status: AgentStatus,
39 pub capabilities: Vec<TaskType>,
41 pub current_tasks: Vec<String>,
43}
44
45#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
47pub enum AgentStatus {
48 Available,
50 Busy,
52 Paused,
54 Offline,
56 Error,
58}
59
60pub struct TaskQueue {
62 pending: Vec<AgentTask>,
64 active: HashMap<String, TaskAssignment>,
66 completed: Vec<CompletedTask>,
68 dependencies: HashMap<String, Vec<String>>,
70}
71
72#[derive(Debug, Clone, Serialize, Deserialize)]
74pub struct TaskAssignment {
75 pub task: AgentTask,
77 pub agent_id: String,
79 pub assigned_at: chrono::DateTime<chrono::Utc>,
81 pub started_at: Option<chrono::DateTime<chrono::Utc>>,
83 pub retry_count: u32,
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
89pub struct CompletedTask {
90 pub task: AgentTask,
92 pub agent_id: String,
94 pub completed_at: chrono::DateTime<chrono::Utc>,
96 pub result: TaskResult,
98 pub duration_ms: u64,
100}
101
102#[derive(Debug, Clone, Serialize, Deserialize)]
104pub struct TaskResult {
105 pub success: bool,
107 pub output: Option<String>,
109 pub error: Option<String>,
111 pub artifacts: HashMap<String, serde_json::Value>,
113}
114
115#[derive(Debug, Clone, Serialize, Deserialize)]
117pub struct CoordinationMessage {
118 pub id: String,
120 pub msg_type: CoordinationMessageType,
122 pub from: String,
124 pub to: Option<String>,
126 pub payload: serde_json::Value,
128 pub timestamp: chrono::DateTime<chrono::Utc>,
130}
131
132#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
134pub enum CoordinationMessageType {
135 AgentRegistration,
137 StatusUpdate,
139 TaskAssignment,
141 TaskAccepted,
143 TaskRejected,
145 TaskProgress,
147 TaskCompleted,
149 TaskFailed,
151 HelpRequest,
153 KnowledgeShare,
155 SystemAnnouncement,
157}
158
159impl AgentCoordinator {
160 pub fn new(
162 session_manager: Arc<crate::SessionManager>,
163 ) -> (Self, mpsc::Receiver<CoordinationMessage>) {
164 let (tx, rx) = mpsc::channel(1000);
165
166 let coordinator = Self {
167 agents: Arc::new(RwLock::new(HashMap::new())),
168 task_queue: Arc::new(RwLock::new(TaskQueue {
169 pending: Vec::new(),
170 active: HashMap::new(),
171 completed: Vec::new(),
172 dependencies: HashMap::new(),
173 })),
174 message_bus: tx,
175 _session_manager: session_manager,
176 };
177
178 (coordinator, rx)
179 }
180
181 pub async fn register_agent(&self, agent_info: AgentInfo) -> Result<()> {
183 let agent_id = agent_info.id.clone();
184
185 let mut agents = self.agents.write().await;
186 agents.insert(agent_id.clone(), agent_info.clone());
187
188 let msg = CoordinationMessage {
190 id: Uuid::new_v4().to_string(),
191 msg_type: CoordinationMessageType::AgentRegistration,
192 from: agent_id,
193 to: None,
194 payload: serde_json::to_value(&agent_info)?,
195 timestamp: chrono::Utc::now(),
196 };
197
198 self.message_bus.send(msg).await?;
199 Ok(())
200 }
201
202 pub async fn submit_task(&self, task: AgentTask) -> Result<()> {
204 let mut queue = self.task_queue.write().await;
205
206 if !task.depends_on.is_empty() {
208 queue
209 .dependencies
210 .insert(task.id.clone(), task.depends_on.clone());
211 }
212
213 queue.pending.push(task);
215
216 drop(queue);
218 self.assign_pending_tasks().await?;
219
220 Ok(())
221 }
222
223 async fn assign_pending_tasks(&self) -> Result<()> {
225 let agents = self.agents.read().await;
226
227 let available_agents: Vec<_> = agents
229 .values()
230 .filter(|a| a.status == AgentStatus::Available)
231 .collect();
232
233 if available_agents.is_empty() {
234 return Ok(());
235 }
236
237 let mut tasks_to_assign = Vec::new();
239 {
240 let queue = self.task_queue.read().await;
241 for task in queue.pending.iter() {
242 if !self.has_unmet_dependencies(task, &queue.completed).await {
244 if let Some(agent) = self.find_suitable_agent(task, &available_agents).await {
246 tasks_to_assign.push((task.clone(), agent.id.clone()));
247 }
248 }
249 }
250 }
251
252 if !tasks_to_assign.is_empty() {
254 let mut queue = self.task_queue.write().await;
255
256 for (task, agent_id) in tasks_to_assign {
257 let assignment = TaskAssignment {
258 task: task.clone(),
259 agent_id: agent_id.clone(),
260 assigned_at: chrono::Utc::now(),
261 started_at: None,
262 retry_count: 0,
263 };
264
265 queue.active.insert(task.id.clone(), assignment.clone());
266 queue.pending.retain(|t| t.id != task.id);
267
268 let msg = CoordinationMessage {
270 id: Uuid::new_v4().to_string(),
271 msg_type: CoordinationMessageType::TaskAssignment,
272 from: "coordinator".to_string(),
273 to: Some(agent_id),
274 payload: serde_json::to_value(&assignment)?,
275 timestamp: chrono::Utc::now(),
276 };
277
278 self.message_bus.send(msg).await?;
279 }
280 }
281
282 Ok(())
283 }
284
285 async fn has_unmet_dependencies(&self, task: &AgentTask, completed: &[CompletedTask]) -> bool {
287 if task.depends_on.is_empty() {
288 return false;
289 }
290
291 let completed_ids: Vec<_> = completed.iter().map(|t| &t.task.id).collect();
292 task.depends_on
293 .iter()
294 .any(|dep| !completed_ids.contains(&dep))
295 }
296
297 async fn find_suitable_agent<'a>(
299 &self,
300 task: &AgentTask,
301 agents: &[&'a AgentInfo],
302 ) -> Option<&'a AgentInfo> {
303 agents
304 .iter()
305 .find(|agent| agent.capabilities.contains(&task.task_type))
306 .copied()
307 }
308
309 pub async fn complete_task(&self, task_id: String, result: TaskResult) -> Result<()> {
311 let mut queue = self.task_queue.write().await;
312
313 if let Some(assignment) = queue.active.remove(&task_id) {
314 let completed = CompletedTask {
315 task: assignment.task,
316 agent_id: assignment.agent_id.clone(),
317 completed_at: chrono::Utc::now(),
318 duration_ms: assignment
319 .started_at
320 .map(|start| (chrono::Utc::now() - start).num_milliseconds() as u64)
321 .unwrap_or(0),
322 result,
323 };
324
325 queue.completed.push(completed.clone());
326
327 let msg = CoordinationMessage {
329 id: Uuid::new_v4().to_string(),
330 msg_type: CoordinationMessageType::TaskCompleted,
331 from: assignment.agent_id,
332 to: None,
333 payload: serde_json::to_value(&completed)?,
334 timestamp: chrono::Utc::now(),
335 };
336
337 self.message_bus.send(msg).await?;
338
339 drop(queue);
341 self.assign_pending_tasks().await?;
342 }
343
344 Ok(())
345 }
346
347 pub async fn get_agent_status(&self, agent_id: &str) -> Option<AgentInfo> {
349 let agents = self.agents.read().await;
350 agents.get(agent_id).cloned()
351 }
352
353 pub async fn get_queue_status(&self) -> Result<QueueStatus> {
355 let queue = self.task_queue.read().await;
356
357 Ok(QueueStatus {
358 pending_count: queue.pending.len(),
359 active_count: queue.active.len(),
360 completed_count: queue.completed.len(),
361 pending_tasks: queue.pending.clone(),
362 active_tasks: queue.active.values().cloned().collect(),
363 recent_completed: queue.completed.iter().rev().take(10).cloned().collect(),
364 })
365 }
366}
367
368#[derive(Debug, Clone, Serialize, Deserialize)]
370pub struct QueueStatus {
371 pub pending_count: usize,
373 pub active_count: usize,
375 pub completed_count: usize,
377 pub pending_tasks: Vec<AgentTask>,
379 pub active_tasks: Vec<TaskAssignment>,
381 pub recent_completed: Vec<CompletedTask>,
383}
384
385pub struct PromptBuilder {
387 agent_type: AgentType,
388 context: HashMap<String, String>,
389}
390
391impl PromptBuilder {
392 pub fn new(agent_type: AgentType) -> Self {
394 Self {
395 agent_type,
396 context: HashMap::new(),
397 }
398 }
399
400 pub fn with_context(mut self, key: &str, value: &str) -> Self {
402 self.context.insert(key.to_string(), value.to_string());
403 self
404 }
405
406 pub fn build_task_prompt(&self, task: &AgentTask) -> String {
408 match self.agent_type {
409 AgentType::ClaudeCode => self.build_claude_prompt(task),
410 AgentType::Aider => self.build_aider_prompt(task),
411 _ => self.build_generic_prompt(task),
412 }
413 }
414
415 fn build_claude_prompt(&self, task: &AgentTask) -> String {
416 let mut prompt = String::new();
417
418 if let Some(project) = self.context.get("project") {
420 prompt.push_str(&format!("Project: {}\n\n", project));
421 }
422
423 prompt.push_str(&format!("Task: {}\n", task.description));
425
426 match task.task_type {
428 TaskType::CodeGeneration => {
429 prompt.push_str("\nPlease generate the requested code with:\n");
430 prompt.push_str("- Clear documentation\n");
431 prompt.push_str("- Error handling\n");
432 prompt.push_str("- Unit tests\n");
433 }
434 TaskType::CodeReview => {
435 prompt.push_str("\nPlease review the code for:\n");
436 prompt.push_str("- Correctness\n");
437 prompt.push_str("- Performance\n");
438 prompt.push_str("- Security issues\n");
439 prompt.push_str("- Best practices\n");
440 }
441 TaskType::Debugging => {
442 prompt.push_str("\nPlease debug the issue by:\n");
443 prompt.push_str("- Identifying the root cause\n");
444 prompt.push_str("- Suggesting fixes\n");
445 prompt.push_str("- Preventing similar issues\n");
446 }
447 _ => {}
448 }
449
450 prompt
451 }
452
453 fn build_aider_prompt(&self, task: &AgentTask) -> String {
454 let mut prompt = format!("/ask {}\n", task.description);
456
457 if let Some(files) = task.parameters.get("files")
458 && let Some(files_list) = files.as_array()
459 {
460 for file in files_list {
461 if let Some(file_str) = file.as_str() {
462 prompt.push_str(&format!("/add {file_str}\n"));
463 }
464 }
465 }
466
467 prompt
468 }
469
470 fn build_generic_prompt(&self, task: &AgentTask) -> String {
471 task.description.clone()
472 }
473}