1use super::SubagentContext;
7use super::SubagentError;
8use super::SubagentStatus;
9use super::agents::AgentResult;
10use super::context::AgentMessage;
11use super::context::MessagePriority;
12use super::context::MessageTarget;
13use super::context::MessageType;
14use super::orchestrator::AgentOrchestrator;
15use super::orchestrator::OrchestratorConfig;
16use super::parser::AgentParser;
17use super::parser::ParsedInvocation;
18use super::registry::SubagentRegistry;
19use crate::code_tools::ast_agent_tools::ASTAgentTools;
20use crate::modes::OperatingMode;
21use chrono::Utc;
22use dashmap::DashMap;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::sync::atomic::Ordering;
27use std::time::Duration;
28use std::time::SystemTime;
29use tokio::sync::RwLock;
30use tokio::sync::broadcast;
31use tokio::task::JoinHandle;
32use tracing::debug;
33use tracing::error;
34use tracing::info;
35use uuid::Uuid;
36
37#[derive(Debug, Clone)]
39pub struct AgentStats {
40 pub total_spawned: u64,
41 pub total_completed: u64,
42 pub total_failed: u64,
43 pub total_cancelled: u64,
44 pub avg_execution_time: Duration,
45 pub last_execution: Option<SystemTime>,
46}
47
48pub struct AgentManager {
50 registry: Arc<SubagentRegistry>,
52 _parser: AgentParser,
54 orchestrator: Arc<AgentOrchestrator>,
56 active_agents: Arc<DashMap<Uuid, AgentHandle>>,
58 message_bus: Arc<MessageBus>,
60 global_cancel: Arc<AtomicBool>,
62 stats: Arc<RwLock<HashMap<String, AgentStats>>>,
64 ast_tools: Arc<RwLock<ASTAgentTools>>,
66}
67
68pub struct AgentHandle {
70 pub id: Uuid,
71 pub name: String,
72 pub status: Arc<RwLock<SubagentStatus>>,
73 pub cancel_flag: Arc<AtomicBool>,
74 pub task_handle: JoinHandle<Result<AgentResult, SubagentError>>,
75 pub started_at: SystemTime,
76 pub context: SubagentContext,
77}
78
79pub struct MessageBus {
81 sender: broadcast::Sender<AgentMessage>,
83 _receiver: broadcast::Receiver<AgentMessage>,
85 history: Arc<RwLock<Vec<AgentMessage>>>,
87 subscriptions: Arc<DashMap<String, Vec<Uuid>>>,
89}
90
91impl MessageBus {
92 pub fn new(capacity: usize) -> Self {
94 let (sender, receiver) = broadcast::channel(capacity);
95 Self {
96 sender,
97 _receiver: receiver,
98 history: Arc::new(RwLock::new(Vec::new())),
99 subscriptions: Arc::new(DashMap::new()),
100 }
101 }
102
103 pub async fn broadcast(&self, message: AgentMessage) -> Result<(), SubagentError> {
105 self.history.write().await.push(message.clone());
107
108 self.sender.send(message).map_err(|e| {
110 SubagentError::ExecutionFailed(format!("Failed to broadcast message: {}", e))
111 })?;
112
113 Ok(())
114 }
115
116 pub fn subscribe(&self) -> broadcast::Receiver<AgentMessage> {
118 self.sender.subscribe()
119 }
120
121 pub async fn subscribe_to_topic(&self, topic: String, agent_id: Uuid) {
123 self.subscriptions.entry(topic).or_default().push(agent_id);
124 }
125
126 pub async fn get_history(&self, limit: Option<usize>) -> Vec<AgentMessage> {
128 let history = self.history.read().await;
129 match limit {
130 Some(n) => {
131 let start = history.len().saturating_sub(n);
133 history[start..].to_vec()
134 }
135 None => history.clone(),
136 }
137 }
138}
139
140impl AgentManager {
141 pub fn new(registry: Arc<SubagentRegistry>, orchestrator_config: OrchestratorConfig) -> Self {
143 let parser = AgentParser::with_registry(registry.clone());
144 let orchestrator = Arc::new(AgentOrchestrator::new(
145 registry.clone(),
146 orchestrator_config,
147 crate::modes::OperatingMode::Build, ));
149 let ast_tools = Arc::new(RwLock::new(ASTAgentTools::new()));
150
151 Self {
152 registry,
153 _parser: parser,
154 orchestrator,
155 active_agents: Arc::new(DashMap::new()),
156 message_bus: Arc::new(MessageBus::new(1000)),
157 global_cancel: Arc::new(AtomicBool::new(false)),
158 stats: Arc::new(RwLock::new(HashMap::new())),
159 ast_tools,
160 }
161 }
162
163 pub async fn spawn_from_invocation(
165 &self,
166 invocation: ParsedInvocation,
167 ) -> Result<Vec<Uuid>, SubagentError> {
168 let mut agent_ids = Vec::new();
169
170 match invocation.execution_plan {
172 super::invocation::ExecutionPlan::Single(agent_inv) => {
173 let id = self
174 .spawn_agent(
175 agent_inv.agent_name,
176 self.create_context(invocation.context, invocation.mode_override),
177 )
178 .await?;
179 agent_ids.push(id);
180 }
181 super::invocation::ExecutionPlan::Sequential(chain) => {
182 for agent_inv in chain.agents {
183 let id = self
184 .spawn_agent(
185 agent_inv.agent_name,
186 self.create_context(
187 invocation.context.clone(),
188 invocation.mode_override,
189 ),
190 )
191 .await?;
192 agent_ids.push(id);
193
194 self.wait_for_agent(id).await?;
196 }
197 }
198 super::invocation::ExecutionPlan::Parallel(agents) => {
199 let mut tasks = Vec::new();
200 for agent_inv in agents {
201 let manager = self.clone_for_async();
202 let context = invocation.context.clone();
203 let mode = invocation.mode_override;
204
205 tasks.push(tokio::spawn(async move {
206 manager
207 .spawn_agent(
208 agent_inv.agent_name,
209 manager.create_context(context, mode),
210 )
211 .await
212 }));
213 }
214
215 for task in tasks {
216 let id = task.await.map_err(|e| {
217 SubagentError::ExecutionFailed(format!("Task join error: {}", e))
218 })??;
219 agent_ids.push(id);
220 }
221 }
222 _ => {
223 self.orchestrator
225 .execute_plan(super::invocation::InvocationRequest {
226 id: invocation.id,
227 original_input: invocation.original_input,
228 execution_plan: invocation.execution_plan,
229 context: invocation.context,
230 })
231 .await?;
232 }
233 }
234
235 Ok(agent_ids)
236 }
237
238 pub async fn spawn_agent(
240 &self,
241 agent_name: String,
242 context: SubagentContext,
243 ) -> Result<Uuid, SubagentError> {
244 let agent = self
246 .registry
247 .get_executable_agent(&agent_name)
248 .ok_or_else(|| SubagentError::AgentNotFound {
249 name: agent_name.clone(),
250 })?;
251
252 let agent_id = Uuid::new_v4();
253 let cancel_flag = Arc::new(AtomicBool::new(false));
254 let status = Arc::new(RwLock::new(SubagentStatus::Pending));
255
256 self.update_stats(&agent_name, |stats| {
258 stats.total_spawned += 1;
259 stats.last_execution = Some(SystemTime::now());
260 })
261 .await;
262
263 let agent_clone = agent.clone();
265 let context_clone = context.clone();
266 let cancel_clone = cancel_flag.clone();
267 let status_clone = status.clone();
268 let message_bus = self.message_bus.clone();
269 let agent_name_clone = agent_name.clone();
270 let ast_tools = self.ast_tools.clone();
271
272 let task_handle = tokio::spawn(async move {
273 *status_clone.write().await = SubagentStatus::Running;
275
276 let _ = message_bus
278 .broadcast(AgentMessage {
279 id: Uuid::new_v4(),
280 from: agent_id.to_string(),
281 to: MessageTarget::Broadcast,
282 message_type: MessageType::Info,
283 priority: MessagePriority::Normal,
284 payload: serde_json::json!({
285 "status": "started",
286 "agent": agent_name_clone
287 }),
288 timestamp: Utc::now(),
289 })
290 .await;
291
292 let mut tools = ast_tools.write().await;
294 let result = agent_clone
295 .execute(&context_clone, &mut tools, cancel_clone)
296 .await;
297
298 match &result {
300 Ok(agent_result) => {
301 *status_clone.write().await = SubagentStatus::Completed;
302
303 let _ = message_bus
305 .broadcast(AgentMessage {
306 id: Uuid::new_v4(),
307 from: agent_id.to_string(),
308 to: MessageTarget::Broadcast,
309 message_type: MessageType::Result,
310 priority: MessagePriority::High,
311 payload: serde_json::json!({
312 "summary": agent_result.summary.clone(),
313 "status": "completed",
314 "findings": agent_result.findings.len()
315 }),
316 timestamp: Utc::now(),
317 })
318 .await;
319 }
320 Err(e) => {
321 *status_clone.write().await = SubagentStatus::Failed(e.to_string());
322
323 let _ = message_bus
325 .broadcast(AgentMessage {
326 id: Uuid::new_v4(),
327 from: agent_id.to_string(),
328 to: MessageTarget::Broadcast,
329 message_type: MessageType::Error,
330 priority: MessagePriority::Critical,
331 payload: serde_json::json!({
332 "error": format!("Agent {} failed: {}", agent_name_clone, e)
333 }),
334 timestamp: Utc::now(),
335 })
336 .await;
337 }
338 }
339
340 result
341 });
342
343 self.active_agents.insert(
345 agent_id,
346 AgentHandle {
347 id: agent_id,
348 name: agent_name.clone(),
349 status,
350 cancel_flag,
351 task_handle,
352 started_at: SystemTime::now(),
353 context,
354 },
355 );
356
357 info!("Spawned agent {} with ID {}", agent_name, agent_id);
358
359 Ok(agent_id)
360 }
361
362 pub async fn wait_for_agent(&self, agent_id: Uuid) -> Result<AgentResult, SubagentError> {
364 let handle =
365 self.active_agents
366 .remove(&agent_id)
367 .ok_or_else(|| SubagentError::AgentNotFound {
368 name: format!("Agent with ID {}", agent_id),
369 })?;
370
371 let (_, agent_handle) = handle;
372 let agent_name = agent_handle.name.clone();
373 let started_at = agent_handle.started_at;
374
375 let result = agent_handle
377 .task_handle
378 .await
379 .map_err(|e| SubagentError::ExecutionFailed(format!("Task join error: {}", e)))?;
380
381 let execution_time = SystemTime::now()
383 .duration_since(started_at)
384 .unwrap_or_else(|_| Duration::from_secs(0));
385
386 self.update_stats(&agent_name, |stats| {
387 match &result {
388 Ok(_) => stats.total_completed += 1,
389 Err(_) => stats.total_failed += 1,
390 }
391
392 let total = stats.total_completed + stats.total_failed;
394 if total > 0 {
395 let current_avg = stats.avg_execution_time.as_secs_f64();
396 let new_avg = (current_avg * (total - 1) as f64 + execution_time.as_secs_f64())
397 / total as f64;
398 stats.avg_execution_time = Duration::from_secs_f64(new_avg);
399 }
400 })
401 .await;
402
403 result
404 }
405
406 pub async fn cancel_agent(&self, agent_id: Uuid) -> Result<(), SubagentError> {
408 if let Some(handle) = self.active_agents.get(&agent_id) {
409 handle.cancel_flag.store(true, Ordering::Release);
410 *handle.status.write().await = SubagentStatus::Cancelled;
411
412 self.update_stats(&handle.name, |stats| {
414 stats.total_cancelled += 1;
415 })
416 .await;
417
418 info!("Cancelled agent {} (ID: {})", handle.name, agent_id);
419 Ok(())
420 } else {
421 Err(SubagentError::AgentNotFound {
422 name: format!("Agent with ID {}", agent_id),
423 })
424 }
425 }
426
427 pub async fn cancel_all(&self) {
429 self.global_cancel.store(true, Ordering::Release);
430
431 for entry in self.active_agents.iter() {
432 let handle = entry.value();
433 handle.cancel_flag.store(true, Ordering::Release);
434 *handle.status.write().await = SubagentStatus::Cancelled;
435 }
436
437 info!("Cancelled all active agents");
438 }
439
440 pub async fn get_agent_status(&self, agent_id: Uuid) -> Option<SubagentStatus> {
442 self.active_agents
443 .get(&agent_id)
444 .map(|handle| handle.status.clone())
445 .and_then(|status| {
446 tokio::task::block_in_place(|| {
447 tokio::runtime::Handle::current()
448 .block_on(async { Some(status.read().await.clone()) })
449 })
450 })
451 }
452
453 pub fn get_active_agents(&self) -> Vec<(Uuid, String, SubagentStatus)> {
455 self.active_agents
456 .iter()
457 .map(|entry| {
458 let handle = entry.value();
459 let status = tokio::task::block_in_place(|| {
460 tokio::runtime::Handle::current()
461 .block_on(async { handle.status.read().await.clone() })
462 });
463 (*entry.key(), handle.name.clone(), status)
464 })
465 .collect()
466 }
467
468 pub async fn get_stats(&self, agent_name: Option<&str>) -> HashMap<String, AgentStats> {
470 let stats = self.stats.read().await;
471
472 match agent_name {
473 Some(name) => stats
474 .get(name)
475 .map(|s| HashMap::from([(name.to_string(), s.clone())]))
476 .unwrap_or_default(),
477 None => stats.clone(),
478 }
479 }
480
481 pub async fn handle_communication(&self) -> Result<(), SubagentError> {
483 let mut receiver = self.message_bus.subscribe();
484
485 loop {
486 tokio::select! {
487 Ok(message) = receiver.recv() => {
488 self.process_message(message).await?;
489 }
490 _ = tokio::time::sleep(Duration::from_secs(1)) => {
491 if self.global_cancel.load(Ordering::Acquire) {
492 break;
493 }
494 }
495 }
496 }
497
498 Ok(())
499 }
500
501 async fn process_message(&self, message: AgentMessage) -> Result<(), SubagentError> {
503 debug!("Processing message: {:?}", message.message_type);
504
505 match message.message_type {
506 MessageType::Request => {
507 if let MessageTarget::Agent(target_id) = &message.to
509 && let Ok(target_uuid) = Uuid::parse_str(target_id)
510 && let Some(handle) = self.active_agents.get(&target_uuid)
511 {
512 debug!("Forwarding request to agent {}", handle.name);
514 }
515 }
516 MessageType::Response | MessageType::Result => {
517 info!("Agent result received: {:?}", message.payload);
519 }
520 MessageType::Error => {
521 error!("Agent error: {:?}", message.payload);
523 }
524 MessageType::Info => {
525 debug!("Agent info: {:?}", message.payload);
527 }
528 MessageType::Coordination => {
529 debug!("Agent coordination: {:?}", message.payload);
531 }
532 _ => {}
533 }
534
535 Ok(())
536 }
537
538 pub async fn inherit_context(
540 &self,
541 from_agent: Uuid,
542 to_context: &mut SubagentContext,
543 ) -> Result<(), SubagentError> {
544 let messages = self.message_bus.get_history(Some(10)).await;
546
547 let relevant_messages: Vec<_> = messages
548 .into_iter()
549 .filter(|m| m.from == from_agent.to_string())
550 .collect();
551
552 for message in relevant_messages {
554 if message.message_type == MessageType::Result {
555 to_context.conversation_context.push('\n');
556 if let Some(summary) = message.payload.get("summary")
557 && let Some(s) = summary.as_str()
558 {
559 to_context.conversation_context.push_str(s);
560 }
561 }
562 }
563
564 Ok(())
565 }
566
567 fn create_context(&self, context_str: String, mode: Option<OperatingMode>) -> SubagentContext {
569 SubagentContext {
570 execution_id: Uuid::new_v4(),
571 mode: mode.unwrap_or(OperatingMode::Build),
572 available_tools: vec![
573 "search".to_string(),
574 "edit".to_string(),
575 "think".to_string(),
576 "plan".to_string(),
577 ],
578 conversation_context: context_str,
579 working_directory: std::env::current_dir().unwrap_or_default(),
580 parameters: HashMap::new(),
581 metadata: HashMap::new(),
582 }
583 }
584
585 async fn update_stats<F>(&self, agent_name: &str, updater: F)
587 where
588 F: FnOnce(&mut AgentStats),
589 {
590 let mut stats = self.stats.write().await;
591 let agent_stats = stats
592 .entry(agent_name.to_string())
593 .or_insert_with(|| AgentStats {
594 total_spawned: 0,
595 total_completed: 0,
596 total_failed: 0,
597 total_cancelled: 0,
598 avg_execution_time: Duration::from_secs(0),
599 last_execution: None,
600 });
601 updater(agent_stats);
602 }
603
604 fn clone_for_async(&self) -> Self {
606 Self {
607 registry: self.registry.clone(),
608 _parser: AgentParser::with_registry(self.registry.clone()),
609 orchestrator: self.orchestrator.clone(),
610 active_agents: self.active_agents.clone(),
611 message_bus: self.message_bus.clone(),
612 global_cancel: self.global_cancel.clone(),
613 stats: self.stats.clone(),
614 ast_tools: self.ast_tools.clone(),
615 }
616 }
617}
618
619#[derive(Debug, Clone)]
621pub struct AgentTracker {
622 pub agent_id: Uuid,
623 pub agent_name: String,
624 pub status: SubagentStatus,
625 pub result: Option<AgentResult>,
626 pub started_at: SystemTime,
627 pub completed_at: Option<SystemTime>,
628}
629
630#[cfg(test)]
631mod tests {
632 use super::*;
633
634 #[tokio::test]
635 async fn test_agent_manager_creation() {
636 let registry = Arc::new(SubagentRegistry::new().unwrap());
637 let config = OrchestratorConfig::default();
638 let manager = AgentManager::new(registry.clone(), config);
639
640 assert_eq!(manager.get_active_agents().len(), 0);
641 }
642
643 #[tokio::test]
644 async fn test_message_bus() {
645 let bus = MessageBus::new(100);
646
647 let message = AgentMessage {
648 id: Uuid::new_v4(),
649 from: Uuid::new_v4().to_string(),
650 to: MessageTarget::Broadcast,
651 message_type: MessageType::Info,
652 priority: MessagePriority::Normal,
653 payload: serde_json::json!({
654 "content": "Test message"
655 }),
656 timestamp: Utc::now(),
657 };
658
659 bus.broadcast(message.clone()).await.unwrap();
660
661 let history = bus.get_history(None).await;
662 assert_eq!(history.len(), 1);
663 assert_eq!(history[0].payload["content"], "Test message");
664 }
665
666 #[test]
667 fn test_context_creation() {
668 let registry = Arc::new(SubagentRegistry::new().unwrap());
669 let config = OrchestratorConfig::default();
670 let manager = AgentManager::new(registry.clone(), config);
671
672 let context = manager.create_context("test context".to_string(), Some(OperatingMode::Plan));
673
674 assert_eq!(context.mode, OperatingMode::Plan);
675 assert_eq!(context.conversation_context, "test context");
676 assert!(!context.available_tools.is_empty());
677 }
678}