// cortexai_agents/engine.rs
//! Agent execution engine with ReACT loop
//!
//! Features:
//! - Tracing instrumentation for observability (Jaeger, Honeycomb compatible)
//! - Configurable timeout per agent to prevent hangs
//! - Error injection into context for self-correction
//! - Cost tracking integration for LLM API usage monitoring
8
use std::sync::Arc;
use std::sync::atomic::AtomicU64;
use std::time::Duration;

use dashmap::DashMap;
use tokio::sync::{mpsc, RwLock};
use tokio::time::timeout;
use tracing::{debug, debug_span, error, info, instrument, warn, Instrument};

use cortexai_core::types::PlanningMode;
use cortexai_core::*;
use cortexai_monitoring::CostTracker;
use cortexai_providers::*;
use cortexai_tools::ToolRegistry;

use crate::executor::ToolExecutor;
use crate::memory::AgentMemory;
use crate::planning::{check_stop_words, PlanGenerator, StepExecutionContext};
25
/// Agent engine managing multiple agents
///
/// Cheap to clone: every field is `Arc`-shared, so all clones observe the
/// same agent map, metrics, and cost tracker.
#[derive(Clone)]
pub struct AgentEngine {
    /// Registered agents keyed by id; a concurrent map so spawn/send can be
    /// called from multiple tasks without an external lock.
    agents: Arc<DashMap<AgentId, Arc<AgentRuntime>>>,
    /// Lock-free counters updated by the agents' processing tasks.
    metrics: Arc<EngineMetrics>,
    /// Optional LLM cost tracker; `None` disables cost recording.
    cost_tracker: Option<Arc<CostTracker>>,
}
33
/// Engine metrics with atomic counters for lock-free updates
///
/// All counters start at zero (via `Default`) and are updated with
/// `Ordering::Relaxed` throughout the engine, so they are suitable for
/// monitoring but make no cross-counter consistency guarantees.
//
// Idiom: import `AtomicU64` once instead of repeating the fully qualified
// `std::sync::atomic::AtomicU64` path on every field.
#[derive(Default)]
pub struct EngineMetrics {
    /// Total agents ever spawned by this engine.
    pub agents_spawned: AtomicU64,
    /// Agents currently registered.
    pub agents_active: AtomicU64,
    /// Messages processed successfully.
    pub messages_processed: AtomicU64,
    /// Messages that failed (processing error or timeout).
    pub messages_failed: AtomicU64,
    /// Individual tool invocations across all agents.
    pub total_tool_calls: AtomicU64,
    /// Messages aborted by the per-agent timeout.
    pub timeouts: AtomicU64,
}
44
/// Agent runtime state
///
/// Everything a single agent needs at runtime; shared behind `Arc` between
/// the engine's agent map and the agent's background processing task.
pub struct AgentRuntime {
    /// Static configuration the agent was spawned with.
    pub config: AgentConfig,
    /// Mutable status/iteration/plan state, updated during message processing.
    pub state: Arc<RwLock<AgentState>>,
    /// Conversation history store.
    pub memory: Arc<AgentMemory>,
    /// Tools available to this agent.
    pub tool_registry: Arc<ToolRegistry>,
    /// LLM backend used for inference.
    pub backend: Arc<dyn LLMBackend>,
    /// Parallel tool-call executor.
    pub executor: ToolExecutor,
    /// Sender side of the agent's inbox; `AgentEngine::send_message` routes
    /// through this channel.
    pub inbox_tx: mpsc::UnboundedSender<Message>,
    /// Join handle of the inbox-draining task, behind a sync mutex so it can
    /// be taken out of the `Option` exactly once — presumably on shutdown;
    /// the take site is not visible in this file chunk.
    processing_task: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
}
56
57impl AgentEngine {
58    pub fn new() -> Self {
59        Self {
60            agents: Arc::new(DashMap::new()),
61            metrics: Arc::new(EngineMetrics::default()),
62            cost_tracker: None,
63        }
64    }
65
66    /// Create an engine with cost tracking enabled
67    pub fn with_cost_tracker(cost_tracker: Arc<CostTracker>) -> Self {
68        Self {
69            agents: Arc::new(DashMap::new()),
70            metrics: Arc::new(EngineMetrics::default()),
71            cost_tracker: Some(cost_tracker),
72        }
73    }
74
75    /// Set or replace the cost tracker
76    pub fn set_cost_tracker(&mut self, cost_tracker: Arc<CostTracker>) {
77        self.cost_tracker = Some(cost_tracker);
78    }
79
    /// Get the cost tracker (if configured)
    ///
    /// Returns `None` when the engine was built without cost tracking.
    pub fn cost_tracker(&self) -> Option<&Arc<CostTracker>> {
        self.cost_tracker.as_ref()
    }
84
    /// Spawn a new agent with tracing instrumentation
    ///
    /// Creates the agent's inbox channel, memory, and tool executor, then
    /// starts a background task that drains the inbox and processes each
    /// message under the timeout from `config.timeout_secs`. Returns the
    /// new agent's id on success.
    ///
    /// NOTE(review): spawning with an id that is already registered replaces
    /// the previous runtime in the map without stopping its processing task
    /// and increments the spawn counters again — confirm duplicate ids
    /// cannot occur, or guard against them.
    #[instrument(skip(self, tool_registry, backend), fields(agent_id = %config.id, agent_name = %config.name))]
    pub async fn spawn_agent(
        &self,
        config: AgentConfig,
        tool_registry: Arc<ToolRegistry>,
        backend: Arc<dyn LLMBackend>,
    ) -> Result<AgentId, AgentError> {
        use std::sync::atomic::Ordering;

        // Unbounded inbox: sends never block and only fail once the
        // receiver (the processing task) has been dropped.
        let (inbox_tx, mut inbox_rx) = mpsc::unbounded_channel::<Message>();

        let agent_id = config.id.clone();
        let state = Arc::new(RwLock::new(AgentState::new()));
        let memory = Arc::new(AgentMemory::new(config.memory_config.clone()));
        let executor = ToolExecutor::new(10); // 10 concurrent tool calls

        // Clone everything needed for the processing task
        let config_clone = config.clone();
        let state_clone = state.clone();
        let memory_clone = memory.clone();
        let registry_clone = tool_registry.clone();
        let backend_clone = backend.clone();
        let executor_clone = executor.clone();
        let engine_clone = self.clone();
        let metrics_clone = self.metrics.clone();
        let cost_tracker_clone = self.cost_tracker.clone();

        // Create a span for the agent's processing loop
        let agent_loop_span = tracing::info_span!(
            "agent_loop",
            agent_id = %agent_id,
            agent_name = %config.name
        );

        // Spawn processing task with tracing
        let processing_task = tokio::spawn(
            async move {
                while let Some(message) = inbox_rx.recv().await {
                    // Create a span for each message processing
                    let msg_span = debug_span!(
                        "process_message",
                        from = %message.from,
                        iteration = tracing::field::Empty
                    );

                    // The whole per-message future is instrumented with
                    // `msg_span` via `.instrument(msg_span)` below — the
                    // await-safe way to attach a span to async work.
                    async {
                        let start = std::time::Instant::now();
                        let timeout_duration = Duration::from_secs(config_clone.timeout_secs);

                        info!(
                            timeout_secs = config_clone.timeout_secs,
                            "Processing message from {}", message.from
                        );

                        // Wrap processing in a timeout so a hung LLM or tool
                        // call cannot stall the agent forever
                        let process_result = timeout(
                            timeout_duration,
                            Self::process_message(
                                &config_clone,
                                &state_clone,
                                &memory_clone,
                                &registry_clone,
                                &backend_clone,
                                &executor_clone,
                                &metrics_clone,
                                cost_tracker_clone.as_ref(),
                                message.clone(),
                            ),
                        )
                        .await;

                        match process_result {
                            Ok(Ok(responses)) => {
                                // Successfully processed: route every outgoing
                                // message back through the engine
                                for response in responses {
                                    if let Err(e) = engine_clone.send_message(response) {
                                        error!("Failed to route response: {}", e);
                                    }
                                }
                                metrics_clone
                                    .messages_processed
                                    .fetch_add(1, Ordering::Relaxed);

                                let latency = start.elapsed();
                                if latency.as_millis() > 500 {
                                    warn!(
                                        latency_ms = latency.as_millis(),
                                        "Slow processing detected"
                                    );
                                }
                            }
                            Ok(Err(e)) => {
                                // Processing error - restore state to Idle
                                error!("Processing error: {}", e);
                                {
                                    let mut state_write = state_clone.write().await;
                                    state_write.status = AgentStatus::Idle;
                                }
                                metrics_clone
                                    .messages_failed
                                    .fetch_add(1, Ordering::Relaxed);
                            }
                            Err(_) => {
                                // Timeout - restore state to Idle; counted both
                                // as a timeout and as a failed message
                                error!(
                                    timeout_secs = config_clone.timeout_secs,
                                    "Agent timeout processing message"
                                );
                                {
                                    let mut state_write = state_clone.write().await;
                                    state_write.status = AgentStatus::Idle;
                                }
                                metrics_clone.timeouts.fetch_add(1, Ordering::Relaxed);
                                metrics_clone
                                    .messages_failed
                                    .fetch_add(1, Ordering::Relaxed);
                            }
                        }
                    }
                    .instrument(msg_span)
                    .await;
                }

                // Inbox closed (all senders dropped) — the agent is done
                info!("Agent processing task stopped");
            }
            .instrument(agent_loop_span),
        );

        let runtime = Arc::new(AgentRuntime {
            config: config.clone(),
            state,
            memory,
            tool_registry,
            backend,
            executor,
            inbox_tx,
            processing_task: parking_lot::Mutex::new(Some(processing_task)),
        });

        self.agents.insert(agent_id.clone(), runtime);

        self.metrics.agents_spawned.fetch_add(1, Ordering::Relaxed);
        self.metrics.agents_active.fetch_add(1, Ordering::Relaxed);

        info!("Agent spawned successfully");

        Ok(agent_id)
    }
234
235    /// Process a single message with ReACT loop (Reason -> Act -> Observe)
236    /// Supports planning mode for structured task execution
237    #[instrument(
238        skip(state, memory, tool_registry, backend, executor, metrics, cost_tracker, message),
239        fields(agent_id = %config.id, max_iterations = config.max_iterations, planning_mode = ?config.planning_mode)
240    )]
241    async fn process_message(
242        config: &AgentConfig,
243        state: &Arc<RwLock<AgentState>>,
244        memory: &Arc<AgentMemory>,
245        tool_registry: &Arc<ToolRegistry>,
246        backend: &Arc<dyn LLMBackend>,
247        executor: &ToolExecutor,
248        metrics: &Arc<EngineMetrics>,
249        cost_tracker: Option<&Arc<CostTracker>>,
250        message: Message,
251    ) -> Result<Vec<Message>, AgentError> {
252        let mut responses = Vec::new();
253
254        // Update state
255        {
256            let mut state_write = state.write().await;
257            state_write.status = AgentStatus::Processing;
258            state_write.iteration = 0;
259        }
260
261        // Add message to memory
262        memory.add_message(message.clone()).await?;
263
264        // Check if planning mode is enabled
265        if config.planning_mode.is_enabled() {
266            return Self::process_with_planning(
267                config,
268                state,
269                memory,
270                tool_registry,
271                backend,
272                executor,
273                metrics,
274                cost_tracker,
275                message,
276            )
277            .await;
278        }
279
280        // ReACT Loop: Reason -> Act -> Observe (no planning)
281        let mut iteration = 0;
282        let max_iterations = config.max_iterations;
283
284        while iteration < max_iterations {
285            iteration += 1;
286
287            // Create a span for each iteration
288            let iter_span = debug_span!("react_iteration", n = iteration, max = max_iterations);
289            let _iter_guard = iter_span.enter();
290
291            {
292                let mut state_write = state.write().await;
293                state_write.iteration = iteration;
294                state_write.status = AgentStatus::Thinking;
295            }
296
297            debug!(iteration, max_iterations, "ReACT iteration");
298
299            // 1. REASON: Get conversation history and available tools
300            let history = memory.get_history().await?;
301            let tool_schemas = tool_registry.list_schemas();
302
303            // Convert to LLM format
304            let mut llm_messages = Vec::new();
305
306            // Add system prompt
307            if let Some(system_prompt) = &config.system_prompt {
308                llm_messages.push(LLMMessage::system(system_prompt));
309            }
310
311            // Add conversation history
312            for msg in history {
313                match &msg.content {
314                    Content::Text(text) => {
315                        let role = if msg.from == config.id {
316                            MessageRole::Assistant
317                        } else {
318                            MessageRole::User
319                        };
320                        llm_messages.push(LLMMessage {
321                            role,
322                            content: text.clone(),
323                            tool_calls: None,
324                            tool_call_id: None,
325                            name: None,
326                        });
327                    }
328                    Content::ToolCall(calls) => {
329                        llm_messages.push(LLMMessage::assistant_with_tools(calls.clone()));
330                    }
331                    Content::ToolResult(results) => {
332                        for result in results {
333                            let content = if result.success {
334                                serde_json::to_string_pretty(&result.data).unwrap_or_default()
335                            } else {
336                                result.error.clone().unwrap_or_default()
337                            };
338                            llm_messages.push(LLMMessage::tool(
339                                result.call_id.clone(),
340                                "tool_result".to_string(),
341                                content,
342                            ));
343                        }
344                    }
345                    _ => {}
346                }
347            }
348
349            // 2. Call LLM with cost tracking
350            let infer_start = std::time::Instant::now();
351            let inference = backend
352                .infer(&llm_messages, &tool_schemas, config.temperature)
353                .await;
354            let infer_latency_ms = infer_start.elapsed().as_secs_f64() * 1000.0;
355
356            // Record cost if tracker is configured
357            if let Some(tracker) = cost_tracker {
358                let model_info = backend.model_info();
359                match &inference {
360                    Ok(inf) => {
361                        tracker.record_request_detailed(
362                            &model_info.model,
363                            inf.token_usage.prompt_tokens as u64,
364                            inf.token_usage.completion_tokens as u64,
365                            inf.token_usage.cached_tokens.unwrap_or(0) as u64,
366                            infer_latency_ms,
367                            Some(config.id.0.as_str()),
368                            true,
369                        );
370                    }
371                    Err(_) => {
372                        tracker.record_request_detailed(
373                            &model_info.model,
374                            0,
375                            0,
376                            0,
377                            infer_latency_ms,
378                            Some(config.id.0.as_str()),
379                            false,
380                        );
381                    }
382                }
383            }
384
385            let inference = inference?;
386
387            debug!(
388                content_len = inference.content.len(),
389                tool_calls = inference.tool_calls.as_ref().map(|c| c.len()).unwrap_or(0),
390                "LLM inference complete"
391            );
392
393            // 3. ACT: Check if agent wants to use tools
394            if let Some(tool_calls) = &inference.tool_calls {
395                if !tool_calls.is_empty() {
396                    {
397                        let mut state_write = state.write().await;
398                        state_write.status = AgentStatus::ExecutingTool;
399                    }
400
401                    info!(num_calls = tool_calls.len(), "Executing tool calls");
402
403                    // Track tool call metrics
404                    metrics.total_tool_calls.fetch_add(
405                        tool_calls.len() as u64,
406                        std::sync::atomic::Ordering::Relaxed,
407                    );
408
409                    // Store tool call in memory
410                    let tool_call_msg = Message::new(
411                        config.id.clone(),
412                        config.id.clone(),
413                        Content::ToolCall(tool_calls.clone()),
414                    );
415                    memory.add_message(tool_call_msg).await?;
416
417                    // Execute tools in parallel
418                    let results = executor
419                        .execute_tools(tool_calls, tool_registry, &config.id)
420                        .await;
421
422                    // Store results in memory
423                    let result_msg = Message::new(
424                        AgentId::new("system"),
425                        config.id.clone(),
426                        Content::ToolResult(results),
427                    );
428                    memory.add_message(result_msg).await?;
429
430                    // Continue loop to process tool results
431                    continue;
432                }
433            }
434
435            // 4. RESPOND: Agent has final answer
436            if !inference.content.is_empty() {
437                // Check for stop words
438                if !config.stop_words.is_empty() {
439                    if let Some(stop_word) =
440                        check_stop_words(&inference.content, &config.stop_words)
441                    {
442                        info!(stop_word = %stop_word, "Stop word detected, terminating");
443                        let mut state_write = state.write().await;
444                        state_write.status = AgentStatus::StoppedByStopWord;
445                        drop(state_write);
446
447                        let response = Message::new(
448                            config.id.clone(),
449                            message.from.clone(),
450                            Content::Text(inference.content.clone()),
451                        );
452                        memory.add_message(response.clone()).await?;
453                        responses.push(response);
454                        return Ok(responses);
455                    }
456                }
457
458                let response = Message::new(
459                    config.id.clone(),
460                    message.from.clone(),
461                    Content::Text(inference.content.clone()),
462                );
463
464                // Store own response in memory
465                memory.add_message(response.clone()).await?;
466
467                responses.push(response);
468                break;
469            }
470
471            // Safety: if LLM returns nothing, break
472            if inference.content.is_empty() && inference.tool_calls.is_none() {
473                warn!("Agent {} LLM returned empty response", config.id);
474                break;
475            }
476        }
477
478        // Update state to idle before returning (success or error)
479        {
480            let mut state_write = state.write().await;
481            state_write.status = AgentStatus::Idle;
482        }
483
484        if iteration >= max_iterations {
485            warn!(iterations = iteration, "Max iterations reached");
486            return Err(AgentError::MaxIterationsExceeded);
487        }
488
489        Ok(responses)
490    }
491
492    /// Process message with planning mode enabled
493    #[instrument(
494        skip(state, memory, tool_registry, backend, executor, metrics, cost_tracker, message),
495        fields(agent_id = %config.id, planning_mode = ?config.planning_mode)
496    )]
497    async fn process_with_planning(
498        config: &AgentConfig,
499        state: &Arc<RwLock<AgentState>>,
500        memory: &Arc<AgentMemory>,
501        tool_registry: &Arc<ToolRegistry>,
502        backend: &Arc<dyn LLMBackend>,
503        executor: &ToolExecutor,
504        metrics: &Arc<EngineMetrics>,
505        cost_tracker: Option<&Arc<CostTracker>>,
506        message: Message,
507    ) -> Result<Vec<Message>, AgentError> {
508        let mut responses = Vec::new();
509
510        // Extract goal from message
511        let goal = match &message.content {
512            Content::Text(text) => text.clone(),
513            _ => {
514                return Err(AgentError::ProcessingError(
515                    "Planning requires text message".into(),
516                ))
517            }
518        };
519
520        // 1. PLANNING PHASE: Generate execution plan
521        {
522            let mut state_write = state.write().await;
523            state_write.status = AgentStatus::Planning;
524        }
525
526        info!(goal = %goal, "Generating execution plan");
527
528        let tool_schemas = tool_registry.list_schemas();
529        let planning_prompt = PlanGenerator::create_planning_prompt(&goal, &tool_schemas);
530
531        let plan_messages = vec![
532            config
533                .system_prompt
534                .as_ref()
535                .map(LLMMessage::system)
536                .unwrap_or_else(|| {
537                    LLMMessage::system("You are a planning agent. Create detailed execution plans.")
538                }),
539            LLMMessage::user(&planning_prompt),
540        ];
541
542        let infer_start = std::time::Instant::now();
543        let plan_inference = backend
544            .infer(&plan_messages, &[], config.temperature)
545            .await?;
546        let infer_latency_ms = infer_start.elapsed().as_secs_f64() * 1000.0;
547
548        // Record cost for planning
549        if let Some(tracker) = cost_tracker {
550            let model_info = backend.model_info();
551            tracker.record_request_detailed(
552                &model_info.model,
553                plan_inference.token_usage.prompt_tokens as u64,
554                plan_inference.token_usage.completion_tokens as u64,
555                plan_inference.token_usage.cached_tokens.unwrap_or(0) as u64,
556                infer_latency_ms,
557                Some(config.id.0.as_str()),
558                true,
559            );
560        }
561
562        // Parse the plan
563        let mut plan = match PlanGenerator::parse_plan(&goal, &plan_inference.content) {
564            Ok(p) => {
565                info!(steps = p.steps.len(), "Plan generated successfully");
566                p
567            }
568            Err(e) => {
569                warn!(error = %e, "Failed to parse plan, returning raw response");
570                // Return the LLM response directly instead of recursing
571                let response = Message::new(
572                    config.id.clone(),
573                    message.from.clone(),
574                    Content::Text(plan_inference.content.clone()),
575                );
576                memory.add_message(response.clone()).await?;
577
578                let mut state_write = state.write().await;
579                state_write.status = AgentStatus::Idle;
580                drop(state_write);
581
582                return Ok(vec![response]);
583            }
584        };
585
586        // Store plan in state
587        {
588            let mut state_write = state.write().await;
589            state_write.set_plan(plan.clone());
590            state_write.status = AgentStatus::ExecutingPlan;
591        }
592
593        // 2. EXECUTION PHASE: Execute each step
594        while plan.advance() {
595            let step = plan.current_step().unwrap().clone();
596
597            info!(
598                step = step.step_number,
599                total = plan.steps.len(),
600                description = %step.description,
601                "Executing plan step"
602            );
603
604            let step_ctx = StepExecutionContext::from_step(&step, &plan);
605
606            // Create messages for step execution
607            let mut step_messages = vec![];
608
609            if let Some(sys_prompt) = &config.system_prompt {
610                step_messages.push(LLMMessage::system(sys_prompt));
611            }
612            step_messages.push(LLMMessage::user(&step_ctx.prompt));
613
614            // Execute step with ReACT loop
615            let step_result = Self::execute_step(
616                config,
617                state,
618                memory,
619                tool_registry,
620                backend,
621                executor,
622                metrics,
623                cost_tracker,
624                step_messages,
625            )
626            .await?;
627
628            // Mark step completed
629            plan.mark_current_completed(&step_result);
630
631            // Update state with progress
632            {
633                let mut state_write = state.write().await;
634                state_write.current_plan = Some(plan.clone());
635            }
636
637            // Check for stop words in step result
638            if !config.stop_words.is_empty() {
639                if let Some(stop_word) = check_stop_words(&step_result, &config.stop_words) {
640                    info!(stop_word = %stop_word, "Stop word detected during plan execution");
641                    let mut state_write = state.write().await;
642                    state_write.status = AgentStatus::StoppedByStopWord;
643                    break;
644                }
645            }
646
647            // Adaptive re-planning
648            if config.planning_mode == PlanningMode::Adaptive && !plan.is_complete() {
649                let replan_prompt = PlanGenerator::create_replan_prompt(&plan, &step_result);
650                let replan_messages = vec![
651                    LLMMessage::system(
652                        "You are a planning agent. Review progress and adjust the plan if needed.",
653                    ),
654                    LLMMessage::user(&replan_prompt),
655                ];
656
657                if let Ok(replan_inference) = backend
658                    .infer(&replan_messages, &[], config.temperature)
659                    .await
660                {
661                    if let Ok(modified) =
662                        PlanGenerator::apply_replan(&mut plan, &replan_inference.content)
663                    {
664                        if modified {
665                            info!("Plan was modified based on step results");
666                            let mut state_write = state.write().await;
667                            state_write.current_plan = Some(plan.clone());
668                        }
669                    }
670                }
671            }
672        }
673
674        // 3. COMPLETION: Generate final response
675        plan.completed = true;
676        plan.success = plan.steps.iter().all(|s| s.completed);
677
678        let summary = format!(
679            "Plan completed. Goal: {}\nSteps executed: {}/{}\nSuccess: {}",
680            plan.goal,
681            plan.steps.iter().filter(|s| s.completed).count(),
682            plan.steps.len(),
683            plan.success
684        );
685
686        let response = Message::new(
687            config.id.clone(),
688            message.from.clone(),
689            Content::Text(summary),
690        );
691
692        memory.add_message(response.clone()).await?;
693        responses.push(response);
694
695        // Update final state
696        {
697            let mut state_write = state.write().await;
698            state_write.status = AgentStatus::Idle;
699            state_write.current_plan = Some(plan);
700        }
701
702        Ok(responses)
703    }
704
705    /// Execute a single step using ReACT loop
706    async fn execute_step(
707        config: &AgentConfig,
708        state: &Arc<RwLock<AgentState>>,
709        memory: &Arc<AgentMemory>,
710        tool_registry: &Arc<ToolRegistry>,
711        backend: &Arc<dyn LLMBackend>,
712        executor: &ToolExecutor,
713        metrics: &Arc<EngineMetrics>,
714        cost_tracker: Option<&Arc<CostTracker>>,
715        mut messages: Vec<LLMMessage>,
716    ) -> Result<String, AgentError> {
717        let tool_schemas = tool_registry.list_schemas();
718        let mut iteration = 0;
719        let max_iterations = config.max_iterations.min(5); // Limit per-step iterations
720
721        loop {
722            iteration += 1;
723            if iteration > max_iterations {
724                return Err(AgentError::MaxIterationsExceeded);
725            }
726
727            let infer_start = std::time::Instant::now();
728            let inference = backend
729                .infer(&messages, &tool_schemas, config.temperature)
730                .await?;
731            let infer_latency_ms = infer_start.elapsed().as_secs_f64() * 1000.0;
732
733            if let Some(tracker) = cost_tracker {
734                let model_info = backend.model_info();
735                tracker.record_request_detailed(
736                    &model_info.model,
737                    inference.token_usage.prompt_tokens as u64,
738                    inference.token_usage.completion_tokens as u64,
739                    inference.token_usage.cached_tokens.unwrap_or(0) as u64,
740                    infer_latency_ms,
741                    Some(config.id.0.as_str()),
742                    true,
743                );
744            }
745
746            // Handle tool calls
747            if let Some(tool_calls) = &inference.tool_calls {
748                if !tool_calls.is_empty() {
749                    {
750                        let mut state_write = state.write().await;
751                        state_write.status = AgentStatus::ExecutingTool;
752                    }
753
754                    metrics.total_tool_calls.fetch_add(
755                        tool_calls.len() as u64,
756                        std::sync::atomic::Ordering::Relaxed,
757                    );
758
759                    // Store tool call
760                    let tool_call_msg = Message::new(
761                        config.id.clone(),
762                        config.id.clone(),
763                        Content::ToolCall(tool_calls.clone()),
764                    );
765                    memory.add_message(tool_call_msg).await?;
766
767                    // Execute tools
768                    let results = executor
769                        .execute_tools(tool_calls, tool_registry, &config.id)
770                        .await;
771
772                    // Add tool results to messages
773                    messages.push(LLMMessage::assistant_with_tools(tool_calls.clone()));
774                    for result in &results {
775                        let content = if result.success {
776                            serde_json::to_string_pretty(&result.data).unwrap_or_default()
777                        } else {
778                            result.error.clone().unwrap_or_default()
779                        };
780                        messages.push(LLMMessage::tool(
781                            result.call_id.clone(),
782                            "tool_result".to_string(),
783                            content,
784                        ));
785                    }
786
787                    // Store results
788                    let result_msg = Message::new(
789                        AgentId::new("system"),
790                        config.id.clone(),
791                        Content::ToolResult(results),
792                    );
793                    memory.add_message(result_msg).await?;
794
795                    {
796                        let mut state_write = state.write().await;
797                        state_write.status = AgentStatus::ExecutingPlan;
798                    }
799
800                    continue;
801                }
802            }
803
804            // Return response
805            if !inference.content.is_empty() {
806                return Ok(inference.content);
807            }
808
809            // Empty response
810            return Ok("Step completed without explicit output".to_string());
811        }
812    }
813
814    /// Send a message to an agent
815    #[instrument(skip(self), fields(to = %message.to, from = %message.from))]
816    pub fn send_message(&self, message: Message) -> Result<(), AgentError> {
817        if let Some(agent) = self.agents.get(&message.to) {
818            agent
819                .inbox_tx
820                .send(message)
821                .map_err(|e| AgentError::SendError(e.to_string()))?;
822
823            debug!("Message sent successfully");
824            Ok(())
825        } else {
826            warn!("Agent not found");
827            Err(AgentError::AgentNotFound(message.to))
828        }
829    }
830
831    /// Get agent by ID
832    pub fn get_agent(&self, id: &AgentId) -> Option<Arc<AgentRuntime>> {
833        self.agents.get(id).map(|r| r.clone())
834    }
835
    /// Number of agents currently registered with this engine.
    pub fn agent_count(&self) -> usize {
        self.agents.len()
    }
840
841    /// Get metrics
842    pub fn metrics(&self) -> EngineMetricsSnapshot {
843        use std::sync::atomic::Ordering;
844        EngineMetricsSnapshot {
845            agents_spawned: self.metrics.agents_spawned.load(Ordering::Relaxed),
846            agents_active: self.metrics.agents_active.load(Ordering::Relaxed),
847            messages_processed: self.metrics.messages_processed.load(Ordering::Relaxed),
848            messages_failed: self.metrics.messages_failed.load(Ordering::Relaxed),
849            total_tool_calls: self.metrics.total_tool_calls.load(Ordering::Relaxed),
850            timeouts: self.metrics.timeouts.load(Ordering::Relaxed),
851        }
852    }
853}
854
/// Point-in-time copy of the engine's counters (see `metrics()`), safe to
/// move across threads and log or export.
///
/// Derives `Default` for consistency with `EngineMetrics`, plus equality so
/// snapshots can be compared in tests and change-detection code.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct EngineMetricsSnapshot {
    /// Agents spawned over the engine's lifetime.
    pub agents_spawned: u64,
    /// Agents registered at snapshot time.
    pub agents_active: u64,
    /// Messages handled successfully.
    pub messages_failed: u64,
    /// Messages whose processing failed.
    pub messages_processed: u64,
    /// Cumulative tool invocations across all agents.
    pub total_tool_calls: u64,
    /// Executions aborted by the per-agent timeout.
    pub timeouts: u64,
}

impl EngineMetricsSnapshot {
    /// Fraction of processed messages that succeeded, in `[0.0, 1.0]`.
    ///
    /// Returns `1.0` when nothing has been processed yet, so a fresh engine
    /// reports a perfect rate rather than a division-by-zero artifact.
    #[must_use]
    pub fn success_rate(&self) -> f64 {
        match self.messages_processed + self.messages_failed {
            0 => 1.0,
            total => self.messages_processed as f64 / total as f64,
        }
    }
}
876
877impl AgentEngine {
878    /// Stop an agent
879    #[instrument(skip(self), fields(agent_id = %id))]
880    pub async fn stop_agent(&self, id: &AgentId) -> Result<(), AgentError> {
881        use std::sync::atomic::Ordering;
882
883        if let Some((_, runtime)) = self.agents.remove(id) {
884            // Abort the processing task
885            if let Some(handle) = runtime.processing_task.lock().take() {
886                handle.abort();
887            }
888            self.metrics.agents_active.fetch_sub(1, Ordering::Relaxed);
889            info!("Agent stopped");
890            Ok(())
891        } else {
892            warn!("Agent not found");
893            Err(AgentError::AgentNotFound(id.clone()))
894        }
895    }
896
897    /// Stop all agents
898    #[instrument(skip(self))]
899    pub async fn shutdown(&self) {
900        let ids: Vec<AgentId> = self.agents.iter().map(|r| r.key().clone()).collect();
901        info!(agent_count = ids.len(), "Shutting down all agents");
902        for id in ids {
903            let _ = self.stop_agent(&id).await;
904        }
905    }
906}
907
908impl Default for AgentEngine {
909    fn default() -> Self {
910        Self::new()
911    }
912}
913
impl Drop for AgentEngine {
    fn drop(&mut self) {
        // Drop cannot await, so no graceful shutdown happens here — callers
        // should invoke `shutdown()` before dropping the last engine handle.
        // NOTE(review): dropping a tokio JoinHandle detaches (does not abort)
        // its task — confirm agent tasks are actually torn down elsewhere.
        debug!("AgentEngine dropped");
    }
}
919}