agcodex_core/subagents/manager.rs

//! Agent manager for spawning and coordinating subagents.
//!
//! This module manages agent lifecycles, inter-agent communication, and coordination,
//! working with the existing orchestrator infrastructure.
5
6use super::SubagentContext;
7use super::SubagentError;
8use super::SubagentStatus;
9use super::agents::AgentResult;
10use super::context::AgentMessage;
11use super::context::MessagePriority;
12use super::context::MessageTarget;
13use super::context::MessageType;
14use super::orchestrator::AgentOrchestrator;
15use super::orchestrator::OrchestratorConfig;
16use super::parser::AgentParser;
17use super::parser::ParsedInvocation;
18use super::registry::SubagentRegistry;
19use crate::code_tools::ast_agent_tools::ASTAgentTools;
20use crate::modes::OperatingMode;
21use chrono::Utc;
22use dashmap::DashMap;
23use std::collections::HashMap;
24use std::sync::Arc;
25use std::sync::atomic::AtomicBool;
26use std::sync::atomic::Ordering;
27use std::time::Duration;
28use std::time::SystemTime;
29use tokio::sync::RwLock;
30use tokio::sync::broadcast;
31use tokio::task::JoinHandle;
32use tracing::debug;
33use tracing::error;
34use tracing::info;
35use uuid::Uuid;
36
/// Execution statistics accumulated per agent name.
///
/// `AgentStats::default()` is the empty record: all counters are zero, the
/// average execution time is zero, and no execution has been recorded.
#[derive(Debug, Clone, Default)]
pub struct AgentStats {
    /// Number of times an agent with this name was spawned.
    pub total_spawned: u64,
    /// Number of awaited runs that finished with `Ok`.
    pub total_completed: u64,
    /// Number of awaited runs that finished with an error.
    pub total_failed: u64,
    /// Number of cancellation requests recorded for this name.
    pub total_cancelled: u64,
    /// Running mean of execution time over completed and failed runs.
    pub avg_execution_time: Duration,
    /// Wall-clock time of the most recent spawn, if any.
    pub last_execution: Option<SystemTime>,
}
47
48/// Agent Manager for spawning and managing agent lifecycles
49pub struct AgentManager {
50    /// Agent registry
51    registry: Arc<SubagentRegistry>,
52    /// Agent parser
53    _parser: AgentParser,
54    /// Orchestrator for execution
55    orchestrator: Arc<AgentOrchestrator>,
56    /// Active agent executions
57    active_agents: Arc<DashMap<Uuid, AgentHandle>>,
58    /// Message bus for inter-agent communication
59    message_bus: Arc<MessageBus>,
60    /// Global cancellation flag
61    global_cancel: Arc<AtomicBool>,
62    /// Execution statistics
63    stats: Arc<RwLock<HashMap<String, AgentStats>>>,
64    /// AST tools for agents
65    ast_tools: Arc<RwLock<ASTAgentTools>>,
66}
67
68/// Handle to a running agent
69pub struct AgentHandle {
70    pub id: Uuid,
71    pub name: String,
72    pub status: Arc<RwLock<SubagentStatus>>,
73    pub cancel_flag: Arc<AtomicBool>,
74    pub task_handle: JoinHandle<Result<AgentResult, SubagentError>>,
75    pub started_at: SystemTime,
76    pub context: SubagentContext,
77}
78
79/// Message bus for inter-agent communication
80pub struct MessageBus {
81    /// Broadcast sender for messages
82    sender: broadcast::Sender<AgentMessage>,
83    /// Keep one receiver alive to prevent channel closure
84    _receiver: broadcast::Receiver<AgentMessage>,
85    /// Message history
86    history: Arc<RwLock<Vec<AgentMessage>>>,
87    /// Topic subscriptions
88    subscriptions: Arc<DashMap<String, Vec<Uuid>>>,
89}
90
91impl MessageBus {
92    /// Create a new message bus
93    pub fn new(capacity: usize) -> Self {
94        let (sender, receiver) = broadcast::channel(capacity);
95        Self {
96            sender,
97            _receiver: receiver,
98            history: Arc::new(RwLock::new(Vec::new())),
99            subscriptions: Arc::new(DashMap::new()),
100        }
101    }
102
103    /// Send a message to all agents
104    pub async fn broadcast(&self, message: AgentMessage) -> Result<(), SubagentError> {
105        // Store in history
106        self.history.write().await.push(message.clone());
107
108        // Broadcast to all listeners
109        self.sender.send(message).map_err(|e| {
110            SubagentError::ExecutionFailed(format!("Failed to broadcast message: {}", e))
111        })?;
112
113        Ok(())
114    }
115
116    /// Subscribe to messages
117    pub fn subscribe(&self) -> broadcast::Receiver<AgentMessage> {
118        self.sender.subscribe()
119    }
120
121    /// Subscribe to a specific topic
122    pub async fn subscribe_to_topic(&self, topic: String, agent_id: Uuid) {
123        self.subscriptions.entry(topic).or_default().push(agent_id);
124    }
125
126    /// Get message history
127    pub async fn get_history(&self, limit: Option<usize>) -> Vec<AgentMessage> {
128        let history = self.history.read().await;
129        match limit {
130            Some(n) => {
131                // Return the last n messages in their original order
132                let start = history.len().saturating_sub(n);
133                history[start..].to_vec()
134            }
135            None => history.clone(),
136        }
137    }
138}
139
140impl AgentManager {
141    /// Create a new agent manager
142    pub fn new(registry: Arc<SubagentRegistry>, orchestrator_config: OrchestratorConfig) -> Self {
143        let parser = AgentParser::with_registry(registry.clone());
144        let orchestrator = Arc::new(AgentOrchestrator::new(
145            registry.clone(),
146            orchestrator_config,
147            crate::modes::OperatingMode::Build, // Default to Build mode
148        ));
149        let ast_tools = Arc::new(RwLock::new(ASTAgentTools::new()));
150
151        Self {
152            registry,
153            _parser: parser,
154            orchestrator,
155            active_agents: Arc::new(DashMap::new()),
156            message_bus: Arc::new(MessageBus::new(1000)),
157            global_cancel: Arc::new(AtomicBool::new(false)),
158            stats: Arc::new(RwLock::new(HashMap::new())),
159            ast_tools,
160        }
161    }
162
163    /// Spawn an agent from a parsed invocation
164    pub async fn spawn_from_invocation(
165        &self,
166        invocation: ParsedInvocation,
167    ) -> Result<Vec<Uuid>, SubagentError> {
168        let mut agent_ids = Vec::new();
169
170        // Execute based on the execution plan
171        match invocation.execution_plan {
172            super::invocation::ExecutionPlan::Single(agent_inv) => {
173                let id = self
174                    .spawn_agent(
175                        agent_inv.agent_name,
176                        self.create_context(invocation.context, invocation.mode_override),
177                    )
178                    .await?;
179                agent_ids.push(id);
180            }
181            super::invocation::ExecutionPlan::Sequential(chain) => {
182                for agent_inv in chain.agents {
183                    let id = self
184                        .spawn_agent(
185                            agent_inv.agent_name,
186                            self.create_context(
187                                invocation.context.clone(),
188                                invocation.mode_override,
189                            ),
190                        )
191                        .await?;
192                    agent_ids.push(id);
193
194                    // Wait for completion before spawning next
195                    self.wait_for_agent(id).await?;
196                }
197            }
198            super::invocation::ExecutionPlan::Parallel(agents) => {
199                let mut tasks = Vec::new();
200                for agent_inv in agents {
201                    let manager = self.clone_for_async();
202                    let context = invocation.context.clone();
203                    let mode = invocation.mode_override;
204
205                    tasks.push(tokio::spawn(async move {
206                        manager
207                            .spawn_agent(
208                                agent_inv.agent_name,
209                                manager.create_context(context, mode),
210                            )
211                            .await
212                    }));
213                }
214
215                for task in tasks {
216                    let id = task.await.map_err(|e| {
217                        SubagentError::ExecutionFailed(format!("Task join error: {}", e))
218                    })??;
219                    agent_ids.push(id);
220                }
221            }
222            _ => {
223                // Use orchestrator for complex plans
224                self.orchestrator
225                    .execute_plan(super::invocation::InvocationRequest {
226                        id: invocation.id,
227                        original_input: invocation.original_input,
228                        execution_plan: invocation.execution_plan,
229                        context: invocation.context,
230                    })
231                    .await?;
232            }
233        }
234
235        Ok(agent_ids)
236    }
237
238    /// Spawn a single agent
239    pub async fn spawn_agent(
240        &self,
241        agent_name: String,
242        context: SubagentContext,
243    ) -> Result<Uuid, SubagentError> {
244        // Get executable agent from registry
245        let agent = self
246            .registry
247            .get_executable_agent(&agent_name)
248            .ok_or_else(|| SubagentError::AgentNotFound {
249                name: agent_name.clone(),
250            })?;
251
252        let agent_id = Uuid::new_v4();
253        let cancel_flag = Arc::new(AtomicBool::new(false));
254        let status = Arc::new(RwLock::new(SubagentStatus::Pending));
255
256        // Update stats
257        self.update_stats(&agent_name, |stats| {
258            stats.total_spawned += 1;
259            stats.last_execution = Some(SystemTime::now());
260        })
261        .await;
262
263        // Create agent handle
264        let agent_clone = agent.clone();
265        let context_clone = context.clone();
266        let cancel_clone = cancel_flag.clone();
267        let status_clone = status.clone();
268        let message_bus = self.message_bus.clone();
269        let agent_name_clone = agent_name.clone();
270        let ast_tools = self.ast_tools.clone();
271
272        let task_handle = tokio::spawn(async move {
273            // Update status to running
274            *status_clone.write().await = SubagentStatus::Running;
275
276            // Notify via message bus
277            let _ = message_bus
278                .broadcast(AgentMessage {
279                    id: Uuid::new_v4(),
280                    from: agent_id.to_string(),
281                    to: MessageTarget::Broadcast,
282                    message_type: MessageType::Info,
283                    priority: MessagePriority::Normal,
284                    payload: serde_json::json!({
285                        "status": "started",
286                        "agent": agent_name_clone
287                    }),
288                    timestamp: Utc::now(),
289                })
290                .await;
291
292            // Execute agent
293            let mut tools = ast_tools.write().await;
294            let result = agent_clone
295                .execute(&context_clone, &mut tools, cancel_clone)
296                .await;
297
298            // Update status based on result
299            match &result {
300                Ok(agent_result) => {
301                    *status_clone.write().await = SubagentStatus::Completed;
302
303                    // Broadcast completion
304                    let _ = message_bus
305                        .broadcast(AgentMessage {
306                            id: Uuid::new_v4(),
307                            from: agent_id.to_string(),
308                            to: MessageTarget::Broadcast,
309                            message_type: MessageType::Result,
310                            priority: MessagePriority::High,
311                            payload: serde_json::json!({
312                                "summary": agent_result.summary.clone(),
313                                "status": "completed",
314                                "findings": agent_result.findings.len()
315                            }),
316                            timestamp: Utc::now(),
317                        })
318                        .await;
319                }
320                Err(e) => {
321                    *status_clone.write().await = SubagentStatus::Failed(e.to_string());
322
323                    // Broadcast failure
324                    let _ = message_bus
325                        .broadcast(AgentMessage {
326                            id: Uuid::new_v4(),
327                            from: agent_id.to_string(),
328                            to: MessageTarget::Broadcast,
329                            message_type: MessageType::Error,
330                            priority: MessagePriority::Critical,
331                            payload: serde_json::json!({
332                                "error": format!("Agent {} failed: {}", agent_name_clone, e)
333                            }),
334                            timestamp: Utc::now(),
335                        })
336                        .await;
337                }
338            }
339
340            result
341        });
342
343        // Store handle
344        self.active_agents.insert(
345            agent_id,
346            AgentHandle {
347                id: agent_id,
348                name: agent_name.clone(),
349                status,
350                cancel_flag,
351                task_handle,
352                started_at: SystemTime::now(),
353                context,
354            },
355        );
356
357        info!("Spawned agent {} with ID {}", agent_name, agent_id);
358
359        Ok(agent_id)
360    }
361
362    /// Wait for an agent to complete
363    pub async fn wait_for_agent(&self, agent_id: Uuid) -> Result<AgentResult, SubagentError> {
364        let handle =
365            self.active_agents
366                .remove(&agent_id)
367                .ok_or_else(|| SubagentError::AgentNotFound {
368                    name: format!("Agent with ID {}", agent_id),
369                })?;
370
371        let (_, agent_handle) = handle;
372        let agent_name = agent_handle.name.clone();
373        let started_at = agent_handle.started_at;
374
375        // Wait for task completion
376        let result = agent_handle
377            .task_handle
378            .await
379            .map_err(|e| SubagentError::ExecutionFailed(format!("Task join error: {}", e)))?;
380
381        // Update stats
382        let execution_time = SystemTime::now()
383            .duration_since(started_at)
384            .unwrap_or_else(|_| Duration::from_secs(0));
385
386        self.update_stats(&agent_name, |stats| {
387            match &result {
388                Ok(_) => stats.total_completed += 1,
389                Err(_) => stats.total_failed += 1,
390            }
391
392            // Update average execution time
393            let total = stats.total_completed + stats.total_failed;
394            if total > 0 {
395                let current_avg = stats.avg_execution_time.as_secs_f64();
396                let new_avg = (current_avg * (total - 1) as f64 + execution_time.as_secs_f64())
397                    / total as f64;
398                stats.avg_execution_time = Duration::from_secs_f64(new_avg);
399            }
400        })
401        .await;
402
403        result
404    }
405
406    /// Cancel an agent
407    pub async fn cancel_agent(&self, agent_id: Uuid) -> Result<(), SubagentError> {
408        if let Some(handle) = self.active_agents.get(&agent_id) {
409            handle.cancel_flag.store(true, Ordering::Release);
410            *handle.status.write().await = SubagentStatus::Cancelled;
411
412            // Update stats
413            self.update_stats(&handle.name, |stats| {
414                stats.total_cancelled += 1;
415            })
416            .await;
417
418            info!("Cancelled agent {} (ID: {})", handle.name, agent_id);
419            Ok(())
420        } else {
421            Err(SubagentError::AgentNotFound {
422                name: format!("Agent with ID {}", agent_id),
423            })
424        }
425    }
426
427    /// Cancel all active agents
428    pub async fn cancel_all(&self) {
429        self.global_cancel.store(true, Ordering::Release);
430
431        for entry in self.active_agents.iter() {
432            let handle = entry.value();
433            handle.cancel_flag.store(true, Ordering::Release);
434            *handle.status.write().await = SubagentStatus::Cancelled;
435        }
436
437        info!("Cancelled all active agents");
438    }
439
440    /// Get status of an agent
441    pub async fn get_agent_status(&self, agent_id: Uuid) -> Option<SubagentStatus> {
442        self.active_agents
443            .get(&agent_id)
444            .map(|handle| handle.status.clone())
445            .and_then(|status| {
446                tokio::task::block_in_place(|| {
447                    tokio::runtime::Handle::current()
448                        .block_on(async { Some(status.read().await.clone()) })
449                })
450            })
451    }
452
453    /// Get all active agents
454    pub fn get_active_agents(&self) -> Vec<(Uuid, String, SubagentStatus)> {
455        self.active_agents
456            .iter()
457            .map(|entry| {
458                let handle = entry.value();
459                let status = tokio::task::block_in_place(|| {
460                    tokio::runtime::Handle::current()
461                        .block_on(async { handle.status.read().await.clone() })
462                });
463                (*entry.key(), handle.name.clone(), status)
464            })
465            .collect()
466    }
467
468    /// Get agent statistics
469    pub async fn get_stats(&self, agent_name: Option<&str>) -> HashMap<String, AgentStats> {
470        let stats = self.stats.read().await;
471
472        match agent_name {
473            Some(name) => stats
474                .get(name)
475                .map(|s| HashMap::from([(name.to_string(), s.clone())]))
476                .unwrap_or_default(),
477            None => stats.clone(),
478        }
479    }
480
481    /// Handle agent communication via message bus
482    pub async fn handle_communication(&self) -> Result<(), SubagentError> {
483        let mut receiver = self.message_bus.subscribe();
484
485        loop {
486            tokio::select! {
487                Ok(message) = receiver.recv() => {
488                    self.process_message(message).await?;
489                }
490                _ = tokio::time::sleep(Duration::from_secs(1)) => {
491                    if self.global_cancel.load(Ordering::Acquire) {
492                        break;
493                    }
494                }
495            }
496        }
497
498        Ok(())
499    }
500
501    /// Process a message from the message bus
502    async fn process_message(&self, message: AgentMessage) -> Result<(), SubagentError> {
503        debug!("Processing message: {:?}", message.message_type);
504
505        match message.message_type {
506            MessageType::Request => {
507                // Handle inter-agent requests
508                if let MessageTarget::Agent(target_id) = &message.to
509                    && let Ok(target_uuid) = Uuid::parse_str(target_id)
510                    && let Some(handle) = self.active_agents.get(&target_uuid)
511                {
512                    // Agent-specific handling would go here
513                    debug!("Forwarding request to agent {}", handle.name);
514                }
515            }
516            MessageType::Response | MessageType::Result => {
517                // Log results for monitoring
518                info!("Agent result received: {:?}", message.payload);
519            }
520            MessageType::Error => {
521                // Log errors for debugging
522                error!("Agent error: {:?}", message.payload);
523            }
524            MessageType::Info => {
525                // Status updates for monitoring
526                debug!("Agent info: {:?}", message.payload);
527            }
528            MessageType::Coordination => {
529                // Coordination messages between agents
530                debug!("Agent coordination: {:?}", message.payload);
531            }
532            _ => {}
533        }
534
535        Ok(())
536    }
537
538    /// Support context inheritance between agents
539    pub async fn inherit_context(
540        &self,
541        from_agent: Uuid,
542        to_context: &mut SubagentContext,
543    ) -> Result<(), SubagentError> {
544        // Get message history from the source agent
545        let messages = self.message_bus.get_history(Some(10)).await;
546
547        let relevant_messages: Vec<_> = messages
548            .into_iter()
549            .filter(|m| m.from == from_agent.to_string())
550            .collect();
551
552        // Add relevant context to the new agent
553        for message in relevant_messages {
554            if message.message_type == MessageType::Result {
555                to_context.conversation_context.push('\n');
556                if let Some(summary) = message.payload.get("summary")
557                    && let Some(s) = summary.as_str()
558                {
559                    to_context.conversation_context.push_str(s);
560                }
561            }
562        }
563
564        Ok(())
565    }
566
567    /// Create a context for agent execution
568    fn create_context(&self, context_str: String, mode: Option<OperatingMode>) -> SubagentContext {
569        SubagentContext {
570            execution_id: Uuid::new_v4(),
571            mode: mode.unwrap_or(OperatingMode::Build),
572            available_tools: vec![
573                "search".to_string(),
574                "edit".to_string(),
575                "think".to_string(),
576                "plan".to_string(),
577            ],
578            conversation_context: context_str,
579            working_directory: std::env::current_dir().unwrap_or_default(),
580            parameters: HashMap::new(),
581            metadata: HashMap::new(),
582        }
583    }
584
585    /// Update agent statistics
586    async fn update_stats<F>(&self, agent_name: &str, updater: F)
587    where
588        F: FnOnce(&mut AgentStats),
589    {
590        let mut stats = self.stats.write().await;
591        let agent_stats = stats
592            .entry(agent_name.to_string())
593            .or_insert_with(|| AgentStats {
594                total_spawned: 0,
595                total_completed: 0,
596                total_failed: 0,
597                total_cancelled: 0,
598                avg_execution_time: Duration::from_secs(0),
599                last_execution: None,
600            });
601        updater(agent_stats);
602    }
603
604    /// Clone manager for async operations
605    fn clone_for_async(&self) -> Self {
606        Self {
607            registry: self.registry.clone(),
608            _parser: AgentParser::with_registry(self.registry.clone()),
609            orchestrator: self.orchestrator.clone(),
610            active_agents: self.active_agents.clone(),
611            message_bus: self.message_bus.clone(),
612            global_cancel: self.global_cancel.clone(),
613            stats: self.stats.clone(),
614            ast_tools: self.ast_tools.clone(),
615        }
616    }
617}
618
619/// Track agent status and results
620#[derive(Debug, Clone)]
621pub struct AgentTracker {
622    pub agent_id: Uuid,
623    pub agent_name: String,
624    pub status: SubagentStatus,
625    pub result: Option<AgentResult>,
626    pub started_at: SystemTime,
627    pub completed_at: Option<SystemTime>,
628}
629
#[cfg(test)]
mod tests {
    use super::*;

    /// Build an `Info` broadcast message from a random sender with the given payload.
    fn info_message(payload: serde_json::Value) -> AgentMessage {
        AgentMessage {
            id: Uuid::new_v4(),
            from: Uuid::new_v4().to_string(),
            to: MessageTarget::Broadcast,
            message_type: MessageType::Info,
            priority: MessagePriority::Normal,
            payload,
            timestamp: Utc::now(),
        }
    }

    #[tokio::test]
    async fn test_agent_manager_creation() {
        let registry = Arc::new(SubagentRegistry::new().unwrap());
        let config = OrchestratorConfig::default();
        let manager = AgentManager::new(registry.clone(), config);

        assert_eq!(manager.get_active_agents().len(), 0);
    }

    #[tokio::test]
    async fn test_message_bus() {
        let bus = MessageBus::new(100);

        bus.broadcast(info_message(serde_json::json!({
            "content": "Test message"
        })))
        .await
        .unwrap();

        let history = bus.get_history(None).await;
        assert_eq!(history.len(), 1);
        assert_eq!(history[0].payload["content"], "Test message");
    }

    #[tokio::test]
    async fn test_message_bus_history_limit_keeps_latest_in_order() {
        let bus = MessageBus::new(16);
        for seq in 0..5 {
            bus.broadcast(info_message(serde_json::json!({ "seq": seq })))
                .await
                .unwrap();
        }

        let latest = bus.get_history(Some(2)).await;
        assert_eq!(latest.len(), 2);
        assert_eq!(latest[0].payload["seq"], 3);
        assert_eq!(latest[1].payload["seq"], 4);

        // Limits larger than the history return everything; zero returns nothing.
        assert_eq!(bus.get_history(Some(100)).await.len(), 5);
        assert!(bus.get_history(Some(0)).await.is_empty());
    }

    #[tokio::test]
    async fn test_unknown_agent_lookups() {
        let registry = Arc::new(SubagentRegistry::new().unwrap());
        let manager = AgentManager::new(registry, OrchestratorConfig::default());
        let missing = Uuid::new_v4();

        assert!(manager.get_agent_status(missing).await.is_none());
        assert!(manager.cancel_agent(missing).await.is_err());
        assert!(manager.wait_for_agent(missing).await.is_err());
    }

    #[test]
    fn test_context_creation() {
        let registry = Arc::new(SubagentRegistry::new().unwrap());
        let config = OrchestratorConfig::default();
        let manager = AgentManager::new(registry.clone(), config);

        let context = manager.create_context("test context".to_string(), Some(OperatingMode::Plan));

        assert_eq!(context.mode, OperatingMode::Plan);
        assert_eq!(context.conversation_context, "test context");
        assert!(!context.available_tools.is_empty());

        // Without an override, contexts default to Build mode.
        let default_context = manager.create_context(String::new(), None);
        assert_eq!(default_context.mode, OperatingMode::Build);
    }
}