1use async_trait::async_trait;
2use futures::stream::{Stream, StreamExt};
3use parking_lot::RwLock;
4use serde_json::Value;
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::Instant;
10use tracing::{debug, error, info, instrument, warn};
11
12use ai_agents_context::{ContextManager, ContextProvider, TemplateRenderer};
13use ai_agents_core::{
14 AgentError, AgentSnapshot, AgentStorage, ChatMessage, FinishReason, LLMProvider, LLMResponse,
15 Result, ToolResult,
16};
17use ai_agents_disambiguation::{
18 DisambiguationConfig, DisambiguationContext, DisambiguationManager, DisambiguationResult,
19};
20use ai_agents_hitl::{
21 ApprovalHandler, ApprovalResult, ApprovalTrigger, HITLCheckResult, HITLEngine,
22 RejectAllHandler, TimeoutAction,
23};
24use ai_agents_hooks::{AgentHooks, NoopHooks};
25use ai_agents_llm::LLMRegistry;
26use ai_agents_memory::{
27 CompressResult, EvictionReason, Memory, MemoryBudgetEvent, MemoryCompressEvent,
28 MemoryEvictEvent, MemoryTokenBudget, OverflowStrategy,
29};
30use ai_agents_process::{ProcessData, ProcessProcessor};
31use ai_agents_reasoning::{
32 CriterionResult, EvaluationResult, Plan, PlanAction, PlanStatus, PlanStep, ReasoningConfig,
33 ReasoningMetadata, ReasoningMode, ReasoningOutput, ReflectionAttempt, ReflectionConfig,
34 ReflectionMetadata, StepFailureAction,
35};
36use ai_agents_recovery::{
37 ByRoleFilter, ContextOverflowAction, FilterConfig, IntoClassifiedError, KeepRecentFilter,
38 LLMFailureAction, MessageFilter, RecoveryManager, RetryConfig, SkipPatternFilter,
39 ToolFailureAction,
40};
41use ai_agents_skills::{SkillDefinition, SkillExecutor, SkillRouter};
42use ai_agents_state::{
43 PromptMode, StateAction, StateMachine, StateMachineSnapshot, StateTransitionEvent, ToolRef,
44 TransitionContext, TransitionEvaluator,
45};
46use ai_agents_storage::{StorageConfig as StorageStorageConfig, create_storage};
47use ai_agents_tools::{
48 ConditionEvaluator, EvaluationContext, LLMGetter, SecurityCheckResult, ToolCallRecord,
49 ToolRegistry, ToolSecurityEngine,
50};
51
52use super::{
53 Agent, AgentInfo, AgentResponse, ParallelToolsConfig, StreamChunk, StreamingConfig, ToolCall,
54};
55use crate::spec::StorageConfig;
56
/// Outcome of handling one tool call inside the agent loop.
///
/// NOTE(review): variant semantics inferred from names; use sites are not in
/// this chunk — confirm against the loop implementation.
enum ToolCallOutcome {
    /// Tool call handled; the loop should keep iterating.
    Continue,
    /// The call caused a state-machine transition to fire.
    TransitionFired,
    /// The call was rejected; carries the response to return to the caller.
    Rejected(AgentResponse),
}
66
/// Result of attempting to route an input to a skill.
///
/// NOTE(review): variant semantics inferred from names; confirm at use sites.
enum SkillRouteResult {
    /// No skill matched the input.
    NoMatch,
    /// A skill handled the input; carries the produced response text.
    Response(String),
    /// Routing was ambiguous; carries a clarification response for the user.
    NeedsClarification(AgentResponse),
}
76
/// Result of post-loop state-machine evaluation.
///
/// NOTE(review): variant semantics inferred from names; confirm at use sites.
enum PostLoopResult {
    /// No transition fired; carries the final response content.
    NoTransition(String),
    /// A transition fired; carries the final response content.
    Transitioned(String),
    /// A transition requires re-dispatching the input under the new state.
    NeedsRedispatch,
}
87
/// Fully-assembled runtime agent.
///
/// Owns the LLM registry, memory, tool registry and skills, plus every
/// optional subsystem (state machine, HITL, persona, disambiguation,
/// spawner, storage, …). Built via [`RuntimeAgent::new`] and configured
/// with the `with_*` builder methods. Interior mutability (`RwLock`,
/// `AtomicBool`) keeps the public API `&self`-based.
pub struct RuntimeAgent {
    info: AgentInfo,
    llm_registry: Arc<LLMRegistry>,
    memory: Arc<dyn Memory>,
    tools: Arc<ToolRegistry>,
    skills: Vec<SkillDefinition>,
    // Router/executor are only built when `skills` is non-empty (see `new`).
    skill_router: Option<SkillRouter>,
    skill_executor: Option<SkillExecutor>,
    // Template source; rendered with live context in `render_system_prompt`.
    base_system_prompt: String,
    max_iterations: u32,
    iteration_count: RwLock<u32>,
    // Estimated-token ceiling used by `build_messages` overflow handling.
    max_context_tokens: u32,
    memory_token_budget: Option<MemoryTokenBudget>,
    recovery_manager: RecoveryManager,
    tool_security: ToolSecurityEngine,
    process_processor: Option<ProcessProcessor>,
    // Named filters resolvable via `FilterConfig::Custom`.
    message_filters: RwLock<HashMap<String, Arc<dyn MessageFilter>>>,
    state_machine: Option<Arc<StateMachine>>,
    transition_evaluator: Option<Arc<dyn TransitionEvaluator>>,
    context_manager: Arc<ContextManager>,
    template_renderer: TemplateRenderer,
    // Record of executed tool calls, fed into condition evaluation.
    tool_call_history: RwLock<Vec<ToolCallRecord>>,
    parallel_tools: ParallelToolsConfig,
    streaming: StreamingConfig,
    hooks: Arc<dyn AgentHooks>,
    hitl_engine: Option<HITLEngine>,
    approval_handler: Arc<dyn ApprovalHandler>,
    storage_config: StorageConfig,
    // Lazily created by `init_storage` or injected via `with_storage`.
    storage: RwLock<Option<Arc<dyn AgentStorage>>>,
    reasoning_config: ReasoningConfig,
    reflection_config: ReflectionConfig,
    disambiguation_manager: Option<DisambiguationManager>,
    persona_manager: Option<Arc<ai_agents_persona::PersonaManager>>,
    // NOTE(review): fields below are read/written outside this chunk;
    // purposes inferred from names — confirm at their use sites.
    pending_skill_id: RwLock<Option<String>>,
    current_plan: RwLock<Option<Plan>>,
    // Tool ids declared in the spec; `None` means "expose all registry tools".
    declared_tool_ids: Option<Vec<String>>,
    context_initialized: AtomicBool,
    spawner: Option<Arc<crate::spawner::AgentSpawner>>,
    spawner_registry: Option<Arc<crate::spawner::AgentRegistry>>,
    redispatch_depth: RwLock<u32>,
}
139
/// Manual `Debug`: most collaborators are trait objects without `Debug`,
/// so only summary fields are printed; `finish_non_exhaustive` signals the
/// omitted fields.
impl std::fmt::Debug for RuntimeAgent {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("RuntimeAgent")
            .field("info", &self.info)
            .field("base_system_prompt", &self.base_system_prompt)
            .field("max_iterations", &self.max_iterations)
            .field("skills_count", &self.skills.len())
            .field("max_context_tokens", &self.max_context_tokens)
            .field("has_state_machine", &self.state_machine.is_some())
            .field("parallel_tools", &self.parallel_tools)
            .field("streaming", &self.streaming)
            // Hooks are always present (NoopHooks by default), hence constant.
            .field("has_hooks", &true)
            .field("has_hitl", &self.hitl_engine.is_some())
            .field("storage_type", &self.storage_config.storage_type())
            .field("reasoning_mode", &self.reasoning_config.mode)
            .field("reflection_enabled", &self.reflection_config.enabled)
            .field("declared_tool_ids", &self.declared_tool_ids)
            .field("has_persona", &self.persona_manager.is_some())
            .finish_non_exhaustive()
    }
}
161
/// Adapter exposing an [`LLMRegistry`] through the [`LLMGetter`] trait
/// required by the tool-condition evaluator.
struct RegistryLLMGetter {
    registry: Arc<LLMRegistry>,
}
165
166impl LLMGetter for RegistryLLMGetter {
167 fn get_llm(&self, alias: &str) -> Option<Arc<dyn LLMProvider>> {
168 self.registry.get(alias).ok()
169 }
170}
171
172impl RuntimeAgent {
    /// Build an agent from its required collaborators, with defaults for
    /// every optional subsystem; tune via the `with_*` builder methods.
    ///
    /// When `skills` is non-empty, a [`SkillExecutor`] is always created and
    /// a [`SkillRouter`] is created only if the registry provides a router
    /// LLM (routing is silently unavailable otherwise).
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        info: AgentInfo,
        llm_registry: Arc<LLMRegistry>,
        memory: Arc<dyn Memory>,
        tools: Arc<ToolRegistry>,
        skills: Vec<SkillDefinition>,
        system_prompt: String,
        max_iterations: u32,
    ) -> Self {
        let (skill_router, skill_executor) = if !skills.is_empty() {
            let router_llm = llm_registry.router().ok();
            let router = router_llm.map(|llm| SkillRouter::new(llm, skills.clone()));
            let executor = SkillExecutor::new(llm_registry.clone(), tools.clone());
            (router, Some(executor))
        } else {
            (None, None)
        };

        let context_manager =
            ContextManager::new(HashMap::new(), info.name.clone(), info.version.clone());

        Self {
            info,
            llm_registry,
            memory,
            tools,
            skills,
            skill_router,
            skill_executor,
            base_system_prompt: system_prompt,
            max_iterations,
            iteration_count: RwLock::new(0),
            // Default context budget; override with `with_max_context_tokens`.
            max_context_tokens: 4096,
            memory_token_budget: None,
            recovery_manager: RecoveryManager::default(),
            tool_security: ToolSecurityEngine::default(),
            process_processor: None,
            message_filters: RwLock::new(HashMap::new()),
            state_machine: None,
            transition_evaluator: None,
            context_manager: Arc::new(context_manager),
            template_renderer: TemplateRenderer::new(),
            tool_call_history: RwLock::new(Vec::new()),
            parallel_tools: ParallelToolsConfig::default(),
            streaming: StreamingConfig::default(),
            hooks: Arc::new(NoopHooks),
            hitl_engine: None,
            // Safe default: every approval request is rejected until a real
            // handler is installed via `with_hitl`.
            approval_handler: Arc::new(RejectAllHandler::new()),
            storage_config: StorageConfig::default(),
            storage: RwLock::new(None),
            reasoning_config: ReasoningConfig::default(),
            reflection_config: ReflectionConfig::default(),
            disambiguation_manager: None,
            persona_manager: None,
            pending_skill_id: RwLock::new(None),
            current_plan: RwLock::new(None),
            declared_tool_ids: None,
            context_initialized: AtomicBool::new(false),
            spawner: None,
            spawner_registry: None,
            redispatch_depth: RwLock::new(0),
        }
    }
237
    /// Restrict the agent to the spec-declared tool ids (`None` = all tools).
    pub fn with_declared_tool_ids(mut self, ids: Option<Vec<String>>) -> Self {
        self.declared_tool_ids = ids;
        self
    }
242
    /// Set the storage configuration; the backend itself is created lazily
    /// by `init_storage`.
    pub fn with_storage_config(mut self, config: StorageConfig) -> Self {
        self.storage_config = config;
        self
    }
247
    /// Inject an already-constructed storage backend directly.
    pub fn with_storage(self, storage: Arc<dyn AgentStorage>) -> Self {
        *self.storage.write() = Some(storage);
        self
    }
252
    /// Set the agent-level reasoning configuration (states may override it).
    pub fn with_reasoning(mut self, config: ReasoningConfig) -> Self {
        self.reasoning_config = config;
        self
    }
257
    /// Set the agent-level reflection configuration (states may override it).
    pub fn with_reflection(mut self, config: ReflectionConfig) -> Self {
        self.reflection_config = config;
        self
    }
262
    /// Agent-level reasoning configuration (without state overrides).
    pub fn reasoning_config(&self) -> &ReasoningConfig {
        &self.reasoning_config
    }
266
    /// Agent-level reflection configuration (without state overrides).
    pub fn reflection_config(&self) -> &ReflectionConfig {
        &self.reflection_config
    }
270
    /// Attach a persona manager that prefixes the system prompt.
    pub fn with_persona(mut self, manager: Arc<ai_agents_persona::PersonaManager>) -> Self {
        self.persona_manager = Some(manager);
        self
    }
275
    /// The attached persona manager, if any.
    pub fn persona_manager(&self) -> Option<&Arc<ai_agents_persona::PersonaManager>> {
        self.persona_manager.as_ref()
    }
279
    /// Enable disambiguation; a manager is only created when the config is
    /// enabled, so a disabled config leaves the agent without one.
    pub fn with_disambiguation(mut self, config: DisambiguationConfig) -> Self {
        if config.is_enabled() {
            self.disambiguation_manager = Some(DisambiguationManager::new(
                config,
                Arc::clone(&self.llm_registry),
            ));
        }
        self
    }
289
    /// The disambiguation manager, if one was enabled.
    pub fn disambiguation_manager(&self) -> Option<&DisambiguationManager> {
        self.disambiguation_manager.as_ref()
    }
293
294 pub fn has_disambiguation(&self) -> bool {
295 self.disambiguation_manager
296 .as_ref()
297 .is_some_and(|m| m.is_enabled())
298 }
299
    /// Lazily create the storage backend from `storage_config`.
    ///
    /// No-op when no storage is configured, or when a backend already exists
    /// (e.g. injected via `with_storage`).
    pub async fn init_storage(&self) -> Result<()> {
        if self.storage_config.is_none() {
            return Ok(());
        }
        if self.storage.read().is_some() {
            return Ok(());
        }
        let storage_config = self.convert_storage_config();
        let storage = create_storage(&storage_config).await?;
        // NOTE(review): the field is `Option<Arc<dyn AgentStorage>>`, so
        // `create_storage` evidently yields an Option-shaped value — confirm
        // it cannot be `None` here, or a later `save_session` will error.
        *self.storage.write() = storage;
        Ok(())
    }
312
    /// Translate the spec-level storage config into the storage crate's type.
    fn convert_storage_config(&self) -> StorageStorageConfig {
        crate::spec::storage::to_storage_config(&self.storage_config)
    }
316
    /// A clone of the current storage backend handle, if one exists.
    pub fn storage(&self) -> Option<Arc<dyn AgentStorage>> {
        self.storage.read().clone()
    }
320
    /// The configured storage settings.
    pub fn storage_config(&self) -> &StorageConfig {
        &self.storage_config
    }
324
    /// The sub-agent spawner handle, if configured.
    pub fn spawner(&self) -> Option<&Arc<crate::spawner::AgentSpawner>> {
        self.spawner.as_ref()
    }
329
    /// The spawned-agent registry, if configured.
    pub fn spawner_registry(&self) -> Option<&Arc<crate::spawner::AgentRegistry>> {
        self.spawner_registry.as_ref()
    }
334
    /// True when spawner support is wired up (registry present).
    pub fn has_spawner(&self) -> bool {
        self.spawner_registry.is_some()
    }
338
    /// Attach the spawner and its registry together.
    pub fn with_spawner_handles(
        mut self,
        spawner: Arc<crate::spawner::AgentSpawner>,
        registry: Arc<crate::spawner::AgentRegistry>,
    ) -> Self {
        self.spawner = Some(spawner);
        self.spawner_registry = Some(registry);
        self
    }
348
    /// Replace the default `NoopHooks` with custom lifecycle hooks.
    pub fn with_hooks(mut self, hooks: Arc<dyn AgentHooks>) -> Self {
        self.hooks = hooks;
        self
    }
353
    /// Configure parallel tool execution.
    pub fn with_parallel_tools(mut self, config: ParallelToolsConfig) -> Self {
        self.parallel_tools = config;
        self
    }
358
    /// Configure streaming behavior.
    pub fn with_streaming(mut self, config: StreamingConfig) -> Self {
        self.streaming = config;
        self
    }
363
    /// Enable human-in-the-loop approval, replacing the default
    /// reject-everything handler.
    pub fn with_hitl(mut self, engine: HITLEngine, handler: Arc<dyn ApprovalHandler>) -> Self {
        self.hitl_engine = Some(engine);
        self.approval_handler = handler;
        self
    }
369
    /// Set the estimated-token ceiling that triggers overflow handling.
    pub fn with_max_context_tokens(mut self, tokens: u32) -> Self {
        self.max_context_tokens = tokens;
        self
    }
374
    /// Set a per-section memory token budget used when building messages.
    pub fn with_memory_token_budget(mut self, budget: MemoryTokenBudget) -> Self {
        self.memory_token_budget = Some(budget);
        self
    }
379
    /// Replace the default error-recovery manager.
    pub fn with_recovery_manager(mut self, manager: RecoveryManager) -> Self {
        self.recovery_manager = manager;
        self
    }
384
    /// Replace the default tool-security engine.
    pub fn with_tool_security(mut self, engine: ToolSecurityEngine) -> Self {
        self.tool_security = engine;
        self
    }
389
    /// Attach a process processor.
    pub fn with_process_processor(mut self, processor: ProcessProcessor) -> Self {
        self.process_processor = Some(processor);
        self
    }
394
    /// Attach a state machine together with its transition evaluator; the
    /// two are always installed as a pair.
    pub fn with_state_machine(
        mut self,
        state_machine: Arc<StateMachine>,
        evaluator: Arc<dyn TransitionEvaluator>,
    ) -> Self {
        self.state_machine = Some(state_machine);
        self.transition_evaluator = Some(evaluator);
        self
    }
404
    /// Replace the default (empty) context manager created in `new`.
    pub fn with_context_manager(mut self, manager: Arc<ContextManager>) -> Self {
        self.context_manager = manager;
        self
    }
409
    /// Register a named filter resolvable via `FilterConfig::Custom`.
    pub fn register_message_filter(&self, name: impl Into<String>, filter: Arc<dyn MessageFilter>) {
        self.message_filters.write().insert(name.into(), filter);
    }
413
    /// Set a top-level context value.
    pub fn set_context(&self, key: &str, value: Value) -> Result<()> {
        self.context_manager.set(key, value)
    }
417
    /// Update a context value at a (possibly nested) path.
    pub fn update_context(&self, path: &str, value: Value) -> Result<()> {
        self.context_manager.update(path, value)
    }
421
    /// Snapshot of all context values.
    pub fn get_context(&self) -> HashMap<String, Value> {
        self.context_manager.get_all()
    }
425
    /// Remove a context value, returning the previous value if present.
    pub fn remove_context(&self, key: &str) -> Option<Value> {
        self.context_manager.remove(key)
    }
429
    /// Re-fetch a provider-backed context value.
    pub async fn refresh_context(&self, key: &str) -> Result<()> {
        self.context_manager.refresh(key).await
    }
433
    /// Register a dynamic context provider under `name`.
    pub fn register_context_provider(&self, name: &str, provider: Arc<dyn ContextProvider>) {
        self.context_manager.register_provider(name, provider);
    }
437
    /// Current state name, or `None` when no state machine is configured.
    pub fn current_state(&self) -> Option<String> {
        self.state_machine.as_ref().map(|sm| sm.current())
    }
441
    /// Manually force a transition to `state`, running the current state's
    /// exit actions before and the target state's enter actions after.
    /// No-op when no state machine is configured.
    ///
    /// NOTE(review): exit actions run *before* the transition is validated;
    /// if `sm.transition_to` errors, they have already executed — confirm
    /// this ordering is intended.
    pub async fn transition_to(&self, state: &str) -> Result<()> {
        if let Some(ref sm) = self.state_machine {
            let from_state = sm.current();
            self.execute_state_exit_actions(&from_state).await;
            sm.transition_to(state, "manual transition")?;
            self.execute_state_enter_actions(state).await;
            info!(to = %state, "Manual state transition");
        }
        Ok(())
    }
452
453 pub fn state_history(&self) -> Vec<StateTransitionEvent> {
454 self.state_machine
455 .as_ref()
456 .map(|sm| sm.history())
457 .unwrap_or_default()
458 }
459
    /// Capture memory, state machine, context, and (when present) persona
    /// into an [`AgentSnapshot`].
    ///
    /// When no state machine is configured, an empty placeholder snapshot
    /// (empty `current_state`) is stored; `restore_state` recognizes and
    /// skips it.
    pub async fn save_state(&self) -> Result<AgentSnapshot> {
        let memory_snapshot = self.memory.snapshot().await?;
        let state_machine_snapshot = self.state_machine.as_ref().map(|sm| sm.snapshot());
        let context_snapshot = self.context_manager.snapshot();

        let mut snapshot = AgentSnapshot::new(self.info.id.clone())
            .with_memory(memory_snapshot)
            .with_context(context_snapshot)
            .with_state_machine(
                // Placeholder for agents without a state machine.
                state_machine_snapshot.unwrap_or_else(|| StateMachineSnapshot {
                    current_state: String::new(),
                    previous_state: None,
                    turn_count: 0,
                    no_transition_count: 0,
                    history: vec![],
                }),
            );

        if let Some(ref persona) = self.persona_manager {
            snapshot.persona = Some(persona.snapshot_as_value()?);
        }

        Ok(snapshot)
    }
484
    /// Like `save_state`, but also records spawned sub-agents (with their
    /// specs) when a spawner registry is attached and non-empty.
    pub async fn save_state_full(&self) -> Result<AgentSnapshot> {
        let mut snapshot = self.save_state().await?;
        if let Some(ref registry) = self.spawner_registry {
            let entries = registry.list_with_specs();
            if !entries.is_empty() {
                snapshot = snapshot.with_spawned_agents(entries);
            }
        }
        Ok(snapshot)
    }
496
    /// Restore memory, state machine, context, and persona from a snapshot.
    ///
    /// A state-machine snapshot with an empty `current_state` is the
    /// placeholder written by `save_state` for agents without a state
    /// machine, and is skipped.
    pub async fn restore_state(&self, snapshot: AgentSnapshot) -> Result<()> {
        self.memory.restore(snapshot.memory).await?;

        if let (Some(sm), Some(sm_snapshot)) = (&self.state_machine, snapshot.state_machine) {
            if !sm_snapshot.current_state.is_empty() {
                sm.restore(sm_snapshot)?;
            }
        }

        self.context_manager.restore(snapshot.context);

        if let (Some(persona_value), Some(persona_manager)) =
            (snapshot.persona, &self.persona_manager)
        {
            persona_manager.restore_from_value(persona_value)?;
        }

        info!(agent_id = %snapshot.agent_id, "State restored");
        Ok(())
    }
517
    /// Snapshot the agent and persist it under `session_id` in `storage`.
    pub async fn save_to(&self, storage: &dyn AgentStorage, session_id: &str) -> Result<()> {
        let snapshot = self.save_state().await?;
        storage.save(session_id, &snapshot).await
    }
522
523 pub async fn load_from(&self, storage: &dyn AgentStorage, session_id: &str) -> Result<bool> {
524 if let Some(snapshot) = storage.load(session_id).await? {
525 self.restore_state(snapshot).await?;
526 Ok(true)
527 } else {
528 Ok(false)
529 }
530 }
531
    /// Save the agent under `session_id` using the configured storage;
    /// errors when no storage has been set up.
    pub async fn save_session(&self, session_id: &str) -> Result<()> {
        // Clone the handle so the lock is not held across the await.
        let storage = self.storage.read().clone();
        match storage {
            Some(s) => self.save_to(s.as_ref(), session_id).await,
            None => Err(AgentError::Config(
                "No storage configured. Use with_storage_config() or with_storage() first".into(),
            )),
        }
    }
541
    /// Load and restore the session `session_id` from the configured
    /// storage; `false` when the session does not exist.
    pub async fn load_session(&self, session_id: &str) -> Result<bool> {
        // Clone the handle so the lock is not held across the await.
        let storage = self.storage.read().clone();
        match storage {
            Some(s) => self.load_from(s.as_ref(), session_id).await,
            None => Err(AgentError::Config(
                "No storage configured. Use with_storage_config() or with_storage() first".into(),
            )),
        }
    }
551
    /// Delete the stored session `session_id` from the configured storage.
    pub async fn delete_session(&self, session_id: &str) -> Result<()> {
        // Clone the handle so the lock is not held across the await.
        let storage = self.storage.read().clone();
        match storage {
            Some(s) => s.delete(session_id).await,
            None => Err(AgentError::Config(
                "No storage configured. Use with_storage_config() or with_storage() first".into(),
            )),
        }
    }
561
    /// List all session ids present in the configured storage.
    pub async fn list_sessions(&self) -> Result<Vec<String>> {
        // Clone the handle so the lock is not held across the await.
        let storage = self.storage.read().clone();
        match storage {
            Some(s) => s.list_sessions().await,
            None => Err(AgentError::Config(
                "No storage configured. Use with_storage_config() or with_storage() first".into(),
            )),
        }
    }
571
572 fn estimate_tokens(&self, text: &str) -> u32 {
573 (text.len() as f32 / 4.0).ceil() as u32
574 }
575
576 fn estimate_total_tokens(&self, messages: &[ChatMessage]) -> u32 {
577 messages
578 .iter()
579 .map(|m| self.estimate_tokens(&m.content))
580 .sum()
581 }
582
    /// Drop the oldest non-system messages so at most `keep_recent` remain
    /// after the system prompt.
    ///
    /// Assumes `messages[0]` is the system message, as produced by
    /// `build_messages`. Statement order matters: the system message is
    /// removed first so the drain range counts only history messages.
    fn truncate_context(&self, messages: &mut Vec<ChatMessage>, keep_recent: usize) {
        if messages.len() <= keep_recent + 1 {
            return;
        }
        let system_msg = messages.remove(0);
        let to_remove = messages.len().saturating_sub(keep_recent);
        messages.drain(..to_remove);
        messages.insert(0, system_msg);
    }
592
593 fn get_filter(&self, config: &FilterConfig) -> Arc<dyn MessageFilter> {
594 match config {
595 FilterConfig::KeepRecent(n) => Arc::new(KeepRecentFilter::new(*n)),
596 FilterConfig::ByRole { keep_roles } => Arc::new(ByRoleFilter::new(keep_roles.clone())),
597 FilterConfig::SkipPattern { skip_if_contains } => {
598 Arc::new(SkipPatternFilter::new(skip_if_contains.clone()))
599 }
600 FilterConfig::Custom { name } => {
601 let filters = self.message_filters.read();
602 filters
603 .get(name)
604 .cloned()
605 .unwrap_or_else(|| Arc::new(KeepRecentFilter::new(10)))
606 }
607 }
608 }
609
    /// Compress older conversation history into an LLM-generated summary,
    /// keeping the system prompt and the `keep_recent` newest messages.
    ///
    /// Assumes `messages[0]` is the system message (as produced by
    /// `build_messages`). The middle of the history is optionally filtered,
    /// summarized via `summarizer_llm` (falling back to the router or
    /// default provider), and replaced with a single system summary message.
    async fn summarize_context(
        &self,
        messages: &mut Vec<ChatMessage>,
        summarizer_llm: Option<&str>,
        max_summary_tokens: u32,
        custom_prompt: Option<&str>,
        keep_recent: usize,
        filter: Option<&FilterConfig>,
    ) -> Result<()> {
        // Pop the system prompt first so counting applies to history only.
        let system_msg = messages.remove(0);

        let to_summarize_count = messages.len().saturating_sub(keep_recent);
        if to_summarize_count == 0 {
            // Nothing old enough to summarize; restore and bail.
            messages.insert(0, system_msg);
            return Ok(());
        }

        // Split: tail stays verbatim, head gets summarized.
        let recent_msgs: Vec<ChatMessage> = messages.drain(to_summarize_count..).collect();
        let mut to_summarize = std::mem::take(messages);

        if let Some(filter_config) = filter {
            let filter = self.get_filter(filter_config);
            to_summarize = filter.filter(to_summarize);
        }

        if to_summarize.is_empty() {
            // Filter removed everything; reassemble without a summary.
            *messages = recent_msgs;
            messages.insert(0, system_msg);
            return Ok(());
        }

        let conversation_text = to_summarize
            .iter()
            .map(|m| format!("{:?}: {}", m.role, m.content))
            .collect::<Vec<_>>()
            .join("\n");

        let default_prompt = format!(
            "Summarize the following conversation in under {} tokens, preserving key information:\n\n{}",
            max_summary_tokens, conversation_text
        );

        let summary_prompt = custom_prompt
            .map(|p| format!("{}\n\n{}", p, conversation_text))
            .unwrap_or(default_prompt);

        // Prefer the explicit alias, then the router, then the default LLM.
        let summarizer = if let Some(alias) = summarizer_llm {
            self.llm_registry
                .get(alias)
                .map_err(|e| AgentError::Config(e.to_string()))?
        } else {
            self.llm_registry
                .router()
                .or_else(|_| self.llm_registry.default())
                .map_err(|e| AgentError::Config(e.to_string()))?
        };

        let summary_msgs = vec![ChatMessage::user(&summary_prompt)];
        let response = summarizer.complete(&summary_msgs, None).await?;

        let summary_message = ChatMessage::system(&format!(
            "[Previous conversation summary]\n{}",
            response.content
        ));

        // Rebuild: system prompt, summary, then the untouched recent tail.
        *messages = vec![system_msg, summary_message];
        messages.extend(recent_msgs);

        debug!(
            summarized_count = to_summarize_count,
            kept_recent = keep_recent,
            "Context summarized"
        );

        Ok(())
    }
686
    /// Render the base system prompt template against the live context.
    fn render_system_prompt(&self) -> Result<String> {
        let context = self.context_manager.get_all();
        self.template_renderer
            .render(&self.base_system_prompt, &context)
    }
692
    /// Ids of tools currently usable by the agent.
    ///
    /// When the current state defines tool refs, each ref is kept only if it
    /// exists in the registry and its (optional) condition evaluates true.
    /// Otherwise the spec-declared tool list is used (`Some([])` means "no
    /// tools"), falling back to every registered tool when nothing was
    /// declared.
    async fn get_available_tool_ids(&self) -> Result<Vec<String>> {
        match self.get_current_tool_refs() {
            Some(tool_refs) => {
                if tool_refs.is_empty() {
                    return Ok(Vec::new());
                }

                let eval_ctx = self.build_evaluation_context().await?;
                let llm_getter = RegistryLLMGetter {
                    registry: self.llm_registry.clone(),
                };
                let evaluator = ConditionEvaluator::new(llm_getter);

                let mut available = Vec::new();
                for tool_ref in &tool_refs {
                    let tool_id = tool_ref.id();

                    // Skip refs pointing at tools that were never registered.
                    if self.tools.get(tool_id).is_none() {
                        continue;
                    }

                    if let Some(condition) = tool_ref.condition() {
                        match evaluator.evaluate(condition, &eval_ctx).await {
                            Ok(true) => {
                                available.push(tool_id.to_string());
                            }
                            Ok(false) => {
                                debug!(tool = tool_id, "Tool condition not met, skipping");
                            }
                            // Evaluation failure excludes the tool (fail closed).
                            Err(e) => {
                                warn!(tool = tool_id, error = %e, "Error evaluating tool condition");
                            }
                        }
                    } else {
                        available.push(tool_id.to_string());
                    }
                }

                Ok(available)
            }
            None => {
                match &self.declared_tool_ids {
                    Some(ids) if !ids.is_empty() => Ok(ids
                        .iter()
                        .filter(|id| self.tools.get(id).is_some())
                        .cloned()
                        .collect()),
                    // Declared-but-empty means the spec opted out of tools.
                    Some(_) => Ok(Vec::new()),
                    None => Ok(self.tools.list_ids()),
                }
            }
        }
    }
752
753 fn get_current_tool_refs(&self) -> Option<Vec<ToolRef>> {
757 if let Some(ref sm) = self.state_machine {
758 if let Some(state_def) = sm.current_definition() {
759 let parent_def = sm.get_parent_definition();
760 if let Some(effective) = state_def.get_effective_tools(parent_def.as_ref()) {
761 return Some(effective.into_iter().cloned().collect());
762 }
763 }
764 }
765 None
766 }
767
    /// Assemble the context handed to the tool-condition evaluator: user
    /// context, the last 10 messages, tool-call history, and (when a state
    /// machine exists) the current state, turn count, and previous state.
    async fn build_evaluation_context(&self) -> Result<EvaluationContext> {
        let context = self.context_manager.get_all();
        let messages = self.memory.get_messages(Some(10)).await?;
        let tool_history = self.tool_call_history.read().clone();

        let (state_name, turn_count, previous_state) = if let Some(ref sm) = self.state_machine {
            (Some(sm.current()), sm.turn_count(), sm.previous())
        } else {
            (None, 0, None)
        };

        Ok(EvaluationContext::default()
            .with_context(context)
            .with_state(state_name, turn_count, previous_state)
            .with_called_tools(tool_history)
            .with_messages(messages))
    }
785
    /// Append a tool execution to the history used by condition evaluation.
    fn record_tool_call(&self, tool_id: &str, result: Value) {
        self.tool_call_history.write().push(ToolCallRecord {
            tool_id: tool_id.to_string(),
            result,
            timestamp: chrono::Utc::now(),
        });
    }
793
    /// Compose the final system prompt for this turn.
    ///
    /// Layers, in order: optional persona prefix, the rendered base prompt
    /// combined with the current state's prompt (per its `PromptMode`), and
    /// a tools section listing the currently available tools. Without a
    /// state machine, the tools section is driven by `declared_tool_ids`.
    async fn get_effective_system_prompt(&self) -> Result<String> {
        let rendered_base = self.render_system_prompt()?;

        let persona_prefix = if let Some(ref persona) = self.persona_manager {
            let context = self.context_manager.get_all();
            let render_result = persona.render_prompt(&context)?;

            // Notify hooks about any persona secrets revealed this render.
            for content in &render_result.newly_revealed {
                self.hooks.on_secret_revealed(content).await;
            }

            render_result.prompt
        } else {
            String::new()
        };

        // State-machine path: merge the state prompt per its PromptMode.
        if let Some(ref sm) = self.state_machine {
            if let Some(state_def) = sm.current_definition() {
                let state_prompt = if let Some(ref prompt) = state_def.prompt {
                    let context = self.context_manager.get_all();
                    self.template_renderer.render_with_state(
                        prompt,
                        &context,
                        &sm.current(),
                        sm.previous().as_deref(),
                        sm.turn_count(),
                        state_def.max_turns,
                    )?
                } else {
                    String::new()
                };

                let combined = match state_def.prompt_mode {
                    PromptMode::Append => {
                        if state_prompt.is_empty() {
                            rendered_base
                        } else {
                            format!(
                                "{}\n\n[Current State: {}]\n{}",
                                rendered_base,
                                sm.current(),
                                state_prompt
                            )
                        }
                    }
                    PromptMode::Replace => {
                        // An empty state prompt keeps the base instead of
                        // replacing it with nothing.
                        if state_prompt.is_empty() {
                            rendered_base
                        } else {
                            state_prompt
                        }
                    }
                    PromptMode::Prepend => {
                        if state_prompt.is_empty() {
                            rendered_base
                        } else {
                            format!("{}\n\n{}", state_prompt, rendered_base)
                        }
                    }
                };

                let with_persona = if persona_prefix.is_empty() {
                    combined
                } else {
                    format!("{}\n\n{}", persona_prefix, combined)
                };

                // Tools section limited to what the state currently allows.
                let available_tool_ids = self.get_available_tool_ids().await?;
                if !available_tool_ids.is_empty() {
                    let tools_prompt = self.tools.generate_filtered_prompt_with_parallel(
                        &available_tool_ids,
                        self.parallel_tools.enabled,
                    );
                    if !tools_prompt.is_empty() {
                        return Ok(format!("{}\n\n{}", with_persona, tools_prompt));
                    }
                }
                return Ok(with_persona);
            }
        }

        // No state machine (or no current definition): base + persona only.
        let with_persona = if persona_prefix.is_empty() {
            rendered_base
        } else {
            format!("{}\n\n{}", persona_prefix, rendered_base)
        };

        let tools_prompt = match &self.declared_tool_ids {
            Some(ids) if !ids.is_empty() => self
                .tools
                .generate_filtered_prompt_with_parallel(ids, self.parallel_tools.enabled),
            // Declared-but-empty: the spec opted out of tools entirely.
            Some(_) => {
                String::new()
            }
            None => {
                self.tools
                    .generate_tools_prompt_with_parallel(self.parallel_tools.enabled)
            }
        };
        if !tools_prompt.is_empty() {
            Ok(format!("{}\n\n{}", with_persona, tools_prompt))
        } else {
            Ok(with_persona)
        }
    }
907
908 fn get_state_llm(&self) -> Result<Arc<dyn LLMProvider>> {
909 if let Some(ref sm) = self.state_machine {
910 if let Some(state_def) = sm.current_definition() {
911 if let Some(ref llm_alias) = state_def.llm {
912 return self
913 .llm_registry
914 .get(llm_alias)
915 .map_err(|e| AgentError::Config(e.to_string()));
916 }
917 }
918 }
919 self.llm_registry
920 .default()
921 .map_err(|e| AgentError::Config(e.to_string()))
922 }
923
924 fn get_effective_reasoning_config(&self) -> ReasoningConfig {
925 if let Some(ref sm) = self.state_machine {
926 if let Some(state_def) = sm.current_definition() {
927 if let Some(ref state_reasoning) = state_def.reasoning {
928 return state_reasoning.clone();
929 }
930 }
931 }
932 self.reasoning_config.clone()
933 }
934
935 fn get_effective_reflection_config(&self) -> ReflectionConfig {
936 if let Some(ref sm) = self.state_machine {
937 if let Some(state_def) = sm.current_definition() {
938 if let Some(ref state_reflection) = state_def.reflection {
939 return state_reflection.clone();
940 }
941 }
942 }
943 self.reflection_config.clone()
944 }
945
    /// Reasoning config for a skill: the skill's own override, falling back
    /// to the state/agent effective config.
    fn get_skill_reasoning_config(&self, skill: &SkillDefinition) -> ReasoningConfig {
        skill
            .reasoning
            .clone()
            .unwrap_or_else(|| self.get_effective_reasoning_config())
    }
952
    /// Reflection config for a skill: the skill's own override, falling back
    /// to the state/agent effective config.
    fn get_skill_reflection_config(&self, skill: &SkillDefinition) -> ReflectionConfig {
        skill
            .reflection
            .clone()
            .unwrap_or_else(|| self.get_effective_reflection_config())
    }
959
960 async fn build_disambiguation_context(&self) -> Result<DisambiguationContext> {
961 let recent_messages: Vec<String> = self
962 .memory
963 .get_messages(Some(5))
964 .await?
965 .iter()
966 .rev()
967 .map(|m| format!("{:?}: {}", m.role, m.content))
968 .collect();
969
970 let current_state = self.current_state().map(|s| s.to_string());
971
972 let state_prompt: Option<String> = self
975 .state_machine
976 .as_ref()
977 .and_then(|sm| sm.current_definition())
978 .and_then(|def| def.prompt.clone());
979
980 let available_tools: Vec<String> = self
981 .get_available_tool_ids()
982 .await
983 .unwrap_or_else(|_| self.tools.list_ids());
984
985 let available_skills: Vec<String> = self.skills.iter().map(|s| s.id.clone()).collect();
986
987 let user_context = self.context_manager.get_all();
988
989 let available_intents: Vec<String> = if let Some(ref sm) = self.state_machine {
991 sm.current_definition()
992 .map(|def| {
993 def.transitions
994 .iter()
995 .filter_map(|t| t.intent.clone())
996 .collect()
997 })
998 .unwrap_or_default()
999 } else {
1000 Vec::new()
1001 };
1002
1003 Ok(DisambiguationContext::from_agent_state(
1004 recent_messages,
1005 current_state,
1006 state_prompt,
1007 available_tools,
1008 available_skills,
1009 available_intents,
1010 user_context,
1011 ))
1012 }
1013
    /// Skills usable in the current state: the state's effective skill list
    /// (merged with its parent) when non-empty, otherwise all skills.
    fn get_available_skills(&self) -> Vec<&SkillDefinition> {
        if let Some(ref sm) = self.state_machine {
            if let Some(state_def) = sm.current_definition() {
                let parent_def = sm.get_parent_definition();
                let effective_skills = state_def.get_effective_skills(parent_def.as_ref());
                if !effective_skills.is_empty() {
                    return self
                        .skills
                        .iter()
                        .filter(|s| effective_skills.contains(&&s.id))
                        .collect();
                }
            }
        }
        self.skills.iter().collect()
    }
1030
    /// Build the LLM message list: effective system prompt followed by the
    /// memory history (token-budgeted when configured), then apply the
    /// configured context-overflow strategy if the estimated token total
    /// exceeds `max_context_tokens`.
    async fn build_messages(&self) -> Result<Vec<ChatMessage>> {
        let system_prompt = self.get_effective_system_prompt().await?;
        let mut messages = vec![ChatMessage::system(&system_prompt)];

        let context = self.memory.get_context().await?;
        let history = if let Some(ref budget) = self.memory_token_budget {
            context.to_llm_messages_with_allocation(&budget.allocation)
        } else {
            context.to_llm_messages()
        };
        messages.extend(history);

        let total_tokens = self.estimate_total_tokens(&messages);

        if total_tokens > self.max_context_tokens {
            debug!(
                total = total_tokens,
                limit = self.max_context_tokens,
                "Context overflow"
            );

            match &self.recovery_manager.config().llm.on_context_overflow {
                ContextOverflowAction::Error => {
                    return Err(AgentError::LLM(format!(
                        "Context overflow: {} tokens > {} limit",
                        total_tokens, self.max_context_tokens
                    )));
                }
                ContextOverflowAction::Truncate { keep_recent } => {
                    self.truncate_context(&mut messages, *keep_recent);
                }
                ContextOverflowAction::Summarize {
                    summarizer_llm,
                    max_summary_tokens,
                    custom_prompt,
                    keep_recent,
                    filter,
                } => {
                    self.summarize_context(
                        &mut messages,
                        summarizer_llm.as_deref(),
                        *max_summary_tokens,
                        custom_prompt.as_deref(),
                        *keep_recent,
                        filter.as_ref(),
                    )
                    .await?;
                }
            }
        }

        Ok(messages)
    }
1090
1091 fn parse_tool_calls(&self, content: &str) -> Option<Vec<ToolCall>> {
1092 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(content) {
1094 if let Some(arr) = parsed.as_array() {
1096 let calls: Vec<ToolCall> = arr
1097 .iter()
1098 .filter_map(|v| self.extract_tool_call_from_value(v))
1099 .collect();
1100 if !calls.is_empty() {
1101 return Some(calls);
1102 }
1103 }
1104 if let Some(tool_call) = self.extract_tool_call_from_value(&parsed) {
1106 return Some(vec![tool_call]);
1107 }
1108 }
1109
1110 if let Some(json_str) = self.extract_json_from_content(content) {
1112 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&json_str) {
1113 if let Some(arr) = parsed.as_array() {
1115 let calls: Vec<ToolCall> = arr
1116 .iter()
1117 .filter_map(|v| self.extract_tool_call_from_value(v))
1118 .collect();
1119 if !calls.is_empty() {
1120 return Some(calls);
1121 }
1122 }
1123 if let Some(tool_call) = self.extract_tool_call_from_value(&parsed) {
1125 return Some(vec![tool_call]);
1126 }
1127 }
1128 }
1129
1130 None
1131 }
1132
1133 fn extract_tool_call_from_value(&self, parsed: &serde_json::Value) -> Option<ToolCall> {
1134 if let Some(tool_name) = parsed.get("tool").and_then(|v| v.as_str()) {
1135 let arguments = parsed
1136 .get("arguments")
1137 .cloned()
1138 .unwrap_or(serde_json::json!({}));
1139 return Some(ToolCall {
1140 id: uuid::Uuid::new_v4().to_string(),
1141 name: tool_name.to_string(),
1142 arguments,
1143 });
1144 }
1145 None
1146 }
1147
1148 fn extract_json_from_content(&self, content: &str) -> Option<String> {
1150 if let Some(result) = self.extract_json_array_from_content(content) {
1152 return Some(result);
1153 }
1154 self.extract_json_object_from_content(content)
1155 }
1156
1157 fn extract_json_array_from_content(&self, content: &str) -> Option<String> {
1159 let start = content.find('[')?;
1160 let content_from_start = &content[start..];
1161
1162 let mut depth = 0;
1163 let mut end = 0;
1164 for (i, ch) in content_from_start.char_indices() {
1165 match ch {
1166 '[' => depth += 1,
1167 ']' => {
1168 depth -= 1;
1169 if depth == 0 {
1170 end = i + 1;
1171 break;
1172 }
1173 }
1174 _ => {}
1175 }
1176 }
1177
1178 if end > 0 {
1179 let json_str = &content_from_start[..end];
1180 if json_str.contains("\"tool\"") {
1182 return Some(json_str.to_string());
1183 }
1184 }
1185
1186 None
1187 }
1188
1189 fn extract_json_object_from_content(&self, content: &str) -> Option<String> {
1191 let start = content.find('{')?;
1192 let content_from_start = &content[start..];
1193
1194 let mut depth = 0;
1196 let mut end = 0;
1197 for (i, ch) in content_from_start.char_indices() {
1198 match ch {
1199 '{' => depth += 1,
1200 '}' => {
1201 depth -= 1;
1202 if depth == 0 {
1203 end = i + 1;
1204 break;
1205 }
1206 }
1207 _ => {}
1208 }
1209 }
1210
1211 if end > 0 {
1212 let json_str = &content_from_start[..end];
1213 if json_str.contains("\"tool\"") {
1215 return Some(json_str.to_string());
1216 }
1217 }
1218
1219 None
1220 }
1221
1222 async fn execute_tool(&self, tool_call: &ToolCall) -> Result<String> {
1223 let tool = self
1224 .tools
1225 .get(&tool_call.name)
1226 .ok_or_else(|| AgentError::Tool(format!("Tool not found: {}", tool_call.name)))?;
1227
1228 let result = tool.execute(tool_call.arguments.clone()).await;
1229
1230 if result.success {
1231 Ok(result.output)
1232 } else {
1233 Err(AgentError::Tool(result.output))
1234 }
1235 }
1236
    /// Execute a tool with the full policy pipeline wrapped around the raw
    /// call: availability filtering, security checks, HITL approval,
    /// retry/fallback recovery, and hook/bookkeeping notifications.
    #[instrument(skip(self, tool_call), fields(tool = %tool_call.name))]
    async fn execute_tool_smart(&self, tool_call: &ToolCall) -> Result<String> {
        // Local mutable copy: HITL approvers may rewrite the arguments below.
        let mut tool_call = tool_call.clone();
        info!(args = %tool_call.arguments, "Executing tool");

        self.hooks
            .on_tool_start(&tool_call.name, &tool_call.arguments)
            .await;
        let tool_start = Instant::now();

        // Availability gate: when a non-empty availability list exists, the
        // call must match one of its ids case-insensitively, by either the
        // requested name or the registered tool's id.
        let available_tool_ids = self.get_available_tool_ids().await?;
        let resolved_id = self
            .tools
            .get(&tool_call.name)
            .map(|t| t.id().to_lowercase());
        let tool_name_lower = tool_call.name.to_lowercase();
        if !available_tool_ids.is_empty() {
            let is_available = available_tool_ids.iter().any(|id| {
                let id_lower = id.to_lowercase();
                id_lower == tool_name_lower || resolved_id.as_deref() == Some(&id_lower)
            });
            if !is_available {
                warn!(tool = %tool_call.name, "Tool not available in current context");
                return Err(AgentError::Tool(format!(
                    "Tool '{}' is not available. Available tools: {}",
                    tool_call.name,
                    available_tool_ids.join(", ")
                )));
            }
        }

        let hitl_lang_ctx = self.build_hitl_language_context();

        // HITL checks key off the registered tool id when resolvable,
        // falling back to the lowercased requested name.
        let hitl_tool_id = resolved_id.as_deref().unwrap_or(&tool_name_lower);

        // Security layer: may allow, block, warn, or demand a human
        // confirmation. Confirmation is routed through the HITL engine when
        // one is configured; otherwise the call fails outright.
        if self.tool_security.config().enabled {
            let security_result = self
                .tool_security
                .check_tool_execution(&tool_call.name, &tool_call.arguments)
                .await?;

            match security_result {
                SecurityCheckResult::Allow => {}
                SecurityCheckResult::Block { reason } => {
                    warn!(reason = %reason, "Tool blocked by security");
                    return Err(AgentError::Tool(format!("Blocked: {}", reason)));
                }
                SecurityCheckResult::RequireConfirmation { message } => {
                    if let Some(ref hitl_engine) = self.hitl_engine {
                        let check_result = hitl_engine
                            .check_tool_with_localization(
                                hitl_tool_id,
                                &tool_call.arguments,
                                &hitl_lang_ctx,
                                self.approval_handler.as_ref(),
                                Some(&self.llm_registry),
                            )
                            .await?;
                        let result = self.request_hitl_approval(check_result).await?;
                        match result {
                            // NOTE(review): unlike the HITL pass below, a
                            // `Modified` approval here does NOT apply the
                            // approver's changes to the arguments — confirm
                            // whether that is intended.
                            ApprovalResult::Approved | ApprovalResult::Modified { .. } => {}
                            ApprovalResult::Rejected { reason } => {
                                let reason_str = reason.as_deref().unwrap_or("rejected");
                                warn!(tool = %tool_call.name, reason = %reason_str, "Security confirmation rejected by approver");
                                return Err(AgentError::Tool(format!(
                                    "Confirmation rejected: {}",
                                    message
                                )));
                            }
                            _ => {}
                        }
                    } else {
                        warn!(message = %message, "Tool requires confirmation but no HITL handler");
                        return Err(AgentError::Tool(format!(
                            "Confirmation required: {}",
                            message
                        )));
                    }
                }
                SecurityCheckResult::Warn { message } => {
                    warn!(message = %message, "Tool security warning");
                }
            }
        }

        // HITL layer proper: per-tool approval plus argument-based policy
        // conditions. Approvers may rewrite arguments; rejection aborts with
        // `HITLRejected` so the agent loop knows not to retry.
        if let Some(ref hitl_engine) = self.hitl_engine {
            let check_result = hitl_engine
                .check_tool_with_localization(
                    hitl_tool_id,
                    &tool_call.arguments,
                    &hitl_lang_ctx,
                    self.approval_handler.as_ref(),
                    Some(&self.llm_registry),
                )
                .await?;
            if check_result.is_required() {
                let result = self.request_hitl_approval(check_result).await?;
                match result {
                    ApprovalResult::Approved => {}
                    ApprovalResult::Modified { changes } => {
                        // Merge approver-supplied overrides into the call args.
                        if let Some(obj) = tool_call.arguments.as_object_mut() {
                            for (k, v) in changes {
                                obj.insert(k, v);
                            }
                        }
                        info!(tool = %tool_call.name, "Tool arguments modified by approver");
                    }
                    ApprovalResult::Rejected { reason: _reason } => {
                        warn!(tool = %tool_call.name, "Tool execution rejected by HITL");
                        return Err(AgentError::HITLRejected(format!(
                            "Tool '{}' was rejected by human approver. Do not retry.",
                            tool_call.name
                        )));
                    }
                    _ => {}
                }
            }

            // Second HITL pass: argument-value policy conditions.
            let condition_check = hitl_engine
                .check_conditions_with_localization(
                    &tool_call.arguments,
                    &hitl_lang_ctx,
                    self.approval_handler.as_ref(),
                    Some(&self.llm_registry),
                )
                .await?;
            if condition_check.is_required() {
                let result = self.request_hitl_approval(condition_check).await?;
                match result {
                    ApprovalResult::Approved => {}
                    ApprovalResult::Modified { changes } => {
                        if let Some(obj) = tool_call.arguments.as_object_mut() {
                            for (k, v) in changes {
                                obj.insert(k, v);
                            }
                        }
                        info!(tool = %tool_call.name, "Tool arguments modified by approver (condition)");
                    }
                    ApprovalResult::Rejected { reason: _reason } => {
                        warn!(tool = %tool_call.name, "Tool execution rejected by HITL condition");
                        return Err(AgentError::HITLRejected(format!(
                            "Tool '{}' was rejected due to policy condition. Do not retry.",
                            tool_call.name
                        )));
                    }
                    _ => {}
                }
            }
        }

        // Execution, with the per-tool retry policy from the recovery manager.
        let tool_config = self.recovery_manager.get_tool_config(&tool_call.name);

        let result = if tool_config.max_retries > 0 {
            let retry_config = RetryConfig {
                max_retries: tool_config.max_retries,
                ..Default::default()
            };

            let tool_call_clone = tool_call.clone();
            self.recovery_manager
                .with_retry(
                    &format!("tool:{}", tool_call.name),
                    Some(&retry_config),
                    || {
                        let tc = tool_call_clone.clone();
                        async move { self.execute_tool(&tc).await.map_err(|e| e.classify()) }
                    },
                )
                .await
                .map_err(|e| AgentError::Tool(e.to_string()))
        } else {
            self.execute_tool(&tool_call).await
        };

        // Failure policy: skip (synthesize a success payload), fall back to
        // another tool with the same arguments, or surface the error.
        let result = match result {
            Ok(output) => Ok(output),
            Err(e) => match &tool_config.on_failure {
                ToolFailureAction::Skip => {
                    warn!(
                        tool = %tool_call.name,
                        error = %e,
                        "Tool failed, skipping per on_failure: skip policy"
                    );
                    Ok(format!(
                        "{{\"skipped\": true, \"reason\": \"Tool '{}' was skipped after failure\"}}",
                        tool_call.name
                    ))
                }
                ToolFailureAction::Fallback { fallback_tool } => {
                    warn!(
                        tool = %tool_call.name,
                        fallback = %fallback_tool,
                        error = %e,
                        "Tool failed, trying fallback tool"
                    );
                    let fallback_call = ToolCall {
                        id: tool_call.id.clone(),
                        name: fallback_tool.clone(),
                        arguments: tool_call.arguments.clone(),
                    };
                    self.execute_tool(&fallback_call).await
                }
                ToolFailureAction::ReportError => Err(e),
            },
        };

        let tool_duration_ms = tool_start.elapsed().as_millis() as u64;

        // Bookkeeping: record the call and notify hooks on both paths.
        match &result {
            Ok(output) => {
                info!(output_len = output.len(), "Tool execution successful");
                // Record parsed JSON when the output is JSON, else the raw string.
                let result_value: Value =
                    serde_json::from_str(output).unwrap_or(Value::String(output.clone()));
                self.record_tool_call(&tool_call.name, result_value);

                let tool_result = ToolResult {
                    success: true,
                    output: output.clone(),
                    metadata: None,
                };
                self.hooks
                    .on_tool_complete(&tool_call.name, &tool_result, tool_duration_ms)
                    .await;
            }
            Err(e) => {
                error!(error = %e, "Tool execution failed");
                self.record_tool_call(&tool_call.name, serde_json::json!({"error": e.to_string()}));

                let tool_result = ToolResult {
                    success: false,
                    output: e.to_string(),
                    metadata: None,
                };
                self.hooks
                    .on_tool_complete(&tool_call.name, &tool_result, tool_duration_ms)
                    .await;
                self.hooks.on_error(e).await;
            }
        }

        result
    }
1491
    /// Try to route `input` to a skill via the skill router.
    ///
    /// When a skill is selected and declares its own (enabled)
    /// disambiguation config, the input first passes through the
    /// disambiguation manager; depending on the outcome this executes the
    /// skill (possibly with an enriched input), returns a clarification
    /// question or apology, or abandons skill routing entirely.
    async fn try_skill_route(&self, input: &str) -> Result<SkillRouteResult> {
        if let Some(ref router) = self.skill_router {
            // Restrict routing to skills available in the current context.
            let available_skills = self.get_available_skills();
            let skill_ids: Vec<&str> = available_skills.iter().map(|s| s.id.as_str()).collect();

            if let Some(skill_id) = router.select_skill_filtered(input, &skill_ids).await? {
                info!(skill_id = %skill_id, "Skill selected");

                let skill = router
                    .get_skill(&skill_id)
                    .ok_or_else(|| AgentError::Skill(format!("Skill not found: {}", skill_id)))?;

                // Skill-level disambiguation, only when the skill opts in
                // and a disambiguation manager is configured.
                if let Some(ref skill_disambig) = skill.disambiguation {
                    if skill_disambig.enabled.unwrap_or(false) {
                        if let Some(ref disambiguator) = self.disambiguation_manager {
                            let context = self.build_disambiguation_context().await?;
                            // The current state may override disambiguation settings.
                            let state_override = self
                                .state_machine
                                .as_ref()
                                .and_then(|sm| sm.current_definition())
                                .and_then(|def| def.disambiguation.clone());

                            match disambiguator
                                .process_input_with_override(
                                    input,
                                    &context,
                                    state_override.as_ref(),
                                    Some(skill_disambig),
                                )
                                .await?
                            {
                                DisambiguationResult::Clear => {
                                    // Unambiguous: fall through to plain execution below.
                                    debug!(skill_id = %skill_id, "Skill disambiguation: clear");
                                }
                                DisambiguationResult::NeedsClarification {
                                    question,
                                    detection,
                                } => {
                                    info!(
                                        skill_id = %skill_id,
                                        ambiguity_type = ?detection.ambiguity_type,
                                        confidence = detection.confidence,
                                        "Skill requires clarification before execution"
                                    );
                                    // Remember the skill so the clarified answer
                                    // can resume it on a later turn.
                                    *self.pending_skill_id.write() = Some(skill_id.clone());

                                    return Ok(SkillRouteResult::NeedsClarification(
                                        AgentResponse::new(&question.question).with_metadata(
                                            "disambiguation",
                                            serde_json::json!({
                                                "status": "awaiting_clarification",
                                                "skill_id": skill_id,
                                                "options": question.options,
                                                "clarifying": question.clarifying,
                                                "detection": {
                                                    "type": detection.ambiguity_type,
                                                    "confidence": detection.confidence,
                                                    "what_is_unclear": detection.what_is_unclear,
                                                }
                                            }),
                                        ),
                                    ));
                                }
                                DisambiguationResult::Clarified { enriched_input, .. } => {
                                    info!(
                                        skill_id = %skill_id,
                                        enriched = %enriched_input,
                                        "Skill disambiguation clarified, executing with enriched input"
                                    );
                                    return Ok(SkillRouteResult::Response(
                                        self.execute_skill(skill, &enriched_input).await?,
                                    ));
                                }
                                DisambiguationResult::ProceedWithBestGuess { enriched_input } => {
                                    info!(skill_id = %skill_id, "Skill disambiguation: proceeding with best guess");
                                    return Ok(SkillRouteResult::Response(
                                        self.execute_skill(skill, &enriched_input).await?,
                                    ));
                                }
                                DisambiguationResult::GiveUp { reason } => {
                                    warn!(skill_id = %skill_id, reason = %reason, "Skill disambiguation gave up");
                                    // Localized apology, with a plain-English fallback
                                    // when generation fails.
                                    let apology = self
                                        .generate_localized_apology(
                                            "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
                                            &reason,
                                        )
                                        .await
                                        .unwrap_or_else(|_| {
                                            format!("I'm sorry, I couldn't understand your request: {}", reason)
                                        });
                                    return Ok(SkillRouteResult::NeedsClarification(
                                        AgentResponse::new(&apology),
                                    ));
                                }
                                DisambiguationResult::Escalate { reason } => {
                                    info!(skill_id = %skill_id, reason = %reason, "Skill disambiguation escalating");
                                    let apology = self
                                        .generate_localized_apology(
                                            "Explain briefly that you're transferring the user to a human agent for help.",
                                            &reason,
                                        )
                                        .await
                                        .unwrap_or_else(|_| {
                                            format!("I need human assistance to help with your request: {}", reason)
                                        });
                                    return Ok(SkillRouteResult::NeedsClarification(
                                        AgentResponse::new(&apology),
                                    ));
                                }
                                DisambiguationResult::Abandoned { .. } => {
                                    // Give up on skill routing; the normal
                                    // agent loop handles the input instead.
                                    debug!(skill_id = %skill_id, "Skill disambiguation abandoned");
                                    return Ok(SkillRouteResult::NoMatch);
                                }
                            }
                        }
                    }
                }

                // No disambiguation needed (or it came back Clear):
                // execute the skill with the original input.
                return Ok(SkillRouteResult::Response(
                    self.execute_skill(skill, input).await?,
                ));
            }
        }
        Ok(SkillRouteResult::NoMatch)
    }
1630
1631 async fn execute_skill(&self, skill: &SkillDefinition, input: &str) -> Result<String> {
1633 if let Some(ref executor) = self.skill_executor {
1634 let skill_reasoning = self.get_skill_reasoning_config(skill);
1635 let skill_reflection = self.get_skill_reflection_config(skill);
1636
1637 debug!(
1638 skill_id = %skill.id,
1639 reasoning_mode = ?skill_reasoning.mode,
1640 reflection_enabled = ?skill_reflection.enabled,
1641 "Skill reasoning/reflection config"
1642 );
1643
1644 let response = executor
1645 .execute(skill, input, serde_json::json!({}))
1646 .await?;
1647
1648 if skill_reflection.requires_evaluation() && skill_reflection.is_enabled() {
1649 let should_reflect = self
1650 .should_reflect_with_config(input, &response, &skill_reflection)
1651 .await?;
1652 if should_reflect {
1653 let evaluated = self
1654 .evaluate_and_retry_with_config(input, response, &skill_reflection)
1655 .await?;
1656 return Ok(evaluated);
1657 }
1658 }
1659
1660 return Ok(response);
1661 }
1662 Err(AgentError::Skill(
1663 "No skill executor configured".to_string(),
1664 ))
1665 }
1666
1667 async fn execute_skill_by_id(&self, skill_id: &str, input: &str) -> Result<String> {
1670 let skill = self
1671 .skill_router
1672 .as_ref()
1673 .and_then(|r| r.get_skill(skill_id).cloned())
1674 .ok_or_else(|| AgentError::Skill(format!("Skill not found: {}", skill_id)))?;
1675 self.execute_skill(&skill, input).await
1676 }
1677
1678 async fn should_reflect_with_config(
1679 &self,
1680 input: &str,
1681 response: &str,
1682 config: &ReflectionConfig,
1683 ) -> Result<bool> {
1684 if !config.requires_evaluation() {
1685 return Ok(false);
1686 }
1687
1688 if config.is_enabled() {
1689 return Ok(true);
1690 }
1691
1692 let evaluator_llm = config
1693 .evaluator_llm
1694 .as_ref()
1695 .and_then(|alias| self.llm_registry.get(alias).ok())
1696 .or_else(|| self.llm_registry.router().ok())
1697 .or_else(|| self.llm_registry.default().ok());
1698
1699 let Some(llm) = evaluator_llm else {
1700 return Ok(false);
1701 };
1702
1703 let prompt = format!(
1704 r#"Should this response be evaluated for quality? Consider if it's a complex or important response.
1705
1706User query: "{}"
1707Response: "{}"
1708
1709Answer YES or NO only."#,
1710 input,
1711 &response[..response.len().min(500)]
1712 );
1713
1714 let messages = vec![ChatMessage::user(&prompt)];
1715 let result = llm.complete(&messages, None).await;
1716
1717 match result {
1718 Ok(resp) => Ok(resp.content.trim().to_uppercase().contains("YES")),
1719 Err(_) => Ok(false),
1720 }
1721 }
1722
1723 async fn evaluate_and_retry_with_config(
1724 &self,
1725 input: &str,
1726 mut response: String,
1727 config: &ReflectionConfig,
1728 ) -> Result<String> {
1729 let llm = self.get_state_llm()?;
1730 let mut attempts = 0u32;
1731 let max_retries = config.max_retries;
1732
1733 loop {
1734 let evaluation = self
1735 .evaluate_response_with_config(input, &response, config)
1736 .await?;
1737
1738 if evaluation.passed || attempts >= max_retries {
1739 info!(
1740 passed = evaluation.passed,
1741 confidence = evaluation.confidence,
1742 attempts = attempts + 1,
1743 "Skill reflection evaluation complete"
1744 );
1745 return Ok(response);
1746 }
1747
1748 debug!(
1749 attempt = attempts + 1,
1750 failed_criteria = evaluation.failed_criteria().count(),
1751 "Skill response did not meet criteria, retrying"
1752 );
1753
1754 let feedback: Vec<String> = evaluation
1755 .failed_criteria()
1756 .map(|c| format!("- {}", c.criterion))
1757 .collect();
1758
1759 let retry_prompt = format!(
1760 "Your previous response did not meet these criteria:\n{}\n\nPlease provide an improved response to: {}",
1761 feedback.join("\n"),
1762 input
1763 );
1764
1765 let messages = vec![ChatMessage::user(&retry_prompt)];
1766 let retry_response = llm
1767 .complete(&messages, None)
1768 .await
1769 .map_err(|e| AgentError::LLM(e.to_string()))?;
1770
1771 response = retry_response.content.trim().to_string();
1772 attempts += 1;
1773 }
1774 }
1775
    /// Score `response` against the reflection criteria using an evaluator
    /// LLM (configured alias, then router, then registry default).
    ///
    /// The LLM is asked for a PASS/FAIL verdict per numbered criterion plus
    /// an overall verdict and confidence; the reply is parsed line-by-line
    /// with forgiving heuristics. The overall pass additionally requires the
    /// confidence to reach `config.pass_threshold`.
    ///
    /// # Errors
    /// `AgentError::Config` when no LLM is available; `AgentError::LLM` when
    /// the evaluation call itself fails.
    async fn evaluate_response_with_config(
        &self,
        input: &str,
        response: &str,
        config: &ReflectionConfig,
    ) -> Result<EvaluationResult> {
        let evaluator_llm = config
            .evaluator_llm
            .as_ref()
            .and_then(|alias| self.llm_registry.get(alias).ok())
            .or_else(|| self.llm_registry.router().ok())
            .or_else(|| self.llm_registry.default().ok())
            .ok_or_else(|| AgentError::Config("No LLM available for evaluation".into()))?;

        // Numbered criteria list for the prompt ("1. ...", "2. ...").
        let criteria = &config.criteria;
        let criteria_list = criteria
            .iter()
            .enumerate()
            .map(|(i, c)| format!("{}. {}", i + 1, c))
            .collect::<Vec<_>>()
            .join("\n");

        let prompt = format!(
            r#"Evaluate this response against the criteria.

User query: "{}"

Response to evaluate: "{}"

Criteria:
{}

For each criterion, respond with:
- criterion number
- PASS or FAIL
- brief reason

Then provide overall confidence (0.0 to 1.0) and whether it passes overall.

Format:
1. PASS/FAIL - reason
2. PASS/FAIL - reason
...
CONFIDENCE: 0.X
OVERALL: PASS/FAIL"#,
            input, response, criteria_list
        );

        let messages = vec![ChatMessage::user(&prompt)];
        let eval_response = evaluator_llm
            .complete(&messages, None)
            .await
            .map_err(|e| AgentError::LLM(format!("Evaluation failed: {}", e)))?;

        // Parse the overall verdict from the uppercased reply.
        let content = eval_response.content.to_uppercase();
        let llm_pass = content.contains("OVERALL: PASS");

        // Confidence comes from the "CONFIDENCE:" line when parseable;
        // otherwise assume moderately high on pass, low on fail.
        let confidence = content
            .lines()
            .find(|l| l.contains("CONFIDENCE:"))
            .and_then(|l| {
                l.split(':')
                    .nth(1)
                    .and_then(|v| v.trim().parse::<f32>().ok())
            })
            .unwrap_or(if llm_pass { 0.8 } else { 0.4 });

        // Passing requires both the LLM verdict and sufficient confidence.
        let overall_pass = llm_pass && confidence >= config.pass_threshold;

        // Per-criterion results: find the "N." line and look for PASS.
        // Criteria whose line can't be found inherit the overall verdict.
        let mut criteria_results = Vec::new();
        for (i, criterion) in criteria.iter().enumerate() {
            let line_marker = format!("{}.", i + 1);
            let passed = eval_response
                .content
                .lines()
                .find(|l| l.contains(&line_marker))
                .map(|l| l.to_uppercase().contains("PASS"))
                .unwrap_or(overall_pass);

            if passed {
                criteria_results.push(CriterionResult::pass(criterion));
            } else {
                criteria_results.push(CriterionResult::fail(criterion, "Did not meet criterion"));
            }
        }

        Ok(EvaluationResult::new(overall_pass, confidence).with_criteria(criteria_results))
    }
1866
1867 async fn process_input(&self, input: &str) -> Result<ProcessData> {
1869 if let Some(processor) = self.get_state_process_processor() {
1870 return processor.process_input(input).await;
1871 }
1872 if let Some(ref processor) = self.process_processor {
1873 processor.process_input(input).await
1874 } else {
1875 Ok(ProcessData::new(input))
1876 }
1877 }
1878
1879 async fn process_output(
1881 &self,
1882 output: &str,
1883 input_context: &std::collections::HashMap<String, serde_json::Value>,
1884 ) -> Result<ProcessData> {
1885 if let Some(processor) = self.get_state_process_processor() {
1886 return processor.process_output(output, input_context).await;
1887 }
1888 if let Some(ref processor) = self.process_processor {
1889 processor.process_output(output, input_context).await
1890 } else {
1891 Ok(ProcessData::new(output))
1892 }
1893 }
1894
1895 fn get_state_process_processor(&self) -> Option<ProcessProcessor> {
1897 let sm = self.state_machine.as_ref()?;
1898 let def = sm.current_definition()?;
1899 let config = def.process.as_ref()?;
1900 let mut processor = ProcessProcessor::new(config.clone());
1901 if let Some(ref registry) = Some(self.llm_registry.clone()) {
1902 processor = processor.with_llm_registry(registry.clone());
1903 }
1904 Some(processor)
1905 }
1906
1907 async fn check_turn_timeout(&self) -> Result<()> {
1908 if let Some(ref sm) = self.state_machine {
1909 if let Some(timeout_state) = sm.check_timeout() {
1910 let from_state = sm.current();
1911 self.execute_state_exit_actions(&from_state).await;
1912 sm.transition_to(&timeout_state, "max_turns exceeded")?;
1913 self.execute_state_enter_actions(&timeout_state).await;
1914 info!(to = %timeout_state, "Timeout transition");
1915 }
1916 }
1917 Ok(())
1918 }
1919
1920 fn increment_turn(&self) {
1921 if let Some(ref sm) = self.state_machine {
1922 sm.increment_turn();
1923 }
1924 }
1925
    /// Evaluate automatic state transitions for this turn.
    ///
    /// Collects the state machine's auto-transitions (minus any on
    /// cooldown), asks the transition evaluator to pick one against the
    /// user message / response / context, and performs it — guarded by
    /// state-level HITL approval and bracketed by exit/enter actions.
    /// When nothing is selected, a no-transition counter feeds an optional
    /// fallback transition. Returns `Ok(true)` iff a transition happened.
    async fn evaluate_transitions(&self, user_message: &str, response: &str) -> Result<bool> {
        // Both a state machine and an evaluator are required; bail early
        // (Ok(false)) otherwise, or when no candidate transitions remain.
        let (transitions, evaluator, current_state) =
            match (&self.state_machine, &self.transition_evaluator) {
                (Some(sm), Some(eval)) => {
                    let auto_transitions: Vec<_> = sm
                        .auto_transitions()
                        .into_iter()
                        .filter(|t| {
                            // Drop transitions still on cooldown; a zero/absent
                            // cooldown never filters.
                            match t.cooldown_turns {
                                Some(cd) if cd > 0 => {
                                    let resolved =
                                        sm.config().resolve_full_path(&sm.current(), &t.to);
                                    !sm.is_on_cooldown(&resolved, cd)
                                }
                                _ => true,
                            }
                        })
                        .collect();
                    if auto_transitions.is_empty() {
                        return Ok(false);
                    }
                    (auto_transitions, eval, sm.current())
                }
                _ => return Ok(false),
            };

        let context_map = self.context_manager.get_all();
        let context = TransitionContext::new(user_message, response, &current_state)
            .with_context(context_map);

        if let Some(index) = evaluator.select_transition(&transitions, &context).await? {
            let target = transitions[index].to.clone();
            // Use the transition's guard expression as the human-readable
            // reason when one exists.
            let reason = if transitions[index].when.is_empty() {
                "guard condition met".to_string()
            } else {
                transitions[index].when.clone()
            };

            if let Some(ref sm) = self.state_machine {
                // State-level HITL can veto the transition.
                let approved = self
                    .check_state_hitl(Some(&context.current_state), &target)
                    .await?;
                if !approved {
                    info!(to = %target, "State transition rejected by HITL");
                    return Ok(false);
                }

                self.execute_state_exit_actions(&context.current_state)
                    .await;

                sm.transition_to(&target, &reason)?;
                // A successful transition resets the fallback counter.
                sm.reset_no_transition();

                self.execute_state_enter_actions(&target).await;

                self.hooks
                    .on_state_transition(Some(&context.current_state), &target, &reason)
                    .await;
                info!(from = %context.current_state, to = %target, "State transition");
            }
            return Ok(true);
        }

        // Nothing selected: bump the no-transition counter and, when the
        // machine says so, take the fallback transition instead.
        if let Some(ref sm) = self.state_machine {
            sm.increment_no_transition();
            if let Some(fallback) = sm.check_fallback() {
                let from_state = current_state.clone();

                let approved = self.check_state_hitl(Some(&from_state), &fallback).await?;
                if !approved {
                    info!(to = %fallback, "Fallback transition rejected by HITL");
                    return Ok(false);
                }

                self.execute_state_exit_actions(&from_state).await;

                sm.transition_to(&fallback, "fallback after no transitions")?;

                self.execute_state_enter_actions(&fallback).await;

                self.hooks
                    .on_state_transition(
                        Some(&from_state),
                        &fallback,
                        "fallback after no transitions",
                    )
                    .await;
                info!(to = %fallback, "Fallback transition");
                return Ok(true);
            }
        }

        Ok(false)
    }
2027
2028 async fn execute_state_exit_actions(&self, state_path: &str) {
2030 if let Some(ref sm) = self.state_machine {
2031 if let Some(def) = sm.get_definition(state_path) {
2032 if !def.on_exit.is_empty() {
2033 debug!(state = %state_path, count = def.on_exit.len(), "Executing on_exit actions");
2034 self.execute_state_actions(&def.on_exit).await;
2035 }
2036 }
2037 }
2038 }
2039
2040 async fn execute_state_enter_actions(&self, state_path: &str) {
2042 if let Some(ref sm) = self.state_machine {
2043 if let Some(def) = sm.get_definition(state_path) {
2044 let is_reentry = sm.history().iter().any(|e| e.to == state_path);
2046
2047 if is_reentry && !def.on_reenter.is_empty() {
2048 debug!(state = %state_path, count = def.on_reenter.len(), "Executing on_reenter actions");
2049 self.execute_state_actions(&def.on_reenter).await;
2050 } else if !def.on_enter.is_empty() {
2051 debug!(state = %state_path, count = def.on_enter.len(), "Executing on_enter actions");
2052 self.execute_state_actions(&def.on_enter).await;
2053 }
2054 }
2055 }
2056 }
2057
2058 async fn execute_state_actions(&self, actions: &[StateAction]) {
2060 for action in actions {
2061 match action {
2062 StateAction::Tool { tool, args } => {
2063 let raw_args = args.clone().unwrap_or(Value::Object(Default::default()));
2064 let args_value = self.render_action_args(&raw_args);
2066 if let Some(t) = self.tools.get(tool) {
2067 self.hooks.on_tool_start(tool, &args_value).await;
2068 let start = Instant::now();
2069 let result = t.execute(args_value).await;
2070 let duration_ms = start.elapsed().as_millis() as u64;
2071 self.hooks
2072 .on_tool_complete(tool, &result, duration_ms)
2073 .await;
2074 if result.success {
2075 debug!(tool = %tool, "State action: tool executed");
2076 let context_key = format!("last_tool_result");
2078 let _ = self
2079 .context_manager
2080 .set(&context_key, serde_json::Value::String(result.output));
2081 } else {
2082 warn!(tool = %tool, error = %result.output, "State action: tool failed");
2083 }
2084 } else {
2085 warn!(tool = %tool, "State action: tool not found");
2086 }
2087 }
2088 StateAction::Skill { skill } => {
2089 if let Some(ref executor) = self.skill_executor {
2090 if let Some(def) = self.skills.iter().find(|s| s.id == *skill) {
2091 match executor.execute(def, "", serde_json::json!({})).await {
2092 Ok(_) => debug!(skill = %skill, "State action: skill executed"),
2093 Err(e) => {
2094 warn!(skill = %skill, error = %e, "State action: skill failed")
2095 }
2096 }
2097 } else {
2098 warn!(skill = %skill, "State action: skill not found");
2099 }
2100 }
2101 }
2102 StateAction::SetContext { set_context } => {
2103 for (key, value) in set_context {
2104 if let Err(e) = self.context_manager.set(key, value.clone()) {
2105 warn!(key = %key, error = %e, "State action: set_context failed");
2106 } else {
2107 debug!(key = %key, "State action: context set");
2108 }
2109 }
2110 }
2111 StateAction::Prompt {
2112 prompt,
2113 llm,
2114 store_as,
2115 } => {
2116 let llm_result = if let Some(alias) = llm {
2117 self.llm_registry.get(alias)
2118 } else {
2119 self.llm_registry.default()
2120 };
2121 match llm_result {
2122 Ok(llm_provider) => {
2123 let context = self.context_manager.get_all();
2125 let rendered_prompt = self
2126 .template_renderer
2127 .render(prompt, &context)
2128 .unwrap_or_else(|_| prompt.clone());
2129 let recent =
2130 self.memory.get_messages(Some(5)).await.unwrap_or_default();
2131 let mut messages: Vec<ChatMessage> = recent;
2132 messages.push(ChatMessage::user(&rendered_prompt));
2133 match llm_provider.complete(&messages, None).await {
2134 Ok(response) => {
2135 if let Some(key) = store_as {
2136 let _ = self
2137 .context_manager
2138 .set(key, Value::String(response.content));
2139 debug!(key = %key, "State action: prompt result stored");
2140 }
2141 }
2142 Err(e) => {
2143 warn!(error = %e, "State action: prompt LLM call failed");
2144 }
2145 }
2146 }
2147 Err(e) => {
2148 warn!(error = %e, "State action: LLM not found for prompt");
2149 }
2150 }
2151 }
2152 }
2153 }
2154 }
2155
    /// Run the current state's declared context extractors over the user
    /// message, storing extracted values in the context manager.
    ///
    /// Extraction is best-effort: missing LLMs, failed calls, and absent
    /// values are logged and skipped, never propagated as errors.
    async fn run_context_extractors(&self, user_message: &str) {
        // Only states that declare extractors trigger any work.
        let extractors = match &self.state_machine {
            Some(sm) => match sm.current_definition() {
                Some(def) if !def.extract.is_empty() => def.extract.clone(),
                _ => return,
            },
            None => return,
        };

        for extractor in &extractors {
            // Prompt choice: a custom `llm_extract` instruction wins over the
            // generic description-based template; extractors declaring
            // neither are skipped.
            let prompt = if let Some(ref custom) = extractor.llm_extract {
                format!(
                    "User message:\n\"{}\"\n\nInstruction:\n{}",
                    user_message, custom
                )
            } else if let Some(ref desc) = extractor.description {
                format!(
                    "From the following message, extract: {}\n\n\
                     Message: \"{}\"\n\n\
                     If the information is present, return ONLY the extracted value.\n\
                     If NOT present, return exactly: __NONE__",
                    desc, user_message
                )
            } else {
                continue;
            };

            // Resolve the extractor's LLM, falling back to the "router" and
            // then "default" registry aliases.
            let llm = match self
                .llm_registry
                .get(&extractor.llm)
                .or_else(|_| self.llm_registry.get("router"))
                .or_else(|_| self.llm_registry.get("default"))
            {
                Ok(llm) => llm,
                Err(e) => {
                    warn!(key = %extractor.key, error = %e, "Extractor LLM not found");
                    continue;
                }
            };

            let messages = vec![ChatMessage::user(&prompt)];
            match llm.complete(&messages, None).await {
                Ok(response) => {
                    let value = response.content.trim().to_string();
                    // "__NONE__" is the sentinel for "not present in message".
                    if value != "__NONE__" && !value.is_empty() {
                        let _ = self
                            .context_manager
                            .update(&extractor.key, serde_json::Value::String(value.clone()));
                        debug!(key = %extractor.key, value = %value, "Context extracted");
                    } else if extractor.required {
                        warn!(key = %extractor.key, "Required extraction returned no value");
                    }
                }
                Err(e) => {
                    warn!(key = %extractor.key, error = %e, "Context extraction LLM call failed");
                }
            }
        }
    }
2216
2217 async fn check_memory_compression(&self) -> Result<()> {
2218 if self.memory.needs_compression() {
2219 let result = self.memory.compress(None).await?;
2220 if let CompressResult::Compressed {
2221 messages_summarized,
2222 new_summary_length,
2223 tokens_saved,
2224 } = result
2225 {
2226 let event = MemoryCompressEvent::new(
2227 messages_summarized,
2228 tokens_saved,
2229 new_summary_length as u32,
2230 );
2231 self.hooks.on_memory_compress(&event).await;
2232 debug!(
2233 messages = messages_summarized,
2234 tokens_saved = tokens_saved,
2235 "Memory compressed"
2236 );
2237 }
2238 }
2239
2240 self.handle_memory_overflow().await?;
2242 self.check_memory_budget().await;
2243
2244 Ok(())
2245 }
2246
2247 async fn check_memory_budget(&self) {
2248 let Some(ref budget) = self.memory_token_budget else {
2249 return;
2250 };
2251
2252 let context = match self.memory.get_context().await {
2253 Ok(ctx) => ctx,
2254 Err(_) => return,
2255 };
2256
2257 let used_tokens = context.estimated_tokens();
2259 if budget.is_over_warn_threshold(used_tokens) {
2260 let event = MemoryBudgetEvent::new("memory", used_tokens, budget.total);
2261 self.hooks.on_memory_budget_warning(&event).await;
2262 debug!(
2263 used = used_tokens,
2264 total = budget.total,
2265 percent = event.usage_percent,
2266 "Memory budget warning"
2267 );
2268 }
2269
2270 if let Some(ref summary) = context.summary {
2272 let summary_tokens = ai_agents_memory::estimate_tokens(summary);
2273 let summary_budget = budget.allocation.summary;
2274 if summary_budget > 0 {
2275 let warn_threshold =
2276 (summary_budget as f64 * budget.warn_at_percent as f64 / 100.0) as u32;
2277 if summary_tokens >= warn_threshold {
2278 let event = MemoryBudgetEvent::new("summary", summary_tokens, summary_budget);
2279 self.hooks.on_memory_budget_warning(&event).await;
2280 }
2281 }
2282 }
2283
2284 let recent_tokens: u32 = context
2286 .messages
2287 .iter()
2288 .map(ai_agents_memory::estimate_message_tokens)
2289 .sum();
2290 let recent_budget = budget.allocation.recent_messages;
2291 if recent_budget > 0 {
2292 let warn_threshold =
2293 (recent_budget as f64 * budget.warn_at_percent as f64 / 100.0) as u32;
2294 if recent_tokens >= warn_threshold {
2295 let event = MemoryBudgetEvent::new("recent_messages", recent_tokens, recent_budget);
2296 self.hooks.on_memory_budget_warning(&event).await;
2297 }
2298 }
2299 }
2300
2301 async fn handle_memory_overflow(&self) -> Result<()> {
2302 let Some(ref budget) = self.memory_token_budget else {
2303 return Ok(());
2304 };
2305
2306 let context = self.memory.get_context().await?;
2307 let used_tokens = context.estimated_tokens();
2308
2309 if used_tokens <= budget.total {
2310 return Ok(());
2311 }
2312
2313 match budget.overflow_strategy {
2314 OverflowStrategy::TruncateOldest => {
2315 let tokens_to_free = used_tokens - budget.total;
2316 let messages_to_evict = self.calculate_eviction_count(tokens_to_free);
2317 if messages_to_evict > 0 {
2318 self.evict_messages(messages_to_evict, EvictionReason::TokenBudgetExceeded)
2319 .await?;
2320 }
2321 }
2322 OverflowStrategy::SummarizeMore => {
2323 self.memory.compress(None).await?;
2324 }
2325 OverflowStrategy::Error => {
2326 return Err(AgentError::MemoryBudgetExceeded {
2327 used: used_tokens,
2328 budget: budget.total,
2329 });
2330 }
2331 }
2332 Ok(())
2333 }
2334
2335 fn calculate_eviction_count(&self, tokens_to_free: u32) -> usize {
2336 ((tokens_to_free as f64 / 50.0).ceil() as usize).max(1)
2338 }
2339
2340 async fn evict_messages(&self, count: usize, reason: EvictionReason) -> Result<()> {
2341 let evicted = self.memory.evict_oldest(count).await?;
2342 if !evicted.is_empty() {
2343 let event = MemoryEvictEvent {
2344 reason,
2345 messages_evicted: evicted.len(),
2346 importance_scores: vec![],
2347 };
2348 self.hooks.on_memory_evict(&event).await;
2349 debug!(count = evicted.len(), "Messages evicted from memory");
2350 }
2351 Ok(())
2352 }
2353
2354 #[instrument(skip(self, input), fields(agent = %self.info.name))]
2355 async fn determine_reasoning_mode(&self, input: &str) -> Result<ReasoningMode> {
2356 let effective_config = self.get_effective_reasoning_config();
2357
2358 if !matches!(effective_config.mode, ReasoningMode::Auto) {
2359 return Ok(effective_config.mode.clone());
2360 }
2361
2362 let judge_llm = effective_config
2363 .judge_llm
2364 .as_ref()
2365 .and_then(|alias| self.llm_registry.get(alias).ok())
2366 .or_else(|| self.llm_registry.router().ok())
2367 .or_else(|| self.llm_registry.default().ok());
2368
2369 let Some(llm) = judge_llm else {
2370 return Ok(ReasoningMode::None);
2371 };
2372
2373 let prompt = format!(
2374 r#"Analyze this user request and determine the appropriate reasoning mode.
2375
2376User request: "{}"
2377
2378Choose ONE of these modes:
2379- none: Simple queries, greetings, direct answers (fastest)
2380- cot: Complex analysis, multi-step reasoning, math problems
2381- react: Tasks requiring multiple tool calls with observation
2382- plan_and_execute: Complex multi-step tasks requiring coordination
2383
2384Respond with ONLY the mode name (none, cot, react, or plan_and_execute)."#,
2385 input
2386 );
2387
2388 let messages = vec![ChatMessage::user(&prompt)];
2389 let response = llm.complete(&messages, None).await;
2390
2391 match response {
2392 Ok(resp) => {
2393 let mode_str = resp.content.trim().to_lowercase();
2394 Ok(match mode_str.as_str() {
2395 "cot" => ReasoningMode::CoT,
2396 "react" => ReasoningMode::React,
2397 "plan_and_execute" => ReasoningMode::PlanAndExecute,
2398 _ => ReasoningMode::None,
2399 })
2400 }
2401 Err(_) => Ok(ReasoningMode::None),
2402 }
2403 }
2404
2405 async fn should_reflect(&self, input: &str, response: &str) -> Result<bool> {
2406 let effective_config = self.get_effective_reflection_config();
2407
2408 if !effective_config.requires_evaluation() {
2409 return Ok(false);
2410 }
2411
2412 if effective_config.is_enabled() {
2413 return Ok(true);
2414 }
2415
2416 let evaluator_llm = effective_config
2417 .evaluator_llm
2418 .as_ref()
2419 .and_then(|alias| self.llm_registry.get(alias).ok())
2420 .or_else(|| self.llm_registry.router().ok())
2421 .or_else(|| self.llm_registry.default().ok());
2422
2423 let Some(llm) = evaluator_llm else {
2424 return Ok(false);
2425 };
2426
2427 let prompt = format!(
2428 r#"Should this response be evaluated for quality? Consider if it's a complex or important response.
2429
2430User query: "{}"
2431Response: "{}"
2432
2433Answer YES or NO only."#,
2434 input,
2435 &response[..response.len().min(500)]
2436 );
2437
2438 let messages = vec![ChatMessage::user(&prompt)];
2439 let result = llm.complete(&messages, None).await;
2440
2441 match result {
2442 Ok(resp) => Ok(resp.content.trim().to_uppercase().contains("YES")),
2443 Err(_) => Ok(false),
2444 }
2445 }
2446
2447 fn build_cot_system_prompt(&self, base_prompt: &str) -> String {
2448 format!(
2449 "{}\n\n<instruction>\nThink through this step by step before answering:\n1. Understand what is being asked\n2. Break down the problem\n3. Work through each part\n4. Provide your final answer\n\nShow your thinking process, then give your final answer.\n</instruction>",
2450 base_prompt
2451 )
2452 }
2453
2454 fn build_react_system_prompt(&self, base_prompt: &str) -> String {
2455 format!(
2456 "{}\n\n<instruction>\nUse the Reason-Act-Observe pattern:\n1. Thought: Think about what to do\n2. Action: Use a tool if needed\n3. Observation: Analyze the result\n4. Repeat until you have the answer\n\nFormat your response showing Thought/Action/Observation steps.\n</instruction>",
2457 base_prompt
2458 )
2459 }
2460
2461 async fn generate_plan(&self, input: &str) -> Result<Plan> {
2462 let effective = self.get_effective_reasoning_config();
2463 let planning_config = effective.get_planning();
2464
2465 let planner_llm = planning_config
2466 .and_then(|c| c.planner_llm.as_ref())
2467 .and_then(|alias| self.llm_registry.get(alias).ok())
2468 .or_else(|| self.llm_registry.router().ok())
2469 .or_else(|| self.llm_registry.default().ok())
2470 .ok_or_else(|| AgentError::Config("No LLM available for planning".into()))?;
2471
2472 let mut available_tool_ids: Vec<String> = self
2473 .get_available_tool_ids()
2474 .await
2475 .unwrap_or_else(|_| self.tools.list_ids());
2476 let mut available_skills: Vec<String> = self.skills.iter().map(|s| s.id.clone()).collect();
2477
2478 if let Some(config) = planning_config {
2480 if !config.available.tools.is_all() {
2481 available_tool_ids.retain(|t| config.available.tools.allows(t));
2482 }
2483 if !config.available.skills.is_all() {
2484 available_skills.retain(|s| config.available.skills.allows(s));
2485 }
2486 }
2487
2488 let tool_descriptions: Vec<String> = available_tool_ids
2491 .iter()
2492 .filter_map(|id| {
2493 self.tools.get(id).map(|tool| {
2494 let schema = tool.input_schema();
2495 let args_desc = schema
2496 .get("properties")
2497 .and_then(|p| serde_json::to_string(p).ok())
2498 .unwrap_or_else(|| "{}".to_string());
2499 format!(
2500 "- {} ({}): {}\n Arguments: {}",
2501 id,
2502 tool.name(),
2503 tool.description(),
2504 args_desc
2505 )
2506 })
2507 })
2508 .collect();
2509
2510 let tools_section = if tool_descriptions.is_empty() {
2511 "Available tools: none".to_string()
2512 } else {
2513 format!("Available tools:\n{}", tool_descriptions.join("\n"))
2514 };
2515
2516 let skills_section = if available_skills.is_empty() {
2517 "Available skills: none".to_string()
2518 } else {
2519 format!("Available skills: {}", available_skills.join(", "))
2520 };
2521
2522 let prompt = format!(
2523 r#"Create a step-by-step plan to accomplish this goal.
2524
2525Goal: "{}"
2526
2527{}
2528
2529{}
2530
2531Create a plan with clear steps. For each step, specify:
2532- description: What this step accomplishes
2533- action_type: "tool", "skill", "think", or "respond"
2534- action_target: The tool/skill id (if applicable)
2535- args: The arguments object matching the tool's schema (if action_type is "tool")
2536- dependencies: List of step IDs this depends on (empty if none)
2537
2538Respond in JSON format:
2539{{
2540 "steps": [
2541 {{"id": "step1", "description": "...", "action_type": "tool", "action_target": "tool_id", "args": {{"required_field": "value"}}, "dependencies": []}},
2542 {{"id": "step2", "description": "...", "action_type": "think", "action_target": "...", "dependencies": ["step1"]}}
2543 ]
2544}}"#,
2545 input, tools_section, skills_section,
2546 );
2547
2548 let messages = vec![ChatMessage::user(&prompt)];
2549 let response = planner_llm
2550 .complete(&messages, None)
2551 .await
2552 .map_err(|e| AgentError::LLM(format!("Planning failed: {}", e)))?;
2553
2554 let mut plan = Plan::new(input);
2555
2556 if let Some(json_start) = response.content.find('{') {
2557 if let Some(json_end) = response.content.rfind('}') {
2558 let json_str = &response.content[json_start..=json_end];
2559 if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(json_str) {
2560 if let Some(steps) = parsed.get("steps").and_then(|s| s.as_array()) {
2561 for step_value in steps {
2562 let id = step_value
2563 .get("id")
2564 .and_then(|v| v.as_str())
2565 .unwrap_or("step");
2566 let desc = step_value
2567 .get("description")
2568 .and_then(|v| v.as_str())
2569 .unwrap_or("");
2570 let action_type = step_value
2571 .get("action_type")
2572 .and_then(|v| v.as_str())
2573 .unwrap_or("think");
2574 let action_target = step_value
2575 .get("action_target")
2576 .and_then(|v| v.as_str())
2577 .unwrap_or("");
2578 let args = step_value
2579 .get("args")
2580 .cloned()
2581 .unwrap_or(serde_json::json!({}));
2582 let deps: Vec<String> = step_value
2583 .get("dependencies")
2584 .and_then(|v| v.as_array())
2585 .map(|arr| {
2586 arr.iter()
2587 .filter_map(|v| v.as_str().map(String::from))
2588 .collect()
2589 })
2590 .unwrap_or_default();
2591
2592 let action = match action_type {
2593 "tool" => PlanAction::tool(action_target, args),
2594 "skill" => PlanAction::skill(action_target),
2595 "respond" => PlanAction::respond(action_target),
2596 _ => PlanAction::think(desc),
2597 };
2598
2599 let step = PlanStep::new(desc, action)
2600 .with_id(id)
2601 .with_dependencies(deps);
2602 plan.add_step(step);
2603 }
2604 }
2605 }
2606 }
2607 }
2608
2609 if plan.steps.is_empty() {
2610 plan.add_step(PlanStep::new(
2611 "Process the request",
2612 PlanAction::think(input),
2613 ));
2614 plan.add_step(PlanStep::new(
2615 "Provide response",
2616 PlanAction::respond("Answer based on analysis"),
2617 ));
2618 }
2619
2620 Ok(plan)
2621 }
2622
2623 async fn execute_plan(&self, plan: &mut Plan) -> Result<String> {
2624 let llm = self.get_state_llm()?;
2625 let mut results: HashMap<String, serde_json::Value> = HashMap::new();
2626 let effective = self.get_effective_reasoning_config();
2627 let max_steps = effective.get_planning().map(|c| c.max_steps).unwrap_or(10);
2628
2629 plan.status = PlanStatus::InProgress;
2630
2631 for step_idx in 0..plan.steps.len().min(max_steps as usize) {
2632 let step = &plan.steps[step_idx];
2633
2634 let deps_satisfied = step.dependencies.iter().all(|dep| {
2635 plan.steps
2636 .iter()
2637 .find(|s| &s.id == dep)
2638 .map(|s| s.status.is_completed())
2639 .unwrap_or(false)
2640 });
2641
2642 if !deps_satisfied {
2643 continue;
2644 }
2645
2646 plan.steps[step_idx].mark_running();
2647
2648 let result = match &plan.steps[step_idx].action {
2649 PlanAction::Tool { tool, args } => {
2650 let has_dep_results = plan.steps[step_idx]
2656 .dependencies
2657 .iter()
2658 .any(|dep| results.contains_key(dep));
2659
2660 let final_args = if has_dep_results {
2661 let dep_context: String = plan.steps[step_idx]
2662 .dependencies
2663 .iter()
2664 .filter_map(|dep| results.get(dep).map(|r| format!("{}: {}", dep, r)))
2665 .collect::<Vec<_>>()
2666 .join("\n");
2667
2668 let tool_schema = self
2669 .tools
2670 .get(tool)
2671 .map(|t| {
2672 let schema = t.input_schema();
2673 let props = schema
2674 .get("properties")
2675 .and_then(|p| serde_json::to_string(p).ok())
2676 .unwrap_or_else(|| "{}".to_string());
2677 format!(
2678 "{}: {}\nArguments schema: {}",
2679 t.id(),
2680 t.description(),
2681 props
2682 )
2683 })
2684 .unwrap_or_default();
2685
2686 let step_desc = &plan.steps[step_idx].description;
2687 let arg_prompt = format!(
2688 "Generate the JSON arguments for a tool call.\n\n\
2689 Tool: {}\n\n\
2690 Task: {}\n\n\
2691 Previous step results:\n{}\n\n\
2692 Planner's draft arguments: {}\n\n\
2693 Produce ONLY a valid JSON object with the correct argument values.\n\
2694 Use actual values from the previous step results, not template references.",
2695 tool_schema,
2696 step_desc,
2697 dep_context,
2698 serde_json::to_string(args).unwrap_or_default()
2699 );
2700 let messages = vec![ChatMessage::user(&arg_prompt)];
2701 match llm.complete(&messages, None).await {
2702 Ok(resp) => {
2703 let content = resp.content.trim();
2704 let json_start = content.find('{');
2706 let json_end = content.rfind('}');
2707 if let (Some(start), Some(end)) = (json_start, json_end) {
2708 serde_json::from_str(&content[start..=end])
2709 .unwrap_or_else(|_| args.clone())
2710 } else {
2711 args.clone()
2712 }
2713 }
2714 Err(_) => args.clone(),
2715 }
2716 } else {
2717 args.clone()
2718 };
2719
2720 let tool_call = ToolCall {
2721 id: uuid::Uuid::new_v4().to_string(),
2722 name: tool.clone(),
2723 arguments: final_args,
2724 };
2725 match self.execute_tool_smart(&tool_call).await {
2726 Ok(output) => serde_json::json!({ "output": output }),
2727 Err(e) => {
2728 plan.steps[step_idx].mark_failed(e.to_string());
2729 continue;
2730 }
2731 }
2732 }
2733 PlanAction::Skill { skill } => {
2734 if let Some(skill_def) = self.skills.iter().find(|s| &s.id == skill) {
2735 if let Some(ref executor) = self.skill_executor {
2736 match executor.execute(skill_def, "", serde_json::json!({})).await {
2737 Ok(output) => serde_json::json!({ "output": output }),
2738 Err(e) => {
2739 plan.steps[step_idx].mark_failed(e.to_string());
2740 continue;
2741 }
2742 }
2743 } else {
2744 serde_json::json!({ "output": "Skill executor not available" })
2745 }
2746 } else {
2747 plan.steps[step_idx].mark_failed("Skill not found");
2748 continue;
2749 }
2750 }
2751 PlanAction::Think { prompt } => {
2752 let context: String = results
2753 .iter()
2754 .map(|(k, v)| format!("{}: {}", k, v))
2755 .collect::<Vec<_>>()
2756 .join("\n");
2757
2758 let think_prompt = format!("Context:\n{}\n\nTask: {}", context, prompt);
2759 let messages = vec![ChatMessage::user(&think_prompt)];
2760
2761 match llm.complete(&messages, None).await {
2762 Ok(resp) => serde_json::json!({ "output": resp.content }),
2763 Err(e) => {
2764 plan.steps[step_idx].mark_failed(e.to_string());
2765 continue;
2766 }
2767 }
2768 }
2769 PlanAction::Respond { template } => {
2770 let context: String = results
2771 .iter()
2772 .map(|(k, v)| format!("{}: {}", k, v))
2773 .collect::<Vec<_>>()
2774 .join("\n");
2775
2776 let respond_prompt = format!(
2777 "Based on this context:\n{}\n\nGenerate a response following this template/instruction: {}",
2778 context, template
2779 );
2780 let messages = vec![ChatMessage::user(&respond_prompt)];
2781
2782 match llm.complete(&messages, None).await {
2783 Ok(resp) => serde_json::json!({ "output": resp.content }),
2784 Err(e) => {
2785 plan.steps[step_idx].mark_failed(e.to_string());
2786 continue;
2787 }
2788 }
2789 }
2790 };
2791
2792 results.insert(plan.steps[step_idx].id.clone(), result.clone());
2793 plan.steps[step_idx].mark_completed(Some(result));
2794 }
2795
2796 let has_failures = plan.steps.iter().any(|s| s.status.is_failed());
2798 if has_failures {
2799 let failed_ids: Vec<String> = plan
2800 .steps
2801 .iter()
2802 .filter(|s| s.status.is_failed())
2803 .map(|s| s.id.clone())
2804 .collect();
2805 plan.status = PlanStatus::Failed {
2806 error: format!("Steps failed: {}", failed_ids.join(", ")),
2807 };
2808 } else {
2809 plan.status = PlanStatus::Completed;
2810 }
2811
2812 let all_outputs: Vec<String> = plan
2814 .steps
2815 .iter()
2816 .filter(|s| s.status.is_completed())
2817 .filter_map(|s| {
2818 s.result
2819 .as_ref()
2820 .and_then(|r| r.get("output"))
2821 .and_then(|o| o.as_str())
2822 .map(|o| format!("{}: {}", s.description, o))
2823 })
2824 .collect();
2825
2826 if all_outputs.is_empty() {
2827 return Ok("Plan execution completed but produced no results.".to_string());
2828 }
2829
2830 if all_outputs.len() == 1 {
2831 return Ok(all_outputs.into_iter().next().unwrap());
2832 }
2833
2834 let context = all_outputs.join("\n\n");
2836 let prompt = format!(
2837 "You completed a multi-step plan for: \"{}\"\n\nStep results:\n{}\n\nProvide a coherent final response that synthesizes these results.",
2838 plan.goal, context
2839 );
2840 let messages = vec![ChatMessage::user(&prompt)];
2841 match llm.complete(&messages, None).await {
2842 Ok(resp) => Ok(resp.content.trim().to_string()),
2843 Err(_) => Ok(context),
2844 }
2845 }
2846
2847 async fn evaluate_response(&self, input: &str, response: &str) -> Result<EvaluationResult> {
2848 let effective_config = self.get_effective_reflection_config();
2849 self.evaluate_response_with_config(input, response, &effective_config)
2850 .await
2851 }
2852
2853 fn extract_thinking(&self, content: &str) -> (Option<String>, String) {
2854 if let Some(start) = content.find("<thinking>") {
2855 if let Some(end) = content.find("</thinking>") {
2856 let thinking = content[start + 10..end].trim().to_string();
2857 let answer = content[end + 11..].trim().to_string();
2858 return (Some(thinking), answer);
2859 }
2860 }
2861 (None, content.to_string())
2862 }
2863
2864 fn format_response_with_thinking(&self, thinking: Option<&str>, answer: &str) -> String {
2865 match self.get_effective_reasoning_config().output {
2866 ReasoningOutput::Hidden => answer.to_string(),
2867 ReasoningOutput::Visible => {
2868 if let Some(t) = thinking {
2869 format!("Thinking:\n{}\n\nAnswer:\n{}", t, answer)
2870 } else {
2871 answer.to_string()
2872 }
2873 }
2874 ReasoningOutput::Tagged => {
2875 if let Some(t) = thinking {
2876 format!("<thinking>{}</thinking>\n{}", t, answer)
2877 } else {
2878 answer.to_string()
2879 }
2880 }
2881 }
2882 }
2883
    /// Top-level turn driver: notifies hooks, performs one-time context
    /// initialization and per-turn refresh, runs the disambiguation pipeline,
    /// and finally delegates to `run_loop_internal` for the main chat flow.
    ///
    /// Most disambiguation outcomes return early with a user-facing response;
    /// only `Clear` falls through to the normal loop with the raw input.
    async fn run_loop(&self, input: &str) -> Result<AgentResponse> {
        info!(input_len = input.len(), "Starting chat");

        self.hooks.on_message_received(input).await;

        // One-time lazy init: `swap` returns the previous flag value, so only
        // the first caller observes `false` and runs initialization.
        if !self.context_initialized.swap(true, Ordering::SeqCst) {
            self.context_manager.initialize().await?;
            debug!("Context manager initialized (defaults, env, builtins)");
        }

        self.check_turn_timeout().await?;
        self.context_manager.refresh_per_turn().await?;

        // Stale disambiguation keys from a previous turn must not leak into
        // this one.
        self.clear_disambiguation_context();

        if let Some(ref disambiguator) = self.disambiguation_manager {
            let disambiguation_context = self.build_disambiguation_context().await?;

            // The current state-machine state may override disambiguation
            // settings for this turn.
            let state_override = self
                .state_machine
                .as_ref()
                .and_then(|sm| sm.current_definition())
                .and_then(|def| def.disambiguation.clone());

            match disambiguator
                .process_input_with_override(
                    input,
                    &disambiguation_context,
                    state_override.as_ref(),
                    None,
                )
                .await?
            {
                DisambiguationResult::Clear => {
                    debug!("Input is clear, proceeding normally");
                }
                // Ambiguous: persist the exchange in memory and return the
                // clarifying question without running the main loop.
                DisambiguationResult::NeedsClarification {
                    question,
                    detection,
                } => {
                    info!(
                        ambiguity_type = ?detection.ambiguity_type,
                        confidence = detection.confidence,
                        "Input requires clarification"
                    );

                    self.memory.add_message(ChatMessage::user(input)).await?;
                    self.memory
                        .add_message(ChatMessage::assistant(&question.question))
                        .await?;

                    return Ok(AgentResponse::new(&question.question).with_metadata(
                        "disambiguation",
                        serde_json::json!({
                            "status": "awaiting_clarification",
                            "options": question.options,
                            "clarifying": question.clarifying,
                            "detection": {
                                "type": detection.ambiguity_type,
                                "confidence": detection.confidence,
                                "what_is_unclear": detection.what_is_unclear,
                            }
                        }),
                    ));
                }
                // The user answered a previous clarification: inject resolved
                // values into the context, then re-run with enriched input
                // (via the pending skill's re-check when one is waiting).
                DisambiguationResult::Clarified {
                    enriched_input,
                    resolved,
                    ..
                } => {
                    info!(
                        resolved_count = resolved.len(),
                        enriched = %enriched_input,
                        "Input clarified, injecting resolved intent into context"
                    );

                    for (key, value) in &resolved {
                        let context_key = format!("disambiguation.{}", key);
                        let _ = self.context_manager.set(&context_key, value.clone());
                    }

                    if let Some(intent) = resolved.get("intent") {
                        let _ = self.context_manager.set("resolved_intent", intent.clone());
                    }

                    let _ = self
                        .context_manager
                        .set("disambiguation.resolved", serde_json::Value::Bool(true));

                    let skill_id = self.pending_skill_id.read().clone();
                    if let Some(skill_id) = skill_id {
                        info!(skill_id = %skill_id, "Re-checking skill disambiguation on clarified input");
                        return self
                            .recheck_skill_disambiguation(&skill_id, &enriched_input)
                            .await;
                    }

                    return self.run_loop_internal(&enriched_input).await;
                }
                // Same re-check/re-run flow, but with the manager's best-guess
                // interpretation rather than a user-confirmed one.
                DisambiguationResult::ProceedWithBestGuess { enriched_input } => {
                    info!("Proceeding with best guess interpretation");

                    let skill_id = self.pending_skill_id.read().clone();
                    if let Some(skill_id) = skill_id {
                        info!(skill_id = %skill_id, "Re-checking skill disambiguation on best-guess input");
                        return self
                            .recheck_skill_disambiguation(&skill_id, &enriched_input)
                            .await;
                    }

                    return self.run_loop_internal(&enriched_input).await;
                }
                // Too many failed attempts: apologize (localized, with a
                // static English fallback) and stop.
                DisambiguationResult::GiveUp { reason } => {
                    *self.pending_skill_id.write() = None;
                    warn!(reason = %reason, "Disambiguation gave up");
                    let apology = self
                        .generate_localized_apology(
                            "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
                            &reason,
                        )
                        .await
                        .unwrap_or_else(|_| {
                            format!("I'm sorry, I couldn't understand your request: {}", reason)
                        });
                    return Ok(AgentResponse::new(&apology));
                }
                // Hand off to a human through HITL; if the human approves (or
                // modifies), continue with the original input.
                DisambiguationResult::Escalate { reason } => {
                    *self.pending_skill_id.write() = None;
                    info!(reason = %reason, "Escalating to human");
                    if let Some(ref hitl) = self.hitl_engine {
                        let trigger =
                            ApprovalTrigger::condition("disambiguation_escalation", reason.clone());
                        let mut context_map = HashMap::new();
                        context_map.insert("original_input".to_string(), serde_json::json!(input));
                        context_map.insert("reason".to_string(), serde_json::json!(&reason));
                        let check_result = HITLCheckResult::required(
                            trigger,
                            context_map,
                            format!("User request needs human assistance: {}", reason),
                            Some(hitl.config().default_timeout_seconds),
                        );
                        let result = self.request_hitl_approval(check_result).await?;
                        if matches!(
                            result,
                            ApprovalResult::Approved | ApprovalResult::Modified { .. }
                        ) {
                            return self.run_loop_internal(input).await;
                        }
                    }
                    let apology = self
                        .generate_localized_apology(
                            "Explain briefly that you're transferring the user to a human agent for help.",
                            &reason,
                        )
                        .await
                        .unwrap_or_else(|_| {
                            format!("I need human assistance to help with your request: {}", reason)
                        });
                    return Ok(AgentResponse::new(&apology));
                }
                // The user dropped the clarification: either restart with
                // their new request, or acknowledge and wait for the next one.
                DisambiguationResult::Abandoned { new_input } => {
                    *self.pending_skill_id.write() = None;

                    info!(
                        has_new_input = new_input.is_some(),
                        "Clarification abandoned by user"
                    );

                    self.memory.add_message(ChatMessage::user(input)).await?;

                    match new_input {
                        Some(fresh_input) => {
                            return self.run_loop_internal(&fresh_input).await;
                        }
                        None => {
                            let ack = self
                                .generate_localized_apology(
                                    "The user changed their mind about their previous request. \
                                     Generate a brief, friendly acknowledgment (e.g. 'OK, no problem. What else can I help with?'). \
                                     Do NOT apologize excessively. Be concise.",
                                    "User abandoned clarification",
                                )
                                .await
                                .unwrap_or_else(|_| {
                                    "OK, no problem. What else can I help with?".to_string()
                                });

                            self.memory
                                .add_message(ChatMessage::assistant(&ack))
                                .await?;

                            return Ok(AgentResponse::new(&ack));
                        }
                    }
                }
            }
        }

        self.run_loop_internal(input).await
    }
3100
3101 async fn generate_localized_apology(&self, instruction: &str, reason: &str) -> Result<String> {
3103 let llm = self.llm_registry.router().map_err(|e| {
3104 AgentError::LLM(format!(
3105 "Router LLM not available for localized response: {}",
3106 e
3107 ))
3108 })?;
3109
3110 let recent: Vec<String> = self
3111 .memory
3112 .get_messages(Some(3))
3113 .await?
3114 .iter()
3115 .map(|m| m.content.clone())
3116 .collect();
3117
3118 let context_hint = if recent.is_empty() {
3119 String::new()
3120 } else {
3121 format!(
3122 "\nRecent conversation (detect the user's language from this):\n{}\n",
3123 recent.join("\n")
3124 )
3125 };
3126
3127 let prompt = format!(
3128 "{}\nReason: {}\n{}Respond in the same language as the user. Output ONLY the message, nothing else.",
3129 instruction, reason, context_hint
3130 );
3131
3132 let messages = vec![ChatMessage::user(&prompt)];
3133 let response = llm
3134 .complete(&messages, None)
3135 .await
3136 .map_err(|e| AgentError::LLM(format!("Localized response generation failed: {}", e)))?;
3137
3138 Ok(response.content.trim().to_string())
3139 }
3140
3141 fn render_action_args(&self, args: &Value) -> Value {
3145 let context = self.context_manager.get_all();
3146 match args {
3147 Value::Object(map) => {
3148 let mut rendered = serde_json::Map::new();
3149 for (k, v) in map {
3150 match v {
3151 Value::String(s) if s.contains("{{") => {
3152 match self.template_renderer.render(s, &context) {
3153 Ok(rendered_str) => {
3154 rendered.insert(k.clone(), Value::String(rendered_str));
3155 }
3156 Err(_) => {
3157 rendered.insert(k.clone(), v.clone());
3158 }
3159 }
3160 }
3161 _ => {
3162 rendered.insert(k.clone(), v.clone());
3163 }
3164 }
3165 }
3166 Value::Object(rendered)
3167 }
3168 _ => args.clone(),
3169 }
3170 }
3171
3172 fn clear_disambiguation_context(&self) {
3174 let _ = self
3175 .context_manager
3176 .set("resolved_intent", serde_json::Value::Null);
3177
3178 let all = self.context_manager.get_all();
3179 for key in all.keys() {
3180 if key.starts_with("disambiguation.") {
3181 let _ = self.context_manager.set(key, serde_json::Value::Null);
3182 }
3183 }
3184 }
3185
    /// Re-runs disambiguation for a specific skill after the user has
    /// clarified (or the manager chose a best guess), then executes the skill
    /// once its required fields are satisfied.
    ///
    /// The skill-level disambiguation re-check only happens when the skill
    /// exists, declares disambiguation, has it enabled, and a manager is
    /// configured; every other path falls through to direct execution at the
    /// bottom of the function.
    async fn recheck_skill_disambiguation(
        &self,
        skill_id: &str,
        enriched_input: &str,
    ) -> Result<AgentResponse> {
        let skill = self
            .skill_router
            .as_ref()
            .and_then(|r| r.get_skill(skill_id).cloned());

        if let Some(ref skill) = skill {
            if let Some(ref skill_disambig) = skill.disambiguation {
                if skill_disambig.enabled.unwrap_or(false) {
                    if let Some(ref disambiguator) = self.disambiguation_manager {
                        let context = self.build_disambiguation_context().await?;
                        // State-level disambiguation settings still apply on
                        // the re-check, layered with the skill's own config.
                        let state_override = self
                            .state_machine
                            .as_ref()
                            .and_then(|sm| sm.current_definition())
                            .and_then(|def| def.disambiguation.clone());

                        match disambiguator
                            .process_input_with_override(
                                enriched_input,
                                &context,
                                state_override.as_ref(),
                                Some(skill_disambig),
                            )
                            .await?
                        {
                            // All required fields present: fall through to
                            // the direct execution path below.
                            DisambiguationResult::Clear => {
                                debug!(skill_id = %skill_id, "Skill re-check: all fields present");
                            }
                            // Still missing fields: ask again and keep the
                            // pending skill id for the next turn.
                            DisambiguationResult::NeedsClarification {
                                question,
                                detection,
                            } => {
                                info!(
                                    skill_id = %skill_id,
                                    ambiguity_type = ?detection.ambiguity_type,
                                    what_is_unclear = ?detection.what_is_unclear,
                                    "Skill re-check: still missing fields, asking again"
                                );
                                self.memory
                                    .add_message(ChatMessage::user(enriched_input))
                                    .await?;
                                self.memory
                                    .add_message(ChatMessage::assistant(&question.question))
                                    .await?;

                                return Ok(AgentResponse::new(&question.question).with_metadata(
                                    "disambiguation",
                                    serde_json::json!({
                                        "status": "awaiting_clarification",
                                        "skill_id": skill_id,
                                        "options": question.options,
                                        "clarifying": question.clarifying,
                                        "detection": {
                                            "type": detection.ambiguity_type,
                                            "confidence": detection.confidence,
                                            "what_is_unclear": detection.what_is_unclear,
                                        }
                                    }),
                                ));
                            }
                            // Clarified in one pass: execute the skill with
                            // the re-enriched input.
                            DisambiguationResult::Clarified {
                                enriched_input: re_enriched,
                                ..
                            } => {
                                debug!(skill_id = %skill_id, "Skill re-check: clarified immediately, executing");
                                *self.pending_skill_id.write() = None;
                                let skill_response =
                                    self.execute_skill_by_id(skill_id, &re_enriched).await?;
                                self.memory
                                    .add_message(ChatMessage::user(&re_enriched))
                                    .await?;
                                return self
                                    .handle_skill_response(
                                        &re_enriched,
                                        skill_response,
                                        &HashMap::new(),
                                    )
                                    .await;
                            }
                            // Best guess accepted: same execution path as
                            // Clarified.
                            DisambiguationResult::ProceedWithBestGuess {
                                enriched_input: re_enriched,
                            } => {
                                debug!(skill_id = %skill_id, "Skill re-check: proceeding with best guess");
                                *self.pending_skill_id.write() = None;
                                let skill_response =
                                    self.execute_skill_by_id(skill_id, &re_enriched).await?;
                                self.memory
                                    .add_message(ChatMessage::user(&re_enriched))
                                    .await?;
                                return self
                                    .handle_skill_response(
                                        &re_enriched,
                                        skill_response,
                                        &HashMap::new(),
                                    )
                                    .await;
                            }
                            // Localized apology with static English fallback.
                            DisambiguationResult::GiveUp { reason } => {
                                *self.pending_skill_id.write() = None;
                                let apology = self
                                    .generate_localized_apology(
                                        "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
                                        &reason,
                                    )
                                    .await
                                    .unwrap_or_else(|_| {
                                        format!("I'm sorry, I couldn't understand your request: {}", reason)
                                    });
                                return Ok(AgentResponse::new(&apology));
                            }
                            DisambiguationResult::Escalate { reason } => {
                                *self.pending_skill_id.write() = None;
                                let apology = self
                                    .generate_localized_apology(
                                        "Explain briefly that you're transferring the user to a human agent for help.",
                                        &reason,
                                    )
                                    .await
                                    .unwrap_or_else(|_| {
                                        format!("I need human assistance to help with your request: {}", reason)
                                    });
                                return Ok(AgentResponse::new(&apology));
                            }
                            // User dropped the clarification: restart with a
                            // fresh request if given, otherwise acknowledge.
                            DisambiguationResult::Abandoned { new_input } => {
                                *self.pending_skill_id.write() = None;
                                debug!(skill_id = %skill_id, "Skill re-check: abandoned by user");
                                if let Some(fresh) = new_input {
                                    return self.run_loop_internal(&fresh).await;
                                }
                                let ack = self
                                    .generate_localized_apology(
                                        "The user changed their mind about their previous request. \
                                         Generate a brief, friendly acknowledgment (e.g. 'OK, no problem. What else can I help with?'). \
                                         Do NOT apologize excessively. Be concise.",
                                        "User abandoned clarification",
                                    )
                                    .await
                                    .unwrap_or_else(|_| {
                                        "OK, no problem. What else can I help with?".to_string()
                                    });
                                self.memory
                                    .add_message(ChatMessage::assistant(&ack))
                                    .await?;
                                return Ok(AgentResponse::new(&ack));
                            }
                        }
                    }
                }
            }
        }

        // Direct execution path: no (enabled) skill-level disambiguation, or
        // the re-check came back Clear.
        *self.pending_skill_id.write() = None;
        let skill_response = self.execute_skill_by_id(skill_id, enriched_input).await?;
        self.memory
            .add_message(ChatMessage::user(enriched_input))
            .await?;
        self.handle_skill_response(enriched_input, skill_response, &HashMap::new())
            .await
    }
3363
3364 async fn handle_skill_response(
3367 &self,
3368 processed_input: &str,
3369 skill_response: String,
3370 input_context: &HashMap<String, Value>,
3371 ) -> Result<AgentResponse> {
3372 let output_data = self.process_output(&skill_response, input_context).await?;
3373 let final_response = output_data.content;
3374
3375 self.memory
3376 .add_message(ChatMessage::assistant(&final_response))
3377 .await?;
3378
3379 self.check_memory_compression().await?;
3380
3381 self.increment_turn();
3382 self.evaluate_transitions(processed_input, &final_response)
3383 .await?;
3384
3385 let response = AgentResponse::new(final_response);
3386 self.hooks.on_response(&response).await;
3387 Ok(response)
3388 }
3389
    /// Run the Plan-and-Execute reasoning mode: generate a plan, execute it,
    /// and — when plan reflection is enabled — replan on step failures up to
    /// `max_replans` additional attempts before producing the final response.
    async fn handle_plan_and_execute(
        &self,
        processed_input: &str,
        input_context: &HashMap<String, Value>,
        auto_detected: bool,
    ) -> Result<AgentResponse> {
        let effective = self.get_effective_reasoning_config();
        // Reflection settings for planning; absent config means defaults
        // (reflection disabled).
        let plan_reflection = effective
            .get_planning()
            .map(|c| c.reflection.clone())
            .unwrap_or_default();

        // One base attempt plus the configured number of replans.
        let max_attempts = if plan_reflection.enabled {
            1 + plan_reflection.max_replans
        } else {
            1
        };

        let mut plan = self.generate_plan(processed_input).await?;
        info!(
            plan_id = %plan.id,
            steps = plan.steps.len(),
            "Plan generated"
        );

        let mut plan_result = String::new();

        for attempt in 0..max_attempts {
            // Publish the current plan so observers can inspect progress.
            *self.current_plan.write() = Some(plan.clone());
            plan_result = self.execute_plan(&mut plan).await?;

            info!(
                plan_status = ?plan.status,
                completed_steps = plan.completed_steps().count(),
                attempt = attempt + 1,
                "Plan execution completed"
            );

            // Without reflection there is nothing to retry.
            if !plan_reflection.enabled {
                break;
            }

            let has_failures = plan.steps.iter().any(|s| s.status.is_failed());
            if !has_failures {
                break;
            }

            // Last attempt: keep whatever result we have.
            if attempt + 1 >= max_attempts {
                break;
            }

            match plan_reflection.on_step_failure {
                StepFailureAction::Replan => {
                    info!(attempt = attempt + 1, "Plan had failures, replanning");
                    plan = self.generate_plan(processed_input).await?;
                }
                StepFailureAction::Abort => {
                    warn!("Plan step failed, aborting");
                    break;
                }
                // NOTE(review): Skip/Continue appear to be handled per-step
                // inside execute_plan — the whole-plan loop just stops here.
                StepFailureAction::Skip | StepFailureAction::Continue => {
                    break;
                }
            }
        }

        // Store the final plan state (including any failures) for inspection.
        *self.current_plan.write() = Some(plan);

        let output_data = self.process_output(&plan_result, input_context).await?;
        let final_content = output_data.content;

        self.memory
            .add_message(ChatMessage::assistant(&final_content))
            .await?;

        self.check_memory_compression().await?;
        self.increment_turn();
        self.evaluate_transitions(processed_input, &final_content)
            .await?;

        let reasoning_metadata =
            ReasoningMetadata::new(ReasoningMode::PlanAndExecute).with_auto_detected(auto_detected);

        let response = AgentResponse::new(&final_content).with_metadata(
            "reasoning",
            serde_json::to_value(&reasoning_metadata).unwrap_or_default(),
        );

        self.hooks.on_response(&response).await;
        Ok(response)
    }
3483
3484 fn inject_reasoning_prompt(
3486 &self,
3487 messages: &mut [ChatMessage],
3488 reasoning_mode: &ReasoningMode,
3489 is_first_iteration: bool,
3490 ) {
3491 if !is_first_iteration {
3492 return;
3493 }
3494 match reasoning_mode {
3495 ReasoningMode::CoT => {
3496 if let Some(msg) = messages.first_mut() {
3497 if matches!(msg.role, ai_agents_core::Role::System) {
3498 msg.content = self.build_cot_system_prompt(&msg.content);
3499 debug!("Applied Chain-of-Thought system prompt");
3500 }
3501 }
3502 }
3503 ReasoningMode::React => {
3504 if let Some(msg) = messages.first_mut() {
3505 if matches!(msg.role, ai_agents_core::Role::System) {
3506 msg.content = self.build_react_system_prompt(&msg.content);
3507 debug!("Applied ReAct system prompt");
3508 }
3509 }
3510 }
3511 _ => {}
3512 }
3513 }
3514
    /// Persist and execute one batch of tool calls from a single LLM turn.
    ///
    /// Returns `TransitionFired` if evaluating transitions on the assistant
    /// content moved the state machine (the tool calls are then dropped),
    /// `Rejected` if a HITL approver vetoed a call, or `Continue` so the main
    /// loop can run another LLM iteration with the tool results.
    async fn handle_tool_calls(
        &self,
        processed_input: &str,
        content: &str,
        tool_calls: Vec<ToolCall>,
        all_tool_calls: &mut Vec<ToolCall>,
    ) -> Result<ToolCallOutcome> {
        // A transition takes precedence over executing the requested tools:
        // the new state's workflow is responsible for the turn from here on.
        let transition_fired = self.evaluate_transitions(processed_input, content).await?;
        if transition_fired {
            self.memory
                .add_message(ChatMessage::assistant(
                    "(Transitioned to new state — tool call handled by workflow)",
                ))
                .await?;
            return Ok(ToolCallOutcome::TransitionFired);
        }

        // Record the assistant message containing the tool-call request.
        self.memory
            .add_message(ChatMessage::assistant(content))
            .await?;

        let results = self.execute_tools_parallel(&tool_calls).await;

        // Results are zipped positionally with the originating calls;
        // assumes execute_tools_parallel preserves input order — confirm.
        for ((_id, result), tool_call) in results.into_iter().zip(tool_calls.iter()) {
            match result {
                Ok(output) => {
                    self.memory
                        .add_message(ChatMessage::function(&tool_call.name, &output))
                        .await?;
                }
                Err(e) => {
                    // A HITL rejection aborts the whole turn with an
                    // explanatory response instead of feeding the error back
                    // to the model.
                    if matches!(e, AgentError::HITLRejected(_)) {
                        self.memory
                            .add_message(ChatMessage::assistant(&format!(
                                "The operation was rejected by the approver: {}",
                                e
                            )))
                            .await?;
                        return Ok(ToolCallOutcome::Rejected(AgentResponse {
                            content: format!("Operation cancelled: {}", e),
                            metadata: None,
                            tool_calls: Some(all_tool_calls.clone()),
                        }));
                    }
                    // Other tool errors are surfaced to the model as function
                    // output so it can react on the next iteration.
                    self.memory
                        .add_message(ChatMessage::function(
                            &tool_call.name,
                            &format!("Error: {}", e),
                        ))
                        .await?;
                }
            }
            all_tool_calls.push(tool_call.clone());
        }
        Ok(ToolCallOutcome::Continue)
    }
3578
    /// Optionally run the reflection loop on a candidate response: evaluate
    /// it against the configured quality criteria and, on failure, ask the
    /// LLM for an improved answer up to `max_retries` times.
    ///
    /// Returns the (possibly revised) content plus reflection metadata when
    /// reflection ran, or `(content, None)` when it was skipped.
    async fn run_reflection(
        &self,
        llm: &dyn LLMProvider,
        processed_input: &str,
        mut content: String,
    ) -> Result<(String, Option<ReflectionMetadata>)> {
        let should_reflect = self.should_reflect(processed_input, &content).await?;
        if !should_reflect {
            return Ok((content, None));
        }

        info!("Starting response reflection evaluation");
        let mut attempts = 0u32;
        let max_retries = self.reflection_config.max_retries;
        let mut history: Vec<ReflectionAttempt> = Vec::new();

        loop {
            let evaluation = self.evaluate_response(processed_input, &content).await?;

            // Stop on success or once the retry budget is spent; either way
            // the latest content is returned with the evaluation metadata.
            if evaluation.passed || attempts >= max_retries {
                info!(
                    passed = evaluation.passed,
                    confidence = evaluation.confidence,
                    attempts = attempts + 1,
                    "Reflection evaluation complete"
                );
                let reflection_metadata = Some(
                    ReflectionMetadata::new(evaluation)
                        .with_attempts(attempts + 1)
                        .with_history(history),
                );
                return Ok((content, reflection_metadata));
            }

            debug!(
                attempt = attempts + 1,
                failed_criteria = evaluation.failed_criteria().count(),
                "Response did not meet criteria, retrying"
            );

            history.push(
                ReflectionAttempt::new(&content, evaluation.clone())
                    .with_feedback("Response did not meet quality criteria"),
            );

            // Turn the failed criteria into an explicit retry instruction.
            let feedback: Vec<String> = evaluation
                .failed_criteria()
                .map(|c| format!("- {}", c.criterion))
                .collect();

            let retry_prompt = format!(
                "Your previous response did not meet these criteria:\n{}\n\nPlease provide an improved response.",
                feedback.join("\n")
            );

            // The retry prompt is persisted as a user turn so it is part of
            // the conversation the model sees on the next completion.
            self.memory
                .add_message(ChatMessage::user(&retry_prompt))
                .await?;

            let retry_messages = self.build_messages().await?;
            let retry_response = llm
                .complete(&retry_messages, None)
                .await
                .map_err(|e| AgentError::LLM(e.to_string()))?;

            content = retry_response.content.trim().to_string();
            attempts += 1;
        }
    }
3649
3650 async fn post_loop_processing(
3653 &self,
3654 processed_input: &str,
3655 content: String,
3656 ) -> Result<PostLoopResult> {
3657 self.increment_turn();
3662
3663 self.run_context_extractors(processed_input).await;
3665
3666 let transitioned = self.evaluate_transitions(processed_input, &content).await?;
3667
3668 if !transitioned {
3669 self.memory
3670 .add_message(ChatMessage::assistant(&content))
3671 .await?;
3672 self.check_memory_compression().await?;
3673 return Ok(PostLoopResult::NoTransition(content));
3674 }
3675
3676 if !self.should_regenerate_after_transition() {
3678 self.memory
3679 .add_message(ChatMessage::assistant(&content))
3680 .await?;
3681 self.check_memory_compression().await?;
3682 return Ok(PostLoopResult::Transitioned(content));
3683 }
3684
3685 if self.needs_redispatch_for_new_state() {
3689 info!("Post-transition NeedsRedispatch: new state requires full dispatch");
3690 return Ok(PostLoopResult::NeedsRedispatch);
3693 }
3694
3695 self.memory
3698 .add_message(ChatMessage::assistant(&content))
3699 .await?;
3700 self.check_memory_compression().await?;
3701
3702 let new_llm = self.get_state_llm()?;
3708 let mut final_content;
3709
3710 for post_iter in 0..self.max_iterations {
3711 let new_messages = self.build_messages().await?;
3712 if post_iter == 0 {
3713 if let Some(system_msg) = new_messages.first() {
3714 if system_msg.role == ai_agents_core::Role::System {
3715 debug!(
3716 prompt_preview =
3717 &system_msg.content[system_msg.content.len().saturating_sub(200)..],
3718 "Post-transition system prompt (last 200 chars)"
3719 );
3720 }
3721 }
3722 }
3723
3724 let new_response = new_llm
3725 .complete(&new_messages, None)
3726 .await
3727 .map_err(|e| AgentError::LLM(e.to_string()))?;
3728 final_content = new_response.content.trim().to_string();
3729
3730 if let Some(tool_calls) = self.parse_tool_calls(&final_content) {
3733 debug!(
3734 post_iter = post_iter,
3735 tools = tool_calls.len(),
3736 "Post-transition tool call detected, executing"
3737 );
3738
3739 self.memory
3740 .add_message(ChatMessage::assistant(&final_content))
3741 .await?;
3742
3743 let results = self.execute_tools_parallel(&tool_calls).await;
3744 for ((_id, result), tool_call) in results.into_iter().zip(tool_calls.iter()) {
3745 match result {
3746 Ok(output) => {
3747 self.memory
3748 .add_message(ChatMessage::function(&tool_call.name, &output))
3749 .await?;
3750 }
3751 Err(e) => {
3752 self.memory
3753 .add_message(ChatMessage::function(
3754 &tool_call.name,
3755 &format!("Error: {}", e),
3756 ))
3757 .await?;
3758 }
3759 }
3760 }
3761 continue;
3763 }
3764
3765 self.memory
3767 .add_message(ChatMessage::assistant(&final_content))
3768 .await?;
3769 return Ok(PostLoopResult::Transitioned(final_content));
3770 }
3771
3772 final_content = "Post-transition processing completed.".to_string();
3774 self.memory
3775 .add_message(ChatMessage::assistant(&final_content))
3776 .await?;
3777
3778 Ok(PostLoopResult::Transitioned(final_content))
3779 }
3780
3781 fn should_regenerate_after_transition(&self) -> bool {
3784 if let Some(ref sm) = self.state_machine {
3785 if !sm.config().regenerate_on_transition {
3787 return false;
3788 }
3789 if let Some(def) = sm.current_definition() {
3791 if let Some(regen) = def.regenerate_on_enter {
3792 return regen;
3793 }
3794 }
3795 }
3796 true
3797 }
3798
3799 fn needs_redispatch_for_new_state(&self) -> bool {
3802 if let Some(ref sm) = self.state_machine {
3803 if let Some(def) = sm.current_definition() {
3804 if def.concurrent.is_some()
3805 || def.group_chat.is_some()
3806 || def.pipeline.is_some()
3807 || def.handoff.is_some()
3808 || def.delegate.is_some()
3809 {
3810 return true;
3811 }
3812 let effective = self.get_effective_reasoning_config();
3814 if !matches!(effective.mode, ReasoningMode::None) {
3815 return true;
3816 }
3817 }
3818 }
3819 false
3820 }
3821
    /// Turn a `PostLoopResult` into final response content.
    ///
    /// `NeedsRedispatch` re-enters `run_loop_internal` for the new state,
    /// guarded by a depth counter so mutually-transitioning states cannot
    /// recurse forever.
    async fn apply_post_loop_result(
        &self,
        processed_input: &str,
        result: PostLoopResult,
    ) -> Result<String> {
        match result {
            PostLoopResult::NoTransition(content) | PostLoopResult::Transitioned(content) => {
                Ok(content)
            }
            PostLoopResult::NeedsRedispatch => {
                const MAX_REDISPATCH_DEPTH: u32 = 3;
                let current_depth = *self.redispatch_depth.read();
                // Depth guard: give up (recording an empty assistant turn)
                // rather than recursing indefinitely through transitions.
                if current_depth >= MAX_REDISPATCH_DEPTH {
                    warn!(
                        depth = current_depth,
                        "Post-transition re-dispatch depth limit reached, returning empty response"
                    );
                    let content = String::new();
                    self.memory
                        .add_message(ChatMessage::assistant(&content))
                        .await?;
                    return Ok(content);
                }
                *self.redispatch_depth.write() += 1;
                info!(
                    depth = current_depth + 1,
                    "Re-dispatching for new state after transition"
                );
                // Box::pin breaks the async-recursion cycle
                // (run_loop_internal can call back into this function).
                let resp = Box::pin(self.run_loop_internal(processed_input)).await;
                // Decrement before propagating any error so the counter stays
                // balanced across failed re-dispatches.
                *self.redispatch_depth.write() -= 1;
                resp.map(|r| r.content)
            }
        }
    }
3858
3859 fn build_agent_response(
3860 &self,
3861 content: String,
3862 all_tool_calls: Vec<ToolCall>,
3863 reasoning_mode: ReasoningMode,
3864 auto_detected: bool,
3865 iterations: u32,
3866 thinking: Option<String>,
3867 reflection_metadata: Option<ReflectionMetadata>,
3868 ) -> AgentResponse {
3869 let reasoning_metadata = ReasoningMetadata::new(reasoning_mode.clone())
3870 .with_thinking(thinking.clone().unwrap_or_default())
3871 .with_iterations(iterations)
3872 .with_auto_detected(auto_detected);
3873
3874 let mut response = AgentResponse::new(&content);
3875 if !all_tool_calls.is_empty() {
3876 response = response.with_tool_calls(all_tool_calls);
3877 }
3878
3879 if let Some(state) = self.current_state() {
3880 response = response.with_metadata("current_state", serde_json::json!(state));
3881 }
3882
3883 response = response.with_metadata(
3884 "reasoning",
3885 serde_json::to_value(&reasoning_metadata).unwrap_or_default(),
3886 );
3887
3888 if let Some(ref refl_meta) = reflection_metadata {
3889 response = response.with_metadata(
3890 "reflection",
3891 serde_json::to_value(refl_meta).unwrap_or_default(),
3892 );
3893 }
3894
3895 response
3896 }
3897
3898 async fn handle_delegated_state(
3900 &self,
3901 input: &str,
3902 delegate_id: &str,
3903 state_def: &ai_agents_state::StateDefinition,
3904 ) -> Result<AgentResponse> {
3905 use std::time::Instant;
3906
3907 let registry = self.spawner_registry.as_ref().ok_or_else(|| {
3908 AgentError::Config(format!(
3909 "State delegates to '{}' but no agent registry is configured. \
3910 Add a spawner section with auto_spawn to your YAML.",
3911 delegate_id
3912 ))
3913 })?;
3914
3915 let state_name = self
3916 .state_machine
3917 .as_ref()
3918 .map(|sm| sm.current())
3919 .unwrap_or_else(|| "unknown".to_string());
3920
3921 self.hooks.on_delegate_start(delegate_id, &state_name).await;
3922 let start = Instant::now();
3923
3924 let delegate = registry.get(delegate_id).ok_or_else(|| {
3925 AgentError::Other(format!(
3926 "State '{}' delegates to '{}' but no agent with that ID exists in the registry.",
3927 state_name, delegate_id
3928 ))
3929 })?;
3930
3931 let context_mode = state_def.delegate_context.clone().unwrap_or_default();
3933 let effective_input = crate::orchestration::context::prepare_delegate_input(
3934 input,
3935 &context_mode,
3936 &*self.memory,
3937 self.llm_registry.get("router").ok().as_deref(),
3938 )
3939 .await?;
3940
3941 let response = delegate.chat(&effective_input).await?;
3942
3943 let duration_ms = start.elapsed().as_millis() as u64;
3944 self.hooks
3945 .on_delegate_complete(delegate_id, &state_name, duration_ms)
3946 .await;
3947
3948 let ctx_key = format!("delegation.{}.last_response", delegate_id);
3950 let _ = self.context_manager.set(
3951 &ctx_key,
3952 serde_json::Value::String(response.content.clone()),
3953 );
3954
3955 let _ = self.context_manager.set(
3957 "orchestration",
3958 serde_json::json!({
3959 "type": "delegate",
3960 "agent": delegate_id,
3961 "state": state_name,
3962 "response": response.content,
3963 "duration_ms": duration_ms,
3964 }),
3965 );
3966
3967 if *self.redispatch_depth.read() == 0 {
3969 self.memory
3970 .add_message(ai_agents_llm::ChatMessage::user(input))
3971 .await?;
3972 }
3973
3974 let post_result = self
3977 .post_loop_processing(
3978 input,
3979 format!("[Delegated to {}]: {}", delegate_id, response.content),
3980 )
3981 .await?;
3982 let final_content = self.apply_post_loop_result(input, post_result).await?;
3983
3984 let mut result = AgentResponse::new(final_content);
3985
3986 let metadata = serde_json::json!({
3987 "orchestration": {
3988 "type": "delegate",
3989 "agent": delegate_id,
3990 "state": state_name,
3991 "response": response.content,
3992 "duration_ms": duration_ms,
3993 }
3994 });
3995 result.metadata = Some(
3996 serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
3997 metadata,
3998 )
3999 .unwrap_or_default(),
4000 );
4001
4002 self.hooks.on_response(&result).await;
4003 Ok(result)
4004 }
4005
4006 async fn handle_concurrent_state(
4008 &self,
4009 input: &str,
4010 config: &ai_agents_state::ConcurrentStateConfig,
4011 ) -> Result<AgentResponse> {
4012 use std::time::Instant;
4013
4014 let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4015 AgentError::Config(
4016 "Concurrent state requires an agent registry. Add a spawner section.".into(),
4017 )
4018 })?;
4019
4020 let context_mode = config.context_mode.clone().unwrap_or_default();
4025 let context_input = crate::orchestration::context::prepare_delegate_input(
4026 input,
4027 &context_mode,
4028 &*self.memory,
4029 self.llm_registry.get("router").ok().as_deref(),
4030 )
4031 .await?;
4032
4033 let effective_input = if let Some(ref tmpl) = config.input {
4034 render_concurrent_template(tmpl, &context_input, &self.context_manager.get_all())
4035 .unwrap_or_else(|_| context_input.clone())
4036 } else {
4037 context_input
4038 };
4039
4040 let start = Instant::now();
4041
4042 let llm_name = config
4043 .aggregation
4044 .synthesizer_llm
4045 .as_deref()
4046 .unwrap_or("router");
4047 let llm_provider = self.llm_registry.get(llm_name).ok();
4048
4049 let result = crate::orchestration::concurrent(
4050 registry,
4051 &effective_input,
4052 &config.agents,
4053 &config.aggregation,
4054 llm_provider.as_deref(),
4055 config.min_required,
4056 config.timeout_ms,
4057 config.on_partial_failure.clone(),
4058 )
4059 .await?;
4060
4061 let duration_ms = start.elapsed().as_millis() as u64;
4062 let agent_ids: Vec<String> = config.agents.iter().map(|a| a.id().to_string()).collect();
4063 let strategy = format!("{:?}", config.aggregation.strategy);
4064 self.hooks
4065 .on_concurrent_complete(&agent_ids, &strategy, duration_ms)
4066 .await;
4067
4068 let _ = self.context_manager.set(
4070 "concurrent.result",
4071 serde_json::Value::String(result.response.content.clone()),
4072 );
4073
4074 let agents_json: Vec<serde_json::Value> = result
4076 .agent_results
4077 .iter()
4078 .map(|ar| {
4079 serde_json::json!({
4080 "id": ar.agent_id,
4081 "response": ar.response.as_ref().map(|r| r.content.as_str()),
4082 "success": ar.success,
4083 "error": ar.error,
4084 "duration_ms": ar.duration_ms,
4085 })
4086 })
4087 .collect();
4088
4089 let _ = self.context_manager.set(
4091 "orchestration",
4092 serde_json::json!({
4093 "type": "concurrent",
4094 "result": result.response.content,
4095 "strategy": strategy,
4096 "agents": agents_json,
4097 "duration_ms": duration_ms,
4098 }),
4099 );
4100
4101 if *self.redispatch_depth.read() == 0 {
4103 self.memory
4104 .add_message(ai_agents_llm::ChatMessage::user(input))
4105 .await?;
4106 }
4107
4108 let post_result = self
4109 .post_loop_processing(input, result.response.content.clone())
4110 .await?;
4111 let final_content = self.apply_post_loop_result(input, post_result).await?;
4112
4113 let mut response = AgentResponse::new(final_content);
4114 let metadata = serde_json::json!({
4115 "orchestration": {
4116 "type": "concurrent",
4117 "result": result.response.content,
4118 "strategy": strategy,
4119 "agents": agents_json,
4120 "duration_ms": duration_ms,
4121 }
4122 });
4123 response.metadata = Some(
4124 serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4125 metadata,
4126 )
4127 .unwrap_or_default(),
4128 );
4129
4130 self.hooks.on_response(&response).await;
4131 Ok(response)
4132 }
4133
4134 async fn handle_group_chat_state(
4136 &self,
4137 input: &str,
4138 config: &ai_agents_state::GroupChatStateConfig,
4139 ) -> Result<AgentResponse> {
4140 use std::time::Instant;
4141
4142 let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4143 AgentError::Config(
4144 "Group chat state requires an agent registry. Add a spawner section.".into(),
4145 )
4146 })?;
4147
4148 let start = Instant::now();
4149
4150 let llm_provider = self.llm_registry.get("router").ok();
4151
4152 let context_mode = config.context_mode.clone().unwrap_or_default();
4154 let context_input = crate::orchestration::context::prepare_delegate_input(
4155 input,
4156 &context_mode,
4157 &*self.memory,
4158 self.llm_registry.get("router").ok().as_deref(),
4159 )
4160 .await?;
4161
4162 let effective_topic = if let Some(ref tmpl) = config.input {
4164 render_concurrent_template(tmpl, &context_input, &self.context_manager.get_all())
4165 .unwrap_or_else(|_| context_input.clone())
4166 } else {
4167 context_input
4168 };
4169
4170 let result = crate::orchestration::group_chat(
4171 registry,
4172 &effective_topic,
4173 config,
4174 llm_provider.as_deref(),
4175 Some(&*self.hooks),
4176 )
4177 .await?;
4178
4179 let duration_ms = start.elapsed().as_millis() as u64;
4180
4181 let _ = self.context_manager.set(
4183 "group_chat.conclusion",
4184 serde_json::Value::String(result.response.content.clone()),
4185 );
4186
4187 let transcript_json: Vec<serde_json::Value> = result
4189 .transcript
4190 .iter()
4191 .map(|t| {
4192 serde_json::json!({
4193 "speaker": t.speaker,
4194 "round": t.round,
4195 "content": t.content,
4196 })
4197 })
4198 .collect();
4199
4200 let _ = self.context_manager.set(
4202 "orchestration",
4203 serde_json::json!({
4204 "type": "group_chat",
4205 "conclusion": result.response.content,
4206 "transcript": transcript_json,
4207 "rounds": result.rounds_completed,
4208 "termination": result.termination_reason,
4209 "duration_ms": duration_ms,
4210 }),
4211 );
4212
4213 if *self.redispatch_depth.read() == 0 {
4215 self.memory
4216 .add_message(ai_agents_llm::ChatMessage::user(input))
4217 .await?;
4218 }
4219
4220 let post_result = self
4221 .post_loop_processing(input, result.response.content.clone())
4222 .await?;
4223 let final_content = self.apply_post_loop_result(input, post_result).await?;
4224
4225 let mut response = AgentResponse::new(final_content);
4226 let metadata = serde_json::json!({
4227 "orchestration": {
4228 "type": "group_chat",
4229 "conclusion": result.response.content,
4230 "transcript": transcript_json,
4231 "rounds": result.rounds_completed,
4232 "termination": result.termination_reason,
4233 "duration_ms": duration_ms,
4234 }
4235 });
4236 response.metadata = Some(
4237 serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4238 metadata,
4239 )
4240 .unwrap_or_default(),
4241 );
4242
4243 self.hooks.on_response(&response).await;
4244 Ok(response)
4245 }
4246
4247 async fn handle_pipeline_state(
4249 &self,
4250 input: &str,
4251 config: &ai_agents_state::PipelineStateConfig,
4252 ) -> Result<AgentResponse> {
4253 use std::time::Instant;
4254
4255 let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4256 AgentError::Config(
4257 "Pipeline state requires an agent registry. Add a spawner section.".into(),
4258 )
4259 })?;
4260
4261 let start = Instant::now();
4262
4263 let stages: Vec<crate::orchestration::PipelineStage> = config
4264 .stages
4265 .iter()
4266 .map(|entry| {
4267 let mut stage = crate::orchestration::PipelineStage::id(entry.id());
4268 if let Some(tmpl) = entry.input() {
4269 stage = stage.with_input(tmpl);
4270 }
4271 stage
4272 })
4273 .collect();
4274
4275 let context_mode = config.context_mode.clone().unwrap_or_default();
4277 let context_input = crate::orchestration::context::prepare_delegate_input(
4278 input,
4279 &context_mode,
4280 &*self.memory,
4281 self.llm_registry.get("router").ok().as_deref(),
4282 )
4283 .await?;
4284
4285 let context_values = self.context_manager.get_all();
4286 let result = crate::orchestration::pipeline(
4287 registry,
4288 &context_input,
4289 &stages,
4290 config.timeout_ms,
4291 Some(&*self.hooks),
4292 Some(&context_values),
4293 )
4294 .await?;
4295
4296 let duration_ms = start.elapsed().as_millis() as u64;
4297
4298 let _ = self.context_manager.set(
4300 "pipeline.result",
4301 serde_json::Value::String(result.response.content.clone()),
4302 );
4303
4304 let stages_json: Vec<serde_json::Value> = result
4306 .stage_outputs
4307 .iter()
4308 .map(|s| {
4309 serde_json::json!({
4310 "agent_id": s.agent_id,
4311 "output": s.output,
4312 "duration_ms": s.duration_ms,
4313 "skipped": s.skipped,
4314 })
4315 })
4316 .collect();
4317
4318 let _ = self.context_manager.set(
4320 "orchestration",
4321 serde_json::json!({
4322 "type": "pipeline",
4323 "result": result.response.content,
4324 "stages": stages_json,
4325 "duration_ms": duration_ms,
4326 }),
4327 );
4328
4329 if *self.redispatch_depth.read() == 0 {
4331 self.memory
4332 .add_message(ai_agents_llm::ChatMessage::user(input))
4333 .await?;
4334 }
4335
4336 let post_result = self
4337 .post_loop_processing(input, result.response.content.clone())
4338 .await?;
4339 let final_content = self.apply_post_loop_result(input, post_result).await?;
4340
4341 let mut response = AgentResponse::new(final_content);
4342 let metadata = serde_json::json!({
4343 "orchestration": {
4344 "type": "pipeline",
4345 "result": result.response.content,
4346 "stages": stages_json,
4347 "duration_ms": duration_ms,
4348 }
4349 });
4350 response.metadata = Some(
4351 serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4352 metadata,
4353 )
4354 .unwrap_or_default(),
4355 );
4356
4357 self.hooks.on_response(&response).await;
4358 Ok(response)
4359 }
4360
4361 async fn handle_handoff_state(
4363 &self,
4364 input: &str,
4365 config: &ai_agents_state::HandoffStateConfig,
4366 ) -> Result<AgentResponse> {
4367 use std::time::Instant;
4368
4369 let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4370 AgentError::Config(
4371 "Handoff state requires an agent registry. Add a spawner section.".into(),
4372 )
4373 })?;
4374
4375 let llm = self
4376 .llm_registry
4377 .get("router")
4378 .map_err(|_| AgentError::Config("Handoff state requires a router LLM.".into()))?;
4379
4380 let start = Instant::now();
4381
4382 let context_mode = config.context_mode.clone().unwrap_or_default();
4384 let context_input = crate::orchestration::context::prepare_delegate_input(
4385 input,
4386 &context_mode,
4387 &*self.memory,
4388 self.llm_registry.get("router").ok().as_deref(),
4389 )
4390 .await?;
4391
4392 let effective_input = if let Some(ref tmpl) = config.input {
4394 render_concurrent_template(tmpl, &context_input, &self.context_manager.get_all())
4395 .unwrap_or_else(|_| context_input.clone())
4396 } else {
4397 context_input
4398 };
4399
4400 let result = crate::orchestration::handoff(
4401 registry,
4402 &effective_input,
4403 &config.initial_agent,
4404 &config.available_agents,
4405 config.max_handoffs,
4406 llm.as_ref(),
4407 Some(&*self.hooks),
4408 )
4409 .await?;
4410
4411 let duration_ms = start.elapsed().as_millis() as u64;
4412
4413 let _ = self.context_manager.set(
4415 "handoff.result",
4416 serde_json::Value::String(result.response.content.clone()),
4417 );
4418
4419 let chain_json: Vec<serde_json::Value> = result
4421 .handoff_chain
4422 .iter()
4423 .map(|h| {
4424 serde_json::json!({
4425 "from": h.from_agent,
4426 "to": h.to_agent,
4427 "reason": h.reason,
4428 })
4429 })
4430 .collect();
4431
4432 let _ = self.context_manager.set(
4434 "orchestration",
4435 serde_json::json!({
4436 "type": "handoff",
4437 "result": result.response.content,
4438 "final_agent": result.final_agent,
4439 "handoff_chain": chain_json,
4440 "duration_ms": duration_ms,
4441 }),
4442 );
4443
4444 if *self.redispatch_depth.read() == 0 {
4446 self.memory
4447 .add_message(ai_agents_llm::ChatMessage::user(input))
4448 .await?;
4449 }
4450
4451 let post_result = self
4452 .post_loop_processing(input, result.response.content.clone())
4453 .await?;
4454 let final_content = self.apply_post_loop_result(input, post_result).await?;
4455
4456 let mut response = AgentResponse::new(final_content);
4457 let metadata = serde_json::json!({
4458 "orchestration": {
4459 "type": "handoff",
4460 "result": result.response.content,
4461 "final_agent": result.final_agent,
4462 "handoff_chain": chain_json,
4463 "duration_ms": duration_ms,
4464 }
4465 });
4466 response.metadata = Some(
4467 serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4468 metadata,
4469 )
4470 .unwrap_or_default(),
4471 );
4472
4473 self.hooks.on_response(&response).await;
4474 Ok(response)
4475 }
4476
4477 async fn run_loop_internal(&self, input: &str) -> Result<AgentResponse> {
4479 let input_data = self.process_input(input).await?;
4480
4481 for (key, value) in &input_data.context {
4484 let _ = self.context_manager.set(key, value.clone());
4485 }
4486
4487 if input_data.metadata.rejected {
4488 let reason = input_data
4489 .metadata
4490 .rejection_reason
4491 .unwrap_or_else(|| "Input rejected".to_string());
4492 warn!(reason = %reason, "Input rejected");
4493 return Ok(AgentResponse::new(reason));
4494 }
4495
4496 let processed_input = &input_data.content;
4497
4498 if let Some(ref sm) = self.state_machine {
4500 if let Some(def) = sm.current_definition() {
4501 if let Some(ref delegate_id) = def.delegate {
4502 return self
4503 .handle_delegated_state(processed_input, delegate_id, &def)
4504 .await;
4505 }
4506 if let Some(ref concurrent_config) = def.concurrent {
4507 return self
4508 .handle_concurrent_state(processed_input, concurrent_config)
4509 .await;
4510 }
4511 if let Some(ref group_chat_config) = def.group_chat {
4512 return self
4513 .handle_group_chat_state(processed_input, group_chat_config)
4514 .await;
4515 }
4516 if let Some(ref pipeline_config) = def.pipeline {
4517 return self
4518 .handle_pipeline_state(processed_input, pipeline_config)
4519 .await;
4520 }
4521 if let Some(ref handoff_config) = def.handoff {
4522 return self
4523 .handle_handoff_state(processed_input, handoff_config)
4524 .await;
4525 }
4526 }
4527 }
4528
4529 match self.try_skill_route(processed_input).await? {
4530 SkillRouteResult::Response(skill_response) => {
4531 if *self.redispatch_depth.read() == 0 {
4532 self.memory
4533 .add_message(ChatMessage::user(processed_input))
4534 .await?;
4535 }
4536 return self
4537 .handle_skill_response(processed_input, skill_response, &input_data.context)
4538 .await;
4539 }
4540 SkillRouteResult::NeedsClarification(response) => {
4541 if *self.redispatch_depth.read() == 0 {
4542 self.memory
4543 .add_message(ChatMessage::user(processed_input))
4544 .await?;
4545 }
4546 if let Some(q) = response
4547 .metadata
4548 .as_ref()
4549 .and_then(|m| m.get("disambiguation"))
4550 .and_then(|d| d.get("status"))
4551 .and_then(|s| s.as_str())
4552 {
4553 if q == "awaiting_clarification" {
4554 self.memory
4557 .add_message(ChatMessage::assistant(&response.content))
4558 .await?;
4559 }
4560 }
4561 return Ok(response);
4562 }
4563 SkillRouteResult::NoMatch => {} }
4565
4566 let effective_reasoning = self.get_effective_reasoning_config();
4567 let reasoning_mode = self.determine_reasoning_mode(processed_input).await?;
4568 let auto_detected = matches!(effective_reasoning.mode, ReasoningMode::Auto);
4569
4570 info!(
4571 reasoning_mode = ?reasoning_mode,
4572 auto_detected = auto_detected,
4573 reflection_enabled = ?self.reflection_config.enabled,
4574 "Reasoning mode determined"
4575 );
4576
4577 if matches!(reasoning_mode, ReasoningMode::PlanAndExecute) {
4578 if *self.redispatch_depth.read() == 0 {
4579 self.memory
4580 .add_message(ChatMessage::user(processed_input))
4581 .await?;
4582 }
4583 return self
4584 .handle_plan_and_execute(processed_input, &input_data.context, auto_detected)
4585 .await;
4586 }
4587
4588 if *self.redispatch_depth.read() == 0 {
4589 self.memory
4590 .add_message(ChatMessage::user(processed_input))
4591 .await?;
4592 }
4593
4594 let mut iterations = 0u32;
4595 let mut all_tool_calls: Vec<ToolCall> = Vec::new();
4596 let mut thinking_content: Option<String> = None;
4597
4598 let llm = self.get_state_llm()?;
4599
4600 loop {
4601 let effective_max = if reasoning_mode != ReasoningMode::None {
4603 let rc = self.get_effective_reasoning_config();
4604 self.max_iterations.min(rc.max_iterations)
4605 } else {
4606 self.max_iterations
4607 };
4608
4609 if iterations >= effective_max {
4610 let err = AgentError::Other(format!("Max iterations ({}) exceeded", effective_max));
4611 self.hooks.on_error(&err).await;
4612 error!(iterations = iterations, "Max iterations exceeded");
4613 return Err(err);
4614 }
4615 iterations += 1;
4616 *self.iteration_count.write() = iterations;
4617
4618 debug!(iteration = iterations, max = effective_max, "LLM call");
4619
4620 let mut messages = self.build_messages().await?;
4621 self.inject_reasoning_prompt(&mut messages, &reasoning_mode, iterations == 1);
4622
4623 self.hooks.on_llm_start(&messages).await;
4624 let llm_start = Instant::now();
4625
4626 let response = {
4628 let primary_result = if self.recovery_manager.config().default.max_retries > 0 {
4629 self.recovery_manager
4630 .with_retry("llm_call", None, || async {
4631 llm.complete(&messages, None)
4632 .await
4633 .map_err(|e| e.classify())
4634 })
4635 .await
4636 .map_err(|e| AgentError::LLM(e.to_string()))
4637 } else {
4638 llm.complete(&messages, None)
4639 .await
4640 .map_err(|e| AgentError::LLM(e.to_string()))
4641 };
4642
4643 match primary_result {
4644 Ok(resp) => resp,
4645 Err(primary_err) => match &self.recovery_manager.config().llm.on_failure {
4646 LLMFailureAction::FallbackLlm { fallback_llm } => {
4647 warn!(
4648 fallback = %fallback_llm,
4649 error = %primary_err,
4650 "Primary LLM failed, attempting fallback LLM"
4651 );
4652 let fb = self.llm_registry.get(fallback_llm).map_err(|e| {
4653 AgentError::Config(format!(
4654 "Fallback LLM '{}' not found: {}",
4655 fallback_llm, e
4656 ))
4657 })?;
4658 fb.complete(&messages, None)
4659 .await
4660 .map_err(|e| AgentError::LLM(e.to_string()))?
4661 }
4662 LLMFailureAction::FallbackResponse { message } => {
4663 warn!(
4664 error = %primary_err,
4665 "Primary LLM failed, using static fallback response"
4666 );
4667 LLMResponse::new(message.clone(), FinishReason::Stop)
4668 }
4669 LLMFailureAction::Error => {
4670 return Err(primary_err);
4671 }
4672 },
4673 }
4674 };
4675
4676 let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
4677 self.hooks.on_llm_complete(&response, llm_duration_ms).await;
4678
4679 let content = response.content.trim();
4680
4681 if let Some(tool_calls) = self.parse_tool_calls(content) {
4682 match self
4683 .handle_tool_calls(processed_input, content, tool_calls, &mut all_tool_calls)
4684 .await?
4685 {
4686 ToolCallOutcome::Continue | ToolCallOutcome::TransitionFired => continue,
4687 ToolCallOutcome::Rejected(resp) => return Ok(resp),
4688 }
4689 }
4690
4691 let (extracted_thinking, answer) = self.extract_thinking(content);
4692 if extracted_thinking.is_some() {
4693 thinking_content = extracted_thinking;
4694 }
4695
4696 let output_data = self.process_output(&answer, &input_data.context).await?;
4697
4698 let mut final_content = if output_data.metadata.rejected {
4699 output_data
4700 .metadata
4701 .rejection_reason
4702 .unwrap_or_else(|| answer.to_string())
4703 } else {
4704 output_data.content
4705 };
4706
4707 let reflection_metadata;
4709 (final_content, reflection_metadata) = self
4710 .run_reflection(&*llm, processed_input, final_content)
4711 .await?;
4712
4713 final_content =
4714 self.format_response_with_thinking(thinking_content.as_deref(), &final_content);
4715
4716 let final_content = {
4720 let result = self
4721 .post_loop_processing(processed_input, final_content)
4722 .await?;
4723 self.apply_post_loop_result(processed_input, result).await?
4724 };
4725
4726 let reflected = reflection_metadata.is_some();
4727 let reasoning_mode_debug = format!("{:?}", reasoning_mode);
4728
4729 let response = self.build_agent_response(
4730 final_content,
4731 all_tool_calls,
4732 reasoning_mode,
4733 auto_detected,
4734 iterations,
4735 thinking_content,
4736 reflection_metadata,
4737 );
4738
4739 self.hooks.on_response(&response).await;
4740
4741 let tool_call_count = response.tool_calls.as_ref().map(|tc| tc.len()).unwrap_or(0);
4742 info!(
4743 tool_calls = tool_call_count,
4744 response_len = response.content.len(),
4745 reasoning_mode = %reasoning_mode_debug,
4746 reflected = reflected,
4747 "Chat completed"
4748 );
4749 return Ok(response);
4750 }
4751 }
4752
    /// Streaming counterpart of the internal run loop: processes one input turn and
    /// yields `StreamChunk`s (content deltas, tool/state events, errors, `Done`).
    ///
    /// Flow mirrors the non-stream loop: input processing → orchestration states
    /// (delegate/concurrent/group-chat/pipeline/handoff) → skill routing →
    /// reasoning-mode dispatch → the LLM/tool iteration loop. Any error is surfaced
    /// as a `StreamChunk::error` and terminates the stream.
    fn run_loop_internal_stream<'a>(
        &'a self,
        input: &'a str,
    ) -> Pin<Box<dyn Stream<Item = StreamChunk> + Send + 'a>> {
        // Captured before the generator so the config is read once per call.
        let include_tool_events = self.streaming.include_tool_events;
        let include_state_events = self.streaming.include_state_events;

        Box::pin(async_stream::stream! {
            let input_data = match self.process_input(input).await {
                Ok(data) => data,
                Err(e) => {
                    yield StreamChunk::error(e.to_string());
                    return;
                }
            };

            // Mirror input-derived context values into the context manager.
            for (key, value) in &input_data.context {
                let _ = self.context_manager.set(key, value.clone());
            }

            if input_data.metadata.rejected {
                let reason = input_data
                    .metadata
                    .rejection_reason
                    .unwrap_or_else(|| "Input rejected".to_string());
                warn!(reason = %reason, "Input rejected (stream)");
                yield StreamChunk::error(reason);
                return;
            }

            let processed_input = &input_data.content;

            // Orchestration states short-circuit the loop: the sub-agent result is
            // emitted as a single content chunk rather than token-by-token.
            if let Some(ref sm) = self.state_machine {
                if let Some(def) = sm.current_definition() {
                    let orchestration_result = if let Some(ref delegate_id) = def.delegate {
                        Some(self.handle_delegated_state(processed_input, delegate_id, &def).await)
                    } else if let Some(ref concurrent_config) = def.concurrent {
                        Some(self.handle_concurrent_state(processed_input, concurrent_config).await)
                    } else if let Some(ref group_chat_config) = def.group_chat {
                        Some(self.handle_group_chat_state(processed_input, group_chat_config).await)
                    } else if let Some(ref pipeline_config) = def.pipeline {
                        Some(self.handle_pipeline_state(processed_input, pipeline_config).await)
                    } else if let Some(ref handoff_config) = def.handoff {
                        Some(self.handle_handoff_state(processed_input, handoff_config).await)
                    } else {
                        None
                    };

                    if let Some(result) = orchestration_result {
                        match result {
                            Ok(response) => {
                                yield StreamChunk::content(&response.content);
                                yield StreamChunk::Done {};
                            }
                            Err(e) => {
                                yield StreamChunk::error(e.to_string());
                            }
                        }
                        return;
                    }
                }
            }

            // Skill routing. The user message is only persisted at depth 0 so
            // re-dispatched turns don't duplicate it in memory.
            match self.try_skill_route(processed_input).await {
                Ok(SkillRouteResult::Response(skill_response)) => {
                    if *self.redispatch_depth.read() == 0 {
                        let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
                    }
                    match self.handle_skill_response(processed_input, skill_response, &input_data.context).await {
                        Ok(resp) => {
                            yield StreamChunk::content(&resp.content);
                            yield StreamChunk::Done {};
                            return;
                        }
                        Err(e) => {
                            yield StreamChunk::error(e.to_string());
                            return;
                        }
                    }
                }
                Ok(SkillRouteResult::NeedsClarification(response)) => {
                    if *self.redispatch_depth.read() == 0 {
                        let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
                    }
                    let _ = self.memory.add_message(ChatMessage::assistant(&response.content)).await;
                    yield StreamChunk::content(&response.content);
                    yield StreamChunk::Done {};
                    return;
                }
                Ok(SkillRouteResult::NoMatch) => {}
                Err(e) => {
                    yield StreamChunk::error(e.to_string());
                    return;
                }
            }

            let effective_reasoning = self.get_effective_reasoning_config();
            let reasoning_mode = match self.determine_reasoning_mode(processed_input).await {
                Ok(mode) => mode,
                Err(e) => {
                    yield StreamChunk::error(e.to_string());
                    return;
                }
            };
            let auto_detected = matches!(effective_reasoning.mode, ReasoningMode::Auto);

            info!(
                reasoning_mode = ?reasoning_mode,
                auto_detected = auto_detected,
                "Reasoning mode determined (stream)"
            );

            // Plan-and-execute is handled by a dedicated path; its final answer is
            // emitted as one content chunk.
            if matches!(reasoning_mode, ReasoningMode::PlanAndExecute) {
                if *self.redispatch_depth.read() == 0 {
                    let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
                }
                match self.handle_plan_and_execute(processed_input, &input_data.context, auto_detected).await {
                    Ok(resp) => {
                        yield StreamChunk::content(&resp.content);
                        yield StreamChunk::Done {};
                        return;
                    }
                    Err(e) => {
                        yield StreamChunk::error(e.to_string());
                        return;
                    }
                }
            }

            if *self.redispatch_depth.read() == 0 {
                let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
            }

            let llm = match self.get_state_llm() {
                Ok(llm) => llm,
                Err(e) => {
                    yield StreamChunk::error(e.to_string());
                    return;
                }
            };

            let mut iterations = 0u32;
            let mut all_tool_calls: Vec<ToolCall> = Vec::new();
            let mut thinking_content: Option<String> = None;

            loop {
                // Reasoning modes may cap iterations below the agent's own limit.
                let effective_max = if reasoning_mode != ReasoningMode::None {
                    let rc = self.get_effective_reasoning_config();
                    self.max_iterations.min(rc.max_iterations)
                } else {
                    self.max_iterations
                };

                if iterations >= effective_max {
                    let err_msg = format!("Max iterations ({}) exceeded", effective_max);
                    let err = AgentError::Other(err_msg.clone());
                    self.hooks.on_error(&err).await;
                    error!(iterations = iterations, "Max iterations exceeded (stream)");
                    yield StreamChunk::error(err_msg);
                    return;
                }
                iterations += 1;
                *self.iteration_count.write() = iterations;

                debug!(iteration = iterations, max = effective_max, "LLM call (stream)");

                let mut messages = match self.build_messages().await {
                    Ok(m) => m,
                    Err(e) => {
                        yield StreamChunk::error(e.to_string());
                        return;
                    }
                };
                self.inject_reasoning_prompt(&mut messages, &reasoning_mode, iterations == 1);

                self.hooks.on_llm_start(&messages).await;
                let llm_start = Instant::now();

                // When reflection will run we must buffer the whole completion
                // (it may be rewritten), so token streaming is disabled for the turn.
                let reflection_active = match self.should_reflect(processed_input, "").await {
                    Ok(v) => v,
                    Err(_) => false,
                };

                let content = if reflection_active {
                    let response = match llm.complete(&messages, None).await {
                        Ok(r) => r,
                        Err(e) => {
                            yield StreamChunk::error(e.to_string());
                            return;
                        }
                    };
                    let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
                    self.hooks.on_llm_complete(&response, llm_duration_ms).await;
                    response.content.trim().to_string()
                } else {
                    // Stream deltas to the caller while accumulating the full text
                    // for tool-call parsing and memory.
                    let llm_stream = match llm.complete_stream(&messages, None).await {
                        Ok(s) => s,
                        Err(e) => {
                            yield StreamChunk::error(e.to_string());
                            return;
                        }
                    };
                    let mut accumulated = String::new();
                    let mut stream_inner = llm_stream;
                    while let Some(chunk_result) = stream_inner.next().await {
                        match chunk_result {
                            Ok(chunk) => {
                                accumulated.push_str(&chunk.delta);
                                yield StreamChunk::content(chunk.delta);
                            }
                            Err(e) => {
                                yield StreamChunk::error(e.to_string());
                                return;
                            }
                        }
                    }
                    let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
                    // Synthesize an LLMResponse so the completion hook still fires.
                    let llm_response = ai_agents_core::LLMResponse::new(
                        accumulated.trim(),
                        ai_agents_core::FinishReason::Stop,
                    );
                    self.hooks.on_llm_complete(&llm_response, llm_duration_ms).await;
                    accumulated.trim().to_string()
                };

                if let Some(tool_calls) = self.parse_tool_calls(&content) {
                    // A fired state transition supersedes the tool call; loop again
                    // under the new state.
                    let transition_fired = match self.evaluate_transitions(processed_input, &content).await {
                        Ok(v) => v,
                        Err(e) => {
                            yield StreamChunk::error(e.to_string());
                            return;
                        }
                    };
                    if transition_fired {
                        let _ = self.memory.add_message(ChatMessage::assistant(
                            "(Transitioned to new state — tool call handled by workflow)",
                        )).await;

                        if include_state_events {
                            if let Some(state) = self.current_state() {
                                yield StreamChunk::state_transition(None, state);
                            }
                        }
                        continue;
                    }

                    let _ = self.memory.add_message(ChatMessage::assistant(&content)).await;

                    let results = self.execute_tools_parallel(&tool_calls).await;

                    // Results come back in input order (execute_tools_parallel
                    // preserves ordering), so zip against the original calls.
                    for ((_id, result), tool_call) in results.into_iter().zip(tool_calls.iter()) {
                        if include_tool_events {
                            yield StreamChunk::tool_start(&tool_call.id, &tool_call.name);
                        }

                        match result {
                            Ok(output) => {
                                if include_tool_events {
                                    yield StreamChunk::tool_result(
                                        &tool_call.id,
                                        &tool_call.name,
                                        &output,
                                        true,
                                    );
                                }
                                let _ = self.memory
                                    .add_message(ChatMessage::function(&tool_call.name, &output))
                                    .await;
                            }
                            Err(e) => {
                                // HITL rejection aborts the whole turn, not just the call.
                                if matches!(e, AgentError::HITLRejected(_)) {
                                    let _ = self.memory.add_message(ChatMessage::assistant(
                                        &format!("The operation was rejected by the approver: {}", e),
                                    )).await;
                                    yield StreamChunk::error(format!("Operation cancelled: {}", e));
                                    yield StreamChunk::Done {};
                                    return;
                                }
                                if include_tool_events {
                                    yield StreamChunk::tool_result(
                                        &tool_call.id,
                                        &tool_call.name,
                                        &e.to_string(),
                                        false,
                                    );
                                }
                                // Tool errors are fed back to the LLM as function
                                // output rather than terminating the loop.
                                let _ = self.memory
                                    .add_message(ChatMessage::function(
                                        &tool_call.name,
                                        &format!("Error: {}", e),
                                    ))
                                    .await;
                            }
                        }
                        all_tool_calls.push(tool_call.clone());

                        if include_tool_events {
                            yield StreamChunk::tool_end(&tool_call.id);
                        }
                    }
                    continue;
                }

                // No tool calls: this is the final answer for the turn.
                let (extracted_thinking, answer) = self.extract_thinking(&content);
                if extracted_thinking.is_some() {
                    thinking_content = extracted_thinking;
                }

                let output_data = match self.process_output(&answer, &input_data.context).await {
                    Ok(d) => d,
                    Err(e) => {
                        yield StreamChunk::error(e.to_string());
                        return;
                    }
                };

                let final_content = if output_data.metadata.rejected {
                    output_data
                        .metadata
                        .rejection_reason
                        .unwrap_or_else(|| answer.to_string())
                } else {
                    output_data.content
                };

                let (final_content, _reflection_metadata) = match self
                    .run_reflection(&*llm, processed_input, final_content)
                    .await
                {
                    Ok(r) => r,
                    Err(e) => {
                        yield StreamChunk::error(e.to_string());
                        return;
                    }
                };

                let final_content = self.format_response_with_thinking(
                    thinking_content.as_deref(),
                    &final_content,
                );

                // Buffered (reflection) path never streamed deltas, so emit the
                // finished content now.
                if reflection_active {
                    yield StreamChunk::content(&final_content);
                }

                let post_result = match self
                    .post_loop_processing(processed_input, final_content)
                    .await
                {
                    Ok(r) => r,
                    Err(e) => {
                        yield StreamChunk::error(e.to_string());
                        return;
                    }
                };

                let (final_content, transitioned) = match post_result {
                    PostLoopResult::NoTransition(content) => (content, false),
                    PostLoopResult::Transitioned(content) => (content, true),
                    PostLoopResult::NeedsRedispatch => {
                        // Bounded recursion guard for post-transition re-dispatch.
                        const MAX_REDISPATCH_DEPTH: u32 = 3;
                        let current_depth = *self.redispatch_depth.read();
                        let content = if current_depth >= MAX_REDISPATCH_DEPTH {
                            warn!(
                                depth = current_depth,
                                "Post-transition re-dispatch depth limit reached (stream)"
                            );
                            let c = String::new();
                            let _ = self.memory.add_message(ChatMessage::assistant(&c)).await;
                            c
                        } else {
                            *self.redispatch_depth.write() += 1;
                            info!(
                                depth = current_depth + 1,
                                "Re-dispatching for new state after transition (stream)"
                            );
                            // NOTE: re-dispatch runs the non-stream loop and emits
                            // its result as one chunk below.
                            let result = self.run_loop_internal(processed_input).await;
                            *self.redispatch_depth.write() -= 1;
                            match result {
                                Ok(resp) => resp.content,
                                Err(e) => {
                                    yield StreamChunk::error(e.to_string());
                                    return;
                                }
                            }
                        };
                        (content, true)
                    }
                };

                if transitioned {
                    if include_state_events {
                        if let Some(state) = self.current_state() {
                            yield StreamChunk::state_transition(None, state);
                        }
                    }
                    yield StreamChunk::content(&final_content);
                }

                yield StreamChunk::Done {};
                return;
            }
        })
    }
5183
    /// Entry point for a streaming turn: runs per-turn setup and the
    /// disambiguation state machine, then forwards chunks from
    /// `run_loop_internal_stream`.
    ///
    /// Each `DisambiguationResult` arm either terminates the stream directly
    /// (clarification question, apology, escalation) or delegates to the inner
    /// stream with the (possibly enriched) input.
    fn run_loop_stream<'a>(
        &'a self,
        input: &'a str,
    ) -> Pin<Box<dyn Stream<Item = StreamChunk> + Send + 'a>> {
        Box::pin(async_stream::stream! {
            self.hooks.on_message_received(input).await;

            // One-time context initialization, guarded by an atomic flag so
            // concurrent/repeat turns skip it.
            if !self.context_initialized.swap(true, Ordering::SeqCst) {
                if let Err(e) = self.context_manager.initialize().await {
                    yield StreamChunk::error(e.to_string());
                    return;
                }
                debug!("Context manager initialized (defaults, env, builtins)");
            }

            if let Err(e) = self.check_turn_timeout().await {
                yield StreamChunk::error(e.to_string());
                return;
            }
            if let Err(e) = self.context_manager.refresh_per_turn().await {
                yield StreamChunk::error(e.to_string());
                return;
            }

            self.clear_disambiguation_context();

            if let Some(ref disambiguator) = self.disambiguation_manager {
                let disambiguation_context = match self.build_disambiguation_context().await {
                    Ok(ctx) => ctx,
                    Err(e) => {
                        yield StreamChunk::error(e.to_string());
                        return;
                    }
                };

                // The current state may override the agent-level disambiguation config.
                let state_override = self
                    .state_machine
                    .as_ref()
                    .and_then(|sm| sm.current_definition())
                    .and_then(|def| def.disambiguation.clone());

                let result = match disambiguator
                    .process_input_with_override(
                        input,
                        &disambiguation_context,
                        state_override.as_ref(),
                        None,
                    )
                    .await
                {
                    Ok(r) => r,
                    Err(e) => {
                        yield StreamChunk::error(e.to_string());
                        return;
                    }
                };

                match result {
                    DisambiguationResult::Clear => {
                        debug!("Input is clear, proceeding normally (stream)");
                    }
                    DisambiguationResult::NeedsClarification {
                        question,
                        detection,
                    } => {
                        info!(
                            ambiguity_type = ?detection.ambiguity_type,
                            confidence = detection.confidence,
                            "Input requires clarification (stream)"
                        );
                        // Persist both sides so the clarification exchange is in memory.
                        let _ = self.memory.add_message(ChatMessage::user(input)).await;
                        let _ = self
                            .memory
                            .add_message(ChatMessage::assistant(&question.question))
                            .await;
                        yield StreamChunk::content(&question.question);
                        yield StreamChunk::Done {};
                        return;
                    }
                    DisambiguationResult::Clarified {
                        enriched_input,
                        resolved,
                        ..
                    } => {
                        info!(
                            resolved_count = resolved.len(),
                            enriched = %enriched_input,
                            "Input clarified (stream)"
                        );
                        // Expose resolved values under "disambiguation.*" context keys.
                        for (key, value) in &resolved {
                            let context_key = format!("disambiguation.{}", key);
                            let _ = self.context_manager.set(&context_key, value.clone());
                        }
                        if let Some(intent) = resolved.get("intent") {
                            let _ = self.context_manager.set("resolved_intent", intent.clone());
                        }
                        let _ = self
                            .context_manager
                            .set("disambiguation.resolved", serde_json::Value::Bool(true));

                        // If a skill was waiting on clarification, re-check it first.
                        let skill_id = self.pending_skill_id.read().clone();
                        if let Some(skill_id) = skill_id {
                            info!(skill_id = %skill_id, "Re-checking skill disambiguation on clarified input (stream)");
                            match self.recheck_skill_disambiguation(&skill_id, &enriched_input).await {
                                Ok(resp) => {
                                    yield StreamChunk::content(&resp.content);
                                    yield StreamChunk::Done {};
                                    return;
                                }
                                Err(e) => {
                                    yield StreamChunk::error(e.to_string());
                                    return;
                                }
                            }
                        }

                        let mut inner = self.run_loop_internal_stream(&enriched_input);
                        while let Some(chunk) = inner.next().await {
                            yield chunk;
                        }
                        return;
                    }
                    DisambiguationResult::ProceedWithBestGuess { enriched_input } => {
                        info!("Proceeding with best guess (stream)");

                        let skill_id = self.pending_skill_id.read().clone();
                        if let Some(skill_id) = skill_id {
                            info!(skill_id = %skill_id, "Re-checking skill disambiguation on best-guess input (stream)");
                            match self.recheck_skill_disambiguation(&skill_id, &enriched_input).await {
                                Ok(resp) => {
                                    yield StreamChunk::content(&resp.content);
                                    yield StreamChunk::Done {};
                                    return;
                                }
                                Err(e) => {
                                    yield StreamChunk::error(e.to_string());
                                    return;
                                }
                            }
                        }

                        let mut inner = self.run_loop_internal_stream(&enriched_input);
                        while let Some(chunk) = inner.next().await {
                            yield chunk;
                        }
                        return;
                    }
                    DisambiguationResult::GiveUp { reason } => {
                        *self.pending_skill_id.write() = None;
                        warn!(reason = %reason, "Disambiguation gave up (stream)");
                        // Localized apology; static fallback if generation fails.
                        let apology = self
                            .generate_localized_apology(
                                "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
                                &reason,
                            )
                            .await
                            .unwrap_or_else(|_| {
                                format!("I'm sorry, I couldn't understand your request: {}", reason)
                            });
                        yield StreamChunk::content(&apology);
                        yield StreamChunk::Done {};
                        return;
                    }
                    DisambiguationResult::Escalate { reason } => {
                        *self.pending_skill_id.write() = None;
                        info!(reason = %reason, "Escalating to human (stream)");
                        // With a HITL engine: ask a human; on approval, re-run the
                        // original input through the normal stream path.
                        if let Some(ref hitl) = self.hitl_engine {
                            let trigger =
                                ApprovalTrigger::condition("disambiguation_escalation", reason.clone());
                            let mut context_map = HashMap::new();
                            context_map.insert("original_input".to_string(), serde_json::json!(input));
                            context_map.insert("reason".to_string(), serde_json::json!(&reason));
                            let check_result = HITLCheckResult::required(
                                trigger,
                                context_map,
                                format!("User request needs human assistance: {}", reason),
                                Some(hitl.config().default_timeout_seconds),
                            );
                            match self.request_hitl_approval(check_result).await {
                                Ok(result) if matches!(result, ApprovalResult::Approved | ApprovalResult::Modified { .. }) => {
                                    let mut inner = self.run_loop_internal_stream(input);
                                    while let Some(chunk) = inner.next().await {
                                        yield chunk;
                                    }
                                    return;
                                }
                                Ok(_) => {}
                                Err(e) => {
                                    yield StreamChunk::error(e.to_string());
                                    return;
                                }
                            }
                        }
                        let apology = self
                            .generate_localized_apology(
                                "Explain briefly that you're transferring the user to a human agent for help.",
                                &reason,
                            )
                            .await
                            .unwrap_or_else(|_| {
                                format!("I need human assistance to help with your request: {}", reason)
                            });
                        yield StreamChunk::content(&apology);
                        yield StreamChunk::Done {};
                        return;
                    }
                    DisambiguationResult::Abandoned { new_input } => {
                        *self.pending_skill_id.write() = None;

                        info!(
                            has_new_input = new_input.is_some(),
                            "Clarification abandoned by user (stream)"
                        );

                        let _ = self.memory.add_message(ChatMessage::user(input)).await;

                        match new_input {
                            // User moved on to a new request: process it instead.
                            Some(fresh_input) => {
                                let mut inner = self.run_loop_internal_stream(&fresh_input);
                                while let Some(chunk) = inner.next().await {
                                    yield chunk;
                                }
                                return;
                            }
                            // User just withdrew: acknowledge and end the turn.
                            None => {
                                let ack = self
                                    .generate_localized_apology(
                                        "The user changed their mind about their previous request. \
                                         Generate a brief, friendly acknowledgment (e.g. 'OK, no problem. What else can I help with?'). \
                                         Do NOT apologize excessively. Be concise.",
                                        "User abandoned clarification",
                                    )
                                    .await
                                    .unwrap_or_else(|_| {
                                        "OK, no problem. What else can I help with?".to_string()
                                    });

                                let _ = self
                                    .memory
                                    .add_message(ChatMessage::assistant(&ack))
                                    .await;

                                yield StreamChunk::content(&ack);
                                yield StreamChunk::Done {};
                                return;
                            }
                        }
                    }
                }
            }

            // No disambiguation manager, or input was Clear: run the inner loop.
            let mut inner = self.run_loop_internal_stream(input);
            while let Some(chunk) = inner.next().await {
                yield chunk;
            }
        })
    }
5454
5455 pub fn info(&self) -> AgentInfo {
5456 self.info.clone()
5457 }
5458
5459 pub fn skills(&self) -> &[SkillDefinition] {
5460 &self.skills
5461 }
5462
5463 pub async fn reset(&self) -> Result<()> {
5464 self.memory.clear().await?;
5465 *self.iteration_count.write() = 0;
5466 self.tool_call_history.write().clear();
5467 *self.pending_skill_id.write() = None;
5468 if let Some(ref sm) = self.state_machine {
5469 sm.reset();
5470 }
5471 Ok(())
5472 }
5473
5474 pub fn max_context_tokens(&self) -> u32 {
5475 self.max_context_tokens
5476 }
5477
5478 pub fn llm_registry(&self) -> &Arc<LLMRegistry> {
5479 &self.llm_registry
5480 }
5481
5482 pub fn state_machine(&self) -> Option<&Arc<StateMachine>> {
5483 self.state_machine.as_ref()
5484 }
5485
5486 pub fn context_manager(&self) -> &Arc<ContextManager> {
5487 &self.context_manager
5488 }
5489
5490 pub fn tool_call_history(&self) -> Vec<ToolCallRecord> {
5491 self.tool_call_history.read().clone()
5492 }
5493
5494 pub fn memory_token_budget(&self) -> Option<&MemoryTokenBudget> {
5495 self.memory_token_budget.as_ref()
5496 }
5497
5498 pub fn parallel_tools_config(&self) -> &ParallelToolsConfig {
5499 &self.parallel_tools
5500 }
5501
5502 pub fn streaming_config(&self) -> &StreamingConfig {
5503 &self.streaming
5504 }
5505
5506 pub fn hooks(&self) -> &Arc<dyn AgentHooks> {
5507 &self.hooks
5508 }
5509
5510 pub fn hitl_engine(&self) -> Option<&HITLEngine> {
5511 self.hitl_engine.as_ref()
5512 }
5513
5514 pub fn approval_handler(&self) -> &Arc<dyn ApprovalHandler> {
5515 &self.approval_handler
5516 }
5517
5518 fn build_hitl_language_context(&self) -> HashMap<String, Value> {
5520 let mut ctx = HashMap::new();
5521 for key in &["user.language", "input.detected.language", "language"] {
5522 if let Some(val) = self.context_manager.get(key) {
5523 ctx.insert(key.to_string(), val);
5524 }
5525 }
5526 ctx
5527 }
5528
5529 async fn request_hitl_approval(&self, check_result: HITLCheckResult) -> Result<ApprovalResult> {
5531 let Some(request) = check_result.into_request() else {
5532 return Ok(ApprovalResult::Approved);
5533 };
5534
5535 self.hooks.on_approval_requested(&request).await;
5536
5537 let request_id = request.id.clone();
5538 let timeout = request.timeout;
5539
5540 let result = if let Some(duration) = timeout {
5541 match tokio::time::timeout(duration, self.approval_handler.request_approval(request))
5542 .await
5543 {
5544 Ok(result) => result,
5545 Err(_) => ApprovalResult::timeout(),
5546 }
5547 } else {
5548 self.approval_handler.request_approval(request).await
5549 };
5550
5551 self.hooks.on_approval_result(&request_id, &result).await;
5552
5553 let result = match result {
5555 ApprovalResult::Timeout => {
5556 if let Some(ref engine) = self.hitl_engine {
5557 match engine.config().on_timeout {
5558 TimeoutAction::Approve => ApprovalResult::Approved,
5559 TimeoutAction::Reject => ApprovalResult::Rejected {
5560 reason: Some("Timeout".to_string()),
5561 },
5562 TimeoutAction::Error => {
5563 return Err(AgentError::Other("HITL approval timeout".to_string()));
5564 }
5565 }
5566 } else {
5567 ApprovalResult::Rejected {
5568 reason: Some("Timeout (no engine)".to_string()),
5569 }
5570 }
5571 }
5572 other => other,
5573 };
5574
5575 Ok(result)
5576 }
5577
5578 pub async fn check_state_hitl(&self, from: Option<&str>, to: &str) -> Result<bool> {
5579 if let Some(ref hitl_engine) = self.hitl_engine {
5580 let hitl_lang_ctx = self.build_hitl_language_context();
5581 let check_result = hitl_engine
5582 .check_state_transition_with_localization(
5583 from,
5584 to,
5585 &hitl_lang_ctx,
5586 self.approval_handler.as_ref(),
5587 Some(&self.llm_registry),
5588 )
5589 .await?;
5590 if check_result.is_required() {
5591 let result = self.request_hitl_approval(check_result).await?;
5592 return Ok(matches!(
5593 result,
5594 ApprovalResult::Approved | ApprovalResult::Modified { .. }
5595 ));
5596 }
5597 }
5598 Ok(true)
5599 }
5600
5601 async fn execute_tools_parallel(
5603 &self,
5604 tool_calls: &[ToolCall],
5605 ) -> Vec<(String, Result<String>)> {
5606 if !self.parallel_tools.enabled || tool_calls.len() <= 1 {
5607 let mut results = Vec::new();
5608 for tc in tool_calls {
5609 let result = self.execute_tool_smart(tc).await;
5610 results.push((tc.id.clone(), result));
5611 }
5612 return results;
5613 }
5614
5615 let chunks: Vec<_> = tool_calls
5616 .chunks(self.parallel_tools.max_parallel)
5617 .collect();
5618
5619 let mut all_results = Vec::new();
5620
5621 for chunk in chunks {
5622 let futures: Vec<_> = chunk
5623 .iter()
5624 .map(|tc| {
5625 let tc = tc.clone();
5626 async move {
5627 let result = self.execute_tool_smart(&tc).await;
5628 (tc.id.clone(), result)
5629 }
5630 })
5631 .collect();
5632
5633 let results = futures::future::join_all(futures).await;
5634 all_results.extend(results);
5635 }
5636
5637 all_results
5638 }
5639
5640 pub async fn chat_stream<'a>(
5642 &'a self,
5643 input: &'a str,
5644 ) -> Result<Pin<Box<dyn Stream<Item = StreamChunk> + Send + 'a>>> {
5645 info!(input_len = input.len(), "Starting streaming chat");
5646 Ok(self.run_loop_stream(input))
5647 }
5648}
5649
5650#[async_trait]
5651impl Agent for RuntimeAgent {
5652 async fn chat(&self, input: &str) -> Result<AgentResponse> {
5653 self.run_loop(input).await
5654 }
5655
5656 fn info(&self) -> AgentInfo {
5657 self.info.clone()
5658 }
5659
5660 async fn reset(&self) -> Result<()> {
5661 self.memory.clear().await?;
5662 *self.iteration_count.write() = 0;
5663 self.tool_call_history.write().clear();
5664 if let Some(ref sm) = self.state_machine {
5665 sm.reset();
5666 }
5667 Ok(())
5668 }
5669}
5670
5671fn render_concurrent_template(
5680 template: &str,
5681 user_input: &str,
5682 context_values: &std::collections::HashMap<String, serde_json::Value>,
5683) -> Result<String> {
5684 let mut env = minijinja::Environment::new();
5685 env.add_template("concurrent", template)
5686 .map_err(|e| AgentError::Other(format!("Concurrent template parse error: {}", e)))?;
5687
5688 let mut ctx = std::collections::BTreeMap::new();
5689 ctx.insert("user_input".to_string(), minijinja::Value::from(user_input));
5690
5691 let context_obj = minijinja::Value::from_serialize(context_values);
5693 ctx.insert("context".to_string(), context_obj);
5694
5695 let tmpl = env
5696 .get_template("concurrent")
5697 .map_err(|e| AgentError::Other(format!("Concurrent template error: {}", e)))?;
5698
5699 tmpl.render(minijinja::Value::from_serialize(&ctx))
5700 .map_err(|e| AgentError::Other(format!("Concurrent template render error: {}", e)))
5701}
5702
#[cfg(test)]
mod tests {
    use super::*;
    use crate::AgentBuilder;
    use ai_agents_llm::mock::MockLLMProvider;

    /// Helper: a mock LLM provider that always returns `response`.
    fn mock_with_response(response: &str) -> MockLLMProvider {
        let mut mock = MockLLMProvider::new("test");
        mock.set_response(response);
        mock
    }

    /// Helper: a mock LLM provider that returns `responses` in sequence.
    // NOTE(review): the `true` flag to `set_responses` presumably controls
    // repeating the final response once the list is exhausted — confirm
    // against `MockLLMProvider`.
    fn mock_with_responses(responses: Vec<&str>) -> MockLLMProvider {
        let mut mock = MockLLMProvider::new("test");
        mock.set_responses(responses.into_iter().map(String::from).collect(), true);
        mock
    }

    // A minimal builder-configured agent relays the mock LLM's reply verbatim.
    #[tokio::test]
    async fn test_integration_yaml_to_chat_basic() {
        let mock = mock_with_response("Hello! How can I help you?");
        let agent = AgentBuilder::new()
            .system_prompt("You are a test assistant.")
            .llm(Arc::new(mock))
            .build()
            .unwrap();

        let response = agent.chat("Hi").await.unwrap();
        assert!(!response.content.is_empty());
        assert_eq!(response.content, "Hello! How can I help you?");
    }

    // Three sequential turns each receive their scripted reply, and memory
    // retains all six messages (3 user + 3 assistant).
    #[tokio::test]
    async fn test_integration_multi_turn_conversation() {
        let mock = mock_with_responses(vec![
            "Hello! I'm your assistant.",
            "The weather is sunny today.",
            "Goodbye!",
        ]);
        let agent = AgentBuilder::new()
            .system_prompt("You are helpful.")
            .llm(Arc::new(mock))
            .build()
            .unwrap();

        let r1 = agent.chat("Hi").await.unwrap();
        assert_eq!(r1.content, "Hello! I'm your assistant.");

        let r2 = agent.chat("What's the weather?").await.unwrap();
        assert_eq!(r2.content, "The weather is sunny today.");

        let r3 = agent.chat("Bye").await.unwrap();
        assert_eq!(r3.content, "Goodbye!");

        let messages = agent.memory.get_messages(None).await.unwrap();
        assert_eq!(messages.len(), 6);
    }

    // First LLM reply embeds a [TOOL_CALL: ...] directive; the agent runs
    // the calculator tool and then asks the LLM again for the final answer.
    #[tokio::test]
    async fn test_integration_tool_execution() {
        let mock = mock_with_responses(vec![
            r#"I'll calculate that for you.
[TOOL_CALL: {"name": "calculator", "arguments": {"expression": "2+2"}}]"#,
            "The answer is 4.",
        ]);
        let mut tools = ai_agents_tools::ToolRegistry::new();
        tools
            .register(Arc::new(ai_agents_tools::CalculatorTool))
            .unwrap();

        let agent = AgentBuilder::new()
            .system_prompt("You are a calculator assistant.")
            .llm(Arc::new(mock))
            .tools(tools)
            .build()
            .unwrap();

        let response = agent.chat("What is 2+2?").await.unwrap();
        assert!(!response.content.is_empty());
    }

    // An agent built from YAML with a state machine starts in the declared
    // initial state.
    // NOTE(review): the "1" in the response script presumably answers the
    // transition-evaluator's LLM query for the auto transition — confirm.
    #[tokio::test]
    async fn test_integration_state_machine_basic() {
        let yaml = r#"
name: StateAgent
system_prompt: "You are a support agent."
states:
  initial: greeting
  states:
    greeting:
      prompt: "Welcome the user warmly."
      transitions:
        - to: support
          when: "User needs help"
          auto: true
    support:
      prompt: "Help solve the user's problem."
"#;
        let mock = mock_with_responses(vec![
            "Welcome! How can I help?",
            "1",
            "I'll help you with that.",
        ]);
        let builder = AgentBuilder::from_yaml(yaml).unwrap();
        let agent = builder.llm(Arc::new(mock)).build().unwrap();

        assert_eq!(agent.current_state(), Some("greeting".to_string()));
        let _ = agent.chat("I need help").await.unwrap();
    }

    // A manual transition fires the source state's on_exit and the target
    // state's on_enter set_context actions; both keys land in agent context.
    #[tokio::test]
    async fn test_integration_state_on_enter_set_context() {
        let yaml = r#"
name: ActionAgent
system_prompt: "You are helpful."
states:
  initial: step1
  states:
    step1:
      prompt: "Step 1"
      on_exit:
        - set_context:
            step1_exited: true
      transitions:
        - to: step2
          when: "always"
          auto: true
    step2:
      prompt: "Step 2"
      on_enter:
        - set_context:
            step2_entered: true
"#;
        let mock = mock_with_responses(vec![
            "Processing step 1.",
            "0",
        ]);
        let builder = AgentBuilder::from_yaml(yaml).unwrap();
        let agent = builder.llm(Arc::new(mock)).build().unwrap();

        assert_eq!(agent.current_state(), Some("step1".to_string()));

        agent.transition_to("step2").await.unwrap();

        assert_eq!(agent.current_state(), Some("step2".to_string()));

        let ctx = agent.get_context();
        assert_eq!(ctx.get("step1_exited"), Some(&serde_json::json!(true)));
        assert_eq!(ctx.get("step2_entered"), Some(&serde_json::json!(true)));
    }

    // The input-processing pipeline trims and collapses whitespace before
    // the message reaches the LLM; verified via the mock's call history.
    // NOTE(review): relies on `MockLLMProvider::clone` sharing call history
    // with the clone handed to the agent — confirm it uses shared interior
    // state (e.g. an Arc).
    #[tokio::test]
    async fn test_integration_process_normalize() {
        let yaml = r#"
name: ProcessAgent
system_prompt: "You are helpful."
process:
  input:
    - type: normalize
      config:
        trim: true
        collapse_whitespace: true
"#;
        let mock = mock_with_response("Got your message.");
        let builder = AgentBuilder::from_yaml(yaml).unwrap();
        let agent = builder.llm(Arc::new(mock.clone())).build().unwrap();

        let _ = agent.chat("  hello   world  ").await.unwrap();

        let history = mock.call_history();
        assert!(!history.is_empty());
        let last_call = history.last().unwrap();
        let user_msg = last_call
            .messages
            .iter()
            .find(|m| m.role == ai_agents_core::Role::User)
            .unwrap();
        assert_eq!(user_msg.content, "hello world");
    }

    // With a compacting memory (threshold 5, keep 3 recent), six turns must
    // not accumulate unbounded history; 12 is a loose upper bound allowing
    // for summary messages produced by compression.
    #[tokio::test]
    async fn test_integration_memory_compression() {
        let yaml = r#"
name: MemoryAgent
system_prompt: "You are helpful."
memory:
  type: compacting
  max_messages: 100
  compress_threshold: 5
  max_recent_messages: 3
  summarize_batch_size: 2
"#;
        // Extra responses beyond the 6 chats cover summarization LLM calls.
        let responses: Vec<&str> = (0..8).map(|_| "Response from assistant.").collect();
        let mock = mock_with_responses(responses);
        let builder = AgentBuilder::from_yaml(yaml).unwrap();
        let agent = builder.llm(Arc::new(mock)).build().unwrap();

        for i in 0..6 {
            let _ = agent.chat(&format!("Message {}", i)).await.unwrap();
        }

        let messages = agent.memory.get_messages(None).await.unwrap();
        assert!(messages.len() <= 12);
    }

    // With multiple registered LLM aliases, plain chat() routes to the
    // "default" alias; the "router" alias stays unused here.
    #[tokio::test]
    async fn test_integration_multi_llm_registry() {
        let mut mock_default = MockLLMProvider::new("default");
        mock_default.set_response("Default LLM response.");
        let mut mock_router = MockLLMProvider::new("router");
        mock_router.set_response("Router response.");

        let agent = AgentBuilder::new()
            .system_prompt("You are helpful.")
            .llm_alias("default", Arc::new(mock_default))
            .llm_alias("router", Arc::new(mock_router))
            .build()
            .unwrap();

        let response = agent.chat("Hello").await.unwrap();
        assert_eq!(response.content, "Default LLM response.");
    }

    // reset() empties conversation memory after a completed turn.
    #[tokio::test]
    async fn test_integration_agent_reset() {
        let mock = mock_with_responses(vec!["Hello!", "Hello again!"]);
        let agent = AgentBuilder::new()
            .system_prompt("You are helpful.")
            .llm(Arc::new(mock))
            .build()
            .unwrap();

        let _ = agent.chat("Hi").await.unwrap();
        let messages = agent.memory.get_messages(None).await.unwrap();
        // One user + one assistant message before the reset.
        assert_eq!(messages.len(), 2);
        agent.reset().await.unwrap();
        let messages = agent.memory.get_messages(None).await.unwrap();
        assert_eq!(messages.len(), 0);
    }

    // A min-length validation stage with the Reject action must stop "Hi"
    // (2 chars < 10) from ever reaching the LLM; the agent replies with a
    // rejection message instead.
    #[tokio::test]
    async fn test_integration_process_validate_reject() {
        use ai_agents_process::{ProcessConfig, ProcessProcessor};

        let validate_config = ai_agents_process::ValidateStage {
            id: Some("length_check".to_string()),
            condition: None,
            config: ai_agents_process::ValidateConfig {
                rules: vec![ai_agents_process::ValidationRule::MinLength {
                    min_length: 10,
                    on_fail: ai_agents_process::ValidationAction {
                        action: ai_agents_process::ValidationActionType::Reject,
                        message: None,
                    },
                }],
                ..Default::default()
            },
        };
        let process_config = ProcessConfig {
            input: vec![ai_agents_process::ProcessStage::Validate(validate_config)],
            ..Default::default()
        };
        let processor = ProcessProcessor::new(process_config);

        let mock = mock_with_response("Should not reach here.");
        let agent = AgentBuilder::new()
            .system_prompt("You are helpful.")
            .llm(Arc::new(mock))
            .process_processor(processor)
            .build()
            .unwrap();

        let response = agent.chat("Hi").await.unwrap();
        // The exact rejection wording is not pinned down; accept several
        // phrasings, or any short (< 50 chars) non-LLM reply.
        assert!(
            response.content.contains("rejected")
                || response.content.contains("Input rejected")
                || response.content.contains("too short")
                || response.content.contains("Too short")
                || response.content.len() < 50,
            "Expected rejection response, got: {}",
            response.content
        );
    }

    // When the default LLM errors, the FallbackLlm recovery action retries
    // the turn against the "backup" alias.
    #[tokio::test]
    async fn test_llm_fallback_on_failure() {
        use ai_agents_recovery::{ErrorRecoveryConfig, LLMFailureAction, LLMRecoveryConfig};

        let mut primary = MockLLMProvider::new("primary");
        primary.set_error("Primary LLM is unavailable");

        let mut fallback = MockLLMProvider::new("fallback");
        fallback.set_response("Fallback response works!");

        let agent = AgentBuilder::new()
            .system_prompt("You are helpful.")
            .llm_alias("default", Arc::new(primary))
            .llm_alias("backup", Arc::new(fallback))
            .recovery_manager(RecoveryManager::new(ErrorRecoveryConfig {
                llm: LLMRecoveryConfig {
                    on_failure: LLMFailureAction::FallbackLlm {
                        fallback_llm: "backup".to_string(),
                    },
                    ..Default::default()
                },
                ..Default::default()
            }))
            .build()
            .unwrap();

        let response = agent.chat("Hello").await.unwrap();
        assert!(
            response.content.contains("Fallback response"),
            "Expected fallback response, got: {}",
            response.content
        );
    }

    // When the only LLM errors, the FallbackResponse recovery action returns
    // the configured static message instead of an Err.
    #[tokio::test]
    async fn test_llm_fallback_response_static_message() {
        use ai_agents_recovery::{ErrorRecoveryConfig, LLMFailureAction, LLMRecoveryConfig};

        let mut primary = MockLLMProvider::new("primary");
        primary.set_error("Primary LLM is unavailable");

        let agent = AgentBuilder::new()
            .system_prompt("You are helpful.")
            .llm(Arc::new(primary))
            .recovery_manager(RecoveryManager::new(ErrorRecoveryConfig {
                llm: LLMRecoveryConfig {
                    on_failure: LLMFailureAction::FallbackResponse {
                        message: "I am temporarily unavailable. Please try again later."
                            .to_string(),
                    },
                    ..Default::default()
                },
                ..Default::default()
            }))
            .build()
            .unwrap();

        let response = agent.chat("Hello").await.unwrap();
        assert!(
            response.content.contains("temporarily unavailable"),
            "Expected static fallback message, got: {}",
            response.content
        );
    }

    // A call to an unregistered tool, with retries disabled and the Skip
    // failure action, must not fail the whole chat turn.
    #[tokio::test]
    async fn test_tool_failure_skip() {
        use ai_agents_recovery::{
            ErrorRecoveryConfig, ToolFailureAction, ToolRecoveryConfig, ToolRetryConfig,
        };

        let mock = mock_with_responses(vec![
            r#"I'll use the nonexistent tool.
[TOOL_CALL: {"name": "nonexistent_tool", "arguments": {}}]"#,
            "The tool was unavailable, but I can still help you.",
        ]);

        let agent = AgentBuilder::new()
            .system_prompt("You are helpful.")
            .llm(Arc::new(mock))
            .recovery_manager(RecoveryManager::new(ErrorRecoveryConfig {
                tools: ToolRecoveryConfig {
                    default: ToolRetryConfig {
                        max_retries: 0,
                        timeout_ms: None,
                        on_failure: ToolFailureAction::Skip,
                    },
                    ..Default::default()
                },
                ..Default::default()
            }))
            .build()
            .unwrap();

        let response = agent.chat("Use the nonexistent tool").await;
        assert!(
            response.is_ok(),
            "Expected Ok with skip policy, got: {:?}",
            response
        );
    }
}