1use serde::{Deserialize, Serialize};
4use std::collections::HashMap;
5use std::sync::Arc;
6
7use cortexai_agents::AgentEngine;
8use cortexai_core::*;
9
10use crate::process::Process;
11use crate::task_manager::TaskManager;
12
13#[derive(Debug, Clone, Serialize, Deserialize)]
15pub struct CrewConfig {
16 pub name: String,
18
19 pub description: String,
21
22 pub process: Process,
24
25 pub max_concurrency: usize,
27
28 pub verbose: bool,
30}
31
32impl CrewConfig {
33 pub fn new(name: impl Into<String>) -> Self {
34 Self {
35 name: name.into(),
36 description: String::new(),
37 process: Process::Sequential,
38 max_concurrency: 4,
39 verbose: false,
40 }
41 }
42
43 pub fn with_description(mut self, description: impl Into<String>) -> Self {
44 self.description = description.into();
45 self
46 }
47
48 pub fn with_process(mut self, process: Process) -> Self {
49 self.process = process;
50 self
51 }
52
53 pub fn with_max_concurrency(mut self, max: usize) -> Self {
54 self.max_concurrency = max;
55 self
56 }
57
58 pub fn with_verbose(mut self, verbose: bool) -> Self {
59 self.verbose = verbose;
60 self
61 }
62}
63
64pub struct Crew {
66 config: CrewConfig,
67 engine: Arc<AgentEngine>,
68 task_manager: TaskManager,
69 agents: HashMap<AgentId, AgentConfig>,
70}
71
72impl Crew {
73 pub fn new(config: CrewConfig, engine: Arc<AgentEngine>) -> Self {
75 Self {
76 config: config.clone(),
77 engine,
78 task_manager: TaskManager::new(config.max_concurrency),
79 agents: HashMap::new(),
80 }
81 }
82
83 pub fn add_agent(&mut self, agent_config: AgentConfig) {
85 self.agents.insert(agent_config.id.clone(), agent_config);
86 }
87
88 pub fn add_task(&mut self, task: Task) -> Result<(), CrewError> {
90 self.task_manager.add_task(task)
91 }
92
93 pub async fn kickoff(&mut self) -> Result<Vec<TaskResult>, CrewError> {
95 tracing::info!(
96 "Crew '{}' starting execution with {} tasks",
97 self.config.name,
98 self.task_manager.task_count()
99 );
100
101 match self.config.process {
102 Process::Sequential => self.execute_sequential().await,
103 Process::Parallel => self.execute_parallel().await,
104 Process::Hierarchical => self.execute_hierarchical().await,
105 }
106 }
107
108 async fn execute_sequential(&mut self) -> Result<Vec<TaskResult>, CrewError> {
110 let mut results = Vec::new();
111 let tasks = self.task_manager.get_all_tasks();
112
113 for task in tasks {
114 tracing::info!("Executing task: {}", task.description);
115 let result = self.execute_single_task(task).await?;
116 results.push(result);
117 }
118
119 Ok(results)
120 }
121
122 async fn execute_parallel(&mut self) -> Result<Vec<TaskResult>, CrewError> {
124 self.execute_sequential().await
127 }
128
129 async fn execute_hierarchical(&mut self) -> Result<Vec<TaskResult>, CrewError> {
131 tracing::warn!("Hierarchical process not fully implemented, using parallel");
134 self.execute_parallel().await
135 }
136
137 async fn execute_single_task(&self, task: Task) -> Result<TaskResult, CrewError> {
139 let agent_id = task
141 .agent_id
142 .as_ref()
143 .or_else(|| self.find_best_agent_for_task(&task))
144 .ok_or_else(|| CrewError::NoAgentAvailable(task.description.clone()))?;
145
146 let agent_runtime = self
148 .engine
149 .get_agent(agent_id)
150 .ok_or_else(|| CrewError::NoAgentAvailable(agent_id.to_string()))?;
151
152 let message = Message::new(
154 AgentId::new("crew"),
155 agent_id.clone(),
156 Content::Text(task.description.clone()),
157 );
158
159 self.engine
161 .send_message(message)
162 .map_err(CrewError::AgentError)?;
163
164 let timeout_secs = task
167 .context
168 .get("timeout_secs")
169 .and_then(|v| v.as_u64())
170 .unwrap_or(60);
171 let timeout_duration = std::time::Duration::from_secs(timeout_secs);
172 let poll_interval = std::time::Duration::from_millis(100);
173 let start = std::time::Instant::now();
174
175 loop {
177 if start.elapsed() > timeout_duration {
178 return Ok(TaskResult::failure(
179 task.id.clone(),
180 format!(
181 "Task timed out waiting to start after {:?}",
182 timeout_duration
183 ),
184 ));
185 }
186
187 let state = agent_runtime.state.read().await;
188 let is_idle = matches!(state.status, AgentStatus::Idle);
189 drop(state);
190
191 if !is_idle {
192 tracing::debug!("Agent {} started processing task", agent_id);
193 break;
194 }
195
196 tokio::time::sleep(poll_interval).await;
197 }
198
199 loop {
201 if start.elapsed() > timeout_duration {
202 return Ok(TaskResult::failure(
203 task.id.clone(),
204 format!("Task timed out after {:?}", timeout_duration),
205 ));
206 }
207
208 let state = agent_runtime.state.read().await;
209 let is_idle = matches!(state.status, AgentStatus::Idle);
210 drop(state);
211
212 if is_idle {
213 tracing::debug!("Agent {} finished processing task", agent_id);
214
215 if let Ok(history) = agent_runtime.memory.get_history().await {
217 for msg in history.iter().rev() {
219 if msg.from == *agent_id {
220 if let Content::Text(response_text) = &msg.content {
221 return Ok(TaskResult::success(
222 task.id.clone(),
223 serde_json::json!({
224 "status": "completed",
225 "response": response_text,
226 "agent_id": agent_id.to_string(),
227 "elapsed_ms": start.elapsed().as_millis()
228 }),
229 ));
230 }
231 }
232 }
233 }
234
235 return Ok(TaskResult::failure(
237 task.id.clone(),
238 "Agent completed but no response found".to_string(),
239 ));
240 }
241
242 tokio::time::sleep(poll_interval).await;
244 }
245 }
246
247 fn find_best_agent_for_task(&self, _task: &Task) -> Option<&AgentId> {
249 self.agents.keys().next()
252 }
253
254 pub fn stats(&self) -> CrewStats {
256 CrewStats {
257 name: self.config.name.clone(),
258 agent_count: self.agents.len(),
259 task_count: self.task_manager.task_count(),
260 completed_tasks: 0, }
262 }
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize)]
267pub struct CrewStats {
268 pub name: String,
269 pub agent_count: usize,
270 pub task_count: usize,
271 pub completed_tasks: usize,
272}