//! Core agent loop engine (`motosan_agent_loop/core/engine.rs`).
use std::collections::HashMap;
use std::sync::Arc;

use futures::future::{join_all, try_join_all};
use motosan_agent_tool::{Tool, ToolContext, ToolDef, ToolResult};
use tokio::sync::mpsc;

use crate::context::ContextProvider;
use crate::core::decision::FlowDecision;
use crate::core::event::{AgentEvent, CoreEvent, ExtensionEvent};
use crate::error::AgentError;
use crate::llm::{LlmClient, TokenUsage, ToolCallItem};
use crate::message::{Message, ToolCallRef};
use crate::Result;
15
/// The current operating mode of the agent loop.
///
/// When plan mode is enabled, the agent can enter `Planning` mode to explore
/// and brainstorm before committing to an implementation approach.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AgentMode {
    /// Normal execution mode — the agent implements directly.
    #[default]
    Normal,
    /// Planning mode — the agent explores, asks questions, and proposes a plan.
    Planning,
}
28
/// Schedule controlling when `next_step_prompt` is injected.
///
/// Used by `StuckDetectionExtension` (in `src/extensions/stuck_detection/`)
/// to control the cadence at which the think-before-act prompt is prepended
/// to the conversation.
///
/// The default is `Every(1)` (inject every iteration), but injection only
/// happens when a `next_step_prompt` has been configured on the extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NextStepSchedule {
    /// Inject only before the first iteration.
    FirstOnly,
    /// Inject every `n` iterations (1 = every iteration, 2 = every other, …).
    ///
    /// A value of `0` is treated as `1`.
    Every(usize),
}

impl Default for NextStepSchedule {
    fn default() -> Self {
        Self::Every(1)
    }
}

impl NextStepSchedule {
    /// Returns `true` if the prompt should be injected for the given
    /// 1-based `iteration`.
    // TODO(phase-4): remove once StuckDetectionExtension is the only caller.
    #[allow(dead_code)]
    pub(crate) fn should_inject(&self, iteration: usize) -> bool {
        match self {
            Self::FirstOnly => iteration == 1,
            Self::Every(n) => {
                // `0` would divide-by-zero in the modulo below; treat it as `1`
                // per the documented contract on `Every`.
                let n = (*n).max(1);
                // Inject on iterations 1, 1 + n, 1 + 2n, …
                iteration == 1 || (iteration - 1) % n == 0
            }
        }
    }
}
68
/// Policy for handling channel backpressure when a bounded queue is full.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum BackpressurePolicy {
    /// Block the sender until capacity is available (default).
    #[default]
    Block,
    /// Drop the **incoming** (newest) op when the queue is full.
    ///
    /// Note: true drop-oldest semantics would require a ring-buffer channel.
    /// This variant currently behaves like `Reject` but emits [`CoreEvent::OpDropped`]
    /// instead of [`CoreEvent::OpRejected`], allowing callers to distinguish intent.
    ///
    /// [`CoreEvent::OpDropped`]: crate::core::CoreEvent::OpDropped
    /// [`CoreEvent::OpRejected`]: crate::core::CoreEvent::OpRejected
    DropOldest,
    /// Reject the new item and emit a telemetry event.
    Reject,
}

/// Configuration for bounded channel capacities used by [`AgentSession`](crate::AgentSession).
///
/// All capacities have sensible defaults (64) and can be overridden via the
/// builder pattern on [`EngineBuilder`].
#[derive(Debug, Clone)]
pub struct ChannelConfig {
    /// Capacity of the user-input channel (messages sent via `AgentSession::send`).
    pub input_capacity: usize,
    /// Capacity of the per-turn operations channel (ops like Interrupt, InjectHint).
    pub ops_capacity: usize,
    /// Backpressure policy applied when the ops channel is full.
    pub ops_backpressure: BackpressurePolicy,
    /// Capacity of the stream channel used by [`Engine::run`].
    ///
    /// `None` means use the default of `256`. The channel is bounded; if it
    /// fills up, intermediate `Event` items are dropped (best-effort) but the
    /// terminal item is always delivered (sent via the awaiting `send`).
    pub stream_capacity: Option<usize>,
}

impl Default for ChannelConfig {
    fn default() -> Self {
        Self {
            input_capacity: 64,
            ops_capacity: 64,
            ops_backpressure: BackpressurePolicy::Block,
            stream_capacity: None,
        }
    }
}
118
// ---------------------------------------------------------------------------
// Pipeline stage helpers
// ---------------------------------------------------------------------------
122
123/// Resolved tool registry for a single run, combining static and dynamic tools.
124///
125/// This always clones the base maps so it can own the data. When `extra_tools`
126/// is empty the clone is the only cost; when extra tools are present they are
127/// inserted into the cloned maps.
128struct MergedTools {
129    map: HashMap<String, Arc<dyn Tool>>,
130    defs: Vec<ToolDef>,
131}
132
133impl MergedTools {
134    /// Clone base maps and merge any extra tools into them.
135    ///
136    /// Note: this always clones `base_map` and `base_defs` because `MergedTools`
137    /// owns its data. If `extra_tools` is empty the clone is the only cost.
138    fn new(
139        base_map: &HashMap<String, Arc<dyn Tool>>,
140        base_defs: &[ToolDef],
141        extra_tools: &[Arc<dyn Tool>],
142    ) -> Self {
143        if extra_tools.is_empty() {
144            return Self {
145                map: base_map.clone(),
146                defs: base_defs.to_vec(),
147            };
148        }
149        let mut map = base_map.clone();
150        let mut defs = base_defs.to_vec();
151        for t in extra_tools {
152            map.insert(t.def().name.clone(), Arc::clone(t));
153            defs.push(t.def());
154        }
155        Self { map, defs }
156    }
157
158    fn tool_map(&self) -> &HashMap<String, Arc<dyn Tool>> {
159        &self.map
160    }
161
162    fn tool_defs(&self) -> &[ToolDef] {
163        &self.defs
164    }
165}
166
/// Outcome of processing an LLM text response via
/// `Engine::handle_text_response`.
enum TextResponseOutcome {
    /// The turn should finalize with this text as the answer. The caller
    /// still needs to push `Message::assistant(text)` onto the state
    /// and build the `AgentResult` (the helper doesn't do this because
    /// the caller returns different types — `AgentResult` vs the
    /// `(Result<AgentResult>, Vec<Message>)` tuple).
    Finalize(String),
    /// An extension injected a continuation message. The helper already
    /// pushed `Message::assistant(text)` and `state.messages.push(msg)`,
    /// and updated `state.continuation_text`. The caller should loop back
    /// to another LLM call immediately.
    Continue,
    /// An extension halted the turn. The caller should return
    /// `Err(AgentError::Internal(...))` with the halt reason's message.
    Halt(String),
}
185
186/// Accumulator for per-run mutable state shared across turn iterations.
187struct RunTerminalMeta {
188    turn_result: crate::core::extension::TurnResult,
189    iteration: usize,
190    token_usage: TokenUsage,
191}
192
193struct TurnState {
194    messages: Vec<Message>,
195    all_tool_calls: Vec<(String, serde_json::Value)>,
196    total_usage: TokenUsage,
197    /// Accumulated text across all token-budget continuation segments.
198    continuation_text: String,
199}
200
201impl TurnState {
202    fn new(messages: Vec<Message>) -> Self {
203        Self {
204            messages,
205            all_tool_calls: Vec::new(),
206            total_usage: TokenUsage::default(),
207            continuation_text: String::new(),
208        }
209    }
210
211    /// Record token usage from an LLM call.
212    fn accumulate_usage(&mut self, usage: Option<TokenUsage>) {
213        if let Some(u) = usage {
214            self.total_usage.accumulate(u);
215        }
216    }
217
218    /// Build the final result after the LLM produces a text answer.
219    ///
220    /// If there were continuations, the accumulated continuation text is
221    /// prepended to the final segment so the full answer is returned.
222    fn into_result(self, answer: String, iteration: usize) -> AgentResult {
223        let full_answer = if self.continuation_text.is_empty() {
224            answer
225        } else {
226            let mut buf = self.continuation_text;
227            buf.push_str(&answer);
228            buf
229        };
230        AgentResult {
231            answer: full_answer,
232            tool_calls: self.all_tool_calls,
233            iterations: iteration,
234            usage: self.total_usage,
235            messages: self.messages,
236        }
237    }
238}
239
240impl RunTerminalMeta {
241    fn success(agent_result: &AgentResult) -> Self {
242        Self {
243            turn_result: crate::core::extension::TurnResult::Success,
244            iteration: agent_result.iterations,
245            token_usage: agent_result.usage,
246        }
247    }
248
249    fn interrupted(iteration: usize, token_usage: TokenUsage) -> Self {
250        Self {
251            turn_result: crate::core::extension::TurnResult::Interrupted,
252            iteration,
253            token_usage,
254        }
255    }
256
257    fn failure(iteration: usize, token_usage: TokenUsage, err: &AgentError) -> Self {
258        Self {
259            turn_result: crate::core::extension::TurnResult::Failure(err.to_string()),
260            iteration,
261            token_usage,
262        }
263    }
264}
265
266fn run_return_success(
267    res: AgentResult,
268    snapshot: Vec<Message>,
269) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
270    let meta = RunTerminalMeta::success(&res);
271    (Ok(res), snapshot, meta)
272}
273
274fn run_return_interrupted(
275    res: AgentResult,
276    snapshot: Vec<Message>,
277) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
278    let meta = RunTerminalMeta::interrupted(res.iterations, res.usage);
279    (Ok(res), snapshot, meta)
280}
281
282fn run_return_failure(
283    err: AgentError,
284    messages: Vec<Message>,
285    iteration: usize,
286    token_usage: TokenUsage,
287) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
288    let meta = RunTerminalMeta::failure(iteration, token_usage, &err);
289    (Err(err), messages, meta)
290}
291
292/// Stage: emit ToolStarted events for all items in a batch.
293fn emit_tool_started(items: &[ToolCallItem], on_event: &(impl Fn(AgentEvent) + Send + Sync)) {
294    for tc in items {
295        on_event(AgentEvent::Core(CoreEvent::ToolStarted {
296            name: tc.name.clone(),
297        }));
298    }
299}
300
301/// Merge streamed tool results with batch-executed results for any tools that
302/// were not streamed, reassembling everything in the canonical order from the
303/// `Done` payload.
304///
305/// Returns `(final_items, final_results)` in the same order as `canonical_items`.
306async fn merge_streamed_tool_results(
307    canonical_items: &[ToolCallItem],
308    streamed_items: Vec<ToolCallItem>,
309    streamed_results: Vec<ToolResult>,
310    tool_map: &HashMap<String, Arc<dyn Tool>>,
311    tool_timeout: Option<std::time::Duration>,
312    tool_context: &ToolContext,
313    on_event: &(impl Fn(AgentEvent) + Send + Sync),
314) -> (Vec<ToolCallItem>, Vec<ToolResult>) {
315    let streamed_ids: std::collections::HashSet<&str> =
316        streamed_items.iter().map(|i| i.id.as_str()).collect();
317
318    // Find tools in the canonical payload that were NOT streamed.
319    let remaining: Vec<ToolCallItem> = canonical_items
320        .iter()
321        .filter(|item| !streamed_ids.contains(item.id.as_str()))
322        .cloned()
323        .collect();
324
325    // Build lookup of streamed results by ID.
326    let streamed_map: HashMap<&str, ToolResult> = streamed_items
327        .iter()
328        .zip(streamed_results)
329        .map(|(item, result)| (item.id.as_str(), result))
330        .collect();
331
332    // Execute remaining tools (if any) and build their lookup.
333    let remaining_map: HashMap<String, ToolResult> = if !remaining.is_empty() {
334        emit_tool_started(&remaining, on_event);
335        let remaining_results =
336            Engine::execute_tools_parallel(tool_map, &remaining, tool_timeout, tool_context).await;
337        remaining
338            .iter()
339            .zip(remaining_results)
340            .map(|(item, result)| (item.id.clone(), result))
341            .collect()
342    } else {
343        HashMap::new()
344    };
345
346    // Reassemble in canonical order, with a defensive fallback for missing IDs.
347    let mut final_items = Vec::with_capacity(canonical_items.len());
348    let mut final_results = Vec::with_capacity(canonical_items.len());
349
350    for item in canonical_items {
351        let result = if let Some(r) = streamed_map.get(item.id.as_str()).cloned() {
352            r
353        } else if let Some(r) = remaining_map.get(item.id.as_str()).cloned() {
354            r
355        } else {
356            // Defensive: this should never happen if the backend follows the
357            // contract, but we must not silently drop a tool call — the LLM
358            // expects a result for every call it made.
359            ToolResult::error(format!(
360                "internal error: no result for tool call '{}'",
361                item.id
362            ))
363        };
364        final_items.push(item.clone());
365        final_results.push(result);
366    }
367
368    (final_items, final_results)
369}
370
/// A single question within an `ask_user` interaction.
#[derive(Debug, Clone)]
pub struct AskUserQuestion {
    /// The question text to display to the user.
    pub question: String,
    /// An optional short header (max 12 chars) for grouping or labeling.
    pub header: Option<String>,
    /// The available options for the user to choose from.
    pub options: Vec<AskUserOption>,
    /// Whether the user can select multiple options.
    pub multi_select: bool,
}

/// A single option within an [`AskUserQuestion`].
#[derive(Debug, Clone)]
pub struct AskUserOption {
    /// The display label for this option.
    pub label: String,
    /// An optional longer description of this option.
    pub description: Option<String>,
    /// An optional preview string (e.g. a code snippet or summary).
    pub preview: Option<String>,
}
394
/// Commands that can be sent into a running [`Engine`].
#[derive(Debug, Clone)]
pub enum AgentOp {
    /// Stop the current turn at the next safe checkpoint.
    Interrupt,
    /// Append a user message before the next LLM iteration.
    InjectUserMessage(String),
    /// Append a user-visible note before the next LLM iteration.
    InjectHint(String),
    /// Answer a pending `ask_user` request.
    AskUserAnswer {
        call_id: Option<String>,
        answer: String,
    },
    /// Approve or reject a pending plan from `exit_plan_mode`.
    ApprovePlan {
        /// Whether the plan is approved.
        approved: bool,
        /// Optional feedback (used when rejecting).
        feedback: Option<String>,
    },
}
417
418/// The final outcome produced by a completed agent run — the return
419/// type of [`RunBuilder::result`] and [`RunBuilder::callback`], and the
420/// payload carried by the terminal item of [`RunBuilder::stream`].
421#[derive(Debug, Clone)]
422pub struct AgentResult {
423    /// The assistant's final textual answer.
424    pub answer: String,
425    /// History of tool calls made: (tool_name, arguments).
426    pub tool_calls: Vec<(String, serde_json::Value)>,
427    /// Number of LLM round-trips performed.
428    pub iterations: usize,
429    /// Accumulated token usage across all LLM calls.
430    pub usage: TokenUsage,
431    /// Full conversation history including tool call/result pairs.
432    ///
433    /// Callers can pass this to a subsequent `run()` call to continue a
434    /// multi-turn conversation.
435    pub messages: Vec<Message>,
436}
437
438/// Builder for constructing an [`Engine`] with validated configuration.
439pub struct EngineBuilder {
440    tools: Vec<Arc<dyn Tool>>,
441    context_providers: Vec<Box<dyn ContextProvider>>,
442    max_iterations: usize,
443    tool_timeout: Option<std::time::Duration>,
444    tool_context: Option<ToolContext>,
445    channel_config: ChannelConfig,
446    #[cfg(feature = "mcp-client")]
447    mcp_servers: Vec<Arc<dyn crate::mcp::McpServer>>,
448    /// Extensions pending registration; drained into `ExtensionSet` in `build()`.
449    pending_extensions: Vec<(Box<dyn crate::core::Extension>, crate::core::ErrorPolicy)>,
450}
451
452impl EngineBuilder {
453    /// Set the maximum number of LLM round-trips before aborting.
454    pub fn max_iterations(mut self, n: usize) -> Self {
455        self.max_iterations = n;
456        self
457    }
458
459    /// Register a tool with the agent loop.
460    pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
461        self.tools.push(tool);
462        self
463    }
464
465    /// Register multiple tools at once.
466    pub fn tools(mut self, tools: impl IntoIterator<Item = Arc<dyn Tool>>) -> Self {
467        self.tools.extend(tools);
468        self
469    }
470
471    /// Set a static system prompt that is injected as a `System` message
472    /// at the beginning of every conversation.
473    ///
474    /// This is a convenience shortcut for registering a [`ContextProvider`]
475    /// that always returns the given string.
476    pub fn system_prompt(self, prompt: impl Into<String>) -> Self {
477        self.context(crate::context::StringContextProvider(prompt.into()))
478    }
479
480    /// Register a context provider that injects dynamic context into the
481    /// conversation before each [`Engine::run`] call.
482    ///
483    /// Multiple providers can be registered; they are invoked in registration
484    /// order and each non-empty result becomes a `System` message.
485    pub fn context(mut self, provider: impl ContextProvider + 'static) -> Self {
486        self.context_providers.push(Box::new(provider));
487        self
488    }
489
490    /// Register multiple context providers at once.
491    ///
492    /// This is the batch equivalent of [`context()`](Self::context), mirroring
493    /// the [`tools()`](Self::tools) / [`tool()`](Self::tool) pattern.
494    pub fn contexts(
495        mut self,
496        providers: impl IntoIterator<Item = Box<dyn ContextProvider>>,
497    ) -> Self {
498        self.context_providers.extend(providers);
499        self
500    }
501
502    /// Register an MCP server whose tools will be connected lazily on `run()`.
503    ///
504    /// Requires the `mcp-client` feature.
505    #[cfg(feature = "mcp-client")]
506    pub fn mcp_server(mut self, server: impl crate::mcp::McpServer + 'static) -> Self {
507        self.mcp_servers.push(std::sync::Arc::new(server));
508        self
509    }
510
511    /// Register a pre-shared `Arc<dyn McpServer>` whose tools will be connected
512    /// lazily on `run()`.
513    ///
514    /// This is useful when the same MCP server instance must be shared between
515    /// multiple [`Engine`]s or retained by the caller.
516    ///
517    /// Requires the `mcp-client` feature.
518    #[cfg(feature = "mcp-client")]
519    pub fn mcp_server_arc(mut self, server: std::sync::Arc<dyn crate::mcp::McpServer>) -> Self {
520        self.mcp_servers.push(server);
521        self
522    }
523
524    /// Set a per-tool-call timeout.  When a tool takes longer than `duration`
525    /// it returns a `ToolResult::error("timed out")` instead of blocking.
526    ///
527    /// By default there is no timeout.
528    pub fn tool_timeout(mut self, duration: std::time::Duration) -> Self {
529        self.tool_timeout = Some(duration);
530        self
531    }
532
533    /// Set a custom [`ToolContext`] that will be passed to every tool invocation.
534    ///
535    /// By default, [`ToolContext::default()`] is used. Use this when tools need
536    /// access to session-level state such as a working directory or environment
537    /// variables.
538    pub fn tool_context(mut self, ctx: ToolContext) -> Self {
539        self.tool_context = Some(ctx);
540        self
541    }
542
543    /// Set the full channel configuration for bounded queues.
544    ///
545    /// See [`ChannelConfig`] for details on each field.
546    pub fn channel_config(mut self, config: ChannelConfig) -> Self {
547        self.channel_config = config;
548        self
549    }
550
551    /// Set the capacity of the user-input channel.
552    ///
553    /// Default: 64.
554    pub fn input_channel_capacity(mut self, capacity: usize) -> Self {
555        self.channel_config.input_capacity = capacity;
556        self
557    }
558
559    /// Set the capacity of the per-turn operations channel.
560    ///
561    /// Default: 64.
562    pub fn ops_channel_capacity(mut self, capacity: usize) -> Self {
563        self.channel_config.ops_capacity = capacity;
564        self
565    }
566
567    /// Set the backpressure policy for the operations channel.
568    ///
569    /// Default: [`BackpressurePolicy::Block`].
570    pub fn ops_backpressure(mut self, policy: BackpressurePolicy) -> Self {
571        self.channel_config.ops_backpressure = policy;
572        self
573    }
574
575    /// Set the capacity of the stream channel used by
576    /// [`Engine::run`].
577    ///
578    /// Default: `256`. The channel is bounded; intermediate `Event` items may
579    /// be dropped (best-effort) if the consumer is slower than the producer,
580    /// but the terminal item is always delivered.
581    pub fn stream_capacity(mut self, capacity: usize) -> Self {
582        assert!(capacity > 0, "stream_capacity must be >= 1");
583        self.channel_config.stream_capacity = Some(capacity);
584        self
585    }
586
587    /// Register an extension. See [`crate::core::Extension`] and the
588    /// extension architecture spec for usage. Extensions are dispatched
589    /// in registration order.
590    pub fn extension(mut self, ext: Box<dyn crate::core::Extension>) -> Self {
591        self.pending_extensions
592            .push((ext, crate::core::ErrorPolicy::Fallback));
593        self
594    }
595
596    /// Register an extension with an explicit [`crate::core::ErrorPolicy`].
597    ///
598    /// `ErrorPolicy::Fallback` (the default for [`Self::extension`]) logs hook
599    /// errors and treats them as the hook's default value. `ErrorPolicy::Abort`
600    /// propagates hook errors as turn failures via [`AgentError::Internal`].
601    pub fn extension_with_policy(
602        mut self,
603        ext: Box<dyn crate::core::Extension>,
604        policy: crate::core::ErrorPolicy,
605    ) -> Self {
606        self.pending_extensions.push((ext, policy));
607        self
608    }
609
610    /// Register the built-in extensions from a declarative
611    /// [`crate::extensions::config::ExtensionsConfig`] plus runtime
612    /// [`crate::extensions::config::ExtensionWiring`].
613    ///
614    /// This is a one-call convenience over [`Self::extension`]. The
615    /// builder reuses its own [`Self::tool_context`] so callers don't
616    /// pass it twice — set `.tool_context(...)` **before** calling
617    /// `with_extensions_config` if you need a non-default context.
618    /// The context is snapshotted at call time; a `.tool_context()`
619    /// call placed *after* this method will not reach the extensions
620    /// registered by this call (though it will still apply to the
621    /// resulting `Engine`).
622    ///
623    /// Returns `Err` if the config/wiring pair is invalid (see
624    /// [`crate::extensions::config::ExtensionBuildError`]).
625    ///
626    /// This method is fully compatible with the direct
627    /// [`Self::extension`] path — you can mix config-driven
628    /// registration with manual `.extension(...)` calls in either
629    /// order.
630    pub fn with_extensions_config(
631        mut self,
632        config: crate::extensions::config::ExtensionsConfig,
633        wiring: crate::extensions::config::ExtensionWiring,
634    ) -> std::result::Result<Self, crate::extensions::config::ExtensionBuildError> {
635        let tool_context = self.tool_context.clone().unwrap_or_default();
636        let set = crate::extensions::config::build_extension_set(&config, wiring, tool_context)?;
637        self.pending_extensions.extend(set.into_inner());
638        Ok(self)
639    }
640
641    /// Consume the builder and produce an [`Engine`].
642    ///
643    /// # Panics
644    ///
645    /// Panics if `input_capacity` or `ops_capacity` is zero, since
646    /// `tokio::sync::mpsc::channel(0)` panics at runtime.
647    pub fn build(self) -> Engine {
648        assert!(
649            self.channel_config.input_capacity > 0,
650            "input_capacity must be >= 1"
651        );
652        assert!(
653            self.channel_config.ops_capacity > 0,
654            "ops_capacity must be >= 1"
655        );
656        let tool_map: HashMap<String, Arc<dyn Tool>> = self
657            .tools
658            .iter()
659            .map(|t| (t.def().name.clone(), Arc::clone(t)))
660            .collect();
661        let mut tool_defs: Vec<ToolDef> = self.tools.iter().map(|t| t.def()).collect();
662        // Aggregate tool defs from registered extensions (tool_defs() trait method).
663        // This lets extensions expose their tools (ask_user, enter/exit_plan_mode,
664        // planning, etc.) without any builder-level configuration.
665        for (ext, _) in &self.pending_extensions {
666            tool_defs.extend(ext.tool_defs());
667        }
668        let mut extensions = crate::core::ExtensionSet::new();
669        for (ext, policy) in self.pending_extensions {
670            extensions
671                .add_with_policy(ext, policy)
672                .unwrap_or_else(|e| panic!("duplicate extension name: {}", e.message()));
673        }
674
675        Engine {
676            tool_map,
677            tool_defs,
678            context_providers: self.context_providers,
679            max_iterations: self.max_iterations,
680            tool_timeout: self.tool_timeout,
681            tool_context: self.tool_context.unwrap_or_default(),
682            channel_config: self.channel_config,
683            extensions: tokio::sync::Mutex::new(extensions),
684            deferred_calls: tokio::sync::Mutex::new(std::collections::HashMap::new()),
685            #[cfg(feature = "mcp-client")]
686            mcp_servers: self.mcp_servers,
687            #[cfg(test)]
688            test_parked_notify: std::sync::Mutex::new(None),
689        }
690    }
691}
692
693/// The core ReAct agent loop.
694///
695/// Drives an LLM through iterative reasoning and tool execution until
696/// the model produces a final text answer or the iteration limit is
697/// reached.
698///
699/// # Starting a turn
700///
701/// Every agent turn goes through [`Engine::run`], which returns a
702/// [`RunBuilder`]. Chain axis setters (`.ops`, `.cancel`, `.chunked`)
703/// in any order, then call one of the three terminators
704/// (`.result`, `.callback`, `.stream`) to execute the turn:
705///
706/// ```ignore
707/// use std::sync::Arc;
708/// use motosan_agent_loop::{Engine, Message};
709///
710/// # async fn demo(llm: Arc<dyn motosan_agent_loop::LlmClient>) -> motosan_agent_loop::Result<()> {
711/// let agent = Arc::new(Engine::builder().build());
712///
713/// // Simplest: just the final result.
714/// let result = Arc::clone(&agent)
715///     .run(llm.clone(), vec![Message::user("Hi!")])
716///     .result()
717///     .await?;
718///
719/// // Live token-by-token UI with AskUser support:
720/// # let (_tx, ops_rx) = tokio::sync::mpsc::channel(8);
721/// Arc::clone(&agent)
722///     .run(llm, vec![Message::user("Ask me a question")])
723///     .chunked()
724///     .ops(ops_rx)
725///     .callback(|event| println!("{event:?}"))
726///     .await?;
727/// # Ok(()) }
728/// ```
729///
730/// See [`RunBuilder`] for the full axis/terminator matrix and
731/// [`Engine::run`] for the entry point.
732///
733/// The loop does **not** own the LLM client; instead, [`RunBuilder`]
734/// takes `Arc<dyn LlmClient>` so the same loop can be reused with
735/// different backends.
736pub struct Engine {
737    /// Pre-built lookup map for static tools (excludes per-run MCP tools).
738    tool_map: HashMap<String, Arc<dyn Tool>>,
739    /// Pre-built tool definitions for static tools (excludes per-run MCP tools).
740    tool_defs: Vec<ToolDef>,
741    context_providers: Vec<Box<dyn ContextProvider>>,
742    max_iterations: usize,
743    /// Optional per-tool-call timeout.  When set, any tool that takes longer
744    /// returns `ToolResult::error("timed out")` rather than blocking indefinitely.
745    pub(crate) tool_timeout: Option<std::time::Duration>,
746    /// Context passed to every tool invocation.
747    tool_context: ToolContext,
748    /// Channel configuration for bounded queues and backpressure.
749    channel_config: ChannelConfig,
750    /// Registered extensions dispatched on each iteration. Wrapped in a
751    /// `Mutex` so hooks can be called through the `&self` methods of `Engine`.
752    extensions: tokio::sync::Mutex<crate::core::ExtensionSet>,
753    /// Tool calls suspended by extensions via `ToolDecision::Defer`,
754    /// awaiting a matching `OpDecision::ResumeDeferred` from an
755    /// extension's `on_op` hook. See spec §8.3.
756    ///
757    /// `tokio::sync::Mutex` because both `intercept_tool_call` (writes
758    /// new entries) and `on_op` (removes resolved entries) can run
759    /// from different async contexts within a single turn.
760    #[allow(dead_code)] // Used in Phase 3B Tasks 4-5
761    deferred_calls: tokio::sync::Mutex<std::collections::HashMap<String, DeferredCall>>,
762    #[cfg(feature = "mcp-client")]
763    mcp_servers: Vec<Arc<dyn crate::mcp::McpServer>>,
764
765    /// Test-only hook fired by `resolve_deferred_slots` each time it
766    /// is about to park on `ops_rx.recv()`. Lets tests synchronize on
767    /// "engine is now waiting for an op" without wall-clock sleeps.
768    /// Set via `Engine::set_test_parked_notify`. Never touched in
769    /// release builds.
770    ///
771    /// Note: this hook fires on EVERY park, not just the first. If a
772    /// deferred-slot resolution takes multiple iterations (e.g. a tool
773    /// call that defers again after its first resolve), the notifier
774    /// fires once per iteration. Tests that need to distinguish first-
775    /// park from subsequent parks should use a dedicated Notify per
776    /// expected park point, or set the field to `None` between assertions.
777    #[cfg(test)]
778    test_parked_notify: std::sync::Mutex<Option<Arc<tokio::sync::Notify>>>,
779}
780
781/// Result from consuming a streaming LLM response.
782struct ConsumeStreamResult {
783    accumulated_text: String,
784    response: crate::LlmResponse,
785    streaming_results: Option<(Vec<ToolCallItem>, Vec<ToolResult>)>,
786    stop_reason: Option<crate::llm::StopReason>,
787    /// Output tokens consumed by this stream call (delta from before/after).
788    output_tokens: Option<u64>,
789}
790
791impl Engine {
792    /// Create a builder with default settings.
793    pub fn builder() -> EngineBuilder {
794        EngineBuilder {
795            tools: Vec::new(),
796            context_providers: Vec::new(),
797            max_iterations: 10,
798            tool_timeout: None,
799            tool_context: None,
800            channel_config: ChannelConfig::default(),
801            #[cfg(feature = "mcp-client")]
802            mcp_servers: Vec::new(),
803            pending_extensions: Vec::new(),
804        }
805    }
806
    /// Returns the channel configuration for this agent loop.
    ///
    /// Used by [`AgentSession`](crate::AgentSession) to create bounded
    /// channels with the configured capacities and policies.
    ///
    /// The returned reference borrows from `self`; clone the
    /// [`ChannelConfig`] if it must outlive the engine.
    pub fn channel_config(&self) -> &ChannelConfig {
        &self.channel_config
    }
814
815    /// Install a notifier fired each time `resolve_deferred_slots`
816    /// is about to park on `ops_rx.recv()`. The same notifier is
817    /// reused across parks within a single turn; see the field doc
818    /// on `test_parked_notify` for multi-iteration caveats.
819    /// Test-only — not part of the public API.
820    #[cfg(test)]
821    #[allow(dead_code)]
822    pub(crate) fn set_test_parked_notify(&self, notify: Arc<tokio::sync::Notify>) {
823        *self.test_parked_notify.lock().unwrap() = Some(notify);
824    }
825
826    /// Connect all MCP servers, collecting their tools.
827    ///
828    /// If any server fails to connect, all previously-connected servers are
829    /// disconnected (best-effort) before the error is returned.
830    #[cfg(feature = "mcp-client")]
831    async fn connect_mcp_servers(&self) -> Result<Vec<Arc<dyn Tool>>> {
832        use crate::mcp::adapter::McpToolAdapter;
833
834        let mut connected: Vec<&Arc<dyn crate::mcp::McpServer>> = Vec::new();
835        let mut tools: Vec<Arc<dyn Tool>> = Vec::new();
836
837        for server in &self.mcp_servers {
838            match server.connect().await {
839                Ok(()) => {
840                    connected.push(server);
841                    match McpToolAdapter::from_server(Arc::clone(server)).await {
842                        Ok(adapter_tools) => tools.extend(adapter_tools),
843                        Err(e) => {
844                            for s in &connected {
845                                let _ = s.disconnect().await;
846                            }
847                            return Err(e);
848                        }
849                    }
850                }
851                Err(e) => {
852                    for s in &connected {
853                        let _ = s.disconnect().await;
854                    }
855                    return Err(e);
856                }
857            }
858        }
859
860        Ok(tools)
861    }
862
    /// Streaming turn driver with cancellation support.
    ///
    /// Mirrors `run_inner_cancel`, but consumes the LLM response as a
    /// stream: text deltas are forwarded to `on_event` as they arrive and
    /// tool-use chunks are submitted to a `StreamingToolExecutor`
    /// mid-stream so tools run while the model is still producing output.
    /// Each stream chunk is raced against `cancel.cancelled()`, so
    /// cancellation is observed between chunks, not only between
    /// iterations.
    ///
    /// Returns the turn result plus a message snapshot and terminal
    /// metadata, like the other `run_*_inner` drivers.
    #[cfg(feature = "cancellation")]
    async fn run_streaming_with_cancel_inner(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: impl Fn(AgentEvent) + Send + Sync,
        cancel: &tokio_util::sync::CancellationToken,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        use futures::StreamExt;

        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);

        for iteration in 1..=self.max_iterations {
            // Pre-turn: cancellation check.
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }
            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            loop {
                // Streaming LLM step with cancellation race and eager tool execution.
                let (
                    accumulated_text,
                    response,
                    streaming_results,
                    stream_stop_reason,
                    output_tokens,
                ) = {
                    let mut stream = llm.chat_stream(&state.messages, tools.tool_defs());
                    let mut accumulated = String::new();
                    let mut final_response: Option<crate::LlmResponse> = None;
                    let mut executor = crate::streaming_executor::StreamingToolExecutor::new();
                    let mut submitted_ids: std::collections::HashSet<String> =
                        std::collections::HashSet::new();
                    let mut stop_reason: Option<crate::llm::StopReason> = None;

                    // Snapshot output tokens before this stream to compute the delta.
                    let output_tokens_before = state.total_usage.output_tokens;

                    loop {
                        tokio::select! {
                            chunk_opt = stream.next() => {
                                match chunk_opt {
                                    Some(chunk_result) => {
                                        // `stream` still borrows `state.messages`, hence the
                                        // clone on the failure paths below.
                                        let chunk = match chunk_result {
                                            Ok(c) => c,
                                            Err(e) => return run_return_failure(e, state.messages.clone(), iteration, state.total_usage),
                                        };
                                        match chunk {
                                            crate::llm::StreamChunk::TextDelta(delta) => {
                                                accumulated.push_str(&delta);
                                                on_event(AgentEvent::Core(CoreEvent::TextChunk(delta)));
                                            }
                                            crate::llm::StreamChunk::ToolUse { id, name, args } => {
                                                // Dedupe by call id: submit each tool call at most once.
                                                if !submitted_ids.contains(&id) {
                                                    let item = crate::llm::ToolCallItem {
                                                        id: id.clone(),
                                                        name: name.clone(),
                                                        args,
                                                    };
                                                    on_event(AgentEvent::Core(CoreEvent::ToolStarted {
                                                        name: name.clone(),
                                                    }));
                                                    executor.submit(
                                                        item,
                                                        tools.tool_map(),
                                                        self.tool_timeout,
                                                        &self.tool_context,
                                                    );
                                                    submitted_ids.insert(id);
                                                }
                                            }
                                            crate::llm::StreamChunk::Done(resp) => {
                                                final_response = Some(resp);
                                            }
                                            crate::llm::StreamChunk::Usage(usage) => {
                                                state.total_usage.accumulate(usage);
                                            }
                                            crate::llm::StreamChunk::StopReason(reason) => {
                                                stop_reason = Some(reason);
                                            }
                                        }
                                    }
                                    None => break,
                                }
                            }
                            _ = cancel.cancelled() => {
                                return run_return_failure(AgentError::Cancelled, state.messages.clone(), iteration, state.total_usage);
                            }
                        }
                    }

                    // Providers that never emit Done fall back to a plain text
                    // response built from the accumulated deltas.
                    let resp = final_response
                        .unwrap_or_else(|| crate::LlmResponse::Message(accumulated.clone()));
                    // Collect results for tools submitted eagerly mid-stream.
                    let sr = if executor.has_pending() {
                        Some(executor.collect().await)
                    } else {
                        None
                    };
                    let delta = state.total_usage.output_tokens - output_tokens_before;
                    let ot = if delta > 0 { Some(delta) } else { None };
                    (accumulated, resp, sr, stop_reason, ot)
                };

                match response {
                    crate::LlmResponse::Message(text) => {
                        // If nothing streamed as deltas but the final response
                        // carries text, surface it as a single chunk so
                        // listeners still see it.
                        if accumulated_text.is_empty() && !text.is_empty() {
                            on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));
                        }
                        on_event(AgentEvent::Core(CoreEvent::TextDone(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason: stream_stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, &on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        // Some tools may already have run during the stream:
                        // merge their results with any non-streamed calls.
                        if let Some((streamed_items, streamed_results)) = streaming_results {
                            let (final_items, final_results) = merge_streamed_tool_results(
                                &items,
                                streamed_items,
                                streamed_results,
                                tools.tool_map(),
                                self.tool_timeout,
                                &self.tool_context,
                                &on_event,
                            )
                            .await;
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &final_items,
                                    final_results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        } else {
                            emit_tool_started(&items, &on_event);
                            let slots = self.dispatch_intercept_tool_calls(&items, &on_event).await;
                            // Resolve any Deferred slots. No ops channel in this path.
                            let mut tmp_ops_state = OpsState::default();
                            let slots = match self
                                .resolve_deferred_slots(
                                    slots,
                                    &mut tmp_ops_state,
                                    &mut None,
                                    &on_event,
                                )
                                .await
                            {
                                Ok(s) => s,
                                Err(e) => {
                                    return run_return_failure(
                                        e,
                                        state.messages,
                                        iteration,
                                        state.total_usage,
                                    )
                                }
                            };
                            // Slots the intercept hooks left Pending still need
                            // a normal tool execution below.
                            let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                            let mut remaining_indices: Vec<usize> = Vec::new();
                            let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> =
                                slots
                                    .into_iter()
                                    .map(|s| match s {
                                        InterceptedSlot::Resolved(r) => Some(r),
                                        InterceptedSlot::Pending => None,
                                        InterceptedSlot::Deferred(_) => {
                                            unreachable!(
                                                "InterceptedSlot::Deferred should have been \
                                                 resolved by resolve_deferred_slots"
                                            );
                                        }
                                    })
                                    .collect();
                            for (i, item) in items.iter().enumerate() {
                                if final_results[i].is_none() {
                                    remaining_items.push(item.clone());
                                    remaining_indices.push(i);
                                }
                            }
                            let execution_results = Self::execute_tools_parallel(
                                tools.tool_map(),
                                &remaining_items,
                                self.tool_timeout,
                                &self.tool_context,
                            )
                            .await;
                            for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                                final_results[*original_idx] =
                                    Some(execution_results[exec_idx].clone());
                            }
                            let results: Vec<motosan_agent_tool::ToolResult> = final_results
                                .into_iter()
                                .map(|opt| opt.expect("all slots filled after intercept + execute"))
                                .collect();
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &items,
                                    results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        }
                        break; // Back to outer iteration loop.
                    }
                }
            }
        }

        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1164
1165    /// Consume a streaming LLM response, forwarding text deltas as events,
1166    /// accumulating usage, and eagerly submitting tool calls via
1167    /// [`StreamingToolExecutor`](crate::StreamingToolExecutor).
1168    ///
1169    /// When a [`StreamChunk::ToolUse`] is received mid-stream the tool starts
1170    /// executing immediately. After the stream ends the executor's results
1171    /// are collected. This overlaps tool execution with LLM output generation.
1172    ///
1173    /// Returns the accumulated text, the final [`LlmResponse`](crate::LlmResponse),
1174    /// and optionally the collected streaming tool results (items + results)
1175    /// if any tools were submitted during the stream.
1176    #[allow(clippy::too_many_arguments)]
1177    async fn consume_stream(
1178        llm: &dyn LlmClient,
1179        messages: &[Message],
1180        tool_defs: &[ToolDef],
1181        tool_map: &HashMap<String, Arc<dyn Tool>>,
1182        tool_timeout: Option<std::time::Duration>,
1183        tool_context: &motosan_agent_tool::ToolContext,
1184        total_usage: &mut TokenUsage,
1185        on_event: &(impl Fn(AgentEvent) + Send + Sync),
1186    ) -> Result<ConsumeStreamResult> {
1187        use futures::StreamExt;
1188
1189        let mut stream = llm.chat_stream(messages, tool_defs);
1190        let mut accumulated = String::new();
1191        let mut final_response: Option<crate::LlmResponse> = None;
1192        let mut executor = crate::streaming_executor::StreamingToolExecutor::new();
1193        let mut submitted_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
1194        let mut stop_reason: Option<crate::llm::StopReason> = None;
1195
1196        // Snapshot output tokens before this stream to compute the delta.
1197        let output_tokens_before = total_usage.output_tokens;
1198
1199        while let Some(chunk_result) = stream.next().await {
1200            let chunk = chunk_result?;
1201            match chunk {
1202                crate::llm::StreamChunk::TextDelta(delta) => {
1203                    accumulated.push_str(&delta);
1204                    on_event(AgentEvent::Core(CoreEvent::TextChunk(delta)));
1205                }
1206                crate::llm::StreamChunk::ToolUse { id, name, args } => {
1207                    if !submitted_ids.contains(&id) {
1208                        let item = ToolCallItem {
1209                            id: id.clone(),
1210                            name: name.clone(),
1211                            args,
1212                        };
1213                        on_event(AgentEvent::Core(CoreEvent::ToolStarted {
1214                            name: name.clone(),
1215                        }));
1216                        executor.submit(item, tool_map, tool_timeout, tool_context);
1217                        submitted_ids.insert(id);
1218                    }
1219                }
1220                crate::llm::StreamChunk::Done(response) => {
1221                    final_response = Some(response);
1222                }
1223                crate::llm::StreamChunk::Usage(usage) => {
1224                    total_usage.accumulate(usage);
1225                }
1226                crate::llm::StreamChunk::StopReason(reason) => {
1227                    stop_reason = Some(reason);
1228                }
1229            }
1230        }
1231
1232        let response =
1233            final_response.unwrap_or_else(|| crate::LlmResponse::Message(accumulated.clone()));
1234
1235        let streaming_results = if executor.has_pending() {
1236            Some(executor.collect().await)
1237        } else {
1238            None
1239        };
1240
1241        // Compute output token delta for this call.
1242        let output_tokens_delta = total_usage.output_tokens - output_tokens_before;
1243        let output_tokens = if output_tokens_delta > 0 {
1244            Some(output_tokens_delta)
1245        } else {
1246            None
1247        };
1248
1249        Ok(ConsumeStreamResult {
1250            accumulated_text: accumulated,
1251            response,
1252            streaming_results,
1253            stop_reason,
1254            output_tokens,
1255        })
1256    }
1257
1258    /// Prepend context-provider messages to the conversation.
1259    ///
1260    /// Each non-empty result is inserted in registration order starting at
1261    /// index 0 (i.e. at the *beginning* of the conversation, as documented
1262    /// on [`ContextProvider`]).
1263    async fn prepare_messages(&self, mut messages: Vec<Message>) -> Result<Vec<Message>> {
1264        if self.context_providers.is_empty() {
1265            return Ok(messages);
1266        }
1267        let query: String = messages
1268            .iter()
1269            .rev()
1270            .find(|m| m.role() == crate::message::Role::User)
1271            .map(|m| m.text())
1272            .unwrap_or_default();
1273
1274        let contexts = try_join_all(self.context_providers.iter().map(|p| p.build(&query))).await?;
1275
1276        let mut insert_idx = 0;
1277        for ctx in contexts {
1278            if !ctx.is_empty() {
1279                messages.insert(insert_idx, Message::system(&ctx));
1280                insert_idx += 1;
1281            }
1282        }
1283        Ok(messages)
1284    }
1285
    /// Non-streaming turn driver with cancellation support.
    ///
    /// Drives the iteration loop using whole-response `llm.chat` calls,
    /// racing each call against `cancel.cancelled()`. Tool calls pass
    /// through the extension intercept hooks, deferred slots are resolved
    /// (with no ops channel in this path), and the remaining tools run in
    /// parallel before the batch is finalized.
    ///
    /// Returns the turn result plus a message snapshot and terminal
    /// metadata, like the other `run_*_inner` drivers.
    #[cfg(feature = "cancellation")]
    async fn run_inner_cancel(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
        cancel: &tokio_util::sync::CancellationToken,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);

        for iteration in 1..=self.max_iterations {
            // Pre-turn: cancellation check.
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }
            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            loop {
                // LLM step with cancellation race.
                let output = tokio::select! {
                    output = llm.chat(&state.messages, tools.tool_defs()) => match output {
                        Ok(o) => o,
                        Err(e) => return run_return_failure(e, state.messages, iteration, state.total_usage),
                    },
                    _ = cancel.cancelled() => return run_return_failure(AgentError::Cancelled, state.messages, iteration, state.total_usage),
                };

                // Accumulate usage first; stop reason and per-call output
                // tokens are captured for the response handlers below.
                state.accumulate_usage(output.usage);
                let stop_reason = output.stop_reason;
                let output_tokens = output.usage.map(|u| u.output_tokens);

                match output.response {
                    crate::LlmResponse::Message(text) => {
                        // Non-streaming path: the whole text arrives as one chunk.
                        on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        emit_tool_started(&items, on_event);
                        let slots = self.dispatch_intercept_tool_calls(&items, on_event).await;
                        // Resolve any Deferred slots. No ops channel in this path.
                        let mut tmp_ops_state = OpsState::default();
                        let slots = match self
                            .resolve_deferred_slots(slots, &mut tmp_ops_state, &mut None, on_event)
                            .await
                        {
                            Ok(s) => s,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };
                        // Slots the intercept hooks left Pending still need a
                        // normal tool execution below.
                        let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                        let mut remaining_indices: Vec<usize> = Vec::new();
                        let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> = slots
                            .into_iter()
                            .map(|s| match s {
                                InterceptedSlot::Resolved(r) => Some(r),
                                InterceptedSlot::Pending => None,
                                InterceptedSlot::Deferred(_) => {
                                    unreachable!(
                                        "InterceptedSlot::Deferred should have been \
                                         resolved by resolve_deferred_slots"
                                    );
                                }
                            })
                            .collect();
                        for (i, item) in items.iter().enumerate() {
                            if final_results[i].is_none() {
                                remaining_items.push(item.clone());
                                remaining_indices.push(i);
                            }
                        }
                        // Execute the un-intercepted tools in parallel.
                        let execution_results = Self::execute_tools_parallel(
                            tools.tool_map(),
                            &remaining_items,
                            self.tool_timeout,
                            &self.tool_context,
                        )
                        .await;
                        for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                            final_results[*original_idx] =
                                Some(execution_results[exec_idx].clone());
                        }
                        // All slots are now filled: intercepted or executed.
                        let results: Vec<motosan_agent_tool::ToolResult> = final_results
                            .into_iter()
                            .map(|opt| opt.expect("all slots filled after intercept + execute"))
                            .collect();
                        if let Err(e) = self
                            .finalize_tool_call_batch(
                                &items,
                                results,
                                iteration,
                                tools.tool_defs(),
                                &mut state,
                                on_event,
                            )
                            .await
                        {
                            return run_return_failure(
                                e,
                                state.messages,
                                iteration,
                                state.total_usage,
                            );
                        }
                        break;
                    }
                }
            }
        }

        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1474
    /// Internal turn driver (batch LLM + ops channel).
    ///
    /// Returns `(Result<AgentResult>, Vec<Message>, RunTerminalMeta)` so
    /// callers (notably `run`) can recover the final conversation history
    /// and the terminal metadata even when the loop terminated with an
    /// error. On success, the returned `Vec<Message>` is a clone of
    /// `AgentResult.messages` for ergonomic access; on failure it contains
    /// the partial history accumulated so far. All return paths go through
    /// the `run_return_success` / `run_return_interrupted` /
    /// `run_return_failure` helpers.
    async fn run_inner_with_ops(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
        mut ops_rx: Option<mpsc::Receiver<AgentOp>>,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        // `prepare_messages` consumes the input, so on failure there is no
        // partial history to hand back — an empty Vec is returned.
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();

        for iteration in 1..=self.max_iterations {
            // Pre-turn: ingest pending ops.
            if let Err(e) = self
                .drain_ops(&mut state.messages, &mut ops_rx, &mut ops_state, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            // An interrupt op ends the turn gracefully; the result reports
            // the previous (completed) iteration via saturating_sub(1).
            if ops_state.interrupted {
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Inner loop: repeats the LLM call within the same iteration as
            // long as extensions inject continuations (`Continue` below);
            // tool-call batches `break` back to the outer iteration loop.
            loop {
                // LLM step.
                let output = match llm.chat(&state.messages, tools.tool_defs()).await {
                    Ok(o) => o,
                    Err(e) => {
                        return run_return_failure(e, state.messages, iteration, state.total_usage);
                    }
                };

                state.accumulate_usage(output.usage);
                let stop_reason = output.stop_reason;
                let output_tokens = output.usage.map(|u| u.output_tokens);

                match output.response {
                    crate::LlmResponse::Message(text) => {
                        on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            // Extension injected a continuation: run another
                            // LLM step within this same iteration.
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            // Final text: record it in history and finish.
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        // Tool-call execution stage (with ask_user policy).
                        emit_tool_started(&items, on_event);
                        let results = self
                            .execute_tools_with_policy(
                                tools.tool_map(),
                                &items,
                                &mut state.messages,
                                &mut ops_rx,
                                &mut ops_state,
                                on_event,
                            )
                            .await;
                        if let Err(e) = self
                            .finalize_tool_call_batch(
                                &items,
                                results,
                                iteration,
                                tools.tool_defs(),
                                &mut state,
                                on_event,
                            )
                            .await
                        {
                            return run_return_failure(
                                e,
                                state.messages,
                                iteration,
                                state.total_usage,
                            );
                        }
                        break; // Back to outer iteration loop.
                    }
                }
            }
        }

        // Iteration budget exhausted without a finalize or interrupt.
        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1633
    /// Inner function covering (batch LLM + ops_rx + cancel). Combines
    /// the ops drain-and-dispatch loop from `run_inner_with_ops` with
    /// the pre-iteration cancel check from `run_inner_cancel`. Returns
    /// the same `(Result<AgentResult>, Vec<Message>, RunTerminalMeta)`
    /// tuple shape as `run_inner_with_ops` for dispatch uniformity.
    ///
    /// # Cancel semantics
    ///
    /// The cancellation token is checked at two points per iteration:
    ///
    /// 1. **Before `drain_ops` at iteration top.** A pending cancel wins
    ///    over any queued ops — the function returns
    ///    `Err(AgentError::Cancelled)` with no `Interrupted` event and
    ///    any unread ops in the channel are dropped on the floor. This
    ///    matches the intuition that cancel is stronger than interrupt.
    ///
    /// 2. **Inside the `tokio::select!` around `llm.chat()`.** If the
    ///    token is tripped during the LLM call, the in-flight chat future
    ///    is dropped and the function returns `Err(AgentError::Cancelled)`.
    ///
    /// # Known latency gap: tool-call deferred-slot resolution
    ///
    /// During tool-call execution, this function calls
    /// `execute_tools_with_policy`, which in turn calls
    /// `resolve_deferred_slots` to wait for matching ops (e.g. the
    /// `AskUserAnswer` for an `ask_user` tool call). The cancel token is
    /// **not** raced inside `resolve_deferred_slots`, so a cancel that
    /// arrives while an `ask_user` slot is still awaiting its answer will
    /// not be observed until the slot resolves (or the 60 s defer
    /// timeout fires). This is a known limitation inherited from the
    /// current `execute_tools_with_policy` signature, which does not
    /// accept a cancel token. Fixing it requires threading cancel through
    /// to `resolve_deferred_slots`, tracked as a follow-up task.
    #[cfg(feature = "cancellation")]
    #[allow(dead_code)] // Called by RunBuilder dispatch in a later task
    async fn run_inner_with_ops_and_cancel(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
        ops_rx: mpsc::Receiver<AgentOp>,
        cancel: &tokio_util::sync::CancellationToken,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        // `prepare_messages` consumes the input, so on failure there is no
        // partial history to hand back — an empty Vec is returned.
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();
        let mut ops_rx_opt = Some(ops_rx);

        for iteration in 1..=self.max_iterations {
            // Cancel check (from run_inner_cancel). Runs before drain_ops:
            // a pending cancel wins over queued ops (see doc comment).
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }

            // Drain pending ops (from run_inner_with_ops).
            if let Err(e) = self
                .drain_ops(
                    &mut state.messages,
                    &mut ops_rx_opt,
                    &mut ops_state,
                    on_event,
                )
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            // An interrupt op ends the turn gracefully; the result reports
            // the previous (completed) iteration via saturating_sub(1).
            if ops_state.interrupted {
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Inner loop: repeats the LLM call within the same iteration as
            // long as extensions inject continuations (`Continue` below);
            // tool-call batches `break` back to the outer iteration loop.
            loop {
                // LLM step raced against cancel token.
                let output = tokio::select! {
                    output = llm.chat(&state.messages, tools.tool_defs()) => match output {
                        Ok(o) => o,
                        Err(e) => return run_return_failure(e, state.messages, iteration, state.total_usage),
                    },
                    _ = cancel.cancelled() => {
                        return run_return_failure(AgentError::Cancelled, state.messages, iteration, state.total_usage);
                    }
                };

                state.accumulate_usage(output.usage);
                let stop_reason = output.stop_reason;
                let output_tokens = output.usage.map(|u| u.output_tokens);

                match output.response {
                    crate::LlmResponse::Message(text) => {
                        on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            // Extension injected a continuation: run another
                            // LLM step within this same iteration.
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            // Final text: record it in history and finish.
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        emit_tool_started(&items, on_event);
                        let results = self
                            .execute_tools_with_policy(
                                tools.tool_map(),
                                &items,
                                &mut state.messages,
                                &mut ops_rx_opt,
                                &mut ops_state,
                                on_event,
                            )
                            .await;
                        if let Err(e) = self
                            .finalize_tool_call_batch(
                                &items,
                                results,
                                iteration,
                                tools.tool_defs(),
                                &mut state,
                                on_event,
                            )
                            .await
                        {
                            return run_return_failure(
                                e,
                                state.messages,
                                iteration,
                                state.total_usage,
                            );
                        }
                        break;
                    }
                }
            }
        }

        // Iteration budget exhausted without a finalize or interrupt.
        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1837
1838    /// Estimate the token count of a message list using a simple heuristic
1839    /// (4 characters per token). Used in tests.
1840    #[allow(dead_code)]
1841    fn estimate_tokens(messages: &[Message]) -> usize {
1842        messages.iter().map(|m| m.approx_visible_chars() / 4).sum()
1843    }
1844
1845    // TODO(phase-4): delete maybe_compact_via_extensions entirely.
1846    // After the public API cutover, consumers observe CoreEvent +
1847    // ExtensionEvent directly and the AgentEvent::AutoCompacted
1848    // forwarding adapter below (plus the variant itself in the
1849    // AgentEvent enum) becomes unnecessary.
1850
1851    /// Run the extension pipeline's `transform_request` hook and forward any
1852    /// `AutocompactEvent` as the legacy `AgentEvent::AutoCompacted` variant.
1853    ///
1854    /// Short-circuits immediately when no extensions are registered so the
1855    /// hot path pays no overhead.
1856    async fn maybe_compact_via_extensions(
1857        &self,
1858        messages: &mut Vec<Message>,
1859        on_event: &(impl Fn(AgentEvent) + Send + Sync),
1860    ) -> Result<()> {
1861        // Build an AgentState snapshot for the hook.
1862        let tools_snapshot: Vec<motosan_agent_tool::ToolDef> = self.tool_defs.to_vec();
1863        let agent_state = crate::core::AgentState {
1864            iteration: 0,
1865            messages: messages.as_slice(),
1866            tools: &tools_snapshot,
1867            turn_started_at: std::time::Instant::now(),
1868            token_usage: TokenUsage::default(),
1869        };
1870
1871        // CaptureSink collects any events the extension emits.
1872        let mut sink = crate::core::CaptureSink::default();
1873
1874        // Lock the ExtensionSet and call the extension pipeline.
1875        // Short-circuit on empty set (common case when no extensions are registered).
1876        // ErrorPolicy::Fallback is handled inside ExtensionSet::transform_request —
1877        // a failing extension is logged and skipped; messages are left unchanged.
1878        // Only ErrorPolicy::Abort extensions surface an error here.
1879        let transformed = {
1880            let mut set = self.extensions.lock().await;
1881            if set.is_empty() {
1882                return Ok(());
1883            }
1884            set.transform_request(messages.clone(), &agent_state, &mut sink)
1885                .await
1886                .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?
1887        };
1888
1889        *messages = transformed;
1890
1891        // Forward any extension events captured during transform_request.
1892        for (_name, evt) in sink.captured() {
1893            forward_extension_event(evt.as_ref(), on_event);
1894        }
1895
1896        Ok(())
1897    }
1898
1899    /// Dispatch `Extension::before_iteration` at the top of each iteration.
1900    ///
1901    /// Mirrors `maybe_compact_via_extensions` in shape: builds an
1902    /// `AgentState` snapshot, locks the `ExtensionSet`, and propagates
1903    /// per-extension `ErrorPolicy` through `ExtensionSet::before_iteration`.
1904    /// `ErrorPolicy::Fallback` is handled inside the set; only `Abort`
1905    /// surfaces an error here, which is wrapped as `AgentError::Internal`.
1906    ///
1907    /// Returned `FlowDecision`:
1908    /// - `Continue`: caller proceeds to the LLM step normally.
1909    /// - `Inject(msg)`: the message is appended to history and the caller
1910    ///   proceeds to the LLM step (same iteration) — semantically the
1911    ///   extension nudged the conversation before the model sees it.
1912    /// - `Halt(reason)`: the turn is terminated with
1913    ///   `AgentError::Internal(reason.message)`, mirroring how
1914    ///   `handle_text_response` treats `Halt`.
1915    ///
1916    /// Short-circuits immediately when no extensions are registered so the
1917    /// hot path pays no overhead.
1918    async fn dispatch_before_iteration(
1919        &self,
1920        iteration: usize,
1921        messages: &mut Vec<Message>,
1922        on_event: &(impl Fn(AgentEvent) + Send + Sync),
1923    ) -> Result<()> {
1924        if self.extensions.lock().await.is_empty() {
1925            return Ok(());
1926        }
1927
1928        let tools_snapshot: Vec<motosan_agent_tool::ToolDef> = self.tool_defs.to_vec();
1929        let agent_state = crate::core::AgentState {
1930            iteration,
1931            messages: messages.as_slice(),
1932            tools: &tools_snapshot,
1933            turn_started_at: std::time::Instant::now(),
1934            token_usage: TokenUsage::default(),
1935        };
1936
1937        let mut sink = crate::core::CaptureSink::default();
1938        let decision = self
1939            .extensions
1940            .lock()
1941            .await
1942            .before_iteration(&agent_state, &mut sink)
1943            .await
1944            .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
1945
1946        // Forward any captured extension events.
1947        for (_name, evt) in sink.captured() {
1948            forward_extension_event(evt.as_ref(), on_event);
1949        }
1950
1951        match decision {
1952            FlowDecision::Continue => Ok(()),
1953            FlowDecision::Inject(msg) => {
1954                messages.push(msg);
1955                Ok(())
1956            }
1957            FlowDecision::Halt(reason) => Err(crate::error::AgentError::Internal(reason.message)),
1958        }
1959    }
1960
1961    // TODO(phase-4): delete dispatch_after_llm_response once the public API
1962    // cutover deletes AgentEvent::TokenBudgetContinuation.
1963
1964    /// Dispatch `after_llm_response` via the extension system.
1965    ///
1966    /// Returns the `FlowDecision` from the first extension that returned
1967    /// something other than `Continue`, or `Continue` if all extensions
1968    /// passed. The caller is responsible for interpreting the decision:
1969    /// - `Continue`: proceed to tool dispatch as normal
1970    /// - `Inject(msg)`: append `msg` to history and loop back to another
1971    ///   LLM call (do NOT proceed to tool dispatch)
1972    /// - `Halt(reason)`: terminate the turn with the halt reason
1973    ///
1974    /// Also forwards `TokenBudgetEvent::Continuation` to the legacy
1975    /// `AgentEvent::TokenBudgetContinuation` event stream for backward
1976    /// compatibility. Phase 4 will delete the forwarding.
1977    async fn dispatch_after_llm_response(
1978        &self,
1979        resp: &crate::llm::LlmResponse,
1980        meta: &crate::llm::LlmResponseMeta,
1981        on_event: &(impl Fn(AgentEvent) + Send + Sync),
1982    ) -> Result<FlowDecision> {
1983        if self.extensions.lock().await.is_empty() {
1984            return Ok(FlowDecision::Continue);
1985        }
1986
1987        let tools_snapshot = self.tool_defs.to_vec();
1988        // TODO(phase-4): pass the real messages through instead of &[].
1989        // TokenBudgetExtension doesn't read state.messages so this is OK now.
1990        let empty_messages: Vec<crate::message::Message> = vec![];
1991        let agent_state = crate::core::AgentState {
1992            iteration: 0,
1993            messages: &empty_messages,
1994            tools: &tools_snapshot,
1995            turn_started_at: std::time::Instant::now(),
1996            token_usage: crate::llm::TokenUsage::default(),
1997        };
1998
1999        let mut sink = crate::core::CaptureSink::default();
2000        let decision = self
2001            .extensions
2002            .lock()
2003            .await
2004            .after_llm_response(resp, meta, &agent_state, &mut sink)
2005            .await
2006            .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2007
2008        // Forward captured extension events.
2009        for (_name, evt) in sink.captured() {
2010            forward_extension_event(evt.as_ref(), on_event);
2011        }
2012
2013        Ok(decision)
2014    }
2015
2016    fn agent_state_snapshot<'a>(
2017        &self,
2018        iteration: usize,
2019        messages: &'a [crate::message::Message],
2020        tools: &'a [motosan_agent_tool::ToolDef],
2021        token_usage: crate::llm::TokenUsage,
2022    ) -> crate::core::AgentState<'a> {
2023        crate::core::AgentState {
2024            iteration,
2025            messages,
2026            tools,
2027            turn_started_at: std::time::Instant::now(),
2028            token_usage,
2029        }
2030    }
2031
2032    /// Dispatch `Extension::after_tool_result` for a single tool result.
2033    /// Runs the full pipeline; returns the first non-`Continue` decision
2034    /// encountered, or `Continue` if all extensions returned `Continue`.
2035    ///
2036    /// Captured extension events are forwarded via `on_event`.
2037    async fn dispatch_after_tool_result(
2038        &self,
2039        result: &motosan_agent_tool::ToolResult,
2040        agent_state: &crate::core::AgentState<'_>,
2041        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2042    ) -> Result<FlowDecision> {
2043        if self.extensions.lock().await.is_empty() {
2044            return Ok(FlowDecision::Continue);
2045        }
2046
2047        let mut sink = crate::core::CaptureSink::default();
2048        let decision = self
2049            .extensions
2050            .lock()
2051            .await
2052            .after_tool_result(result, agent_state, &mut sink)
2053            .await
2054            .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2055
2056        for (_name, evt) in sink.captured() {
2057            forward_extension_event(evt.as_ref(), on_event);
2058        }
2059
2060        Ok(decision)
2061    }
2062
2063    /// Dispatch `Extension::rewrite_tool_result` pipeline for a single
2064    /// tool call's result. Returns the final (possibly rewritten) result.
2065    /// Captured extension events are forwarded via `on_event`.
2066    async fn dispatch_rewrite_tool_result(
2067        &self,
2068        call: &crate::llm::ToolCallItem,
2069        initial: motosan_agent_tool::ToolResult,
2070        agent_state: &crate::core::AgentState<'_>,
2071        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2072    ) -> Result<motosan_agent_tool::ToolResult> {
2073        if self.extensions.lock().await.is_empty() {
2074            return Ok(initial);
2075        }
2076
2077        let mut sink = crate::core::CaptureSink::default();
2078        let final_result = self
2079            .extensions
2080            .lock()
2081            .await
2082            .rewrite_tool_result(call, initial, agent_state, &mut sink)
2083            .await
2084            .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2085
2086        for (_name, evt) in sink.captured() {
2087            forward_extension_event(evt.as_ref(), on_event);
2088        }
2089
2090        Ok(final_result)
2091    }
2092
2093    /// Dispatch `Extension::on_terminal` across all extensions.
2094    async fn dispatch_on_terminal(
2095        &self,
2096        result: &crate::core::extension::TurnResult,
2097        iteration: usize,
2098        messages: &[crate::message::Message],
2099        tools: &[motosan_agent_tool::ToolDef],
2100        token_usage: crate::llm::TokenUsage,
2101        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2102    ) -> Result<()> {
2103        if self.extensions.lock().await.is_empty() {
2104            return Ok(());
2105        }
2106
2107        let agent_state = self.agent_state_snapshot(iteration, messages, tools, token_usage);
2108        let mut sink = crate::core::CaptureSink::default();
2109        self.extensions
2110            .lock()
2111            .await
2112            .on_terminal(result, &agent_state, &mut sink)
2113            .await
2114            .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2115
2116        for (_name, evt) in sink.captured() {
2117            forward_extension_event(evt.as_ref(), on_event);
2118        }
2119
2120        Ok(())
2121    }
2122
2123    async fn dispatch_on_terminal_from_meta(
2124        &self,
2125        meta: &RunTerminalMeta,
2126        messages: &[crate::message::Message],
2127        tools: &[motosan_agent_tool::ToolDef],
2128        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2129    ) -> Result<()> {
2130        self.dispatch_on_terminal(
2131            &meta.turn_result,
2132            meta.iteration,
2133            messages,
2134            tools,
2135            meta.token_usage,
2136            on_event,
2137        )
2138        .await
2139    }
2140
2141    /// Handle a text response from the LLM inside a run method's inner loop.
2142    ///
2143    /// Dispatches `after_llm_response` to the extension system, interprets
2144    /// the returned `FlowDecision`, and returns a `TextResponseOutcome` that
2145    /// tells the caller what to do next:
2146    ///
2147    /// - `Finalize`: the caller should push `Message::assistant(text)` and
2148    ///   build the `AgentResult`
2149    /// - `Continue`: the extension injected a continuation; the caller's
2150    ///   inner LLM loop should run another iteration
2151    /// - `Halt`: the extension halted the turn; the caller should return
2152    ///   `Err(AgentError::Internal(...))`
2153    ///
2154    /// The `TextChunk`/`TextDone` events are NOT emitted here — call sites
2155    /// emit them before calling this helper (streaming variants emit chunks
2156    /// during the stream itself; non-streaming variants emit `TextChunk`
2157    /// synchronously before dispatching).
2158    async fn handle_text_response(
2159        &self,
2160        text: String,
2161        meta: crate::llm::LlmResponseMeta,
2162        state: &mut TurnState,
2163        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2164    ) -> Result<TextResponseOutcome> {
2165        let resp_for_ext = crate::LlmResponse::Message(text.clone());
2166        let decision = self
2167            .dispatch_after_llm_response(&resp_for_ext, &meta, on_event)
2168            .await?;
2169
2170        match decision {
2171            FlowDecision::Continue => Ok(TextResponseOutcome::Finalize(text)),
2172            FlowDecision::Inject(msg) => {
2173                state.continuation_text.push_str(&text);
2174                state.messages.push(Message::assistant(&text));
2175                state.messages.push(msg);
2176                Ok(TextResponseOutcome::Continue)
2177            }
2178            FlowDecision::Halt(reason) => Ok(TextResponseOutcome::Halt(reason.message)),
2179        }
2180    }
2181
    /// Shared body of the streaming iteration
    /// loop. `ops_rx` is threaded through so deferred tool slots and
    /// the pre-iteration drain stage both have access to the real
    /// channel when present.
    ///
    /// Returns the turn result along with the final message transcript and
    /// the terminal metadata consumed by the `on_terminal` dispatch.
    async fn run_streaming_inner(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        mut ops_rx: Option<mpsc::Receiver<AgentOp>>,
        on_event: impl Fn(AgentEvent) + Send + Sync,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();

        for iteration in 1..=self.max_iterations {
            // Pre-iteration: ingest any pending ops routed through the
            // extension pipeline (dispatch_on_op). This matches the
            // ordering in `run_inner_with_ops` so callback and
            // stream-item paths stay dispatch-equivalent.
            if let Err(e) = self
                .drain_ops(&mut state.messages, &mut ops_rx, &mut ops_state, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            if ops_state.interrupted {
                // Interrupt op observed: report the iteration *before* this
                // one as the last completed (saturating so iteration 1 maps
                // to 0, not underflow).
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Inner loop: repeat the LLM step until a tool batch completes
            // (break → next outer iteration) or the turn finalizes, halts,
            // or fails (return).
            loop {
                // Streaming LLM step with eager tool execution.
                let stream_result = match Self::consume_stream(
                    llm,
                    &state.messages,
                    tools.tool_defs(),
                    tools.tool_map(),
                    self.tool_timeout,
                    &self.tool_context,
                    &mut state.total_usage,
                    &on_event,
                )
                .await
                {
                    Ok(r) => r,
                    Err(e) => {
                        return run_return_failure(e, state.messages, iteration, state.total_usage)
                    }
                };

                match stream_result.response {
                    // Text response: emit chunk/done events, then let
                    // handle_text_response decide continue / halt / finalize.
                    crate::LlmResponse::Message(text) => {
                        // Nothing was streamed as chunks — emit the full text
                        // as a single chunk so consumers still see it.
                        if stream_result.accumulated_text.is_empty() && !text.is_empty() {
                            on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));
                        }
                        on_event(AgentEvent::Core(CoreEvent::TextDone(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason: stream_result.stop_reason,
                            output_tokens: stream_result.output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, &on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            // Extension injected a continuation — re-run the
                            // LLM step within the same iteration.
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        // Some tools may already have executed eagerly while
                        // the stream was being consumed; merge those results
                        // with the final item list before finalizing.
                        if let Some((streamed_items, streamed_results)) =
                            stream_result.streaming_results
                        {
                            let (final_items, final_results) = merge_streamed_tool_results(
                                &items,
                                streamed_items,
                                streamed_results,
                                tools.tool_map(),
                                self.tool_timeout,
                                &self.tool_context,
                                &on_event,
                            )
                            .await;
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &final_items,
                                    final_results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        } else {
                            // No streaming tool execution — fall back to batch execution.
                            emit_tool_started(&items, &on_event);
                            let slots = self.dispatch_intercept_tool_calls(&items, &on_event).await;
                            // Resolve any Deferred slots via the real ops
                            // channel when one is available. Previously this
                            // path always passed `&mut None`, which meant
                            // ask_user / exit_plan_mode slots could not
                            // complete on the streaming callback path.
                            let slots = match self
                                .resolve_deferred_slots(
                                    slots,
                                    &mut ops_state,
                                    &mut ops_rx,
                                    &on_event,
                                )
                                .await
                            {
                                Ok(s) => s,
                                Err(e) => {
                                    return run_return_failure(
                                        e,
                                        state.messages,
                                        iteration,
                                        state.total_usage,
                                    )
                                }
                            };
                            // Partition: intercepted (Resolved) slots keep
                            // their result; Pending slots are executed below.
                            // NOTE(review): the indexing relies on
                            // `slots.len() == items.len()` (one slot per tool
                            // call, same order) — confirm
                            // dispatch_intercept_tool_calls upholds this.
                            let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                            let mut remaining_indices: Vec<usize> = Vec::new();
                            let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> =
                                slots
                                    .into_iter()
                                    .map(|s| match s {
                                        InterceptedSlot::Resolved(r) => Some(r),
                                        InterceptedSlot::Pending => None,
                                        InterceptedSlot::Deferred(_) => {
                                            unreachable!(
                                                "InterceptedSlot::Deferred should have been \
                                                 resolved by resolve_deferred_slots"
                                            );
                                        }
                                    })
                                    .collect();
                            for (i, item) in items.iter().enumerate() {
                                if final_results[i].is_none() {
                                    remaining_items.push(item.clone());
                                    remaining_indices.push(i);
                                }
                            }
                            let execution_results = Self::execute_tools_parallel(
                                tools.tool_map(),
                                &remaining_items,
                                self.tool_timeout,
                                &self.tool_context,
                            )
                            .await;
                            // Scatter executed results back into their
                            // original positions.
                            for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                                final_results[*original_idx] =
                                    Some(execution_results[exec_idx].clone());
                            }
                            let results: Vec<motosan_agent_tool::ToolResult> = final_results
                                .into_iter()
                                .map(|opt| opt.expect("all slots filled after intercept + execute"))
                                .collect();
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &items,
                                    results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        }
                        break; // Back to outer iteration loop.
                    }
                }
            }
        }

        // All iterations consumed without a final text response.
        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
2435
    /// Inner function covering (chunked LLM + ops_rx + cancel). Mirrors
    /// `run_streaming_inner`, but races the streaming LLM step against the
    /// cancellation token so terminal hooks observe the real in-progress
    /// messages / usage when cancellation wins.
    ///
    /// # Known latency gap
    ///
    /// Same as `run_inner_with_ops_and_cancel`: cancellation is not
    /// observed during `resolve_deferred_slots` inside the tool-call
    /// path. A cancel that arrives while waiting for a deferred slot
    /// (e.g. `ask_user` answer) will not be observed until the slot
    /// resolves or the 60s defer timeout fires.
    #[cfg(feature = "cancellation")]
    #[allow(dead_code)] // Called by RunBuilder dispatch in a later task
    async fn run_streaming_inner_with_cancel_and_ops(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        ops_rx: mpsc::Receiver<AgentOp>,
        cancel: &tokio_util::sync::CancellationToken,
        on_event: impl Fn(AgentEvent) + Send + Sync,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        use futures::StreamExt;

        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();
        let mut ops_rx_opt = Some(ops_rx);

        for iteration in 1..=self.max_iterations {
            // Fast-path cancellation check before any per-iteration work.
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }

            // Pre-iteration op drain — same ordering as `run_streaming_inner`.
            if let Err(e) = self
                .drain_ops(
                    &mut state.messages,
                    &mut ops_rx_opt,
                    &mut ops_state,
                    &on_event,
                )
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            if ops_state.interrupted {
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Inner loop: repeat the LLM step until a tool batch completes
            // (break) or the turn finalizes / halts / is cancelled (return).
            loop {
                let (
                    accumulated_text,
                    response,
                    streaming_results,
                    stream_stop_reason,
                    output_tokens,
                ) = {
                    // Consume the chunk stream inline (instead of via
                    // `consume_stream`) so each chunk can be raced against
                    // the cancellation token; tool calls are submitted
                    // eagerly to the streaming executor as they arrive.
                    let mut stream = llm.chat_stream(&state.messages, tools.tool_defs());
                    let mut accumulated = String::new();
                    let mut final_response: Option<crate::LlmResponse> = None;
                    let mut executor = crate::streaming_executor::StreamingToolExecutor::new();
                    let mut submitted_ids: std::collections::HashSet<String> =
                        std::collections::HashSet::new();
                    let mut stop_reason: Option<crate::llm::StopReason> = None;
                    let output_tokens_before = state.total_usage.output_tokens;

                    loop {
                        tokio::select! {
                            chunk_opt = stream.next() => {
                                match chunk_opt {
                                    Some(chunk_result) => {
                                        let chunk = match chunk_result {
                                            Ok(c) => c,
                                            Err(e) => return run_return_failure(e, state.messages.clone(), iteration, state.total_usage),
                                        };
                                        match chunk {
                                            crate::llm::StreamChunk::TextDelta(delta) => {
                                                accumulated.push_str(&delta);
                                                on_event(AgentEvent::Core(CoreEvent::TextChunk(delta)));
                                            }
                                            crate::llm::StreamChunk::ToolUse { id, name, args } => {
                                                // Dedup by tool-call id so a re-sent
                                                // ToolUse chunk is not executed twice.
                                                if !submitted_ids.contains(&id) {
                                                    let item = crate::llm::ToolCallItem {
                                                        id: id.clone(),
                                                        name: name.clone(),
                                                        args,
                                                    };
                                                    on_event(AgentEvent::Core(CoreEvent::ToolStarted {
                                                        name: name.clone(),
                                                    }));
                                                    executor.submit(
                                                        item,
                                                        tools.tool_map(),
                                                        self.tool_timeout,
                                                        &self.tool_context,
                                                    );
                                                    submitted_ids.insert(id);
                                                }
                                            }
                                            crate::llm::StreamChunk::Done(resp) => {
                                                final_response = Some(resp);
                                            }
                                            crate::llm::StreamChunk::Usage(usage) => {
                                                state.total_usage.accumulate(usage);
                                            }
                                            crate::llm::StreamChunk::StopReason(reason) => {
                                                stop_reason = Some(reason);
                                            }
                                        }
                                    }
                                    None => break,
                                }
                            }
                            _ = cancel.cancelled() => {
                                // Cancellation wins the race: bail with the real
                                // in-progress messages and accumulated usage.
                                return run_return_failure(AgentError::Cancelled, state.messages.clone(), iteration, state.total_usage);
                            }
                        }
                    }

                    // A missing Done chunk degrades to a plain message built
                    // from the accumulated text.
                    let resp = final_response
                        .unwrap_or_else(|| crate::LlmResponse::Message(accumulated.clone()));
                    let sr = if executor.has_pending() {
                        Some(executor.collect().await)
                    } else {
                        None
                    };
                    // NOTE(review): assumes output_tokens only grows during
                    // the stream; a provider reporting a smaller running
                    // total would make this subtraction underflow — confirm
                    // TokenUsage::accumulate semantics.
                    let delta = state.total_usage.output_tokens - output_tokens_before;
                    let ot = if delta > 0 { Some(delta) } else { None };
                    (accumulated, resp, sr, stop_reason, ot)
                };

                match response {
                    crate::LlmResponse::Message(text) => {
                        // Nothing was streamed as chunks — emit the full text
                        // as a single chunk so consumers still see it.
                        if accumulated_text.is_empty() && !text.is_empty() {
                            on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));
                        }
                        on_event(AgentEvent::Core(CoreEvent::TextDone(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason: stream_stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, &on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            // Extension injected a continuation — re-run the
                            // LLM step within the same iteration.
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        // Merge any eagerly-executed streaming results with
                        // the final item list before finalizing the batch.
                        if let Some((streamed_items, streamed_results)) = streaming_results {
                            let (final_items, final_results) = merge_streamed_tool_results(
                                &items,
                                streamed_items,
                                streamed_results,
                                tools.tool_map(),
                                self.tool_timeout,
                                &self.tool_context,
                                &on_event,
                            )
                            .await;
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &final_items,
                                    final_results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        } else {
                            // No tools ran during streaming — batch fallback,
                            // same structure as `run_streaming_inner`.
                            emit_tool_started(&items, &on_event);
                            let slots = self.dispatch_intercept_tool_calls(&items, &on_event).await;
                            let slots = match self
                                .resolve_deferred_slots(
                                    slots,
                                    &mut ops_state,
                                    &mut ops_rx_opt,
                                    &on_event,
                                )
                                .await
                            {
                                Ok(s) => s,
                                Err(e) => {
                                    return run_return_failure(
                                        e,
                                        state.messages,
                                        iteration,
                                        state.total_usage,
                                    )
                                }
                            };
                            // Partition: intercepted (Resolved) slots keep
                            // their result; Pending slots execute below.
                            let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                            let mut remaining_indices: Vec<usize> = Vec::new();
                            let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> =
                                slots
                                    .into_iter()
                                    .map(|s| match s {
                                        InterceptedSlot::Resolved(r) => Some(r),
                                        InterceptedSlot::Pending => None,
                                        InterceptedSlot::Deferred(_) => {
                                            unreachable!(
                                                "InterceptedSlot::Deferred should have been \
                                                 resolved by resolve_deferred_slots"
                                            );
                                        }
                                    })
                                    .collect();
                            for (i, item) in items.iter().enumerate() {
                                if final_results[i].is_none() {
                                    remaining_items.push(item.clone());
                                    remaining_indices.push(i);
                                }
                            }
                            let execution_results = Self::execute_tools_parallel(
                                tools.tool_map(),
                                &remaining_items,
                                self.tool_timeout,
                                &self.tool_context,
                            )
                            .await;
                            // Scatter executed results back into their
                            // original positions.
                            for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                                final_results[*original_idx] =
                                    Some(execution_results[exec_idx].clone());
                            }
                            let results: Vec<motosan_agent_tool::ToolResult> = final_results
                                .into_iter()
                                .map(|opt| opt.expect("all slots filled after intercept + execute"))
                                .collect();
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &items,
                                    results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        }
                        break;
                    }
                }
            }
        }

        // All iterations consumed without a final text response.
        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
2765
2766    /// Drain all immediately available ops from `ops_rx` and route each one
2767    /// through `dispatch_on_op` first so that extensions (e.g.
2768    /// `AskUserExtension`, `PlanningExtension`) can buffer ops that arrive
2769    /// before the corresponding deferred tool call exists.  Only ops for
2770    /// which every extension returns `OpDecision::Pass` fall through to the
2771    /// legacy `apply_op` handler (Interrupt, InjectUserMessage, etc.).
2772    ///
2773    /// This fixes the pre-queue bug (#127): previously `drain_ops` called
2774    /// `apply_op` directly, which was a no-op for `AskUserAnswer` and
2775    /// `ApprovePlan`, silently dropping them.
2776    async fn drain_ops(
2777        &self,
2778        messages: &mut Vec<Message>,
2779        ops_rx: &mut Option<mpsc::Receiver<AgentOp>>,
2780        ops_state: &mut OpsState,
2781        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2782    ) -> Result<()> {
2783        let Some(rx) = ops_rx.as_mut() else {
2784            return Ok(());
2785        };
2786        while let Ok(op) = rx.try_recv() {
2787            // Route to extensions first. If no extension claims the op, fall
2788            // through to the legacy inline handler.
2789            let decision = self.dispatch_on_op(&op, on_event).await?;
2790            match decision {
2791                crate::core::OpDecision::Pass => {
2792                    Self::apply_op(op, messages, ops_state);
2793                }
2794                crate::core::OpDecision::Handled | crate::core::OpDecision::Reject(_) => {
2795                    // Extension consumed it. Nothing more to do.
2796                }
2797                crate::core::OpDecision::ResumeDeferred { call_id, .. } => {
2798                    debug_assert!(
2799                        false,
2800                        "extension returned ResumeDeferred from drain_ops for call_id '{}', \
2801                         but no deferred calls can exist at drain time (on_terminal clears \
2802                         pending state, tool calls only happen inside iterations). \
2803                         This indicates an extension bug.",
2804                        call_id
2805                    );
2806                    // In release builds, ignore the return — best-effort resilience.
2807                }
2808            }
2809        }
2810        Ok(())
2811    }
2812
2813    fn apply_op(op: AgentOp, messages: &mut Vec<Message>, ops_state: &mut OpsState) {
2814        match op {
2815            AgentOp::Interrupt => {
2816                ops_state.interrupted = true;
2817            }
2818            AgentOp::InjectUserMessage(text) => {
2819                messages.push(Message::user(&text));
2820            }
2821            AgentOp::InjectHint(hint) => {
2822                messages.push(Message::user(&format!("[Note: {hint}]")));
2823            }
2824            AgentOp::AskUserAnswer { .. } => {
2825                // Handled by AskUserExtension via the Defer/ResumeDeferred protocol.
2826                // apply_op is called for ops outside the resolve_deferred_slots loop;
2827                // AskUserAnswer arriving here is a no-op (no pending answers queue).
2828            }
2829            AgentOp::ApprovePlan { .. } => {
2830                // Handled by PlanningExtension via the Defer/ResumeDeferred protocol.
2831                // apply_op is called for ops outside the resolve_deferred_slots loop;
2832                // ApprovePlan arriving here is a no-op (no pending approval queue).
2833            }
2834        }
2835    }
2836
2837    async fn execute_tools_with_policy(
2838        &self,
2839        tool_map: &HashMap<String, Arc<dyn Tool>>,
2840        items: &[crate::llm::ToolCallItem],
2841        _messages: &mut Vec<Message>,
2842        ops_rx: &mut Option<mpsc::Receiver<AgentOp>>,
2843        ops_state: &mut OpsState,
2844        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2845    ) -> Vec<ToolResult> {
2846        let policy = ToolExecutionPolicy::from_items(items);
2847        match policy {
2848            ToolExecutionPolicy::ParallelOnly => {
2849                let slots = self.dispatch_intercept_tool_calls(items, on_event).await;
2850
2851                // Separate Pending items (need normal execution) from Deferred/Resolved.
2852                let mut pending_items: Vec<crate::llm::ToolCallItem> = Vec::new();
2853                let mut pending_indices: Vec<usize> = Vec::new();
2854                for (i, (slot, item)) in slots.iter().zip(items.iter()).enumerate() {
2855                    if matches!(slot, InterceptedSlot::Pending) {
2856                        pending_items.push(item.clone());
2857                        pending_indices.push(i);
2858                    }
2859                }
2860
2861                // Run Pending tools in parallel with Deferred slot resolution.
2862                // This preserves the old behavior where non-ask_user tools run
2863                // concurrently while waiting for an ask_user answer.
2864                let (resolved_slots, pending_results) = futures::join!(
2865                    self.resolve_deferred_slots(slots, ops_state, ops_rx, on_event),
2866                    Self::execute_tools_parallel(
2867                        tool_map,
2868                        &pending_items,
2869                        self.tool_timeout,
2870                        &self.tool_context,
2871                    )
2872                );
2873                let resolved_slots = resolved_slots.unwrap_or_else(|e| {
2874                    eprintln!("[motosan-agent-loop] resolve_deferred_slots error: {}", e);
2875                    Vec::new()
2876                });
2877
2878                let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> = resolved_slots
2879                    .into_iter()
2880                    .map(|s| match s {
2881                        InterceptedSlot::Resolved(r) => Some(r),
2882                        InterceptedSlot::Pending => None,
2883                        InterceptedSlot::Deferred(_) => {
2884                            unreachable!(
2885                                "InterceptedSlot::Deferred should have been \
2886                                 resolved by resolve_deferred_slots"
2887                            );
2888                        }
2889                    })
2890                    .collect();
2891                for (exec_idx, original_idx) in pending_indices.iter().enumerate() {
2892                    final_results[*original_idx] = Some(pending_results[exec_idx].clone());
2893                }
2894                final_results
2895                    .into_iter()
2896                    .map(|opt| opt.expect("all slots filled after intercept + execute"))
2897                    .collect()
2898            }
2899        }
2900    }
2901
2902    /// Run each tool call through the extension `intercept_tool_call` pipeline.
2903    ///
2904    /// Returns a slot-aligned `Vec<InterceptedSlot>` matching `items`.
2905    /// See [`InterceptedSlot`] for the three possible outcomes.
2906    ///
2907    /// Forwards any `DelegationEvent::Started/Completed` as the legacy
2908    /// `AgentEvent::DelegateStarted/DelegateCompleted` for backward compatibility.
2909    /// TODO(phase-4): remove the downcast block once the public API cutover removes
2910    /// `AgentEvent::DelegateStarted` and `AgentEvent::DelegateCompleted`.
2911    async fn dispatch_intercept_tool_calls(
2912        &self,
2913        items: &[crate::llm::ToolCallItem],
2914        on_event: &(impl Fn(AgentEvent) + Send + Sync),
2915    ) -> Vec<InterceptedSlot> {
2916        // Fast path: no extensions registered, nothing to intercept.
2917        if self.extensions.lock().await.is_empty() {
2918            return items.iter().map(|_| InterceptedSlot::Pending).collect();
2919        }
2920
2921        let tools_snapshot = self.tool_defs.to_vec();
2922        let empty_messages: Vec<crate::message::Message> = vec![];
2923        let agent_state = crate::core::AgentState {
2924            iteration: 0,
2925            messages: &empty_messages,
2926            tools: &tools_snapshot,
2927            turn_started_at: std::time::Instant::now(),
2928            token_usage: crate::llm::TokenUsage::default(),
2929        };
2930
2931        let mut slots: Vec<InterceptedSlot> = Vec::with_capacity(items.len());
2932
2933        for item in items.iter() {
2934            let mut sink = crate::core::CaptureSink::default();
2935            let decision = {
2936                let mut set = self.extensions.lock().await;
2937                set.intercept_tool_call(item.clone(), &agent_state, &mut sink)
2938                    .await
2939                    .unwrap_or_else(|e| {
2940                        eprintln!(
2941                            "[motosan-agent-loop] dispatch_intercept_tool_calls failed: {}. \
2942                             Treating as Proceed.",
2943                            e
2944                        );
2945                        crate::core::ToolDecision::Proceed(item.clone())
2946                    })
2947            };
2948
2949            // Forward extension events → AgentEvent::Extension stream.
2950            for (_name, evt) in sink.captured() {
2951                forward_extension_event(evt.as_ref(), on_event);
2952            }
2953
2954            let slot = match decision {
2955                crate::core::ToolDecision::ShortCircuit(result) => {
2956                    InterceptedSlot::Resolved(result)
2957                }
2958                crate::core::ToolDecision::Defer { call_id, reason } => {
2959                    // Record the deferred call. Task 5's resolve_deferred_slots
2960                    // will read this map when it processes ResumeDeferred ops.
2961                    let mut deferred = self.deferred_calls.lock().await;
2962                    if deferred.contains_key(&call_id) {
2963                        eprintln!(
2964                            "[motosan-agent-loop] WARNING: duplicate Defer call_id '{}' \
2965                             (reason: {}). Treating as Pending — the previously deferred \
2966                             call may be lost.",
2967                            call_id, reason
2968                        );
2969                        InterceptedSlot::Pending
2970                    } else {
2971                        deferred.insert(
2972                            call_id.clone(),
2973                            DeferredCall {
2974                                call: item.clone(),
2975                                by_extension: "unknown",
2976                                // TODO(phase-3b): plumb the real extension name
2977                                // through ExtensionSet::intercept_tool_call so the
2978                                // diagnostic can name which extension deferred the
2979                                // call. For now, "unknown" is acceptable.
2980                                at: std::time::Instant::now(),
2981                            },
2982                        );
2983                        InterceptedSlot::Deferred(call_id)
2984                    }
2985                }
2986                // Proceed (with possibly modified call) or Replace —
2987                // both fall through to normal dispatch.
2988                _ => InterceptedSlot::Pending,
2989            };
2990
2991            slots.push(slot);
2992        }
2993
2994        slots
2995    }
2996
2997    /// Route an incoming `AgentOp` to the extension system via
2998    /// `ExtensionSet::on_op` and return the resulting `OpDecision`.
2999    ///
3000    /// Returns the `OpDecision` from the first extension that returned
3001    /// something other than `Pass`. `Pass` means "no extension handled
3002    /// this op" — the caller should fall back to legacy inline handling
3003    /// (until Tasks 11 and 15 delete those handlers).
3004    ///
3005    /// Forwards extension events to the legacy `AgentEvent` stream.
3006    /// Phase 3B Tasks 11 and 15 will populate the forwarding loop with
3007    /// AskUserEvent and PlanningEvent downcasts. For Task 3, the loop
3008    /// is intentionally a no-op skeleton.
3009    ///
3010    /// TODO(phase-4): delete this method once the public API cutover
3011    /// routes consumers to CoreEvent + ExtensionEvent directly.
3012    async fn dispatch_on_op(
3013        &self,
3014        op: &AgentOp,
3015        on_event: &(impl Fn(AgentEvent) + Send + Sync),
3016    ) -> Result<crate::core::OpDecision> {
3017        if self.extensions.lock().await.is_empty() {
3018            return Ok(crate::core::OpDecision::Pass);
3019        }
3020
3021        let tools_snapshot = self.tool_defs.to_vec();
3022        let empty_messages: Vec<crate::message::Message> = vec![];
3023        let agent_state = crate::core::AgentState {
3024            iteration: 0,
3025            messages: &empty_messages,
3026            tools: &tools_snapshot,
3027            turn_started_at: std::time::Instant::now(),
3028            token_usage: crate::llm::TokenUsage::default(),
3029        };
3030
3031        let mut sink = crate::core::CaptureSink::default();
3032        let decision = self
3033            .extensions
3034            .lock()
3035            .await
3036            .on_op(op, &agent_state, &mut sink)
3037            .await
3038            .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
3039
3040        // Forward captured events to the AgentEvent stream.
3041        for (_name, evt) in sink.captured() {
3042            forward_extension_event(evt.as_ref(), on_event);
3043        }
3044
3045        Ok(decision)
3046    }
3047
    /// Wait for all `Deferred` slots in the given vec to be resolved
    /// via incoming `AgentOp`s and `dispatch_on_op` returning
    /// `ResumeDeferred`. Returns the same vec with all `Deferred`
    /// entries replaced by `Resolved`.
    ///
    /// On timeout (default 60s; eventually extension config should drive
    /// this), unresolved deferred slots are filled with an error
    /// `ToolResult` and a diagnostic is logged via eprintln.
    ///
    /// During the wait, ops that no extension claims (returning
    /// `OpDecision::Pass`) fall through to legacy inline handling
    /// (mirroring apply_op). Tasks 11 and 15 will replace the legacy
    /// handlers with extension-based ones, but for now this method
    /// preserves backward compatibility with the existing AskUserAnswer /
    /// ApprovePlan inline op handlers.
    ///
    /// Exit conditions for the wait loop: every slot resolved, deadline
    /// reached, ops channel closed (or absent), or an Interrupt op —
    /// each path fills any still-Deferred slot with an error `ToolResult`
    /// so the caller always receives one result per slot.
    async fn resolve_deferred_slots(
        &self,
        mut slots: Vec<InterceptedSlot>,
        ops_state: &mut OpsState,
        ops_rx: &mut Option<tokio::sync::mpsc::Receiver<AgentOp>>,
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
    ) -> Result<Vec<InterceptedSlot>> {
        // Fast path: nothing deferred.
        if !slots
            .iter()
            .any(|s| matches!(s, InterceptedSlot::Deferred(_)))
        {
            return Ok(slots);
        }

        // Use the minimum timeout from registered extensions (e.g. AskUserConfig::timeout),
        // falling back to a 60s ceiling if no extension constrains it.
        const DEFAULT_DEFER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
        let overall_timeout = self
            .extensions
            .lock()
            .await
            .min_deferred_call_timeout()
            .unwrap_or(DEFAULT_DEFER_TIMEOUT);
        // A single fixed deadline for the whole batch; each wait below uses
        // whatever time remains rather than restarting the timeout per op.
        let deadline = std::time::Instant::now() + overall_timeout;

        loop {
            // Check if all deferred slots are now resolved.
            if !slots
                .iter()
                .any(|s| matches!(s, InterceptedSlot::Deferred(_)))
            {
                break;
            }

            // Compute remaining time for this iteration's wait.
            let now = std::time::Instant::now();
            if now >= deadline {
                // Timeout: resolve every still-Deferred slot with an error.
                // Emit AskUserTimeout for any ask_user deferred calls so that
                // the legacy AgentEvent::AskUserTimeout contract is preserved.
                for slot in slots.iter_mut() {
                    if let InterceptedSlot::Deferred(call_id) = slot {
                        let cid = call_id.clone();
                        // Drop the bookkeeping entry so the id cannot be resumed later.
                        self.deferred_calls.lock().await.remove(&cid);
                        // Notify all extensions of the timeout so they can emit
                        // their own timeout events (e.g. AskUserEvent::Timeout).
                        // This replaces the old hardcoded ask_user name check.
                        {
                            // Minimal synthetic AgentState for the callback
                            // (no live transcript is available here).
                            let tools_snapshot = self.tool_defs.to_vec();
                            let empty_messages: Vec<crate::message::Message> = vec![];
                            let agent_state = crate::core::AgentState {
                                iteration: 0,
                                messages: &empty_messages,
                                tools: &tools_snapshot,
                                turn_started_at: std::time::Instant::now(),
                                token_usage: crate::llm::TokenUsage::default(),
                            };
                            let mut sink = crate::core::CaptureSink::default();
                            // Best-effort: an on_defer_timeout error is ignored.
                            let _ = self
                                .extensions
                                .lock()
                                .await
                                .on_defer_timeout(&cid, &agent_state, &mut sink)
                                .await;
                            for (_name, evt) in sink.captured() {
                                forward_extension_event(evt.as_ref(), on_event);
                            }
                        }
                        eprintln!(
                            "[motosan-agent-loop] Defer call '{}' timed out after {:?}",
                            cid, overall_timeout
                        );
                        *slot = InterceptedSlot::Resolved(motosan_agent_tool::ToolResult::error(
                            format!(
                                "Deferred call '{}' timed out after {:?}",
                                cid, overall_timeout
                            ),
                        ));
                    }
                }
                break;
            }
            // Safe: `now < deadline` was checked above, so this Instant
            // subtraction cannot panic.
            let remaining = deadline - now;

            // Wait for the next op or for the timeout.
            let op = match ops_rx {
                Some(rx) => {
                    // TEST-ONLY: fire parked notifier before awaiting ops_rx.
                    #[cfg(test)]
                    if let Some(notify) = self.test_parked_notify.lock().unwrap().as_ref() {
                        notify.notify_one();
                    }
                    tokio::select! {
                        received = rx.recv() => {
                            match received {
                                Some(op) => op,
                                None => {
                                    // Ops channel closed. Resolve remaining as errors.
                                    for slot in slots.iter_mut() {
                                        if let InterceptedSlot::Deferred(call_id) = slot {
                                            let cid = call_id.clone();
                                            self.deferred_calls.lock().await.remove(&cid);
                                            eprintln!(
                                                "[motosan-agent-loop] Defer call '{}' aborted: ops channel closed",
                                                cid
                                            );
                                            *slot = InterceptedSlot::Resolved(
                                                motosan_agent_tool::ToolResult::error(format!(
                                                    "Deferred call '{}' aborted: ops channel closed",
                                                    cid
                                                )),
                                            );
                                        }
                                    }
                                    break;
                                }
                            }
                        }
                        _ = tokio::time::sleep(remaining) => {
                            continue; // loop will detect timeout on next iteration
                        }
                    }
                }
                None => {
                    // No ops channel — cannot wait for resume. Resolve all
                    // deferred slots as errors immediately and notify extensions
                    // via on_defer_timeout so they can emit their own events.
                    for slot in slots.iter_mut() {
                        if let InterceptedSlot::Deferred(call_id) = slot {
                            let cid = call_id.clone();
                            self.deferred_calls.lock().await.remove(&cid);
                            // Notify all extensions of the timeout/abort.
                            // This replaces the old hardcoded ask_user name check.
                            {
                                // Same synthetic AgentState as the deadline path above.
                                let tools_snapshot = self.tool_defs.to_vec();
                                let empty_messages: Vec<crate::message::Message> = vec![];
                                let agent_state = crate::core::AgentState {
                                    iteration: 0,
                                    messages: &empty_messages,
                                    tools: &tools_snapshot,
                                    turn_started_at: std::time::Instant::now(),
                                    token_usage: crate::llm::TokenUsage::default(),
                                };
                                let mut sink = crate::core::CaptureSink::default();
                                let _ = self
                                    .extensions
                                    .lock()
                                    .await
                                    .on_defer_timeout(&cid, &agent_state, &mut sink)
                                    .await;
                                for (_name, evt) in sink.captured() {
                                    forward_extension_event(evt.as_ref(), on_event);
                                }
                            }
                            eprintln!(
                                "[motosan-agent-loop] Defer call '{}' aborted: no ops channel",
                                cid
                            );
                            *slot = InterceptedSlot::Resolved(
                                motosan_agent_tool::ToolResult::error(format!(
                                    "Deferred call '{}' aborted: no ops channel available",
                                    cid
                                )),
                            );
                        }
                    }
                    break;
                }
            };

            // Dispatch the op via the extension system first.
            let decision = self.dispatch_on_op(&op, on_event).await?;

            match decision {
                crate::core::OpDecision::ResumeDeferred { call_id, result } => {
                    // Find the slot with matching call_id and fill it.
                    let mut found = false;
                    for slot in slots.iter_mut() {
                        if let InterceptedSlot::Deferred(slot_call_id) = slot {
                            if slot_call_id == &call_id {
                                *slot = InterceptedSlot::Resolved(result.clone());
                                self.deferred_calls.lock().await.remove(&call_id);
                                found = true;
                                break;
                            }
                        }
                    }
                    if !found {
                        eprintln!(
                            "[motosan-agent-loop] WARNING: extension returned ResumeDeferred \
                             for unknown call_id '{}' — no matching deferred slot.",
                            call_id
                        );
                    }
                }
                crate::core::OpDecision::Handled | crate::core::OpDecision::Reject(_) => {
                    // Op was handled by an extension (without resuming a
                    // deferred call) — continue the wait.
                }
                crate::core::OpDecision::Pass => {
                    // No extension claimed the op. Inline handling for ops
                    // not managed by any extension.
                    // AskUserAnswer is fully handled by AskUserExtension.
                    // ApprovePlan is fully handled by PlanningExtension.
                    match &op {
                        AgentOp::Interrupt => {
                            ops_state.interrupted = true;
                            on_event(AgentEvent::Core(CoreEvent::Interrupted));
                            // Resolve all remaining deferred slots with an error
                            // so the caller gets a result, then exit.
                            for slot in slots.iter_mut() {
                                if let InterceptedSlot::Deferred(call_id) = slot {
                                    let cid = call_id.clone();
                                    self.deferred_calls.lock().await.remove(&cid);
                                    *slot = InterceptedSlot::Resolved(
                                        motosan_agent_tool::ToolResult::error(format!(
                                            "Deferred call '{}' aborted: loop interrupted",
                                            cid
                                        )),
                                    );
                                }
                            }
                            break;
                        }
                        AgentOp::AskUserAnswer { .. } | AgentOp::ApprovePlan { .. } => {
                            // Handled by AskUserExtension / PlanningExtension above via
                            // ResumeDeferred. If they fall through to Pass, there was no
                            // matching deferred call_id — ignore silently.
                        }
                        _ => {
                            // Other ops not handled during Defer waits.
                        }
                    }
                }
            }
        }

        Ok(slots)
    }
3303
3304    async fn execute_tools_parallel(
3305        tool_map: &HashMap<String, Arc<dyn Tool>>,
3306        items: &[crate::llm::ToolCallItem],
3307        timeout: Option<std::time::Duration>,
3308        ctx: &ToolContext,
3309    ) -> Vec<ToolResult> {
3310        join_all(
3311            items
3312                .iter()
3313                .map(|tc| Self::execute_tool(tool_map, &tc.name, tc.args.clone(), timeout, ctx)),
3314        )
3315        .await
3316    }
3317
3318    /// Execute a single tool by name using a pre-built lookup map.
3319    /// Returns an error `ToolResult` if the tool is not found or times out.
3320    async fn execute_tool(
3321        tool_map: &HashMap<String, Arc<dyn Tool>>,
3322        name: &str,
3323        args: serde_json::Value,
3324        timeout: Option<std::time::Duration>,
3325        ctx: &ToolContext,
3326    ) -> ToolResult {
3327        let fut = async {
3328            if let Some(tool) = tool_map.get(name) {
3329                tool.call(args, ctx).await
3330            } else {
3331                ToolResult::error(format!("unknown tool: {name}"))
3332            }
3333        };
3334        if let Some(dur) = timeout {
3335            match tokio::time::timeout(dur, fut).await {
3336                Ok(result) => result,
3337                Err(_) => ToolResult::error(format!("tool '{name}' timed out after {dur:?}")),
3338            }
3339        } else {
3340            fut.await
3341        }
3342    }
3343
3344    /// Finalize a batch of tool call results.
3345    async fn finalize_tool_call_batch(
3346        &self,
3347        items: &[ToolCallItem],
3348        raw_results: Vec<ToolResult>,
3349        iteration: usize,
3350        tools: &[ToolDef],
3351        state: &mut TurnState,
3352        on_event: &(impl Fn(AgentEvent) + Send + Sync),
3353    ) -> Result<()> {
3354        let mut results: Vec<ToolResult> = Vec::with_capacity(raw_results.len());
3355        for (tc, raw) in items.iter().zip(raw_results) {
3356            let agent_state =
3357                self.agent_state_snapshot(iteration, &state.messages, tools, state.total_usage);
3358            let rewritten = self
3359                .dispatch_rewrite_tool_result(tc, raw, &agent_state, on_event)
3360                .await?;
3361            results.push(rewritten);
3362        }
3363
3364        for (tc, result) in items.iter().zip(results.iter()) {
3365            on_event(AgentEvent::Core(CoreEvent::ToolCompleted {
3366                name: tc.name.clone(),
3367                result: result.clone(),
3368            }));
3369            state
3370                .all_tool_calls
3371                .push((tc.name.clone(), tc.args.clone()));
3372        }
3373
3374        let tool_call_refs: Vec<ToolCallRef> = items
3375            .iter()
3376            .map(|tc| ToolCallRef {
3377                id: tc.id.clone(),
3378                name: tc.name.clone(),
3379                args: tc.args.clone(),
3380            })
3381            .collect();
3382        state
3383            .messages
3384            .push(Message::assistant_with_tool_calls("", tool_call_refs));
3385
3386        for (tc, result) in items.iter().zip(results.iter()) {
3387            state
3388                .messages
3389                .push(Message::tool_result(&tc.id, &tool_result_to_string(result)));
3390        }
3391
3392        for result in &results {
3393            let agent_state =
3394                self.agent_state_snapshot(iteration, &state.messages, tools, state.total_usage);
3395            match self
3396                .dispatch_after_tool_result(result, &agent_state, on_event)
3397                .await?
3398            {
3399                FlowDecision::Continue => {}
3400                FlowDecision::Inject(msg) => state.messages.push(msg),
3401                FlowDecision::Halt(reason) => {
3402                    return Err(crate::error::AgentError::Internal(reason.message));
3403                }
3404            }
3405        }
3406
3407        Ok(())
3408    }
3409}
3410
/// `ToolExecutionPolicy` controls how a batch of tool calls is executed.
///
/// `enter_plan_mode` / `exit_plan_mode` are now handled by `PlanningExtension`
/// via the Defer/ResumeDeferred protocol (just like `ask_user`), so all batches
/// now use the `ParallelOnly` path.
///
/// NOTE(review): the single-variant enum is presumably retained as a seam
/// for reintroducing sequential/mixed policies later — confirm before
/// collapsing it.
#[derive(Debug, Clone, Copy)]
enum ToolExecutionPolicy {
    /// Execute every call in the batch concurrently.
    ParallelOnly,
}
3420
3421impl ToolExecutionPolicy {
3422    /// Determine the execution policy for a batch of tool calls.
3423    ///
3424    /// All interactive tools (`ask_user`, `enter_plan_mode`, `exit_plan_mode`)
3425    /// are now handled by extensions via the Defer protocol — every batch uses
3426    /// `ParallelOnly`.
3427    fn from_items(_items: &[crate::llm::ToolCallItem]) -> Self {
3428        Self::ParallelOnly
3429    }
3430}
3431
/// Mutable bookkeeping threaded through ops-channel handling.
#[derive(Default)]
struct OpsState {
    // Presumably flipped when an interrupt op arrives on the ops channel;
    // confirm against the ops-loop call sites. Defaults to `false`.
    interrupted: bool,
}
3436
/// Result of dispatching a single tool call through the extension
/// `intercept_tool_call` pipeline.
///
/// - `Pending`: no extension claimed the call; the core's normal
///   tool-map dispatch should run it.
/// - `Resolved`: an extension returned `ShortCircuit(result)`; the
///   call has its final result and skips normal dispatch.
/// - `Deferred`: an extension returned `Defer { call_id }`; the
///   call is suspended, awaiting a matching `OpDecision::ResumeDeferred`.
///   The `call_id` is the key into `Engine::deferred_calls`.
#[derive(Debug)]
enum InterceptedSlot {
    /// Unclaimed — fall through to normal tool-map dispatch.
    Pending,
    /// Short-circuited by an extension with its final result.
    Resolved(motosan_agent_tool::ToolResult),
    /// Suspended; the `String` is the deferred call's id.
    Deferred(String),
}
3453
/// A tool call that was suspended by an extension via
/// `ToolDecision::Defer`. The core records these in a map and
/// blocks the turn until a matching `OpDecision::ResumeDeferred`
/// arrives via the ops channel.
///
/// See spec §8.3 for the full protocol.
#[derive(Debug)]
#[allow(dead_code)] // Used in Phase 3B Tasks 4-5; remove the allow once wired up.
struct DeferredCall {
    /// The original tool call that was deferred. Stored so the
    /// resume path knows which slot to fill in the tool dispatch
    /// batch.
    call: crate::llm::ToolCallItem,
    /// Name of the extension that returned `Defer`. Used for
    /// diagnostics — if a turn times out with this call still
    /// deferred, the error message includes which extension was
    /// responsible.
    by_extension: &'static str,
    /// Wall-clock instant when the call was deferred. Used for
    /// timeout enforcement.
    at: std::time::Instant,
}
3476
/// Forward a type-erased extension event captured in a `CaptureSink` to the
/// `AgentEvent::Extension(...)` stream.
///
/// Attempts each known extension event type in turn; if none match, the
/// event is silently discarded (unknown extension). Called from the
/// dispatch helpers: `maybe_compact_via_extensions`,
/// `dispatch_after_llm_response`, `dispatch_intercept_tool_calls`,
/// `dispatch_on_op`, and `resolve_deferred_slots`.
fn forward_extension_event(
    evt: &dyn crate::core::hook_ctx::ExtEvent,
    on_event: &(impl Fn(AgentEvent) + Send + Sync),
) {
    #[cfg(feature = "redact")]
    use crate::extensions::redact::RedactEvent;
    use crate::extensions::{
        ask_user::AskUserEvent, autocompact::AutocompactEvent, delegation::DelegationEvent,
        follow_up::FollowUpEvent, planning::PlanningEvent, stuck_detection::StuckDetectionEvent,
        token_budget::TokenBudgetEvent,
    };
    // Try each concrete event type; the target types are disjoint, so at
    // most one downcast can succeed and chain order does not matter.
    let any = evt.as_any();
    if let Some(e) = any.downcast_ref::<AutocompactEvent>() {
        on_event(AgentEvent::Extension(ExtensionEvent::Autocompact(
            e.clone(),
        )));
    } else if let Some(e) = any.downcast_ref::<TokenBudgetEvent>() {
        on_event(AgentEvent::Extension(ExtensionEvent::TokenBudget(
            e.clone(),
        )));
    } else if let Some(e) = any.downcast_ref::<StuckDetectionEvent>() {
        on_event(AgentEvent::Extension(ExtensionEvent::StuckDetection(
            e.clone(),
        )));
    } else if let Some(e) = any.downcast_ref::<DelegationEvent>() {
        on_event(AgentEvent::Extension(ExtensionEvent::Delegation(e.clone())));
    } else if let Some(e) = any.downcast_ref::<AskUserEvent>() {
        on_event(AgentEvent::Extension(ExtensionEvent::AskUser(e.clone())));
    } else if let Some(e) = any.downcast_ref::<PlanningEvent>() {
        on_event(AgentEvent::Extension(ExtensionEvent::Planning(e.clone())));
    } else if let Some(e) = any.downcast_ref::<FollowUpEvent>() {
        on_event(AgentEvent::Extension(ExtensionEvent::FollowUp(e.clone())));
    } else {
        // Redact events only exist when the `redact` feature is enabled.
        #[cfg(feature = "redact")]
        if let Some(e) = any.downcast_ref::<RedactEvent>() {
            on_event(AgentEvent::Extension(ExtensionEvent::Redact(e.clone())));
        }
    }
}
3523
3524/// Convert a [`ToolResult`] into a string suitable for message content.
3525fn tool_result_to_string(result: &ToolResult) -> String {
3526    match result.as_text() {
3527        Some(text) => text.to_string(),
3528        None => {
3529            // Fall back to JSON serialization of the content.
3530            serde_json::to_string(&result.content).unwrap_or_else(|_| "<no content>".to_string())
3531        }
3532    }
3533}
3534
3535// ─────────────────────────────────────────────────────────────────────────────
3536// RunBuilder — the 0.13.0 unified agent run API
3537// ─────────────────────────────────────────────────────────────────────────────
3538
/// Configuration builder for a single agent turn, returned by
/// [`Engine::run`]. Chain axis setters (`.ops`, `.cancel`, `.chunked`)
/// in any order, then call one of the terminators (`.result`,
/// `.callback`, `.stream`) to execute the turn.
///
/// `RunBuilder` is `#[must_use]`. Dropping it without calling a
/// terminator emits a compiler warning. Under
/// `#![deny(unused_must_use)]` this becomes a hard error:
///
/// ```compile_fail
/// #![deny(unused_must_use)]
/// use std::sync::Arc;
/// use motosan_agent_loop::{Engine, LlmClient, Message};
///
/// fn drop_builder(agent: Arc<Engine>, llm: Arc<dyn LlmClient>) {
///     agent.run(llm, vec![Message::user("hi")]); // no terminator — denied
/// }
/// ```
///
/// Positive control — the same body with a terminator bound to `_`
/// compiles cleanly under the same deny, proving the `compile_fail`
/// above is failing on `unused_must_use` and not some incidental
/// rename or trait-resolution error:
///
/// ```no_run
/// #![deny(unused_must_use)]
/// use std::sync::Arc;
/// use motosan_agent_loop::{Engine, LlmClient, Message};
///
/// async fn use_builder(agent: Arc<Engine>, llm: Arc<dyn LlmClient>) {
///     let _ = agent.run(llm, vec![Message::user("hi")]).result().await;
/// }
/// ```
///
/// All fields are private. External code cannot construct a
/// `RunBuilder` directly — use [`Engine::run`].
#[must_use = "call .result(), .callback(cb), or .stream() to execute the run"]
pub struct RunBuilder {
    /// The engine that will drive the turn.
    engine: Arc<Engine>,
    /// LLM backend used for every chat call in the turn.
    llm: Arc<dyn LlmClient>,
    /// Initial conversation the turn starts from.
    messages: Vec<Message>,
    /// External ops channel, set via `.ops(rx)`; `None` = no ops routing.
    ops_rx: Option<mpsc::Receiver<AgentOp>>,
    /// Cancellation token, set via `.cancel(token)`.
    #[cfg(feature = "cancellation")]
    cancel: Option<tokio_util::sync::CancellationToken>,
    /// When `true` (set via `.chunked()`), use the streaming LLM API.
    chunked: bool,
}
3585
3586impl RunBuilder {
3587    // ─── Terminators ────────────────────────────────────────────
3588
3589    /// Run the turn to completion and return the final `AgentResult`.
3590    /// Intermediate events are generated internally but not delivered
3591    /// anywhere. Use this when you only need the final answer.
3592    pub async fn result(self) -> Result<AgentResult> {
3593        self.dispatch_callback_internal(|_| {}).await
3594    }
3595
3596    /// Run the turn and push each `AgentEvent` through the callback
3597    /// as it happens. Returns the final `AgentResult` when the turn
3598    /// terminates.
3599    pub async fn callback(
3600        self,
3601        on_event: impl Fn(AgentEvent) + Send + Sync + 'static,
3602    ) -> Result<AgentResult> {
3603        self.dispatch_callback_internal(on_event).await
3604    }
3605
3606    // ─── Internal dispatch ──────────────────────────────────────
3607
3608    /// Assemble the axis state and delegate to the appropriate inner
3609    /// function. Called by `.result()` (with a no-op callback) and
3610    /// `.callback(cb)` (with the user's callback, in Task 8).
3611    async fn dispatch_callback_internal(
3612        self,
3613        on_event: impl Fn(AgentEvent) + Send + Sync + 'static,
3614    ) -> Result<AgentResult> {
3615        // Wrap in Arc so all match arms can use it without move conflicts.
3616        let on_event = std::sync::Arc::new(on_event);
3617
3618        let RunBuilder {
3619            engine,
3620            llm,
3621            messages,
3622            ops_rx,
3623            #[cfg(feature = "cancellation")]
3624            cancel,
3625            chunked,
3626        } = self;
3627
3628        // MCP connect (external to the inner functions after Task 1).
3629        #[cfg(feature = "mcp-client")]
3630        let mcp_tools: Vec<Arc<dyn Tool>> = engine.connect_mcp_servers().await?;
3631        #[cfg(not(feature = "mcp-client"))]
3632        let mcp_tools: Vec<Arc<dyn Tool>> = vec![];
3633
3634        // Dispatch based on axis combination.
3635        let (result, _final_messages, terminal_meta) = {
3636            #[cfg(feature = "cancellation")]
3637            {
3638                match (chunked, cancel, ops_rx) {
3639                    // batch LLM, no cancel, ops or no ops
3640                    (false, None, ops) => {
3641                        let cb = std::sync::Arc::clone(&on_event);
3642                        engine
3643                            .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
3644                            .await
3645                    }
3646                    // batch LLM, cancel, no ops
3647                    (false, Some(token), None) => {
3648                        let cb = std::sync::Arc::clone(&on_event);
3649                        engine
3650                            .run_inner_cancel(llm.as_ref(), messages, &mcp_tools, &*cb, &token)
3651                            .await
3652                    }
3653                    // batch LLM, cancel + ops
3654                    (false, Some(token), Some(rx)) => {
3655                        let cb = std::sync::Arc::clone(&on_event);
3656                        engine
3657                            .run_inner_with_ops_and_cancel(
3658                                llm.as_ref(),
3659                                messages,
3660                                &mcp_tools,
3661                                &*cb,
3662                                rx,
3663                                &token,
3664                            )
3665                            .await
3666                    }
3667                    // chunked LLM, no cancel, ops or no ops
3668                    (true, None, ops) => {
3669                        let cb = std::sync::Arc::clone(&on_event);
3670                        engine
3671                            .run_streaming_inner(
3672                                llm.as_ref(),
3673                                messages,
3674                                &mcp_tools,
3675                                ops,
3676                                move |e| cb(e),
3677                            )
3678                            .await
3679                    }
3680                    // chunked LLM, cancel, no ops
3681                    (true, Some(token), None) => {
3682                        let cb = std::sync::Arc::clone(&on_event);
3683                        engine
3684                            .run_streaming_with_cancel_inner(
3685                                llm.as_ref(),
3686                                messages,
3687                                &mcp_tools,
3688                                move |e| cb(e),
3689                                &token,
3690                            )
3691                            .await
3692                    }
3693                    // chunked LLM, cancel + ops
3694                    (true, Some(token), Some(rx)) => {
3695                        let cb = std::sync::Arc::clone(&on_event);
3696                        engine
3697                            .run_streaming_inner_with_cancel_and_ops(
3698                                llm.as_ref(),
3699                                messages,
3700                                &mcp_tools,
3701                                rx,
3702                                &token,
3703                                move |e| cb(e),
3704                            )
3705                            .await
3706                    }
3707                }
3708            }
3709            #[cfg(not(feature = "cancellation"))]
3710            {
3711                match (chunked, ops_rx) {
3712                    (false, ops) => {
3713                        let cb = std::sync::Arc::clone(&on_event);
3714                        engine
3715                            .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
3716                            .await
3717                    }
3718                    (true, ops) => {
3719                        let cb = std::sync::Arc::clone(&on_event);
3720                        engine
3721                            .run_streaming_inner(
3722                                llm.as_ref(),
3723                                messages,
3724                                &mcp_tools,
3725                                ops,
3726                                move |e| cb(e),
3727                            )
3728                            .await
3729                    }
3730                }
3731            }
3732        };
3733
3734        let merged_tools = MergedTools::new(&engine.tool_map, &engine.tool_defs, &mcp_tools);
3735        let terminal_result = match engine
3736            .dispatch_on_terminal_from_meta(
3737                &terminal_meta,
3738                &_final_messages,
3739                merged_tools.tool_defs(),
3740                &*on_event,
3741            )
3742            .await
3743        {
3744            Ok(()) => result,
3745            Err(e) => Err(e),
3746        };
3747
3748        // MCP disconnect.
3749        #[cfg(feature = "mcp-client")]
3750        for server in &engine.mcp_servers {
3751            let _ = server.disconnect().await;
3752        }
3753
3754        terminal_result
3755    }
3756
3757    /// Return a pulled stream of `AgentStreamItem`s. The consumer
3758    /// drives the turn by calling `.next().await`. The final
3759    /// `Terminal` item is guaranteed to be the last yielded before
3760    /// the underlying channel closes.
3761    ///
3762    /// Use `.callback(cb)` or `.result()` if you want a push-based
3763    /// delivery model or a final-result-only future.
3764    pub fn stream(
3765        self,
3766    ) -> impl futures::Stream<Item = crate::stream::AgentStreamItem> + Send + 'static {
3767        self.dispatch_stream()
3768    }
3769
3770    /// Spawns a tokio task running the same 8-way dispatch as
3771    /// `dispatch_callback_internal`, with events piped into an mpsc
3772    /// channel. The receiver is returned wrapped in
3773    /// `ReceiverStream`, giving consumers a pull-based delivery
3774    /// model while internally reusing the same axis routing.
3775    fn dispatch_stream(
3776        self,
3777    ) -> impl futures::Stream<Item = crate::stream::AgentStreamItem> + Send + 'static {
3778        use crate::stream::{AgentStreamItem, AgentTerminal};
3779
3780        let capacity = self.engine.channel_config.stream_capacity.unwrap_or(256);
3781        let (tx, rx) = mpsc::channel::<AgentStreamItem>(capacity);
3782
3783        let RunBuilder {
3784            engine,
3785            llm,
3786            messages,
3787            ops_rx,
3788            #[cfg(feature = "cancellation")]
3789            cancel,
3790            chunked,
3791        } = self;
3792        let tx_for_events = tx.clone();
3793
3794        tokio::spawn(async move {
3795            let on_event = move |e: AgentEvent| {
3796                let _ = tx_for_events.try_send(AgentStreamItem::Event(e));
3797            };
3798            let on_event = std::sync::Arc::new(on_event);
3799
3800            // MCP connect.
3801            #[cfg(feature = "mcp-client")]
3802            let mcp_tools: Vec<Arc<dyn Tool>> = match engine.connect_mcp_servers().await {
3803                Ok(t) => t,
3804                Err(e) => {
3805                    let _ = tx
3806                        .send(AgentStreamItem::Terminal(AgentTerminal {
3807                            result: Err(e),
3808                            messages: Vec::new(),
3809                        }))
3810                        .await;
3811                    return;
3812                }
3813            };
3814            #[cfg(not(feature = "mcp-client"))]
3815            let mcp_tools: Vec<Arc<dyn Tool>> = vec![];
3816
3817            // Same 8-way dispatch match as dispatch_callback_internal.
3818            let (result, final_messages, terminal_meta) = {
3819                #[cfg(feature = "cancellation")]
3820                {
3821                    match (chunked, cancel, ops_rx) {
3822                        (false, None, ops) => {
3823                            let cb = std::sync::Arc::clone(&on_event);
3824                            engine
3825                                .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
3826                                .await
3827                        }
3828                        (false, Some(token), None) => {
3829                            let cb = std::sync::Arc::clone(&on_event);
3830                            engine
3831                                .run_inner_cancel(llm.as_ref(), messages, &mcp_tools, &*cb, &token)
3832                                .await
3833                        }
3834                        (false, Some(token), Some(rx_ops)) => {
3835                            let cb = std::sync::Arc::clone(&on_event);
3836                            engine
3837                                .run_inner_with_ops_and_cancel(
3838                                    llm.as_ref(),
3839                                    messages,
3840                                    &mcp_tools,
3841                                    &*cb,
3842                                    rx_ops,
3843                                    &token,
3844                                )
3845                                .await
3846                        }
3847                        (true, None, ops) => {
3848                            let cb = std::sync::Arc::clone(&on_event);
3849                            engine
3850                                .run_streaming_inner(
3851                                    llm.as_ref(),
3852                                    messages,
3853                                    &mcp_tools,
3854                                    ops,
3855                                    move |e| cb(e),
3856                                )
3857                                .await
3858                        }
3859                        (true, Some(token), None) => {
3860                            let cb = std::sync::Arc::clone(&on_event);
3861                            engine
3862                                .run_streaming_with_cancel_inner(
3863                                    llm.as_ref(),
3864                                    messages,
3865                                    &mcp_tools,
3866                                    move |e| cb(e),
3867                                    &token,
3868                                )
3869                                .await
3870                        }
3871                        (true, Some(token), Some(rx_ops)) => {
3872                            let cb = std::sync::Arc::clone(&on_event);
3873                            engine
3874                                .run_streaming_inner_with_cancel_and_ops(
3875                                    llm.as_ref(),
3876                                    messages,
3877                                    &mcp_tools,
3878                                    rx_ops,
3879                                    &token,
3880                                    move |e| cb(e),
3881                                )
3882                                .await
3883                        }
3884                    }
3885                }
3886                #[cfg(not(feature = "cancellation"))]
3887                {
3888                    match (chunked, ops_rx) {
3889                        (false, ops) => {
3890                            let cb = std::sync::Arc::clone(&on_event);
3891                            engine
3892                                .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
3893                                .await
3894                        }
3895                        (true, ops) => {
3896                            let cb = std::sync::Arc::clone(&on_event);
3897                            engine
3898                                .run_streaming_inner(
3899                                    llm.as_ref(),
3900                                    messages,
3901                                    &mcp_tools,
3902                                    ops,
3903                                    move |e| cb(e),
3904                                )
3905                                .await
3906                        }
3907                    }
3908                }
3909            };
3910
3911            let merged_tools = MergedTools::new(&engine.tool_map, &engine.tool_defs, &mcp_tools);
3912            let terminal_result = match engine
3913                .dispatch_on_terminal_from_meta(
3914                    &terminal_meta,
3915                    &final_messages,
3916                    merged_tools.tool_defs(),
3917                    &*on_event,
3918                )
3919                .await
3920            {
3921                Ok(()) => result,
3922                Err(e) => Err(e),
3923            };
3924
3925            // MCP disconnect.
3926            #[cfg(feature = "mcp-client")]
3927            for server in &engine.mcp_servers {
3928                let _ = server.disconnect().await;
3929            }
3930
3931            // CRITICAL: send Terminal with awaiting send (not try_send) to
3932            // guarantee delivery even if the consumer is slow.
3933            let _ = tx
3934                .send(AgentStreamItem::Terminal(AgentTerminal {
3935                    result: terminal_result,
3936                    messages: final_messages,
3937                }))
3938                .await;
3939        });
3940
3941        tokio_stream::wrappers::ReceiverStream::new(rx)
3942    }
3943
3944    // ─── Axis setters ────────────────────────────────────────────
3945
3946    /// Route `AgentOp` values from an external channel into the
3947    /// running turn. Calling twice overwrites the first receiver
3948    /// (last-write-wins).
3949    pub fn ops(mut self, rx: mpsc::Receiver<AgentOp>) -> Self {
3950        self.ops_rx = Some(rx);
3951        self
3952    }
3953
3954    /// Attach a cancellation token. When tripped, the turn exits
3955    /// with `AgentError::Cancelled`. Calling twice overwrites.
3956    #[cfg(feature = "cancellation")]
3957    pub fn cancel(mut self, token: tokio_util::sync::CancellationToken) -> Self {
3958        self.cancel = Some(token);
3959        self
3960    }
3961
3962    /// Use [`LlmClient::chat_stream`] instead of [`LlmClient::chat`],
3963    /// so `CoreEvent::TextChunk` events fire per LLM token delta.
3964    /// Observable via `.callback(cb)` or by draining the stream
3965    /// returned by `.stream()`. Without an event observer, the chunks
3966    /// are still generated but not delivered anywhere.
3967    ///
3968    /// Equivalent to `stream=True` on the OpenAI / Anthropic SDKs.
3969    pub fn chunked(mut self) -> Self {
3970        self.chunked = true;
3971        self
3972    }
3973}
3974
3975impl Engine {
3976    /// Build a run configuration for this agent.
3977    ///
3978    /// Chain axis setters (`.ops(rx)`, `.cancel(token)`, `.chunked()`)
3979    /// in any order, then call one of the terminators (`.result()`,
3980    /// `.callback(cb)`, `.stream()`) to execute the turn.
3981    ///
3982    /// # Example
3983    ///
3984    /// ```ignore
3985    /// use std::sync::Arc;
3986    /// use motosan_agent_loop::{Engine, Message};
3987    ///
3988    /// # async fn demo(llm: Arc<dyn motosan_agent_loop::LlmClient>) -> motosan_agent_loop::Result<()> {
3989    /// let agent = Arc::new(Engine::builder().build());
3990    ///
3991    /// // Simplest: just the final result.
3992    /// let result = Arc::clone(&agent)
3993    ///     .run(llm.clone(), vec![Message::user("Hi!")])
3994    ///     .result()
3995    ///     .await?;
3996    /// # Ok(()) }
3997    /// ```
3998    ///
3999    /// See the [`RunBuilder`] struct documentation for the full set
4000    /// of axis setters and terminators.
4001    pub fn run(self: Arc<Self>, llm: Arc<dyn LlmClient>, messages: Vec<Message>) -> RunBuilder {
4002        RunBuilder {
4003            engine: self,
4004            llm,
4005            messages,
4006            ops_rx: None,
4007            #[cfg(feature = "cancellation")]
4008            cancel: None,
4009            chunked: false,
4010        }
4011    }
4012}
4013
4014#[cfg(test)]
4015#[path = "engine_tests.rs"]
4016mod tests;
4017
4018#[cfg(all(test, feature = "mcp-client"))]
4019mod mcp_integration_tests {
4020    use super::*;
4021    use crate::mcp::McpServer;
4022    use async_trait::async_trait;
4023    use motosan_agent_tool::ToolDef;
4024    use serde_json::json;
4025
    /// Minimal in-process MCP server used by the tests below: exposes a
    /// single `echo` tool that reflects its `msg` argument back.
    struct EchoMcpServer {
        // Server name, also used as the tool namespace prefix
        // (e.g. `test_mcp__echo`) — see the streaming test below.
        name: String,
    }
4029
4030    #[async_trait]
4031    impl McpServer for EchoMcpServer {
4032        fn name(&self) -> &str {
4033            &self.name
4034        }
4035        async fn connect(&self) -> crate::Result<()> {
4036            Ok(())
4037        }
4038        async fn list_tools(&self) -> crate::Result<Vec<ToolDef>> {
4039            Ok(vec![ToolDef {
4040                name: "echo".to_string(),
4041                description: "Echo input".to_string(),
4042                input_schema: json!({"type": "object", "properties": {"msg": {"type": "string"}}}),
4043            }])
4044        }
4045        async fn call_tool(&self, _name: &str, args: serde_json::Value) -> crate::Result<String> {
4046            Ok(format!(
4047                "echo: {}",
4048                args.get("msg").and_then(|v| v.as_str()).unwrap_or("")
4049            ))
4050        }
4051        async fn disconnect(&self) -> crate::Result<()> {
4052            Ok(())
4053        }
4054    }
4055
4056    #[test]
4057    fn builder_accepts_mcp_server() {
4058        let agent = Arc::new(
4059            Engine::builder()
4060                .mcp_server(EchoMcpServer {
4061                    name: "test_mcp".to_string(),
4062                })
4063                .max_iterations(5)
4064                .build(),
4065        );
4066        assert_eq!(agent.mcp_servers.len(), 1);
4067    }
4068
4069    #[test]
4070    fn builder_accepts_shared_arc_mcp_server() {
4071        let shared: Arc<dyn McpServer> = Arc::new(EchoMcpServer {
4072            name: "shared_mcp".to_string(),
4073        });
4074
4075        let agent_a = Engine::builder()
4076            .mcp_server_arc(Arc::clone(&shared))
4077            .max_iterations(5)
4078            .build();
4079
4080        let agent_b = Engine::builder()
4081            .mcp_server_arc(Arc::clone(&shared))
4082            .max_iterations(5)
4083            .build();
4084
4085        assert_eq!(agent_a.mcp_servers.len(), 1);
4086        assert_eq!(agent_b.mcp_servers.len(), 1);
4087        assert!(Arc::ptr_eq(
4088            &agent_a.mcp_servers[0],
4089            &agent_b.mcp_servers[0]
4090        ));
4091    }
4092
    /// A simple mock LLM for MCP streaming tests.
    /// Uses the default `chat_stream` fallback (delegates to `chat`).
    struct McpTestLlm {
        // Scripted responses, consumed from the front — one per `chat` call.
        responses: std::sync::Mutex<Vec<crate::llm::LlmResponse>>,
    }
4098
4099    impl McpTestLlm {
4100        fn new(responses: Vec<crate::llm::LlmResponse>) -> Self {
4101            Self {
4102                responses: std::sync::Mutex::new(responses),
4103            }
4104        }
4105    }
4106
4107    #[async_trait]
4108    impl crate::llm::LlmClient for McpTestLlm {
4109        async fn chat(
4110            &self,
4111            _messages: &[Message],
4112            _tools: &[ToolDef],
4113        ) -> crate::Result<crate::llm::ChatOutput> {
4114            let mut responses = self.responses.lock().unwrap();
4115            assert!(!responses.is_empty(), "McpTestLlm: no more responses");
4116            let response = responses.remove(0);
4117            Ok(crate::llm::ChatOutput {
4118                response,
4119                usage: None,
4120                stop_reason: None,
4121            })
4122        }
4123    }
4124
4125    #[tokio::test]
4126    async fn run_streaming_with_mcp_server() {
4127        use crate::llm::{LlmResponse, ToolCallItem};
4128        use std::sync::Mutex;
4129
4130        // LLM first requests the MCP tool (namespaced as server__tool),
4131        // then returns a final message.
4132        // Uses the default chat_stream fallback (delegates to chat).
4133        let llm = Arc::new(McpTestLlm::new(vec![
4134            LlmResponse::ToolCalls(vec![ToolCallItem {
4135                id: "call_mcp".to_string(),
4136                name: "test_mcp__echo".to_string(),
4137                args: json!({"msg": "hello"}),
4138            }]),
4139            LlmResponse::Message("MCP says: echo hello".to_string()),
4140        ]));
4141
4142        let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
4143        let events_cb = events.clone();
4144
4145        let agent = Arc::new(
4146            Engine::builder()
4147                .mcp_server(EchoMcpServer {
4148                    name: "test_mcp".to_string(),
4149                })
4150                .max_iterations(5)
4151                .build(),
4152        );
4153
4154        let result = Arc::clone(&agent)
4155            .run(
4156                llm.clone() as Arc<dyn LlmClient>,
4157                vec![Message::user("call echo")],
4158            )
4159            .chunked()
4160            .callback(move |event| {
4161                use crate::extensions::ask_user::AskUserEvent;
4162                let label = match &event {
4163                    AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
4164                        format!("started:{name}")
4165                    }
4166                    AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
4167                        format!("completed:{name}:{}", result.as_text().unwrap_or(""))
4168                    }
4169                    AgentEvent::Core(CoreEvent::TextChunk(t)) => format!("chunk:{t}"),
4170                    AgentEvent::Core(CoreEvent::TextDone(t)) => format!("done:{t}"),
4171                    AgentEvent::Core(CoreEvent::IterationStarted { iteration: n }) => {
4172                        format!("iter:{n}")
4173                    }
4174                    AgentEvent::Core(CoreEvent::Interrupted) => "interrupted".to_string(),
4175                    AgentEvent::Extension(ExtensionEvent::AskUser(AskUserEvent::Question {
4176                        questions,
4177                        ..
4178                    })) => format!(
4179                        "ask_user:{}",
4180                        questions.first().map(|q| q.question.as_str()).unwrap_or("")
4181                    ),
4182                    AgentEvent::Extension(ExtensionEvent::AskUser(AskUserEvent::Timeout {
4183                        call_id,
4184                        ..
4185                    })) => {
4186                        format!("ask_user_timeout:{call_id}")
4187                    }
4188                    _ => format!("{event:?}"),
4189                };
4190                events_cb.lock().unwrap().push(label);
4191            })
4192            .await
4193            .unwrap();
4194
4195        assert_eq!(result.answer, "MCP says: echo hello");
4196        assert_eq!(result.tool_calls.len(), 1);
4197        assert_eq!(result.tool_calls[0].0, "test_mcp__echo");
4198
4199        let events = events.lock().unwrap();
4200        // The MCP tool should have been executed and returned "echo: hello".
4201        assert!(events
4202            .iter()
4203            .any(|e| e.starts_with("completed:test_mcp__echo:echo: hello")));
4204    }
4205
    /// MCP server that tracks connect/disconnect calls via shared counters.
    ///
    /// Used by tests to verify engine startup/rollback behavior: the shared
    /// atomics let a test observe lifecycle calls made on the server after
    /// ownership has moved into the engine.
    struct TrackingMcpServer {
        /// Name reported by `McpServer::name`.
        name: String,
        /// When `true`, `connect` fails with an `AgentError::Mcp` error.
        fail_connect: bool,
        /// Set to `true` once `connect` succeeds.
        connected: Arc<std::sync::atomic::AtomicBool>,
        /// Incremented on every `disconnect` call.
        disconnect_count: Arc<std::sync::atomic::AtomicUsize>,
    }
4213
4214    #[async_trait]
4215    impl McpServer for TrackingMcpServer {
4216        fn name(&self) -> &str {
4217            &self.name
4218        }
4219        async fn connect(&self) -> crate::Result<()> {
4220            if self.fail_connect {
4221                Err(crate::AgentError::Mcp("connect failed".into()))
4222            } else {
4223                self.connected
4224                    .store(true, std::sync::atomic::Ordering::SeqCst);
4225                Ok(())
4226            }
4227        }
4228        async fn list_tools(&self) -> crate::Result<Vec<ToolDef>> {
4229            Ok(vec![])
4230        }
4231        async fn call_tool(&self, _name: &str, _args: serde_json::Value) -> crate::Result<String> {
4232            Ok(String::new())
4233        }
4234        async fn disconnect(&self) -> crate::Result<()> {
4235            self.disconnect_count
4236                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
4237            Ok(())
4238        }
4239    }
4240
4241    #[tokio::test]
4242    async fn partial_connect_failure_disconnects_already_connected() {
4243        use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
4244
4245        let server1_connected = Arc::new(AtomicBool::new(false));
4246        let server1_disconnects = Arc::new(AtomicUsize::new(0));
4247        let server2_disconnects = Arc::new(AtomicUsize::new(0));
4248
4249        let server1: Arc<dyn McpServer> = Arc::new(TrackingMcpServer {
4250            name: "ok_server".to_string(),
4251            fail_connect: false,
4252            connected: server1_connected.clone(),
4253            disconnect_count: server1_disconnects.clone(),
4254        });
4255        let server2: Arc<dyn McpServer> = Arc::new(TrackingMcpServer {
4256            name: "fail_server".to_string(),
4257            fail_connect: true,
4258            connected: Arc::new(AtomicBool::new(false)),
4259            disconnect_count: server2_disconnects.clone(),
4260        });
4261
4262        let agent = Arc::new(
4263            Engine::builder()
4264                .mcp_server_arc(server1)
4265                .mcp_server_arc(server2)
4266                .max_iterations(1)
4267                .build(),
4268        );
4269
4270        let llm = Arc::new(McpTestLlm::new(vec![]));
4271        let err = Arc::clone(&agent)
4272            .run(llm.clone() as Arc<dyn LlmClient>, vec![Message::user("hi")])
4273            .result()
4274            .await
4275            .unwrap_err();
4276
4277        assert!(
4278            matches!(err, crate::AgentError::Mcp(_)),
4279            "expected connect error"
4280        );
4281        // Server 1 was successfully connected, so it must have been disconnected.
4282        assert!(server1_connected.load(Ordering::SeqCst));
4283        assert_eq!(server1_disconnects.load(Ordering::SeqCst), 1);
4284        // Server 2 never connected, so disconnect should not have been called.
4285        assert_eq!(server2_disconnects.load(Ordering::SeqCst), 0);
4286    }
4287}