Skip to main content

ai_session/ccswarm/
mod.rs

1//! ccswarm integration module - Advanced AI agent coordination
2
3use anyhow::Result;
4use serde::{Deserialize, Serialize};
5use std::collections::HashMap;
6use std::sync::Arc;
7use tokio::sync::{RwLock, mpsc};
8use uuid::Uuid;
9
10use crate::agent::{AgentTask, AgentType, TaskType};
11
/// Coordinator for ccswarm agents.
///
/// Owns a registry of agents, a queue of tasks, and the sending half of a
/// channel on which it publishes [`CoordinationMessage`]s. Instances are
/// created with [`AgentCoordinator::new`], which also returns the matching
/// receiver.
pub struct AgentCoordinator {
    /// Registered agents, keyed by [`AgentInfo::id`].
    agents: Arc<RwLock<HashMap<String, AgentInfo>>>,
    /// Pending, active and completed tasks, plus recorded task dependencies.
    task_queue: Arc<RwLock<TaskQueue>>,
    /// Sending half of the coordination channel; registration, assignment
    /// and completion events are sent through it.
    message_bus: mpsc::Sender<CoordinationMessage>,
    /// Session manager handle. It is stored at construction but not read by
    /// any code in this module (hence the leading underscore).
    _session_manager: Arc<crate::SessionManager>,
}
23
/// Description of one agent as stored in the coordinator's registry.
///
/// The whole record is serialized as the payload of the
/// `AgentRegistration` message when the agent is registered.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AgentInfo {
    /// Agent ID; used as the registry key and as the `from` field of the
    /// registration message.
    pub id: String,
    /// Agent name
    pub name: String,
    /// Agent type
    pub agent_type: AgentType,
    /// Session ID (not interpreted by this module)
    pub session_id: String,
    /// Server URL (not interpreted by this module)
    pub server_url: String,
    /// Current status; only agents that are [`AgentStatus::Available`] are
    /// considered when tasks are assigned.
    pub status: AgentStatus,
    /// Task types this agent can take; a task is only given to an agent
    /// whose capabilities contain the task's `task_type`.
    pub capabilities: Vec<TaskType>,
    /// Current workload, as supplied by whoever registered the agent.
    pub current_tasks: Vec<String>,
}
44
/// Lifecycle state of an agent.
///
/// Within this module only [`AgentStatus::Available`] is inspected: task
/// assignment filters the registry down to agents in that state. The other
/// variants are carried and serialized but not acted upon here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum AgentStatus {
    /// Ready to accept tasks
    Available,
    /// Currently working
    Busy,
    /// Temporarily unavailable
    Paused,
    /// Not responding
    Offline,
    /// Error state
    Error,
}
59
/// Task bookkeeping shared by the coordinator's methods.
///
/// A task moves from `pending` to `active` when it is assigned to an agent
/// and from `active` to `completed` when its completion is reported.
pub struct TaskQueue {
    /// Submitted tasks that have not been assigned yet, in submission order.
    pending: Vec<AgentTask>,
    /// Assigned tasks, keyed by task id.
    active: HashMap<String, TaskAssignment>,
    /// Records of finished tasks, oldest first; their ids are what pending
    /// tasks' `depends_on` lists are checked against.
    completed: Vec<CompletedTask>,
    /// Task id -> ids of the tasks it depends on, recorded at submission
    /// for tasks whose `depends_on` list is non-empty.
    dependencies: HashMap<String, Vec<String>>,
}
71
/// A task together with the agent it was handed to.
///
/// Serialized as the payload of the `TaskAssignment` coordination message
/// and kept in the queue's active map until the task completes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskAssignment {
    /// The assigned task.
    pub task: AgentTask,
    /// ID of the agent the task was assigned to.
    pub agent_id: String,
    /// When the coordinator made the assignment (UTC).
    pub assigned_at: chrono::DateTime<chrono::Utc>,
    /// When work started, if known. The coordinator creates assignments
    /// with `None` and no code in this module sets it afterwards.
    pub started_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Retry count; initialised to 0 on assignment and not modified by
    /// this module.
    pub retry_count: u32,
}
86
/// Record of a finished task.
///
/// Appended to the queue's completed list and serialized as the payload of
/// the `TaskCompleted` coordination message.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CompletedTask {
    /// The task as it was assigned.
    pub task: AgentTask,
    /// ID of the agent the task had been assigned to.
    pub agent_id: String,
    /// When the coordinator processed the completion (UTC).
    pub completed_at: chrono::DateTime<chrono::Utc>,
    /// Result reported for the task.
    pub result: TaskResult,
    /// Elapsed time in milliseconds, as computed by the coordinator when
    /// the completion was processed.
    pub duration_ms: u64,
}
101
/// Outcome reported for a task.
///
/// The coordinator stores and forwards this value unchanged; it does not
/// inspect any of the fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TaskResult {
    /// Whether the task succeeded.
    pub success: bool,
    /// Output text, if any.
    pub output: Option<String>,
    /// Error message, if any.
    pub error: Option<String>,
    /// Named artifacts produced by the task, as arbitrary JSON values.
    pub artifacts: HashMap<String, serde_json::Value>,
}
114
/// Envelope for every event published on the coordinator's message channel.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CoordinationMessage {
    /// Message ID; the coordinator fills this with a fresh v4 UUID string.
    pub id: String,
    /// Kind of event; determines how `payload` should be read.
    pub msg_type: CoordinationMessageType,
    /// Sender: an agent id, or the literal `"coordinator"` for task
    /// assignments.
    pub from: String,
    /// Target agent id. The coordinator sets it for task assignments and
    /// leaves it `None` for registration and completion events.
    pub to: Option<String>,
    /// JSON payload. For coordinator-generated messages this is the
    /// serialized `AgentInfo`, `TaskAssignment` or `CompletedTask`.
    pub payload: serde_json::Value,
    /// When the message was created (UTC).
    pub timestamp: chrono::DateTime<chrono::Utc>,
}
131
/// Kinds of [`CoordinationMessage`].
///
/// The coordinator in this module emits `AgentRegistration`,
/// `TaskAssignment` and `TaskCompleted`; the remaining variants are defined
/// for other participants and are not produced here.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum CoordinationMessageType {
    /// Agent registration
    AgentRegistration,
    /// Agent status update
    StatusUpdate,
    /// Task assignment
    TaskAssignment,
    /// Task accepted
    TaskAccepted,
    /// Task rejected
    TaskRejected,
    /// Task progress
    TaskProgress,
    /// Task completed
    TaskCompleted,
    /// Task failed
    TaskFailed,
    /// Request for help
    HelpRequest,
    /// Knowledge sharing
    KnowledgeShare,
    /// System announcement
    SystemAnnouncement,
}
158
159impl AgentCoordinator {
160    /// Create new coordinator
161    pub fn new(
162        session_manager: Arc<crate::SessionManager>,
163    ) -> (Self, mpsc::Receiver<CoordinationMessage>) {
164        let (tx, rx) = mpsc::channel(1000);
165
166        let coordinator = Self {
167            agents: Arc::new(RwLock::new(HashMap::new())),
168            task_queue: Arc::new(RwLock::new(TaskQueue {
169                pending: Vec::new(),
170                active: HashMap::new(),
171                completed: Vec::new(),
172                dependencies: HashMap::new(),
173            })),
174            message_bus: tx,
175            _session_manager: session_manager,
176        };
177
178        (coordinator, rx)
179    }
180
181    /// Register an agent
182    pub async fn register_agent(&self, agent_info: AgentInfo) -> Result<()> {
183        let agent_id = agent_info.id.clone();
184
185        let mut agents = self.agents.write().await;
186        agents.insert(agent_id.clone(), agent_info.clone());
187
188        // Send registration message
189        let msg = CoordinationMessage {
190            id: Uuid::new_v4().to_string(),
191            msg_type: CoordinationMessageType::AgentRegistration,
192            from: agent_id,
193            to: None,
194            payload: serde_json::to_value(&agent_info)?,
195            timestamp: chrono::Utc::now(),
196        };
197
198        self.message_bus.send(msg).await?;
199        Ok(())
200    }
201
202    /// Submit a task
203    pub async fn submit_task(&self, task: AgentTask) -> Result<()> {
204        let mut queue = self.task_queue.write().await;
205
206        // Check dependencies
207        if !task.depends_on.is_empty() {
208            queue
209                .dependencies
210                .insert(task.id.clone(), task.depends_on.clone());
211        }
212
213        // Add to pending queue
214        queue.pending.push(task);
215
216        // Trigger task assignment
217        drop(queue);
218        self.assign_pending_tasks().await?;
219
220        Ok(())
221    }
222
223    /// Assign pending tasks to available agents
224    async fn assign_pending_tasks(&self) -> Result<()> {
225        let agents = self.agents.read().await;
226
227        // Find available agents
228        let available_agents: Vec<_> = agents
229            .values()
230            .filter(|a| a.status == AgentStatus::Available)
231            .collect();
232
233        if available_agents.is_empty() {
234            return Ok(());
235        }
236
237        // Get tasks to assign
238        let mut tasks_to_assign = Vec::new();
239        {
240            let queue = self.task_queue.read().await;
241            for task in queue.pending.iter() {
242                // Check dependencies
243                if !self.has_unmet_dependencies(task, &queue.completed).await {
244                    // Find suitable agent
245                    if let Some(agent) = self.find_suitable_agent(task, &available_agents).await {
246                        tasks_to_assign.push((task.clone(), agent.id.clone()));
247                    }
248                }
249            }
250        }
251
252        // Now assign the tasks
253        if !tasks_to_assign.is_empty() {
254            let mut queue = self.task_queue.write().await;
255
256            for (task, agent_id) in tasks_to_assign {
257                let assignment = TaskAssignment {
258                    task: task.clone(),
259                    agent_id: agent_id.clone(),
260                    assigned_at: chrono::Utc::now(),
261                    started_at: None,
262                    retry_count: 0,
263                };
264
265                queue.active.insert(task.id.clone(), assignment.clone());
266                queue.pending.retain(|t| t.id != task.id);
267
268                // Send assignment message
269                let msg = CoordinationMessage {
270                    id: Uuid::new_v4().to_string(),
271                    msg_type: CoordinationMessageType::TaskAssignment,
272                    from: "coordinator".to_string(),
273                    to: Some(agent_id),
274                    payload: serde_json::to_value(&assignment)?,
275                    timestamp: chrono::Utc::now(),
276                };
277
278                self.message_bus.send(msg).await?;
279            }
280        }
281
282        Ok(())
283    }
284
285    /// Check if task has unmet dependencies
286    async fn has_unmet_dependencies(&self, task: &AgentTask, completed: &[CompletedTask]) -> bool {
287        if task.depends_on.is_empty() {
288            return false;
289        }
290
291        let completed_ids: Vec<_> = completed.iter().map(|t| &t.task.id).collect();
292        task.depends_on
293            .iter()
294            .any(|dep| !completed_ids.contains(&dep))
295    }
296
297    /// Find suitable agent for task
298    async fn find_suitable_agent<'a>(
299        &self,
300        task: &AgentTask,
301        agents: &[&'a AgentInfo],
302    ) -> Option<&'a AgentInfo> {
303        agents
304            .iter()
305            .find(|agent| agent.capabilities.contains(&task.task_type))
306            .copied()
307    }
308
309    /// Handle task completion
310    pub async fn complete_task(&self, task_id: String, result: TaskResult) -> Result<()> {
311        let mut queue = self.task_queue.write().await;
312
313        if let Some(assignment) = queue.active.remove(&task_id) {
314            let completed = CompletedTask {
315                task: assignment.task,
316                agent_id: assignment.agent_id.clone(),
317                completed_at: chrono::Utc::now(),
318                duration_ms: assignment
319                    .started_at
320                    .map(|start| (chrono::Utc::now() - start).num_milliseconds() as u64)
321                    .unwrap_or(0),
322                result,
323            };
324
325            queue.completed.push(completed.clone());
326
327            // Send completion message
328            let msg = CoordinationMessage {
329                id: Uuid::new_v4().to_string(),
330                msg_type: CoordinationMessageType::TaskCompleted,
331                from: assignment.agent_id,
332                to: None,
333                payload: serde_json::to_value(&completed)?,
334                timestamp: chrono::Utc::now(),
335            };
336
337            self.message_bus.send(msg).await?;
338
339            // Check if this unblocks any tasks
340            drop(queue);
341            self.assign_pending_tasks().await?;
342        }
343
344        Ok(())
345    }
346
347    /// Get agent status
348    pub async fn get_agent_status(&self, agent_id: &str) -> Option<AgentInfo> {
349        let agents = self.agents.read().await;
350        agents.get(agent_id).cloned()
351    }
352
353    /// Get task queue status
354    pub async fn get_queue_status(&self) -> Result<QueueStatus> {
355        let queue = self.task_queue.read().await;
356
357        Ok(QueueStatus {
358            pending_count: queue.pending.len(),
359            active_count: queue.active.len(),
360            completed_count: queue.completed.len(),
361            pending_tasks: queue.pending.clone(),
362            active_tasks: queue.active.values().cloned().collect(),
363            recent_completed: queue.completed.iter().rev().take(10).cloned().collect(),
364        })
365    }
366}
367
/// Point-in-time snapshot of the coordinator's task queue, as produced by
/// `AgentCoordinator::get_queue_status`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QueueStatus {
    /// Number of pending tasks
    pub pending_count: usize,
    /// Number of active tasks
    pub active_count: usize,
    /// Number of completed tasks (all of them, not just the recent ones)
    pub completed_count: usize,
    /// Copies of all pending tasks, in queue order
    pub pending_tasks: Vec<AgentTask>,
    /// Copies of all active task assignments (order is unspecified because
    /// they come from a hash map)
    pub active_tasks: Vec<TaskAssignment>,
    /// Up to ten most recently completed tasks, newest first
    pub recent_completed: Vec<CompletedTask>,
}
384
/// Builds the prompt text sent to an agent for a task, choosing the format
/// according to the agent type.
pub struct PromptBuilder {
    /// Agent type the prompt is formatted for.
    agent_type: AgentType,
    /// Free-form key/value context. Within this module only the `"project"`
    /// key is read (by the Claude Code prompt format).
    context: HashMap<String, String>,
}
390
391impl PromptBuilder {
392    /// Create new prompt builder
393    pub fn new(agent_type: AgentType) -> Self {
394        Self {
395            agent_type,
396            context: HashMap::new(),
397        }
398    }
399
400    /// Add context
401    pub fn with_context(mut self, key: &str, value: &str) -> Self {
402        self.context.insert(key.to_string(), value.to_string());
403        self
404    }
405
406    /// Build prompt for task
407    pub fn build_task_prompt(&self, task: &AgentTask) -> String {
408        match self.agent_type {
409            AgentType::ClaudeCode => self.build_claude_prompt(task),
410            AgentType::Aider => self.build_aider_prompt(task),
411            _ => self.build_generic_prompt(task),
412        }
413    }
414
415    fn build_claude_prompt(&self, task: &AgentTask) -> String {
416        let mut prompt = String::new();
417
418        // Add context if available
419        if let Some(project) = self.context.get("project") {
420            prompt.push_str(&format!("Project: {}\n\n", project));
421        }
422
423        // Task description
424        prompt.push_str(&format!("Task: {}\n", task.description));
425
426        // Add specific instructions based on task type
427        match task.task_type {
428            TaskType::CodeGeneration => {
429                prompt.push_str("\nPlease generate the requested code with:\n");
430                prompt.push_str("- Clear documentation\n");
431                prompt.push_str("- Error handling\n");
432                prompt.push_str("- Unit tests\n");
433            }
434            TaskType::CodeReview => {
435                prompt.push_str("\nPlease review the code for:\n");
436                prompt.push_str("- Correctness\n");
437                prompt.push_str("- Performance\n");
438                prompt.push_str("- Security issues\n");
439                prompt.push_str("- Best practices\n");
440            }
441            TaskType::Debugging => {
442                prompt.push_str("\nPlease debug the issue by:\n");
443                prompt.push_str("- Identifying the root cause\n");
444                prompt.push_str("- Suggesting fixes\n");
445                prompt.push_str("- Preventing similar issues\n");
446            }
447            _ => {}
448        }
449
450        prompt
451    }
452
453    fn build_aider_prompt(&self, task: &AgentTask) -> String {
454        // Aider-specific prompt format
455        let mut prompt = format!("/ask {}\n", task.description);
456
457        if let Some(files) = task.parameters.get("files")
458            && let Some(files_list) = files.as_array()
459        {
460            for file in files_list {
461                if let Some(file_str) = file.as_str() {
462                    prompt.push_str(&format!("/add {file_str}\n"));
463                }
464            }
465        }
466
467        prompt
468    }
469
470    fn build_generic_prompt(&self, task: &AgentTask) -> String {
471        task.description.clone()
472    }
473}