// cortexai_crew/crew.rs
//! Crew management

3use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use cortexai_agents::AgentEngine;
8use cortexai_core::*;
9
10use crate::process::Process;
11use crate::task_manager::TaskManager;
12
/// Crew configuration
///
/// Controls how a crew executes its tasks: which process model is used,
/// how many tasks may run concurrently, and logging verbosity.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrewConfig {
    /// Human-readable crew name (used in log output).
    pub name: String,

    /// Free-form crew description.
    pub description: String,

    /// Execution process type (sequential, parallel, or hierarchical).
    pub process: Process,

    /// Maximum concurrent tasks; forwarded to the crew's `TaskManager`.
    pub max_concurrency: usize,

    /// Enable verbose logging.
    pub verbose: bool,
}
31
32impl CrewConfig {
33    pub fn new(name: impl Into<String>) -> Self {
34        Self {
35            name: name.into(),
36            description: String::new(),
37            process: Process::Sequential,
38            max_concurrency: 4,
39            verbose: false,
40        }
41    }
42
43    pub fn with_description(mut self, description: impl Into<String>) -> Self {
44        self.description = description.into();
45        self
46    }
47
48    pub fn with_process(mut self, process: Process) -> Self {
49        self.process = process;
50        self
51    }
52
53    pub fn with_max_concurrency(mut self, max: usize) -> Self {
54        self.max_concurrency = max;
55        self
56    }
57
58    pub fn with_verbose(mut self, verbose: bool) -> Self {
59        self.verbose = verbose;
60        self
61    }
62}
63
/// Crew of agents working together
///
/// Owns the task queue and the set of registered agent configurations,
/// and drives task execution through a shared `AgentEngine`.
pub struct Crew {
    // Execution settings (process model, concurrency limit, verbosity).
    config: CrewConfig,
    // Shared engine used to look up agent runtimes and deliver messages.
    engine: Arc<AgentEngine>,
    // Bookkeeping for tasks added via `add_task`.
    task_manager: TaskManager,
    // Agent configurations registered via `add_agent`, keyed by agent id.
    agents: HashMap<AgentId, AgentConfig>,
}
71
72impl Crew {
73    /// Create a new crew
74    pub fn new(config: CrewConfig, engine: Arc<AgentEngine>) -> Self {
75        Self {
76            config: config.clone(),
77            engine,
78            task_manager: TaskManager::new(config.max_concurrency),
79            agents: HashMap::new(),
80        }
81    }
82
83    /// Add an agent to the crew
84    pub fn add_agent(&mut self, agent_config: AgentConfig) {
85        self.agents.insert(agent_config.id.clone(), agent_config);
86    }
87
88    /// Add a task to the crew
89    pub fn add_task(&mut self, task: Task) -> Result<(), CrewError> {
90        self.task_manager.add_task(task)
91    }
92
93    /// Execute all tasks
94    pub async fn kickoff(&mut self) -> Result<Vec<TaskResult>, CrewError> {
95        tracing::info!(
96            "Crew '{}' starting execution with {} tasks",
97            self.config.name,
98            self.task_manager.task_count()
99        );
100
101        match self.config.process {
102            Process::Sequential => self.execute_sequential().await,
103            Process::Parallel => self.execute_parallel().await,
104            Process::Hierarchical => self.execute_hierarchical().await,
105        }
106    }
107
108    /// Execute tasks sequentially
109    async fn execute_sequential(&mut self) -> Result<Vec<TaskResult>, CrewError> {
110        let mut results = Vec::new();
111        let tasks = self.task_manager.get_all_tasks();
112
113        for task in tasks {
114            tracing::info!("Executing task: {}", task.description);
115            let result = self.execute_single_task(task).await?;
116            results.push(result);
117        }
118
119        Ok(results)
120    }
121
122    /// Execute tasks in parallel (respecting dependencies)
123    async fn execute_parallel(&mut self) -> Result<Vec<TaskResult>, CrewError> {
124        // For parallel execution, we use sequential for now due to borrow checker constraints
125        // TODO: Implement proper parallel execution with Arc<Self>
126        self.execute_sequential().await
127    }
128
129    /// Execute tasks with hierarchical coordination
130    async fn execute_hierarchical(&mut self) -> Result<Vec<TaskResult>, CrewError> {
131        // For hierarchical, we need a manager agent
132        // For now, fall back to parallel execution
133        tracing::warn!("Hierarchical process not fully implemented, using parallel");
134        self.execute_parallel().await
135    }
136
137    /// Execute a single task
138    async fn execute_single_task(&self, task: Task) -> Result<TaskResult, CrewError> {
139        // Find agent for task
140        let agent_id = task
141            .agent_id
142            .as_ref()
143            .or_else(|| self.find_best_agent_for_task(&task))
144            .ok_or_else(|| CrewError::NoAgentAvailable(task.description.clone()))?;
145
146        // Get agent runtime to access its state and memory
147        let agent_runtime = self
148            .engine
149            .get_agent(agent_id)
150            .ok_or_else(|| CrewError::NoAgentAvailable(agent_id.to_string()))?;
151
152        // Create message for agent
153        let message = Message::new(
154            AgentId::new("crew"),
155            agent_id.clone(),
156            Content::Text(task.description.clone()),
157        );
158
159        // Send to agent
160        self.engine
161            .send_message(message)
162            .map_err(CrewError::AgentError)?;
163
164        // Wait for agent to process with timeout
165        // Use timeout from task context or default to 60 seconds
166        let timeout_secs = task
167            .context
168            .get("timeout_secs")
169            .and_then(|v| v.as_u64())
170            .unwrap_or(60);
171        let timeout_duration = std::time::Duration::from_secs(timeout_secs);
172        let poll_interval = std::time::Duration::from_millis(100);
173        let start = std::time::Instant::now();
174
175        // Phase 1: Wait for agent to START processing (leave Idle state)
176        loop {
177            if start.elapsed() > timeout_duration {
178                return Ok(TaskResult::failure(
179                    task.id.clone(),
180                    format!(
181                        "Task timed out waiting to start after {:?}",
182                        timeout_duration
183                    ),
184                ));
185            }
186
187            let state = agent_runtime.state.read().await;
188            let is_idle = matches!(state.status, AgentStatus::Idle);
189            drop(state);
190
191            if !is_idle {
192                tracing::debug!("Agent {} started processing task", agent_id);
193                break;
194            }
195
196            tokio::time::sleep(poll_interval).await;
197        }
198
199        // Phase 2: Wait for agent to FINISH processing (return to Idle state)
200        loop {
201            if start.elapsed() > timeout_duration {
202                return Ok(TaskResult::failure(
203                    task.id.clone(),
204                    format!("Task timed out after {:?}", timeout_duration),
205                ));
206            }
207
208            let state = agent_runtime.state.read().await;
209            let is_idle = matches!(state.status, AgentStatus::Idle);
210            drop(state);
211
212            if is_idle {
213                tracing::debug!("Agent {} finished processing task", agent_id);
214
215                // Agent finished processing, check memory for response
216                if let Ok(history) = agent_runtime.memory.get_history().await {
217                    // Find the last response from the agent after our message
218                    for msg in history.iter().rev() {
219                        if msg.from == *agent_id {
220                            if let Content::Text(response_text) = &msg.content {
221                                return Ok(TaskResult::success(
222                                    task.id.clone(),
223                                    serde_json::json!({
224                                        "status": "completed",
225                                        "response": response_text,
226                                        "agent_id": agent_id.to_string(),
227                                        "elapsed_ms": start.elapsed().as_millis()
228                                    }),
229                                ));
230                            }
231                        }
232                    }
233                }
234
235                // Agent is idle but no response found - might have errored
236                return Ok(TaskResult::failure(
237                    task.id.clone(),
238                    "Agent completed but no response found".to_string(),
239                ));
240            }
241
242            // Agent still processing, wait before checking again
243            tokio::time::sleep(poll_interval).await;
244        }
245    }
246
247    /// Find best agent for a task based on capabilities
248    fn find_best_agent_for_task(&self, _task: &Task) -> Option<&AgentId> {
249        // Simple strategy: return first available agent
250        // TODO: Implement capability matching
251        self.agents.keys().next()
252    }
253
254    /// Get crew statistics
255    pub fn stats(&self) -> CrewStats {
256        CrewStats {
257            name: self.config.name.clone(),
258            agent_count: self.agents.len(),
259            task_count: self.task_manager.task_count(),
260            completed_tasks: 0, // TODO: Track this
261        }
262    }
263}
264
/// Crew statistics
///
/// Point-in-time snapshot returned by `Crew::stats`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct CrewStats {
    /// Crew name (copied from the configuration).
    pub name: String,
    /// Number of registered agents.
    pub agent_count: usize,
    /// Number of tasks known to the task manager.
    pub task_count: usize,
    /// Currently always 0 — completion tracking is a TODO in `Crew::stats`.
    pub completed_tasks: usize,
}