1use dashmap::DashMap;
10use std::sync::Arc;
11use std::time::Duration;
12use tokio::sync::{mpsc, RwLock};
13use tokio::time::timeout;
14use tracing::{debug, debug_span, error, info, instrument, warn, Instrument};
15
16use cortexai_core::types::PlanningMode;
17use cortexai_core::*;
18use cortexai_monitoring::CostTracker;
19use cortexai_providers::*;
20use cortexai_tools::ToolRegistry;
21
22use crate::executor::ToolExecutor;
23use crate::memory::AgentMemory;
24use crate::planning::{check_stop_words, PlanGenerator, StepExecutionContext};
25
/// Top-level agent orchestrator: owns every running agent, routes messages
/// between them, and aggregates engine-wide counters.
///
/// Cloning is cheap — all fields are behind `Arc`s, so clones observe the
/// same agent table, metrics, and cost tracker.
#[derive(Clone)]
pub struct AgentEngine {
    /// Running agents keyed by id; concurrent map, no external lock needed.
    agents: Arc<DashMap<AgentId, Arc<AgentRuntime>>>,
    /// Shared atomic counters (spawns, messages, tool calls, timeouts).
    metrics: Arc<EngineMetrics>,
    /// Optional LLM cost/usage tracker; `None` disables cost recording.
    cost_tracker: Option<Arc<CostTracker>>,
}
33
/// Engine-wide counters, updated with `Ordering::Relaxed` — values are
/// monotonic tallies for observability, not synchronization primitives.
#[derive(Default)]
pub struct EngineMetrics {
    // Total agents ever spawned (never decremented).
    pub agents_spawned: std::sync::atomic::AtomicU64,
    // Currently-running agents (incremented on spawn, decremented on stop).
    pub agents_active: std::sync::atomic::AtomicU64,
    // Messages fully processed without error or timeout.
    pub messages_processed: std::sync::atomic::AtomicU64,
    // Messages that errored or timed out (timeouts count in both).
    pub messages_failed: std::sync::atomic::AtomicU64,
    // Individual tool invocations requested by the LLM.
    pub total_tool_calls: std::sync::atomic::AtomicU64,
    // Message-processing attempts that exceeded the configured timeout.
    pub timeouts: std::sync::atomic::AtomicU64,
}
44
/// Per-agent runtime state: configuration, shared mutable state, memory,
/// tool access, LLM backend, and the inbox feeding the background
/// processing task.
pub struct AgentRuntime {
    pub config: AgentConfig,
    // Mutable status/iteration/plan state, guarded by an async RwLock.
    pub state: Arc<RwLock<AgentState>>,
    pub memory: Arc<AgentMemory>,
    pub tool_registry: Arc<ToolRegistry>,
    pub backend: Arc<dyn LLMBackend>,
    pub executor: ToolExecutor,
    // Sender half of the agent's inbox; the receiver lives in the
    // processing task spawned by `AgentEngine::spawn_agent`.
    pub inbox_tx: mpsc::UnboundedSender<Message>,
    // JoinHandle of the processing loop; taken and aborted on stop.
    // parking_lot (sync) mutex: held only for short, non-await sections.
    processing_task: parking_lot::Mutex<Option<tokio::task::JoinHandle<()>>>,
}
56
57impl AgentEngine {
58 pub fn new() -> Self {
59 Self {
60 agents: Arc::new(DashMap::new()),
61 metrics: Arc::new(EngineMetrics::default()),
62 cost_tracker: None,
63 }
64 }
65
66 pub fn with_cost_tracker(cost_tracker: Arc<CostTracker>) -> Self {
68 Self {
69 agents: Arc::new(DashMap::new()),
70 metrics: Arc::new(EngineMetrics::default()),
71 cost_tracker: Some(cost_tracker),
72 }
73 }
74
    /// Enables (or replaces) cost tracking on an existing engine.
    /// Note: only affects this instance's handle — clones made before this
    /// call keep their previous `cost_tracker` value.
    pub fn set_cost_tracker(&mut self, cost_tracker: Arc<CostTracker>) {
        self.cost_tracker = Some(cost_tracker);
    }
79
    /// Returns the configured cost tracker, if any.
    pub fn cost_tracker(&self) -> Option<&Arc<CostTracker>> {
        self.cost_tracker.as_ref()
    }
84
85 #[instrument(skip(self, tool_registry, backend), fields(agent_id = %config.id, agent_name = %config.name))]
87 pub async fn spawn_agent(
88 &self,
89 config: AgentConfig,
90 tool_registry: Arc<ToolRegistry>,
91 backend: Arc<dyn LLMBackend>,
92 ) -> Result<AgentId, AgentError> {
93 use std::sync::atomic::Ordering;
94
95 let (inbox_tx, mut inbox_rx) = mpsc::unbounded_channel::<Message>();
96
97 let agent_id = config.id.clone();
98 let state = Arc::new(RwLock::new(AgentState::new()));
99 let memory = Arc::new(AgentMemory::new(config.memory_config.clone()));
100 let executor = ToolExecutor::new(10); let config_clone = config.clone();
104 let state_clone = state.clone();
105 let memory_clone = memory.clone();
106 let registry_clone = tool_registry.clone();
107 let backend_clone = backend.clone();
108 let executor_clone = executor.clone();
109 let engine_clone = self.clone();
110 let metrics_clone = self.metrics.clone();
111 let cost_tracker_clone = self.cost_tracker.clone();
112
113 let agent_loop_span = tracing::info_span!(
115 "agent_loop",
116 agent_id = %agent_id,
117 agent_name = %config.name
118 );
119
120 let processing_task = tokio::spawn(
122 async move {
123 while let Some(message) = inbox_rx.recv().await {
124 let msg_span = debug_span!(
126 "process_message",
127 from = %message.from,
128 iteration = tracing::field::Empty
129 );
130
131 async {
132 let start = std::time::Instant::now();
133 let timeout_duration = Duration::from_secs(config_clone.timeout_secs);
134
135 info!(
136 timeout_secs = config_clone.timeout_secs,
137 "Processing message from {}", message.from
138 );
139
140 let process_result = timeout(
142 timeout_duration,
143 Self::process_message(
144 &config_clone,
145 &state_clone,
146 &memory_clone,
147 ®istry_clone,
148 &backend_clone,
149 &executor_clone,
150 &metrics_clone,
151 cost_tracker_clone.as_ref(),
152 message.clone(),
153 ),
154 )
155 .await;
156
157 match process_result {
158 Ok(Ok(responses)) => {
159 for response in responses {
161 if let Err(e) = engine_clone.send_message(response) {
162 error!("Failed to route response: {}", e);
163 }
164 }
165 metrics_clone
166 .messages_processed
167 .fetch_add(1, Ordering::Relaxed);
168
169 let latency = start.elapsed();
170 if latency.as_millis() > 500 {
171 warn!(
172 latency_ms = latency.as_millis(),
173 "Slow processing detected"
174 );
175 }
176 }
177 Ok(Err(e)) => {
178 error!("Processing error: {}", e);
180 {
181 let mut state_write = state_clone.write().await;
182 state_write.status = AgentStatus::Idle;
183 }
184 metrics_clone
185 .messages_failed
186 .fetch_add(1, Ordering::Relaxed);
187 }
188 Err(_) => {
189 error!(
191 timeout_secs = config_clone.timeout_secs,
192 "Agent timeout processing message"
193 );
194 {
195 let mut state_write = state_clone.write().await;
196 state_write.status = AgentStatus::Idle;
197 }
198 metrics_clone.timeouts.fetch_add(1, Ordering::Relaxed);
199 metrics_clone
200 .messages_failed
201 .fetch_add(1, Ordering::Relaxed);
202 }
203 }
204 }
205 .instrument(msg_span)
206 .await;
207 }
208
209 info!("Agent processing task stopped");
210 }
211 .instrument(agent_loop_span),
212 );
213
214 let runtime = Arc::new(AgentRuntime {
215 config: config.clone(),
216 state,
217 memory,
218 tool_registry,
219 backend,
220 executor,
221 inbox_tx,
222 processing_task: parking_lot::Mutex::new(Some(processing_task)),
223 });
224
225 self.agents.insert(agent_id.clone(), runtime);
226
227 self.metrics.agents_spawned.fetch_add(1, Ordering::Relaxed);
228 self.metrics.agents_active.fetch_add(1, Ordering::Relaxed);
229
230 info!("Agent spawned successfully");
231
232 Ok(agent_id)
233 }
234
235 #[instrument(
238 skip(state, memory, tool_registry, backend, executor, metrics, cost_tracker, message),
239 fields(agent_id = %config.id, max_iterations = config.max_iterations, planning_mode = ?config.planning_mode)
240 )]
241 async fn process_message(
242 config: &AgentConfig,
243 state: &Arc<RwLock<AgentState>>,
244 memory: &Arc<AgentMemory>,
245 tool_registry: &Arc<ToolRegistry>,
246 backend: &Arc<dyn LLMBackend>,
247 executor: &ToolExecutor,
248 metrics: &Arc<EngineMetrics>,
249 cost_tracker: Option<&Arc<CostTracker>>,
250 message: Message,
251 ) -> Result<Vec<Message>, AgentError> {
252 let mut responses = Vec::new();
253
254 {
256 let mut state_write = state.write().await;
257 state_write.status = AgentStatus::Processing;
258 state_write.iteration = 0;
259 }
260
261 memory.add_message(message.clone()).await?;
263
264 if config.planning_mode.is_enabled() {
266 return Self::process_with_planning(
267 config,
268 state,
269 memory,
270 tool_registry,
271 backend,
272 executor,
273 metrics,
274 cost_tracker,
275 message,
276 )
277 .await;
278 }
279
280 let mut iteration = 0;
282 let max_iterations = config.max_iterations;
283
284 while iteration < max_iterations {
285 iteration += 1;
286
287 let iter_span = debug_span!("react_iteration", n = iteration, max = max_iterations);
289 let _iter_guard = iter_span.enter();
290
291 {
292 let mut state_write = state.write().await;
293 state_write.iteration = iteration;
294 state_write.status = AgentStatus::Thinking;
295 }
296
297 debug!(iteration, max_iterations, "ReACT iteration");
298
299 let history = memory.get_history().await?;
301 let tool_schemas = tool_registry.list_schemas();
302
303 let mut llm_messages = Vec::new();
305
306 if let Some(system_prompt) = &config.system_prompt {
308 llm_messages.push(LLMMessage::system(system_prompt));
309 }
310
311 for msg in history {
313 match &msg.content {
314 Content::Text(text) => {
315 let role = if msg.from == config.id {
316 MessageRole::Assistant
317 } else {
318 MessageRole::User
319 };
320 llm_messages.push(LLMMessage {
321 role,
322 content: text.clone(),
323 tool_calls: None,
324 tool_call_id: None,
325 name: None,
326 });
327 }
328 Content::ToolCall(calls) => {
329 llm_messages.push(LLMMessage::assistant_with_tools(calls.clone()));
330 }
331 Content::ToolResult(results) => {
332 for result in results {
333 let content = if result.success {
334 serde_json::to_string_pretty(&result.data).unwrap_or_default()
335 } else {
336 result.error.clone().unwrap_or_default()
337 };
338 llm_messages.push(LLMMessage::tool(
339 result.call_id.clone(),
340 "tool_result".to_string(),
341 content,
342 ));
343 }
344 }
345 _ => {}
346 }
347 }
348
349 let infer_start = std::time::Instant::now();
351 let inference = backend
352 .infer(&llm_messages, &tool_schemas, config.temperature)
353 .await;
354 let infer_latency_ms = infer_start.elapsed().as_secs_f64() * 1000.0;
355
356 if let Some(tracker) = cost_tracker {
358 let model_info = backend.model_info();
359 match &inference {
360 Ok(inf) => {
361 tracker.record_request_detailed(
362 &model_info.model,
363 inf.token_usage.prompt_tokens as u64,
364 inf.token_usage.completion_tokens as u64,
365 inf.token_usage.cached_tokens.unwrap_or(0) as u64,
366 infer_latency_ms,
367 Some(config.id.0.as_str()),
368 true,
369 );
370 }
371 Err(_) => {
372 tracker.record_request_detailed(
373 &model_info.model,
374 0,
375 0,
376 0,
377 infer_latency_ms,
378 Some(config.id.0.as_str()),
379 false,
380 );
381 }
382 }
383 }
384
385 let inference = inference?;
386
387 debug!(
388 content_len = inference.content.len(),
389 tool_calls = inference.tool_calls.as_ref().map(|c| c.len()).unwrap_or(0),
390 "LLM inference complete"
391 );
392
393 if let Some(tool_calls) = &inference.tool_calls {
395 if !tool_calls.is_empty() {
396 {
397 let mut state_write = state.write().await;
398 state_write.status = AgentStatus::ExecutingTool;
399 }
400
401 info!(num_calls = tool_calls.len(), "Executing tool calls");
402
403 metrics.total_tool_calls.fetch_add(
405 tool_calls.len() as u64,
406 std::sync::atomic::Ordering::Relaxed,
407 );
408
409 let tool_call_msg = Message::new(
411 config.id.clone(),
412 config.id.clone(),
413 Content::ToolCall(tool_calls.clone()),
414 );
415 memory.add_message(tool_call_msg).await?;
416
417 let results = executor
419 .execute_tools(tool_calls, tool_registry, &config.id)
420 .await;
421
422 let result_msg = Message::new(
424 AgentId::new("system"),
425 config.id.clone(),
426 Content::ToolResult(results),
427 );
428 memory.add_message(result_msg).await?;
429
430 continue;
432 }
433 }
434
435 if !inference.content.is_empty() {
437 if !config.stop_words.is_empty() {
439 if let Some(stop_word) =
440 check_stop_words(&inference.content, &config.stop_words)
441 {
442 info!(stop_word = %stop_word, "Stop word detected, terminating");
443 let mut state_write = state.write().await;
444 state_write.status = AgentStatus::StoppedByStopWord;
445 drop(state_write);
446
447 let response = Message::new(
448 config.id.clone(),
449 message.from.clone(),
450 Content::Text(inference.content.clone()),
451 );
452 memory.add_message(response.clone()).await?;
453 responses.push(response);
454 return Ok(responses);
455 }
456 }
457
458 let response = Message::new(
459 config.id.clone(),
460 message.from.clone(),
461 Content::Text(inference.content.clone()),
462 );
463
464 memory.add_message(response.clone()).await?;
466
467 responses.push(response);
468 break;
469 }
470
471 if inference.content.is_empty() && inference.tool_calls.is_none() {
473 warn!("Agent {} LLM returned empty response", config.id);
474 break;
475 }
476 }
477
478 {
480 let mut state_write = state.write().await;
481 state_write.status = AgentStatus::Idle;
482 }
483
484 if iteration >= max_iterations {
485 warn!(iterations = iteration, "Max iterations reached");
486 return Err(AgentError::MaxIterationsExceeded);
487 }
488
489 Ok(responses)
490 }
491
    /// Plan-and-execute path: asks the LLM to produce a multi-step plan
    /// for the message text, executes each step via `execute_step`
    /// (optionally re-planning between steps in `Adaptive` mode), then
    /// replies with a completion summary.
    ///
    /// # Errors
    /// Fails if the message is not plain text, or if memory / inference /
    /// step execution fails. A plan that cannot be PARSED is not an error:
    /// the raw LLM output is returned as the response instead.
    #[instrument(
        skip(state, memory, tool_registry, backend, executor, metrics, cost_tracker, message),
        fields(agent_id = %config.id, planning_mode = ?config.planning_mode)
    )]
    async fn process_with_planning(
        config: &AgentConfig,
        state: &Arc<RwLock<AgentState>>,
        memory: &Arc<AgentMemory>,
        tool_registry: &Arc<ToolRegistry>,
        backend: &Arc<dyn LLMBackend>,
        executor: &ToolExecutor,
        metrics: &Arc<EngineMetrics>,
        cost_tracker: Option<&Arc<CostTracker>>,
        message: Message,
    ) -> Result<Vec<Message>, AgentError> {
        let mut responses = Vec::new();

        // Planning only works on a textual goal.
        let goal = match &message.content {
            Content::Text(text) => text.clone(),
            _ => {
                return Err(AgentError::ProcessingError(
                    "Planning requires text message".into(),
                ))
            }
        };

        {
            let mut state_write = state.write().await;
            state_write.status = AgentStatus::Planning;
        }

        info!(goal = %goal, "Generating execution plan");

        // Phase 1: generate the plan. Tool schemas are embedded in the
        // prompt; no tools are exposed to the planning inference itself.
        let tool_schemas = tool_registry.list_schemas();
        let planning_prompt = PlanGenerator::create_planning_prompt(&goal, &tool_schemas);

        let plan_messages = vec![
            // Use the agent's own system prompt when present, otherwise a
            // generic planner persona.
            config
                .system_prompt
                .as_ref()
                .map(LLMMessage::system)
                .unwrap_or_else(|| {
                    LLMMessage::system("You are a planning agent. Create detailed execution plans.")
                }),
            LLMMessage::user(&planning_prompt),
        ];

        let infer_start = std::time::Instant::now();
        let plan_inference = backend
            .infer(&plan_messages, &[], config.temperature)
            .await?;
        let infer_latency_ms = infer_start.elapsed().as_secs_f64() * 1000.0;

        if let Some(tracker) = cost_tracker {
            let model_info = backend.model_info();
            tracker.record_request_detailed(
                &model_info.model,
                plan_inference.token_usage.prompt_tokens as u64,
                plan_inference.token_usage.completion_tokens as u64,
                plan_inference.token_usage.cached_tokens.unwrap_or(0) as u64,
                infer_latency_ms,
                Some(config.id.0.as_str()),
                true,
            );
        }

        // Parse failure degrades gracefully: return the raw LLM text.
        let mut plan = match PlanGenerator::parse_plan(&goal, &plan_inference.content) {
            Ok(p) => {
                info!(steps = p.steps.len(), "Plan generated successfully");
                p
            }
            Err(e) => {
                warn!(error = %e, "Failed to parse plan, returning raw response");
                let response = Message::new(
                    config.id.clone(),
                    message.from.clone(),
                    Content::Text(plan_inference.content.clone()),
                );
                memory.add_message(response.clone()).await?;

                let mut state_write = state.write().await;
                state_write.status = AgentStatus::Idle;
                drop(state_write);

                return Ok(vec![response]);
            }
        };

        {
            let mut state_write = state.write().await;
            state_write.set_plan(plan.clone());
            state_write.status = AgentStatus::ExecutingPlan;
        }

        // Phase 2: execute steps. `advance()` presumably moves the cursor
        // to the next pending step and returns false when none remain —
        // TODO confirm against the Plan type.
        while plan.advance() {
            let step = plan.current_step().unwrap().clone();

            info!(
                step = step.step_number,
                total = plan.steps.len(),
                description = %step.description,
                "Executing plan step"
            );

            let step_ctx = StepExecutionContext::from_step(&step, &plan);

            let mut step_messages = vec![];

            if let Some(sys_prompt) = &config.system_prompt {
                step_messages.push(LLMMessage::system(sys_prompt));
            }
            step_messages.push(LLMMessage::user(&step_ctx.prompt));

            let step_result = Self::execute_step(
                config,
                state,
                memory,
                tool_registry,
                backend,
                executor,
                metrics,
                cost_tracker,
                step_messages,
            )
            .await?;

            plan.mark_current_completed(&step_result);

            // Publish plan progress so observers see per-step updates.
            {
                let mut state_write = state.write().await;
                state_write.current_plan = Some(plan.clone());
            }

            // Stop words abort the remaining steps but keep results so far.
            if !config.stop_words.is_empty() {
                if let Some(stop_word) = check_stop_words(&step_result, &config.stop_words) {
                    info!(stop_word = %stop_word, "Stop word detected during plan execution");
                    let mut state_write = state.write().await;
                    state_write.status = AgentStatus::StoppedByStopWord;
                    break;
                }
            }

            // Adaptive mode: let the LLM revise the remaining plan based on
            // this step's result. Re-plan failures are silently ignored
            // (best effort) — the original plan continues.
            if config.planning_mode == PlanningMode::Adaptive && !plan.is_complete() {
                let replan_prompt = PlanGenerator::create_replan_prompt(&plan, &step_result);
                let replan_messages = vec![
                    LLMMessage::system(
                        "You are a planning agent. Review progress and adjust the plan if needed.",
                    ),
                    LLMMessage::user(&replan_prompt),
                ];

                if let Ok(replan_inference) = backend
                    .infer(&replan_messages, &[], config.temperature)
                    .await
                {
                    if let Ok(modified) =
                        PlanGenerator::apply_replan(&mut plan, &replan_inference.content)
                    {
                        if modified {
                            info!("Plan was modified based on step results");
                            let mut state_write = state.write().await;
                            state_write.current_plan = Some(plan.clone());
                        }
                    }
                }
            }
        }

        // NOTE(review): `completed` is set even when the loop exited early
        // via a stop word; `success` then reflects partial completion —
        // confirm this is the intended semantics.
        plan.completed = true;
        plan.success = plan.steps.iter().all(|s| s.completed);

        let summary = format!(
            "Plan completed. Goal: {}\nSteps executed: {}/{}\nSuccess: {}",
            plan.goal,
            plan.steps.iter().filter(|s| s.completed).count(),
            plan.steps.len(),
            plan.success
        );

        let response = Message::new(
            config.id.clone(),
            message.from.clone(),
            Content::Text(summary),
        );

        memory.add_message(response.clone()).await?;
        responses.push(response);

        {
            let mut state_write = state.write().await;
            state_write.status = AgentStatus::Idle;
            state_write.current_plan = Some(plan);
        }

        Ok(responses)
    }
704
    /// Runs a single plan step to completion: repeatedly infers and
    /// executes any requested tool calls until the model returns plain
    /// text, capped at `min(config.max_iterations, 5)` rounds per step.
    ///
    /// Tool calls/results are recorded both in the step-local `messages`
    /// (for the next inference) and in agent memory (for later history).
    ///
    /// # Errors
    /// Propagates inference and memory errors; returns
    /// `MaxIterationsExceeded` when the per-step round cap is exhausted.
    async fn execute_step(
        config: &AgentConfig,
        state: &Arc<RwLock<AgentState>>,
        memory: &Arc<AgentMemory>,
        tool_registry: &Arc<ToolRegistry>,
        backend: &Arc<dyn LLMBackend>,
        executor: &ToolExecutor,
        metrics: &Arc<EngineMetrics>,
        cost_tracker: Option<&Arc<CostTracker>>,
        mut messages: Vec<LLMMessage>,
    ) -> Result<String, AgentError> {
        let tool_schemas = tool_registry.list_schemas();
        let mut iteration = 0;
        // Per-step rounds are capped at 5 regardless of the agent-wide limit.
        let max_iterations = config.max_iterations.min(5);
        loop {
            iteration += 1;
            if iteration > max_iterations {
                return Err(AgentError::MaxIterationsExceeded);
            }

            let infer_start = std::time::Instant::now();
            let inference = backend
                .infer(&messages, &tool_schemas, config.temperature)
                .await?;
            let infer_latency_ms = infer_start.elapsed().as_secs_f64() * 1000.0;

            // NOTE(review): unlike `process_message`, a failed inference
            // here is not recorded with the tracker (the `?` above returns
            // first) — confirm whether failed requests should be tracked.
            if let Some(tracker) = cost_tracker {
                let model_info = backend.model_info();
                tracker.record_request_detailed(
                    &model_info.model,
                    inference.token_usage.prompt_tokens as u64,
                    inference.token_usage.completion_tokens as u64,
                    inference.token_usage.cached_tokens.unwrap_or(0) as u64,
                    infer_latency_ms,
                    Some(config.id.0.as_str()),
                    true,
                );
            }

            if let Some(tool_calls) = &inference.tool_calls {
                if !tool_calls.is_empty() {
                    {
                        let mut state_write = state.write().await;
                        state_write.status = AgentStatus::ExecutingTool;
                    }

                    metrics.total_tool_calls.fetch_add(
                        tool_calls.len() as u64,
                        std::sync::atomic::Ordering::Relaxed,
                    );

                    // Record the tool call in agent memory.
                    let tool_call_msg = Message::new(
                        config.id.clone(),
                        config.id.clone(),
                        Content::ToolCall(tool_calls.clone()),
                    );
                    memory.add_message(tool_call_msg).await?;

                    let results = executor
                        .execute_tools(tool_calls, tool_registry, &config.id)
                        .await;

                    // Mirror the call + results into the step-local
                    // conversation so the next inference sees them.
                    messages.push(LLMMessage::assistant_with_tools(tool_calls.clone()));
                    for result in &results {
                        let content = if result.success {
                            serde_json::to_string_pretty(&result.data).unwrap_or_default()
                        } else {
                            result.error.clone().unwrap_or_default()
                        };
                        messages.push(LLMMessage::tool(
                            result.call_id.clone(),
                            "tool_result".to_string(),
                            content,
                        ));
                    }

                    let result_msg = Message::new(
                        AgentId::new("system"),
                        config.id.clone(),
                        Content::ToolResult(results),
                    );
                    memory.add_message(result_msg).await?;

                    // Back to plan-execution status between tool rounds.
                    {
                        let mut state_write = state.write().await;
                        state_write.status = AgentStatus::ExecutingPlan;
                    }

                    continue;
                }
            }

            if !inference.content.is_empty() {
                return Ok(inference.content);
            }

            return Ok("Step completed without explicit output".to_string());
        }
    }
813
814 #[instrument(skip(self), fields(to = %message.to, from = %message.from))]
816 pub fn send_message(&self, message: Message) -> Result<(), AgentError> {
817 if let Some(agent) = self.agents.get(&message.to) {
818 agent
819 .inbox_tx
820 .send(message)
821 .map_err(|e| AgentError::SendError(e.to_string()))?;
822
823 debug!("Message sent successfully");
824 Ok(())
825 } else {
826 warn!("Agent not found");
827 Err(AgentError::AgentNotFound(message.to))
828 }
829 }
830
831 pub fn get_agent(&self, id: &AgentId) -> Option<Arc<AgentRuntime>> {
833 self.agents.get(id).map(|r| r.clone())
834 }
835
    /// Number of currently registered agents.
    pub fn agent_count(&self) -> usize {
        self.agents.len()
    }
840
841 pub fn metrics(&self) -> EngineMetricsSnapshot {
843 use std::sync::atomic::Ordering;
844 EngineMetricsSnapshot {
845 agents_spawned: self.metrics.agents_spawned.load(Ordering::Relaxed),
846 agents_active: self.metrics.agents_active.load(Ordering::Relaxed),
847 messages_processed: self.metrics.messages_processed.load(Ordering::Relaxed),
848 messages_failed: self.metrics.messages_failed.load(Ordering::Relaxed),
849 total_tool_calls: self.metrics.total_tool_calls.load(Ordering::Relaxed),
850 timeouts: self.metrics.timeouts.load(Ordering::Relaxed),
851 }
852 }
853}
854
/// Plain-value copy of [`EngineMetrics`] at one point in time.
#[derive(Debug, Clone)]
pub struct EngineMetricsSnapshot {
    pub agents_spawned: u64,
    pub agents_active: u64,
    pub messages_processed: u64,
    pub messages_failed: u64,
    pub total_tool_calls: u64,
    pub timeouts: u64,
}

impl EngineMetricsSnapshot {
    /// Fraction of message attempts that succeeded, in `[0.0, 1.0]`.
    /// Defined as 1.0 when no messages have been attempted yet.
    pub fn success_rate(&self) -> f64 {
        match self.messages_processed + self.messages_failed {
            0 => 1.0,
            total => self.messages_processed as f64 / total as f64,
        }
    }
}
876
877impl AgentEngine {
    /// Removes the agent from the engine and aborts its processing task.
    ///
    /// Aborting drops any in-flight message processing; messages still
    /// queued in the inbox are discarded along with the channel.
    ///
    /// # Errors
    /// `AgentNotFound` when no agent with that id is registered.
    #[instrument(skip(self), fields(agent_id = %id))]
    pub async fn stop_agent(&self, id: &AgentId) -> Result<(), AgentError> {
        use std::sync::atomic::Ordering;

        if let Some((_, runtime)) = self.agents.remove(id) {
            // Take the JoinHandle out of the (sync) parking_lot mutex and
            // abort it. The guard is not held across any await here.
            if let Some(handle) = runtime.processing_task.lock().take() {
                handle.abort();
            }
            self.metrics.agents_active.fetch_sub(1, Ordering::Relaxed);
            info!("Agent stopped");
            Ok(())
        } else {
            warn!("Agent not found");
            Err(AgentError::AgentNotFound(id.clone()))
        }
    }
896
897 #[instrument(skip(self))]
899 pub async fn shutdown(&self) {
900 let ids: Vec<AgentId> = self.agents.iter().map(|r| r.key().clone()).collect();
901 info!(agent_count = ids.len(), "Shutting down all agents");
902 for id in ids {
903 let _ = self.stop_agent(&id).await;
904 }
905 }
906}
907
/// `Default` delegates to [`AgentEngine::new`] (no cost tracker).
impl Default for AgentEngine {
    fn default() -> Self {
        Self::new()
    }
}
913
impl Drop for AgentEngine {
    // Only logs; agent tasks are NOT aborted here — other clones may still
    // hold the shared `Arc`s, and `Drop` cannot await `shutdown()`.
    fn drop(&mut self) {
        debug!("AgentEngine dropped");
    }
}