// motosan_agent_loop/core/engine.rs
1use std::collections::HashMap;
2use std::sync::Arc;
3
4use futures::future::{join_all, try_join_all};
5use motosan_agent_tool::{Tool, ToolContext, ToolDef, ToolResult};
6use tokio::sync::mpsc;
7
8use crate::context::ContextProvider;
9use crate::core::decision::FlowDecision;
10use crate::core::event::{AgentEvent, CoreEvent, ExtensionEvent};
11use crate::error::AgentError;
12use crate::llm::{LlmClient, TokenUsage, ToolCallItem};
13use crate::message::{Message, ToolCallRef};
14use crate::Result;
15
/// The current operating mode of the agent loop.
///
/// When plan mode is enabled, the agent can enter `Planning` mode to explore
/// and brainstorm before committing to an implementation approach.
/// Defaults to [`AgentMode::Normal`].
#[derive(Debug, Clone, Copy, PartialEq, Eq, Default)]
pub enum AgentMode {
    /// Normal execution mode — the agent implements directly.
    #[default]
    Normal,
    /// Planning mode — the agent explores, asks questions, and proposes a plan.
    Planning,
}
28
/// Schedule controlling when `next_step_prompt` is injected.
///
/// Used by `StuckDetectionExtension` (in `src/extensions/stuck_detection/`)
/// to control the cadence at which the think-before-act prompt is prepended
/// to the conversation.
///
/// The default is `Every(1)` (inject every iteration), but injection only
/// happens when a `next_step_prompt` has been configured on the extension.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum NextStepSchedule {
    /// Inject only before the first iteration.
    FirstOnly,
    /// Inject every `n` iterations (1 = every iteration, 2 = every other, …).
    ///
    /// A value of `0` is treated as `1`.
    Every(usize),
}
46
47impl Default for NextStepSchedule {
48 fn default() -> Self {
49 Self::Every(1)
50 }
51}
52
53impl NextStepSchedule {
54 /// Returns `true` if the prompt should be injected for the given
55 /// 1-based `iteration`.
56 // TODO(phase-4): remove once StuckDetectionExtension is the only caller.
57 #[allow(dead_code)]
58 pub(crate) fn should_inject(&self, iteration: usize) -> bool {
59 match self {
60 Self::FirstOnly => iteration == 1,
61 Self::Every(n) => {
62 let n = if *n == 0 { 1 } else { *n };
63 iteration == 1 || (iteration - 1).is_multiple_of(n)
64 }
65 }
66 }
67}
68
/// Policy for handling channel backpressure when a bounded queue is full.
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum BackpressurePolicy {
    /// Block the sender until capacity is available (default).
    #[default]
    Block,
    /// Drop the **incoming** (newest) op when the queue is full.
    ///
    /// Note: true drop-oldest semantics would require a ring-buffer channel.
    /// This variant currently behaves like `Reject` but emits [`CoreEvent::OpDropped`]
    /// instead of [`CoreEvent::OpRejected`], allowing callers to distinguish intent.
    ///
    /// [`CoreEvent::OpDropped`]: crate::core::CoreEvent::OpDropped
    /// [`CoreEvent::OpRejected`]: crate::core::CoreEvent::OpRejected
    DropOldest,
    /// Reject the new item and emit a telemetry event
    /// ([`CoreEvent::OpRejected`](crate::core::CoreEvent::OpRejected)).
    Reject,
}
87
/// Configuration for bounded channel capacities used by [`AgentSession`](crate::AgentSession).
///
/// All capacities have sensible defaults (64) and can be overridden via the
/// builder pattern on [`EngineBuilder`].
#[derive(Debug, Clone)]
pub struct ChannelConfig {
    /// Capacity of the user-input channel (messages sent via `AgentSession::send`).
    pub input_capacity: usize,
    /// Capacity of the per-turn operations channel (ops like Interrupt, InjectHint).
    pub ops_capacity: usize,
    /// Backpressure policy applied when the ops channel is full.
    /// See [`BackpressurePolicy`] for the available strategies.
    pub ops_backpressure: BackpressurePolicy,
    /// Capacity of the stream channel used by [`Engine::run`].
    ///
    /// `None` means use the default of `256`. The channel is bounded; if it
    /// fills up, intermediate `Event` items are dropped (best-effort) but the
    /// terminal item is always delivered (sent via the awaiting `send`).
    pub stream_capacity: Option<usize>,
}
107
108impl Default for ChannelConfig {
109 fn default() -> Self {
110 Self {
111 input_capacity: 64,
112 ops_capacity: 64,
113 ops_backpressure: BackpressurePolicy::Block,
114 stream_capacity: None,
115 }
116 }
117}
118
119// ---------------------------------------------------------------------------
120// Pipeline stage helpers
121// ---------------------------------------------------------------------------
122
/// Resolved tool registry for a single run, combining static and dynamic tools.
///
/// This always clones the base maps so it can own the data. When `extra_tools`
/// is empty the clone is the only cost; when extra tools are present they are
/// inserted into the cloned maps.
struct MergedTools {
    /// Lookup of tool name -> implementation.
    map: HashMap<String, Arc<dyn Tool>>,
    /// Tool definitions advertised to the LLM.
    defs: Vec<ToolDef>,
}
132
133impl MergedTools {
134 /// Clone base maps and merge any extra tools into them.
135 ///
136 /// Note: this always clones `base_map` and `base_defs` because `MergedTools`
137 /// owns its data. If `extra_tools` is empty the clone is the only cost.
138 fn new(
139 base_map: &HashMap<String, Arc<dyn Tool>>,
140 base_defs: &[ToolDef],
141 extra_tools: &[Arc<dyn Tool>],
142 ) -> Self {
143 if extra_tools.is_empty() {
144 return Self {
145 map: base_map.clone(),
146 defs: base_defs.to_vec(),
147 };
148 }
149 let mut map = base_map.clone();
150 let mut defs = base_defs.to_vec();
151 for t in extra_tools {
152 map.insert(t.def().name.clone(), Arc::clone(t));
153 defs.push(t.def());
154 }
155 Self { map, defs }
156 }
157
158 fn tool_map(&self) -> &HashMap<String, Arc<dyn Tool>> {
159 &self.map
160 }
161
162 fn tool_defs(&self) -> &[ToolDef] {
163 &self.defs
164 }
165}
166
/// Outcome of processing an LLM text response via
/// `Engine::handle_text_response`, telling the caller how to proceed.
enum TextResponseOutcome {
    /// The turn should finalize with this text as the answer. The caller
    /// still needs to push `Message::assistant(text)` onto the state
    /// and build the `AgentResult` (the helper doesn't do this because
    /// the caller returns different types — `AgentResult` vs the
    /// `(Result<AgentResult>, Vec<Message>)` tuple).
    Finalize(String),
    /// An extension injected a continuation message. The helper already
    /// pushed `Message::assistant(text)` and `state.messages.push(msg)`,
    /// and updated `state.continuation_text`. The caller should loop back
    /// to another LLM call immediately.
    Continue,
    /// An extension halted the turn. The caller should return
    /// `Err(AgentError::Internal(...))` with the halt reason's message.
    Halt(String),
}
185
/// Terminal metadata describing how a run ended, handed to the
/// post-turn extension hooks (built by the `run_return_*` helpers).
struct RunTerminalMeta {
    /// How the turn ended (success / interrupted / failure).
    turn_result: crate::core::extension::TurnResult,
    /// The iteration count at the moment the turn ended.
    iteration: usize,
    /// Token usage accumulated over the whole turn.
    token_usage: TokenUsage,
}
192
/// Accumulator for per-turn mutable state shared across loop iterations.
struct TurnState {
    /// The evolving conversation history for this turn.
    messages: Vec<Message>,
    /// History of tool calls made: (tool_name, arguments).
    all_tool_calls: Vec<(String, serde_json::Value)>,
    /// Token usage accumulated across all LLM calls this turn.
    total_usage: TokenUsage,
    /// Accumulated text across all token-budget continuation segments.
    continuation_text: String,
}
200
201impl TurnState {
202 fn new(messages: Vec<Message>) -> Self {
203 Self {
204 messages,
205 all_tool_calls: Vec::new(),
206 total_usage: TokenUsage::default(),
207 continuation_text: String::new(),
208 }
209 }
210
211 /// Record token usage from an LLM call.
212 fn accumulate_usage(&mut self, usage: Option<TokenUsage>) {
213 if let Some(u) = usage {
214 self.total_usage.accumulate(u);
215 }
216 }
217
218 /// Build the final result after the LLM produces a text answer.
219 ///
220 /// If there were continuations, the accumulated continuation text is
221 /// prepended to the final segment so the full answer is returned.
222 fn into_result(self, answer: String, iteration: usize) -> AgentResult {
223 let full_answer = if self.continuation_text.is_empty() {
224 answer
225 } else {
226 let mut buf = self.continuation_text;
227 buf.push_str(&answer);
228 buf
229 };
230 AgentResult {
231 answer: full_answer,
232 tool_calls: self.all_tool_calls,
233 iterations: iteration,
234 usage: self.total_usage,
235 messages: self.messages,
236 }
237 }
238}
239
impl RunTerminalMeta {
    /// Terminal metadata for a run that completed successfully.
    fn success(agent_result: &AgentResult) -> Self {
        Self {
            turn_result: crate::core::extension::TurnResult::Success,
            iteration: agent_result.iterations,
            token_usage: agent_result.usage,
        }
    }

    /// Terminal metadata for a run stopped by an interrupt.
    fn interrupted(iteration: usize, token_usage: TokenUsage) -> Self {
        Self {
            turn_result: crate::core::extension::TurnResult::Interrupted,
            iteration,
            token_usage,
        }
    }

    /// Terminal metadata for a run that failed with `err` (stringified
    /// into the `Failure` variant).
    fn failure(iteration: usize, token_usage: TokenUsage, err: &AgentError) -> Self {
        Self {
            turn_result: crate::core::extension::TurnResult::Failure(err.to_string()),
            iteration,
            token_usage,
        }
    }
}
265
/// Build the `(result, message snapshot, terminal meta)` triple returned
/// by the run pipeline for a successful turn.
fn run_return_success(
    res: AgentResult,
    snapshot: Vec<Message>,
) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
    // Meta is derived before `res` is moved into `Ok`.
    let meta = RunTerminalMeta::success(&res);
    (Ok(res), snapshot, meta)
}
273
/// Build the `(result, message snapshot, terminal meta)` triple for a turn
/// stopped by an interrupt. The partial result is still returned as `Ok`.
fn run_return_interrupted(
    res: AgentResult,
    snapshot: Vec<Message>,
) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
    let meta = RunTerminalMeta::interrupted(res.iterations, res.usage);
    (Ok(res), snapshot, meta)
}
281
/// Build the `(result, message snapshot, terminal meta)` triple for a turn
/// that failed with `err`. The accumulated `messages` are returned so the
/// caller can preserve the conversation up to the failure point.
fn run_return_failure(
    err: AgentError,
    messages: Vec<Message>,
    iteration: usize,
    token_usage: TokenUsage,
) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
    let meta = RunTerminalMeta::failure(iteration, token_usage, &err);
    (Err(err), messages, meta)
}
291
292/// Stage: emit ToolStarted events for all items in a batch.
293fn emit_tool_started(items: &[ToolCallItem], on_event: &(impl Fn(AgentEvent) + Send + Sync)) {
294 for tc in items {
295 on_event(AgentEvent::Core(CoreEvent::ToolStarted {
296 name: tc.name.clone(),
297 }));
298 }
299}
300
301/// Merge streamed tool results with batch-executed results for any tools that
302/// were not streamed, reassembling everything in the canonical order from the
303/// `Done` payload.
304///
305/// Returns `(final_items, final_results)` in the same order as `canonical_items`.
306async fn merge_streamed_tool_results(
307 canonical_items: &[ToolCallItem],
308 streamed_items: Vec<ToolCallItem>,
309 streamed_results: Vec<ToolResult>,
310 tool_map: &HashMap<String, Arc<dyn Tool>>,
311 tool_timeout: Option<std::time::Duration>,
312 tool_context: &ToolContext,
313 on_event: &(impl Fn(AgentEvent) + Send + Sync),
314) -> (Vec<ToolCallItem>, Vec<ToolResult>) {
315 let streamed_ids: std::collections::HashSet<&str> =
316 streamed_items.iter().map(|i| i.id.as_str()).collect();
317
318 // Find tools in the canonical payload that were NOT streamed.
319 let remaining: Vec<ToolCallItem> = canonical_items
320 .iter()
321 .filter(|item| !streamed_ids.contains(item.id.as_str()))
322 .cloned()
323 .collect();
324
325 // Build lookup of streamed results by ID.
326 let streamed_map: HashMap<&str, ToolResult> = streamed_items
327 .iter()
328 .zip(streamed_results)
329 .map(|(item, result)| (item.id.as_str(), result))
330 .collect();
331
332 // Execute remaining tools (if any) and build their lookup.
333 let remaining_map: HashMap<String, ToolResult> = if !remaining.is_empty() {
334 emit_tool_started(&remaining, on_event);
335 let remaining_results =
336 Engine::execute_tools_parallel(tool_map, &remaining, tool_timeout, tool_context).await;
337 remaining
338 .iter()
339 .zip(remaining_results)
340 .map(|(item, result)| (item.id.clone(), result))
341 .collect()
342 } else {
343 HashMap::new()
344 };
345
346 // Reassemble in canonical order, with a defensive fallback for missing IDs.
347 let mut final_items = Vec::with_capacity(canonical_items.len());
348 let mut final_results = Vec::with_capacity(canonical_items.len());
349
350 for item in canonical_items {
351 let result = if let Some(r) = streamed_map.get(item.id.as_str()).cloned() {
352 r
353 } else if let Some(r) = remaining_map.get(item.id.as_str()).cloned() {
354 r
355 } else {
356 // Defensive: this should never happen if the backend follows the
357 // contract, but we must not silently drop a tool call — the LLM
358 // expects a result for every call it made.
359 ToolResult::error(format!(
360 "internal error: no result for tool call '{}'",
361 item.id
362 ))
363 };
364 final_items.push(item.clone());
365 final_results.push(result);
366 }
367
368 (final_items, final_results)
369}
370
/// A single question within an `ask_user` interaction.
#[derive(Debug, Clone)]
pub struct AskUserQuestion {
    /// The question text to display to the user.
    pub question: String,
    /// An optional short header (max 12 chars) for grouping or labeling.
    pub header: Option<String>,
    /// The available options for the user to choose from.
    pub options: Vec<AskUserOption>,
    /// Whether the user can select multiple options at once.
    pub multi_select: bool,
}
383
/// A single selectable option within an [`AskUserQuestion`].
#[derive(Debug, Clone)]
pub struct AskUserOption {
    /// The display label for this option.
    pub label: String,
    /// An optional longer description of this option.
    pub description: Option<String>,
    /// An optional preview string (e.g. a code snippet or summary).
    pub preview: Option<String>,
}
394
/// Commands that can be sent into a running [`Engine`].
#[derive(Debug, Clone)]
pub enum AgentOp {
    /// Stop the current turn at the next safe checkpoint.
    Interrupt,
    /// Append a user message before the next LLM iteration.
    InjectUserMessage(String),
    /// Append a user-visible note before the next LLM iteration.
    InjectHint(String),
    /// Answer a pending `ask_user` request.
    AskUserAnswer {
        /// The `ask_user` tool-call ID this answer targets, if known.
        // NOTE(review): the semantics of `None` (presumably "most recent
        // pending request") are not visible here — confirm at the handler.
        call_id: Option<String>,
        /// The user's answer text.
        answer: String,
    },
    /// Approve or reject a pending plan from `exit_plan_mode`.
    ApprovePlan {
        /// Whether the plan is approved.
        approved: bool,
        /// Optional feedback (used when rejecting).
        feedback: Option<String>,
    },
}
417
/// The final outcome produced by a completed agent run — the return
/// type of [`RunBuilder::result`] and [`RunBuilder::callback`], and the
/// payload carried by the terminal item of [`RunBuilder::stream`].
#[derive(Debug, Clone)]
pub struct AgentResult {
    /// The assistant's final textual answer.
    pub answer: String,
    /// History of tool calls made: (tool_name, arguments).
    pub tool_calls: Vec<(String, serde_json::Value)>,
    /// Number of LLM round-trips performed.
    pub iterations: usize,
    /// Accumulated token usage across all LLM calls.
    pub usage: TokenUsage,
    /// Full conversation history including tool call/result pairs.
    ///
    /// Callers can pass this to a subsequent `run()` call to continue a
    /// multi-turn conversation.
    pub messages: Vec<Message>,
}
437
/// Builder for constructing an [`Engine`] with validated configuration.
pub struct EngineBuilder {
    /// Statically registered tools.
    tools: Vec<Arc<dyn Tool>>,
    /// Context providers invoked before each run, in registration order.
    context_providers: Vec<Box<dyn ContextProvider>>,
    /// Maximum LLM round-trips per turn (default 10).
    max_iterations: usize,
    /// Optional per-tool-call timeout.
    tool_timeout: Option<std::time::Duration>,
    /// Context passed to tool invocations; `None` means use the default.
    tool_context: Option<ToolContext>,
    /// Bounded-channel capacities and backpressure policy.
    channel_config: ChannelConfig,
    /// MCP servers whose tools are connected lazily on `run()`.
    #[cfg(feature = "mcp-client")]
    mcp_servers: Vec<Arc<dyn crate::mcp::McpServer>>,
    /// Extensions pending registration; drained into `ExtensionSet` in `build()`.
    pending_extensions: Vec<(Box<dyn crate::core::Extension>, crate::core::ErrorPolicy)>,
}
451
452impl EngineBuilder {
453 /// Set the maximum number of LLM round-trips before aborting.
454 pub fn max_iterations(mut self, n: usize) -> Self {
455 self.max_iterations = n;
456 self
457 }
458
459 /// Register a tool with the agent loop.
460 pub fn tool(mut self, tool: Arc<dyn Tool>) -> Self {
461 self.tools.push(tool);
462 self
463 }
464
465 /// Register multiple tools at once.
466 pub fn tools(mut self, tools: impl IntoIterator<Item = Arc<dyn Tool>>) -> Self {
467 self.tools.extend(tools);
468 self
469 }
470
471 /// Set a static system prompt that is injected as a `System` message
472 /// at the beginning of every conversation.
473 ///
474 /// This is a convenience shortcut for registering a [`ContextProvider`]
475 /// that always returns the given string.
476 pub fn system_prompt(self, prompt: impl Into<String>) -> Self {
477 self.context(crate::context::StringContextProvider(prompt.into()))
478 }
479
480 /// Register a context provider that injects dynamic context into the
481 /// conversation before each [`Engine::run`] call.
482 ///
483 /// Multiple providers can be registered; they are invoked in registration
484 /// order and each non-empty result becomes a `System` message.
485 pub fn context(mut self, provider: impl ContextProvider + 'static) -> Self {
486 self.context_providers.push(Box::new(provider));
487 self
488 }
489
490 /// Register multiple context providers at once.
491 ///
492 /// This is the batch equivalent of [`context()`](Self::context), mirroring
493 /// the [`tools()`](Self::tools) / [`tool()`](Self::tool) pattern.
494 pub fn contexts(
495 mut self,
496 providers: impl IntoIterator<Item = Box<dyn ContextProvider>>,
497 ) -> Self {
498 self.context_providers.extend(providers);
499 self
500 }
501
502 /// Register an MCP server whose tools will be connected lazily on `run()`.
503 ///
504 /// Requires the `mcp-client` feature.
505 #[cfg(feature = "mcp-client")]
506 pub fn mcp_server(mut self, server: impl crate::mcp::McpServer + 'static) -> Self {
507 self.mcp_servers.push(std::sync::Arc::new(server));
508 self
509 }
510
511 /// Register a pre-shared `Arc<dyn McpServer>` whose tools will be connected
512 /// lazily on `run()`.
513 ///
514 /// This is useful when the same MCP server instance must be shared between
515 /// multiple [`Engine`]s or retained by the caller.
516 ///
517 /// Requires the `mcp-client` feature.
518 #[cfg(feature = "mcp-client")]
519 pub fn mcp_server_arc(mut self, server: std::sync::Arc<dyn crate::mcp::McpServer>) -> Self {
520 self.mcp_servers.push(server);
521 self
522 }
523
524 /// Set a per-tool-call timeout. When a tool takes longer than `duration`
525 /// it returns a `ToolResult::error("timed out")` instead of blocking.
526 ///
527 /// By default there is no timeout.
528 pub fn tool_timeout(mut self, duration: std::time::Duration) -> Self {
529 self.tool_timeout = Some(duration);
530 self
531 }
532
533 /// Set a custom [`ToolContext`] that will be passed to every tool invocation.
534 ///
535 /// By default, [`ToolContext::default()`] is used. Use this when tools need
536 /// access to session-level state such as a working directory or environment
537 /// variables.
538 pub fn tool_context(mut self, ctx: ToolContext) -> Self {
539 self.tool_context = Some(ctx);
540 self
541 }
542
543 /// Set the full channel configuration for bounded queues.
544 ///
545 /// See [`ChannelConfig`] for details on each field.
546 pub fn channel_config(mut self, config: ChannelConfig) -> Self {
547 self.channel_config = config;
548 self
549 }
550
551 /// Set the capacity of the user-input channel.
552 ///
553 /// Default: 64.
554 pub fn input_channel_capacity(mut self, capacity: usize) -> Self {
555 self.channel_config.input_capacity = capacity;
556 self
557 }
558
559 /// Set the capacity of the per-turn operations channel.
560 ///
561 /// Default: 64.
562 pub fn ops_channel_capacity(mut self, capacity: usize) -> Self {
563 self.channel_config.ops_capacity = capacity;
564 self
565 }
566
567 /// Set the backpressure policy for the operations channel.
568 ///
569 /// Default: [`BackpressurePolicy::Block`].
570 pub fn ops_backpressure(mut self, policy: BackpressurePolicy) -> Self {
571 self.channel_config.ops_backpressure = policy;
572 self
573 }
574
    /// Set the capacity of the stream channel used by
    /// [`Engine::run`].
    ///
    /// Default: `256`. The channel is bounded; intermediate `Event` items may
    /// be dropped (best-effort) if the consumer is slower than the producer,
    /// but the terminal item is always delivered.
    ///
    /// # Panics
    ///
    /// Panics if `capacity` is zero.
    pub fn stream_capacity(mut self, capacity: usize) -> Self {
        assert!(capacity > 0, "stream_capacity must be >= 1");
        self.channel_config.stream_capacity = Some(capacity);
        self
    }
586
587 /// Register an extension. See [`crate::core::Extension`] and the
588 /// extension architecture spec for usage. Extensions are dispatched
589 /// in registration order.
590 pub fn extension(mut self, ext: Box<dyn crate::core::Extension>) -> Self {
591 self.pending_extensions
592 .push((ext, crate::core::ErrorPolicy::Fallback));
593 self
594 }
595
596 /// Register an extension with an explicit [`crate::core::ErrorPolicy`].
597 ///
598 /// `ErrorPolicy::Fallback` (the default for [`Self::extension`]) logs hook
599 /// errors and treats them as the hook's default value. `ErrorPolicy::Abort`
600 /// propagates hook errors as turn failures via [`AgentError::Internal`].
601 pub fn extension_with_policy(
602 mut self,
603 ext: Box<dyn crate::core::Extension>,
604 policy: crate::core::ErrorPolicy,
605 ) -> Self {
606 self.pending_extensions.push((ext, policy));
607 self
608 }
609
610 /// Register the built-in extensions from a declarative
611 /// [`crate::extensions::config::ExtensionsConfig`] plus runtime
612 /// [`crate::extensions::config::ExtensionWiring`].
613 ///
614 /// This is a one-call convenience over [`Self::extension`]. The
615 /// builder reuses its own [`Self::tool_context`] so callers don't
616 /// pass it twice — set `.tool_context(...)` **before** calling
617 /// `with_extensions_config` if you need a non-default context.
618 /// The context is snapshotted at call time; a `.tool_context()`
619 /// call placed *after* this method will not reach the extensions
620 /// registered by this call (though it will still apply to the
621 /// resulting `Engine`).
622 ///
623 /// Returns `Err` if the config/wiring pair is invalid (see
624 /// [`crate::extensions::config::ExtensionBuildError`]).
625 ///
626 /// This method is fully compatible with the direct
627 /// [`Self::extension`] path — you can mix config-driven
628 /// registration with manual `.extension(...)` calls in either
629 /// order.
630 pub fn with_extensions_config(
631 mut self,
632 config: crate::extensions::config::ExtensionsConfig,
633 wiring: crate::extensions::config::ExtensionWiring,
634 ) -> std::result::Result<Self, crate::extensions::config::ExtensionBuildError> {
635 let tool_context = self.tool_context.clone().unwrap_or_default();
636 let set = crate::extensions::config::build_extension_set(&config, wiring, tool_context)?;
637 self.pending_extensions.extend(set.into_inner());
638 Ok(self)
639 }
640
    /// Consume the builder and produce an [`Engine`].
    ///
    /// # Panics
    ///
    /// Panics if `input_capacity` or `ops_capacity` is zero, since
    /// `tokio::sync::mpsc::channel(0)` panics at runtime. Also panics
    /// if two registered extensions share the same name.
    pub fn build(self) -> Engine {
        assert!(
            self.channel_config.input_capacity > 0,
            "input_capacity must be >= 1"
        );
        assert!(
            self.channel_config.ops_capacity > 0,
            "ops_capacity must be >= 1"
        );
        // Static tool lookup, keyed by each tool's declared name.
        let tool_map: HashMap<String, Arc<dyn Tool>> = self
            .tools
            .iter()
            .map(|t| (t.def().name.clone(), Arc::clone(t)))
            .collect();
        let mut tool_defs: Vec<ToolDef> = self.tools.iter().map(|t| t.def()).collect();
        // Aggregate tool defs from registered extensions (tool_defs() trait method).
        // This lets extensions expose their tools (ask_user, enter/exit_plan_mode,
        // planning, etc.) without any builder-level configuration.
        for (ext, _) in &self.pending_extensions {
            tool_defs.extend(ext.tool_defs());
        }
        let mut extensions = crate::core::ExtensionSet::new();
        for (ext, policy) in self.pending_extensions {
            extensions
                .add_with_policy(ext, policy)
                .unwrap_or_else(|e| panic!("duplicate extension name: {}", e.message()));
        }

        Engine {
            tool_map,
            tool_defs,
            context_providers: self.context_providers,
            max_iterations: self.max_iterations,
            tool_timeout: self.tool_timeout,
            tool_context: self.tool_context.unwrap_or_default(),
            channel_config: self.channel_config,
            extensions: tokio::sync::Mutex::new(extensions),
            deferred_calls: tokio::sync::Mutex::new(std::collections::HashMap::new()),
            #[cfg(feature = "mcp-client")]
            mcp_servers: self.mcp_servers,
            #[cfg(test)]
            test_parked_notify: std::sync::Mutex::new(None),
        }
    }
691}
692
/// The core ReAct agent loop.
///
/// Drives an LLM through iterative reasoning and tool execution until
/// the model produces a final text answer or the iteration limit is
/// reached.
///
/// # Starting a turn
///
/// Every agent turn goes through [`Engine::run`], which returns a
/// [`RunBuilder`]. Chain axis setters (`.ops`, `.cancel`, `.chunked`)
/// in any order, then call one of the three terminators
/// (`.result`, `.callback`, `.stream`) to execute the turn:
///
/// ```ignore
/// use std::sync::Arc;
/// use motosan_agent_loop::{Engine, Message};
///
/// # async fn demo(llm: Arc<dyn motosan_agent_loop::LlmClient>) -> motosan_agent_loop::Result<()> {
/// let agent = Arc::new(Engine::builder().build());
///
/// // Simplest: just the final result.
/// let result = Arc::clone(&agent)
///     .run(llm.clone(), vec![Message::user("Hi!")])
///     .result()
///     .await?;
///
/// // Live token-by-token UI with AskUser support:
/// # let (_tx, ops_rx) = tokio::sync::mpsc::channel(8);
/// Arc::clone(&agent)
///     .run(llm, vec![Message::user("Ask me a question")])
///     .chunked()
///     .ops(ops_rx)
///     .callback(|event| println!("{event:?}"))
///     .await?;
/// # Ok(()) }
/// ```
///
/// See [`RunBuilder`] for the full axis/terminator matrix and
/// [`Engine::run`] for the entry point.
///
/// The loop does **not** own the LLM client; instead, [`RunBuilder`]
/// takes `Arc<dyn LlmClient>` so the same loop can be reused with
/// different backends.
pub struct Engine {
    /// Pre-built lookup map for static tools (excludes per-run MCP tools).
    tool_map: HashMap<String, Arc<dyn Tool>>,
    /// Pre-built tool definitions for static tools (excludes per-run MCP tools).
    tool_defs: Vec<ToolDef>,
    /// Providers invoked before each run to inject system context.
    context_providers: Vec<Box<dyn ContextProvider>>,
    /// Maximum number of LLM round-trips per turn before aborting.
    max_iterations: usize,
    /// Optional per-tool-call timeout. When set, any tool that takes longer
    /// returns `ToolResult::error("timed out")` rather than blocking indefinitely.
    pub(crate) tool_timeout: Option<std::time::Duration>,
    /// Context passed to every tool invocation.
    tool_context: ToolContext,
    /// Channel configuration for bounded queues and backpressure.
    channel_config: ChannelConfig,
    /// Registered extensions dispatched on each iteration. Wrapped in a
    /// `Mutex` so hooks can be called through the `&self` methods of `Engine`.
    extensions: tokio::sync::Mutex<crate::core::ExtensionSet>,
    /// Tool calls suspended by extensions via `ToolDecision::Defer`,
    /// awaiting a matching `OpDecision::ResumeDeferred` from an
    /// extension's `on_op` hook. See spec §8.3.
    ///
    /// `tokio::sync::Mutex` because both `intercept_tool_call` (writes
    /// new entries) and `on_op` (removes resolved entries) can run
    /// from different async contexts within a single turn.
    #[allow(dead_code)] // Used in Phase 3B Tasks 4-5
    deferred_calls: tokio::sync::Mutex<std::collections::HashMap<String, DeferredCall>>,
    #[cfg(feature = "mcp-client")]
    mcp_servers: Vec<Arc<dyn crate::mcp::McpServer>>,

    /// Test-only hook fired by `resolve_deferred_slots` each time it
    /// is about to park on `ops_rx.recv()`. Lets tests synchronize on
    /// "engine is now waiting for an op" without wall-clock sleeps.
    /// Set via `Engine::set_test_parked_notify`. Never touched in
    /// release builds.
    ///
    /// Note: this hook fires on EVERY park, not just the first. If a
    /// deferred-slot resolution takes multiple iterations (e.g. a tool
    /// call that defers again after its first resolve), the notifier
    /// fires once per iteration. Tests that need to distinguish first-
    /// park from subsequent parks should use a dedicated Notify per
    /// expected park point, or set the field to `None` between assertions.
    #[cfg(test)]
    test_parked_notify: std::sync::Mutex<Option<Arc<tokio::sync::Notify>>>,
}
780
/// Result from consuming a streaming LLM response.
struct ConsumeStreamResult {
    /// Full text accumulated from the stream's `TextDelta` chunks.
    accumulated_text: String,
    /// The complete LLM response assembled from the stream.
    response: crate::LlmResponse,
    /// Tool calls (and their results) executed eagerly during streaming,
    /// if any were. // NOTE(review): exact pairing semantics live in the
    /// stream consumer — confirm there.
    streaming_results: Option<(Vec<ToolCallItem>, Vec<ToolResult>)>,
    /// Stop reason reported by the stream, when the backend provides one.
    stop_reason: Option<crate::llm::StopReason>,
    /// Output tokens consumed by this stream call (delta from before/after).
    output_tokens: Option<u64>,
}
790
791impl Engine {
792 /// Create a builder with default settings.
793 pub fn builder() -> EngineBuilder {
794 EngineBuilder {
795 tools: Vec::new(),
796 context_providers: Vec::new(),
797 max_iterations: 10,
798 tool_timeout: None,
799 tool_context: None,
800 channel_config: ChannelConfig::default(),
801 #[cfg(feature = "mcp-client")]
802 mcp_servers: Vec::new(),
803 pending_extensions: Vec::new(),
804 }
805 }
806
807 /// Returns the channel configuration for this agent loop.
808 ///
809 /// Used by [`AgentSession`](crate::AgentSession) to create bounded
810 /// channels with the configured capacities and policies.
811 pub fn channel_config(&self) -> &ChannelConfig {
812 &self.channel_config
813 }
814
815 /// Install a notifier fired each time `resolve_deferred_slots`
816 /// is about to park on `ops_rx.recv()`. The same notifier is
817 /// reused across parks within a single turn; see the field doc
818 /// on `test_parked_notify` for multi-iteration caveats.
819 /// Test-only — not part of the public API.
820 #[cfg(test)]
821 #[allow(dead_code)]
822 pub(crate) fn set_test_parked_notify(&self, notify: Arc<tokio::sync::Notify>) {
823 *self.test_parked_notify.lock().unwrap() = Some(notify);
824 }
825
826 /// Connect all MCP servers, collecting their tools.
827 ///
828 /// If any server fails to connect, all previously-connected servers are
829 /// disconnected (best-effort) before the error is returned.
830 #[cfg(feature = "mcp-client")]
831 async fn connect_mcp_servers(&self) -> Result<Vec<Arc<dyn Tool>>> {
832 use crate::mcp::adapter::McpToolAdapter;
833
834 let mut connected: Vec<&Arc<dyn crate::mcp::McpServer>> = Vec::new();
835 let mut tools: Vec<Arc<dyn Tool>> = Vec::new();
836
837 for server in &self.mcp_servers {
838 match server.connect().await {
839 Ok(()) => {
840 connected.push(server);
841 match McpToolAdapter::from_server(Arc::clone(server)).await {
842 Ok(adapter_tools) => tools.extend(adapter_tools),
843 Err(e) => {
844 for s in &connected {
845 let _ = s.disconnect().await;
846 }
847 return Err(e);
848 }
849 }
850 }
851 Err(e) => {
852 for s in &connected {
853 let _ = s.disconnect().await;
854 }
855 return Err(e);
856 }
857 }
858 }
859
860 Ok(tools)
861 }
862
    /// Streaming turn driver with cancellation support.
    ///
    /// Drives the iteration loop using `llm.chat_stream`, forwarding text
    /// deltas as `TextChunk` events, accumulating usage, and eagerly
    /// executing tool calls via `StreamingToolExecutor` as they arrive
    /// mid-stream. Every poll of the stream is raced against the cancel
    /// token inside `tokio::select!`; cancellation drops the in-flight
    /// stream and returns `Err(AgentError::Cancelled)`.
    ///
    /// NOTE(review): this inlines the same stream-consumption logic as
    /// `consume_stream` — it cannot reuse it because the cancel race must
    /// wrap each individual `stream.next()` poll.
    ///
    /// Returns `(Result<AgentResult>, Vec<Message>, RunTerminalMeta)` so
    /// the caller can recover the conversation history even on failure.
    #[cfg(feature = "cancellation")]
    async fn run_streaming_with_cancel_inner(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: impl Fn(AgentEvent) + Send + Sync,
        cancel: &tokio_util::sync::CancellationToken,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        use futures::StreamExt;

        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);

        for iteration in 1..=self.max_iterations {
            // Pre-turn: cancellation check.
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }
            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            loop {
                // Streaming LLM step with cancellation race and eager tool execution.
                let (
                    accumulated_text,
                    response,
                    streaming_results,
                    stream_stop_reason,
                    output_tokens,
                ) = {
                    let mut stream = llm.chat_stream(&state.messages, tools.tool_defs());
                    let mut accumulated = String::new();
                    let mut final_response: Option<crate::LlmResponse> = None;
                    let mut executor = crate::streaming_executor::StreamingToolExecutor::new();
                    let mut submitted_ids: std::collections::HashSet<String> =
                        std::collections::HashSet::new();
                    let mut stop_reason: Option<crate::llm::StopReason> = None;

                    // Snapshot output tokens before this stream to compute the delta.
                    let output_tokens_before = state.total_usage.output_tokens;

                    loop {
                        tokio::select! {
                            chunk_opt = stream.next() => {
                                match chunk_opt {
                                    Some(chunk_result) => {
                                        let chunk = match chunk_result {
                                            Ok(c) => c,
                                            Err(e) => return run_return_failure(e, state.messages.clone(), iteration, state.total_usage),
                                        };
                                        match chunk {
                                            crate::llm::StreamChunk::TextDelta(delta) => {
                                                accumulated.push_str(&delta);
                                                on_event(AgentEvent::Core(CoreEvent::TextChunk(delta)));
                                            }
                                            crate::llm::StreamChunk::ToolUse { id, name, args } => {
                                                // Dedupe by call id, then start the tool
                                                // immediately so its execution overlaps
                                                // with the remainder of the stream.
                                                if !submitted_ids.contains(&id) {
                                                    let item = crate::llm::ToolCallItem {
                                                        id: id.clone(),
                                                        name: name.clone(),
                                                        args,
                                                    };
                                                    on_event(AgentEvent::Core(CoreEvent::ToolStarted {
                                                        name: name.clone(),
                                                    }));
                                                    executor.submit(
                                                        item,
                                                        tools.tool_map(),
                                                        self.tool_timeout,
                                                        &self.tool_context,
                                                    );
                                                    submitted_ids.insert(id);
                                                }
                                            }
                                            crate::llm::StreamChunk::Done(resp) => {
                                                final_response = Some(resp);
                                            }
                                            crate::llm::StreamChunk::Usage(usage) => {
                                                state.total_usage.accumulate(usage);
                                            }
                                            crate::llm::StreamChunk::StopReason(reason) => {
                                                stop_reason = Some(reason);
                                            }
                                        }
                                    }
                                    None => break,
                                }
                            }
                            _ = cancel.cancelled() => {
                                return run_return_failure(AgentError::Cancelled, state.messages.clone(), iteration, state.total_usage);
                            }
                        }
                    }

                    // No terminal Done chunk: fall back to the accumulated text.
                    let resp = final_response
                        .unwrap_or_else(|| crate::LlmResponse::Message(accumulated.clone()));
                    let sr = if executor.has_pending() {
                        Some(executor.collect().await)
                    } else {
                        None
                    };
                    let delta = state.total_usage.output_tokens - output_tokens_before;
                    let ot = if delta > 0 { Some(delta) } else { None };
                    (accumulated, resp, sr, stop_reason, ot)
                };

                match response {
                    crate::LlmResponse::Message(text) => {
                        // Nothing streamed (non-streaming provider fallback):
                        // surface the full text as a single chunk.
                        if accumulated_text.is_empty() && !text.is_empty() {
                            on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));
                        }
                        on_event(AgentEvent::Core(CoreEvent::TextDone(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason: stream_stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, &on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        if let Some((streamed_items, streamed_results)) = streaming_results {
                            // Some tools already ran mid-stream; merge their results
                            // with any calls that were not streamed.
                            let (final_items, final_results) = merge_streamed_tool_results(
                                &items,
                                streamed_items,
                                streamed_results,
                                tools.tool_map(),
                                self.tool_timeout,
                                &self.tool_context,
                                &on_event,
                            )
                            .await;
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &final_items,
                                    final_results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        } else {
                            emit_tool_started(&items, &on_event);
                            let slots = self.dispatch_intercept_tool_calls(&items, &on_event).await;
                            // Resolve any Deferred slots. No ops channel in this path.
                            let mut tmp_ops_state = OpsState::default();
                            let slots = match self
                                .resolve_deferred_slots(
                                    slots,
                                    &mut tmp_ops_state,
                                    &mut None,
                                    &on_event,
                                )
                                .await
                            {
                                Ok(s) => s,
                                Err(e) => {
                                    return run_return_failure(
                                        e,
                                        state.messages,
                                        iteration,
                                        state.total_usage,
                                    )
                                }
                            };
                            // Interceptor-resolved slots keep their results;
                            // Pending slots are executed in parallel below.
                            let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                            let mut remaining_indices: Vec<usize> = Vec::new();
                            let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> =
                                slots
                                    .into_iter()
                                    .map(|s| match s {
                                        InterceptedSlot::Resolved(r) => Some(r),
                                        InterceptedSlot::Pending => None,
                                        InterceptedSlot::Deferred(_) => {
                                            unreachable!(
                                                "InterceptedSlot::Deferred should have been \
                                                 resolved by resolve_deferred_slots"
                                            );
                                        }
                                    })
                                    .collect();
                            for (i, item) in items.iter().enumerate() {
                                if final_results[i].is_none() {
                                    remaining_items.push(item.clone());
                                    remaining_indices.push(i);
                                }
                            }
                            let execution_results = Self::execute_tools_parallel(
                                tools.tool_map(),
                                &remaining_items,
                                self.tool_timeout,
                                &self.tool_context,
                            )
                            .await;
                            for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                                final_results[*original_idx] =
                                    Some(execution_results[exec_idx].clone());
                            }
                            let results: Vec<motosan_agent_tool::ToolResult> = final_results
                                .into_iter()
                                .map(|opt| opt.expect("all slots filled after intercept + execute"))
                                .collect();
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &items,
                                    results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        }
                        break; // Back to outer iteration loop.
                    }
                }
            }
        }

        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1164
1165 /// Consume a streaming LLM response, forwarding text deltas as events,
1166 /// accumulating usage, and eagerly submitting tool calls via
1167 /// [`StreamingToolExecutor`](crate::StreamingToolExecutor).
1168 ///
1169 /// When a [`StreamChunk::ToolUse`] is received mid-stream the tool starts
1170 /// executing immediately. After the stream ends the executor's results
1171 /// are collected. This overlaps tool execution with LLM output generation.
1172 ///
1173 /// Returns the accumulated text, the final [`LlmResponse`](crate::LlmResponse),
1174 /// and optionally the collected streaming tool results (items + results)
1175 /// if any tools were submitted during the stream.
1176 #[allow(clippy::too_many_arguments)]
1177 async fn consume_stream(
1178 llm: &dyn LlmClient,
1179 messages: &[Message],
1180 tool_defs: &[ToolDef],
1181 tool_map: &HashMap<String, Arc<dyn Tool>>,
1182 tool_timeout: Option<std::time::Duration>,
1183 tool_context: &motosan_agent_tool::ToolContext,
1184 total_usage: &mut TokenUsage,
1185 on_event: &(impl Fn(AgentEvent) + Send + Sync),
1186 ) -> Result<ConsumeStreamResult> {
1187 use futures::StreamExt;
1188
1189 let mut stream = llm.chat_stream(messages, tool_defs);
1190 let mut accumulated = String::new();
1191 let mut final_response: Option<crate::LlmResponse> = None;
1192 let mut executor = crate::streaming_executor::StreamingToolExecutor::new();
1193 let mut submitted_ids: std::collections::HashSet<String> = std::collections::HashSet::new();
1194 let mut stop_reason: Option<crate::llm::StopReason> = None;
1195
1196 // Snapshot output tokens before this stream to compute the delta.
1197 let output_tokens_before = total_usage.output_tokens;
1198
1199 while let Some(chunk_result) = stream.next().await {
1200 let chunk = chunk_result?;
1201 match chunk {
1202 crate::llm::StreamChunk::TextDelta(delta) => {
1203 accumulated.push_str(&delta);
1204 on_event(AgentEvent::Core(CoreEvent::TextChunk(delta)));
1205 }
1206 crate::llm::StreamChunk::ToolUse { id, name, args } => {
1207 if !submitted_ids.contains(&id) {
1208 let item = ToolCallItem {
1209 id: id.clone(),
1210 name: name.clone(),
1211 args,
1212 };
1213 on_event(AgentEvent::Core(CoreEvent::ToolStarted {
1214 name: name.clone(),
1215 }));
1216 executor.submit(item, tool_map, tool_timeout, tool_context);
1217 submitted_ids.insert(id);
1218 }
1219 }
1220 crate::llm::StreamChunk::Done(response) => {
1221 final_response = Some(response);
1222 }
1223 crate::llm::StreamChunk::Usage(usage) => {
1224 total_usage.accumulate(usage);
1225 }
1226 crate::llm::StreamChunk::StopReason(reason) => {
1227 stop_reason = Some(reason);
1228 }
1229 }
1230 }
1231
1232 let response =
1233 final_response.unwrap_or_else(|| crate::LlmResponse::Message(accumulated.clone()));
1234
1235 let streaming_results = if executor.has_pending() {
1236 Some(executor.collect().await)
1237 } else {
1238 None
1239 };
1240
1241 // Compute output token delta for this call.
1242 let output_tokens_delta = total_usage.output_tokens - output_tokens_before;
1243 let output_tokens = if output_tokens_delta > 0 {
1244 Some(output_tokens_delta)
1245 } else {
1246 None
1247 };
1248
1249 Ok(ConsumeStreamResult {
1250 accumulated_text: accumulated,
1251 response,
1252 streaming_results,
1253 stop_reason,
1254 output_tokens,
1255 })
1256 }
1257
1258 /// Prepend context-provider messages to the conversation.
1259 ///
1260 /// Each non-empty result is inserted in registration order starting at
1261 /// index 0 (i.e. at the *beginning* of the conversation, as documented
1262 /// on [`ContextProvider`]).
1263 async fn prepare_messages(&self, mut messages: Vec<Message>) -> Result<Vec<Message>> {
1264 if self.context_providers.is_empty() {
1265 return Ok(messages);
1266 }
1267 let query: String = messages
1268 .iter()
1269 .rev()
1270 .find(|m| m.role() == crate::message::Role::User)
1271 .map(|m| m.text())
1272 .unwrap_or_default();
1273
1274 let contexts = try_join_all(self.context_providers.iter().map(|p| p.build(&query))).await?;
1275
1276 let mut insert_idx = 0;
1277 for ctx in contexts {
1278 if !ctx.is_empty() {
1279 messages.insert(insert_idx, Message::system(&ctx));
1280 insert_idx += 1;
1281 }
1282 }
1283 Ok(messages)
1284 }
1285
    /// Batch (non-streaming) turn driver with cancellation support.
    ///
    /// Same loop shape as `run_inner_with_ops` minus the ops channel: the
    /// cancel token is checked at the top of each iteration and raced
    /// against the in-flight `llm.chat()` future inside `tokio::select!`;
    /// cancellation drops the chat future and returns
    /// `Err(AgentError::Cancelled)`.
    ///
    /// Returns `(Result<AgentResult>, Vec<Message>, RunTerminalMeta)` so
    /// callers can recover the conversation history even on failure.
    #[cfg(feature = "cancellation")]
    async fn run_inner_cancel(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
        cancel: &tokio_util::sync::CancellationToken,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);

        for iteration in 1..=self.max_iterations {
            // Pre-turn: cancellation check.
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }
            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            loop {
                // LLM step with cancellation race.
                let output = tokio::select! {
                    output = llm.chat(&state.messages, tools.tool_defs()) => match output {
                        Ok(o) => o,
                        Err(e) => return run_return_failure(e, state.messages, iteration, state.total_usage),
                    },
                    _ = cancel.cancelled() => return run_return_failure(AgentError::Cancelled, state.messages, iteration, state.total_usage),
                };

                state.accumulate_usage(output.usage);
                let stop_reason = output.stop_reason;
                let output_tokens = output.usage.map(|u| u.output_tokens);

                match output.response {
                    crate::LlmResponse::Message(text) => {
                        on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        emit_tool_started(&items, on_event);
                        let slots = self.dispatch_intercept_tool_calls(&items, on_event).await;
                        // Resolve any Deferred slots. No ops channel in this path.
                        let mut tmp_ops_state = OpsState::default();
                        let slots = match self
                            .resolve_deferred_slots(slots, &mut tmp_ops_state, &mut None, on_event)
                            .await
                        {
                            Ok(s) => s,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };
                        // Interceptor-resolved slots keep their results;
                        // Pending slots are executed in parallel below.
                        let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                        let mut remaining_indices: Vec<usize> = Vec::new();
                        let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> = slots
                            .into_iter()
                            .map(|s| match s {
                                InterceptedSlot::Resolved(r) => Some(r),
                                InterceptedSlot::Pending => None,
                                InterceptedSlot::Deferred(_) => {
                                    unreachable!(
                                        "InterceptedSlot::Deferred should have been \
                                         resolved by resolve_deferred_slots"
                                    );
                                }
                            })
                            .collect();
                        for (i, item) in items.iter().enumerate() {
                            if final_results[i].is_none() {
                                remaining_items.push(item.clone());
                                remaining_indices.push(i);
                            }
                        }
                        let execution_results = Self::execute_tools_parallel(
                            tools.tool_map(),
                            &remaining_items,
                            self.tool_timeout,
                            &self.tool_context,
                        )
                        .await;
                        for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                            final_results[*original_idx] =
                                Some(execution_results[exec_idx].clone());
                        }
                        let results: Vec<motosan_agent_tool::ToolResult> = final_results
                            .into_iter()
                            .map(|opt| opt.expect("all slots filled after intercept + execute"))
                            .collect();
                        if let Err(e) = self
                            .finalize_tool_call_batch(
                                &items,
                                results,
                                iteration,
                                tools.tool_defs(),
                                &mut state,
                                on_event,
                            )
                            .await
                        {
                            return run_return_failure(
                                e,
                                state.messages,
                                iteration,
                                state.total_usage,
                            );
                        }
                        break;
                    }
                }
            }
        }

        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1474
    /// Internal turn driver (batch LLM + optional ops channel).
    ///
    /// Returns `(Result<AgentResult>, Vec<Message>, RunTerminalMeta)` so
    /// callers (notably `run`) can recover the final conversation history
    /// even when the loop terminated with an error. On success, the returned
    /// `Vec<Message>` is a clone of `AgentResult.messages` for ergonomic
    /// access; on failure it contains the partial history accumulated so far.
    ///
    /// At the top of each iteration, pending ops are drained from `ops_rx`
    /// (when present); an `interrupted` op ends the turn with an
    /// `Interrupted` event rather than an error.
    async fn run_inner_with_ops(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
        mut ops_rx: Option<mpsc::Receiver<AgentOp>>,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();

        for iteration in 1..=self.max_iterations {
            // Pre-turn: ingest pending ops.
            if let Err(e) = self
                .drain_ops(&mut state.messages, &mut ops_rx, &mut ops_state, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            if ops_state.interrupted {
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                // The interrupted iteration never ran, so report the previous one.
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            loop {
                // LLM step.
                let output = match llm.chat(&state.messages, tools.tool_defs()).await {
                    Ok(o) => o,
                    Err(e) => {
                        return run_return_failure(e, state.messages, iteration, state.total_usage);
                    }
                };

                state.accumulate_usage(output.usage);
                let stop_reason = output.stop_reason;
                let output_tokens = output.usage.map(|u| u.output_tokens);

                match output.response {
                    crate::LlmResponse::Message(text) => {
                        on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        // Tool-call execution stage (with ask_user policy).
                        emit_tool_started(&items, on_event);
                        let results = self
                            .execute_tools_with_policy(
                                tools.tool_map(),
                                &items,
                                &mut state.messages,
                                &mut ops_rx,
                                &mut ops_state,
                                on_event,
                            )
                            .await;
                        if let Err(e) = self
                            .finalize_tool_call_batch(
                                &items,
                                results,
                                iteration,
                                tools.tool_defs(),
                                &mut state,
                                on_event,
                            )
                            .await
                        {
                            return run_return_failure(
                                e,
                                state.messages,
                                iteration,
                                state.total_usage,
                            );
                        }
                        break; // Back to outer iteration loop.
                    }
                }
            }
        }

        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1633
    /// Inner function covering (batch LLM + ops_rx + cancel). Combines
    /// the ops drain-and-dispatch loop from `run_inner_with_ops` with
    /// the pre-iteration cancel check from `run_inner_cancel`. Returns
    /// the same `(Result<AgentResult>, Vec<Message>, RunTerminalMeta)`
    /// triple shape as `run_inner_with_ops` for dispatch uniformity.
    ///
    /// # Cancel semantics
    ///
    /// The cancellation token is checked at two points per iteration:
    ///
    /// 1. **Before `drain_ops` at iteration top.** A pending cancel wins
    ///    over any queued ops — the function returns
    ///    `Err(AgentError::Cancelled)` with no `Interrupted` event and
    ///    any unread ops in the channel are dropped on the floor. This
    ///    matches the intuition that cancel is stronger than interrupt.
    ///
    /// 2. **Inside the `tokio::select!` around `llm.chat()`.** If the
    ///    token is tripped during the LLM call, the in-flight chat future
    ///    is dropped and the function returns `Err(AgentError::Cancelled)`.
    ///
    /// # Known latency gap: tool-call deferred-slot resolution
    ///
    /// During tool-call execution, this function calls
    /// `execute_tools_with_policy`, which in turn calls
    /// `resolve_deferred_slots` to wait for matching ops (e.g. the
    /// `AskUserAnswer` for an `ask_user` tool call). The cancel token is
    /// **not** raced inside `resolve_deferred_slots`, so a cancel that
    /// arrives while an `ask_user` slot is still awaiting its answer will
    /// not be observed until the slot resolves (or the 60 s defer
    /// timeout fires). This is a known limitation inherited from the
    /// current `execute_tools_with_policy` signature, which does not
    /// accept a cancel token. Fixing it requires threading cancel through
    /// to `resolve_deferred_slots`, tracked as a follow-up task.
    #[cfg(feature = "cancellation")]
    #[allow(dead_code)] // Called by RunBuilder dispatch in a later task
    async fn run_inner_with_ops_and_cancel(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
        ops_rx: mpsc::Receiver<AgentOp>,
        cancel: &tokio_util::sync::CancellationToken,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();
        // drain_ops/execute_tools_with_policy take Option<&mut Receiver>-style
        // access; this path always has a channel, so wrap it once here.
        let mut ops_rx_opt = Some(ops_rx);

        for iteration in 1..=self.max_iterations {
            // Cancel check (from run_inner_cancel).
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }

            // Drain pending ops (from run_inner_with_ops).
            if let Err(e) = self
                .drain_ops(
                    &mut state.messages,
                    &mut ops_rx_opt,
                    &mut ops_state,
                    on_event,
                )
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            if ops_state.interrupted {
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                // The interrupted iteration never ran, so report the previous one.
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook.
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before the LLM call.
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            loop {
                // LLM step raced against cancel token.
                let output = tokio::select! {
                    output = llm.chat(&state.messages, tools.tool_defs()) => match output {
                        Ok(o) => o,
                        Err(e) => return run_return_failure(e, state.messages, iteration, state.total_usage),
                    },
                    _ = cancel.cancelled() => {
                        return run_return_failure(AgentError::Cancelled, state.messages, iteration, state.total_usage);
                    }
                };

                state.accumulate_usage(output.usage);
                let stop_reason = output.stop_reason;
                let output_tokens = output.usage.map(|u| u.output_tokens);

                match output.response {
                    crate::LlmResponse::Message(text) => {
                        on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        emit_tool_started(&items, on_event);
                        // NOTE: the cancel token is not threaded through here —
                        // see "Known latency gap" in the doc comment above.
                        let results = self
                            .execute_tools_with_policy(
                                tools.tool_map(),
                                &items,
                                &mut state.messages,
                                &mut ops_rx_opt,
                                &mut ops_state,
                                on_event,
                            )
                            .await;
                        if let Err(e) = self
                            .finalize_tool_call_batch(
                                &items,
                                results,
                                iteration,
                                tools.tool_defs(),
                                &mut state,
                                on_event,
                            )
                            .await
                        {
                            return run_return_failure(
                                e,
                                state.messages,
                                iteration,
                                state.total_usage,
                            );
                        }
                        break;
                    }
                }
            }
        }

        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
1837
1838 /// Estimate the token count of a message list using a simple heuristic
1839 /// (4 characters per token). Used in tests.
1840 #[allow(dead_code)]
1841 fn estimate_tokens(messages: &[Message]) -> usize {
1842 messages.iter().map(|m| m.approx_visible_chars() / 4).sum()
1843 }
1844
1845 // TODO(phase-4): delete maybe_compact_via_extensions entirely.
1846 // After the public API cutover, consumers observe CoreEvent +
1847 // ExtensionEvent directly and the AgentEvent::AutoCompacted
1848 // forwarding adapter below (plus the variant itself in the
1849 // AgentEvent enum) becomes unnecessary.
1850
1851 /// Run the extension pipeline's `transform_request` hook and forward any
1852 /// `AutocompactEvent` as the legacy `AgentEvent::AutoCompacted` variant.
1853 ///
1854 /// Short-circuits immediately when no extensions are registered so the
1855 /// hot path pays no overhead.
1856 async fn maybe_compact_via_extensions(
1857 &self,
1858 messages: &mut Vec<Message>,
1859 on_event: &(impl Fn(AgentEvent) + Send + Sync),
1860 ) -> Result<()> {
1861 // Build an AgentState snapshot for the hook.
1862 let tools_snapshot: Vec<motosan_agent_tool::ToolDef> = self.tool_defs.to_vec();
1863 let agent_state = crate::core::AgentState {
1864 iteration: 0,
1865 messages: messages.as_slice(),
1866 tools: &tools_snapshot,
1867 turn_started_at: std::time::Instant::now(),
1868 token_usage: TokenUsage::default(),
1869 };
1870
1871 // CaptureSink collects any events the extension emits.
1872 let mut sink = crate::core::CaptureSink::default();
1873
1874 // Lock the ExtensionSet and call the extension pipeline.
1875 // Short-circuit on empty set (common case when no extensions are registered).
1876 // ErrorPolicy::Fallback is handled inside ExtensionSet::transform_request —
1877 // a failing extension is logged and skipped; messages are left unchanged.
1878 // Only ErrorPolicy::Abort extensions surface an error here.
1879 let transformed = {
1880 let mut set = self.extensions.lock().await;
1881 if set.is_empty() {
1882 return Ok(());
1883 }
1884 set.transform_request(messages.clone(), &agent_state, &mut sink)
1885 .await
1886 .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?
1887 };
1888
1889 *messages = transformed;
1890
1891 // Forward any extension events captured during transform_request.
1892 for (_name, evt) in sink.captured() {
1893 forward_extension_event(evt.as_ref(), on_event);
1894 }
1895
1896 Ok(())
1897 }
1898
1899 /// Dispatch `Extension::before_iteration` at the top of each iteration.
1900 ///
1901 /// Mirrors `maybe_compact_via_extensions` in shape: builds an
1902 /// `AgentState` snapshot, locks the `ExtensionSet`, and propagates
1903 /// per-extension `ErrorPolicy` through `ExtensionSet::before_iteration`.
1904 /// `ErrorPolicy::Fallback` is handled inside the set; only `Abort`
1905 /// surfaces an error here, which is wrapped as `AgentError::Internal`.
1906 ///
1907 /// Returned `FlowDecision`:
1908 /// - `Continue`: caller proceeds to the LLM step normally.
1909 /// - `Inject(msg)`: the message is appended to history and the caller
1910 /// proceeds to the LLM step (same iteration) — semantically the
1911 /// extension nudged the conversation before the model sees it.
1912 /// - `Halt(reason)`: the turn is terminated with
1913 /// `AgentError::Internal(reason.message)`, mirroring how
1914 /// `handle_text_response` treats `Halt`.
1915 ///
1916 /// Short-circuits immediately when no extensions are registered so the
1917 /// hot path pays no overhead.
1918 async fn dispatch_before_iteration(
1919 &self,
1920 iteration: usize,
1921 messages: &mut Vec<Message>,
1922 on_event: &(impl Fn(AgentEvent) + Send + Sync),
1923 ) -> Result<()> {
1924 if self.extensions.lock().await.is_empty() {
1925 return Ok(());
1926 }
1927
1928 let tools_snapshot: Vec<motosan_agent_tool::ToolDef> = self.tool_defs.to_vec();
1929 let agent_state = crate::core::AgentState {
1930 iteration,
1931 messages: messages.as_slice(),
1932 tools: &tools_snapshot,
1933 turn_started_at: std::time::Instant::now(),
1934 token_usage: TokenUsage::default(),
1935 };
1936
1937 let mut sink = crate::core::CaptureSink::default();
1938 let decision = self
1939 .extensions
1940 .lock()
1941 .await
1942 .before_iteration(&agent_state, &mut sink)
1943 .await
1944 .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
1945
1946 // Forward any captured extension events.
1947 for (_name, evt) in sink.captured() {
1948 forward_extension_event(evt.as_ref(), on_event);
1949 }
1950
1951 match decision {
1952 FlowDecision::Continue => Ok(()),
1953 FlowDecision::Inject(msg) => {
1954 messages.push(msg);
1955 Ok(())
1956 }
1957 FlowDecision::Halt(reason) => Err(crate::error::AgentError::Internal(reason.message)),
1958 }
1959 }
1960
1961 // TODO(phase-4): delete dispatch_after_llm_response once the public API
1962 // cutover deletes AgentEvent::TokenBudgetContinuation.
1963
1964 /// Dispatch `after_llm_response` via the extension system.
1965 ///
1966 /// Returns the `FlowDecision` from the first extension that returned
1967 /// something other than `Continue`, or `Continue` if all extensions
1968 /// passed. The caller is responsible for interpreting the decision:
1969 /// - `Continue`: proceed to tool dispatch as normal
1970 /// - `Inject(msg)`: append `msg` to history and loop back to another
1971 /// LLM call (do NOT proceed to tool dispatch)
1972 /// - `Halt(reason)`: terminate the turn with the halt reason
1973 ///
1974 /// Also forwards `TokenBudgetEvent::Continuation` to the legacy
1975 /// `AgentEvent::TokenBudgetContinuation` event stream for backward
1976 /// compatibility. Phase 4 will delete the forwarding.
1977 async fn dispatch_after_llm_response(
1978 &self,
1979 resp: &crate::llm::LlmResponse,
1980 meta: &crate::llm::LlmResponseMeta,
1981 on_event: &(impl Fn(AgentEvent) + Send + Sync),
1982 ) -> Result<FlowDecision> {
1983 if self.extensions.lock().await.is_empty() {
1984 return Ok(FlowDecision::Continue);
1985 }
1986
1987 let tools_snapshot = self.tool_defs.to_vec();
1988 // TODO(phase-4): pass the real messages through instead of &[].
1989 // TokenBudgetExtension doesn't read state.messages so this is OK now.
1990 let empty_messages: Vec<crate::message::Message> = vec![];
1991 let agent_state = crate::core::AgentState {
1992 iteration: 0,
1993 messages: &empty_messages,
1994 tools: &tools_snapshot,
1995 turn_started_at: std::time::Instant::now(),
1996 token_usage: crate::llm::TokenUsage::default(),
1997 };
1998
1999 let mut sink = crate::core::CaptureSink::default();
2000 let decision = self
2001 .extensions
2002 .lock()
2003 .await
2004 .after_llm_response(resp, meta, &agent_state, &mut sink)
2005 .await
2006 .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2007
2008 // Forward captured extension events.
2009 for (_name, evt) in sink.captured() {
2010 forward_extension_event(evt.as_ref(), on_event);
2011 }
2012
2013 Ok(decision)
2014 }
2015
    /// Build an `AgentState` snapshot borrowing the caller's `messages`
    /// and `tools` slices for lifetime `'a`.
    ///
    /// `turn_started_at` is stamped with `Instant::now()` at snapshot time.
    // NOTE(review): despite the field name, `turn_started_at` here is the
    // moment this snapshot was built, not the true start of the turn —
    // confirm no extension relies on it for elapsed-turn timing.
    fn agent_state_snapshot<'a>(
        &self,
        iteration: usize,
        messages: &'a [crate::message::Message],
        tools: &'a [motosan_agent_tool::ToolDef],
        token_usage: crate::llm::TokenUsage,
    ) -> crate::core::AgentState<'a> {
        crate::core::AgentState {
            iteration,
            messages,
            tools,
            turn_started_at: std::time::Instant::now(),
            token_usage,
        }
    }
2031
2032 /// Dispatch `Extension::after_tool_result` for a single tool result.
2033 /// Runs the full pipeline; returns the first non-`Continue` decision
2034 /// encountered, or `Continue` if all extensions returned `Continue`.
2035 ///
2036 /// Captured extension events are forwarded via `on_event`.
2037 async fn dispatch_after_tool_result(
2038 &self,
2039 result: &motosan_agent_tool::ToolResult,
2040 agent_state: &crate::core::AgentState<'_>,
2041 on_event: &(impl Fn(AgentEvent) + Send + Sync),
2042 ) -> Result<FlowDecision> {
2043 if self.extensions.lock().await.is_empty() {
2044 return Ok(FlowDecision::Continue);
2045 }
2046
2047 let mut sink = crate::core::CaptureSink::default();
2048 let decision = self
2049 .extensions
2050 .lock()
2051 .await
2052 .after_tool_result(result, agent_state, &mut sink)
2053 .await
2054 .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2055
2056 for (_name, evt) in sink.captured() {
2057 forward_extension_event(evt.as_ref(), on_event);
2058 }
2059
2060 Ok(decision)
2061 }
2062
2063 /// Dispatch `Extension::rewrite_tool_result` pipeline for a single
2064 /// tool call's result. Returns the final (possibly rewritten) result.
2065 /// Captured extension events are forwarded via `on_event`.
2066 async fn dispatch_rewrite_tool_result(
2067 &self,
2068 call: &crate::llm::ToolCallItem,
2069 initial: motosan_agent_tool::ToolResult,
2070 agent_state: &crate::core::AgentState<'_>,
2071 on_event: &(impl Fn(AgentEvent) + Send + Sync),
2072 ) -> Result<motosan_agent_tool::ToolResult> {
2073 if self.extensions.lock().await.is_empty() {
2074 return Ok(initial);
2075 }
2076
2077 let mut sink = crate::core::CaptureSink::default();
2078 let final_result = self
2079 .extensions
2080 .lock()
2081 .await
2082 .rewrite_tool_result(call, initial, agent_state, &mut sink)
2083 .await
2084 .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2085
2086 for (_name, evt) in sink.captured() {
2087 forward_extension_event(evt.as_ref(), on_event);
2088 }
2089
2090 Ok(final_result)
2091 }
2092
2093 /// Dispatch `Extension::on_terminal` across all extensions.
2094 async fn dispatch_on_terminal(
2095 &self,
2096 result: &crate::core::extension::TurnResult,
2097 iteration: usize,
2098 messages: &[crate::message::Message],
2099 tools: &[motosan_agent_tool::ToolDef],
2100 token_usage: crate::llm::TokenUsage,
2101 on_event: &(impl Fn(AgentEvent) + Send + Sync),
2102 ) -> Result<()> {
2103 if self.extensions.lock().await.is_empty() {
2104 return Ok(());
2105 }
2106
2107 let agent_state = self.agent_state_snapshot(iteration, messages, tools, token_usage);
2108 let mut sink = crate::core::CaptureSink::default();
2109 self.extensions
2110 .lock()
2111 .await
2112 .on_terminal(result, &agent_state, &mut sink)
2113 .await
2114 .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
2115
2116 for (_name, evt) in sink.captured() {
2117 forward_extension_event(evt.as_ref(), on_event);
2118 }
2119
2120 Ok(())
2121 }
2122
    /// Convenience wrapper around `dispatch_on_terminal`: unpacks the turn
    /// result, iteration, and token usage from a `RunTerminalMeta` and
    /// forwards them together with the caller's message/tool slices.
    async fn dispatch_on_terminal_from_meta(
        &self,
        meta: &RunTerminalMeta,
        messages: &[crate::message::Message],
        tools: &[motosan_agent_tool::ToolDef],
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
    ) -> Result<()> {
        self.dispatch_on_terminal(
            &meta.turn_result,
            meta.iteration,
            messages,
            tools,
            meta.token_usage,
            on_event,
        )
        .await
    }
2140
2141 /// Handle a text response from the LLM inside a run method's inner loop.
2142 ///
2143 /// Dispatches `after_llm_response` to the extension system, interprets
2144 /// the returned `FlowDecision`, and returns a `TextResponseOutcome` that
2145 /// tells the caller what to do next:
2146 ///
2147 /// - `Finalize`: the caller should push `Message::assistant(text)` and
2148 /// build the `AgentResult`
2149 /// - `Continue`: the extension injected a continuation; the caller's
2150 /// inner LLM loop should run another iteration
2151 /// - `Halt`: the extension halted the turn; the caller should return
2152 /// `Err(AgentError::Internal(...))`
2153 ///
2154 /// The `TextChunk`/`TextDone` events are NOT emitted here — call sites
2155 /// emit them before calling this helper (streaming variants emit chunks
2156 /// during the stream itself; non-streaming variants emit `TextChunk`
2157 /// synchronously before dispatching).
2158 async fn handle_text_response(
2159 &self,
2160 text: String,
2161 meta: crate::llm::LlmResponseMeta,
2162 state: &mut TurnState,
2163 on_event: &(impl Fn(AgentEvent) + Send + Sync),
2164 ) -> Result<TextResponseOutcome> {
2165 let resp_for_ext = crate::LlmResponse::Message(text.clone());
2166 let decision = self
2167 .dispatch_after_llm_response(&resp_for_ext, &meta, on_event)
2168 .await?;
2169
2170 match decision {
2171 FlowDecision::Continue => Ok(TextResponseOutcome::Finalize(text)),
2172 FlowDecision::Inject(msg) => {
2173 state.continuation_text.push_str(&text);
2174 state.messages.push(Message::assistant(&text));
2175 state.messages.push(msg);
2176 Ok(TextResponseOutcome::Continue)
2177 }
2178 FlowDecision::Halt(reason) => Ok(TextResponseOutcome::Halt(reason.message)),
2179 }
2180 }
2181
    /// Shared body of the streaming iteration
    /// loop. `ops_rx` is threaded through so deferred tool slots and
    /// the pre-iteration drain stage both have access to the real
    /// channel when present.
    ///
    /// Returns the turn result together with a snapshot of the message
    /// history and a `RunTerminalMeta` (built by the `run_return_*`
    /// helpers) so terminal hooks can observe the final state.
    async fn run_streaming_inner(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        mut ops_rx: Option<mpsc::Receiver<AgentOp>>,
        on_event: impl Fn(AgentEvent) + Send + Sync,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();

        for iteration in 1..=self.max_iterations {
            // Pre-iteration: ingest any pending ops routed through the
            // extension pipeline (dispatch_on_op). This matches the
            // ordering in `run_inner_with_ops` so callback and
            // stream-item paths stay dispatch-equivalent.
            if let Err(e) = self
                .drain_ops(&mut state.messages, &mut ops_rx, &mut ops_state, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            if ops_state.interrupted {
                // Interrupt is reported against the previous iteration:
                // this one never ran an LLM step.
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            // before_iteration extension hook (after IterationStarted, before request build).
            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Autocompact check before LLM call (after IterationStarted).
            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Inner loop: repeats the LLM step within the SAME iteration
            // when an extension injects a continuation
            // (TextResponseOutcome::Continue). Tool-call batches `break`
            // back to the outer per-iteration loop.
            loop {
                // Streaming LLM step with eager tool execution.
                let stream_result = match Self::consume_stream(
                    llm,
                    &state.messages,
                    tools.tool_defs(),
                    tools.tool_map(),
                    self.tool_timeout,
                    &self.tool_context,
                    &mut state.total_usage,
                    &on_event,
                )
                .await
                {
                    Ok(r) => r,
                    Err(e) => {
                        return run_return_failure(e, state.messages, iteration, state.total_usage)
                    }
                };

                match stream_result.response {
                    crate::LlmResponse::Message(text) => {
                        // If nothing streamed but a final text exists, emit
                        // it as a single chunk so listeners still see text.
                        if stream_result.accumulated_text.is_empty() && !text.is_empty() {
                            on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));
                        }
                        on_event(AgentEvent::Core(CoreEvent::TextDone(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason: stream_result.stop_reason,
                            output_tokens: stream_result.output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, &on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        // Preferred path: tools already executed eagerly
                        // during the stream — merge and finalize.
                        if let Some((streamed_items, streamed_results)) =
                            stream_result.streaming_results
                        {
                            let (final_items, final_results) = merge_streamed_tool_results(
                                &items,
                                streamed_items,
                                streamed_results,
                                tools.tool_map(),
                                self.tool_timeout,
                                &self.tool_context,
                                &on_event,
                            )
                            .await;
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &final_items,
                                    final_results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        } else {
                            // No streaming tool execution — fall back to batch execution.
                            emit_tool_started(&items, &on_event);
                            let slots = self.dispatch_intercept_tool_calls(&items, &on_event).await;
                            // Resolve any Deferred slots via the real ops
                            // channel when one is available. Previously this
                            // path always passed `&mut None`, which meant
                            // ask_user / exit_plan_mode slots could not
                            // complete on the streaming callback path.
                            let slots = match self
                                .resolve_deferred_slots(
                                    slots,
                                    &mut ops_state,
                                    &mut ops_rx,
                                    &on_event,
                                )
                                .await
                            {
                                Ok(s) => s,
                                Err(e) => {
                                    return run_return_failure(
                                        e,
                                        state.messages,
                                        iteration,
                                        state.total_usage,
                                    )
                                }
                            };
                            // Slot-aligned results: Resolved slots are final,
                            // Pending slots (None) still need execution.
                            let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                            let mut remaining_indices: Vec<usize> = Vec::new();
                            let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> =
                                slots
                                    .into_iter()
                                    .map(|s| match s {
                                        InterceptedSlot::Resolved(r) => Some(r),
                                        InterceptedSlot::Pending => None,
                                        InterceptedSlot::Deferred(_) => {
                                            unreachable!(
                                                "InterceptedSlot::Deferred should have been \
                                                 resolved by resolve_deferred_slots"
                                            );
                                        }
                                    })
                                    .collect();
                            for (i, item) in items.iter().enumerate() {
                                if final_results[i].is_none() {
                                    remaining_items.push(item.clone());
                                    remaining_indices.push(i);
                                }
                            }
                            let execution_results = Self::execute_tools_parallel(
                                tools.tool_map(),
                                &remaining_items,
                                self.tool_timeout,
                                &self.tool_context,
                            )
                            .await;
                            // Scatter executed results back into their
                            // original slot positions.
                            for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                                final_results[*original_idx] =
                                    Some(execution_results[exec_idx].clone());
                            }
                            let results: Vec<motosan_agent_tool::ToolResult> = final_results
                                .into_iter()
                                .map(|opt| opt.expect("all slots filled after intercept + execute"))
                                .collect();
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &items,
                                    results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        }
                        break; // Back to outer iteration loop.
                    }
                }
            }
        }

        // Fell out of the iteration loop without finalizing a text answer.
        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
2435
    /// Inner function covering (chunked LLM + ops_rx + cancel). Mirrors
    /// `run_streaming_inner`, but races the streaming LLM step against the
    /// cancellation token so terminal hooks observe the real in-progress
    /// messages / usage when cancellation wins.
    ///
    /// # Known latency gap
    ///
    /// Same as `run_inner_with_ops_and_cancel`: cancellation is not
    /// observed during `resolve_deferred_slots` inside the tool-call
    /// path. A cancel that arrives while waiting for a deferred slot
    /// (e.g. `ask_user` answer) will not be observed until the slot
    /// resolves or the 60s defer timeout fires.
    #[cfg(feature = "cancellation")]
    #[allow(dead_code)] // Called by RunBuilder dispatch in a later task
    async fn run_streaming_inner_with_cancel_and_ops(
        &self,
        llm: &dyn LlmClient,
        messages: Vec<Message>,
        extra_tools: &[Arc<dyn Tool>],
        ops_rx: mpsc::Receiver<AgentOp>,
        cancel: &tokio_util::sync::CancellationToken,
        on_event: impl Fn(AgentEvent) + Send + Sync,
    ) -> (Result<AgentResult>, Vec<Message>, RunTerminalMeta) {
        use futures::StreamExt;

        let tools = MergedTools::new(&self.tool_map, &self.tool_defs, extra_tools);
        let prepared = match self.prepare_messages(messages).await {
            Ok(m) => m,
            Err(e) => return run_return_failure(e, Vec::new(), 0, TokenUsage::default()),
        };
        let mut state = TurnState::new(prepared);
        let mut ops_state = OpsState::default();
        // Wrapped in Option so the same drain_ops / resolve_deferred_slots
        // signatures as the non-cancel variant can be reused.
        let mut ops_rx_opt = Some(ops_rx);

        for iteration in 1..=self.max_iterations {
            // Cheap synchronous cancel check at iteration boundaries; the
            // select! below covers cancellation mid-stream.
            if cancel.is_cancelled() {
                return run_return_failure(
                    AgentError::Cancelled,
                    state.messages,
                    iteration,
                    state.total_usage,
                );
            }

            if let Err(e) = self
                .drain_ops(
                    &mut state.messages,
                    &mut ops_rx_opt,
                    &mut ops_state,
                    &on_event,
                )
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }
            if ops_state.interrupted {
                on_event(AgentEvent::Core(CoreEvent::Interrupted));
                let res =
                    state.into_result("(interrupted)".to_string(), iteration.saturating_sub(1));
                let snapshot = res.messages.clone();
                return run_return_interrupted(res, snapshot);
            }

            on_event(AgentEvent::Core(CoreEvent::IterationStarted { iteration }));

            if let Err(e) = self
                .dispatch_before_iteration(iteration, &mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            if let Err(e) = self
                .maybe_compact_via_extensions(&mut state.messages, &on_event)
                .await
            {
                return run_return_failure(e, state.messages, iteration, state.total_usage);
            }

            // Inner loop: repeats the LLM step within the SAME iteration on
            // an extension-injected continuation; tool calls `break` out.
            loop {
                // Inline stream consumption (unlike run_streaming_inner's
                // consume_stream helper) so each chunk can be raced against
                // the cancellation token.
                let (
                    accumulated_text,
                    response,
                    streaming_results,
                    stream_stop_reason,
                    output_tokens,
                ) = {
                    let mut stream = llm.chat_stream(&state.messages, tools.tool_defs());
                    let mut accumulated = String::new();
                    let mut final_response: Option<crate::LlmResponse> = None;
                    let mut executor = crate::streaming_executor::StreamingToolExecutor::new();
                    // Guards against a provider emitting the same tool-use
                    // id twice: each id is submitted at most once.
                    let mut submitted_ids: std::collections::HashSet<String> =
                        std::collections::HashSet::new();
                    let mut stop_reason: Option<crate::llm::StopReason> = None;
                    let output_tokens_before = state.total_usage.output_tokens;

                    loop {
                        tokio::select! {
                            chunk_opt = stream.next() => {
                                match chunk_opt {
                                    Some(chunk_result) => {
                                        // `stream` still borrows state.messages here,
                                        // hence the clone on the error return.
                                        let chunk = match chunk_result {
                                            Ok(c) => c,
                                            Err(e) => return run_return_failure(e, state.messages.clone(), iteration, state.total_usage),
                                        };
                                        match chunk {
                                            crate::llm::StreamChunk::TextDelta(delta) => {
                                                accumulated.push_str(&delta);
                                                on_event(AgentEvent::Core(CoreEvent::TextChunk(delta)));
                                            }
                                            crate::llm::StreamChunk::ToolUse { id, name, args } => {
                                                if !submitted_ids.contains(&id) {
                                                    let item = crate::llm::ToolCallItem {
                                                        id: id.clone(),
                                                        name: name.clone(),
                                                        args,
                                                    };
                                                    on_event(AgentEvent::Core(CoreEvent::ToolStarted {
                                                        name: name.clone(),
                                                    }));
                                                    // Eager execution: the tool starts
                                                    // while the stream is still open.
                                                    executor.submit(
                                                        item,
                                                        tools.tool_map(),
                                                        self.tool_timeout,
                                                        &self.tool_context,
                                                    );
                                                    submitted_ids.insert(id);
                                                }
                                            }
                                            crate::llm::StreamChunk::Done(resp) => {
                                                final_response = Some(resp);
                                            }
                                            crate::llm::StreamChunk::Usage(usage) => {
                                                state.total_usage.accumulate(usage);
                                            }
                                            crate::llm::StreamChunk::StopReason(reason) => {
                                                stop_reason = Some(reason);
                                            }
                                        }
                                    }
                                    None => break,
                                }
                            }
                            _ = cancel.cancelled() => {
                                return run_return_failure(AgentError::Cancelled, state.messages.clone(), iteration, state.total_usage);
                            }
                        }
                    }

                    // Providers that never emit Done are treated as a plain
                    // text message built from the accumulated deltas.
                    let resp = final_response
                        .unwrap_or_else(|| crate::LlmResponse::Message(accumulated.clone()));
                    let sr = if executor.has_pending() {
                        Some(executor.collect().await)
                    } else {
                        None
                    };
                    // Output tokens attributable to THIS stream only.
                    let delta = state.total_usage.output_tokens - output_tokens_before;
                    let ot = if delta > 0 { Some(delta) } else { None };
                    (accumulated, resp, sr, stop_reason, ot)
                };

                match response {
                    crate::LlmResponse::Message(text) => {
                        if accumulated_text.is_empty() && !text.is_empty() {
                            on_event(AgentEvent::Core(CoreEvent::TextChunk(text.clone())));
                        }
                        on_event(AgentEvent::Core(CoreEvent::TextDone(text.clone())));

                        let meta = crate::llm::LlmResponseMeta {
                            stop_reason: stream_stop_reason,
                            output_tokens,
                        };
                        let outcome = match self
                            .handle_text_response(text, meta, &mut state, &on_event)
                            .await
                        {
                            Ok(o) => o,
                            Err(e) => {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                )
                            }
                        };

                        match outcome {
                            TextResponseOutcome::Continue => continue,
                            TextResponseOutcome::Halt(msg) => {
                                return run_return_failure(
                                    AgentError::Internal(format!(
                                        "Turn halted by extension: {}",
                                        msg
                                    )),
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                            TextResponseOutcome::Finalize(text) => {
                                state.messages.push(Message::assistant(&text));
                                let res = state.into_result(text, iteration);
                                let snapshot = res.messages.clone();
                                return run_return_success(res, snapshot);
                            }
                        }
                    }
                    crate::LlmResponse::ToolCalls(items) => {
                        if let Some((streamed_items, streamed_results)) = streaming_results {
                            let (final_items, final_results) = merge_streamed_tool_results(
                                &items,
                                streamed_items,
                                streamed_results,
                                tools.tool_map(),
                                self.tool_timeout,
                                &self.tool_context,
                                &on_event,
                            )
                            .await;
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &final_items,
                                    final_results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        } else {
                            // Batch fallback when nothing executed eagerly.
                            // NOTE: cancellation is not observed inside
                            // resolve_deferred_slots (see doc header).
                            emit_tool_started(&items, &on_event);
                            let slots = self.dispatch_intercept_tool_calls(&items, &on_event).await;
                            let slots = match self
                                .resolve_deferred_slots(
                                    slots,
                                    &mut ops_state,
                                    &mut ops_rx_opt,
                                    &on_event,
                                )
                                .await
                            {
                                Ok(s) => s,
                                Err(e) => {
                                    return run_return_failure(
                                        e,
                                        state.messages,
                                        iteration,
                                        state.total_usage,
                                    )
                                }
                            };
                            let mut remaining_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                            let mut remaining_indices: Vec<usize> = Vec::new();
                            let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> =
                                slots
                                    .into_iter()
                                    .map(|s| match s {
                                        InterceptedSlot::Resolved(r) => Some(r),
                                        InterceptedSlot::Pending => None,
                                        InterceptedSlot::Deferred(_) => {
                                            unreachable!(
                                                "InterceptedSlot::Deferred should have been \
                                                 resolved by resolve_deferred_slots"
                                            );
                                        }
                                    })
                                    .collect();
                            for (i, item) in items.iter().enumerate() {
                                if final_results[i].is_none() {
                                    remaining_items.push(item.clone());
                                    remaining_indices.push(i);
                                }
                            }
                            let execution_results = Self::execute_tools_parallel(
                                tools.tool_map(),
                                &remaining_items,
                                self.tool_timeout,
                                &self.tool_context,
                            )
                            .await;
                            for (exec_idx, original_idx) in remaining_indices.iter().enumerate() {
                                final_results[*original_idx] =
                                    Some(execution_results[exec_idx].clone());
                            }
                            let results: Vec<motosan_agent_tool::ToolResult> = final_results
                                .into_iter()
                                .map(|opt| opt.expect("all slots filled after intercept + execute"))
                                .collect();
                            if let Err(e) = self
                                .finalize_tool_call_batch(
                                    &items,
                                    results,
                                    iteration,
                                    tools.tool_defs(),
                                    &mut state,
                                    &on_event,
                                )
                                .await
                            {
                                return run_return_failure(
                                    e,
                                    state.messages,
                                    iteration,
                                    state.total_usage,
                                );
                            }
                        }
                        break;
                    }
                }
            }
        }

        // Fell out of the iteration loop without finalizing a text answer.
        run_return_failure(
            AgentError::MaxIterations(self.max_iterations),
            state.messages,
            self.max_iterations,
            state.total_usage,
        )
    }
2765
2766 /// Drain all immediately available ops from `ops_rx` and route each one
2767 /// through `dispatch_on_op` first so that extensions (e.g.
2768 /// `AskUserExtension`, `PlanningExtension`) can buffer ops that arrive
2769 /// before the corresponding deferred tool call exists. Only ops for
2770 /// which every extension returns `OpDecision::Pass` fall through to the
2771 /// legacy `apply_op` handler (Interrupt, InjectUserMessage, etc.).
2772 ///
2773 /// This fixes the pre-queue bug (#127): previously `drain_ops` called
2774 /// `apply_op` directly, which was a no-op for `AskUserAnswer` and
2775 /// `ApprovePlan`, silently dropping them.
2776 async fn drain_ops(
2777 &self,
2778 messages: &mut Vec<Message>,
2779 ops_rx: &mut Option<mpsc::Receiver<AgentOp>>,
2780 ops_state: &mut OpsState,
2781 on_event: &(impl Fn(AgentEvent) + Send + Sync),
2782 ) -> Result<()> {
2783 let Some(rx) = ops_rx.as_mut() else {
2784 return Ok(());
2785 };
2786 while let Ok(op) = rx.try_recv() {
2787 // Route to extensions first. If no extension claims the op, fall
2788 // through to the legacy inline handler.
2789 let decision = self.dispatch_on_op(&op, on_event).await?;
2790 match decision {
2791 crate::core::OpDecision::Pass => {
2792 Self::apply_op(op, messages, ops_state);
2793 }
2794 crate::core::OpDecision::Handled | crate::core::OpDecision::Reject(_) => {
2795 // Extension consumed it. Nothing more to do.
2796 }
2797 crate::core::OpDecision::ResumeDeferred { call_id, .. } => {
2798 debug_assert!(
2799 false,
2800 "extension returned ResumeDeferred from drain_ops for call_id '{}', \
2801 but no deferred calls can exist at drain time (on_terminal clears \
2802 pending state, tool calls only happen inside iterations). \
2803 This indicates an extension bug.",
2804 call_id
2805 );
2806 // In release builds, ignore the return — best-effort resilience.
2807 }
2808 }
2809 }
2810 Ok(())
2811 }
2812
2813 fn apply_op(op: AgentOp, messages: &mut Vec<Message>, ops_state: &mut OpsState) {
2814 match op {
2815 AgentOp::Interrupt => {
2816 ops_state.interrupted = true;
2817 }
2818 AgentOp::InjectUserMessage(text) => {
2819 messages.push(Message::user(&text));
2820 }
2821 AgentOp::InjectHint(hint) => {
2822 messages.push(Message::user(&format!("[Note: {hint}]")));
2823 }
2824 AgentOp::AskUserAnswer { .. } => {
2825 // Handled by AskUserExtension via the Defer/ResumeDeferred protocol.
2826 // apply_op is called for ops outside the resolve_deferred_slots loop;
2827 // AskUserAnswer arriving here is a no-op (no pending answers queue).
2828 }
2829 AgentOp::ApprovePlan { .. } => {
2830 // Handled by PlanningExtension via the Defer/ResumeDeferred protocol.
2831 // apply_op is called for ops outside the resolve_deferred_slots loop;
2832 // ApprovePlan arriving here is a no-op (no pending approval queue).
2833 }
2834 }
2835 }
2836
    /// Execute a batch of tool calls under the batch's
    /// `ToolExecutionPolicy`, running the `intercept_tool_call` pipeline
    /// first. Returns a `Vec<ToolResult>` aligned 1:1 with `items`.
    ///
    /// `_messages` is currently unused on the `ParallelOnly` path.
    async fn execute_tools_with_policy(
        &self,
        tool_map: &HashMap<String, Arc<dyn Tool>>,
        items: &[crate::llm::ToolCallItem],
        _messages: &mut Vec<Message>,
        ops_rx: &mut Option<mpsc::Receiver<AgentOp>>,
        ops_state: &mut OpsState,
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
    ) -> Vec<ToolResult> {
        let policy = ToolExecutionPolicy::from_items(items);
        match policy {
            ToolExecutionPolicy::ParallelOnly => {
                let slots = self.dispatch_intercept_tool_calls(items, on_event).await;

                // Separate Pending items (need normal execution) from Deferred/Resolved.
                let mut pending_items: Vec<crate::llm::ToolCallItem> = Vec::new();
                let mut pending_indices: Vec<usize> = Vec::new();
                for (i, (slot, item)) in slots.iter().zip(items.iter()).enumerate() {
                    if matches!(slot, InterceptedSlot::Pending) {
                        pending_items.push(item.clone());
                        pending_indices.push(i);
                    }
                }

                // Run Pending tools in parallel with Deferred slot resolution.
                // This preserves the old behavior where non-ask_user tools run
                // concurrently while waiting for an ask_user answer.
                let (resolved_slots, pending_results) = futures::join!(
                    self.resolve_deferred_slots(slots, ops_state, ops_rx, on_event),
                    Self::execute_tools_parallel(
                        tool_map,
                        &pending_items,
                        self.tool_timeout,
                        &self.tool_context,
                    )
                );
                // NOTE(review): if resolve_deferred_slots ever returns Err for a
                // non-empty batch, this empty-Vec fallback desynchronizes the
                // slot indices — the `final_results[*original_idx]` fill below
                // would index out of bounds and panic (and any Resolved slot's
                // `expect` would also fire). Confirm Err is unreachable here,
                // or propagate the error instead of swallowing it.
                let resolved_slots = resolved_slots.unwrap_or_else(|e| {
                    eprintln!("[motosan-agent-loop] resolve_deferred_slots error: {}", e);
                    Vec::new()
                });

                // Slot-aligned merge: Resolved slots are final; Pending slots
                // (None) are filled from the parallel execution results.
                let mut final_results: Vec<Option<motosan_agent_tool::ToolResult>> = resolved_slots
                    .into_iter()
                    .map(|s| match s {
                        InterceptedSlot::Resolved(r) => Some(r),
                        InterceptedSlot::Pending => None,
                        InterceptedSlot::Deferred(_) => {
                            unreachable!(
                                "InterceptedSlot::Deferred should have been \
                                 resolved by resolve_deferred_slots"
                            );
                        }
                    })
                    .collect();
                for (exec_idx, original_idx) in pending_indices.iter().enumerate() {
                    final_results[*original_idx] = Some(pending_results[exec_idx].clone());
                }
                final_results
                    .into_iter()
                    .map(|opt| opt.expect("all slots filled after intercept + execute"))
                    .collect()
            }
        }
    }
2901
2902 /// Run each tool call through the extension `intercept_tool_call` pipeline.
2903 ///
2904 /// Returns a slot-aligned `Vec<InterceptedSlot>` matching `items`.
2905 /// See [`InterceptedSlot`] for the three possible outcomes.
2906 ///
2907 /// Forwards any `DelegationEvent::Started/Completed` as the legacy
2908 /// `AgentEvent::DelegateStarted/DelegateCompleted` for backward compatibility.
2909 /// TODO(phase-4): remove the downcast block once the public API cutover removes
2910 /// `AgentEvent::DelegateStarted` and `AgentEvent::DelegateCompleted`.
2911 async fn dispatch_intercept_tool_calls(
2912 &self,
2913 items: &[crate::llm::ToolCallItem],
2914 on_event: &(impl Fn(AgentEvent) + Send + Sync),
2915 ) -> Vec<InterceptedSlot> {
2916 // Fast path: no extensions registered, nothing to intercept.
2917 if self.extensions.lock().await.is_empty() {
2918 return items.iter().map(|_| InterceptedSlot::Pending).collect();
2919 }
2920
2921 let tools_snapshot = self.tool_defs.to_vec();
2922 let empty_messages: Vec<crate::message::Message> = vec![];
2923 let agent_state = crate::core::AgentState {
2924 iteration: 0,
2925 messages: &empty_messages,
2926 tools: &tools_snapshot,
2927 turn_started_at: std::time::Instant::now(),
2928 token_usage: crate::llm::TokenUsage::default(),
2929 };
2930
2931 let mut slots: Vec<InterceptedSlot> = Vec::with_capacity(items.len());
2932
2933 for item in items.iter() {
2934 let mut sink = crate::core::CaptureSink::default();
2935 let decision = {
2936 let mut set = self.extensions.lock().await;
2937 set.intercept_tool_call(item.clone(), &agent_state, &mut sink)
2938 .await
2939 .unwrap_or_else(|e| {
2940 eprintln!(
2941 "[motosan-agent-loop] dispatch_intercept_tool_calls failed: {}. \
2942 Treating as Proceed.",
2943 e
2944 );
2945 crate::core::ToolDecision::Proceed(item.clone())
2946 })
2947 };
2948
2949 // Forward extension events → AgentEvent::Extension stream.
2950 for (_name, evt) in sink.captured() {
2951 forward_extension_event(evt.as_ref(), on_event);
2952 }
2953
2954 let slot = match decision {
2955 crate::core::ToolDecision::ShortCircuit(result) => {
2956 InterceptedSlot::Resolved(result)
2957 }
2958 crate::core::ToolDecision::Defer { call_id, reason } => {
2959 // Record the deferred call. Task 5's resolve_deferred_slots
2960 // will read this map when it processes ResumeDeferred ops.
2961 let mut deferred = self.deferred_calls.lock().await;
2962 if deferred.contains_key(&call_id) {
2963 eprintln!(
2964 "[motosan-agent-loop] WARNING: duplicate Defer call_id '{}' \
2965 (reason: {}). Treating as Pending — the previously deferred \
2966 call may be lost.",
2967 call_id, reason
2968 );
2969 InterceptedSlot::Pending
2970 } else {
2971 deferred.insert(
2972 call_id.clone(),
2973 DeferredCall {
2974 call: item.clone(),
2975 by_extension: "unknown",
2976 // TODO(phase-3b): plumb the real extension name
2977 // through ExtensionSet::intercept_tool_call so the
2978 // diagnostic can name which extension deferred the
2979 // call. For now, "unknown" is acceptable.
2980 at: std::time::Instant::now(),
2981 },
2982 );
2983 InterceptedSlot::Deferred(call_id)
2984 }
2985 }
2986 // Proceed (with possibly modified call) or Replace —
2987 // both fall through to normal dispatch.
2988 _ => InterceptedSlot::Pending,
2989 };
2990
2991 slots.push(slot);
2992 }
2993
2994 slots
2995 }
2996
2997 /// Route an incoming `AgentOp` to the extension system via
2998 /// `ExtensionSet::on_op` and return the resulting `OpDecision`.
2999 ///
3000 /// Returns the `OpDecision` from the first extension that returned
3001 /// something other than `Pass`. `Pass` means "no extension handled
3002 /// this op" — the caller should fall back to legacy inline handling
3003 /// (until Tasks 11 and 15 delete those handlers).
3004 ///
3005 /// Forwards extension events to the legacy `AgentEvent` stream.
3006 /// Phase 3B Tasks 11 and 15 will populate the forwarding loop with
3007 /// AskUserEvent and PlanningEvent downcasts. For Task 3, the loop
3008 /// is intentionally a no-op skeleton.
3009 ///
3010 /// TODO(phase-4): delete this method once the public API cutover
3011 /// routes consumers to CoreEvent + ExtensionEvent directly.
3012 async fn dispatch_on_op(
3013 &self,
3014 op: &AgentOp,
3015 on_event: &(impl Fn(AgentEvent) + Send + Sync),
3016 ) -> Result<crate::core::OpDecision> {
3017 if self.extensions.lock().await.is_empty() {
3018 return Ok(crate::core::OpDecision::Pass);
3019 }
3020
3021 let tools_snapshot = self.tool_defs.to_vec();
3022 let empty_messages: Vec<crate::message::Message> = vec![];
3023 let agent_state = crate::core::AgentState {
3024 iteration: 0,
3025 messages: &empty_messages,
3026 tools: &tools_snapshot,
3027 turn_started_at: std::time::Instant::now(),
3028 token_usage: crate::llm::TokenUsage::default(),
3029 };
3030
3031 let mut sink = crate::core::CaptureSink::default();
3032 let decision = self
3033 .extensions
3034 .lock()
3035 .await
3036 .on_op(op, &agent_state, &mut sink)
3037 .await
3038 .map_err(|e| crate::error::AgentError::Internal(e.message().to_string()))?;
3039
3040 // Forward captured events to the AgentEvent stream.
3041 for (_name, evt) in sink.captured() {
3042 forward_extension_event(evt.as_ref(), on_event);
3043 }
3044
3045 Ok(decision)
3046 }
3047
3048 /// Wait for all `Deferred` slots in the given vec to be resolved
3049 /// via incoming `AgentOp`s and `dispatch_on_op` returning
3050 /// `ResumeDeferred`. Returns the same vec with all `Deferred`
3051 /// entries replaced by `Resolved`.
3052 ///
3053 /// On timeout (default 60s; eventually extension config should drive
3054 /// this), unresolved deferred slots are filled with an error
3055 /// `ToolResult` and a diagnostic is logged via eprintln.
3056 ///
3057 /// During the wait, ops that no extension claims (returning
3058 /// `OpDecision::Pass`) fall through to legacy inline handling
3059 /// (mirroring apply_op). Tasks 11 and 15 will replace the legacy
3060 /// handlers with extension-based ones, but for now this method
3061 /// preserves backward compatibility with the existing AskUserAnswer /
3062 /// ApprovePlan inline op handlers.
3063 async fn resolve_deferred_slots(
3064 &self,
3065 mut slots: Vec<InterceptedSlot>,
3066 ops_state: &mut OpsState,
3067 ops_rx: &mut Option<tokio::sync::mpsc::Receiver<AgentOp>>,
3068 on_event: &(impl Fn(AgentEvent) + Send + Sync),
3069 ) -> Result<Vec<InterceptedSlot>> {
3070 // Fast path: nothing deferred.
3071 if !slots
3072 .iter()
3073 .any(|s| matches!(s, InterceptedSlot::Deferred(_)))
3074 {
3075 return Ok(slots);
3076 }
3077
3078 // Use the minimum timeout from registered extensions (e.g. AskUserConfig::timeout),
3079 // falling back to a 60s ceiling if no extension constrains it.
3080 const DEFAULT_DEFER_TIMEOUT: std::time::Duration = std::time::Duration::from_secs(60);
3081 let overall_timeout = self
3082 .extensions
3083 .lock()
3084 .await
3085 .min_deferred_call_timeout()
3086 .unwrap_or(DEFAULT_DEFER_TIMEOUT);
3087 let deadline = std::time::Instant::now() + overall_timeout;
3088
3089 loop {
3090 // Check if all deferred slots are now resolved.
3091 if !slots
3092 .iter()
3093 .any(|s| matches!(s, InterceptedSlot::Deferred(_)))
3094 {
3095 break;
3096 }
3097
3098 // Compute remaining time for this iteration's wait.
3099 let now = std::time::Instant::now();
3100 if now >= deadline {
3101 // Timeout: resolve every still-Deferred slot with an error.
3102 // Emit AskUserTimeout for any ask_user deferred calls so that
3103 // the legacy AgentEvent::AskUserTimeout contract is preserved.
3104 for slot in slots.iter_mut() {
3105 if let InterceptedSlot::Deferred(call_id) = slot {
3106 let cid = call_id.clone();
3107 self.deferred_calls.lock().await.remove(&cid);
3108 // Notify all extensions of the timeout so they can emit
3109 // their own timeout events (e.g. AskUserEvent::Timeout).
3110 // This replaces the old hardcoded ask_user name check.
3111 {
3112 let tools_snapshot = self.tool_defs.to_vec();
3113 let empty_messages: Vec<crate::message::Message> = vec![];
3114 let agent_state = crate::core::AgentState {
3115 iteration: 0,
3116 messages: &empty_messages,
3117 tools: &tools_snapshot,
3118 turn_started_at: std::time::Instant::now(),
3119 token_usage: crate::llm::TokenUsage::default(),
3120 };
3121 let mut sink = crate::core::CaptureSink::default();
3122 let _ = self
3123 .extensions
3124 .lock()
3125 .await
3126 .on_defer_timeout(&cid, &agent_state, &mut sink)
3127 .await;
3128 for (_name, evt) in sink.captured() {
3129 forward_extension_event(evt.as_ref(), on_event);
3130 }
3131 }
3132 eprintln!(
3133 "[motosan-agent-loop] Defer call '{}' timed out after {:?}",
3134 cid, overall_timeout
3135 );
3136 *slot = InterceptedSlot::Resolved(motosan_agent_tool::ToolResult::error(
3137 format!(
3138 "Deferred call '{}' timed out after {:?}",
3139 cid, overall_timeout
3140 ),
3141 ));
3142 }
3143 }
3144 break;
3145 }
3146 let remaining = deadline - now;
3147
3148 // Wait for the next op or for the timeout.
3149 let op = match ops_rx {
3150 Some(rx) => {
3151 // TEST-ONLY: fire parked notifier before awaiting ops_rx.
3152 #[cfg(test)]
3153 if let Some(notify) = self.test_parked_notify.lock().unwrap().as_ref() {
3154 notify.notify_one();
3155 }
3156 tokio::select! {
3157 received = rx.recv() => {
3158 match received {
3159 Some(op) => op,
3160 None => {
3161 // Ops channel closed. Resolve remaining as errors.
3162 for slot in slots.iter_mut() {
3163 if let InterceptedSlot::Deferred(call_id) = slot {
3164 let cid = call_id.clone();
3165 self.deferred_calls.lock().await.remove(&cid);
3166 eprintln!(
3167 "[motosan-agent-loop] Defer call '{}' aborted: ops channel closed",
3168 cid
3169 );
3170 *slot = InterceptedSlot::Resolved(
3171 motosan_agent_tool::ToolResult::error(format!(
3172 "Deferred call '{}' aborted: ops channel closed",
3173 cid
3174 )),
3175 );
3176 }
3177 }
3178 break;
3179 }
3180 }
3181 }
3182 _ = tokio::time::sleep(remaining) => {
3183 continue; // loop will detect timeout on next iteration
3184 }
3185 }
3186 }
3187 None => {
3188 // No ops channel — cannot wait for resume. Resolve all
3189 // deferred slots as errors immediately and notify extensions
3190 // via on_defer_timeout so they can emit their own events.
3191 for slot in slots.iter_mut() {
3192 if let InterceptedSlot::Deferred(call_id) = slot {
3193 let cid = call_id.clone();
3194 self.deferred_calls.lock().await.remove(&cid);
3195 // Notify all extensions of the timeout/abort.
3196 // This replaces the old hardcoded ask_user name check.
3197 {
3198 let tools_snapshot = self.tool_defs.to_vec();
3199 let empty_messages: Vec<crate::message::Message> = vec![];
3200 let agent_state = crate::core::AgentState {
3201 iteration: 0,
3202 messages: &empty_messages,
3203 tools: &tools_snapshot,
3204 turn_started_at: std::time::Instant::now(),
3205 token_usage: crate::llm::TokenUsage::default(),
3206 };
3207 let mut sink = crate::core::CaptureSink::default();
3208 let _ = self
3209 .extensions
3210 .lock()
3211 .await
3212 .on_defer_timeout(&cid, &agent_state, &mut sink)
3213 .await;
3214 for (_name, evt) in sink.captured() {
3215 forward_extension_event(evt.as_ref(), on_event);
3216 }
3217 }
3218 eprintln!(
3219 "[motosan-agent-loop] Defer call '{}' aborted: no ops channel",
3220 cid
3221 );
3222 *slot = InterceptedSlot::Resolved(
3223 motosan_agent_tool::ToolResult::error(format!(
3224 "Deferred call '{}' aborted: no ops channel available",
3225 cid
3226 )),
3227 );
3228 }
3229 }
3230 break;
3231 }
3232 };
3233
3234 // Dispatch the op via the extension system first.
3235 let decision = self.dispatch_on_op(&op, on_event).await?;
3236
3237 match decision {
3238 crate::core::OpDecision::ResumeDeferred { call_id, result } => {
3239 // Find the slot with matching call_id and fill it.
3240 let mut found = false;
3241 for slot in slots.iter_mut() {
3242 if let InterceptedSlot::Deferred(slot_call_id) = slot {
3243 if slot_call_id == &call_id {
3244 *slot = InterceptedSlot::Resolved(result.clone());
3245 self.deferred_calls.lock().await.remove(&call_id);
3246 found = true;
3247 break;
3248 }
3249 }
3250 }
3251 if !found {
3252 eprintln!(
3253 "[motosan-agent-loop] WARNING: extension returned ResumeDeferred \
3254 for unknown call_id '{}' — no matching deferred slot.",
3255 call_id
3256 );
3257 }
3258 }
3259 crate::core::OpDecision::Handled | crate::core::OpDecision::Reject(_) => {
3260 // Op was handled by an extension (without resuming a
3261 // deferred call) — continue the wait.
3262 }
3263 crate::core::OpDecision::Pass => {
3264 // No extension claimed the op. Inline handling for ops
3265 // not managed by any extension.
3266 // AskUserAnswer is fully handled by AskUserExtension.
3267 // ApprovePlan is fully handled by PlanningExtension.
3268 match &op {
3269 AgentOp::Interrupt => {
3270 ops_state.interrupted = true;
3271 on_event(AgentEvent::Core(CoreEvent::Interrupted));
3272 // Resolve all remaining deferred slots with an error
3273 // so the caller gets a result, then exit.
3274 for slot in slots.iter_mut() {
3275 if let InterceptedSlot::Deferred(call_id) = slot {
3276 let cid = call_id.clone();
3277 self.deferred_calls.lock().await.remove(&cid);
3278 *slot = InterceptedSlot::Resolved(
3279 motosan_agent_tool::ToolResult::error(format!(
3280 "Deferred call '{}' aborted: loop interrupted",
3281 cid
3282 )),
3283 );
3284 }
3285 }
3286 break;
3287 }
3288 AgentOp::AskUserAnswer { .. } | AgentOp::ApprovePlan { .. } => {
3289 // Handled by AskUserExtension / PlanningExtension above via
3290 // ResumeDeferred. If they fall through to Pass, there was no
3291 // matching deferred call_id — ignore silently.
3292 }
3293 _ => {
3294 // Other ops not handled during Defer waits.
3295 }
3296 }
3297 }
3298 }
3299 }
3300
3301 Ok(slots)
3302 }
3303
3304 async fn execute_tools_parallel(
3305 tool_map: &HashMap<String, Arc<dyn Tool>>,
3306 items: &[crate::llm::ToolCallItem],
3307 timeout: Option<std::time::Duration>,
3308 ctx: &ToolContext,
3309 ) -> Vec<ToolResult> {
3310 join_all(
3311 items
3312 .iter()
3313 .map(|tc| Self::execute_tool(tool_map, &tc.name, tc.args.clone(), timeout, ctx)),
3314 )
3315 .await
3316 }
3317
3318 /// Execute a single tool by name using a pre-built lookup map.
3319 /// Returns an error `ToolResult` if the tool is not found or times out.
3320 async fn execute_tool(
3321 tool_map: &HashMap<String, Arc<dyn Tool>>,
3322 name: &str,
3323 args: serde_json::Value,
3324 timeout: Option<std::time::Duration>,
3325 ctx: &ToolContext,
3326 ) -> ToolResult {
3327 let fut = async {
3328 if let Some(tool) = tool_map.get(name) {
3329 tool.call(args, ctx).await
3330 } else {
3331 ToolResult::error(format!("unknown tool: {name}"))
3332 }
3333 };
3334 if let Some(dur) = timeout {
3335 match tokio::time::timeout(dur, fut).await {
3336 Ok(result) => result,
3337 Err(_) => ToolResult::error(format!("tool '{name}' timed out after {dur:?}")),
3338 }
3339 } else {
3340 fut.await
3341 }
3342 }
3343
    /// Finalize a batch of tool call results.
    ///
    /// Pipeline per batch:
    /// 1. give extensions a chance to rewrite each raw result,
    /// 2. emit `ToolCompleted` events and record the calls in the turn history,
    /// 3. append the assistant tool-call message plus one tool-result message
    ///    per call to the conversation,
    /// 4. run the post-result extension hook, which may inject messages or
    ///    halt the turn (a `Halt` surfaces as `AgentError::Internal`).
    ///
    /// # Errors
    /// Propagates errors from `dispatch_rewrite_tool_result` and
    /// `dispatch_after_tool_result`, plus the `Halt` conversion above.
    async fn finalize_tool_call_batch(
        &self,
        items: &[ToolCallItem],
        raw_results: Vec<ToolResult>,
        iteration: usize,
        tools: &[ToolDef],
        state: &mut TurnState,
        on_event: &(impl Fn(AgentEvent) + Send + Sync),
    ) -> Result<()> {
        // 1. Extension rewrite pass — a fresh state snapshot per call.
        let mut results: Vec<ToolResult> = Vec::with_capacity(raw_results.len());
        for (tc, raw) in items.iter().zip(raw_results) {
            let agent_state =
                self.agent_state_snapshot(iteration, &state.messages, tools, state.total_usage);
            let rewritten = self
                .dispatch_rewrite_tool_result(tc, raw, &agent_state, on_event)
                .await?;
            results.push(rewritten);
        }

        // 2. Completion events + turn-history bookkeeping for every call.
        for (tc, result) in items.iter().zip(results.iter()) {
            on_event(AgentEvent::Core(CoreEvent::ToolCompleted {
                name: tc.name.clone(),
                result: result.clone(),
            }));
            state
                .all_tool_calls
                .push((tc.name.clone(), tc.args.clone()));
        }

        // 3. Mirror the batch into the conversation: one assistant message
        //    carrying all the tool-call refs…
        let tool_call_refs: Vec<ToolCallRef> = items
            .iter()
            .map(|tc| ToolCallRef {
                id: tc.id.clone(),
                name: tc.name.clone(),
                args: tc.args.clone(),
            })
            .collect();
        state
            .messages
            .push(Message::assistant_with_tool_calls("", tool_call_refs));

        //    …then one tool-result message per call, keyed by call id.
        for (tc, result) in items.iter().zip(results.iter()) {
            state
                .messages
                .push(Message::tool_result(&tc.id, &tool_result_to_string(result)));
        }

        // 4. Post-result extension hook — snapshot rebuilt per result so each
        //    hook observes messages injected by the previous one.
        for result in &results {
            let agent_state =
                self.agent_state_snapshot(iteration, &state.messages, tools, state.total_usage);
            match self
                .dispatch_after_tool_result(result, &agent_state, on_event)
                .await?
            {
                FlowDecision::Continue => {}
                FlowDecision::Inject(msg) => state.messages.push(msg),
                FlowDecision::Halt(reason) => {
                    return Err(crate::error::AgentError::Internal(reason.message));
                }
            }
        }

        Ok(())
    }
3409}
3410
/// `ToolExecutionPolicy` controls how a batch of tool calls is executed.
///
/// `enter_plan_mode` / `exit_plan_mode` are now handled by `PlanningExtension`
/// via the Defer/ResumeDeferred protocol (just like `ask_user`), so all batches
/// now use the `ParallelOnly` path.
#[derive(Debug, Clone, Copy)]
enum ToolExecutionPolicy {
    /// Execute every call in the batch concurrently (the
    /// `execute_tools_parallel` path).
    ParallelOnly,
}

impl ToolExecutionPolicy {
    /// Determine the execution policy for a batch of tool calls.
    ///
    /// All interactive tools (`ask_user`, `enter_plan_mode`, `exit_plan_mode`)
    /// are now handled by extensions via the Defer protocol — every batch uses
    /// `ParallelOnly`.
    ///
    /// Kept as a function (and the type as an enum) so future policies can be
    /// reintroduced without changing call sites.
    fn from_items(_items: &[crate::llm::ToolCallItem]) -> Self {
        Self::ParallelOnly
    }
}
3431
/// Mutable bookkeeping shared across the op-handling paths of a turn.
struct OpsState {
    /// Set once an `AgentOp::Interrupt` has been observed.
    interrupted: bool,
}

impl Default for OpsState {
    fn default() -> Self {
        Self { interrupted: false }
    }
}
3436
/// Result of dispatching a single tool call through the extension
/// `intercept_tool_call` pipeline.
///
/// - `Pending`: no extension claimed the call; the core's normal
///   tool-map dispatch should run it.
/// - `Resolved`: an extension returned `ShortCircuit(result)`; the
///   call has its final result and skips normal dispatch.
/// - `Deferred`: an extension returned `Defer { call_id }`; the
///   call is suspended, awaiting a matching `OpDecision::ResumeDeferred`.
///   The `call_id` is the key into `Engine::deferred_calls`.
#[derive(Debug)]
enum InterceptedSlot {
    /// Run the call through normal tool-map dispatch.
    Pending,
    /// Final result already supplied by an extension.
    Resolved(motosan_agent_tool::ToolResult),
    /// Suspended; the payload is the `call_id` used to resume it.
    Deferred(String),
}
3453
/// A tool call that was suspended by an extension via
/// `ToolDecision::Defer`. The core records these in a map and
/// blocks the turn until a matching `OpDecision::ResumeDeferred`
/// arrives via the ops channel.
///
/// See spec §8.3 for the full protocol.
#[derive(Debug)]
#[allow(dead_code)] // Used in Phase 3B Tasks 4-5
struct DeferredCall {
    /// The original tool call that was deferred. Stored so the
    /// resume path knows which slot to fill in the tool dispatch
    /// batch.
    call: crate::llm::ToolCallItem,
    /// Name of the extension that returned `Defer`. Used for
    /// diagnostics — if a turn times out with this call still
    /// deferred, the error message includes which extension was
    /// responsible.
    ///
    /// NOTE: currently always `"unknown"` — see the TODO(phase-3b)
    /// where `dispatch_intercept_tool_calls` inserts entries.
    by_extension: &'static str,
    /// Wall-clock instant when the call was deferred. Used for
    /// timeout enforcement.
    at: std::time::Instant,
}
3476
3477/// Forward a type-erased extension event captured in a `CaptureSink` to the
3478/// `AgentEvent::Extension(...)` stream.
3479///
3480/// Attempts each known extension event type in turn; if none match, the
3481/// event is silently discarded (unknown extension). Called from all four
3482/// dispatch helpers: `maybe_compact_via_extensions`, `dispatch_after_llm_response`,
3483/// `dispatch_intercept_tool_calls`, `dispatch_on_op` (and `resolve_deferred_slots`).
3484fn forward_extension_event(
3485 evt: &dyn crate::core::hook_ctx::ExtEvent,
3486 on_event: &(impl Fn(AgentEvent) + Send + Sync),
3487) {
3488 #[cfg(feature = "redact")]
3489 use crate::extensions::redact::RedactEvent;
3490 use crate::extensions::{
3491 ask_user::AskUserEvent, autocompact::AutocompactEvent, delegation::DelegationEvent,
3492 follow_up::FollowUpEvent, planning::PlanningEvent, stuck_detection::StuckDetectionEvent,
3493 token_budget::TokenBudgetEvent,
3494 };
3495 let any = evt.as_any();
3496 if let Some(e) = any.downcast_ref::<AutocompactEvent>() {
3497 on_event(AgentEvent::Extension(ExtensionEvent::Autocompact(
3498 e.clone(),
3499 )));
3500 } else if let Some(e) = any.downcast_ref::<TokenBudgetEvent>() {
3501 on_event(AgentEvent::Extension(ExtensionEvent::TokenBudget(
3502 e.clone(),
3503 )));
3504 } else if let Some(e) = any.downcast_ref::<StuckDetectionEvent>() {
3505 on_event(AgentEvent::Extension(ExtensionEvent::StuckDetection(
3506 e.clone(),
3507 )));
3508 } else if let Some(e) = any.downcast_ref::<DelegationEvent>() {
3509 on_event(AgentEvent::Extension(ExtensionEvent::Delegation(e.clone())));
3510 } else if let Some(e) = any.downcast_ref::<AskUserEvent>() {
3511 on_event(AgentEvent::Extension(ExtensionEvent::AskUser(e.clone())));
3512 } else if let Some(e) = any.downcast_ref::<PlanningEvent>() {
3513 on_event(AgentEvent::Extension(ExtensionEvent::Planning(e.clone())));
3514 } else if let Some(e) = any.downcast_ref::<FollowUpEvent>() {
3515 on_event(AgentEvent::Extension(ExtensionEvent::FollowUp(e.clone())));
3516 } else {
3517 #[cfg(feature = "redact")]
3518 if let Some(e) = any.downcast_ref::<RedactEvent>() {
3519 on_event(AgentEvent::Extension(ExtensionEvent::Redact(e.clone())));
3520 }
3521 }
3522}
3523
3524/// Convert a [`ToolResult`] into a string suitable for message content.
3525fn tool_result_to_string(result: &ToolResult) -> String {
3526 match result.as_text() {
3527 Some(text) => text.to_string(),
3528 None => {
3529 // Fall back to JSON serialization of the content.
3530 serde_json::to_string(&result.content).unwrap_or_else(|_| "<no content>".to_string())
3531 }
3532 }
3533}
3534
3535// ─────────────────────────────────────────────────────────────────────────────
3536// RunBuilder — the 0.13.0 unified agent run API
3537// ─────────────────────────────────────────────────────────────────────────────
3538
/// Configuration builder for a single agent turn, returned by
/// [`Engine::run`]. Chain axis setters (`.ops`, `.cancel`, `.chunked`)
/// in any order, then call one of the terminators (`.result`,
/// `.callback`, `.stream`) to execute the turn.
///
/// `RunBuilder` is `#[must_use]`. Dropping it without calling a
/// terminator emits a compiler warning. Under
/// `#![deny(unused_must_use)]` this becomes a hard error:
///
/// ```compile_fail
/// #![deny(unused_must_use)]
/// use std::sync::Arc;
/// use motosan_agent_loop::{Engine, LlmClient, Message};
///
/// fn drop_builder(agent: Arc<Engine>, llm: Arc<dyn LlmClient>) {
///     agent.run(llm, vec![Message::user("hi")]); // no terminator — denied
/// }
/// ```
///
/// Positive control — the same body with a terminator bound to `_`
/// compiles cleanly under the same deny, proving the `compile_fail`
/// above is failing on `unused_must_use` and not some incidental
/// rename or trait-resolution error:
///
/// ```no_run
/// #![deny(unused_must_use)]
/// use std::sync::Arc;
/// use motosan_agent_loop::{Engine, LlmClient, Message};
///
/// async fn use_builder(agent: Arc<Engine>, llm: Arc<dyn LlmClient>) {
///     let _ = agent.run(llm, vec![Message::user("hi")]).result().await;
/// }
/// ```
///
/// All fields are private. External code cannot construct a
/// `RunBuilder` directly — use [`Engine::run`].
#[must_use = "call .result(), .callback(cb), or .stream() to execute the run"]
pub struct RunBuilder {
    /// Engine the turn executes against.
    engine: Arc<Engine>,
    /// LLM backend used for the turn's chat calls.
    llm: Arc<dyn LlmClient>,
    /// Seed conversation for the turn.
    messages: Vec<Message>,
    /// Optional external `AgentOp` channel, installed via `.ops(rx)`.
    ops_rx: Option<mpsc::Receiver<AgentOp>>,
    /// Optional cancellation token, installed via `.cancel(token)`.
    #[cfg(feature = "cancellation")]
    cancel: Option<tokio_util::sync::CancellationToken>,
    /// When true, use the chunked/streaming LLM path (set by `.chunked()`).
    chunked: bool,
}
3585
3586impl RunBuilder {
3587 // ─── Terminators ────────────────────────────────────────────
3588
3589 /// Run the turn to completion and return the final `AgentResult`.
3590 /// Intermediate events are generated internally but not delivered
3591 /// anywhere. Use this when you only need the final answer.
3592 pub async fn result(self) -> Result<AgentResult> {
3593 self.dispatch_callback_internal(|_| {}).await
3594 }
3595
3596 /// Run the turn and push each `AgentEvent` through the callback
3597 /// as it happens. Returns the final `AgentResult` when the turn
3598 /// terminates.
3599 pub async fn callback(
3600 self,
3601 on_event: impl Fn(AgentEvent) + Send + Sync + 'static,
3602 ) -> Result<AgentResult> {
3603 self.dispatch_callback_internal(on_event).await
3604 }
3605
3606 // ─── Internal dispatch ──────────────────────────────────────
3607
    /// Assemble the axis state and delegate to the appropriate inner
    /// function. Called by `.result()` (with a no-op callback) and
    /// `.callback(cb)` (with the user's callback, in Task 8).
    ///
    /// Flow: MCP connect → axis-routed inner run → terminal extension
    /// hook → MCP disconnect → final result. A terminal-hook error
    /// overrides the run's own result.
    async fn dispatch_callback_internal(
        self,
        on_event: impl Fn(AgentEvent) + Send + Sync + 'static,
    ) -> Result<AgentResult> {
        // Wrap in Arc so all match arms can use it without move conflicts.
        let on_event = std::sync::Arc::new(on_event);

        // Destructure so each axis value can be moved independently below.
        let RunBuilder {
            engine,
            llm,
            messages,
            ops_rx,
            #[cfg(feature = "cancellation")]
            cancel,
            chunked,
        } = self;

        // MCP connect (external to the inner functions after Task 1).
        #[cfg(feature = "mcp-client")]
        let mcp_tools: Vec<Arc<dyn Tool>> = engine.connect_mcp_servers().await?;
        #[cfg(not(feature = "mcp-client"))]
        let mcp_tools: Vec<Arc<dyn Tool>> = vec![];

        // Dispatch based on axis combination: with the `cancellation`
        // feature, six arms over (chunked, cancel, ops); without it, two
        // arms over (chunked, ops).
        let (result, _final_messages, terminal_meta) = {
            #[cfg(feature = "cancellation")]
            {
                match (chunked, cancel, ops_rx) {
                    // batch LLM, no cancel, ops or no ops
                    (false, None, ops) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
                            .await
                    }
                    // batch LLM, cancel, no ops
                    (false, Some(token), None) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_inner_cancel(llm.as_ref(), messages, &mcp_tools, &*cb, &token)
                            .await
                    }
                    // batch LLM, cancel + ops
                    (false, Some(token), Some(rx)) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_inner_with_ops_and_cancel(
                                llm.as_ref(),
                                messages,
                                &mcp_tools,
                                &*cb,
                                rx,
                                &token,
                            )
                            .await
                    }
                    // chunked LLM, no cancel, ops or no ops
                    (true, None, ops) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_streaming_inner(
                                llm.as_ref(),
                                messages,
                                &mcp_tools,
                                ops,
                                move |e| cb(e),
                            )
                            .await
                    }
                    // chunked LLM, cancel, no ops
                    (true, Some(token), None) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_streaming_with_cancel_inner(
                                llm.as_ref(),
                                messages,
                                &mcp_tools,
                                move |e| cb(e),
                                &token,
                            )
                            .await
                    }
                    // chunked LLM, cancel + ops
                    (true, Some(token), Some(rx)) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_streaming_inner_with_cancel_and_ops(
                                llm.as_ref(),
                                messages,
                                &mcp_tools,
                                rx,
                                &token,
                                move |e| cb(e),
                            )
                            .await
                    }
                }
            }
            #[cfg(not(feature = "cancellation"))]
            {
                match (chunked, ops_rx) {
                    (false, ops) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
                            .await
                    }
                    (true, ops) => {
                        let cb = std::sync::Arc::clone(&on_event);
                        engine
                            .run_streaming_inner(
                                llm.as_ref(),
                                messages,
                                &mcp_tools,
                                ops,
                                move |e| cb(e),
                            )
                            .await
                    }
                }
            }
        };

        // Terminal extension hook; a hook error overrides the run's own result.
        let merged_tools = MergedTools::new(&engine.tool_map, &engine.tool_defs, &mcp_tools);
        let terminal_result = match engine
            .dispatch_on_terminal_from_meta(
                &terminal_meta,
                &_final_messages,
                merged_tools.tool_defs(),
                &*on_event,
            )
            .await
        {
            Ok(()) => result,
            Err(e) => Err(e),
        };

        // MCP disconnect (best-effort; failures are ignored).
        #[cfg(feature = "mcp-client")]
        for server in &engine.mcp_servers {
            let _ = server.disconnect().await;
        }

        terminal_result
    }
3756
3757 /// Return a pulled stream of `AgentStreamItem`s. The consumer
3758 /// drives the turn by calling `.next().await`. The final
3759 /// `Terminal` item is guaranteed to be the last yielded before
3760 /// the underlying channel closes.
3761 ///
3762 /// Use `.callback(cb)` or `.result()` if you want a push-based
3763 /// delivery model or a final-result-only future.
3764 pub fn stream(
3765 self,
3766 ) -> impl futures::Stream<Item = crate::stream::AgentStreamItem> + Send + 'static {
3767 self.dispatch_stream()
3768 }
3769
    /// Spawns a tokio task running the same 8-way dispatch as
    /// `dispatch_callback_internal`, with events piped into an mpsc
    /// channel. The receiver is returned wrapped in
    /// `ReceiverStream`, giving consumers a pull-based delivery
    /// model while internally reusing the same axis routing.
    fn dispatch_stream(
        self,
    ) -> impl futures::Stream<Item = crate::stream::AgentStreamItem> + Send + 'static {
        use crate::stream::{AgentStreamItem, AgentTerminal};

        // Event-channel capacity; a consumer that lags beyond this may
        // miss events (see the try_send note below).
        let capacity = self.engine.channel_config.stream_capacity.unwrap_or(256);
        let (tx, rx) = mpsc::channel::<AgentStreamItem>(capacity);

        let RunBuilder {
            engine,
            llm,
            messages,
            ops_rx,
            #[cfg(feature = "cancellation")]
            cancel,
            chunked,
        } = self;
        let tx_for_events = tx.clone();

        tokio::spawn(async move {
            // NOTE: try_send drops events when the channel is full — only
            // the final Terminal item (awaited send at the bottom) has
            // guaranteed delivery. The callback is sync, so it cannot await.
            let on_event = move |e: AgentEvent| {
                let _ = tx_for_events.try_send(AgentStreamItem::Event(e));
            };
            let on_event = std::sync::Arc::new(on_event);

            // MCP connect; a connect failure terminates the stream
            // immediately with an Err Terminal.
            #[cfg(feature = "mcp-client")]
            let mcp_tools: Vec<Arc<dyn Tool>> = match engine.connect_mcp_servers().await {
                Ok(t) => t,
                Err(e) => {
                    let _ = tx
                        .send(AgentStreamItem::Terminal(AgentTerminal {
                            result: Err(e),
                            messages: Vec::new(),
                        }))
                        .await;
                    return;
                }
            };
            #[cfg(not(feature = "mcp-client"))]
            let mcp_tools: Vec<Arc<dyn Tool>> = vec![];

            // Same 8-way dispatch match as dispatch_callback_internal.
            let (result, final_messages, terminal_meta) = {
                #[cfg(feature = "cancellation")]
                {
                    match (chunked, cancel, ops_rx) {
                        (false, None, ops) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
                                .await
                        }
                        (false, Some(token), None) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_inner_cancel(llm.as_ref(), messages, &mcp_tools, &*cb, &token)
                                .await
                        }
                        (false, Some(token), Some(rx_ops)) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_inner_with_ops_and_cancel(
                                    llm.as_ref(),
                                    messages,
                                    &mcp_tools,
                                    &*cb,
                                    rx_ops,
                                    &token,
                                )
                                .await
                        }
                        (true, None, ops) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_streaming_inner(
                                    llm.as_ref(),
                                    messages,
                                    &mcp_tools,
                                    ops,
                                    move |e| cb(e),
                                )
                                .await
                        }
                        (true, Some(token), None) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_streaming_with_cancel_inner(
                                    llm.as_ref(),
                                    messages,
                                    &mcp_tools,
                                    move |e| cb(e),
                                    &token,
                                )
                                .await
                        }
                        (true, Some(token), Some(rx_ops)) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_streaming_inner_with_cancel_and_ops(
                                    llm.as_ref(),
                                    messages,
                                    &mcp_tools,
                                    rx_ops,
                                    &token,
                                    move |e| cb(e),
                                )
                                .await
                        }
                    }
                }
                #[cfg(not(feature = "cancellation"))]
                {
                    match (chunked, ops_rx) {
                        (false, ops) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_inner_with_ops(llm.as_ref(), messages, &mcp_tools, &*cb, ops)
                                .await
                        }
                        (true, ops) => {
                            let cb = std::sync::Arc::clone(&on_event);
                            engine
                                .run_streaming_inner(
                                    llm.as_ref(),
                                    messages,
                                    &mcp_tools,
                                    ops,
                                    move |e| cb(e),
                                )
                                .await
                        }
                    }
                }
            };

            // Terminal extension hook; a hook error overrides the run result.
            let merged_tools = MergedTools::new(&engine.tool_map, &engine.tool_defs, &mcp_tools);
            let terminal_result = match engine
                .dispatch_on_terminal_from_meta(
                    &terminal_meta,
                    &final_messages,
                    merged_tools.tool_defs(),
                    &*on_event,
                )
                .await
            {
                Ok(()) => result,
                Err(e) => Err(e),
            };

            // MCP disconnect (best-effort; failures are ignored).
            #[cfg(feature = "mcp-client")]
            for server in &engine.mcp_servers {
                let _ = server.disconnect().await;
            }

            // CRITICAL: send Terminal with awaiting send (not try_send) to
            // guarantee delivery even if the consumer is slow.
            let _ = tx
                .send(AgentStreamItem::Terminal(AgentTerminal {
                    result: terminal_result,
                    messages: final_messages,
                }))
                .await;
        });

        tokio_stream::wrappers::ReceiverStream::new(rx)
    }
3943
3944 // ─── Axis setters ────────────────────────────────────────────
3945
3946 /// Route `AgentOp` values from an external channel into the
3947 /// running turn. Calling twice overwrites the first receiver
3948 /// (last-write-wins).
3949 pub fn ops(mut self, rx: mpsc::Receiver<AgentOp>) -> Self {
3950 self.ops_rx = Some(rx);
3951 self
3952 }
3953
3954 /// Attach a cancellation token. When tripped, the turn exits
3955 /// with `AgentError::Cancelled`. Calling twice overwrites.
3956 #[cfg(feature = "cancellation")]
3957 pub fn cancel(mut self, token: tokio_util::sync::CancellationToken) -> Self {
3958 self.cancel = Some(token);
3959 self
3960 }
3961
3962 /// Use [`LlmClient::chat_stream`] instead of [`LlmClient::chat`],
3963 /// so `CoreEvent::TextChunk` events fire per LLM token delta.
3964 /// Observable via `.callback(cb)` or by draining the stream
3965 /// returned by `.stream()`. Without an event observer, the chunks
3966 /// are still generated but not delivered anywhere.
3967 ///
3968 /// Equivalent to `stream=True` on the OpenAI / Anthropic SDKs.
3969 pub fn chunked(mut self) -> Self {
3970 self.chunked = true;
3971 self
3972 }
3973}
3974
3975impl Engine {
3976 /// Build a run configuration for this agent.
3977 ///
3978 /// Chain axis setters (`.ops(rx)`, `.cancel(token)`, `.chunked()`)
3979 /// in any order, then call one of the terminators (`.result()`,
3980 /// `.callback(cb)`, `.stream()`) to execute the turn.
3981 ///
3982 /// # Example
3983 ///
3984 /// ```ignore
3985 /// use std::sync::Arc;
3986 /// use motosan_agent_loop::{Engine, Message};
3987 ///
3988 /// # async fn demo(llm: Arc<dyn motosan_agent_loop::LlmClient>) -> motosan_agent_loop::Result<()> {
3989 /// let agent = Arc::new(Engine::builder().build());
3990 ///
3991 /// // Simplest: just the final result.
3992 /// let result = Arc::clone(&agent)
3993 /// .run(llm.clone(), vec![Message::user("Hi!")])
3994 /// .result()
3995 /// .await?;
3996 /// # Ok(()) }
3997 /// ```
3998 ///
3999 /// See the [`RunBuilder`] struct documentation for the full set
4000 /// of axis setters and terminators.
4001 pub fn run(self: Arc<Self>, llm: Arc<dyn LlmClient>, messages: Vec<Message>) -> RunBuilder {
4002 RunBuilder {
4003 engine: self,
4004 llm,
4005 messages,
4006 ops_rx: None,
4007 #[cfg(feature = "cancellation")]
4008 cancel: None,
4009 chunked: false,
4010 }
4011 }
4012}
4013
4014#[cfg(test)]
4015#[path = "engine_tests.rs"]
4016mod tests;
4017
/// Integration tests for MCP server wiring: builder registration,
/// shared-`Arc` registration, tool execution through a streaming run,
/// and cleanup behavior when a server fails to connect.
#[cfg(all(test, feature = "mcp-client"))]
mod mcp_integration_tests {
    use super::*;
    use crate::mcp::McpServer;
    use async_trait::async_trait;
    use motosan_agent_tool::ToolDef;
    use serde_json::json;

    /// Minimal MCP server exposing a single `echo` tool that reflects
    /// the `msg` argument back as `"echo: <msg>"`.
    struct EchoMcpServer {
        name: String,
    }

    #[async_trait]
    impl McpServer for EchoMcpServer {
        fn name(&self) -> &str {
            &self.name
        }
        // Connect/disconnect are no-ops for this mock.
        async fn connect(&self) -> crate::Result<()> {
            Ok(())
        }
        async fn list_tools(&self) -> crate::Result<Vec<ToolDef>> {
            Ok(vec![ToolDef {
                name: "echo".to_string(),
                description: "Echo input".to_string(),
                input_schema: json!({"type": "object", "properties": {"msg": {"type": "string"}}}),
            }])
        }
        async fn call_tool(&self, _name: &str, args: serde_json::Value) -> crate::Result<String> {
            // Missing or non-string `msg` degrades to an empty echo
            // rather than an error.
            Ok(format!(
                "echo: {}",
                args.get("msg").and_then(|v| v.as_str()).unwrap_or("")
            ))
        }
        async fn disconnect(&self) -> crate::Result<()> {
            Ok(())
        }
    }

    /// `EngineBuilder::mcp_server` should store the server on the
    /// built engine.
    #[test]
    fn builder_accepts_mcp_server() {
        let agent = Arc::new(
            Engine::builder()
                .mcp_server(EchoMcpServer {
                    name: "test_mcp".to_string(),
                })
                .max_iterations(5)
                .build(),
        );
        assert_eq!(agent.mcp_servers.len(), 1);
    }

    /// Registering the same `Arc<dyn McpServer>` on two engines must
    /// share the underlying server instance, not clone it.
    #[test]
    fn builder_accepts_shared_arc_mcp_server() {
        let shared: Arc<dyn McpServer> = Arc::new(EchoMcpServer {
            name: "shared_mcp".to_string(),
        });

        let agent_a = Engine::builder()
            .mcp_server_arc(Arc::clone(&shared))
            .max_iterations(5)
            .build();

        let agent_b = Engine::builder()
            .mcp_server_arc(Arc::clone(&shared))
            .max_iterations(5)
            .build();

        assert_eq!(agent_a.mcp_servers.len(), 1);
        assert_eq!(agent_b.mcp_servers.len(), 1);
        // Pointer equality proves both engines hold the SAME server.
        assert!(Arc::ptr_eq(
            &agent_a.mcp_servers[0],
            &agent_b.mcp_servers[0]
        ));
    }

    /// A simple mock LLM for MCP streaming tests.
    /// Uses the default `chat_stream` fallback (delegates to `chat`).
    struct McpTestLlm {
        // Scripted responses, consumed front-to-back; Mutex because
        // `chat` takes `&self`.
        responses: std::sync::Mutex<Vec<crate::llm::LlmResponse>>,
    }

    impl McpTestLlm {
        fn new(responses: Vec<crate::llm::LlmResponse>) -> Self {
            Self {
                responses: std::sync::Mutex::new(responses),
            }
        }
    }

    #[async_trait]
    impl crate::llm::LlmClient for McpTestLlm {
        async fn chat(
            &self,
            _messages: &[Message],
            _tools: &[ToolDef],
        ) -> crate::Result<crate::llm::ChatOutput> {
            let mut responses = self.responses.lock().unwrap();
            // Panicking here (rather than erroring) makes an
            // over-consuming engine loop fail the test loudly.
            assert!(!responses.is_empty(), "McpTestLlm: no more responses");
            let response = responses.remove(0);
            Ok(crate::llm::ChatOutput {
                response,
                usage: None,
                stop_reason: None,
            })
        }
    }

    /// End-to-end: a chunked run should execute an MCP tool call
    /// (namespaced `server__tool`) and surface its events.
    #[tokio::test]
    async fn run_streaming_with_mcp_server() {
        use crate::llm::{LlmResponse, ToolCallItem};
        use std::sync::Mutex;

        // LLM first requests the MCP tool (namespaced as server__tool),
        // then returns a final message.
        // Uses the default chat_stream fallback (delegates to chat).
        let llm = Arc::new(McpTestLlm::new(vec![
            LlmResponse::ToolCalls(vec![ToolCallItem {
                id: "call_mcp".to_string(),
                name: "test_mcp__echo".to_string(),
                args: json!({"msg": "hello"}),
            }]),
            LlmResponse::Message("MCP says: echo hello".to_string()),
        ]));

        // Collect human-readable labels for every event the callback sees.
        let events: Arc<Mutex<Vec<String>>> = Arc::new(Mutex::new(Vec::new()));
        let events_cb = events.clone();

        let agent = Arc::new(
            Engine::builder()
                .mcp_server(EchoMcpServer {
                    name: "test_mcp".to_string(),
                })
                .max_iterations(5)
                .build(),
        );

        let result = Arc::clone(&agent)
            .run(
                llm.clone() as Arc<dyn LlmClient>,
                vec![Message::user("call echo")],
            )
            .chunked()
            .callback(move |event| {
                use crate::extensions::ask_user::AskUserEvent;
                // Flatten each event into a compact "kind:payload"
                // label so assertions below can string-match.
                let label = match &event {
                    AgentEvent::Core(CoreEvent::ToolStarted { name }) => {
                        format!("started:{name}")
                    }
                    AgentEvent::Core(CoreEvent::ToolCompleted { name, result }) => {
                        format!("completed:{name}:{}", result.as_text().unwrap_or(""))
                    }
                    AgentEvent::Core(CoreEvent::TextChunk(t)) => format!("chunk:{t}"),
                    AgentEvent::Core(CoreEvent::TextDone(t)) => format!("done:{t}"),
                    AgentEvent::Core(CoreEvent::IterationStarted { iteration: n }) => {
                        format!("iter:{n}")
                    }
                    AgentEvent::Core(CoreEvent::Interrupted) => "interrupted".to_string(),
                    AgentEvent::Extension(ExtensionEvent::AskUser(AskUserEvent::Question {
                        questions,
                        ..
                    })) => format!(
                        "ask_user:{}",
                        questions.first().map(|q| q.question.as_str()).unwrap_or("")
                    ),
                    AgentEvent::Extension(ExtensionEvent::AskUser(AskUserEvent::Timeout {
                        call_id,
                        ..
                    })) => {
                        format!("ask_user_timeout:{call_id}")
                    }
                    _ => format!("{event:?}"),
                };
                events_cb.lock().unwrap().push(label);
            })
            .await
            .unwrap();

        // Final answer is the LLM's second scripted response.
        assert_eq!(result.answer, "MCP says: echo hello");
        assert_eq!(result.tool_calls.len(), 1);
        assert_eq!(result.tool_calls[0].0, "test_mcp__echo");

        let events = events.lock().unwrap();
        // The MCP tool should have been executed and returned "echo: hello".
        assert!(events
            .iter()
            .any(|e| e.starts_with("completed:test_mcp__echo:echo: hello")));
    }

    /// MCP server that tracks connect/disconnect calls via shared counters.
    struct TrackingMcpServer {
        name: String,
        // When true, `connect` fails with `AgentError::Mcp`.
        fail_connect: bool,
        // Set to true on a successful `connect`.
        connected: Arc<std::sync::atomic::AtomicBool>,
        // Incremented on every `disconnect` call.
        disconnect_count: Arc<std::sync::atomic::AtomicUsize>,
    }

    #[async_trait]
    impl McpServer for TrackingMcpServer {
        fn name(&self) -> &str {
            &self.name
        }
        async fn connect(&self) -> crate::Result<()> {
            if self.fail_connect {
                Err(crate::AgentError::Mcp("connect failed".into()))
            } else {
                self.connected
                    .store(true, std::sync::atomic::Ordering::SeqCst);
                Ok(())
            }
        }
        async fn list_tools(&self) -> crate::Result<Vec<ToolDef>> {
            Ok(vec![])
        }
        async fn call_tool(&self, _name: &str, _args: serde_json::Value) -> crate::Result<String> {
            Ok(String::new())
        }
        async fn disconnect(&self) -> crate::Result<()> {
            self.disconnect_count
                .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
            Ok(())
        }
    }

    /// If one server fails to connect, servers that already connected
    /// must be disconnected again (no dangling connections), and the
    /// failing server must NOT receive a disconnect call.
    #[tokio::test]
    async fn partial_connect_failure_disconnects_already_connected() {
        use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

        let server1_connected = Arc::new(AtomicBool::new(false));
        let server1_disconnects = Arc::new(AtomicUsize::new(0));
        let server2_disconnects = Arc::new(AtomicUsize::new(0));

        // server1 connects fine; server2 (registered after it) fails.
        let server1: Arc<dyn McpServer> = Arc::new(TrackingMcpServer {
            name: "ok_server".to_string(),
            fail_connect: false,
            connected: server1_connected.clone(),
            disconnect_count: server1_disconnects.clone(),
        });
        let server2: Arc<dyn McpServer> = Arc::new(TrackingMcpServer {
            name: "fail_server".to_string(),
            fail_connect: true,
            connected: Arc::new(AtomicBool::new(false)),
            disconnect_count: server2_disconnects.clone(),
        });

        let agent = Arc::new(
            Engine::builder()
                .mcp_server_arc(server1)
                .mcp_server_arc(server2)
                .max_iterations(1)
                .build(),
        );

        // No scripted responses needed: the run must fail during MCP
        // connect, before the LLM is ever called.
        let llm = Arc::new(McpTestLlm::new(vec![]));
        let err = Arc::clone(&agent)
            .run(llm.clone() as Arc<dyn LlmClient>, vec![Message::user("hi")])
            .result()
            .await
            .unwrap_err();

        assert!(
            matches!(err, crate::AgentError::Mcp(_)),
            "expected connect error"
        );
        // Server 1 was successfully connected, so it must have been disconnected.
        assert!(server1_connected.load(Ordering::SeqCst));
        assert_eq!(server1_disconnects.load(Ordering::SeqCst), 1);
        // Server 2 never connected, so disconnect should not have been called.
        assert_eq!(server2_disconnects.load(Ordering::SeqCst), 0);
    }
}