Skip to main content

ai_agents_runtime/
runtime.rs

1use async_trait::async_trait;
2use futures::stream::{Stream, StreamExt};
3use parking_lot::RwLock;
4use serde_json::Value;
5use std::collections::HashMap;
6use std::pin::Pin;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use std::time::Instant;
10use tracing::{debug, error, info, instrument, warn};
11
12use ai_agents_context::{ContextManager, ContextProvider, TemplateRenderer};
13use ai_agents_core::{
14    AgentError, AgentSnapshot, AgentStorage, ChatMessage, FinishReason, LLMProvider, LLMResponse,
15    Result, ToolResult,
16};
17use ai_agents_disambiguation::{
18    DisambiguationConfig, DisambiguationContext, DisambiguationManager, DisambiguationResult,
19};
20use ai_agents_hitl::{
21    ApprovalHandler, ApprovalResult, ApprovalTrigger, HITLCheckResult, HITLEngine,
22    RejectAllHandler, TimeoutAction,
23};
24use ai_agents_hooks::{AgentHooks, NoopHooks};
25use ai_agents_llm::LLMRegistry;
26use ai_agents_memory::{
27    CompressResult, EvictionReason, Memory, MemoryBudgetEvent, MemoryCompressEvent,
28    MemoryEvictEvent, MemoryTokenBudget, OverflowStrategy,
29};
30use ai_agents_process::{ProcessData, ProcessProcessor};
31use ai_agents_reasoning::{
32    CriterionResult, EvaluationResult, Plan, PlanAction, PlanStatus, PlanStep, ReasoningConfig,
33    ReasoningMetadata, ReasoningMode, ReasoningOutput, ReflectionAttempt, ReflectionConfig,
34    ReflectionMetadata, StepFailureAction,
35};
36use ai_agents_recovery::{
37    ByRoleFilter, ContextOverflowAction, FilterConfig, IntoClassifiedError, KeepRecentFilter,
38    LLMFailureAction, MessageFilter, RecoveryManager, RetryConfig, SkipPatternFilter,
39    ToolFailureAction,
40};
41use ai_agents_skills::{SkillDefinition, SkillExecutor, SkillRouter};
42use ai_agents_state::{
43    PromptMode, StateAction, StateMachine, StateMachineSnapshot, StateTransitionEvent, ToolRef,
44    TransitionContext, TransitionEvaluator,
45};
46use ai_agents_storage::{StorageConfig as StorageStorageConfig, create_storage};
47use ai_agents_tools::{
48    ConditionEvaluator, EvaluationContext, LLMGetter, SecurityCheckResult, ToolCallRecord,
49    ToolRegistry, ToolSecurityEngine,
50};
51
52use super::{
53    Agent, AgentInfo, AgentResponse, ParallelToolsConfig, StreamChunk, StreamingConfig, ToolCall,
54};
55use crate::spec::StorageConfig;
56
57/// Outcome of processing tool calls within the agent loop.
58enum ToolCallOutcome {
59    /// Tools executed successfully, continue the LLM loop for the next iteration.
60    Continue,
61    /// A state transition fired during tool call handling, continue the loop.
62    TransitionFired,
63    /// HITL rejected a tool call, return this response immediately.
64    Rejected(AgentResponse),
65}
66
67/// Outcome of skill routing — used by `try_skill_route`.
68enum SkillRouteResult {
69    /// No skill matched, continue to normal LLM chat.
70    NoMatch,
71    /// Skill executed successfully.
72    Response(String),
73    /// Skill matched but needs disambiguation first.
74    NeedsClarification(AgentResponse),
75}
76
77/// Outcome of post_loop_processing - drives the caller's next step.
78enum PostLoopResult {
79    /// No transition fired. Content is the LLM response for this turn.
80    NoTransition(String),
81    /// Transition fired. Content is from plain post-transition re-generation.
82    Transitioned(String),
83    /// Transition fired into a state that requires full dispatch.
84    /// Caller re-enters run_loop_internal to apply the correct handler.
85    NeedsRedispatch,
86}
87
88pub struct RuntimeAgent {
89    info: AgentInfo,
90    llm_registry: Arc<LLMRegistry>,
91    memory: Arc<dyn Memory>,
92    tools: Arc<ToolRegistry>,
93    skills: Vec<SkillDefinition>,
94    skill_router: Option<SkillRouter>,
95    skill_executor: Option<SkillExecutor>,
96    base_system_prompt: String,
97    max_iterations: u32,
98    iteration_count: RwLock<u32>,
99    max_context_tokens: u32,
100    memory_token_budget: Option<MemoryTokenBudget>,
101    recovery_manager: RecoveryManager,
102    tool_security: ToolSecurityEngine,
103    process_processor: Option<ProcessProcessor>,
104    message_filters: RwLock<HashMap<String, Arc<dyn MessageFilter>>>,
105    state_machine: Option<Arc<StateMachine>>,
106    transition_evaluator: Option<Arc<dyn TransitionEvaluator>>,
107    context_manager: Arc<ContextManager>,
108    template_renderer: TemplateRenderer,
109    tool_call_history: RwLock<Vec<ToolCallRecord>>,
110    parallel_tools: ParallelToolsConfig,
111    streaming: StreamingConfig,
112    hooks: Arc<dyn AgentHooks>,
113    hitl_engine: Option<HITLEngine>,
114    approval_handler: Arc<dyn ApprovalHandler>,
115    storage_config: StorageConfig,
116    storage: RwLock<Option<Arc<dyn AgentStorage>>>,
117    reasoning_config: ReasoningConfig,
118    reflection_config: ReflectionConfig,
119    disambiguation_manager: Option<DisambiguationManager>,
120    /// Structured persona manager for identity, evolution, and secrets.
121    persona_manager: Option<Arc<ai_agents_persona::PersonaManager>>,
122    /// Skill ID that triggered the current pending disambiguation.
123    /// Set by try_skill_route() when skill-level disambiguation triggers clarification.
124    /// Read by run_loop() when clarification resolves to route directly to the skill.
125    pending_skill_id: RwLock<Option<String>>,
126    current_plan: RwLock<Option<Plan>>,
127    /// Tool IDs declared in the top-level `tools:` spec.
128    declared_tool_ids: Option<Vec<String>>,
129    /// Whether the context manager has been initialized (defaults loaded, env resolved, etc.)
130    context_initialized: AtomicBool,
131    /// Spawner for dynamic agent creation (set when YAML has a spawner: section).
132    spawner: Option<Arc<crate::spawner::AgentSpawner>>,
133    /// Registry tracking spawned agents (set when YAML has a spawner: section).
134    spawner_registry: Option<Arc<crate::spawner::AgentRegistry>>,
135    /// Re-dispatch depth for post-transition full dispatch.
136    /// 0 = not re-dispatching. > 0 = user message already in memory, skip re-adding.
137    redispatch_depth: RwLock<u32>,
138}
139
140impl std::fmt::Debug for RuntimeAgent {
141    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
142        f.debug_struct("RuntimeAgent")
143            .field("info", &self.info)
144            .field("base_system_prompt", &self.base_system_prompt)
145            .field("max_iterations", &self.max_iterations)
146            .field("skills_count", &self.skills.len())
147            .field("max_context_tokens", &self.max_context_tokens)
148            .field("has_state_machine", &self.state_machine.is_some())
149            .field("parallel_tools", &self.parallel_tools)
150            .field("streaming", &self.streaming)
151            .field("has_hooks", &true)
152            .field("has_hitl", &self.hitl_engine.is_some())
153            .field("storage_type", &self.storage_config.storage_type())
154            .field("reasoning_mode", &self.reasoning_config.mode)
155            .field("reflection_enabled", &self.reflection_config.enabled)
156            .field("declared_tool_ids", &self.declared_tool_ids)
157            .field("has_persona", &self.persona_manager.is_some())
158            .finish_non_exhaustive()
159    }
160}
161
162struct RegistryLLMGetter {
163    registry: Arc<LLMRegistry>,
164}
165
166impl LLMGetter for RegistryLLMGetter {
167    fn get_llm(&self, alias: &str) -> Option<Arc<dyn LLMProvider>> {
168        self.registry.get(alias).ok()
169    }
170}
171
172impl RuntimeAgent {
173    #[allow(clippy::too_many_arguments)]
174    pub fn new(
175        info: AgentInfo,
176        llm_registry: Arc<LLMRegistry>,
177        memory: Arc<dyn Memory>,
178        tools: Arc<ToolRegistry>,
179        skills: Vec<SkillDefinition>,
180        system_prompt: String,
181        max_iterations: u32,
182    ) -> Self {
183        let (skill_router, skill_executor) = if !skills.is_empty() {
184            let router_llm = llm_registry.router().ok();
185            let router = router_llm.map(|llm| SkillRouter::new(llm, skills.clone()));
186            let executor = SkillExecutor::new(llm_registry.clone(), tools.clone());
187            (router, Some(executor))
188        } else {
189            (None, None)
190        };
191
192        let context_manager =
193            ContextManager::new(HashMap::new(), info.name.clone(), info.version.clone());
194
195        Self {
196            info,
197            llm_registry,
198            memory,
199            tools,
200            skills,
201            skill_router,
202            skill_executor,
203            base_system_prompt: system_prompt,
204            max_iterations,
205            iteration_count: RwLock::new(0),
206            max_context_tokens: 4096,
207            memory_token_budget: None,
208            recovery_manager: RecoveryManager::default(),
209            tool_security: ToolSecurityEngine::default(),
210            process_processor: None,
211            message_filters: RwLock::new(HashMap::new()),
212            state_machine: None,
213            transition_evaluator: None,
214            context_manager: Arc::new(context_manager),
215            template_renderer: TemplateRenderer::new(),
216            tool_call_history: RwLock::new(Vec::new()),
217            parallel_tools: ParallelToolsConfig::default(),
218            streaming: StreamingConfig::default(),
219            hooks: Arc::new(NoopHooks),
220            hitl_engine: None,
221            approval_handler: Arc::new(RejectAllHandler::new()),
222            storage_config: StorageConfig::default(),
223            storage: RwLock::new(None),
224            reasoning_config: ReasoningConfig::default(),
225            reflection_config: ReflectionConfig::default(),
226            disambiguation_manager: None,
227            persona_manager: None,
228            pending_skill_id: RwLock::new(None),
229            current_plan: RwLock::new(None),
230            declared_tool_ids: None,
231            context_initialized: AtomicBool::new(false),
232            spawner: None,
233            spawner_registry: None,
234            redispatch_depth: RwLock::new(0),
235        }
236    }
237
238    pub fn with_declared_tool_ids(mut self, ids: Option<Vec<String>>) -> Self {
239        self.declared_tool_ids = ids;
240        self
241    }
242
243    pub fn with_storage_config(mut self, config: StorageConfig) -> Self {
244        self.storage_config = config;
245        self
246    }
247
248    pub fn with_storage(self, storage: Arc<dyn AgentStorage>) -> Self {
249        *self.storage.write() = Some(storage);
250        self
251    }
252
253    pub fn with_reasoning(mut self, config: ReasoningConfig) -> Self {
254        self.reasoning_config = config;
255        self
256    }
257
258    pub fn with_reflection(mut self, config: ReflectionConfig) -> Self {
259        self.reflection_config = config;
260        self
261    }
262
263    pub fn reasoning_config(&self) -> &ReasoningConfig {
264        &self.reasoning_config
265    }
266
267    pub fn reflection_config(&self) -> &ReflectionConfig {
268        &self.reflection_config
269    }
270
271    pub fn with_persona(mut self, manager: Arc<ai_agents_persona::PersonaManager>) -> Self {
272        self.persona_manager = Some(manager);
273        self
274    }
275
276    pub fn persona_manager(&self) -> Option<&Arc<ai_agents_persona::PersonaManager>> {
277        self.persona_manager.as_ref()
278    }
279
280    pub fn with_disambiguation(mut self, config: DisambiguationConfig) -> Self {
281        if config.is_enabled() {
282            self.disambiguation_manager = Some(DisambiguationManager::new(
283                config,
284                Arc::clone(&self.llm_registry),
285            ));
286        }
287        self
288    }
289
290    pub fn disambiguation_manager(&self) -> Option<&DisambiguationManager> {
291        self.disambiguation_manager.as_ref()
292    }
293
294    pub fn has_disambiguation(&self) -> bool {
295        self.disambiguation_manager
296            .as_ref()
297            .is_some_and(|m| m.is_enabled())
298    }
299
300    pub async fn init_storage(&self) -> Result<()> {
301        if self.storage_config.is_none() {
302            return Ok(());
303        }
304        if self.storage.read().is_some() {
305            return Ok(());
306        }
307        let storage_config = self.convert_storage_config();
308        let storage = create_storage(&storage_config).await?;
309        *self.storage.write() = storage;
310        Ok(())
311    }
312
313    fn convert_storage_config(&self) -> StorageStorageConfig {
314        crate::spec::storage::to_storage_config(&self.storage_config)
315    }
316
317    pub fn storage(&self) -> Option<Arc<dyn AgentStorage>> {
318        self.storage.read().clone()
319    }
320
321    pub fn storage_config(&self) -> &StorageConfig {
322        &self.storage_config
323    }
324
325    /// Returns the spawner if configured via a spawner: YAML section.
326    pub fn spawner(&self) -> Option<&Arc<crate::spawner::AgentSpawner>> {
327        self.spawner.as_ref()
328    }
329
330    /// Returns the agent registry if configured via a spawner: YAML section.
331    pub fn spawner_registry(&self) -> Option<&Arc<crate::spawner::AgentRegistry>> {
332        self.spawner_registry.as_ref()
333    }
334
335    pub fn has_spawner(&self) -> bool {
336        self.spawner_registry.is_some()
337    }
338
339    pub fn with_spawner_handles(
340        mut self,
341        spawner: Arc<crate::spawner::AgentSpawner>,
342        registry: Arc<crate::spawner::AgentRegistry>,
343    ) -> Self {
344        self.spawner = Some(spawner);
345        self.spawner_registry = Some(registry);
346        self
347    }
348
349    pub fn with_hooks(mut self, hooks: Arc<dyn AgentHooks>) -> Self {
350        self.hooks = hooks;
351        self
352    }
353
354    pub fn with_parallel_tools(mut self, config: ParallelToolsConfig) -> Self {
355        self.parallel_tools = config;
356        self
357    }
358
359    pub fn with_streaming(mut self, config: StreamingConfig) -> Self {
360        self.streaming = config;
361        self
362    }
363
364    pub fn with_hitl(mut self, engine: HITLEngine, handler: Arc<dyn ApprovalHandler>) -> Self {
365        self.hitl_engine = Some(engine);
366        self.approval_handler = handler;
367        self
368    }
369
370    pub fn with_max_context_tokens(mut self, tokens: u32) -> Self {
371        self.max_context_tokens = tokens;
372        self
373    }
374
375    pub fn with_memory_token_budget(mut self, budget: MemoryTokenBudget) -> Self {
376        self.memory_token_budget = Some(budget);
377        self
378    }
379
380    pub fn with_recovery_manager(mut self, manager: RecoveryManager) -> Self {
381        self.recovery_manager = manager;
382        self
383    }
384
385    pub fn with_tool_security(mut self, engine: ToolSecurityEngine) -> Self {
386        self.tool_security = engine;
387        self
388    }
389
390    pub fn with_process_processor(mut self, processor: ProcessProcessor) -> Self {
391        self.process_processor = Some(processor);
392        self
393    }
394
395    pub fn with_state_machine(
396        mut self,
397        state_machine: Arc<StateMachine>,
398        evaluator: Arc<dyn TransitionEvaluator>,
399    ) -> Self {
400        self.state_machine = Some(state_machine);
401        self.transition_evaluator = Some(evaluator);
402        self
403    }
404
405    pub fn with_context_manager(mut self, manager: Arc<ContextManager>) -> Self {
406        self.context_manager = manager;
407        self
408    }
409
410    pub fn register_message_filter(&self, name: impl Into<String>, filter: Arc<dyn MessageFilter>) {
411        self.message_filters.write().insert(name.into(), filter);
412    }
413
414    pub fn set_context(&self, key: &str, value: Value) -> Result<()> {
415        self.context_manager.set(key, value)
416    }
417
418    pub fn update_context(&self, path: &str, value: Value) -> Result<()> {
419        self.context_manager.update(path, value)
420    }
421
422    pub fn get_context(&self) -> HashMap<String, Value> {
423        self.context_manager.get_all()
424    }
425
426    pub fn remove_context(&self, key: &str) -> Option<Value> {
427        self.context_manager.remove(key)
428    }
429
430    pub async fn refresh_context(&self, key: &str) -> Result<()> {
431        self.context_manager.refresh(key).await
432    }
433
434    pub fn register_context_provider(&self, name: &str, provider: Arc<dyn ContextProvider>) {
435        self.context_manager.register_provider(name, provider);
436    }
437
438    pub fn current_state(&self) -> Option<String> {
439        self.state_machine.as_ref().map(|sm| sm.current())
440    }
441
442    pub async fn transition_to(&self, state: &str) -> Result<()> {
443        if let Some(ref sm) = self.state_machine {
444            let from_state = sm.current();
445            self.execute_state_exit_actions(&from_state).await;
446            sm.transition_to(state, "manual transition")?;
447            self.execute_state_enter_actions(state).await;
448            info!(to = %state, "Manual state transition");
449        }
450        Ok(())
451    }
452
453    pub fn state_history(&self) -> Vec<StateTransitionEvent> {
454        self.state_machine
455            .as_ref()
456            .map(|sm| sm.history())
457            .unwrap_or_default()
458    }
459
460    pub async fn save_state(&self) -> Result<AgentSnapshot> {
461        let memory_snapshot = self.memory.snapshot().await?;
462        let state_machine_snapshot = self.state_machine.as_ref().map(|sm| sm.snapshot());
463        let context_snapshot = self.context_manager.snapshot();
464
465        let mut snapshot = AgentSnapshot::new(self.info.id.clone())
466            .with_memory(memory_snapshot)
467            .with_context(context_snapshot)
468            .with_state_machine(
469                state_machine_snapshot.unwrap_or_else(|| StateMachineSnapshot {
470                    current_state: String::new(),
471                    previous_state: None,
472                    turn_count: 0,
473                    no_transition_count: 0,
474                    history: vec![],
475                }),
476            );
477
478        if let Some(ref persona) = self.persona_manager {
479            snapshot.persona = Some(persona.snapshot_as_value()?);
480        }
481
482        Ok(snapshot)
483    }
484
485    /// Save state including spawned agents manifest for session persistence.
486    pub async fn save_state_full(&self) -> Result<AgentSnapshot> {
487        let mut snapshot = self.save_state().await?;
488        if let Some(ref registry) = self.spawner_registry {
489            let entries = registry.list_with_specs();
490            if !entries.is_empty() {
491                snapshot = snapshot.with_spawned_agents(entries);
492            }
493        }
494        Ok(snapshot)
495    }
496
497    pub async fn restore_state(&self, snapshot: AgentSnapshot) -> Result<()> {
498        self.memory.restore(snapshot.memory).await?;
499
500        if let (Some(sm), Some(sm_snapshot)) = (&self.state_machine, snapshot.state_machine) {
501            if !sm_snapshot.current_state.is_empty() {
502                sm.restore(sm_snapshot)?;
503            }
504        }
505
506        self.context_manager.restore(snapshot.context);
507
508        if let (Some(persona_value), Some(persona_manager)) =
509            (snapshot.persona, &self.persona_manager)
510        {
511            persona_manager.restore_from_value(persona_value)?;
512        }
513
514        info!(agent_id = %snapshot.agent_id, "State restored");
515        Ok(())
516    }
517
518    pub async fn save_to(&self, storage: &dyn AgentStorage, session_id: &str) -> Result<()> {
519        let snapshot = self.save_state().await?;
520        storage.save(session_id, &snapshot).await
521    }
522
523    pub async fn load_from(&self, storage: &dyn AgentStorage, session_id: &str) -> Result<bool> {
524        if let Some(snapshot) = storage.load(session_id).await? {
525            self.restore_state(snapshot).await?;
526            Ok(true)
527        } else {
528            Ok(false)
529        }
530    }
531
532    pub async fn save_session(&self, session_id: &str) -> Result<()> {
533        let storage = self.storage.read().clone();
534        match storage {
535            Some(s) => self.save_to(s.as_ref(), session_id).await,
536            None => Err(AgentError::Config(
537                "No storage configured. Use with_storage_config() or with_storage() first".into(),
538            )),
539        }
540    }
541
542    pub async fn load_session(&self, session_id: &str) -> Result<bool> {
543        let storage = self.storage.read().clone();
544        match storage {
545            Some(s) => self.load_from(s.as_ref(), session_id).await,
546            None => Err(AgentError::Config(
547                "No storage configured. Use with_storage_config() or with_storage() first".into(),
548            )),
549        }
550    }
551
552    pub async fn delete_session(&self, session_id: &str) -> Result<()> {
553        let storage = self.storage.read().clone();
554        match storage {
555            Some(s) => s.delete(session_id).await,
556            None => Err(AgentError::Config(
557                "No storage configured. Use with_storage_config() or with_storage() first".into(),
558            )),
559        }
560    }
561
562    pub async fn list_sessions(&self) -> Result<Vec<String>> {
563        let storage = self.storage.read().clone();
564        match storage {
565            Some(s) => s.list_sessions().await,
566            None => Err(AgentError::Config(
567                "No storage configured. Use with_storage_config() or with_storage() first".into(),
568            )),
569        }
570    }
571
572    fn estimate_tokens(&self, text: &str) -> u32 {
573        (text.len() as f32 / 4.0).ceil() as u32
574    }
575
576    fn estimate_total_tokens(&self, messages: &[ChatMessage]) -> u32 {
577        messages
578            .iter()
579            .map(|m| self.estimate_tokens(&m.content))
580            .sum()
581    }
582
583    fn truncate_context(&self, messages: &mut Vec<ChatMessage>, keep_recent: usize) {
584        if messages.len() <= keep_recent + 1 {
585            return;
586        }
587        let system_msg = messages.remove(0);
588        let to_remove = messages.len().saturating_sub(keep_recent);
589        messages.drain(..to_remove);
590        messages.insert(0, system_msg);
591    }
592
593    fn get_filter(&self, config: &FilterConfig) -> Arc<dyn MessageFilter> {
594        match config {
595            FilterConfig::KeepRecent(n) => Arc::new(KeepRecentFilter::new(*n)),
596            FilterConfig::ByRole { keep_roles } => Arc::new(ByRoleFilter::new(keep_roles.clone())),
597            FilterConfig::SkipPattern { skip_if_contains } => {
598                Arc::new(SkipPatternFilter::new(skip_if_contains.clone()))
599            }
600            FilterConfig::Custom { name } => {
601                let filters = self.message_filters.read();
602                filters
603                    .get(name)
604                    .cloned()
605                    .unwrap_or_else(|| Arc::new(KeepRecentFilter::new(10)))
606            }
607        }
608    }
609
610    async fn summarize_context(
611        &self,
612        messages: &mut Vec<ChatMessage>,
613        summarizer_llm: Option<&str>,
614        max_summary_tokens: u32,
615        custom_prompt: Option<&str>,
616        keep_recent: usize,
617        filter: Option<&FilterConfig>,
618    ) -> Result<()> {
619        let system_msg = messages.remove(0);
620
621        let to_summarize_count = messages.len().saturating_sub(keep_recent);
622        if to_summarize_count == 0 {
623            messages.insert(0, system_msg);
624            return Ok(());
625        }
626
627        let recent_msgs: Vec<ChatMessage> = messages.drain(to_summarize_count..).collect();
628        let mut to_summarize = std::mem::take(messages);
629
630        if let Some(filter_config) = filter {
631            let filter = self.get_filter(filter_config);
632            to_summarize = filter.filter(to_summarize);
633        }
634
635        if to_summarize.is_empty() {
636            *messages = recent_msgs;
637            messages.insert(0, system_msg);
638            return Ok(());
639        }
640
641        let conversation_text = to_summarize
642            .iter()
643            .map(|m| format!("{:?}: {}", m.role, m.content))
644            .collect::<Vec<_>>()
645            .join("\n");
646
647        let default_prompt = format!(
648            "Summarize the following conversation in under {} tokens, preserving key information:\n\n{}",
649            max_summary_tokens, conversation_text
650        );
651
652        let summary_prompt = custom_prompt
653            .map(|p| format!("{}\n\n{}", p, conversation_text))
654            .unwrap_or(default_prompt);
655
656        let summarizer = if let Some(alias) = summarizer_llm {
657            self.llm_registry
658                .get(alias)
659                .map_err(|e| AgentError::Config(e.to_string()))?
660        } else {
661            self.llm_registry
662                .router()
663                .or_else(|_| self.llm_registry.default())
664                .map_err(|e| AgentError::Config(e.to_string()))?
665        };
666
667        let summary_msgs = vec![ChatMessage::user(&summary_prompt)];
668        let response = summarizer.complete(&summary_msgs, None).await?;
669
670        let summary_message = ChatMessage::system(&format!(
671            "[Previous conversation summary]\n{}",
672            response.content
673        ));
674
675        *messages = vec![system_msg, summary_message];
676        messages.extend(recent_msgs);
677
678        debug!(
679            summarized_count = to_summarize_count,
680            kept_recent = keep_recent,
681            "Context summarized"
682        );
683
684        Ok(())
685    }
686
687    fn render_system_prompt(&self) -> Result<String> {
688        let context = self.context_manager.get_all();
689        self.template_renderer
690            .render(&self.base_system_prompt, &context)
691    }
692
693    async fn get_available_tool_ids(&self) -> Result<Vec<String>> {
694        match self.get_current_tool_refs() {
695            // State explicitly sets tools (including empty = no tools)
696            Some(tool_refs) => {
697                if tool_refs.is_empty() {
698                    // Explicitly empty: no tools available in this state
699                    return Ok(Vec::new());
700                }
701
702                let eval_ctx = self.build_evaluation_context().await?;
703                let llm_getter = RegistryLLMGetter {
704                    registry: self.llm_registry.clone(),
705                };
706                let evaluator = ConditionEvaluator::new(llm_getter);
707
708                let mut available = Vec::new();
709                for tool_ref in &tool_refs {
710                    let tool_id = tool_ref.id();
711
712                    if self.tools.get(tool_id).is_none() {
713                        continue;
714                    }
715
716                    if let Some(condition) = tool_ref.condition() {
717                        match evaluator.evaluate(condition, &eval_ctx).await {
718                            Ok(true) => {
719                                available.push(tool_id.to_string());
720                            }
721                            Ok(false) => {
722                                debug!(tool = tool_id, "Tool condition not met, skipping");
723                            }
724                            Err(e) => {
725                                warn!(tool = tool_id, error = %e, "Error evaluating tool condition");
726                            }
727                        }
728                    } else {
729                        available.push(tool_id.to_string());
730                    }
731                }
732
733                Ok(available)
734            }
735            // State doesn't specify tools: fallback to agent-level
736            None => {
737                match &self.declared_tool_ids {
738                    // tools: [...] — specific tools listed
739                    Some(ids) if !ids.is_empty() => Ok(ids
740                        .iter()
741                        .filter(|id| self.tools.get(id).is_some())
742                        .cloned()
743                        .collect()),
744                    // tools: [] — explicitly no tools
745                    Some(_) => Ok(Vec::new()),
746                    // tools: not specified — all registered tools available
747                    None => Ok(self.tools.list_ids()),
748                }
749            }
750        }
751    }
752
753    /// Returns `Some(tools)` if the current state explicitly declares tools
754    /// (including `Some([])` for "no tools"), or `None` if the state doesn't
755    /// specify tools (meaning: fall back to agent-level declared_tool_ids).
756    fn get_current_tool_refs(&self) -> Option<Vec<ToolRef>> {
757        if let Some(ref sm) = self.state_machine {
758            if let Some(state_def) = sm.current_definition() {
759                let parent_def = sm.get_parent_definition();
760                if let Some(effective) = state_def.get_effective_tools(parent_def.as_ref()) {
761                    return Some(effective.into_iter().cloned().collect());
762                }
763            }
764        }
765        None
766    }
767
768    async fn build_evaluation_context(&self) -> Result<EvaluationContext> {
769        let context = self.context_manager.get_all();
770        let messages = self.memory.get_messages(Some(10)).await?;
771        let tool_history = self.tool_call_history.read().clone();
772
773        let (state_name, turn_count, previous_state) = if let Some(ref sm) = self.state_machine {
774            (Some(sm.current()), sm.turn_count(), sm.previous())
775        } else {
776            (None, 0, None)
777        };
778
779        Ok(EvaluationContext::default()
780            .with_context(context)
781            .with_state(state_name, turn_count, previous_state)
782            .with_called_tools(tool_history)
783            .with_messages(messages))
784    }
785
786    fn record_tool_call(&self, tool_id: &str, result: Value) {
787        self.tool_call_history.write().push(ToolCallRecord {
788            tool_id: tool_id.to_string(),
789            result,
790            timestamp: chrono::Utc::now(),
791        });
792    }
793
794    async fn get_effective_system_prompt(&self) -> Result<String> {
795        let rendered_base = self.render_system_prompt()?;
796
797        // Render persona prompt (if configured) and fire hooks for newly revealed secrets.
798        let persona_prefix = if let Some(ref persona) = self.persona_manager {
799            let context = self.context_manager.get_all();
800            let render_result = persona.render_prompt(&context)?;
801
802            // Fire on_secret_revealed for each newly revealed secret.
803            for content in &render_result.newly_revealed {
804                self.hooks.on_secret_revealed(content).await;
805            }
806
807            render_result.prompt
808        } else {
809            String::new()
810        };
811
812        if let Some(ref sm) = self.state_machine {
813            if let Some(state_def) = sm.current_definition() {
814                let state_prompt = if let Some(ref prompt) = state_def.prompt {
815                    let context = self.context_manager.get_all();
816                    self.template_renderer.render_with_state(
817                        prompt,
818                        &context,
819                        &sm.current(),
820                        sm.previous().as_deref(),
821                        sm.turn_count(),
822                        state_def.max_turns,
823                    )?
824                } else {
825                    String::new()
826                };
827
828                let combined = match state_def.prompt_mode {
829                    PromptMode::Append => {
830                        if state_prompt.is_empty() {
831                            rendered_base
832                        } else {
833                            format!(
834                                "{}\n\n[Current State: {}]\n{}",
835                                rendered_base,
836                                sm.current(),
837                                state_prompt
838                            )
839                        }
840                    }
841                    PromptMode::Replace => {
842                        if state_prompt.is_empty() {
843                            rendered_base
844                        } else {
845                            state_prompt
846                        }
847                    }
848                    PromptMode::Prepend => {
849                        if state_prompt.is_empty() {
850                            rendered_base
851                        } else {
852                            format!("{}\n\n{}", state_prompt, rendered_base)
853                        }
854                    }
855                };
856
857                // Persona always prepended regardless of prompt_mode.
858                let with_persona = if persona_prefix.is_empty() {
859                    combined
860                } else {
861                    format!("{}\n\n{}", persona_prefix, combined)
862                };
863
864                let available_tool_ids = self.get_available_tool_ids().await?;
865                // Only add tools prompt if tools are available.
866                // When tools: [] is set (explicitly empty), show NO tools to the LLM.
867                if !available_tool_ids.is_empty() {
868                    let tools_prompt = self.tools.generate_filtered_prompt_with_parallel(
869                        &available_tool_ids,
870                        self.parallel_tools.enabled,
871                    );
872                    if !tools_prompt.is_empty() {
873                        return Ok(format!("{}\n\n{}", with_persona, tools_prompt));
874                    }
875                }
876                return Ok(with_persona);
877            }
878        }
879
880        // No state machine - prepend persona to base.
881        let with_persona = if persona_prefix.is_empty() {
882            rendered_base
883        } else {
884            format!("{}\n\n{}", persona_prefix, rendered_base)
885        };
886
887        let tools_prompt = match &self.declared_tool_ids {
888            Some(ids) if !ids.is_empty() => self
889                .tools
890                .generate_filtered_prompt_with_parallel(ids, self.parallel_tools.enabled),
891            Some(_) => {
892                // tools: [] - explicitly no tools, empty prompt
893                String::new()
894            }
895            None => {
896                // tools: not specified - all registered tools
897                self.tools
898                    .generate_tools_prompt_with_parallel(self.parallel_tools.enabled)
899            }
900        };
901        if !tools_prompt.is_empty() {
902            Ok(format!("{}\n\n{}", with_persona, tools_prompt))
903        } else {
904            Ok(with_persona)
905        }
906    }
907
908    fn get_state_llm(&self) -> Result<Arc<dyn LLMProvider>> {
909        if let Some(ref sm) = self.state_machine {
910            if let Some(state_def) = sm.current_definition() {
911                if let Some(ref llm_alias) = state_def.llm {
912                    return self
913                        .llm_registry
914                        .get(llm_alias)
915                        .map_err(|e| AgentError::Config(e.to_string()));
916                }
917            }
918        }
919        self.llm_registry
920            .default()
921            .map_err(|e| AgentError::Config(e.to_string()))
922    }
923
924    fn get_effective_reasoning_config(&self) -> ReasoningConfig {
925        if let Some(ref sm) = self.state_machine {
926            if let Some(state_def) = sm.current_definition() {
927                if let Some(ref state_reasoning) = state_def.reasoning {
928                    return state_reasoning.clone();
929                }
930            }
931        }
932        self.reasoning_config.clone()
933    }
934
935    fn get_effective_reflection_config(&self) -> ReflectionConfig {
936        if let Some(ref sm) = self.state_machine {
937            if let Some(state_def) = sm.current_definition() {
938                if let Some(ref state_reflection) = state_def.reflection {
939                    return state_reflection.clone();
940                }
941            }
942        }
943        self.reflection_config.clone()
944    }
945
946    fn get_skill_reasoning_config(&self, skill: &SkillDefinition) -> ReasoningConfig {
947        skill
948            .reasoning
949            .clone()
950            .unwrap_or_else(|| self.get_effective_reasoning_config())
951    }
952
953    fn get_skill_reflection_config(&self, skill: &SkillDefinition) -> ReflectionConfig {
954        skill
955            .reflection
956            .clone()
957            .unwrap_or_else(|| self.get_effective_reflection_config())
958    }
959
960    async fn build_disambiguation_context(&self) -> Result<DisambiguationContext> {
961        let recent_messages: Vec<String> = self
962            .memory
963            .get_messages(Some(5))
964            .await?
965            .iter()
966            .rev()
967            .map(|m| format!("{:?}: {}", m.role, m.content))
968            .collect();
969
970        let current_state = self.current_state().map(|s| s.to_string());
971
972        // Include the current state's prompt text so the detector understands
973        // what kind of input is expected (e.g., "Ask for the order number").
974        let state_prompt: Option<String> = self
975            .state_machine
976            .as_ref()
977            .and_then(|sm| sm.current_definition())
978            .and_then(|def| def.prompt.clone());
979
980        let available_tools: Vec<String> = self
981            .get_available_tool_ids()
982            .await
983            .unwrap_or_else(|_| self.tools.list_ids());
984
985        let available_skills: Vec<String> = self.skills.iter().map(|s| s.id.clone()).collect();
986
987        let user_context = self.context_manager.get_all();
988
989        // Extract canonical intent labels from current state's transitions
990        let available_intents: Vec<String> = if let Some(ref sm) = self.state_machine {
991            sm.current_definition()
992                .map(|def| {
993                    def.transitions
994                        .iter()
995                        .filter_map(|t| t.intent.clone())
996                        .collect()
997                })
998                .unwrap_or_default()
999        } else {
1000            Vec::new()
1001        };
1002
1003        Ok(DisambiguationContext::from_agent_state(
1004            recent_messages,
1005            current_state,
1006            state_prompt,
1007            available_tools,
1008            available_skills,
1009            available_intents,
1010            user_context,
1011        ))
1012    }
1013
1014    fn get_available_skills(&self) -> Vec<&SkillDefinition> {
1015        if let Some(ref sm) = self.state_machine {
1016            if let Some(state_def) = sm.current_definition() {
1017                let parent_def = sm.get_parent_definition();
1018                let effective_skills = state_def.get_effective_skills(parent_def.as_ref());
1019                if !effective_skills.is_empty() {
1020                    return self
1021                        .skills
1022                        .iter()
1023                        .filter(|s| effective_skills.contains(&&s.id))
1024                        .collect();
1025                }
1026            }
1027        }
1028        self.skills.iter().collect()
1029    }
1030
1031    async fn build_messages(&self) -> Result<Vec<ChatMessage>> {
1032        let system_prompt = self.get_effective_system_prompt().await?;
1033        let mut messages = vec![ChatMessage::system(&system_prompt)];
1034
1035        // Use get_context() instead of get_messages() so that the conversation
1036        // summary produced by CompactingMemory is included in the prompt.
1037        // Token budget controls what's stored in memory (via handle_memory_overflow)
1038        // max_context_tokens + ContextOverflowAction handles LLM context limits
1039        let context = self.memory.get_context().await?;
1040        let history = if let Some(ref budget) = self.memory_token_budget {
1041            // Use per-component allocation so summary and recent messages
1042            // are each capped to their declared token budgets.
1043            context.to_llm_messages_with_allocation(&budget.allocation)
1044        } else {
1045            context.to_llm_messages()
1046        };
1047        messages.extend(history);
1048
1049        let total_tokens = self.estimate_total_tokens(&messages);
1050
1051        if total_tokens > self.max_context_tokens {
1052            debug!(
1053                total = total_tokens,
1054                limit = self.max_context_tokens,
1055                "Context overflow"
1056            );
1057
1058            match &self.recovery_manager.config().llm.on_context_overflow {
1059                ContextOverflowAction::Error => {
1060                    return Err(AgentError::LLM(format!(
1061                        "Context overflow: {} tokens > {} limit",
1062                        total_tokens, self.max_context_tokens
1063                    )));
1064                }
1065                ContextOverflowAction::Truncate { keep_recent } => {
1066                    self.truncate_context(&mut messages, *keep_recent);
1067                }
1068                ContextOverflowAction::Summarize {
1069                    summarizer_llm,
1070                    max_summary_tokens,
1071                    custom_prompt,
1072                    keep_recent,
1073                    filter,
1074                } => {
1075                    self.summarize_context(
1076                        &mut messages,
1077                        summarizer_llm.as_deref(),
1078                        *max_summary_tokens,
1079                        custom_prompt.as_deref(),
1080                        *keep_recent,
1081                        filter.as_ref(),
1082                    )
1083                    .await?;
1084                }
1085            }
1086        }
1087
1088        Ok(messages)
1089    }
1090
1091    fn parse_tool_calls(&self, content: &str) -> Option<Vec<ToolCall>> {
1092        // Try direct JSON parse first
1093        if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(content) {
1094            // Handle JSON array of tool calls (parallel tool calling)
1095            if let Some(arr) = parsed.as_array() {
1096                let calls: Vec<ToolCall> = arr
1097                    .iter()
1098                    .filter_map(|v| self.extract_tool_call_from_value(v))
1099                    .collect();
1100                if !calls.is_empty() {
1101                    return Some(calls);
1102                }
1103            }
1104            // Handle single JSON object
1105            if let Some(tool_call) = self.extract_tool_call_from_value(&parsed) {
1106                return Some(vec![tool_call]);
1107            }
1108        }
1109
1110        // Try to extract JSON from content (handles extra text/braces from LLM)
1111        if let Some(json_str) = self.extract_json_from_content(content) {
1112            if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(&json_str) {
1113                // Handle JSON array of tool calls (parallel tool calling)
1114                if let Some(arr) = parsed.as_array() {
1115                    let calls: Vec<ToolCall> = arr
1116                        .iter()
1117                        .filter_map(|v| self.extract_tool_call_from_value(v))
1118                        .collect();
1119                    if !calls.is_empty() {
1120                        return Some(calls);
1121                    }
1122                }
1123                // Handle single JSON object
1124                if let Some(tool_call) = self.extract_tool_call_from_value(&parsed) {
1125                    return Some(vec![tool_call]);
1126                }
1127            }
1128        }
1129
1130        None
1131    }
1132
1133    fn extract_tool_call_from_value(&self, parsed: &serde_json::Value) -> Option<ToolCall> {
1134        if let Some(tool_name) = parsed.get("tool").and_then(|v| v.as_str()) {
1135            let arguments = parsed
1136                .get("arguments")
1137                .cloned()
1138                .unwrap_or(serde_json::json!({}));
1139            return Some(ToolCall {
1140                id: uuid::Uuid::new_v4().to_string(),
1141                name: tool_name.to_string(),
1142                arguments,
1143            });
1144        }
1145        None
1146    }
1147
1148    // Lite models could generate unmatched braces: this function handles such cases
1149    fn extract_json_from_content(&self, content: &str) -> Option<String> {
1150        // Try array first (for parallel tool calls), then single object
1151        if let Some(result) = self.extract_json_array_from_content(content) {
1152            return Some(result);
1153        }
1154        self.extract_json_object_from_content(content)
1155    }
1156
1157    /// Extract a JSON array `[...]` containing tool calls from mixed content.
1158    fn extract_json_array_from_content(&self, content: &str) -> Option<String> {
1159        let start = content.find('[')?;
1160        let content_from_start = &content[start..];
1161
1162        let mut depth = 0;
1163        let mut end = 0;
1164        for (i, ch) in content_from_start.char_indices() {
1165            match ch {
1166                '[' => depth += 1,
1167                ']' => {
1168                    depth -= 1;
1169                    if depth == 0 {
1170                        end = i + 1;
1171                        break;
1172                    }
1173                }
1174                _ => {}
1175            }
1176        }
1177
1178        if end > 0 {
1179            let json_str = &content_from_start[..end];
1180            // Verify it looks like an array of tool calls
1181            if json_str.contains("\"tool\"") {
1182                return Some(json_str.to_string());
1183            }
1184        }
1185
1186        None
1187    }
1188
1189    /// Extract a JSON object `{...}` containing a tool call from mixed content.
1190    fn extract_json_object_from_content(&self, content: &str) -> Option<String> {
1191        let start = content.find('{')?;
1192        let content_from_start = &content[start..];
1193
1194        // Count braces to find the matching closing brace
1195        let mut depth = 0;
1196        let mut end = 0;
1197        for (i, ch) in content_from_start.char_indices() {
1198            match ch {
1199                '{' => depth += 1,
1200                '}' => {
1201                    depth -= 1;
1202                    if depth == 0 {
1203                        end = i + 1;
1204                        break;
1205                    }
1206                }
1207                _ => {}
1208            }
1209        }
1210
1211        if end > 0 {
1212            let json_str = &content_from_start[..end];
1213            // Verify it looks like a tool call
1214            if json_str.contains("\"tool\"") {
1215                return Some(json_str.to_string());
1216            }
1217        }
1218
1219        None
1220    }
1221
1222    async fn execute_tool(&self, tool_call: &ToolCall) -> Result<String> {
1223        let tool = self
1224            .tools
1225            .get(&tool_call.name)
1226            .ok_or_else(|| AgentError::Tool(format!("Tool not found: {}", tool_call.name)))?;
1227
1228        let result = tool.execute(tool_call.arguments.clone()).await;
1229
1230        if result.success {
1231            Ok(result.output)
1232        } else {
1233            Err(AgentError::Tool(result.output))
1234        }
1235    }
1236
1237    #[instrument(skip(self, tool_call), fields(tool = %tool_call.name))]
1238    async fn execute_tool_smart(&self, tool_call: &ToolCall) -> Result<String> {
1239        let mut tool_call = tool_call.clone();
1240        info!(args = %tool_call.arguments, "Executing tool");
1241
1242        self.hooks
1243            .on_tool_start(&tool_call.name, &tool_call.arguments)
1244            .await;
1245        let tool_start = Instant::now();
1246
1247        let available_tool_ids = self.get_available_tool_ids().await?;
1248        // Resolve the tool call name to its canonical ID via the registry.
1249        // The LLM may use the display name (e.g., "HTTP Client") but
1250        // available_tool_ids contains IDs (e.g., "http"). Registry.get()
1251        // now matches by ID, display name, and alias.
1252        let resolved_id = self
1253            .tools
1254            .get(&tool_call.name)
1255            .map(|t| t.id().to_lowercase());
1256        let tool_name_lower = tool_call.name.to_lowercase();
1257        if !available_tool_ids.is_empty() {
1258            let is_available = available_tool_ids.iter().any(|id| {
1259                let id_lower = id.to_lowercase();
1260                id_lower == tool_name_lower || resolved_id.as_deref() == Some(&id_lower)
1261            });
1262            if !is_available {
1263                warn!(tool = %tool_call.name, "Tool not available in current context");
1264                return Err(AgentError::Tool(format!(
1265                    "Tool '{}' is not available. Available tools: {}",
1266                    tool_call.name,
1267                    available_tool_ids.join(", ")
1268                )));
1269            }
1270        }
1271
1272        // Build language context for HITL localization.
1273        let hitl_lang_ctx = self.build_hitl_language_context();
1274
1275        // Compute canonical tool ID before security/HITL checks so both use it.
1276        // Use the tool ID (e.g. "http") instead of the name (e.g. "HTTP Client") so it matches the HITL config keys in hitl.tools.<id>.
1277        let hitl_tool_id = resolved_id.as_deref().unwrap_or(&tool_name_lower);
1278
1279        // Check security FIRST -- don't bother the human if security blocks
1280        if self.tool_security.config().enabled {
1281            let security_result = self
1282                .tool_security
1283                .check_tool_execution(&tool_call.name, &tool_call.arguments)
1284                .await?;
1285
1286            match security_result {
1287                SecurityCheckResult::Allow => {}
1288                SecurityCheckResult::Block { reason } => {
1289                    warn!(reason = %reason, "Tool blocked by security");
1290                    return Err(AgentError::Tool(format!("Blocked: {}", reason)));
1291                }
1292                SecurityCheckResult::RequireConfirmation { message } => {
1293                    // Route through HITL if available, otherwise error
1294                    if let Some(ref hitl_engine) = self.hitl_engine {
1295                        let check_result = hitl_engine
1296                            .check_tool_with_localization(
1297                                hitl_tool_id,
1298                                &tool_call.arguments,
1299                                &hitl_lang_ctx,
1300                                self.approval_handler.as_ref(),
1301                                Some(&self.llm_registry),
1302                            )
1303                            .await?;
1304                        let result = self.request_hitl_approval(check_result).await?;
1305                        match result {
1306                            ApprovalResult::Approved | ApprovalResult::Modified { .. } => {}
1307                            ApprovalResult::Rejected { reason } => {
1308                                let reason_str = reason.as_deref().unwrap_or("rejected");
1309                                warn!(tool = %tool_call.name, reason = %reason_str, "Security confirmation rejected by approver");
1310                                return Err(AgentError::Tool(format!(
1311                                    "Confirmation rejected: {}",
1312                                    message
1313                                )));
1314                            }
1315                            _ => {}
1316                        }
1317                    } else {
1318                        warn!(message = %message, "Tool requires confirmation but no HITL handler");
1319                        return Err(AgentError::Tool(format!(
1320                            "Confirmation required: {}",
1321                            message
1322                        )));
1323                    }
1324                }
1325                SecurityCheckResult::Warn { message } => {
1326                    warn!(message = %message, "Tool security warning");
1327                }
1328            }
1329        }
1330
1331        // Check HITL approval for tool (after security passes)
1332        if let Some(ref hitl_engine) = self.hitl_engine {
1333            let check_result = hitl_engine
1334                .check_tool_with_localization(
1335                    hitl_tool_id,
1336                    &tool_call.arguments,
1337                    &hitl_lang_ctx,
1338                    self.approval_handler.as_ref(),
1339                    Some(&self.llm_registry),
1340                )
1341                .await?;
1342            if check_result.is_required() {
1343                let result = self.request_hitl_approval(check_result).await?;
1344                match result {
1345                    ApprovalResult::Approved => {}
1346                    ApprovalResult::Modified { changes } => {
1347                        if let Some(obj) = tool_call.arguments.as_object_mut() {
1348                            for (k, v) in changes {
1349                                obj.insert(k, v);
1350                            }
1351                        }
1352                        info!(tool = %tool_call.name, "Tool arguments modified by approver");
1353                    }
1354                    ApprovalResult::Rejected { reason: _reason } => {
1355                        warn!(tool = %tool_call.name, "Tool execution rejected by HITL");
1356                        return Err(AgentError::HITLRejected(format!(
1357                            "Tool '{}' was rejected by human approver. Do not retry.",
1358                            tool_call.name
1359                        )));
1360                    }
1361                    _ => {}
1362                }
1363            }
1364
1365            // Check conditions (e.g., amount > 1000)
1366            let condition_check = hitl_engine
1367                .check_conditions_with_localization(
1368                    &tool_call.arguments,
1369                    &hitl_lang_ctx,
1370                    self.approval_handler.as_ref(),
1371                    Some(&self.llm_registry),
1372                )
1373                .await?;
1374            if condition_check.is_required() {
1375                let result = self.request_hitl_approval(condition_check).await?;
1376                match result {
1377                    ApprovalResult::Approved => {}
1378                    ApprovalResult::Modified { changes } => {
1379                        if let Some(obj) = tool_call.arguments.as_object_mut() {
1380                            for (k, v) in changes {
1381                                obj.insert(k, v);
1382                            }
1383                        }
1384                        info!(tool = %tool_call.name, "Tool arguments modified by approver (condition)");
1385                    }
1386                    ApprovalResult::Rejected { reason: _reason } => {
1387                        warn!(tool = %tool_call.name, "Tool execution rejected by HITL condition");
1388                        return Err(AgentError::HITLRejected(format!(
1389                            "Tool '{}' was rejected due to policy condition. Do not retry.",
1390                            tool_call.name
1391                        )));
1392                    }
1393                    _ => {}
1394                }
1395            }
1396        }
1397
1398        let tool_config = self.recovery_manager.get_tool_config(&tool_call.name);
1399
1400        let result = if tool_config.max_retries > 0 {
1401            let retry_config = RetryConfig {
1402                max_retries: tool_config.max_retries,
1403                ..Default::default()
1404            };
1405
1406            let tool_call_clone = tool_call.clone();
1407            self.recovery_manager
1408                .with_retry(
1409                    &format!("tool:{}", tool_call.name),
1410                    Some(&retry_config),
1411                    || {
1412                        let tc = tool_call_clone.clone();
1413                        async move { self.execute_tool(&tc).await.map_err(|e| e.classify()) }
1414                    },
1415                )
1416                .await
1417                .map_err(|e| AgentError::Tool(e.to_string()))
1418        } else {
1419            self.execute_tool(&tool_call).await
1420        };
1421
1422        // Apply on_failure policy when the tool fails after all retries.
1423        let result = match result {
1424            Ok(output) => Ok(output),
1425            Err(e) => match &tool_config.on_failure {
1426                ToolFailureAction::Skip => {
1427                    warn!(
1428                        tool = %tool_call.name,
1429                        error = %e,
1430                        "Tool failed, skipping per on_failure: skip policy"
1431                    );
1432                    Ok(format!(
1433                        "{{\"skipped\": true, \"reason\": \"Tool '{}' was skipped after failure\"}}",
1434                        tool_call.name
1435                    ))
1436                }
1437                ToolFailureAction::Fallback { fallback_tool } => {
1438                    warn!(
1439                        tool = %tool_call.name,
1440                        fallback = %fallback_tool,
1441                        error = %e,
1442                        "Tool failed, trying fallback tool"
1443                    );
1444                    let fallback_call = ToolCall {
1445                        id: tool_call.id.clone(),
1446                        name: fallback_tool.clone(),
1447                        arguments: tool_call.arguments.clone(),
1448                    };
1449                    self.execute_tool(&fallback_call).await
1450                }
1451                ToolFailureAction::ReportError => Err(e),
1452            },
1453        };
1454
1455        let tool_duration_ms = tool_start.elapsed().as_millis() as u64;
1456
1457        match &result {
1458            Ok(output) => {
1459                info!(output_len = output.len(), "Tool execution successful");
1460                let result_value: Value =
1461                    serde_json::from_str(output).unwrap_or(Value::String(output.clone()));
1462                self.record_tool_call(&tool_call.name, result_value);
1463
1464                let tool_result = ToolResult {
1465                    success: true,
1466                    output: output.clone(),
1467                    metadata: None,
1468                };
1469                self.hooks
1470                    .on_tool_complete(&tool_call.name, &tool_result, tool_duration_ms)
1471                    .await;
1472            }
1473            Err(e) => {
1474                error!(error = %e, "Tool execution failed");
1475                self.record_tool_call(&tool_call.name, serde_json::json!({"error": e.to_string()}));
1476
1477                let tool_result = ToolResult {
1478                    success: false,
1479                    output: e.to_string(),
1480                    metadata: None,
1481                };
1482                self.hooks
1483                    .on_tool_complete(&tool_call.name, &tool_result, tool_duration_ms)
1484                    .await;
1485                self.hooks.on_error(e).await;
1486            }
1487        }
1488
1489        result
1490    }
1491
1492    /// Result of skill routing — three possible outcomes.
1493    /// NoMatch: no skill matched, continue to normal LLM chat.
1494    /// Response: skill executed successfully, here is the response string.
1495    /// NeedsClarification: skill matched but needs disambiguation first.
1496    async fn try_skill_route(&self, input: &str) -> Result<SkillRouteResult> {
1497        if let Some(ref router) = self.skill_router {
1498            let available_skills = self.get_available_skills();
1499            let skill_ids: Vec<&str> = available_skills.iter().map(|s| s.id.as_str()).collect();
1500
1501            if let Some(skill_id) = router.select_skill_filtered(input, &skill_ids).await? {
1502                info!(skill_id = %skill_id, "Skill selected");
1503
1504                let skill = router
1505                    .get_skill(&skill_id)
1506                    .ok_or_else(|| AgentError::Skill(format!("Skill not found: {}", skill_id)))?;
1507
1508                // Skill-level disambiguation check: if the skill has a disambiguation
1509                // override and it's enabled, run a second disambiguation pass before
1510                // executing. This lets skills enforce stricter thresholds and
1511                // required_clarity fields (e.g. transfer_money requiring recipient + amount).
1512                if let Some(ref skill_disambig) = skill.disambiguation {
1513                    if skill_disambig.enabled.unwrap_or(false) {
1514                        if let Some(ref disambiguator) = self.disambiguation_manager {
1515                            let context = self.build_disambiguation_context().await?;
1516                            let state_override = self
1517                                .state_machine
1518                                .as_ref()
1519                                .and_then(|sm| sm.current_definition())
1520                                .and_then(|def| def.disambiguation.clone());
1521
1522                            match disambiguator
1523                                .process_input_with_override(
1524                                    input,
1525                                    &context,
1526                                    state_override.as_ref(),
1527                                    Some(skill_disambig),
1528                                )
1529                                .await?
1530                            {
1531                                DisambiguationResult::Clear => {
1532                                    debug!(skill_id = %skill_id, "Skill disambiguation: clear");
1533                                    // Fall through to execute_skill below
1534                                }
1535                                DisambiguationResult::NeedsClarification {
1536                                    question,
1537                                    detection,
1538                                } => {
1539                                    info!(
1540                                        skill_id = %skill_id,
1541                                        ambiguity_type = ?detection.ambiguity_type,
1542                                        confidence = detection.confidence,
1543                                        "Skill requires clarification before execution"
1544                                    );
1545                                    // Tag the runtime with the skill ID so the next turn
1546                                    // routes directly to this skill after clarification resolves.
1547                                    *self.pending_skill_id.write() = Some(skill_id.clone());
1548
1549                                    return Ok(SkillRouteResult::NeedsClarification(
1550                                        AgentResponse::new(&question.question).with_metadata(
1551                                            "disambiguation",
1552                                            serde_json::json!({
1553                                                "status": "awaiting_clarification",
1554                                                "skill_id": skill_id,
1555                                                "options": question.options,
1556                                                "clarifying": question.clarifying,
1557                                                "detection": {
1558                                                    "type": detection.ambiguity_type,
1559                                                    "confidence": detection.confidence,
1560                                                    "what_is_unclear": detection.what_is_unclear,
1561                                                }
1562                                            }),
1563                                        ),
1564                                    ));
1565                                }
1566                                DisambiguationResult::Clarified { enriched_input, .. } => {
1567                                    info!(
1568                                        skill_id = %skill_id,
1569                                        enriched = %enriched_input,
1570                                        "Skill disambiguation clarified, executing with enriched input"
1571                                    );
1572                                    return Ok(SkillRouteResult::Response(
1573                                        self.execute_skill(skill, &enriched_input).await?,
1574                                    ));
1575                                }
1576                                DisambiguationResult::ProceedWithBestGuess { enriched_input } => {
1577                                    info!(skill_id = %skill_id, "Skill disambiguation: proceeding with best guess");
1578                                    return Ok(SkillRouteResult::Response(
1579                                        self.execute_skill(skill, &enriched_input).await?,
1580                                    ));
1581                                }
1582                                DisambiguationResult::GiveUp { reason } => {
1583                                    warn!(skill_id = %skill_id, reason = %reason, "Skill disambiguation gave up");
1584                                    let apology = self
1585                                        .generate_localized_apology(
1586                                            "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
1587                                            &reason,
1588                                        )
1589                                        .await
1590                                        .unwrap_or_else(|_| {
1591                                            format!("I'm sorry, I couldn't understand your request: {}", reason)
1592                                        });
1593                                    return Ok(SkillRouteResult::NeedsClarification(
1594                                        AgentResponse::new(&apology),
1595                                    ));
1596                                }
1597                                DisambiguationResult::Escalate { reason } => {
1598                                    info!(skill_id = %skill_id, reason = %reason, "Skill disambiguation escalating");
1599                                    let apology = self
1600                                        .generate_localized_apology(
1601                                            "Explain briefly that you're transferring the user to a human agent for help.",
1602                                            &reason,
1603                                        )
1604                                        .await
1605                                        .unwrap_or_else(|_| {
1606                                            format!("I need human assistance to help with your request: {}", reason)
1607                                        });
1608                                    return Ok(SkillRouteResult::NeedsClarification(
1609                                        AgentResponse::new(&apology),
1610                                    ));
1611                                }
1612                                DisambiguationResult::Abandoned { .. } => {
1613                                    // User abandoned during skill-level disambiguation.
1614                                    // Return NoMatch so run_loop falls through to normal processing.
1615                                    debug!(skill_id = %skill_id, "Skill disambiguation abandoned");
1616                                    return Ok(SkillRouteResult::NoMatch);
1617                                }
1618                            }
1619                        }
1620                    }
1621                }
1622
1623                return Ok(SkillRouteResult::Response(
1624                    self.execute_skill(skill, input).await?,
1625                ));
1626            }
1627        }
1628        Ok(SkillRouteResult::NoMatch)
1629    }
1630
1631    /// Execute a skill with reasoning and reflection, returning the response string.
1632    async fn execute_skill(&self, skill: &SkillDefinition, input: &str) -> Result<String> {
1633        if let Some(ref executor) = self.skill_executor {
1634            let skill_reasoning = self.get_skill_reasoning_config(skill);
1635            let skill_reflection = self.get_skill_reflection_config(skill);
1636
1637            debug!(
1638                skill_id = %skill.id,
1639                reasoning_mode = ?skill_reasoning.mode,
1640                reflection_enabled = ?skill_reflection.enabled,
1641                "Skill reasoning/reflection config"
1642            );
1643
1644            let response = executor
1645                .execute(skill, input, serde_json::json!({}))
1646                .await?;
1647
1648            if skill_reflection.requires_evaluation() && skill_reflection.is_enabled() {
1649                let should_reflect = self
1650                    .should_reflect_with_config(input, &response, &skill_reflection)
1651                    .await?;
1652                if should_reflect {
1653                    let evaluated = self
1654                        .evaluate_and_retry_with_config(input, response, &skill_reflection)
1655                        .await?;
1656                    return Ok(evaluated);
1657                }
1658            }
1659
1660            return Ok(response);
1661        }
1662        Err(AgentError::Skill(
1663            "No skill executor configured".to_string(),
1664        ))
1665    }
1666
1667    /// Execute a skill by ID, bypassing the skill router.
1668    /// Used after skill-triggered disambiguation resolves to route directly to the matched skill.
1669    async fn execute_skill_by_id(&self, skill_id: &str, input: &str) -> Result<String> {
1670        let skill = self
1671            .skill_router
1672            .as_ref()
1673            .and_then(|r| r.get_skill(skill_id).cloned())
1674            .ok_or_else(|| AgentError::Skill(format!("Skill not found: {}", skill_id)))?;
1675        self.execute_skill(&skill, input).await
1676    }
1677
1678    async fn should_reflect_with_config(
1679        &self,
1680        input: &str,
1681        response: &str,
1682        config: &ReflectionConfig,
1683    ) -> Result<bool> {
1684        if !config.requires_evaluation() {
1685            return Ok(false);
1686        }
1687
1688        if config.is_enabled() {
1689            return Ok(true);
1690        }
1691
1692        let evaluator_llm = config
1693            .evaluator_llm
1694            .as_ref()
1695            .and_then(|alias| self.llm_registry.get(alias).ok())
1696            .or_else(|| self.llm_registry.router().ok())
1697            .or_else(|| self.llm_registry.default().ok());
1698
1699        let Some(llm) = evaluator_llm else {
1700            return Ok(false);
1701        };
1702
1703        let prompt = format!(
1704            r#"Should this response be evaluated for quality? Consider if it's a complex or important response.
1705
1706User query: "{}"
1707Response: "{}"
1708
1709Answer YES or NO only."#,
1710            input,
1711            &response[..response.len().min(500)]
1712        );
1713
1714        let messages = vec![ChatMessage::user(&prompt)];
1715        let result = llm.complete(&messages, None).await;
1716
1717        match result {
1718            Ok(resp) => Ok(resp.content.trim().to_uppercase().contains("YES")),
1719            Err(_) => Ok(false),
1720        }
1721    }
1722
1723    async fn evaluate_and_retry_with_config(
1724        &self,
1725        input: &str,
1726        mut response: String,
1727        config: &ReflectionConfig,
1728    ) -> Result<String> {
1729        let llm = self.get_state_llm()?;
1730        let mut attempts = 0u32;
1731        let max_retries = config.max_retries;
1732
1733        loop {
1734            let evaluation = self
1735                .evaluate_response_with_config(input, &response, config)
1736                .await?;
1737
1738            if evaluation.passed || attempts >= max_retries {
1739                info!(
1740                    passed = evaluation.passed,
1741                    confidence = evaluation.confidence,
1742                    attempts = attempts + 1,
1743                    "Skill reflection evaluation complete"
1744                );
1745                return Ok(response);
1746            }
1747
1748            debug!(
1749                attempt = attempts + 1,
1750                failed_criteria = evaluation.failed_criteria().count(),
1751                "Skill response did not meet criteria, retrying"
1752            );
1753
1754            let feedback: Vec<String> = evaluation
1755                .failed_criteria()
1756                .map(|c| format!("- {}", c.criterion))
1757                .collect();
1758
1759            let retry_prompt = format!(
1760                "Your previous response did not meet these criteria:\n{}\n\nPlease provide an improved response to: {}",
1761                feedback.join("\n"),
1762                input
1763            );
1764
1765            let messages = vec![ChatMessage::user(&retry_prompt)];
1766            let retry_response = llm
1767                .complete(&messages, None)
1768                .await
1769                .map_err(|e| AgentError::LLM(e.to_string()))?;
1770
1771            response = retry_response.content.trim().to_string();
1772            attempts += 1;
1773        }
1774    }
1775
1776    async fn evaluate_response_with_config(
1777        &self,
1778        input: &str,
1779        response: &str,
1780        config: &ReflectionConfig,
1781    ) -> Result<EvaluationResult> {
1782        let evaluator_llm = config
1783            .evaluator_llm
1784            .as_ref()
1785            .and_then(|alias| self.llm_registry.get(alias).ok())
1786            .or_else(|| self.llm_registry.router().ok())
1787            .or_else(|| self.llm_registry.default().ok())
1788            .ok_or_else(|| AgentError::Config("No LLM available for evaluation".into()))?;
1789
1790        let criteria = &config.criteria;
1791        let criteria_list = criteria
1792            .iter()
1793            .enumerate()
1794            .map(|(i, c)| format!("{}. {}", i + 1, c))
1795            .collect::<Vec<_>>()
1796            .join("\n");
1797
1798        let prompt = format!(
1799            r#"Evaluate this response against the criteria.
1800
1801User query: "{}"
1802
1803Response to evaluate: "{}"
1804
1805Criteria:
1806{}
1807
1808For each criterion, respond with:
1809- criterion number
1810- PASS or FAIL
1811- brief reason
1812
1813Then provide overall confidence (0.0 to 1.0) and whether it passes overall.
1814
1815Format:
18161. PASS/FAIL - reason
18172. PASS/FAIL - reason
1818...
1819CONFIDENCE: 0.X
1820OVERALL: PASS/FAIL"#,
1821            input, response, criteria_list
1822        );
1823
1824        let messages = vec![ChatMessage::user(&prompt)];
1825        let eval_response = evaluator_llm
1826            .complete(&messages, None)
1827            .await
1828            .map_err(|e| AgentError::LLM(format!("Evaluation failed: {}", e)))?;
1829
1830        let content = eval_response.content.to_uppercase();
1831        let llm_pass = content.contains("OVERALL: PASS");
1832
1833        let confidence = content
1834            .lines()
1835            .find(|l| l.contains("CONFIDENCE:"))
1836            .and_then(|l| {
1837                l.split(':')
1838                    .nth(1)
1839                    .and_then(|v| v.trim().parse::<f32>().ok())
1840            })
1841            .unwrap_or(if llm_pass { 0.8 } else { 0.4 });
1842
1843        // Gate pass against confidence threshold.
1844        // LLM may say PASS but with low confidence - the threshold catches this.
1845        let overall_pass = llm_pass && confidence >= config.pass_threshold;
1846
1847        let mut criteria_results = Vec::new();
1848        for (i, criterion) in criteria.iter().enumerate() {
1849            let line_marker = format!("{}.", i + 1);
1850            let passed = eval_response
1851                .content
1852                .lines()
1853                .find(|l| l.contains(&line_marker))
1854                .map(|l| l.to_uppercase().contains("PASS"))
1855                .unwrap_or(overall_pass);
1856
1857            if passed {
1858                criteria_results.push(CriterionResult::pass(criterion));
1859            } else {
1860                criteria_results.push(CriterionResult::fail(criterion, "Did not meet criterion"));
1861            }
1862        }
1863
1864        Ok(EvaluationResult::new(overall_pass, confidence).with_criteria(criteria_results))
1865    }
1866
1867    /// Process input through the pipeline (state-level override or agent-level).
1868    async fn process_input(&self, input: &str) -> Result<ProcessData> {
1869        if let Some(processor) = self.get_state_process_processor() {
1870            return processor.process_input(input).await;
1871        }
1872        if let Some(ref processor) = self.process_processor {
1873            processor.process_input(input).await
1874        } else {
1875            Ok(ProcessData::new(input))
1876        }
1877    }
1878
1879    /// Process output through the pipeline (state-level override or agent-level).
1880    async fn process_output(
1881        &self,
1882        output: &str,
1883        input_context: &std::collections::HashMap<String, serde_json::Value>,
1884    ) -> Result<ProcessData> {
1885        if let Some(processor) = self.get_state_process_processor() {
1886            return processor.process_output(output, input_context).await;
1887        }
1888        if let Some(ref processor) = self.process_processor {
1889            processor.process_output(output, input_context).await
1890        } else {
1891            Ok(ProcessData::new(output))
1892        }
1893    }
1894
1895    /// Build a ProcessProcessor from the current state's process config, if any.
1896    fn get_state_process_processor(&self) -> Option<ProcessProcessor> {
1897        let sm = self.state_machine.as_ref()?;
1898        let def = sm.current_definition()?;
1899        let config = def.process.as_ref()?;
1900        let mut processor = ProcessProcessor::new(config.clone());
1901        if let Some(ref registry) = Some(self.llm_registry.clone()) {
1902            processor = processor.with_llm_registry(registry.clone());
1903        }
1904        Some(processor)
1905    }
1906
1907    async fn check_turn_timeout(&self) -> Result<()> {
1908        if let Some(ref sm) = self.state_machine {
1909            if let Some(timeout_state) = sm.check_timeout() {
1910                let from_state = sm.current();
1911                self.execute_state_exit_actions(&from_state).await;
1912                sm.transition_to(&timeout_state, "max_turns exceeded")?;
1913                self.execute_state_enter_actions(&timeout_state).await;
1914                info!(to = %timeout_state, "Timeout transition");
1915            }
1916        }
1917        Ok(())
1918    }
1919
1920    fn increment_turn(&self) {
1921        if let Some(ref sm) = self.state_machine {
1922            sm.increment_turn();
1923        }
1924    }
1925
1926    async fn evaluate_transitions(&self, user_message: &str, response: &str) -> Result<bool> {
1927        let (transitions, evaluator, current_state) =
1928            match (&self.state_machine, &self.transition_evaluator) {
1929                (Some(sm), Some(eval)) => {
1930                    let auto_transitions: Vec<_> = sm
1931                        .auto_transitions()
1932                        .into_iter()
1933                        .filter(|t| {
1934                            // S7: skip transitions on cooldown
1935                            match t.cooldown_turns {
1936                                Some(cd) if cd > 0 => {
1937                                    let resolved =
1938                                        sm.config().resolve_full_path(&sm.current(), &t.to);
1939                                    !sm.is_on_cooldown(&resolved, cd)
1940                                }
1941                                _ => true,
1942                            }
1943                        })
1944                        .collect();
1945                    if auto_transitions.is_empty() {
1946                        return Ok(false);
1947                    }
1948                    (auto_transitions, eval, sm.current())
1949                }
1950                _ => return Ok(false),
1951            };
1952
1953        let context_map = self.context_manager.get_all();
1954        let context = TransitionContext::new(user_message, response, &current_state)
1955            .with_context(context_map);
1956
1957        if let Some(index) = evaluator.select_transition(&transitions, &context).await? {
1958            let target = transitions[index].to.clone();
1959            let reason = if transitions[index].when.is_empty() {
1960                "guard condition met".to_string()
1961            } else {
1962                transitions[index].when.clone()
1963            };
1964
1965            if let Some(ref sm) = self.state_machine {
1966                // Check HITL approval for state transition
1967                let approved = self
1968                    .check_state_hitl(Some(&context.current_state), &target)
1969                    .await?;
1970                if !approved {
1971                    info!(to = %target, "State transition rejected by HITL");
1972                    return Ok(false);
1973                }
1974
1975                // Execute on_exit actions for the current state
1976                self.execute_state_exit_actions(&context.current_state)
1977                    .await;
1978
1979                sm.transition_to(&target, &reason)?;
1980                sm.reset_no_transition();
1981
1982                // Execute on_enter actions for the new state
1983                self.execute_state_enter_actions(&target).await;
1984
1985                self.hooks
1986                    .on_state_transition(Some(&context.current_state), &target, &reason)
1987                    .await;
1988                info!(from = %context.current_state, to = %target, "State transition");
1989            }
1990            return Ok(true);
1991        }
1992
1993        if let Some(ref sm) = self.state_machine {
1994            sm.increment_no_transition();
1995            if let Some(fallback) = sm.check_fallback() {
1996                let from_state = current_state.clone();
1997
1998                // Check HITL approval for fallback transition
1999                let approved = self.check_state_hitl(Some(&from_state), &fallback).await?;
2000                if !approved {
2001                    info!(to = %fallback, "Fallback transition rejected by HITL");
2002                    return Ok(false);
2003                }
2004
2005                // Execute on_exit actions for the current state
2006                self.execute_state_exit_actions(&from_state).await;
2007
2008                sm.transition_to(&fallback, "fallback after no transitions")?;
2009
2010                // Execute on_enter actions for the fallback state
2011                self.execute_state_enter_actions(&fallback).await;
2012
2013                self.hooks
2014                    .on_state_transition(
2015                        Some(&from_state),
2016                        &fallback,
2017                        "fallback after no transitions",
2018                    )
2019                    .await;
2020                info!(to = %fallback, "Fallback transition");
2021                return Ok(true);
2022            }
2023        }
2024
2025        Ok(false)
2026    }
2027
2028    /// Execute on_exit actions for a state being left.
2029    async fn execute_state_exit_actions(&self, state_path: &str) {
2030        if let Some(ref sm) = self.state_machine {
2031            if let Some(def) = sm.get_definition(state_path) {
2032                if !def.on_exit.is_empty() {
2033                    debug!(state = %state_path, count = def.on_exit.len(), "Executing on_exit actions");
2034                    self.execute_state_actions(&def.on_exit).await;
2035                }
2036            }
2037        }
2038    }
2039
2040    /// Execute on_enter (or on_reenter) actions for a state being entered.
2041    async fn execute_state_enter_actions(&self, state_path: &str) {
2042        if let Some(ref sm) = self.state_machine {
2043            if let Some(def) = sm.get_definition(state_path) {
2044                // Check if this state was previously visited
2045                let is_reentry = sm.history().iter().any(|e| e.to == state_path);
2046
2047                if is_reentry && !def.on_reenter.is_empty() {
2048                    debug!(state = %state_path, count = def.on_reenter.len(), "Executing on_reenter actions");
2049                    self.execute_state_actions(&def.on_reenter).await;
2050                } else if !def.on_enter.is_empty() {
2051                    debug!(state = %state_path, count = def.on_enter.len(), "Executing on_enter actions");
2052                    self.execute_state_actions(&def.on_enter).await;
2053                }
2054            }
2055        }
2056    }
2057
2058    /// Execute a list of state actions (tool calls, skill invocations, context updates, LLM prompts).
2059    async fn execute_state_actions(&self, actions: &[StateAction]) {
2060        for action in actions {
2061            match action {
2062                StateAction::Tool { tool, args } => {
2063                    let raw_args = args.clone().unwrap_or(Value::Object(Default::default()));
2064                    // Render template variables in tool args (e.g., {{ context.order_id }})
2065                    let args_value = self.render_action_args(&raw_args);
2066                    if let Some(t) = self.tools.get(tool) {
2067                        self.hooks.on_tool_start(tool, &args_value).await;
2068                        let start = Instant::now();
2069                        let result = t.execute(args_value).await;
2070                        let duration_ms = start.elapsed().as_millis() as u64;
2071                        self.hooks
2072                            .on_tool_complete(tool, &result, duration_ms)
2073                            .await;
2074                        if result.success {
2075                            debug!(tool = %tool, "State action: tool executed");
2076                            // Store tool result in context so YAML prompts can reference it
2077                            let context_key = format!("last_tool_result");
2078                            let _ = self
2079                                .context_manager
2080                                .set(&context_key, serde_json::Value::String(result.output));
2081                        } else {
2082                            warn!(tool = %tool, error = %result.output, "State action: tool failed");
2083                        }
2084                    } else {
2085                        warn!(tool = %tool, "State action: tool not found");
2086                    }
2087                }
2088                StateAction::Skill { skill } => {
2089                    if let Some(ref executor) = self.skill_executor {
2090                        if let Some(def) = self.skills.iter().find(|s| s.id == *skill) {
2091                            match executor.execute(def, "", serde_json::json!({})).await {
2092                                Ok(_) => debug!(skill = %skill, "State action: skill executed"),
2093                                Err(e) => {
2094                                    warn!(skill = %skill, error = %e, "State action: skill failed")
2095                                }
2096                            }
2097                        } else {
2098                            warn!(skill = %skill, "State action: skill not found");
2099                        }
2100                    }
2101                }
2102                StateAction::SetContext { set_context } => {
2103                    for (key, value) in set_context {
2104                        if let Err(e) = self.context_manager.set(key, value.clone()) {
2105                            warn!(key = %key, error = %e, "State action: set_context failed");
2106                        } else {
2107                            debug!(key = %key, "State action: context set");
2108                        }
2109                    }
2110                }
2111                StateAction::Prompt {
2112                    prompt,
2113                    llm,
2114                    store_as,
2115                } => {
2116                    let llm_result = if let Some(alias) = llm {
2117                        self.llm_registry.get(alias)
2118                    } else {
2119                        self.llm_registry.default()
2120                    };
2121                    match llm_result {
2122                        Ok(llm_provider) => {
2123                            // Render template variables and include conversation context
2124                            let context = self.context_manager.get_all();
2125                            let rendered_prompt = self
2126                                .template_renderer
2127                                .render(prompt, &context)
2128                                .unwrap_or_else(|_| prompt.clone());
2129                            let recent =
2130                                self.memory.get_messages(Some(5)).await.unwrap_or_default();
2131                            let mut messages: Vec<ChatMessage> = recent;
2132                            messages.push(ChatMessage::user(&rendered_prompt));
2133                            match llm_provider.complete(&messages, None).await {
2134                                Ok(response) => {
2135                                    if let Some(key) = store_as {
2136                                        let _ = self
2137                                            .context_manager
2138                                            .set(key, Value::String(response.content));
2139                                        debug!(key = %key, "State action: prompt result stored");
2140                                    }
2141                                }
2142                                Err(e) => {
2143                                    warn!(error = %e, "State action: prompt LLM call failed");
2144                                }
2145                            }
2146                        }
2147                        Err(e) => {
2148                            warn!(error = %e, "State action: LLM not found for prompt");
2149                        }
2150                    }
2151                }
2152            }
2153        }
2154    }
2155
2156    /// Run context extractors for the current state on the user's input.
2157    async fn run_context_extractors(&self, user_message: &str) {
2158        let extractors = match &self.state_machine {
2159            Some(sm) => match sm.current_definition() {
2160                Some(def) if !def.extract.is_empty() => def.extract.clone(),
2161                _ => return,
2162            },
2163            None => return,
2164        };
2165
2166        for extractor in &extractors {
2167            let prompt = if let Some(ref custom) = extractor.llm_extract {
2168                format!(
2169                    "User message:\n\"{}\"\n\nInstruction:\n{}",
2170                    user_message, custom
2171                )
2172            } else if let Some(ref desc) = extractor.description {
2173                format!(
2174                    "From the following message, extract: {}\n\n\
2175                     Message: \"{}\"\n\n\
2176                     If the information is present, return ONLY the extracted value.\n\
2177                     If NOT present, return exactly: __NONE__",
2178                    desc, user_message
2179                )
2180            } else {
2181                continue;
2182            };
2183
2184            let llm = match self
2185                .llm_registry
2186                .get(&extractor.llm)
2187                .or_else(|_| self.llm_registry.get("router"))
2188                .or_else(|_| self.llm_registry.get("default"))
2189            {
2190                Ok(llm) => llm,
2191                Err(e) => {
2192                    warn!(key = %extractor.key, error = %e, "Extractor LLM not found");
2193                    continue;
2194                }
2195            };
2196
2197            let messages = vec![ChatMessage::user(&prompt)];
2198            match llm.complete(&messages, None).await {
2199                Ok(response) => {
2200                    let value = response.content.trim().to_string();
2201                    if value != "__NONE__" && !value.is_empty() {
2202                        let _ = self
2203                            .context_manager
2204                            .update(&extractor.key, serde_json::Value::String(value.clone()));
2205                        debug!(key = %extractor.key, value = %value, "Context extracted");
2206                    } else if extractor.required {
2207                        warn!(key = %extractor.key, "Required extraction returned no value");
2208                    }
2209                }
2210                Err(e) => {
2211                    warn!(key = %extractor.key, error = %e, "Context extraction LLM call failed");
2212                }
2213            }
2214        }
2215    }
2216
2217    async fn check_memory_compression(&self) -> Result<()> {
2218        if self.memory.needs_compression() {
2219            let result = self.memory.compress(None).await?;
2220            if let CompressResult::Compressed {
2221                messages_summarized,
2222                new_summary_length,
2223                tokens_saved,
2224            } = result
2225            {
2226                let event = MemoryCompressEvent::new(
2227                    messages_summarized,
2228                    tokens_saved,
2229                    new_summary_length as u32,
2230                );
2231                self.hooks.on_memory_compress(&event).await;
2232                debug!(
2233                    messages = messages_summarized,
2234                    tokens_saved = tokens_saved,
2235                    "Memory compressed"
2236                );
2237            }
2238        }
2239
2240        // Handle overflow AFTER compression, then check warning threshold
2241        self.handle_memory_overflow().await?;
2242        self.check_memory_budget().await;
2243
2244        Ok(())
2245    }
2246
2247    async fn check_memory_budget(&self) {
2248        let Some(ref budget) = self.memory_token_budget else {
2249            return;
2250        };
2251
2252        let context = match self.memory.get_context().await {
2253            Ok(ctx) => ctx,
2254            Err(_) => return,
2255        };
2256
2257        // Overall budget warning
2258        let used_tokens = context.estimated_tokens();
2259        if budget.is_over_warn_threshold(used_tokens) {
2260            let event = MemoryBudgetEvent::new("memory", used_tokens, budget.total);
2261            self.hooks.on_memory_budget_warning(&event).await;
2262            debug!(
2263                used = used_tokens,
2264                total = budget.total,
2265                percent = event.usage_percent,
2266                "Memory budget warning"
2267            );
2268        }
2269
2270        // Per-component warning: summary
2271        if let Some(ref summary) = context.summary {
2272            let summary_tokens = ai_agents_memory::estimate_tokens(summary);
2273            let summary_budget = budget.allocation.summary;
2274            if summary_budget > 0 {
2275                let warn_threshold =
2276                    (summary_budget as f64 * budget.warn_at_percent as f64 / 100.0) as u32;
2277                if summary_tokens >= warn_threshold {
2278                    let event = MemoryBudgetEvent::new("summary", summary_tokens, summary_budget);
2279                    self.hooks.on_memory_budget_warning(&event).await;
2280                }
2281            }
2282        }
2283
2284        // Per-component warning: recent_messages
2285        let recent_tokens: u32 = context
2286            .messages
2287            .iter()
2288            .map(ai_agents_memory::estimate_message_tokens)
2289            .sum();
2290        let recent_budget = budget.allocation.recent_messages;
2291        if recent_budget > 0 {
2292            let warn_threshold =
2293                (recent_budget as f64 * budget.warn_at_percent as f64 / 100.0) as u32;
2294            if recent_tokens >= warn_threshold {
2295                let event = MemoryBudgetEvent::new("recent_messages", recent_tokens, recent_budget);
2296                self.hooks.on_memory_budget_warning(&event).await;
2297            }
2298        }
2299    }
2300
2301    async fn handle_memory_overflow(&self) -> Result<()> {
2302        let Some(ref budget) = self.memory_token_budget else {
2303            return Ok(());
2304        };
2305
2306        let context = self.memory.get_context().await?;
2307        let used_tokens = context.estimated_tokens();
2308
2309        if used_tokens <= budget.total {
2310            return Ok(());
2311        }
2312
2313        match budget.overflow_strategy {
2314            OverflowStrategy::TruncateOldest => {
2315                let tokens_to_free = used_tokens - budget.total;
2316                let messages_to_evict = self.calculate_eviction_count(tokens_to_free);
2317                if messages_to_evict > 0 {
2318                    self.evict_messages(messages_to_evict, EvictionReason::TokenBudgetExceeded)
2319                        .await?;
2320                }
2321            }
2322            OverflowStrategy::SummarizeMore => {
2323                self.memory.compress(None).await?;
2324            }
2325            OverflowStrategy::Error => {
2326                return Err(AgentError::MemoryBudgetExceeded {
2327                    used: used_tokens,
2328                    budget: budget.total,
2329                });
2330            }
2331        }
2332        Ok(())
2333    }
2334
2335    fn calculate_eviction_count(&self, tokens_to_free: u32) -> usize {
2336        // Estimate ~50 tokens per message on average
2337        ((tokens_to_free as f64 / 50.0).ceil() as usize).max(1)
2338    }
2339
2340    async fn evict_messages(&self, count: usize, reason: EvictionReason) -> Result<()> {
2341        let evicted = self.memory.evict_oldest(count).await?;
2342        if !evicted.is_empty() {
2343            let event = MemoryEvictEvent {
2344                reason,
2345                messages_evicted: evicted.len(),
2346                importance_scores: vec![],
2347            };
2348            self.hooks.on_memory_evict(&event).await;
2349            debug!(count = evicted.len(), "Messages evicted from memory");
2350        }
2351        Ok(())
2352    }
2353
2354    #[instrument(skip(self, input), fields(agent = %self.info.name))]
2355    async fn determine_reasoning_mode(&self, input: &str) -> Result<ReasoningMode> {
2356        let effective_config = self.get_effective_reasoning_config();
2357
2358        if !matches!(effective_config.mode, ReasoningMode::Auto) {
2359            return Ok(effective_config.mode.clone());
2360        }
2361
2362        let judge_llm = effective_config
2363            .judge_llm
2364            .as_ref()
2365            .and_then(|alias| self.llm_registry.get(alias).ok())
2366            .or_else(|| self.llm_registry.router().ok())
2367            .or_else(|| self.llm_registry.default().ok());
2368
2369        let Some(llm) = judge_llm else {
2370            return Ok(ReasoningMode::None);
2371        };
2372
2373        let prompt = format!(
2374            r#"Analyze this user request and determine the appropriate reasoning mode.
2375
2376User request: "{}"
2377
2378Choose ONE of these modes:
2379- none: Simple queries, greetings, direct answers (fastest)
2380- cot: Complex analysis, multi-step reasoning, math problems
2381- react: Tasks requiring multiple tool calls with observation
2382- plan_and_execute: Complex multi-step tasks requiring coordination
2383
2384Respond with ONLY the mode name (none, cot, react, or plan_and_execute)."#,
2385            input
2386        );
2387
2388        let messages = vec![ChatMessage::user(&prompt)];
2389        let response = llm.complete(&messages, None).await;
2390
2391        match response {
2392            Ok(resp) => {
2393                let mode_str = resp.content.trim().to_lowercase();
2394                Ok(match mode_str.as_str() {
2395                    "cot" => ReasoningMode::CoT,
2396                    "react" => ReasoningMode::React,
2397                    "plan_and_execute" => ReasoningMode::PlanAndExecute,
2398                    _ => ReasoningMode::None,
2399                })
2400            }
2401            Err(_) => Ok(ReasoningMode::None),
2402        }
2403    }
2404
2405    async fn should_reflect(&self, input: &str, response: &str) -> Result<bool> {
2406        let effective_config = self.get_effective_reflection_config();
2407
2408        if !effective_config.requires_evaluation() {
2409            return Ok(false);
2410        }
2411
2412        if effective_config.is_enabled() {
2413            return Ok(true);
2414        }
2415
2416        let evaluator_llm = effective_config
2417            .evaluator_llm
2418            .as_ref()
2419            .and_then(|alias| self.llm_registry.get(alias).ok())
2420            .or_else(|| self.llm_registry.router().ok())
2421            .or_else(|| self.llm_registry.default().ok());
2422
2423        let Some(llm) = evaluator_llm else {
2424            return Ok(false);
2425        };
2426
2427        let prompt = format!(
2428            r#"Should this response be evaluated for quality? Consider if it's a complex or important response.
2429
2430User query: "{}"
2431Response: "{}"
2432
2433Answer YES or NO only."#,
2434            input,
2435            &response[..response.len().min(500)]
2436        );
2437
2438        let messages = vec![ChatMessage::user(&prompt)];
2439        let result = llm.complete(&messages, None).await;
2440
2441        match result {
2442            Ok(resp) => Ok(resp.content.trim().to_uppercase().contains("YES")),
2443            Err(_) => Ok(false),
2444        }
2445    }
2446
2447    fn build_cot_system_prompt(&self, base_prompt: &str) -> String {
2448        format!(
2449            "{}\n\n<instruction>\nThink through this step by step before answering:\n1. Understand what is being asked\n2. Break down the problem\n3. Work through each part\n4. Provide your final answer\n\nShow your thinking process, then give your final answer.\n</instruction>",
2450            base_prompt
2451        )
2452    }
2453
2454    fn build_react_system_prompt(&self, base_prompt: &str) -> String {
2455        format!(
2456            "{}\n\n<instruction>\nUse the Reason-Act-Observe pattern:\n1. Thought: Think about what to do\n2. Action: Use a tool if needed\n3. Observation: Analyze the result\n4. Repeat until you have the answer\n\nFormat your response showing Thought/Action/Observation steps.\n</instruction>",
2457            base_prompt
2458        )
2459    }
2460
2461    async fn generate_plan(&self, input: &str) -> Result<Plan> {
2462        let effective = self.get_effective_reasoning_config();
2463        let planning_config = effective.get_planning();
2464
2465        let planner_llm = planning_config
2466            .and_then(|c| c.planner_llm.as_ref())
2467            .and_then(|alias| self.llm_registry.get(alias).ok())
2468            .or_else(|| self.llm_registry.router().ok())
2469            .or_else(|| self.llm_registry.default().ok())
2470            .ok_or_else(|| AgentError::Config("No LLM available for planning".into()))?;
2471
2472        let mut available_tool_ids: Vec<String> = self
2473            .get_available_tool_ids()
2474            .await
2475            .unwrap_or_else(|_| self.tools.list_ids());
2476        let mut available_skills: Vec<String> = self.skills.iter().map(|s| s.id.clone()).collect();
2477
2478        // Apply planning-level tool and skill filters.
2479        if let Some(config) = planning_config {
2480            if !config.available.tools.is_all() {
2481                available_tool_ids.retain(|t| config.available.tools.allows(t));
2482            }
2483            if !config.available.skills.is_all() {
2484                available_skills.retain(|s| config.available.skills.allows(s));
2485            }
2486        }
2487
2488        // Build tool descriptions with argument schemas so the planner
2489        // knows how to construct valid args for each step.
2490        let tool_descriptions: Vec<String> = available_tool_ids
2491            .iter()
2492            .filter_map(|id| {
2493                self.tools.get(id).map(|tool| {
2494                    let schema = tool.input_schema();
2495                    let args_desc = schema
2496                        .get("properties")
2497                        .and_then(|p| serde_json::to_string(p).ok())
2498                        .unwrap_or_else(|| "{}".to_string());
2499                    format!(
2500                        "- {} ({}): {}\n  Arguments: {}",
2501                        id,
2502                        tool.name(),
2503                        tool.description(),
2504                        args_desc
2505                    )
2506                })
2507            })
2508            .collect();
2509
2510        let tools_section = if tool_descriptions.is_empty() {
2511            "Available tools: none".to_string()
2512        } else {
2513            format!("Available tools:\n{}", tool_descriptions.join("\n"))
2514        };
2515
2516        let skills_section = if available_skills.is_empty() {
2517            "Available skills: none".to_string()
2518        } else {
2519            format!("Available skills: {}", available_skills.join(", "))
2520        };
2521
2522        let prompt = format!(
2523            r#"Create a step-by-step plan to accomplish this goal.
2524
2525Goal: "{}"
2526
2527{}
2528
2529{}
2530
2531Create a plan with clear steps. For each step, specify:
2532- description: What this step accomplishes
2533- action_type: "tool", "skill", "think", or "respond"
2534- action_target: The tool/skill id (if applicable)
2535- args: The arguments object matching the tool's schema (if action_type is "tool")
2536- dependencies: List of step IDs this depends on (empty if none)
2537
2538Respond in JSON format:
2539{{
2540  "steps": [
2541    {{"id": "step1", "description": "...", "action_type": "tool", "action_target": "tool_id", "args": {{"required_field": "value"}}, "dependencies": []}},
2542    {{"id": "step2", "description": "...", "action_type": "think", "action_target": "...", "dependencies": ["step1"]}}
2543  ]
2544}}"#,
2545            input, tools_section, skills_section,
2546        );
2547
2548        let messages = vec![ChatMessage::user(&prompt)];
2549        let response = planner_llm
2550            .complete(&messages, None)
2551            .await
2552            .map_err(|e| AgentError::LLM(format!("Planning failed: {}", e)))?;
2553
2554        let mut plan = Plan::new(input);
2555
2556        if let Some(json_start) = response.content.find('{') {
2557            if let Some(json_end) = response.content.rfind('}') {
2558                let json_str = &response.content[json_start..=json_end];
2559                if let Ok(parsed) = serde_json::from_str::<serde_json::Value>(json_str) {
2560                    if let Some(steps) = parsed.get("steps").and_then(|s| s.as_array()) {
2561                        for step_value in steps {
2562                            let id = step_value
2563                                .get("id")
2564                                .and_then(|v| v.as_str())
2565                                .unwrap_or("step");
2566                            let desc = step_value
2567                                .get("description")
2568                                .and_then(|v| v.as_str())
2569                                .unwrap_or("");
2570                            let action_type = step_value
2571                                .get("action_type")
2572                                .and_then(|v| v.as_str())
2573                                .unwrap_or("think");
2574                            let action_target = step_value
2575                                .get("action_target")
2576                                .and_then(|v| v.as_str())
2577                                .unwrap_or("");
2578                            let args = step_value
2579                                .get("args")
2580                                .cloned()
2581                                .unwrap_or(serde_json::json!({}));
2582                            let deps: Vec<String> = step_value
2583                                .get("dependencies")
2584                                .and_then(|v| v.as_array())
2585                                .map(|arr| {
2586                                    arr.iter()
2587                                        .filter_map(|v| v.as_str().map(String::from))
2588                                        .collect()
2589                                })
2590                                .unwrap_or_default();
2591
2592                            let action = match action_type {
2593                                "tool" => PlanAction::tool(action_target, args),
2594                                "skill" => PlanAction::skill(action_target),
2595                                "respond" => PlanAction::respond(action_target),
2596                                _ => PlanAction::think(desc),
2597                            };
2598
2599                            let step = PlanStep::new(desc, action)
2600                                .with_id(id)
2601                                .with_dependencies(deps);
2602                            plan.add_step(step);
2603                        }
2604                    }
2605                }
2606            }
2607        }
2608
2609        if plan.steps.is_empty() {
2610            plan.add_step(PlanStep::new(
2611                "Process the request",
2612                PlanAction::think(input),
2613            ));
2614            plan.add_step(PlanStep::new(
2615                "Provide response",
2616                PlanAction::respond("Answer based on analysis"),
2617            ));
2618        }
2619
2620        Ok(plan)
2621    }
2622
2623    async fn execute_plan(&self, plan: &mut Plan) -> Result<String> {
2624        let llm = self.get_state_llm()?;
2625        let mut results: HashMap<String, serde_json::Value> = HashMap::new();
2626        let effective = self.get_effective_reasoning_config();
2627        let max_steps = effective.get_planning().map(|c| c.max_steps).unwrap_or(10);
2628
2629        plan.status = PlanStatus::InProgress;
2630
2631        for step_idx in 0..plan.steps.len().min(max_steps as usize) {
2632            let step = &plan.steps[step_idx];
2633
2634            let deps_satisfied = step.dependencies.iter().all(|dep| {
2635                plan.steps
2636                    .iter()
2637                    .find(|s| &s.id == dep)
2638                    .map(|s| s.status.is_completed())
2639                    .unwrap_or(false)
2640            });
2641
2642            if !deps_satisfied {
2643                continue;
2644            }
2645
2646            plan.steps[step_idx].mark_running();
2647
2648            let result = match &plan.steps[step_idx].action {
2649                PlanAction::Tool { tool, args } => {
2650                    // When a tool step has dependency results, ask the LLM to
2651                    // produce the correct arguments given the context and tool schema.
2652                    // This avoids brittle {{stepN}} template substitution and lets
2653                    // the LLM handle type adaptation (e.g. picking the iso field
2654                    // from a datetime result for a downstream format call).
2655                    let has_dep_results = plan.steps[step_idx]
2656                        .dependencies
2657                        .iter()
2658                        .any(|dep| results.contains_key(dep));
2659
2660                    let final_args = if has_dep_results {
2661                        let dep_context: String = plan.steps[step_idx]
2662                            .dependencies
2663                            .iter()
2664                            .filter_map(|dep| results.get(dep).map(|r| format!("{}: {}", dep, r)))
2665                            .collect::<Vec<_>>()
2666                            .join("\n");
2667
2668                        let tool_schema = self
2669                            .tools
2670                            .get(tool)
2671                            .map(|t| {
2672                                let schema = t.input_schema();
2673                                let props = schema
2674                                    .get("properties")
2675                                    .and_then(|p| serde_json::to_string(p).ok())
2676                                    .unwrap_or_else(|| "{}".to_string());
2677                                format!(
2678                                    "{}: {}\nArguments schema: {}",
2679                                    t.id(),
2680                                    t.description(),
2681                                    props
2682                                )
2683                            })
2684                            .unwrap_or_default();
2685
2686                        let step_desc = &plan.steps[step_idx].description;
2687                        let arg_prompt = format!(
2688                            "Generate the JSON arguments for a tool call.\n\n\
2689                             Tool: {}\n\n\
2690                             Task: {}\n\n\
2691                             Previous step results:\n{}\n\n\
2692                             Planner's draft arguments: {}\n\n\
2693                             Produce ONLY a valid JSON object with the correct argument values.\n\
2694                             Use actual values from the previous step results, not template references.",
2695                            tool_schema,
2696                            step_desc,
2697                            dep_context,
2698                            serde_json::to_string(args).unwrap_or_default()
2699                        );
2700                        let messages = vec![ChatMessage::user(&arg_prompt)];
2701                        match llm.complete(&messages, None).await {
2702                            Ok(resp) => {
2703                                let content = resp.content.trim();
2704                                // Parse the LLM's JSON response, fall back to planner args.
2705                                let json_start = content.find('{');
2706                                let json_end = content.rfind('}');
2707                                if let (Some(start), Some(end)) = (json_start, json_end) {
2708                                    serde_json::from_str(&content[start..=end])
2709                                        .unwrap_or_else(|_| args.clone())
2710                                } else {
2711                                    args.clone()
2712                                }
2713                            }
2714                            Err(_) => args.clone(),
2715                        }
2716                    } else {
2717                        args.clone()
2718                    };
2719
2720                    let tool_call = ToolCall {
2721                        id: uuid::Uuid::new_v4().to_string(),
2722                        name: tool.clone(),
2723                        arguments: final_args,
2724                    };
2725                    match self.execute_tool_smart(&tool_call).await {
2726                        Ok(output) => serde_json::json!({ "output": output }),
2727                        Err(e) => {
2728                            plan.steps[step_idx].mark_failed(e.to_string());
2729                            continue;
2730                        }
2731                    }
2732                }
2733                PlanAction::Skill { skill } => {
2734                    if let Some(skill_def) = self.skills.iter().find(|s| &s.id == skill) {
2735                        if let Some(ref executor) = self.skill_executor {
2736                            match executor.execute(skill_def, "", serde_json::json!({})).await {
2737                                Ok(output) => serde_json::json!({ "output": output }),
2738                                Err(e) => {
2739                                    plan.steps[step_idx].mark_failed(e.to_string());
2740                                    continue;
2741                                }
2742                            }
2743                        } else {
2744                            serde_json::json!({ "output": "Skill executor not available" })
2745                        }
2746                    } else {
2747                        plan.steps[step_idx].mark_failed("Skill not found");
2748                        continue;
2749                    }
2750                }
2751                PlanAction::Think { prompt } => {
2752                    let context: String = results
2753                        .iter()
2754                        .map(|(k, v)| format!("{}: {}", k, v))
2755                        .collect::<Vec<_>>()
2756                        .join("\n");
2757
2758                    let think_prompt = format!("Context:\n{}\n\nTask: {}", context, prompt);
2759                    let messages = vec![ChatMessage::user(&think_prompt)];
2760
2761                    match llm.complete(&messages, None).await {
2762                        Ok(resp) => serde_json::json!({ "output": resp.content }),
2763                        Err(e) => {
2764                            plan.steps[step_idx].mark_failed(e.to_string());
2765                            continue;
2766                        }
2767                    }
2768                }
2769                PlanAction::Respond { template } => {
2770                    let context: String = results
2771                        .iter()
2772                        .map(|(k, v)| format!("{}: {}", k, v))
2773                        .collect::<Vec<_>>()
2774                        .join("\n");
2775
2776                    let respond_prompt = format!(
2777                        "Based on this context:\n{}\n\nGenerate a response following this template/instruction: {}",
2778                        context, template
2779                    );
2780                    let messages = vec![ChatMessage::user(&respond_prompt)];
2781
2782                    match llm.complete(&messages, None).await {
2783                        Ok(resp) => serde_json::json!({ "output": resp.content }),
2784                        Err(e) => {
2785                            plan.steps[step_idx].mark_failed(e.to_string());
2786                            continue;
2787                        }
2788                    }
2789                }
2790            };
2791
2792            results.insert(plan.steps[step_idx].id.clone(), result.clone());
2793            plan.steps[step_idx].mark_completed(Some(result));
2794        }
2795
2796        // Set plan status based on whether any steps actually failed.
2797        let has_failures = plan.steps.iter().any(|s| s.status.is_failed());
2798        if has_failures {
2799            let failed_ids: Vec<String> = plan
2800                .steps
2801                .iter()
2802                .filter(|s| s.status.is_failed())
2803                .map(|s| s.id.clone())
2804                .collect();
2805            plan.status = PlanStatus::Failed {
2806                error: format!("Steps failed: {}", failed_ids.join(", ")),
2807            };
2808        } else {
2809            plan.status = PlanStatus::Completed;
2810        }
2811
2812        // Synthesize final output from all completed step results.
2813        let all_outputs: Vec<String> = plan
2814            .steps
2815            .iter()
2816            .filter(|s| s.status.is_completed())
2817            .filter_map(|s| {
2818                s.result
2819                    .as_ref()
2820                    .and_then(|r| r.get("output"))
2821                    .and_then(|o| o.as_str())
2822                    .map(|o| format!("{}: {}", s.description, o))
2823            })
2824            .collect();
2825
2826        if all_outputs.is_empty() {
2827            return Ok("Plan execution completed but produced no results.".to_string());
2828        }
2829
2830        if all_outputs.len() == 1 {
2831            return Ok(all_outputs.into_iter().next().unwrap());
2832        }
2833
2834        // Synthesize a coherent summary from multiple step results via LLM.
2835        let context = all_outputs.join("\n\n");
2836        let prompt = format!(
2837            "You completed a multi-step plan for: \"{}\"\n\nStep results:\n{}\n\nProvide a coherent final response that synthesizes these results.",
2838            plan.goal, context
2839        );
2840        let messages = vec![ChatMessage::user(&prompt)];
2841        match llm.complete(&messages, None).await {
2842            Ok(resp) => Ok(resp.content.trim().to_string()),
2843            Err(_) => Ok(context),
2844        }
2845    }
2846
2847    async fn evaluate_response(&self, input: &str, response: &str) -> Result<EvaluationResult> {
2848        let effective_config = self.get_effective_reflection_config();
2849        self.evaluate_response_with_config(input, response, &effective_config)
2850            .await
2851    }
2852
2853    fn extract_thinking(&self, content: &str) -> (Option<String>, String) {
2854        if let Some(start) = content.find("<thinking>") {
2855            if let Some(end) = content.find("</thinking>") {
2856                let thinking = content[start + 10..end].trim().to_string();
2857                let answer = content[end + 11..].trim().to_string();
2858                return (Some(thinking), answer);
2859            }
2860        }
2861        (None, content.to_string())
2862    }
2863
2864    fn format_response_with_thinking(&self, thinking: Option<&str>, answer: &str) -> String {
2865        match self.get_effective_reasoning_config().output {
2866            ReasoningOutput::Hidden => answer.to_string(),
2867            ReasoningOutput::Visible => {
2868                if let Some(t) = thinking {
2869                    format!("Thinking:\n{}\n\nAnswer:\n{}", t, answer)
2870                } else {
2871                    answer.to_string()
2872                }
2873            }
2874            ReasoningOutput::Tagged => {
2875                if let Some(t) = thinking {
2876                    format!("<thinking>{}</thinking>\n{}", t, answer)
2877                } else {
2878                    answer.to_string()
2879                }
2880            }
2881        }
2882    }
2883
2884    async fn run_loop(&self, input: &str) -> Result<AgentResponse> {
2885        info!(input_len = input.len(), "Starting chat");
2886
2887        self.hooks.on_message_received(input).await;
2888
2889        // One-shot context initialization: load runtime defaults, resolve env vars,
2890        // populate builtin sources (session, agent), etc.  This must happen before
2891        // the first template render so that {{ context.* }} variables are available.
2892        if !self.context_initialized.swap(true, Ordering::SeqCst) {
2893            self.context_manager.initialize().await?;
2894            debug!("Context manager initialized (defaults, env, builtins)");
2895        }
2896
2897        self.check_turn_timeout().await?;
2898        self.context_manager.refresh_per_turn().await?;
2899
2900        // Clear stale disambiguation context from previous turns.
2901        // This prevents resolved_intent from leaking across turns and causing incorrect deterministic routing on subsequent inputs.
2902        self.clear_disambiguation_context();
2903
2904        // Disambiguation check (before input processing)
2905        if let Some(ref disambiguator) = self.disambiguation_manager {
2906            let disambiguation_context = self.build_disambiguation_context().await?;
2907
2908            // Get state-level disambiguation override
2909            let state_override = self
2910                .state_machine
2911                .as_ref()
2912                .and_then(|sm| sm.current_definition())
2913                .and_then(|def| def.disambiguation.clone());
2914
2915            match disambiguator
2916                .process_input_with_override(
2917                    input,
2918                    &disambiguation_context,
2919                    state_override.as_ref(),
2920                    None,
2921                )
2922                .await?
2923            {
2924                DisambiguationResult::Clear => {
2925                    debug!("Input is clear, proceeding normally");
2926                }
2927                DisambiguationResult::NeedsClarification {
2928                    question,
2929                    detection,
2930                } => {
2931                    info!(
2932                        ambiguity_type = ?detection.ambiguity_type,
2933                        confidence = detection.confidence,
2934                        "Input requires clarification"
2935                    );
2936
2937                    self.memory.add_message(ChatMessage::user(input)).await?;
2938                    self.memory
2939                        .add_message(ChatMessage::assistant(&question.question))
2940                        .await?;
2941
2942                    return Ok(AgentResponse::new(&question.question).with_metadata(
2943                        "disambiguation",
2944                        serde_json::json!({
2945                            "status": "awaiting_clarification",
2946                            "options": question.options,
2947                            "clarifying": question.clarifying,
2948                            "detection": {
2949                                "type": detection.ambiguity_type,
2950                                "confidence": detection.confidence,
2951                                "what_is_unclear": detection.what_is_unclear,
2952                            }
2953                        }),
2954                    ));
2955                }
2956                DisambiguationResult::Clarified {
2957                    enriched_input,
2958                    resolved,
2959                    ..
2960                } => {
2961                    info!(
2962                        resolved_count = resolved.len(),
2963                        enriched = %enriched_input,
2964                        "Input clarified, injecting resolved intent into context"
2965                    );
2966
2967                    // Routing uses `resolved` (structured, deterministic)
2968                    // This is what makes post-disambiguation routing DETERMINISTIC
2969                    for (key, value) in &resolved {
2970                        let context_key = format!("disambiguation.{}", key);
2971                        let _ = self.context_manager.set(&context_key, value.clone());
2972                    }
2973
2974                    if let Some(intent) = resolved.get("intent") {
2975                        let _ = self.context_manager.set("resolved_intent", intent.clone());
2976                    }
2977
2978                    let _ = self
2979                        .context_manager
2980                        .set("disambiguation.resolved", serde_json::Value::Bool(true));
2981
2982                    // Check if this clarification was triggered by a skill-level override.
2983                    // If so, route directly to the matched skill instead of going through
2984                    // skill routing again (which might match a different skill).
2985                    let skill_id = self.pending_skill_id.read().clone();
2986                    if let Some(skill_id) = skill_id {
2987                        info!(skill_id = %skill_id, "Re-checking skill disambiguation on clarified input");
2988                        return self
2989                            .recheck_skill_disambiguation(&skill_id, &enriched_input)
2990                            .await;
2991                    }
2992
2993                    return self.run_loop_internal(&enriched_input).await;
2994                }
2995                DisambiguationResult::ProceedWithBestGuess { enriched_input } => {
2996                    info!("Proceeding with best guess interpretation");
2997
2998                    // Same skill-id re-check for best-guess path
2999                    let skill_id = self.pending_skill_id.read().clone();
3000                    if let Some(skill_id) = skill_id {
3001                        info!(skill_id = %skill_id, "Re-checking skill disambiguation on best-guess input");
3002                        return self
3003                            .recheck_skill_disambiguation(&skill_id, &enriched_input)
3004                            .await;
3005                    }
3006
3007                    return self.run_loop_internal(&enriched_input).await;
3008                }
3009                DisambiguationResult::GiveUp { reason } => {
3010                    *self.pending_skill_id.write() = None;
3011                    warn!(reason = %reason, "Disambiguation gave up");
3012                    let apology = self
3013                        .generate_localized_apology(
3014                            "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
3015                            &reason,
3016                        )
3017                        .await
3018                        .unwrap_or_else(|_| {
3019                            format!("I'm sorry, I couldn't understand your request: {}", reason)
3020                        });
3021                    return Ok(AgentResponse::new(&apology));
3022                }
3023                DisambiguationResult::Escalate { reason } => {
3024                    *self.pending_skill_id.write() = None;
3025                    info!(reason = %reason, "Escalating to human");
3026                    if let Some(ref hitl) = self.hitl_engine {
3027                        let trigger =
3028                            ApprovalTrigger::condition("disambiguation_escalation", reason.clone());
3029                        let mut context_map = HashMap::new();
3030                        context_map.insert("original_input".to_string(), serde_json::json!(input));
3031                        context_map.insert("reason".to_string(), serde_json::json!(&reason));
3032                        let check_result = HITLCheckResult::required(
3033                            trigger,
3034                            context_map,
3035                            format!("User request needs human assistance: {}", reason),
3036                            Some(hitl.config().default_timeout_seconds),
3037                        );
3038                        let result = self.request_hitl_approval(check_result).await?;
3039                        if matches!(
3040                            result,
3041                            ApprovalResult::Approved | ApprovalResult::Modified { .. }
3042                        ) {
3043                            return self.run_loop_internal(input).await;
3044                        }
3045                    }
3046                    let apology = self
3047                        .generate_localized_apology(
3048                            "Explain briefly that you're transferring the user to a human agent for help.",
3049                            &reason,
3050                        )
3051                        .await
3052                        .unwrap_or_else(|_| {
3053                            format!("I need human assistance to help with your request: {}", reason)
3054                        });
3055                    return Ok(AgentResponse::new(&apology));
3056                }
3057                DisambiguationResult::Abandoned { new_input } => {
3058                    *self.pending_skill_id.write() = None;
3059
3060                    info!(
3061                        has_new_input = new_input.is_some(),
3062                        "Clarification abandoned by user"
3063                    );
3064
3065                    self.memory.add_message(ChatMessage::user(input)).await?;
3066
3067                    match new_input {
3068                        Some(fresh_input) => {
3069                            // Topic switch: process the user's new input from scratch.
3070                            // The LLM sees full conversation context including the abandoned exchange.
3071                            return self.run_loop_internal(&fresh_input).await;
3072                        }
3073                        None => {
3074                            // Pure abandonment: generate a brief acknowledgment.
3075                            let ack = self
3076                                .generate_localized_apology(
3077                                    "The user changed their mind about their previous request. \
3078                                     Generate a brief, friendly acknowledgment (e.g. 'OK, no problem. What else can I help with?'). \
3079                                     Do NOT apologize excessively. Be concise.",
3080                                    "User abandoned clarification",
3081                                )
3082                                .await
3083                                .unwrap_or_else(|_| {
3084                                    "OK, no problem. What else can I help with?".to_string()
3085                                });
3086
3087                            self.memory
3088                                .add_message(ChatMessage::assistant(&ack))
3089                                .await?;
3090
3091                            return Ok(AgentResponse::new(&ack));
3092                        }
3093                    }
3094                }
3095            }
3096        }
3097
3098        self.run_loop_internal(input).await
3099    }
3100
3101    /// Generate a localized response using the router LLM
3102    async fn generate_localized_apology(&self, instruction: &str, reason: &str) -> Result<String> {
3103        let llm = self.llm_registry.router().map_err(|e| {
3104            AgentError::LLM(format!(
3105                "Router LLM not available for localized response: {}",
3106                e
3107            ))
3108        })?;
3109
3110        let recent: Vec<String> = self
3111            .memory
3112            .get_messages(Some(3))
3113            .await?
3114            .iter()
3115            .map(|m| m.content.clone())
3116            .collect();
3117
3118        let context_hint = if recent.is_empty() {
3119            String::new()
3120        } else {
3121            format!(
3122                "\nRecent conversation (detect the user's language from this):\n{}\n",
3123                recent.join("\n")
3124            )
3125        };
3126
3127        let prompt = format!(
3128            "{}\nReason: {}\n{}Respond in the same language as the user. Output ONLY the message, nothing else.",
3129            instruction, reason, context_hint
3130        );
3131
3132        let messages = vec![ChatMessage::user(&prompt)];
3133        let response = llm
3134            .complete(&messages, None)
3135            .await
3136            .map_err(|e| AgentError::LLM(format!("Localized response generation failed: {}", e)))?;
3137
3138        Ok(response.content.trim().to_string())
3139    }
3140
3141    /// Clear disambiguation-related keys from the context manager.
3142    ///
3143    /// Render template variables in state action args using the context manager.
3144    fn render_action_args(&self, args: &Value) -> Value {
3145        let context = self.context_manager.get_all();
3146        match args {
3147            Value::Object(map) => {
3148                let mut rendered = serde_json::Map::new();
3149                for (k, v) in map {
3150                    match v {
3151                        Value::String(s) if s.contains("{{") => {
3152                            match self.template_renderer.render(s, &context) {
3153                                Ok(rendered_str) => {
3154                                    rendered.insert(k.clone(), Value::String(rendered_str));
3155                                }
3156                                Err(_) => {
3157                                    rendered.insert(k.clone(), v.clone());
3158                                }
3159                            }
3160                        }
3161                        _ => {
3162                            rendered.insert(k.clone(), v.clone());
3163                        }
3164                    }
3165                }
3166                Value::Object(rendered)
3167            }
3168            _ => args.clone(),
3169        }
3170    }
3171
3172    /// Called at the start of each turn to prevent stale `resolved_intent` from leaking across turns.
3173    fn clear_disambiguation_context(&self) {
3174        let _ = self
3175            .context_manager
3176            .set("resolved_intent", serde_json::Value::Null);
3177
3178        let all = self.context_manager.get_all();
3179        for key in all.keys() {
3180            if key.starts_with("disambiguation.") {
3181                let _ = self.context_manager.set(key, serde_json::Value::Null);
3182            }
3183        }
3184    }
3185
3186    /// Re-run skill disambiguation on enriched input before executing the skill.
3187    /// After clarification resolves, the enriched input may still be missing required_clarity fields (e.g. "Transfer money to Jane." still lacks amount).
3188    /// This method re-runs the skill's disambiguation pass.
3189    /// If fields are still missing, it returns the new clarification question and keeps pending_skill_id set.
3190    /// If all fields are present (Clear), it executes the skill and returns the response.
3191    async fn recheck_skill_disambiguation(
3192        &self,
3193        skill_id: &str,
3194        enriched_input: &str,
3195    ) -> Result<AgentResponse> {
3196        let skill = self
3197            .skill_router
3198            .as_ref()
3199            .and_then(|r| r.get_skill(skill_id).cloned());
3200
3201        // If the skill has disambiguation enabled, re-run it on the enriched input.
3202        if let Some(ref skill) = skill {
3203            if let Some(ref skill_disambig) = skill.disambiguation {
3204                if skill_disambig.enabled.unwrap_or(false) {
3205                    if let Some(ref disambiguator) = self.disambiguation_manager {
3206                        let context = self.build_disambiguation_context().await?;
3207                        let state_override = self
3208                            .state_machine
3209                            .as_ref()
3210                            .and_then(|sm| sm.current_definition())
3211                            .and_then(|def| def.disambiguation.clone());
3212
3213                        match disambiguator
3214                            .process_input_with_override(
3215                                enriched_input,
3216                                &context,
3217                                state_override.as_ref(),
3218                                Some(skill_disambig),
3219                            )
3220                            .await?
3221                        {
3222                            DisambiguationResult::Clear => {
3223                                debug!(skill_id = %skill_id, "Skill re-check: all fields present");
3224                            }
3225                            DisambiguationResult::NeedsClarification {
3226                                question,
3227                                detection,
3228                            } => {
3229                                info!(
3230                                    skill_id = %skill_id,
3231                                    ambiguity_type = ?detection.ambiguity_type,
3232                                    what_is_unclear = ?detection.what_is_unclear,
3233                                    "Skill re-check: still missing fields, asking again"
3234                                );
3235                                // Keep pending_skill_id set (do NOT clear it).
3236                                // The next turn will resolve this new clarification and
3237                                // re-enter this method until all fields are present.
3238                                self.memory
3239                                    .add_message(ChatMessage::user(enriched_input))
3240                                    .await?;
3241                                self.memory
3242                                    .add_message(ChatMessage::assistant(&question.question))
3243                                    .await?;
3244
3245                                return Ok(AgentResponse::new(&question.question).with_metadata(
3246                                    "disambiguation",
3247                                    serde_json::json!({
3248                                        "status": "awaiting_clarification",
3249                                        "skill_id": skill_id,
3250                                        "options": question.options,
3251                                        "clarifying": question.clarifying,
3252                                        "detection": {
3253                                            "type": detection.ambiguity_type,
3254                                            "confidence": detection.confidence,
3255                                            "what_is_unclear": detection.what_is_unclear,
3256                                        }
3257                                    }),
3258                                ));
3259                            }
3260                            DisambiguationResult::Clarified {
3261                                enriched_input: re_enriched,
3262                                ..
3263                            } => {
3264                                debug!(skill_id = %skill_id, "Skill re-check: clarified immediately, executing");
3265                                // Fall through to execute with the further-enriched input.
3266                                *self.pending_skill_id.write() = None;
3267                                let skill_response =
3268                                    self.execute_skill_by_id(skill_id, &re_enriched).await?;
3269                                self.memory
3270                                    .add_message(ChatMessage::user(&re_enriched))
3271                                    .await?;
3272                                return self
3273                                    .handle_skill_response(
3274                                        &re_enriched,
3275                                        skill_response,
3276                                        &HashMap::new(),
3277                                    )
3278                                    .await;
3279                            }
3280                            DisambiguationResult::ProceedWithBestGuess {
3281                                enriched_input: re_enriched,
3282                            } => {
3283                                debug!(skill_id = %skill_id, "Skill re-check: proceeding with best guess");
3284                                *self.pending_skill_id.write() = None;
3285                                let skill_response =
3286                                    self.execute_skill_by_id(skill_id, &re_enriched).await?;
3287                                self.memory
3288                                    .add_message(ChatMessage::user(&re_enriched))
3289                                    .await?;
3290                                return self
3291                                    .handle_skill_response(
3292                                        &re_enriched,
3293                                        skill_response,
3294                                        &HashMap::new(),
3295                                    )
3296                                    .await;
3297                            }
3298                            DisambiguationResult::GiveUp { reason } => {
3299                                *self.pending_skill_id.write() = None;
3300                                let apology = self
3301                                    .generate_localized_apology(
3302                                        "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
3303                                        &reason,
3304                                    )
3305                                    .await
3306                                    .unwrap_or_else(|_| {
3307                                        format!("I'm sorry, I couldn't understand your request: {}", reason)
3308                                    });
3309                                return Ok(AgentResponse::new(&apology));
3310                            }
3311                            DisambiguationResult::Escalate { reason } => {
3312                                *self.pending_skill_id.write() = None;
3313                                let apology = self
3314                                    .generate_localized_apology(
3315                                        "Explain briefly that you're transferring the user to a human agent for help.",
3316                                        &reason,
3317                                    )
3318                                    .await
3319                                    .unwrap_or_else(|_| {
3320                                        format!("I need human assistance to help with your request: {}", reason)
3321                                    });
3322                                return Ok(AgentResponse::new(&apology));
3323                            }
3324                            DisambiguationResult::Abandoned { new_input } => {
3325                                // User abandoned during skill re-check.
3326                                // Clear skill routing state and fall through to normal execution.
3327                                *self.pending_skill_id.write() = None;
3328                                debug!(skill_id = %skill_id, "Skill re-check: abandoned by user");
3329                                if let Some(fresh) = new_input {
3330                                    return self.run_loop_internal(&fresh).await;
3331                                }
3332                                let ack = self
3333                                    .generate_localized_apology(
3334                                        "The user changed their mind about their previous request. \
3335                                         Generate a brief, friendly acknowledgment (e.g. 'OK, no problem. What else can I help with?'). \
3336                                         Do NOT apologize excessively. Be concise.",
3337                                        "User abandoned clarification",
3338                                    )
3339                                    .await
3340                                    .unwrap_or_else(|_| {
3341                                        "OK, no problem. What else can I help with?".to_string()
3342                                    });
3343                                self.memory
3344                                    .add_message(ChatMessage::assistant(&ack))
3345                                    .await?;
3346                                return Ok(AgentResponse::new(&ack));
3347                            }
3348                        }
3349                    }
3350                }
3351            }
3352        }
3353
3354        // Skill has no disambiguation or all fields present: execute.
3355        *self.pending_skill_id.write() = None;
3356        let skill_response = self.execute_skill_by_id(skill_id, enriched_input).await?;
3357        self.memory
3358            .add_message(ChatMessage::user(enriched_input))
3359            .await?;
3360        self.handle_skill_response(enriched_input, skill_response, &HashMap::new())
3361            .await
3362    }
3363
3364    /// Handle skill routing result: output processing, memory, transitions.
3365    /// Returns a fully formed AgentResponse for skill-routed requests.
3366    async fn handle_skill_response(
3367        &self,
3368        processed_input: &str,
3369        skill_response: String,
3370        input_context: &HashMap<String, Value>,
3371    ) -> Result<AgentResponse> {
3372        let output_data = self.process_output(&skill_response, input_context).await?;
3373        let final_response = output_data.content;
3374
3375        self.memory
3376            .add_message(ChatMessage::assistant(&final_response))
3377            .await?;
3378
3379        self.check_memory_compression().await?;
3380
3381        self.increment_turn();
3382        self.evaluate_transitions(processed_input, &final_response)
3383            .await?;
3384
3385        let response = AgentResponse::new(final_response);
3386        self.hooks.on_response(&response).await;
3387        Ok(response)
3388    }
3389
3390    /// Run the Plan-and-Execute flow: generate plan, execute steps, finalize.
3391    /// Supports plan-level reflection with replan loop when configured.
3392    async fn handle_plan_and_execute(
3393        &self,
3394        processed_input: &str,
3395        input_context: &HashMap<String, Value>,
3396        auto_detected: bool,
3397    ) -> Result<AgentResponse> {
3398        let effective = self.get_effective_reasoning_config();
3399        let plan_reflection = effective
3400            .get_planning()
3401            .map(|c| c.reflection.clone())
3402            .unwrap_or_default();
3403
3404        let max_attempts = if plan_reflection.enabled {
3405            1 + plan_reflection.max_replans
3406        } else {
3407            1
3408        };
3409
3410        let mut plan = self.generate_plan(processed_input).await?;
3411        info!(
3412            plan_id = %plan.id,
3413            steps = plan.steps.len(),
3414            "Plan generated"
3415        );
3416
3417        let mut plan_result = String::new();
3418
3419        for attempt in 0..max_attempts {
3420            *self.current_plan.write() = Some(plan.clone());
3421            plan_result = self.execute_plan(&mut plan).await?;
3422
3423            info!(
3424                plan_status = ?plan.status,
3425                completed_steps = plan.completed_steps().count(),
3426                attempt = attempt + 1,
3427                "Plan execution completed"
3428            );
3429
3430            if !plan_reflection.enabled {
3431                break;
3432            }
3433
3434            let has_failures = plan.steps.iter().any(|s| s.status.is_failed());
3435            if !has_failures {
3436                break;
3437            }
3438
3439            if attempt + 1 >= max_attempts {
3440                break;
3441            }
3442
3443            match plan_reflection.on_step_failure {
3444                StepFailureAction::Replan => {
3445                    info!(attempt = attempt + 1, "Plan had failures, replanning");
3446                    plan = self.generate_plan(processed_input).await?;
3447                }
3448                StepFailureAction::Abort => {
3449                    warn!("Plan step failed, aborting");
3450                    break;
3451                }
3452                StepFailureAction::Skip | StepFailureAction::Continue => {
3453                    break;
3454                }
3455            }
3456        }
3457
3458        *self.current_plan.write() = Some(plan);
3459
3460        let output_data = self.process_output(&plan_result, input_context).await?;
3461        let final_content = output_data.content;
3462
3463        self.memory
3464            .add_message(ChatMessage::assistant(&final_content))
3465            .await?;
3466
3467        self.check_memory_compression().await?;
3468        self.increment_turn();
3469        self.evaluate_transitions(processed_input, &final_content)
3470            .await?;
3471
3472        let reasoning_metadata =
3473            ReasoningMetadata::new(ReasoningMode::PlanAndExecute).with_auto_detected(auto_detected);
3474
3475        let response = AgentResponse::new(&final_content).with_metadata(
3476            "reasoning",
3477            serde_json::to_value(&reasoning_metadata).unwrap_or_default(),
3478        );
3479
3480        self.hooks.on_response(&response).await;
3481        Ok(response)
3482    }
3483
3484    /// Inject CoT/ReAct reasoning prompt into the system message (first iteration only).
3485    fn inject_reasoning_prompt(
3486        &self,
3487        messages: &mut [ChatMessage],
3488        reasoning_mode: &ReasoningMode,
3489        is_first_iteration: bool,
3490    ) {
3491        if !is_first_iteration {
3492            return;
3493        }
3494        match reasoning_mode {
3495            ReasoningMode::CoT => {
3496                if let Some(msg) = messages.first_mut() {
3497                    if matches!(msg.role, ai_agents_core::Role::System) {
3498                        msg.content = self.build_cot_system_prompt(&msg.content);
3499                        debug!("Applied Chain-of-Thought system prompt");
3500                    }
3501                }
3502            }
3503            ReasoningMode::React => {
3504                if let Some(msg) = messages.first_mut() {
3505                    if matches!(msg.role, ai_agents_core::Role::System) {
3506                        msg.content = self.build_react_system_prompt(&msg.content);
3507                        debug!("Applied ReAct system prompt");
3508                    }
3509                }
3510            }
3511            _ => {}
3512        }
3513    }
3514
3515    /// Handle tool calls: check transitions, execute tools in parallel, handle HITL rejection.
3516    async fn handle_tool_calls(
3517        &self,
3518        processed_input: &str,
3519        content: &str,
3520        tool_calls: Vec<ToolCall>,
3521        all_tool_calls: &mut Vec<ToolCall>,
3522    ) -> Result<ToolCallOutcome> {
3523        // Check if a transition should fire before executing the LLM's tool call.
3524        // If a transition fires, on_enter actions handle the tool call correctly
3525        // (with proper URLs from YAML), so skip the LLM's tool call.
3526        let transition_fired = self.evaluate_transitions(processed_input, content).await?;
3527        if transition_fired {
3528            self.memory
3529                .add_message(ChatMessage::assistant(
3530                    "(Transitioned to new state — tool call handled by workflow)",
3531                ))
3532                .await?;
3533            return Ok(ToolCallOutcome::TransitionFired);
3534        }
3535
3536        // Store the assistant's tool-call message so the LLM sees its own decision in conversation history. Without this, the model only sees the tool result and may repeat the same call.
3537        self.memory
3538            .add_message(ChatMessage::assistant(content))
3539            .await?;
3540
3541        let results = self.execute_tools_parallel(&tool_calls).await;
3542
3543        for ((_id, result), tool_call) in results.into_iter().zip(tool_calls.iter()) {
3544            match result {
3545                Ok(output) => {
3546                    self.memory
3547                        .add_message(ChatMessage::function(&tool_call.name, &output))
3548                        .await?;
3549                }
3550                Err(e) => {
3551                    // Check if this is a HITL rejection - if so, break the loop
3552                    if matches!(e, AgentError::HITLRejected(_)) {
3553                        self.memory
3554                            .add_message(ChatMessage::assistant(&format!(
3555                                "The operation was rejected by the approver: {}",
3556                                e
3557                            )))
3558                            .await?;
3559                        // Return the rejection message to user, don't continue loop
3560                        return Ok(ToolCallOutcome::Rejected(AgentResponse {
3561                            content: format!("Operation cancelled: {}", e),
3562                            metadata: None,
3563                            tool_calls: Some(all_tool_calls.clone()),
3564                        }));
3565                    }
3566                    self.memory
3567                        .add_message(ChatMessage::function(
3568                            &tool_call.name,
3569                            &format!("Error: {}", e),
3570                        ))
3571                        .await?;
3572                }
3573            }
3574            all_tool_calls.push(tool_call.clone());
3575        }
3576        Ok(ToolCallOutcome::Continue)
3577    }
3578
3579    /// Run the reflection loop on a response, returning (improved_content, reflection_metadata).
3580    async fn run_reflection(
3581        &self,
3582        llm: &dyn LLMProvider,
3583        processed_input: &str,
3584        mut content: String,
3585    ) -> Result<(String, Option<ReflectionMetadata>)> {
3586        let should_reflect = self.should_reflect(processed_input, &content).await?;
3587        if !should_reflect {
3588            return Ok((content, None));
3589        }
3590
3591        info!("Starting response reflection evaluation");
3592        let mut attempts = 0u32;
3593        let max_retries = self.reflection_config.max_retries;
3594        let mut history: Vec<ReflectionAttempt> = Vec::new();
3595
3596        loop {
3597            let evaluation = self.evaluate_response(processed_input, &content).await?;
3598
3599            if evaluation.passed || attempts >= max_retries {
3600                info!(
3601                    passed = evaluation.passed,
3602                    confidence = evaluation.confidence,
3603                    attempts = attempts + 1,
3604                    "Reflection evaluation complete"
3605                );
3606                let reflection_metadata = Some(
3607                    ReflectionMetadata::new(evaluation)
3608                        .with_attempts(attempts + 1)
3609                        .with_history(history),
3610                );
3611                return Ok((content, reflection_metadata));
3612            }
3613
3614            debug!(
3615                attempt = attempts + 1,
3616                failed_criteria = evaluation.failed_criteria().count(),
3617                "Response did not meet criteria, retrying"
3618            );
3619
3620            history.push(
3621                ReflectionAttempt::new(&content, evaluation.clone())
3622                    .with_feedback("Response did not meet quality criteria"),
3623            );
3624
3625            let feedback: Vec<String> = evaluation
3626                .failed_criteria()
3627                .map(|c| format!("- {}", c.criterion))
3628                .collect();
3629
3630            let retry_prompt = format!(
3631                "Your previous response did not meet these criteria:\n{}\n\nPlease provide an improved response.",
3632                feedback.join("\n")
3633            );
3634
3635            self.memory
3636                .add_message(ChatMessage::user(&retry_prompt))
3637                .await?;
3638
3639            let retry_messages = self.build_messages().await?;
3640            let retry_response = llm
3641                .complete(&retry_messages, None)
3642                .await
3643                .map_err(|e| AgentError::LLM(e.to_string()))?;
3644
3645            content = retry_response.content.trim().to_string();
3646            attempts += 1;
3647        }
3648    }
3649
3650    /// Record the assistant turn, evaluate transitions, and decide what to do next.
3651    /// Returns PostLoopResult so callers can apply_post_loop_result for re-dispatch.
3652    async fn post_loop_processing(
3653        &self,
3654        processed_input: &str,
3655        content: String,
3656    ) -> Result<PostLoopResult> {
3657        // Do NOT add the assistant message to memory yet.
3658        // evaluate_transitions receives content as a direct parameter, so the message does not need to be in memory for transitions to evaluate correctly.
3659        // For NeedsRedispatch we skip adding the stale old-state response entirely, keeping memory clean for the re-dispatched handler.
3660
3661        self.increment_turn();
3662
3663        // Run context extractors so guards can check freshly-extracted values.
3664        self.run_context_extractors(processed_input).await;
3665
3666        let transitioned = self.evaluate_transitions(processed_input, &content).await?;
3667
3668        if !transitioned {
3669            self.memory
3670                .add_message(ChatMessage::assistant(&content))
3671                .await?;
3672            self.check_memory_compression().await?;
3673            return Ok(PostLoopResult::NoTransition(content));
3674        }
3675
3676        // Check if we should skip re-generation after this transition.
3677        if !self.should_regenerate_after_transition() {
3678            self.memory
3679                .add_message(ChatMessage::assistant(&content))
3680                .await?;
3681            self.check_memory_compression().await?;
3682            return Ok(PostLoopResult::Transitioned(content));
3683        }
3684
3685        // Check if the new state needs full dispatch.
3686        // Orchestration states (concurrent, group_chat, pipeline, handoff, delegate) need their dedicated handlers.
3687        // Any non-None effective reasoning mode needs CoT/ReAct prompt injection, the plan-and-execute handler, or Auto re-determination - all of which live in run_loop_internal, not here.
3688        if self.needs_redispatch_for_new_state() {
3689            info!("Post-transition NeedsRedispatch: new state requires full dispatch");
3690            // Stale old-state content is NOT added to memory.
3691            // apply_post_loop_result will increment redispatch_depth and call run_loop_internal, which produces the correct response and adds it.
3692            return Ok(PostLoopResult::NeedsRedispatch);
3693        }
3694
3695        // Normal post-transition re-generation (plain LLM with optional tool calls).
3696        // Add the stale response to history so the new LLM sees the conversation.
3697        self.memory
3698            .add_message(ChatMessage::assistant(&content))
3699            .await?;
3700        self.check_memory_compression().await?;
3701
3702        // If a transition fired, on_enter actions already executed (e.g., HTTP calls).
3703        // The current content was generated in the OLD state context and is stale.
3704        // Re-generate in the new state context so the LLM can reference on_enter results.
3705        // If the LLM responds with a tool call, execute it in a mini-loop so the
3706        // result is not returned as raw JSON text.
3707        let new_llm = self.get_state_llm()?;
3708        let mut final_content;
3709
3710        for post_iter in 0..self.max_iterations {
3711            let new_messages = self.build_messages().await?;
3712            if post_iter == 0 {
3713                if let Some(system_msg) = new_messages.first() {
3714                    if system_msg.role == ai_agents_core::Role::System {
3715                        debug!(
3716                            prompt_preview =
3717                                &system_msg.content[system_msg.content.len().saturating_sub(200)..],
3718                            "Post-transition system prompt (last 200 chars)"
3719                        );
3720                    }
3721                }
3722            }
3723
3724            let new_response = new_llm
3725                .complete(&new_messages, None)
3726                .await
3727                .map_err(|e| AgentError::LLM(e.to_string()))?;
3728            final_content = new_response.content.trim().to_string();
3729
3730            // Check if the post-transition response contains tool calls.
3731            // If so, execute them and loop so the LLM can summarize the result.
3732            if let Some(tool_calls) = self.parse_tool_calls(&final_content) {
3733                debug!(
3734                    post_iter = post_iter,
3735                    tools = tool_calls.len(),
3736                    "Post-transition tool call detected, executing"
3737                );
3738
3739                self.memory
3740                    .add_message(ChatMessage::assistant(&final_content))
3741                    .await?;
3742
3743                let results = self.execute_tools_parallel(&tool_calls).await;
3744                for ((_id, result), tool_call) in results.into_iter().zip(tool_calls.iter()) {
3745                    match result {
3746                        Ok(output) => {
3747                            self.memory
3748                                .add_message(ChatMessage::function(&tool_call.name, &output))
3749                                .await?;
3750                        }
3751                        Err(e) => {
3752                            self.memory
3753                                .add_message(ChatMessage::function(
3754                                    &tool_call.name,
3755                                    &format!("Error: {}", e),
3756                                ))
3757                                .await?;
3758                        }
3759                    }
3760                }
3761                // Loop to let the LLM see the tool result and produce a text response.
3762                continue;
3763            }
3764
3765            // No tool call - this is the final text response.
3766            self.memory
3767                .add_message(ChatMessage::assistant(&final_content))
3768                .await?;
3769            return Ok(PostLoopResult::Transitioned(final_content));
3770        }
3771
3772        // Exhausted post-transition iterations (unlikely). Return last content.
3773        final_content = "Post-transition processing completed.".to_string();
3774        self.memory
3775            .add_message(ChatMessage::assistant(&final_content))
3776            .await?;
3777
3778        Ok(PostLoopResult::Transitioned(final_content))
3779    }
3780
3781    /// Build the final AgentResponse with all metadata.
3782    /// Check whether to re-generate a response after a state transition.
3783    fn should_regenerate_after_transition(&self) -> bool {
3784        if let Some(ref sm) = self.state_machine {
3785            // Global setting
3786            if !sm.config().regenerate_on_transition {
3787                return false;
3788            }
3789            // Per-state override on the new (current) state
3790            if let Some(def) = sm.current_definition() {
3791                if let Some(regen) = def.regenerate_on_enter {
3792                    return regen;
3793                }
3794            }
3795        }
3796        true
3797    }
3798
3799    /// Return true when the new state requires full dispatch via run_loop_internal.
3800    /// Covers orchestration states and any non-None effective reasoning mode.
3801    fn needs_redispatch_for_new_state(&self) -> bool {
3802        if let Some(ref sm) = self.state_machine {
3803            if let Some(def) = sm.current_definition() {
3804                if def.concurrent.is_some()
3805                    || def.group_chat.is_some()
3806                    || def.pipeline.is_some()
3807                    || def.handoff.is_some()
3808                    || def.delegate.is_some()
3809                {
3810                    return true;
3811                }
3812                // Any non-None effective reasoning mode requires the main dispatch loop.
3813                let effective = self.get_effective_reasoning_config();
3814                if !matches!(effective.mode, ReasoningMode::None) {
3815                    return true;
3816                }
3817            }
3818        }
3819        false
3820    }
3821
3822    /// Consume a PostLoopResult. NeedsRedispatch re-enters run_loop_internal.
3823    /// The user message is already in memory - redispatch_depth suppresses re-adding it.
3824    async fn apply_post_loop_result(
3825        &self,
3826        processed_input: &str,
3827        result: PostLoopResult,
3828    ) -> Result<String> {
3829        match result {
3830            PostLoopResult::NoTransition(content) | PostLoopResult::Transitioned(content) => {
3831                Ok(content)
3832            }
3833            PostLoopResult::NeedsRedispatch => {
3834                const MAX_REDISPATCH_DEPTH: u32 = 3;
3835                let current_depth = *self.redispatch_depth.read();
3836                if current_depth >= MAX_REDISPATCH_DEPTH {
3837                    warn!(
3838                        depth = current_depth,
3839                        "Post-transition re-dispatch depth limit reached, returning empty response"
3840                    );
3841                    let content = String::new();
3842                    self.memory
3843                        .add_message(ChatMessage::assistant(&content))
3844                        .await?;
3845                    return Ok(content);
3846                }
3847                *self.redispatch_depth.write() += 1;
3848                info!(
3849                    depth = current_depth + 1,
3850                    "Re-dispatching for new state after transition"
3851                );
3852                let resp = Box::pin(self.run_loop_internal(processed_input)).await;
3853                *self.redispatch_depth.write() -= 1;
3854                resp.map(|r| r.content)
3855            }
3856        }
3857    }
3858
3859    fn build_agent_response(
3860        &self,
3861        content: String,
3862        all_tool_calls: Vec<ToolCall>,
3863        reasoning_mode: ReasoningMode,
3864        auto_detected: bool,
3865        iterations: u32,
3866        thinking: Option<String>,
3867        reflection_metadata: Option<ReflectionMetadata>,
3868    ) -> AgentResponse {
3869        let reasoning_metadata = ReasoningMetadata::new(reasoning_mode.clone())
3870            .with_thinking(thinking.clone().unwrap_or_default())
3871            .with_iterations(iterations)
3872            .with_auto_detected(auto_detected);
3873
3874        let mut response = AgentResponse::new(&content);
3875        if !all_tool_calls.is_empty() {
3876            response = response.with_tool_calls(all_tool_calls);
3877        }
3878
3879        if let Some(state) = self.current_state() {
3880            response = response.with_metadata("current_state", serde_json::json!(state));
3881        }
3882
3883        response = response.with_metadata(
3884            "reasoning",
3885            serde_json::to_value(&reasoning_metadata).unwrap_or_default(),
3886        );
3887
3888        if let Some(ref refl_meta) = reflection_metadata {
3889            response = response.with_metadata(
3890                "reflection",
3891                serde_json::to_value(refl_meta).unwrap_or_default(),
3892            );
3893        }
3894
3895        response
3896    }
3897
3898    // Handle delegation: forward user input to a registry agent.
3899    async fn handle_delegated_state(
3900        &self,
3901        input: &str,
3902        delegate_id: &str,
3903        state_def: &ai_agents_state::StateDefinition,
3904    ) -> Result<AgentResponse> {
3905        use std::time::Instant;
3906
3907        let registry = self.spawner_registry.as_ref().ok_or_else(|| {
3908            AgentError::Config(format!(
3909                "State delegates to '{}' but no agent registry is configured. \
3910                 Add a spawner section with auto_spawn to your YAML.",
3911                delegate_id
3912            ))
3913        })?;
3914
3915        let state_name = self
3916            .state_machine
3917            .as_ref()
3918            .map(|sm| sm.current())
3919            .unwrap_or_else(|| "unknown".to_string());
3920
3921        self.hooks.on_delegate_start(delegate_id, &state_name).await;
3922        let start = Instant::now();
3923
3924        let delegate = registry.get(delegate_id).ok_or_else(|| {
3925            AgentError::Other(format!(
3926                "State '{}' delegates to '{}' but no agent with that ID exists in the registry.",
3927                state_name, delegate_id
3928            ))
3929        })?;
3930
3931        // Prepare input based on delegate_context mode.
3932        let context_mode = state_def.delegate_context.clone().unwrap_or_default();
3933        let effective_input = crate::orchestration::context::prepare_delegate_input(
3934            input,
3935            &context_mode,
3936            &*self.memory,
3937            self.llm_registry.get("router").ok().as_deref(),
3938        )
3939        .await?;
3940
3941        let response = delegate.chat(&effective_input).await?;
3942
3943        let duration_ms = start.elapsed().as_millis() as u64;
3944        self.hooks
3945            .on_delegate_complete(delegate_id, &state_name, duration_ms)
3946            .await;
3947
3948        // Backward-compatible context key.
3949        let ctx_key = format!("delegation.{}.last_response", delegate_id);
3950        let _ = self.context_manager.set(
3951            &ctx_key,
3952            serde_json::Value::String(response.content.clone()),
3953        );
3954
3955        // Structured orchestration context.
3956        let _ = self.context_manager.set(
3957            "orchestration",
3958            serde_json::json!({
3959                "type": "delegate",
3960                "agent": delegate_id,
3961                "state": state_name,
3962                "response": response.content,
3963                "duration_ms": duration_ms,
3964            }),
3965        );
3966
3967        // Add user message to parent memory for tracking (skip on redispatch - already present).
3968        if *self.redispatch_depth.read() == 0 {
3969            self.memory
3970                .add_message(ai_agents_llm::ChatMessage::user(input))
3971                .await?;
3972        }
3973
3974        // post_loop_processing records the assistant turn and evaluates transitions.
3975        // apply_post_loop_result handles NeedsRedispatch by re-entering run_loop_internal.
3976        let post_result = self
3977            .post_loop_processing(
3978                input,
3979                format!("[Delegated to {}]: {}", delegate_id, response.content),
3980            )
3981            .await?;
3982        let final_content = self.apply_post_loop_result(input, post_result).await?;
3983
3984        let mut result = AgentResponse::new(final_content);
3985
3986        let metadata = serde_json::json!({
3987            "orchestration": {
3988                "type": "delegate",
3989                "agent": delegate_id,
3990                "state": state_name,
3991                "response": response.content,
3992                "duration_ms": duration_ms,
3993            }
3994        });
3995        result.metadata = Some(
3996            serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
3997                metadata,
3998            )
3999            .unwrap_or_default(),
4000        );
4001
4002        self.hooks.on_response(&result).await;
4003        Ok(result)
4004    }
4005
4006    // Handle concurrent execution: run multiple registry agents in parallel and aggregate.
4007    async fn handle_concurrent_state(
4008        &self,
4009        input: &str,
4010        config: &ai_agents_state::ConcurrentStateConfig,
4011    ) -> Result<AgentResponse> {
4012        use std::time::Instant;
4013
4014        let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4015            AgentError::Config(
4016                "Concurrent state requires an agent registry. Add a spawner section.".into(),
4017            )
4018        })?;
4019
4020        // Render input template if provided, otherwise use the raw input.
4021        // Uses direct minijinja rendering (same approach as pipeline) so variables
4022        // are top-level: {{ user_input }}, not {{ context.user_input }}.
4023        // Enrich input with parent conversation history when context_mode is set.
4024        let context_mode = config.context_mode.clone().unwrap_or_default();
4025        let context_input = crate::orchestration::context::prepare_delegate_input(
4026            input,
4027            &context_mode,
4028            &*self.memory,
4029            self.llm_registry.get("router").ok().as_deref(),
4030        )
4031        .await?;
4032
4033        let effective_input = if let Some(ref tmpl) = config.input {
4034            render_concurrent_template(tmpl, &context_input, &self.context_manager.get_all())
4035                .unwrap_or_else(|_| context_input.clone())
4036        } else {
4037            context_input
4038        };
4039
4040        let start = Instant::now();
4041
4042        let llm_name = config
4043            .aggregation
4044            .synthesizer_llm
4045            .as_deref()
4046            .unwrap_or("router");
4047        let llm_provider = self.llm_registry.get(llm_name).ok();
4048
4049        let result = crate::orchestration::concurrent(
4050            registry,
4051            &effective_input,
4052            &config.agents,
4053            &config.aggregation,
4054            llm_provider.as_deref(),
4055            config.min_required,
4056            config.timeout_ms,
4057            config.on_partial_failure.clone(),
4058        )
4059        .await?;
4060
4061        let duration_ms = start.elapsed().as_millis() as u64;
4062        let agent_ids: Vec<String> = config.agents.iter().map(|a| a.id().to_string()).collect();
4063        let strategy = format!("{:?}", config.aggregation.strategy);
4064        self.hooks
4065            .on_concurrent_complete(&agent_ids, &strategy, duration_ms)
4066            .await;
4067
4068        // Backward-compatible context key.
4069        let _ = self.context_manager.set(
4070            "concurrent.result",
4071            serde_json::Value::String(result.response.content.clone()),
4072        );
4073
4074        // Build per-agent result data for context and metadata.
4075        let agents_json: Vec<serde_json::Value> = result
4076            .agent_results
4077            .iter()
4078            .map(|ar| {
4079                serde_json::json!({
4080                    "id": ar.agent_id,
4081                    "response": ar.response.as_ref().map(|r| r.content.as_str()),
4082                    "success": ar.success,
4083                    "error": ar.error,
4084                    "duration_ms": ar.duration_ms,
4085                })
4086            })
4087            .collect();
4088
4089        // Structured orchestration context with per-agent results.
4090        let _ = self.context_manager.set(
4091            "orchestration",
4092            serde_json::json!({
4093                "type": "concurrent",
4094                "result": result.response.content,
4095                "strategy": strategy,
4096                "agents": agents_json,
4097                "duration_ms": duration_ms,
4098            }),
4099        );
4100
4101        // Add user message to parent memory for tracking (skip on redispatch - already present).
4102        if *self.redispatch_depth.read() == 0 {
4103            self.memory
4104                .add_message(ai_agents_llm::ChatMessage::user(input))
4105                .await?;
4106        }
4107
4108        let post_result = self
4109            .post_loop_processing(input, result.response.content.clone())
4110            .await?;
4111        let final_content = self.apply_post_loop_result(input, post_result).await?;
4112
4113        let mut response = AgentResponse::new(final_content);
4114        let metadata = serde_json::json!({
4115            "orchestration": {
4116                "type": "concurrent",
4117                "result": result.response.content,
4118                "strategy": strategy,
4119                "agents": agents_json,
4120                "duration_ms": duration_ms,
4121            }
4122        });
4123        response.metadata = Some(
4124            serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4125                metadata,
4126            )
4127            .unwrap_or_default(),
4128        );
4129
4130        self.hooks.on_response(&response).await;
4131        Ok(response)
4132    }
4133
4134    // Handle group chat: run a multi-turn multi-agent conversation.
4135    async fn handle_group_chat_state(
4136        &self,
4137        input: &str,
4138        config: &ai_agents_state::GroupChatStateConfig,
4139    ) -> Result<AgentResponse> {
4140        use std::time::Instant;
4141
4142        let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4143            AgentError::Config(
4144                "Group chat state requires an agent registry. Add a spawner section.".into(),
4145            )
4146        })?;
4147
4148        let start = Instant::now();
4149
4150        let llm_provider = self.llm_registry.get("router").ok();
4151
4152        // Enrich input with parent conversation history when context_mode is set.
4153        let context_mode = config.context_mode.clone().unwrap_or_default();
4154        let context_input = crate::orchestration::context::prepare_delegate_input(
4155            input,
4156            &context_mode,
4157            &*self.memory,
4158            self.llm_registry.get("router").ok().as_deref(),
4159        )
4160        .await?;
4161
4162        // Render input template if provided, otherwise use the raw user message as topic.
4163        let effective_topic = if let Some(ref tmpl) = config.input {
4164            render_concurrent_template(tmpl, &context_input, &self.context_manager.get_all())
4165                .unwrap_or_else(|_| context_input.clone())
4166        } else {
4167            context_input
4168        };
4169
4170        let result = crate::orchestration::group_chat(
4171            registry,
4172            &effective_topic,
4173            config,
4174            llm_provider.as_deref(),
4175            Some(&*self.hooks),
4176        )
4177        .await?;
4178
4179        let duration_ms = start.elapsed().as_millis() as u64;
4180
4181        // Backward-compatible context key.
4182        let _ = self.context_manager.set(
4183            "group_chat.conclusion",
4184            serde_json::Value::String(result.response.content.clone()),
4185        );
4186
4187        // Build transcript data for context and metadata.
4188        let transcript_json: Vec<serde_json::Value> = result
4189            .transcript
4190            .iter()
4191            .map(|t| {
4192                serde_json::json!({
4193                    "speaker": t.speaker,
4194                    "round": t.round,
4195                    "content": t.content,
4196                })
4197            })
4198            .collect();
4199
4200        // Structured orchestration context with full transcript.
4201        let _ = self.context_manager.set(
4202            "orchestration",
4203            serde_json::json!({
4204                "type": "group_chat",
4205                "conclusion": result.response.content,
4206                "transcript": transcript_json,
4207                "rounds": result.rounds_completed,
4208                "termination": result.termination_reason,
4209                "duration_ms": duration_ms,
4210            }),
4211        );
4212
4213        // Add user message to parent memory for tracking (skip on redispatch - already present).
4214        if *self.redispatch_depth.read() == 0 {
4215            self.memory
4216                .add_message(ai_agents_llm::ChatMessage::user(input))
4217                .await?;
4218        }
4219
4220        let post_result = self
4221            .post_loop_processing(input, result.response.content.clone())
4222            .await?;
4223        let final_content = self.apply_post_loop_result(input, post_result).await?;
4224
4225        let mut response = AgentResponse::new(final_content);
4226        let metadata = serde_json::json!({
4227            "orchestration": {
4228                "type": "group_chat",
4229                "conclusion": result.response.content,
4230                "transcript": transcript_json,
4231                "rounds": result.rounds_completed,
4232                "termination": result.termination_reason,
4233                "duration_ms": duration_ms,
4234            }
4235        });
4236        response.metadata = Some(
4237            serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4238                metadata,
4239            )
4240            .unwrap_or_default(),
4241        );
4242
4243        self.hooks.on_response(&response).await;
4244        Ok(response)
4245    }
4246
4247    // Handle pipeline: run agents sequentially with per-stage input templates.
4248    async fn handle_pipeline_state(
4249        &self,
4250        input: &str,
4251        config: &ai_agents_state::PipelineStateConfig,
4252    ) -> Result<AgentResponse> {
4253        use std::time::Instant;
4254
4255        let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4256            AgentError::Config(
4257                "Pipeline state requires an agent registry. Add a spawner section.".into(),
4258            )
4259        })?;
4260
4261        let start = Instant::now();
4262
4263        let stages: Vec<crate::orchestration::PipelineStage> = config
4264            .stages
4265            .iter()
4266            .map(|entry| {
4267                let mut stage = crate::orchestration::PipelineStage::id(entry.id());
4268                if let Some(tmpl) = entry.input() {
4269                    stage = stage.with_input(tmpl);
4270                }
4271                stage
4272            })
4273            .collect();
4274
4275        // Enrich input with parent conversation history when context_mode is set.
4276        let context_mode = config.context_mode.clone().unwrap_or_default();
4277        let context_input = crate::orchestration::context::prepare_delegate_input(
4278            input,
4279            &context_mode,
4280            &*self.memory,
4281            self.llm_registry.get("router").ok().as_deref(),
4282        )
4283        .await?;
4284
4285        let context_values = self.context_manager.get_all();
4286        let result = crate::orchestration::pipeline(
4287            registry,
4288            &context_input,
4289            &stages,
4290            config.timeout_ms,
4291            Some(&*self.hooks),
4292            Some(&context_values),
4293        )
4294        .await?;
4295
4296        let duration_ms = start.elapsed().as_millis() as u64;
4297
4298        // Backward-compatible context key.
4299        let _ = self.context_manager.set(
4300            "pipeline.result",
4301            serde_json::Value::String(result.response.content.clone()),
4302        );
4303
4304        // Build per-stage data for context and metadata.
4305        let stages_json: Vec<serde_json::Value> = result
4306            .stage_outputs
4307            .iter()
4308            .map(|s| {
4309                serde_json::json!({
4310                    "agent_id": s.agent_id,
4311                    "output": s.output,
4312                    "duration_ms": s.duration_ms,
4313                    "skipped": s.skipped,
4314                })
4315            })
4316            .collect();
4317
4318        // Structured orchestration context.
4319        let _ = self.context_manager.set(
4320            "orchestration",
4321            serde_json::json!({
4322                "type": "pipeline",
4323                "result": result.response.content,
4324                "stages": stages_json,
4325                "duration_ms": duration_ms,
4326            }),
4327        );
4328
4329        // Add user message to parent memory for tracking (skip on redispatch - already present).
4330        if *self.redispatch_depth.read() == 0 {
4331            self.memory
4332                .add_message(ai_agents_llm::ChatMessage::user(input))
4333                .await?;
4334        }
4335
4336        let post_result = self
4337            .post_loop_processing(input, result.response.content.clone())
4338            .await?;
4339        let final_content = self.apply_post_loop_result(input, post_result).await?;
4340
4341        let mut response = AgentResponse::new(final_content);
4342        let metadata = serde_json::json!({
4343            "orchestration": {
4344                "type": "pipeline",
4345                "result": result.response.content,
4346                "stages": stages_json,
4347                "duration_ms": duration_ms,
4348            }
4349        });
4350        response.metadata = Some(
4351            serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4352                metadata,
4353            )
4354            .unwrap_or_default(),
4355        );
4356
4357        self.hooks.on_response(&response).await;
4358        Ok(response)
4359    }
4360
4361    // Handle handoff: LLM-directed agent-to-agent control transfer.
4362    async fn handle_handoff_state(
4363        &self,
4364        input: &str,
4365        config: &ai_agents_state::HandoffStateConfig,
4366    ) -> Result<AgentResponse> {
4367        use std::time::Instant;
4368
4369        let registry = self.spawner_registry.as_ref().ok_or_else(|| {
4370            AgentError::Config(
4371                "Handoff state requires an agent registry. Add a spawner section.".into(),
4372            )
4373        })?;
4374
4375        let llm = self
4376            .llm_registry
4377            .get("router")
4378            .map_err(|_| AgentError::Config("Handoff state requires a router LLM.".into()))?;
4379
4380        let start = Instant::now();
4381
4382        // Enrich input with parent conversation history when context_mode is set.
4383        let context_mode = config.context_mode.clone().unwrap_or_default();
4384        let context_input = crate::orchestration::context::prepare_delegate_input(
4385            input,
4386            &context_mode,
4387            &*self.memory,
4388            self.llm_registry.get("router").ok().as_deref(),
4389        )
4390        .await?;
4391
4392        // Render input template if provided, otherwise forward the raw user message.
4393        let effective_input = if let Some(ref tmpl) = config.input {
4394            render_concurrent_template(tmpl, &context_input, &self.context_manager.get_all())
4395                .unwrap_or_else(|_| context_input.clone())
4396        } else {
4397            context_input
4398        };
4399
4400        let result = crate::orchestration::handoff(
4401            registry,
4402            &effective_input,
4403            &config.initial_agent,
4404            &config.available_agents,
4405            config.max_handoffs,
4406            llm.as_ref(),
4407            Some(&*self.hooks),
4408        )
4409        .await?;
4410
4411        let duration_ms = start.elapsed().as_millis() as u64;
4412
4413        // Backward-compatible context key.
4414        let _ = self.context_manager.set(
4415            "handoff.result",
4416            serde_json::Value::String(result.response.content.clone()),
4417        );
4418
4419        // Build handoff chain data for context and metadata.
4420        let chain_json: Vec<serde_json::Value> = result
4421            .handoff_chain
4422            .iter()
4423            .map(|h| {
4424                serde_json::json!({
4425                    "from": h.from_agent,
4426                    "to": h.to_agent,
4427                    "reason": h.reason,
4428                })
4429            })
4430            .collect();
4431
4432        // Structured orchestration context.
4433        let _ = self.context_manager.set(
4434            "orchestration",
4435            serde_json::json!({
4436                "type": "handoff",
4437                "result": result.response.content,
4438                "final_agent": result.final_agent,
4439                "handoff_chain": chain_json,
4440                "duration_ms": duration_ms,
4441            }),
4442        );
4443
4444        // Add user message to parent memory for tracking (skip on redispatch - already present).
4445        if *self.redispatch_depth.read() == 0 {
4446            self.memory
4447                .add_message(ai_agents_llm::ChatMessage::user(input))
4448                .await?;
4449        }
4450
4451        let post_result = self
4452            .post_loop_processing(input, result.response.content.clone())
4453            .await?;
4454        let final_content = self.apply_post_loop_result(input, post_result).await?;
4455
4456        let mut response = AgentResponse::new(final_content);
4457        let metadata = serde_json::json!({
4458            "orchestration": {
4459                "type": "handoff",
4460                "result": result.response.content,
4461                "final_agent": result.final_agent,
4462                "handoff_chain": chain_json,
4463                "duration_ms": duration_ms,
4464            }
4465        });
4466        response.metadata = Some(
4467            serde_json::from_value::<std::collections::HashMap<String, serde_json::Value>>(
4468                metadata,
4469            )
4470            .unwrap_or_default(),
4471        );
4472
4473        self.hooks.on_response(&response).await;
4474        Ok(response)
4475    }
4476
4477    // run_loop_internal: blocking (non-streaming) agent pipeline.
4478    async fn run_loop_internal(&self, input: &str) -> Result<AgentResponse> {
4479        let input_data = self.process_input(input).await?;
4480
4481        // Inject process context (detect/extract results) into agent context
4482        // so system prompt templates can use {{ context.detected_language }} etc.
4483        for (key, value) in &input_data.context {
4484            let _ = self.context_manager.set(key, value.clone());
4485        }
4486
4487        if input_data.metadata.rejected {
4488            let reason = input_data
4489                .metadata
4490                .rejection_reason
4491                .unwrap_or_else(|| "Input rejected".to_string());
4492            warn!(reason = %reason, "Input rejected");
4493            return Ok(AgentResponse::new(reason));
4494        }
4495
4496        let processed_input = &input_data.content;
4497
4498        // Handle orchestration states (delegate, concurrent, group_chat, pipeline, handoff).
4499        if let Some(ref sm) = self.state_machine {
4500            if let Some(def) = sm.current_definition() {
4501                if let Some(ref delegate_id) = def.delegate {
4502                    return self
4503                        .handle_delegated_state(processed_input, delegate_id, &def)
4504                        .await;
4505                }
4506                if let Some(ref concurrent_config) = def.concurrent {
4507                    return self
4508                        .handle_concurrent_state(processed_input, concurrent_config)
4509                        .await;
4510                }
4511                if let Some(ref group_chat_config) = def.group_chat {
4512                    return self
4513                        .handle_group_chat_state(processed_input, group_chat_config)
4514                        .await;
4515                }
4516                if let Some(ref pipeline_config) = def.pipeline {
4517                    return self
4518                        .handle_pipeline_state(processed_input, pipeline_config)
4519                        .await;
4520                }
4521                if let Some(ref handoff_config) = def.handoff {
4522                    return self
4523                        .handle_handoff_state(processed_input, handoff_config)
4524                        .await;
4525                }
4526            }
4527        }
4528
4529        match self.try_skill_route(processed_input).await? {
4530            SkillRouteResult::Response(skill_response) => {
4531                if *self.redispatch_depth.read() == 0 {
4532                    self.memory
4533                        .add_message(ChatMessage::user(processed_input))
4534                        .await?;
4535                }
4536                return self
4537                    .handle_skill_response(processed_input, skill_response, &input_data.context)
4538                    .await;
4539            }
4540            SkillRouteResult::NeedsClarification(response) => {
4541                if *self.redispatch_depth.read() == 0 {
4542                    self.memory
4543                        .add_message(ChatMessage::user(processed_input))
4544                        .await?;
4545                }
4546                if let Some(q) = response
4547                    .metadata
4548                    .as_ref()
4549                    .and_then(|m| m.get("disambiguation"))
4550                    .and_then(|d| d.get("status"))
4551                    .and_then(|s| s.as_str())
4552                {
4553                    if q == "awaiting_clarification" {
4554                        // Store the clarification question in memory so the next turn
4555                        // can be handled as a clarification response.
4556                        self.memory
4557                            .add_message(ChatMessage::assistant(&response.content))
4558                            .await?;
4559                    }
4560                }
4561                return Ok(response);
4562            }
4563            SkillRouteResult::NoMatch => {} // continue to normal LLM chat
4564        }
4565
4566        let effective_reasoning = self.get_effective_reasoning_config();
4567        let reasoning_mode = self.determine_reasoning_mode(processed_input).await?;
4568        let auto_detected = matches!(effective_reasoning.mode, ReasoningMode::Auto);
4569
4570        info!(
4571            reasoning_mode = ?reasoning_mode,
4572            auto_detected = auto_detected,
4573            reflection_enabled = ?self.reflection_config.enabled,
4574            "Reasoning mode determined"
4575        );
4576
4577        if matches!(reasoning_mode, ReasoningMode::PlanAndExecute) {
4578            if *self.redispatch_depth.read() == 0 {
4579                self.memory
4580                    .add_message(ChatMessage::user(processed_input))
4581                    .await?;
4582            }
4583            return self
4584                .handle_plan_and_execute(processed_input, &input_data.context, auto_detected)
4585                .await;
4586        }
4587
4588        if *self.redispatch_depth.read() == 0 {
4589            self.memory
4590                .add_message(ChatMessage::user(processed_input))
4591                .await?;
4592        }
4593
4594        let mut iterations = 0u32;
4595        let mut all_tool_calls: Vec<ToolCall> = Vec::new();
4596        let mut thinking_content: Option<String> = None;
4597
4598        let llm = self.get_state_llm()?;
4599
4600        loop {
4601            // When reasoning is active, cap iterations at the reasoning-specific limit.
4602            let effective_max = if reasoning_mode != ReasoningMode::None {
4603                let rc = self.get_effective_reasoning_config();
4604                self.max_iterations.min(rc.max_iterations)
4605            } else {
4606                self.max_iterations
4607            };
4608
4609            if iterations >= effective_max {
4610                let err = AgentError::Other(format!("Max iterations ({}) exceeded", effective_max));
4611                self.hooks.on_error(&err).await;
4612                error!(iterations = iterations, "Max iterations exceeded");
4613                return Err(err);
4614            }
4615            iterations += 1;
4616            *self.iteration_count.write() = iterations;
4617
4618            debug!(iteration = iterations, max = effective_max, "LLM call");
4619
4620            let mut messages = self.build_messages().await?;
4621            self.inject_reasoning_prompt(&mut messages, &reasoning_mode, iterations == 1);
4622
4623            self.hooks.on_llm_start(&messages).await;
4624            let llm_start = Instant::now();
4625
4626            // Try primary LLM (with retry if configured), then apply on_failure policy.
4627            let response = {
4628                let primary_result = if self.recovery_manager.config().default.max_retries > 0 {
4629                    self.recovery_manager
4630                        .with_retry("llm_call", None, || async {
4631                            llm.complete(&messages, None)
4632                                .await
4633                                .map_err(|e| e.classify())
4634                        })
4635                        .await
4636                        .map_err(|e| AgentError::LLM(e.to_string()))
4637                } else {
4638                    llm.complete(&messages, None)
4639                        .await
4640                        .map_err(|e| AgentError::LLM(e.to_string()))
4641                };
4642
4643                match primary_result {
4644                    Ok(resp) => resp,
4645                    Err(primary_err) => match &self.recovery_manager.config().llm.on_failure {
4646                        LLMFailureAction::FallbackLlm { fallback_llm } => {
4647                            warn!(
4648                                fallback = %fallback_llm,
4649                                error = %primary_err,
4650                                "Primary LLM failed, attempting fallback LLM"
4651                            );
4652                            let fb = self.llm_registry.get(fallback_llm).map_err(|e| {
4653                                AgentError::Config(format!(
4654                                    "Fallback LLM '{}' not found: {}",
4655                                    fallback_llm, e
4656                                ))
4657                            })?;
4658                            fb.complete(&messages, None)
4659                                .await
4660                                .map_err(|e| AgentError::LLM(e.to_string()))?
4661                        }
4662                        LLMFailureAction::FallbackResponse { message } => {
4663                            warn!(
4664                                error = %primary_err,
4665                                "Primary LLM failed, using static fallback response"
4666                            );
4667                            LLMResponse::new(message.clone(), FinishReason::Stop)
4668                        }
4669                        LLMFailureAction::Error => {
4670                            return Err(primary_err);
4671                        }
4672                    },
4673                }
4674            };
4675
4676            let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
4677            self.hooks.on_llm_complete(&response, llm_duration_ms).await;
4678
4679            let content = response.content.trim();
4680
4681            if let Some(tool_calls) = self.parse_tool_calls(content) {
4682                match self
4683                    .handle_tool_calls(processed_input, content, tool_calls, &mut all_tool_calls)
4684                    .await?
4685                {
4686                    ToolCallOutcome::Continue | ToolCallOutcome::TransitionFired => continue,
4687                    ToolCallOutcome::Rejected(resp) => return Ok(resp),
4688                }
4689            }
4690
4691            let (extracted_thinking, answer) = self.extract_thinking(content);
4692            if extracted_thinking.is_some() {
4693                thinking_content = extracted_thinking;
4694            }
4695
4696            let output_data = self.process_output(&answer, &input_data.context).await?;
4697
4698            let mut final_content = if output_data.metadata.rejected {
4699                output_data
4700                    .metadata
4701                    .rejection_reason
4702                    .unwrap_or_else(|| answer.to_string())
4703            } else {
4704                output_data.content
4705            };
4706
4707            // Run reflection (blocking LLM calls for retries)
4708            let reflection_metadata;
4709            (final_content, reflection_metadata) = self
4710                .run_reflection(&*llm, processed_input, final_content)
4711                .await?;
4712
4713            final_content =
4714                self.format_response_with_thinking(thinking_content.as_deref(), &final_content);
4715
4716            // Post-loop: memory, transitions, post-transition re-generation.
4717            // apply_post_loop_result handles NeedsRedispatch by re-entering
4718            // run_loop_internal so the new state's full dispatch activates.
4719            let final_content = {
4720                let result = self
4721                    .post_loop_processing(processed_input, final_content)
4722                    .await?;
4723                self.apply_post_loop_result(processed_input, result).await?
4724            };
4725
4726            let reflected = reflection_metadata.is_some();
4727            let reasoning_mode_debug = format!("{:?}", reasoning_mode);
4728
4729            let response = self.build_agent_response(
4730                final_content,
4731                all_tool_calls,
4732                reasoning_mode,
4733                auto_detected,
4734                iterations,
4735                thinking_content,
4736                reflection_metadata,
4737            );
4738
4739            self.hooks.on_response(&response).await;
4740
4741            let tool_call_count = response.tool_calls.as_ref().map(|tc| tc.len()).unwrap_or(0);
4742            info!(
4743                tool_calls = tool_call_count,
4744                response_len = response.content.len(),
4745                reasoning_mode = %reasoning_mode_debug,
4746                reflected = reflected,
4747                "Chat completed"
4748            );
4749            return Ok(response);
4750        }
4751    }
4752
4753    /// Streaming agent pipeline
4754    /// Uses all the same shared helpers as run_loop_internal.
4755    /// The ONLY difference: LLM calls use complete_stream() + yield deltas.
4756    fn run_loop_internal_stream<'a>(
4757        &'a self,
4758        input: &'a str,
4759    ) -> Pin<Box<dyn Stream<Item = StreamChunk> + Send + 'a>> {
4760        let include_tool_events = self.streaming.include_tool_events;
4761        let include_state_events = self.streaming.include_state_events;
4762
4763        Box::pin(async_stream::stream! {
4764            let input_data = match self.process_input(input).await {
4765                Ok(data) => data,
4766                Err(e) => {
4767                    yield StreamChunk::error(e.to_string());
4768                    return;
4769                }
4770            };
4771
4772            // Inject process context (detect/extract results) into agent context
4773            for (key, value) in &input_data.context {
4774                let _ = self.context_manager.set(key, value.clone());
4775            }
4776
4777            if input_data.metadata.rejected {
4778                let reason = input_data
4779                    .metadata
4780                    .rejection_reason
4781                    .unwrap_or_else(|| "Input rejected".to_string());
4782                warn!(reason = %reason, "Input rejected (stream)");
4783                yield StreamChunk::error(reason);
4784                return;
4785            }
4786
4787            let processed_input = &input_data.content;
4788
4789            // Handle orchestration states in streaming mode.
4790            if let Some(ref sm) = self.state_machine {
4791                if let Some(def) = sm.current_definition() {
4792                    let orchestration_result = if let Some(ref delegate_id) = def.delegate {
4793                        Some(self.handle_delegated_state(processed_input, delegate_id, &def).await)
4794                    } else if let Some(ref concurrent_config) = def.concurrent {
4795                        Some(self.handle_concurrent_state(processed_input, concurrent_config).await)
4796                    } else if let Some(ref group_chat_config) = def.group_chat {
4797                        Some(self.handle_group_chat_state(processed_input, group_chat_config).await)
4798                    } else if let Some(ref pipeline_config) = def.pipeline {
4799                        Some(self.handle_pipeline_state(processed_input, pipeline_config).await)
4800                    } else if let Some(ref handoff_config) = def.handoff {
4801                        Some(self.handle_handoff_state(processed_input, handoff_config).await)
4802                    } else {
4803                        None
4804                    };
4805
4806                    if let Some(result) = orchestration_result {
4807                        match result {
4808                            Ok(response) => {
4809                                yield StreamChunk::content(&response.content);
4810                                yield StreamChunk::Done {};
4811                            }
4812                            Err(e) => {
4813                                yield StreamChunk::error(e.to_string());
4814                            }
4815                        }
4816                        return;
4817                    }
4818                }
4819            }
4820
4821            // Skill routing
4822            match self.try_skill_route(processed_input).await {
4823                Ok(SkillRouteResult::Response(skill_response)) => {
4824                    if *self.redispatch_depth.read() == 0 {
4825                        let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
4826                    }
4827                    match self.handle_skill_response(processed_input, skill_response, &input_data.context).await {
4828                        Ok(resp) => {
4829                            yield StreamChunk::content(&resp.content);
4830                            yield StreamChunk::Done {};
4831                            return;
4832                        }
4833                        Err(e) => {
4834                            yield StreamChunk::error(e.to_string());
4835                            return;
4836                        }
4837                    }
4838                }
4839                Ok(SkillRouteResult::NeedsClarification(response)) => {
4840                    if *self.redispatch_depth.read() == 0 {
4841                        let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
4842                    }
4843                    let _ = self.memory.add_message(ChatMessage::assistant(&response.content)).await;
4844                    yield StreamChunk::content(&response.content);
4845                    yield StreamChunk::Done {};
4846                    return;
4847                }
4848                Ok(SkillRouteResult::NoMatch) => {} // no skill matched, continue
4849                Err(e) => {
4850                    yield StreamChunk::error(e.to_string());
4851                    return;
4852                }
4853            }
4854
4855            // Reasoning mode determination
4856            let effective_reasoning = self.get_effective_reasoning_config();
4857            let reasoning_mode = match self.determine_reasoning_mode(processed_input).await {
4858                Ok(mode) => mode,
4859                Err(e) => {
4860                    yield StreamChunk::error(e.to_string());
4861                    return;
4862                }
4863            };
4864            let auto_detected = matches!(effective_reasoning.mode, ReasoningMode::Auto);
4865
4866            info!(
4867                reasoning_mode = ?reasoning_mode,
4868                auto_detected = auto_detected,
4869                "Reasoning mode determined (stream)"
4870            );
4871
4872            // Plan-and-Execute: yield final result as single chunk (not token-by-token)
4873            if matches!(reasoning_mode, ReasoningMode::PlanAndExecute) {
4874                if *self.redispatch_depth.read() == 0 {
4875                    let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
4876                }
4877                match self.handle_plan_and_execute(processed_input, &input_data.context, auto_detected).await {
4878                    Ok(resp) => {
4879                        yield StreamChunk::content(&resp.content);
4880                        yield StreamChunk::Done {};
4881                        return;
4882                    }
4883                    Err(e) => {
4884                        yield StreamChunk::error(e.to_string());
4885                        return;
4886                    }
4887                }
4888            }
4889
4890            if *self.redispatch_depth.read() == 0 {
4891                let _ = self.memory.add_message(ChatMessage::user(processed_input)).await;
4892            }
4893
4894            let llm = match self.get_state_llm() {
4895                Ok(llm) => llm,
4896                Err(e) => {
4897                    yield StreamChunk::error(e.to_string());
4898                    return;
4899                }
4900            };
4901
4902            let mut iterations = 0u32;
4903            let mut all_tool_calls: Vec<ToolCall> = Vec::new();
4904            let mut thinking_content: Option<String> = None;
4905
4906            loop {
4907                // When reasoning is active, cap iterations at the reasoning-specific limit.
4908                let effective_max = if reasoning_mode != ReasoningMode::None {
4909                    let rc = self.get_effective_reasoning_config();
4910                    self.max_iterations.min(rc.max_iterations)
4911                } else {
4912                    self.max_iterations
4913                };
4914
4915                if iterations >= effective_max {
4916                    let err_msg = format!("Max iterations ({}) exceeded", effective_max);
4917                    let err = AgentError::Other(err_msg.clone());
4918                    self.hooks.on_error(&err).await;
4919                    error!(iterations = iterations, "Max iterations exceeded (stream)");
4920                    yield StreamChunk::error(err_msg);
4921                    return;
4922                }
4923                iterations += 1;
4924                *self.iteration_count.write() = iterations;
4925
4926                debug!(iteration = iterations, max = effective_max, "LLM call (stream)");
4927
4928                let mut messages = match self.build_messages().await {
4929                    Ok(m) => m,
4930                    Err(e) => {
4931                        yield StreamChunk::error(e.to_string());
4932                        return;
4933                    }
4934                };
4935                self.inject_reasoning_prompt(&mut messages, &reasoning_mode, iterations == 1);
4936
4937                self.hooks.on_llm_start(&messages).await;
4938                let llm_start = Instant::now();
4939
4940                // Check if reflection is active — if so, suppress streaming for this LLM call
4941                // because we may need to retry and the user would see a stale first attempt.
4942                let reflection_active = match self.should_reflect(processed_input, "").await {
4943                    Ok(v) => v,
4944                    Err(_) => false,
4945                };
4946
4947                let content = if reflection_active {
4948                    // Use blocking call so reflection can retry without partial output
4949                    let response = match llm.complete(&messages, None).await {
4950                        Ok(r) => r,
4951                        Err(e) => {
4952                            yield StreamChunk::error(e.to_string());
4953                            return;
4954                        }
4955                    };
4956                    let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
4957                    self.hooks.on_llm_complete(&response, llm_duration_ms).await;
4958                    response.content.trim().to_string()
4959                } else {
4960                    // Streaming LLM call
4961                    let llm_stream = match llm.complete_stream(&messages, None).await {
4962                        Ok(s) => s,
4963                        Err(e) => {
4964                            yield StreamChunk::error(e.to_string());
4965                            return;
4966                        }
4967                    };
4968                    let mut accumulated = String::new();
4969                    let mut stream_inner = llm_stream;
4970                    while let Some(chunk_result) = stream_inner.next().await {
4971                        match chunk_result {
4972                            Ok(chunk) => {
4973                                accumulated.push_str(&chunk.delta);
4974                                yield StreamChunk::content(chunk.delta);
4975                            }
4976                            Err(e) => {
4977                                yield StreamChunk::error(e.to_string());
4978                                return;
4979                            }
4980                        }
4981                    }
4982                    let llm_duration_ms = llm_start.elapsed().as_millis() as u64;
4983                    // Construct LLMResponse for hooks
4984                    let llm_response = ai_agents_core::LLMResponse::new(
4985                        accumulated.trim(),
4986                        ai_agents_core::FinishReason::Stop,
4987                    );
4988                    self.hooks.on_llm_complete(&llm_response, llm_duration_ms).await;
4989                    accumulated.trim().to_string()
4990                };
4991
4992                // Tool call handling
4993                if let Some(tool_calls) = self.parse_tool_calls(&content) {
4994                    // Emit tool events for streaming
4995                    // First check transitions (same as blocking path)
4996                    let transition_fired = match self.evaluate_transitions(processed_input, &content).await {
4997                        Ok(v) => v,
4998                        Err(e) => {
4999                            yield StreamChunk::error(e.to_string());
5000                            return;
5001                        }
5002                    };
5003                    if transition_fired {
5004                        let _ = self.memory.add_message(ChatMessage::assistant(
5005                            "(Transitioned to new state — tool call handled by workflow)",
5006                        )).await;
5007
5008                        if include_state_events {
5009                            if let Some(state) = self.current_state() {
5010                                yield StreamChunk::state_transition(None, state);
5011                            }
5012                        }
5013                        continue;
5014                    }
5015
5016                    // Store the assistant's tool-call message (same as blocking path)
5017                    let _ = self.memory.add_message(ChatMessage::assistant(&content)).await;
5018
5019                    // Execute tools with streaming events
5020                    let results = self.execute_tools_parallel(&tool_calls).await;
5021
5022                    for ((_id, result), tool_call) in results.into_iter().zip(tool_calls.iter()) {
5023                        if include_tool_events {
5024                            yield StreamChunk::tool_start(&tool_call.id, &tool_call.name);
5025                        }
5026
5027                        match result {
5028                            Ok(output) => {
5029                                if include_tool_events {
5030                                    yield StreamChunk::tool_result(
5031                                        &tool_call.id,
5032                                        &tool_call.name,
5033                                        &output,
5034                                        true,
5035                                    );
5036                                }
5037                                let _ = self.memory
5038                                    .add_message(ChatMessage::function(&tool_call.name, &output))
5039                                    .await;
5040                            }
5041                            Err(e) => {
5042                                if matches!(e, AgentError::HITLRejected(_)) {
5043                                    let _ = self.memory.add_message(ChatMessage::assistant(
5044                                        &format!("The operation was rejected by the approver: {}", e),
5045                                    )).await;
5046                                    yield StreamChunk::error(format!("Operation cancelled: {}", e));
5047                                    yield StreamChunk::Done {};
5048                                    return;
5049                                }
5050                                if include_tool_events {
5051                                    yield StreamChunk::tool_result(
5052                                        &tool_call.id,
5053                                        &tool_call.name,
5054                                        &e.to_string(),
5055                                        false,
5056                                    );
5057                                }
5058                                let _ = self.memory
5059                                    .add_message(ChatMessage::function(
5060                                        &tool_call.name,
5061                                        &format!("Error: {}", e),
5062                                    ))
5063                                    .await;
5064                            }
5065                        }
5066                        all_tool_calls.push(tool_call.clone());
5067
5068                        if include_tool_events {
5069                            yield StreamChunk::tool_end(&tool_call.id);
5070                        }
5071                    }
5072                    continue;
5073                }
5074
5075                // Extract thinking, process output
5076                let (extracted_thinking, answer) = self.extract_thinking(&content);
5077                if extracted_thinking.is_some() {
5078                    thinking_content = extracted_thinking;
5079                }
5080
5081                let output_data = match self.process_output(&answer, &input_data.context).await {
5082                    Ok(d) => d,
5083                    Err(e) => {
5084                        yield StreamChunk::error(e.to_string());
5085                        return;
5086                    }
5087                };
5088
5089                let final_content = if output_data.metadata.rejected {
5090                    output_data
5091                        .metadata
5092                        .rejection_reason
5093                        .unwrap_or_else(|| answer.to_string())
5094                } else {
5095                    output_data.content
5096                };
5097
5098                // Reflection (uses blocking LLM calls for retries)
5099                let (final_content, _reflection_metadata) = match self
5100                    .run_reflection(&*llm, processed_input, final_content)
5101                    .await
5102                {
5103                    Ok(r) => r,
5104                    Err(e) => {
5105                        yield StreamChunk::error(e.to_string());
5106                        return;
5107                    }
5108                };
5109
5110                let final_content = self.format_response_with_thinking(
5111                    thinking_content.as_deref(),
5112                    &final_content,
5113                );
5114
5115                // If reflection was active (we used blocking call), yield the final content now
5116                if reflection_active {
5117                    yield StreamChunk::content(&final_content);
5118                }
5119
5120                // Post-loop: memory, transitions, post-transition re-generation.
5121                // For NeedsRedispatch, run_loop_internal handles the new state's full
5122                // dispatch and its result is yielded as a single non-streamed chunk.
5123                let post_result = match self
5124                    .post_loop_processing(processed_input, final_content)
5125                    .await
5126                {
5127                    Ok(r) => r,
5128                    Err(e) => {
5129                        yield StreamChunk::error(e.to_string());
5130                        return;
5131                    }
5132                };
5133
5134                let (final_content, transitioned) = match post_result {
5135                    PostLoopResult::NoTransition(content) => (content, false),
5136                    PostLoopResult::Transitioned(content) => (content, true),
5137                    PostLoopResult::NeedsRedispatch => {
5138                        const MAX_REDISPATCH_DEPTH: u32 = 3;
5139                        let current_depth = *self.redispatch_depth.read();
5140                        let content = if current_depth >= MAX_REDISPATCH_DEPTH {
5141                            warn!(
5142                                depth = current_depth,
5143                                "Post-transition re-dispatch depth limit reached (stream)"
5144                            );
5145                            let c = String::new();
5146                            let _ = self.memory.add_message(ChatMessage::assistant(&c)).await;
5147                            c
5148                        } else {
5149                            *self.redispatch_depth.write() += 1;
5150                            info!(
5151                                depth = current_depth + 1,
5152                                "Re-dispatching for new state after transition (stream)"
5153                            );
5154                            let result = self.run_loop_internal(processed_input).await;
5155                            *self.redispatch_depth.write() -= 1;
5156                            match result {
5157                                Ok(resp) => resp.content,
5158                                Err(e) => {
5159                                    yield StreamChunk::error(e.to_string());
5160                                    return;
5161                                }
5162                            }
5163                        };
5164                        (content, true)
5165                    }
5166                };
5167
5168                if transitioned {
5169                    if include_state_events {
5170                        if let Some(state) = self.current_state() {
5171                            yield StreamChunk::state_transition(None, state);
5172                        }
5173                    }
5174                    // Yield the post-transition re-generated or re-dispatched content.
5175                    yield StreamChunk::content(&final_content);
5176                }
5177
5178                yield StreamChunk::Done {};
5179                return;
5180            }
5181        })
5182    }
5183
5184    /// Streaming entry point with disambiguation
5185    /// Mirrors run_loop but yields StreamChunks instead of AgentResponse.
5186    fn run_loop_stream<'a>(
5187        &'a self,
5188        input: &'a str,
5189    ) -> Pin<Box<dyn Stream<Item = StreamChunk> + Send + 'a>> {
5190        Box::pin(async_stream::stream! {
5191            self.hooks.on_message_received(input).await;
5192
5193            // One-shot context initialization (mirrors run_loop)
5194            if !self.context_initialized.swap(true, Ordering::SeqCst) {
5195                if let Err(e) = self.context_manager.initialize().await {
5196                    yield StreamChunk::error(e.to_string());
5197                    return;
5198                }
5199                debug!("Context manager initialized (defaults, env, builtins)");
5200            }
5201
5202            if let Err(e) = self.check_turn_timeout().await {
5203                yield StreamChunk::error(e.to_string());
5204                return;
5205            }
5206            if let Err(e) = self.context_manager.refresh_per_turn().await {
5207                yield StreamChunk::error(e.to_string());
5208                return;
5209            }
5210
5211            // Clear stale disambiguation context from previous turns.
5212            self.clear_disambiguation_context();
5213
5214            // Disambiguation check (before input processing)
5215            if let Some(ref disambiguator) = self.disambiguation_manager {
5216                let disambiguation_context = match self.build_disambiguation_context().await {
5217                    Ok(ctx) => ctx,
5218                    Err(e) => {
5219                        yield StreamChunk::error(e.to_string());
5220                        return;
5221                    }
5222                };
5223
5224                let state_override = self
5225                    .state_machine
5226                    .as_ref()
5227                    .and_then(|sm| sm.current_definition())
5228                    .and_then(|def| def.disambiguation.clone());
5229
5230                let result = match disambiguator
5231                    .process_input_with_override(
5232                        input,
5233                        &disambiguation_context,
5234                        state_override.as_ref(),
5235                        None,
5236                    )
5237                    .await
5238                {
5239                    Ok(r) => r,
5240                    Err(e) => {
5241                        yield StreamChunk::error(e.to_string());
5242                        return;
5243                    }
5244                };
5245
5246                match result {
5247                    DisambiguationResult::Clear => {
5248                        debug!("Input is clear, proceeding normally (stream)");
5249                    }
5250                    DisambiguationResult::NeedsClarification {
5251                        question,
5252                        detection,
5253                    } => {
5254                        info!(
5255                            ambiguity_type = ?detection.ambiguity_type,
5256                            confidence = detection.confidence,
5257                            "Input requires clarification (stream)"
5258                        );
5259                        let _ = self.memory.add_message(ChatMessage::user(input)).await;
5260                        let _ = self
5261                            .memory
5262                            .add_message(ChatMessage::assistant(&question.question))
5263                            .await;
5264                        yield StreamChunk::content(&question.question);
5265                        yield StreamChunk::Done {};
5266                        return;
5267                    }
5268                    DisambiguationResult::Clarified {
5269                        enriched_input,
5270                        resolved,
5271                        ..
5272                    } => {
5273                        info!(
5274                            resolved_count = resolved.len(),
5275                            enriched = %enriched_input,
5276                            "Input clarified (stream)"
5277                        );
5278                        for (key, value) in &resolved {
5279                            let context_key = format!("disambiguation.{}", key);
5280                            let _ = self.context_manager.set(&context_key, value.clone());
5281                        }
5282                        if let Some(intent) = resolved.get("intent") {
5283                            let _ = self.context_manager.set("resolved_intent", intent.clone());
5284                        }
5285                        let _ = self
5286                            .context_manager
5287                            .set("disambiguation.resolved", serde_json::Value::Bool(true));
5288
5289                        // Check if this clarification was triggered by a skill-level override.
5290                        // Re-run skill disambiguation to verify all required_clarity fields
5291                        // are present before executing.
5292                        let skill_id = self.pending_skill_id.read().clone();
5293                        if let Some(skill_id) = skill_id {
5294                            info!(skill_id = %skill_id, "Re-checking skill disambiguation on clarified input (stream)");
5295                            match self.recheck_skill_disambiguation(&skill_id, &enriched_input).await {
5296                                Ok(resp) => {
5297                                    yield StreamChunk::content(&resp.content);
5298                                    yield StreamChunk::Done {};
5299                                    return;
5300                                }
5301                                Err(e) => {
5302                                    yield StreamChunk::error(e.to_string());
5303                                    return;
5304                                }
5305                            }
5306                        }
5307
5308                        // Forward to internal stream with enriched input
5309                        let mut inner = self.run_loop_internal_stream(&enriched_input);
5310                        while let Some(chunk) = inner.next().await {
5311                            yield chunk;
5312                        }
5313                        return;
5314                    }
5315                    DisambiguationResult::ProceedWithBestGuess { enriched_input } => {
5316                        info!("Proceeding with best guess (stream)");
5317
5318                        // Same skill-id re-check for best-guess path
5319                        let skill_id = self.pending_skill_id.read().clone();
5320                        if let Some(skill_id) = skill_id {
5321                            info!(skill_id = %skill_id, "Re-checking skill disambiguation on best-guess input (stream)");
5322                            match self.recheck_skill_disambiguation(&skill_id, &enriched_input).await {
5323                                Ok(resp) => {
5324                                    yield StreamChunk::content(&resp.content);
5325                                    yield StreamChunk::Done {};
5326                                    return;
5327                                }
5328                                Err(e) => {
5329                                    yield StreamChunk::error(e.to_string());
5330                                    return;
5331                                }
5332                            }
5333                        }
5334
5335                        let mut inner = self.run_loop_internal_stream(&enriched_input);
5336                        while let Some(chunk) = inner.next().await {
5337                            yield chunk;
5338                        }
5339                        return;
5340                    }
5341                    DisambiguationResult::GiveUp { reason } => {
5342                        *self.pending_skill_id.write() = None;
5343                        warn!(reason = %reason, "Disambiguation gave up (stream)");
5344                        let apology = self
5345                            .generate_localized_apology(
5346                                "Generate a brief, polite apology saying you couldn't understand the request. Be concise.",
5347                                &reason,
5348                            )
5349                            .await
5350                            .unwrap_or_else(|_| {
5351                                format!("I'm sorry, I couldn't understand your request: {}", reason)
5352                            });
5353                        yield StreamChunk::content(&apology);
5354                        yield StreamChunk::Done {};
5355                        return;
5356                    }
5357                    DisambiguationResult::Escalate { reason } => {
5358                        *self.pending_skill_id.write() = None;
5359                        info!(reason = %reason, "Escalating to human (stream)");
5360                        if let Some(ref hitl) = self.hitl_engine {
5361                            let trigger =
5362                                ApprovalTrigger::condition("disambiguation_escalation", reason.clone());
5363                            let mut context_map = HashMap::new();
5364                            context_map.insert("original_input".to_string(), serde_json::json!(input));
5365                            context_map.insert("reason".to_string(), serde_json::json!(&reason));
5366                            let check_result = HITLCheckResult::required(
5367                                trigger,
5368                                context_map,
5369                                format!("User request needs human assistance: {}", reason),
5370                                Some(hitl.config().default_timeout_seconds),
5371                            );
5372                            match self.request_hitl_approval(check_result).await {
5373                                Ok(result) if matches!(result, ApprovalResult::Approved | ApprovalResult::Modified { .. }) => {
5374                                    let mut inner = self.run_loop_internal_stream(input);
5375                                    while let Some(chunk) = inner.next().await {
5376                                        yield chunk;
5377                                    }
5378                                    return;
5379                                }
5380                                Ok(_) => {}
5381                                Err(e) => {
5382                                    yield StreamChunk::error(e.to_string());
5383                                    return;
5384                                }
5385                            }
5386                        }
5387                        let apology = self
5388                            .generate_localized_apology(
5389                                "Explain briefly that you're transferring the user to a human agent for help.",
5390                                &reason,
5391                            )
5392                            .await
5393                            .unwrap_or_else(|_| {
5394                                format!("I need human assistance to help with your request: {}", reason)
5395                            });
5396                        yield StreamChunk::content(&apology);
5397                        yield StreamChunk::Done {};
5398                        return;
5399                    }
5400                    DisambiguationResult::Abandoned { new_input } => {
5401                        *self.pending_skill_id.write() = None;
5402
5403                        info!(
5404                            has_new_input = new_input.is_some(),
5405                            "Clarification abandoned by user (stream)"
5406                        );
5407
5408                        let _ = self.memory.add_message(ChatMessage::user(input)).await;
5409
5410                        match new_input {
5411                            Some(fresh_input) => {
5412                                // Topic switch: forward to internal stream with fresh input.
5413                                let mut inner = self.run_loop_internal_stream(&fresh_input);
5414                                while let Some(chunk) = inner.next().await {
5415                                    yield chunk;
5416                                }
5417                                return;
5418                            }
5419                            None => {
5420                                // Pure abandonment: generate a brief acknowledgment.
5421                                let ack = self
5422                                    .generate_localized_apology(
5423                                        "The user changed their mind about their previous request. \
5424                                         Generate a brief, friendly acknowledgment (e.g. 'OK, no problem. What else can I help with?'). \
5425                                         Do NOT apologize excessively. Be concise.",
5426                                        "User abandoned clarification",
5427                                    )
5428                                    .await
5429                                    .unwrap_or_else(|_| {
5430                                        "OK, no problem. What else can I help with?".to_string()
5431                                    });
5432
5433                                let _ = self
5434                                    .memory
5435                                    .add_message(ChatMessage::assistant(&ack))
5436                                    .await;
5437
5438                                yield StreamChunk::content(&ack);
5439                                yield StreamChunk::Done {};
5440                                return;
5441                            }
5442                        }
5443                    }
5444                }
5445            }
5446
5447            // No disambiguation or Clear result — proceed with internal stream
5448            let mut inner = self.run_loop_internal_stream(input);
5449            while let Some(chunk) = inner.next().await {
5450                yield chunk;
5451            }
5452        })
5453    }
5454
5455    pub fn info(&self) -> AgentInfo {
5456        self.info.clone()
5457    }
5458
5459    pub fn skills(&self) -> &[SkillDefinition] {
5460        &self.skills
5461    }
5462
5463    pub async fn reset(&self) -> Result<()> {
5464        self.memory.clear().await?;
5465        *self.iteration_count.write() = 0;
5466        self.tool_call_history.write().clear();
5467        *self.pending_skill_id.write() = None;
5468        if let Some(ref sm) = self.state_machine {
5469            sm.reset();
5470        }
5471        Ok(())
5472    }
5473
5474    pub fn max_context_tokens(&self) -> u32 {
5475        self.max_context_tokens
5476    }
5477
5478    pub fn llm_registry(&self) -> &Arc<LLMRegistry> {
5479        &self.llm_registry
5480    }
5481
5482    pub fn state_machine(&self) -> Option<&Arc<StateMachine>> {
5483        self.state_machine.as_ref()
5484    }
5485
5486    pub fn context_manager(&self) -> &Arc<ContextManager> {
5487        &self.context_manager
5488    }
5489
5490    pub fn tool_call_history(&self) -> Vec<ToolCallRecord> {
5491        self.tool_call_history.read().clone()
5492    }
5493
5494    pub fn memory_token_budget(&self) -> Option<&MemoryTokenBudget> {
5495        self.memory_token_budget.as_ref()
5496    }
5497
5498    pub fn parallel_tools_config(&self) -> &ParallelToolsConfig {
5499        &self.parallel_tools
5500    }
5501
5502    pub fn streaming_config(&self) -> &StreamingConfig {
5503        &self.streaming
5504    }
5505
5506    pub fn hooks(&self) -> &Arc<dyn AgentHooks> {
5507        &self.hooks
5508    }
5509
5510    pub fn hitl_engine(&self) -> Option<&HITLEngine> {
5511        self.hitl_engine.as_ref()
5512    }
5513
5514    pub fn approval_handler(&self) -> &Arc<dyn ApprovalHandler> {
5515        &self.approval_handler
5516    }
5517
5518    /// Build a context map with language hints from context_manager for HITL message localization.
5519    fn build_hitl_language_context(&self) -> HashMap<String, Value> {
5520        let mut ctx = HashMap::new();
5521        for key in &["user.language", "input.detected.language", "language"] {
5522            if let Some(val) = self.context_manager.get(key) {
5523                ctx.insert(key.to_string(), val);
5524            }
5525        }
5526        ctx
5527    }
5528
5529    /// Send a HITL check result through the approval flow and return the full ApprovalResult.
5530    async fn request_hitl_approval(&self, check_result: HITLCheckResult) -> Result<ApprovalResult> {
5531        let Some(request) = check_result.into_request() else {
5532            return Ok(ApprovalResult::Approved);
5533        };
5534
5535        self.hooks.on_approval_requested(&request).await;
5536
5537        let request_id = request.id.clone();
5538        let timeout = request.timeout;
5539
5540        let result = if let Some(duration) = timeout {
5541            match tokio::time::timeout(duration, self.approval_handler.request_approval(request))
5542                .await
5543            {
5544                Ok(result) => result,
5545                Err(_) => ApprovalResult::timeout(),
5546            }
5547        } else {
5548            self.approval_handler.request_approval(request).await
5549        };
5550
5551        self.hooks.on_approval_result(&request_id, &result).await;
5552
5553        // Resolve timeout action
5554        let result = match result {
5555            ApprovalResult::Timeout => {
5556                if let Some(ref engine) = self.hitl_engine {
5557                    match engine.config().on_timeout {
5558                        TimeoutAction::Approve => ApprovalResult::Approved,
5559                        TimeoutAction::Reject => ApprovalResult::Rejected {
5560                            reason: Some("Timeout".to_string()),
5561                        },
5562                        TimeoutAction::Error => {
5563                            return Err(AgentError::Other("HITL approval timeout".to_string()));
5564                        }
5565                    }
5566                } else {
5567                    ApprovalResult::Rejected {
5568                        reason: Some("Timeout (no engine)".to_string()),
5569                    }
5570                }
5571            }
5572            other => other,
5573        };
5574
5575        Ok(result)
5576    }
5577
5578    pub async fn check_state_hitl(&self, from: Option<&str>, to: &str) -> Result<bool> {
5579        if let Some(ref hitl_engine) = self.hitl_engine {
5580            let hitl_lang_ctx = self.build_hitl_language_context();
5581            let check_result = hitl_engine
5582                .check_state_transition_with_localization(
5583                    from,
5584                    to,
5585                    &hitl_lang_ctx,
5586                    self.approval_handler.as_ref(),
5587                    Some(&self.llm_registry),
5588                )
5589                .await?;
5590            if check_result.is_required() {
5591                let result = self.request_hitl_approval(check_result).await?;
5592                return Ok(matches!(
5593                    result,
5594                    ApprovalResult::Approved | ApprovalResult::Modified { .. }
5595                ));
5596            }
5597        }
5598        Ok(true)
5599    }
5600
5601    /// Execute multiple tools in parallel
5602    async fn execute_tools_parallel(
5603        &self,
5604        tool_calls: &[ToolCall],
5605    ) -> Vec<(String, Result<String>)> {
5606        if !self.parallel_tools.enabled || tool_calls.len() <= 1 {
5607            let mut results = Vec::new();
5608            for tc in tool_calls {
5609                let result = self.execute_tool_smart(tc).await;
5610                results.push((tc.id.clone(), result));
5611            }
5612            return results;
5613        }
5614
5615        let chunks: Vec<_> = tool_calls
5616            .chunks(self.parallel_tools.max_parallel)
5617            .collect();
5618
5619        let mut all_results = Vec::new();
5620
5621        for chunk in chunks {
5622            let futures: Vec<_> = chunk
5623                .iter()
5624                .map(|tc| {
5625                    let tc = tc.clone();
5626                    async move {
5627                        let result = self.execute_tool_smart(&tc).await;
5628                        (tc.id.clone(), result)
5629                    }
5630                })
5631                .collect();
5632
5633            let results = futures::future::join_all(futures).await;
5634            all_results.extend(results);
5635        }
5636
5637        all_results
5638    }
5639
5640    /// Stream a chat response with real-time updates
5641    pub async fn chat_stream<'a>(
5642        &'a self,
5643        input: &'a str,
5644    ) -> Result<Pin<Box<dyn Stream<Item = StreamChunk> + Send + 'a>>> {
5645        info!(input_len = input.len(), "Starting streaming chat");
5646        Ok(self.run_loop_stream(input))
5647    }
5648}
5649
5650#[async_trait]
5651impl Agent for RuntimeAgent {
5652    async fn chat(&self, input: &str) -> Result<AgentResponse> {
5653        self.run_loop(input).await
5654    }
5655
5656    fn info(&self) -> AgentInfo {
5657        self.info.clone()
5658    }
5659
5660    async fn reset(&self) -> Result<()> {
5661        self.memory.clear().await?;
5662        *self.iteration_count.write() = 0;
5663        self.tool_call_history.write().clear();
5664        if let Some(ref sm) = self.state_machine {
5665            sm.reset();
5666        }
5667        Ok(())
5668    }
5669}
5670
5671//
5672// Render a concurrent input template using direct minijinja.
5673// Same approach as pipeline's render_stage_template so variables are top-level.
5674//
5675// Available variables:
5676//   {{ user_input }}    - the user's actual message
5677//   {{ context.<key> }} - values from the context manager
5678//
5679fn render_concurrent_template(
5680    template: &str,
5681    user_input: &str,
5682    context_values: &std::collections::HashMap<String, serde_json::Value>,
5683) -> Result<String> {
5684    let mut env = minijinja::Environment::new();
5685    env.add_template("concurrent", template)
5686        .map_err(|e| AgentError::Other(format!("Concurrent template parse error: {}", e)))?;
5687
5688    let mut ctx = std::collections::BTreeMap::new();
5689    ctx.insert("user_input".to_string(), minijinja::Value::from(user_input));
5690
5691    // Expose context manager values under {{ context.<key> }}.
5692    let context_obj = minijinja::Value::from_serialize(context_values);
5693    ctx.insert("context".to_string(), context_obj);
5694
5695    let tmpl = env
5696        .get_template("concurrent")
5697        .map_err(|e| AgentError::Other(format!("Concurrent template error: {}", e)))?;
5698
5699    tmpl.render(minijinja::Value::from_serialize(&ctx))
5700        .map_err(|e| AgentError::Other(format!("Concurrent template render error: {}", e)))
5701}
5702
5703#[cfg(test)]
5704mod tests {
5705    use super::*;
5706    use crate::AgentBuilder;
5707    use ai_agents_llm::mock::MockLLMProvider;
5708
5709    fn mock_with_response(response: &str) -> MockLLMProvider {
5710        let mut mock = MockLLMProvider::new("test");
5711        mock.set_response(response);
5712        mock
5713    }
5714
5715    fn mock_with_responses(responses: Vec<&str>) -> MockLLMProvider {
5716        let mut mock = MockLLMProvider::new("test");
5717        mock.set_responses(responses.into_iter().map(String::from).collect(), true);
5718        mock
5719    }
5720
5721    // Basic YAML → Build → Chat flow
5722    #[tokio::test]
5723    async fn test_integration_yaml_to_chat_basic() {
5724        let mock = mock_with_response("Hello! How can I help you?");
5725        let agent = AgentBuilder::new()
5726            .system_prompt("You are a test assistant.")
5727            .llm(Arc::new(mock))
5728            .build()
5729            .unwrap();
5730
5731        let response = agent.chat("Hi").await.unwrap();
5732        assert!(!response.content.is_empty());
5733        assert_eq!(response.content, "Hello! How can I help you?");
5734    }
5735
5736    // Multi-turn conversation
5737    #[tokio::test]
5738    async fn test_integration_multi_turn_conversation() {
5739        let mock = mock_with_responses(vec![
5740            "Hello! I'm your assistant.",
5741            "The weather is sunny today.",
5742            "Goodbye!",
5743        ]);
5744        let agent = AgentBuilder::new()
5745            .system_prompt("You are helpful.")
5746            .llm(Arc::new(mock))
5747            .build()
5748            .unwrap();
5749
5750        let r1 = agent.chat("Hi").await.unwrap();
5751        assert_eq!(r1.content, "Hello! I'm your assistant.");
5752
5753        let r2 = agent.chat("What's the weather?").await.unwrap();
5754        assert_eq!(r2.content, "The weather is sunny today.");
5755
5756        let r3 = agent.chat("Bye").await.unwrap();
5757        assert_eq!(r3.content, "Goodbye!");
5758
5759        // Verify memory accumulated messages
5760        let messages = agent.memory.get_messages(None).await.unwrap();
5761        // 3 user + 3 assistant = 6 messages
5762        assert_eq!(messages.len(), 6);
5763    }
5764
5765    // Tool execution in chat flow
5766    #[tokio::test]
5767    async fn test_integration_tool_execution() {
5768        // Mock LLM that returns a tool call then a final answer
5769        let mock = mock_with_responses(vec![
5770            // First response: tool call
5771            r#"I'll calculate that for you.
5772[TOOL_CALL: {"name": "calculator", "arguments": {"expression": "2+2"}}]"#,
5773            // After tool result: final answer
5774            "The answer is 4.",
5775        ]);
5776        let mut tools = ai_agents_tools::ToolRegistry::new();
5777        tools
5778            .register(Arc::new(ai_agents_tools::CalculatorTool))
5779            .unwrap();
5780
5781        let agent = AgentBuilder::new()
5782            .system_prompt("You are a calculator assistant.")
5783            .llm(Arc::new(mock))
5784            .tools(tools)
5785            .build()
5786            .unwrap();
5787
5788        let response = agent.chat("What is 2+2?").await.unwrap();
5789        // The agent should eventually produce a response
5790        assert!(!response.content.is_empty());
5791    }
5792
5793    // State machine transitions
5794    #[tokio::test]
5795    async fn test_integration_state_machine_basic() {
5796        let yaml = r#"
5797name: StateAgent
5798system_prompt: "You are a support agent."
5799states:
5800  initial: greeting
5801  states:
5802    greeting:
5803      prompt: "Welcome the user warmly."
5804      transitions:
5805        - to: support
5806          when: "User needs help"
5807          auto: true
5808    support:
5809      prompt: "Help solve the user's problem."
5810"#;
5811        let mock = mock_with_responses(vec![
5812            "Welcome! How can I help?", // greeting response
5813            "1",                        // transition evaluator picks first (index 0)
5814            "I'll help you with that.", // support response
5815        ]);
5816        let builder = AgentBuilder::from_yaml(yaml).unwrap();
5817        let agent = builder.llm(Arc::new(mock)).build().unwrap();
5818
5819        assert_eq!(agent.current_state(), Some("greeting".to_string()));
5820        let _ = agent.chat("I need help").await.unwrap();
5821        // After transition evaluation, state may or may not have changed
5822        // depending on mock evaluator response - the key is that it doesn't crash
5823    }
5824
5825    // State on_enter/on_exit actions
5826    #[tokio::test]
5827    async fn test_integration_state_on_enter_set_context() {
5828        let yaml = r#"
5829name: ActionAgent
5830system_prompt: "You are helpful."
5831states:
5832  initial: step1
5833  states:
5834    step1:
5835      prompt: "Step 1"
5836      on_exit:
5837        - set_context:
5838            step1_exited: true
5839      transitions:
5840        - to: step2
5841          when: "always"
5842          auto: true
5843    step2:
5844      prompt: "Step 2"
5845      on_enter:
5846        - set_context:
5847            step2_entered: true
5848"#;
5849        // The transition evaluator will pick the first transition (index 0)
5850        let mock = mock_with_responses(vec![
5851            "Processing step 1.",
5852            "0", // transition evaluator response: select first transition
5853        ]);
5854        let builder = AgentBuilder::from_yaml(yaml).unwrap();
5855        let agent = builder.llm(Arc::new(mock)).build().unwrap();
5856
5857        assert_eq!(agent.current_state(), Some("step1".to_string()));
5858
5859        // Manually transition to test on_enter/on_exit
5860        agent.transition_to("step2").await.unwrap();
5861
5862        assert_eq!(agent.current_state(), Some("step2".to_string()));
5863
5864        // Verify context was set by on_exit and on_enter actions
5865        let ctx = agent.get_context();
5866        assert_eq!(ctx.get("step1_exited"), Some(&serde_json::json!(true)));
5867        assert_eq!(ctx.get("step2_entered"), Some(&serde_json::json!(true)));
5868    }
5869
5870    // Process pipeline transforms input
5871    #[tokio::test]
5872    async fn test_integration_process_normalize() {
5873        let yaml = r#"
5874name: ProcessAgent
5875system_prompt: "You are helpful."
5876process:
5877  input:
5878    - type: normalize
5879      config:
5880        trim: true
5881        collapse_whitespace: true
5882"#;
5883        let mock = mock_with_response("Got your message.");
5884        let builder = AgentBuilder::from_yaml(yaml).unwrap();
5885        let agent = builder.llm(Arc::new(mock.clone())).build().unwrap();
5886
5887        let _ = agent.chat("  hello   world  ").await.unwrap();
5888
5889        // Verify the LLM received the normalized input (trimmed + collapsed whitespace)
5890        let history = mock.call_history();
5891        assert!(!history.is_empty());
5892        // The user message in LLM call should be normalized
5893        let last_call = history.last().unwrap();
5894        let user_msg = last_call
5895            .messages
5896            .iter()
5897            .find(|m| m.role == ai_agents_core::Role::User)
5898            .unwrap();
5899        assert_eq!(user_msg.content, "hello world");
5900    }
5901
5902    // ═══════════════════════════════════════════════════════════
5903    // Integration Test 2.1.7: Memory compression triggers
5904    // ═══════════════════════════════════════════════════════════
5905    #[tokio::test]
5906    async fn test_integration_memory_compression() {
5907        let yaml = r#"
5908name: MemoryAgent
5909system_prompt: "You are helpful."
5910memory:
5911  type: compacting
5912  max_messages: 100
5913  compress_threshold: 5
5914  max_recent_messages: 3
5915  summarize_batch_size: 2
5916"#;
5917        // Provide enough responses for compression to trigger
5918        let responses: Vec<&str> = (0..8).map(|_| "Response from assistant.").collect();
5919        let mock = mock_with_responses(responses);
5920        let builder = AgentBuilder::from_yaml(yaml).unwrap();
5921        let agent = builder.llm(Arc::new(mock)).build().unwrap();
5922
5923        // Send enough messages to trigger compression
5924        for i in 0..6 {
5925            let _ = agent.chat(&format!("Message {}", i)).await.unwrap();
5926        }
5927
5928        // Memory should have compressed - verify it didn't crash
5929        // and that messages are bounded
5930        let messages = agent.memory.get_messages(None).await.unwrap();
5931        // With compress_threshold=5 and max_recent_messages=3,
5932        // after 6 turns (12 messages), compression should have run
5933        assert!(messages.len() <= 12); // At most all messages if no compression, fewer if compressed
5934    }
5935
5936    // YAML with multiple LLMs
5937    #[tokio::test]
5938    async fn test_integration_multi_llm_registry() {
5939        let mut mock_default = MockLLMProvider::new("default");
5940        mock_default.set_response("Default LLM response.");
5941        let mut mock_router = MockLLMProvider::new("router");
5942        mock_router.set_response("Router response.");
5943
5944        let agent = AgentBuilder::new()
5945            .system_prompt("You are helpful.")
5946            .llm_alias("default", Arc::new(mock_default))
5947            .llm_alias("router", Arc::new(mock_router))
5948            .build()
5949            .unwrap();
5950
5951        let response = agent.chat("Hello").await.unwrap();
5952        assert_eq!(response.content, "Default LLM response.");
5953    }
5954
5955    // Agent reset clears state
5956    #[tokio::test]
5957    async fn test_integration_agent_reset() {
5958        let mock = mock_with_responses(vec!["Hello!", "Hello again!"]);
5959        let agent = AgentBuilder::new()
5960            .system_prompt("You are helpful.")
5961            .llm(Arc::new(mock))
5962            .build()
5963            .unwrap();
5964
5965        let _ = agent.chat("Hi").await.unwrap();
5966        let messages = agent.memory.get_messages(None).await.unwrap();
5967        assert_eq!(messages.len(), 2); // user + assistant
5968
5969        agent.reset().await.unwrap();
5970        let messages = agent.memory.get_messages(None).await.unwrap();
5971        assert_eq!(messages.len(), 0);
5972    }
5973
5974    // Process pipeline rejects input
5975    #[tokio::test]
5976    async fn test_integration_process_validate_reject() {
5977        use ai_agents_process::{ProcessConfig, ProcessProcessor};
5978
5979        let validate_config = ai_agents_process::ValidateStage {
5980            id: Some("length_check".to_string()),
5981            condition: None,
5982            config: ai_agents_process::ValidateConfig {
5983                rules: vec![ai_agents_process::ValidationRule::MinLength {
5984                    min_length: 10,
5985                    on_fail: ai_agents_process::ValidationAction {
5986                        action: ai_agents_process::ValidationActionType::Reject,
5987                        message: None,
5988                    },
5989                }],
5990                ..Default::default()
5991            },
5992        };
5993        let process_config = ProcessConfig {
5994            input: vec![ai_agents_process::ProcessStage::Validate(validate_config)],
5995            ..Default::default()
5996        };
5997        let processor = ProcessProcessor::new(process_config);
5998
5999        let mock = mock_with_response("Should not reach here.");
6000        let agent = AgentBuilder::new()
6001            .system_prompt("You are helpful.")
6002            .llm(Arc::new(mock))
6003            .process_processor(processor)
6004            .build()
6005            .unwrap();
6006
6007        let response = agent.chat("Hi").await.unwrap();
6008        // Rejected input should produce a rejection response, not call LLM
6009        assert!(
6010            response.content.contains("rejected")
6011                || response.content.contains("Input rejected")
6012                || response.content.contains("too short")
6013                || response.content.contains("Too short")
6014                || response.content.len() < 50, // rejection message is typically short
6015            "Expected rejection response, got: {}",
6016            response.content
6017        );
6018    }
6019
6020    // LLM fallback: primary fails, fallback LLM responds
6021    #[tokio::test]
6022    async fn test_llm_fallback_on_failure() {
6023        use ai_agents_recovery::{ErrorRecoveryConfig, LLMFailureAction, LLMRecoveryConfig};
6024
6025        let mut primary = MockLLMProvider::new("primary");
6026        primary.set_error("Primary LLM is unavailable");
6027
6028        let mut fallback = MockLLMProvider::new("fallback");
6029        fallback.set_response("Fallback response works!");
6030
6031        let agent = AgentBuilder::new()
6032            .system_prompt("You are helpful.")
6033            .llm_alias("default", Arc::new(primary))
6034            .llm_alias("backup", Arc::new(fallback))
6035            .recovery_manager(RecoveryManager::new(ErrorRecoveryConfig {
6036                llm: LLMRecoveryConfig {
6037                    on_failure: LLMFailureAction::FallbackLlm {
6038                        fallback_llm: "backup".to_string(),
6039                    },
6040                    ..Default::default()
6041                },
6042                ..Default::default()
6043            }))
6044            .build()
6045            .unwrap();
6046
6047        let response = agent.chat("Hello").await.unwrap();
6048        assert!(
6049            response.content.contains("Fallback response"),
6050            "Expected fallback response, got: {}",
6051            response.content
6052        );
6053    }
6054
6055    // LLM fallback: primary fails, static message returned
6056    #[tokio::test]
6057    async fn test_llm_fallback_response_static_message() {
6058        use ai_agents_recovery::{ErrorRecoveryConfig, LLMFailureAction, LLMRecoveryConfig};
6059
6060        let mut primary = MockLLMProvider::new("primary");
6061        primary.set_error("Primary LLM is unavailable");
6062
6063        let agent = AgentBuilder::new()
6064            .system_prompt("You are helpful.")
6065            .llm(Arc::new(primary))
6066            .recovery_manager(RecoveryManager::new(ErrorRecoveryConfig {
6067                llm: LLMRecoveryConfig {
6068                    on_failure: LLMFailureAction::FallbackResponse {
6069                        message: "I am temporarily unavailable. Please try again later."
6070                            .to_string(),
6071                    },
6072                    ..Default::default()
6073                },
6074                ..Default::default()
6075            }))
6076            .build()
6077            .unwrap();
6078
6079        let response = agent.chat("Hello").await.unwrap();
6080        assert!(
6081            response.content.contains("temporarily unavailable"),
6082            "Expected static fallback message, got: {}",
6083            response.content
6084        );
6085    }
6086
6087    // Tool skip: tool fails, on_failure: skip absorbs the error
6088    #[tokio::test]
6089    async fn test_tool_failure_skip() {
6090        use ai_agents_recovery::{
6091            ErrorRecoveryConfig, ToolFailureAction, ToolRecoveryConfig, ToolRetryConfig,
6092        };
6093
6094        // LLM requests a nonexistent tool, then responds after seeing the skip result
6095        let mock = mock_with_responses(vec![
6096            r#"I'll use the nonexistent tool.
6097[TOOL_CALL: {"name": "nonexistent_tool", "arguments": {}}]"#,
6098            "The tool was unavailable, but I can still help you.",
6099        ]);
6100
6101        let agent = AgentBuilder::new()
6102            .system_prompt("You are helpful.")
6103            .llm(Arc::new(mock))
6104            .recovery_manager(RecoveryManager::new(ErrorRecoveryConfig {
6105                tools: ToolRecoveryConfig {
6106                    default: ToolRetryConfig {
6107                        max_retries: 0,
6108                        timeout_ms: None,
6109                        on_failure: ToolFailureAction::Skip,
6110                    },
6111                    ..Default::default()
6112                },
6113                ..Default::default()
6114            }))
6115            .build()
6116            .unwrap();
6117
6118        // The tool will fail (not found), but on_failure: skip absorbs the error
6119        let response = agent.chat("Use the nonexistent tool").await;
6120        assert!(
6121            response.is_ok(),
6122            "Expected Ok with skip policy, got: {:?}",
6123            response
6124        );
6125    }
6126}