// tirea_agent_loop/runtime/loop_runner/tool_exec.rs

1use super::core::{
2    clear_agent_pending_interaction, drain_agent_append_user_messages,
3    set_agent_pending_interaction,
4};
5use super::plugin_runtime::emit_phase_checked;
6use super::{
7    AgentConfig, AgentLoopError, RunCancellationToken, TOOL_SCOPE_CALLER_MESSAGES_KEY,
8    TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
9};
10use crate::contracts::plugin::phase::{Phase, StepContext, ToolContext};
11use crate::contracts::plugin::AgentPlugin;
12use crate::contracts::runtime::ActivityManager;
13use crate::contracts::runtime::{
14    StreamResult, ToolExecution, ToolExecutionRequest, ToolExecutionResult, ToolExecutor,
15    ToolExecutorError,
16};
17use crate::contracts::thread::Thread;
18use crate::contracts::thread::{Message, MessageMetadata};
19use crate::contracts::tool::{Tool, ToolDescriptor, ToolResult};
20use crate::contracts::Interaction;
21use crate::contracts::RunContext;
22use crate::engine::convert::tool_response;
23use crate::engine::tool_execution::collect_patches;
24use crate::engine::tool_filter::{SCOPE_ALLOWED_TOOLS_KEY, SCOPE_EXCLUDED_TOOLS_KEY};
25use crate::runtime::control::LoopControlState;
26use crate::runtime::run_context::{await_or_cancel, is_cancelled, CancelAware};
27use async_trait::async_trait;
28use serde_json::Value;
29use std::collections::HashMap;
30use std::sync::Arc;
31use tirea_state::State;
32use tirea_state::{PatchExt, TrackedPatch};
33
/// Outcome of folding one round of tool results into a `RunContext`.
pub(super) struct AppliedToolResults {
    /// Interaction of the first result that reported one; `None` when no tool paused the round.
    pub(super) pending_interaction: Option<Interaction>,
    /// State snapshot taken after the results were applied; `None` when the round changed nothing.
    pub(super) state_snapshot: Option<Value>,
}
38
39#[derive(Clone)]
40pub(super) struct ToolPhaseContext<'a> {
41    pub(super) tool_descriptors: &'a [ToolDescriptor],
42    pub(super) plugins: &'a [Arc<dyn AgentPlugin>],
43    pub(super) activity_manager: Option<Arc<dyn ActivityManager>>,
44    pub(super) run_config: &'a tirea_contract::RunConfig,
45    pub(super) thread_id: &'a str,
46    pub(super) thread_messages: &'a [Arc<Message>],
47    pub(super) cancellation_token: Option<&'a RunCancellationToken>,
48}
49
50impl<'a> ToolPhaseContext<'a> {
51    pub(super) fn from_request(request: &'a ToolExecutionRequest<'a>) -> Self {
52        Self {
53            tool_descriptors: request.tool_descriptors,
54            plugins: request.plugins,
55            activity_manager: request.activity_manager.clone(),
56            run_config: request.run_config,
57            thread_id: request.thread_id,
58            thread_messages: request.thread_messages,
59            cancellation_token: request.cancellation_token,
60        }
61    }
62}
63
64fn map_tool_executor_error(err: AgentLoopError) -> ToolExecutorError {
65    match err {
66        AgentLoopError::Cancelled { run_ctx } => ToolExecutorError::Cancelled {
67            thread_id: run_ctx.thread_id().to_string(),
68        },
69        other => ToolExecutorError::Failed {
70            message: other.to_string(),
71        },
72    }
73}
74
75/// Executes all tool calls concurrently.
76#[derive(Debug, Clone, Copy, Default)]
77pub struct ParallelToolExecutor;
78
79#[async_trait]
80impl ToolExecutor for ParallelToolExecutor {
81    async fn execute(
82        &self,
83        request: ToolExecutionRequest<'_>,
84    ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
85        let phase_ctx = ToolPhaseContext::from_request(&request);
86        execute_tools_parallel_with_phases(request.tools, request.calls, request.state, phase_ctx)
87            .await
88            .map_err(map_tool_executor_error)
89    }
90
91    fn name(&self) -> &'static str {
92        "parallel"
93    }
94
95    fn requires_parallel_patch_conflict_check(&self) -> bool {
96        true
97    }
98}
99
100/// Executes tool calls one-by-one in call order.
101#[derive(Debug, Clone, Copy, Default)]
102pub struct SequentialToolExecutor;
103
104#[async_trait]
105impl ToolExecutor for SequentialToolExecutor {
106    async fn execute(
107        &self,
108        request: ToolExecutionRequest<'_>,
109    ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
110        let phase_ctx = ToolPhaseContext::from_request(&request);
111        execute_tools_sequential_with_phases(request.tools, request.calls, request.state, phase_ctx)
112            .await
113            .map_err(map_tool_executor_error)
114    }
115
116    fn name(&self) -> &'static str {
117        "sequential"
118    }
119}
120
121fn validate_parallel_state_patch_conflicts(
122    results: &[ToolExecutionResult],
123) -> Result<(), AgentLoopError> {
124    for (left_idx, left) in results.iter().enumerate() {
125        let mut left_patches: Vec<&TrackedPatch> = Vec::new();
126        if let Some(ref patch) = left.execution.patch {
127            left_patches.push(patch);
128        }
129        left_patches.extend(left.pending_patches.iter());
130
131        if left_patches.is_empty() {
132            continue;
133        }
134
135        for right in results.iter().skip(left_idx + 1) {
136            let mut right_patches: Vec<&TrackedPatch> = Vec::new();
137            if let Some(ref patch) = right.execution.patch {
138                right_patches.push(patch);
139            }
140            right_patches.extend(right.pending_patches.iter());
141
142            if right_patches.is_empty() {
143                continue;
144            }
145
146            for left_patch in &left_patches {
147                for right_patch in &right_patches {
148                    let conflicts = left_patch.patch().conflicts_with(right_patch.patch());
149                    if let Some(conflict) = conflicts.first() {
150                        return Err(AgentLoopError::StateError(format!(
151                            "conflicting parallel state patches between '{}' and '{}' at {}",
152                            left.execution.call.id, right.execution.call.id, conflict.path
153                        )));
154                    }
155                }
156            }
157        }
158    }
159
160    Ok(())
161}
162
163pub(super) fn apply_tool_results_to_session(
164    run_ctx: &mut RunContext,
165    results: &[ToolExecutionResult],
166    metadata: Option<MessageMetadata>,
167    check_parallel_patch_conflicts: bool,
168) -> Result<AppliedToolResults, AgentLoopError> {
169    apply_tool_results_impl(
170        run_ctx,
171        results,
172        metadata,
173        check_parallel_patch_conflicts,
174        None,
175    )
176}
177
178pub(super) fn apply_tool_results_impl(
179    run_ctx: &mut RunContext,
180    results: &[ToolExecutionResult],
181    metadata: Option<MessageMetadata>,
182    check_parallel_patch_conflicts: bool,
183    tool_msg_ids: Option<&HashMap<String, String>>,
184) -> Result<AppliedToolResults, AgentLoopError> {
185    let pending_interaction_idx = results.iter().position(|r| r.pending_interaction.is_some());
186    let pending_interaction =
187        pending_interaction_idx.and_then(|idx| results[idx].pending_interaction.clone());
188    let pending_interaction_id = pending_interaction.as_ref().map(|i| i.id.clone());
189
190    if check_parallel_patch_conflicts {
191        validate_parallel_state_patch_conflicts(results)?;
192    }
193
194    // Collect patches from completed tools and plugin pending patches.
195    let mut patches: Vec<TrackedPatch> = collect_patches(
196        &results
197            .iter()
198            .map(|r| r.execution.clone())
199            .collect::<Vec<_>>(),
200    );
201    for r in results {
202        patches.extend(r.pending_patches.iter().cloned());
203    }
204    let mut state_changed = !patches.is_empty();
205    run_ctx.add_thread_patches(patches);
206
207    // Add tool result messages for all executions.
208    let tool_messages: Vec<Arc<Message>> = results
209        .iter()
210        .enumerate()
211        .flat_map(|(idx, r)| {
212            let is_active_pending = pending_interaction_idx == Some(idx);
213            let mut msgs = if is_active_pending {
214                vec![Message::tool(
215                    &r.execution.call.id,
216                    format!(
217                        "Tool '{}' is awaiting approval. Execution paused.",
218                        r.execution.call.name
219                    ),
220                )]
221            } else {
222                let message_result = if r.pending_interaction.is_some() {
223                    ToolResult::error(
224                        &r.execution.call.name,
225                        format!(
226                            "Tool '{}' was deferred because interaction '{}' is already pending in this round.",
227                            r.execution.call.name,
228                            pending_interaction_id.as_deref().unwrap_or("unknown")
229                        ),
230                    )
231                } else {
232                    r.execution.result.clone()
233                };
234                let mut tool_msg = tool_response(&r.execution.call.id, &message_result);
235                if let Some(id) = tool_msg_ids.and_then(|ids| ids.get(&r.execution.call.id)) {
236                    tool_msg = tool_msg.with_id(id.clone());
237                }
238                vec![tool_msg]
239            };
240            for reminder in &r.reminders {
241                msgs.push(Message::internal_system(format!(
242                    "<system-reminder>{}</system-reminder>",
243                    reminder
244                )));
245            }
246            if let Some(ref meta) = metadata {
247                for msg in &mut msgs {
248                    msg.metadata = Some(meta.clone());
249                }
250            }
251            msgs.into_iter().map(Arc::new).collect::<Vec<_>>()
252        })
253        .collect();
254
255    run_ctx.add_messages(tool_messages);
256    let appended_count = drain_agent_append_user_messages(run_ctx, results, metadata.as_ref())?;
257    if appended_count > 0 {
258        state_changed = true;
259    }
260
261    if let Some(interaction) = pending_interaction.clone() {
262        let frontend_invocation = pending_interaction_idx
263            .and_then(|idx| results[idx].pending_frontend_invocation.clone());
264        let state = run_ctx
265            .snapshot()
266            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
267        let patch =
268            set_agent_pending_interaction(&state, interaction.clone(), frontend_invocation)?;
269        if !patch.patch().is_empty() {
270            state_changed = true;
271            run_ctx.add_thread_patch(patch);
272        }
273        let state_snapshot = if state_changed {
274            Some(
275                run_ctx
276                    .snapshot()
277                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
278            )
279        } else {
280            None
281        };
282        return Ok(AppliedToolResults {
283            pending_interaction: Some(interaction),
284            state_snapshot,
285        });
286    }
287
288    // If a previous run left a persisted pending interaction, clear it once we successfully
289    // complete tool execution without creating a new pending interaction.
290    let state = run_ctx
291        .snapshot()
292        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
293    if state
294        .get(LoopControlState::PATH)
295        .and_then(|v| v.get("pending_interaction"))
296        .is_some()
297    {
298        let patch = clear_agent_pending_interaction(&state)?;
299        if !patch.patch().is_empty() {
300            state_changed = true;
301            run_ctx.add_thread_patch(patch);
302        }
303    }
304
305    let state_snapshot = if state_changed {
306        Some(
307            run_ctx
308                .snapshot()
309                .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
310        )
311    } else {
312        None
313    };
314
315    Ok(AppliedToolResults {
316        pending_interaction: None,
317        state_snapshot,
318    })
319}
320
321fn tool_result_metadata_from_run_ctx(run_ctx: &RunContext) -> Option<MessageMetadata> {
322    let run_id = run_ctx
323        .run_config
324        .value("run_id")
325        .and_then(|v| v.as_str().map(String::from))
326        .or_else(|| {
327            run_ctx.messages().iter().rev().find_map(|m| {
328                m.metadata
329                    .as_ref()
330                    .and_then(|meta| meta.run_id.as_ref().cloned())
331            })
332        });
333
334    let step_index = run_ctx
335        .messages()
336        .iter()
337        .rev()
338        .find_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index));
339
340    if run_id.is_none() && step_index.is_none() {
341        None
342    } else {
343        Some(MessageMetadata { run_id, step_index })
344    }
345}
346
347#[allow(dead_code)]
348pub(super) fn next_step_index(run_ctx: &RunContext) -> u32 {
349    run_ctx
350        .messages()
351        .iter()
352        .filter_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index))
353        .max()
354        .map(|v| v.saturating_add(1))
355        .unwrap_or(0)
356}
357
358pub(super) fn step_metadata(run_id: Option<String>, step_index: u32) -> MessageMetadata {
359    MessageMetadata {
360        run_id,
361        step_index: Some(step_index),
362    }
363}
364
365/// Execute tool calls (simplified version without plugins).
366///
367/// This is the simpler API for tests and cases where plugins aren't needed.
368pub async fn execute_tools(
369    thread: Thread,
370    result: &StreamResult,
371    tools: &HashMap<String, Arc<dyn Tool>>,
372    parallel: bool,
373) -> Result<Thread, AgentLoopError> {
374    execute_tools_with_plugins(thread, result, tools, parallel, &[]).await
375}
376
377/// Execute tool calls with phase-based plugin hooks.
378pub async fn execute_tools_with_config(
379    thread: Thread,
380    result: &StreamResult,
381    tools: &HashMap<String, Arc<dyn Tool>>,
382    config: &AgentConfig,
383) -> Result<Thread, AgentLoopError> {
384    execute_tools_with_plugins_and_executor(
385        thread,
386        result,
387        tools,
388        config.tool_executor.as_ref(),
389        &config.plugins,
390    )
391    .await
392}
393
394pub(super) fn scope_with_tool_caller_context(
395    run_ctx: &RunContext,
396    state: &Value,
397    _config: Option<&AgentConfig>,
398) -> Result<tirea_contract::RunConfig, AgentLoopError> {
399    let mut rt = run_ctx.run_config.clone();
400    if rt.value(TOOL_SCOPE_CALLER_THREAD_ID_KEY).is_none() {
401        rt.set(
402            TOOL_SCOPE_CALLER_THREAD_ID_KEY,
403            run_ctx.thread_id().to_string(),
404        )
405        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
406    }
407    if rt.value(TOOL_SCOPE_CALLER_STATE_KEY).is_none() {
408        rt.set(TOOL_SCOPE_CALLER_STATE_KEY, state.clone())
409            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
410    }
411    if rt.value(TOOL_SCOPE_CALLER_MESSAGES_KEY).is_none() {
412        rt.set(TOOL_SCOPE_CALLER_MESSAGES_KEY, run_ctx.messages().to_vec())
413            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
414    }
415    Ok(rt)
416}
417
418/// Execute tool calls with plugin hooks.
419pub async fn execute_tools_with_plugins(
420    thread: Thread,
421    result: &StreamResult,
422    tools: &HashMap<String, Arc<dyn Tool>>,
423    parallel: bool,
424    plugins: &[Arc<dyn AgentPlugin>],
425) -> Result<Thread, AgentLoopError> {
426    let parallel_executor = ParallelToolExecutor;
427    let sequential_executor = SequentialToolExecutor;
428    let executor: &dyn ToolExecutor = if parallel {
429        &parallel_executor
430    } else {
431        &sequential_executor
432    };
433    execute_tools_with_plugins_and_executor(thread, result, tools, executor, plugins).await
434}
435
436pub async fn execute_tools_with_plugins_and_executor(
437    thread: Thread,
438    result: &StreamResult,
439    tools: &HashMap<String, Arc<dyn Tool>>,
440    executor: &dyn ToolExecutor,
441    plugins: &[Arc<dyn AgentPlugin>],
442) -> Result<Thread, AgentLoopError> {
443    if result.tool_calls.is_empty() {
444        return Ok(thread);
445    }
446
447    // Build RunContext from thread for internal use
448    let rebuilt_state = thread
449        .rebuild_state()
450        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
451    let mut run_ctx = RunContext::new(
452        &thread.id,
453        rebuilt_state.clone(),
454        thread.messages.clone(),
455        tirea_contract::RunConfig::default(),
456    );
457
458    let tool_descriptors: Vec<ToolDescriptor> =
459        tools.values().map(|t| t.descriptor().clone()).collect();
460    let rt_for_tools = scope_with_tool_caller_context(&run_ctx, &rebuilt_state, None)?;
461    let results = executor
462        .execute(ToolExecutionRequest {
463            tools,
464            calls: &result.tool_calls,
465            state: &rebuilt_state,
466            tool_descriptors: &tool_descriptors,
467            plugins,
468            activity_manager: None,
469            run_config: &rt_for_tools,
470            thread_id: run_ctx.thread_id(),
471            thread_messages: run_ctx.messages(),
472            state_version: run_ctx.version(),
473            cancellation_token: None,
474        })
475        .await?;
476
477    let metadata = tool_result_metadata_from_run_ctx(&run_ctx);
478    let applied = apply_tool_results_to_session(
479        &mut run_ctx,
480        &results,
481        metadata,
482        executor.requires_parallel_patch_conflict_check(),
483    )?;
484    if let Some(interaction) = applied.pending_interaction {
485        return Err(AgentLoopError::PendingInteraction {
486            run_ctx: Box::new(run_ctx),
487            interaction: Box::new(interaction),
488        });
489    }
490
491    // Reconstruct thread from RunContext delta
492    let delta = run_ctx.take_delta();
493    let mut out_thread = thread;
494    for msg in delta.messages {
495        out_thread = out_thread.with_message((*msg).clone());
496    }
497    out_thread = out_thread.with_patches(delta.patches);
498    Ok(out_thread)
499}
500
501/// Execute tools in parallel with phase hooks.
502pub(super) async fn execute_tools_parallel_with_phases(
503    tools: &HashMap<String, Arc<dyn Tool>>,
504    calls: &[crate::contracts::thread::ToolCall],
505    state: &Value,
506    phase_ctx: ToolPhaseContext<'_>,
507) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
508    use futures::future::join_all;
509
510    if is_cancelled(phase_ctx.cancellation_token) {
511        return Err(cancelled_error(phase_ctx.thread_id));
512    }
513
514    // Clone run config for parallel tasks (RunConfig is Clone).
515    let run_config_owned = phase_ctx.run_config.clone();
516    let thread_id = phase_ctx.thread_id.to_string();
517    let thread_messages = Arc::new(phase_ctx.thread_messages.to_vec());
518    let tool_descriptors = phase_ctx.tool_descriptors.to_vec();
519    let plugins = phase_ctx.plugins.to_vec();
520
521    let futures = calls.iter().map(|call| {
522        let tool = tools.get(&call.name).cloned();
523        let state = state.clone();
524        let call = call.clone();
525        let plugins = plugins.clone();
526        let tool_descriptors = tool_descriptors.clone();
527        let activity_manager = phase_ctx.activity_manager.clone();
528        let rt = run_config_owned.clone();
529        let sid = thread_id.clone();
530        let thread_messages = thread_messages.clone();
531
532        async move {
533            execute_single_tool_with_phases(
534                tool.as_deref(),
535                &call,
536                &state,
537                &ToolPhaseContext {
538                    tool_descriptors: &tool_descriptors,
539                    plugins: &plugins,
540                    activity_manager,
541                    run_config: &rt,
542                    thread_id: &sid,
543                    thread_messages: thread_messages.as_slice(),
544                    cancellation_token: None,
545                },
546            )
547            .await
548        }
549    });
550
551    let join_future = join_all(futures);
552    let results = match await_or_cancel(phase_ctx.cancellation_token, join_future).await {
553        CancelAware::Cancelled => return Err(cancelled_error(&thread_id)),
554        CancelAware::Value(results) => results,
555    };
556    let mut results: Vec<ToolExecutionResult> = results.into_iter().collect::<Result<_, _>>()?;
557    coalesce_pending_interactions(&mut results);
558    Ok(results)
559}
560
561fn coalesce_pending_interactions(results: &mut [ToolExecutionResult]) {
562    let mut active_pending_id: Option<String> = None;
563    for result in results {
564        let Some(interaction_id) = result.pending_interaction.as_ref().map(|i| i.id.clone()) else {
565            continue;
566        };
567
568        if active_pending_id.is_none() {
569            active_pending_id = Some(interaction_id);
570            continue;
571        }
572
573        let active_id = active_pending_id.as_deref().unwrap_or("unknown");
574        result.pending_interaction = None;
575        result.pending_frontend_invocation = None;
576        result.pending_patches.clear();
577        result.execution.result = ToolResult::error(
578            &result.execution.call.name,
579            format!(
580                "Tool '{}' was deferred because interaction '{}' is already pending in this round.",
581                result.execution.call.name, active_id
582            ),
583        );
584    }
585}
586
587/// Execute tools sequentially with phase hooks.
588pub(super) async fn execute_tools_sequential_with_phases(
589    tools: &HashMap<String, Arc<dyn Tool>>,
590    calls: &[crate::contracts::thread::ToolCall],
591    initial_state: &Value,
592    phase_ctx: ToolPhaseContext<'_>,
593) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
594    use tirea_state::apply_patch;
595
596    if is_cancelled(phase_ctx.cancellation_token) {
597        return Err(cancelled_error(phase_ctx.thread_id));
598    }
599
600    let mut state = initial_state.clone();
601    let mut results = Vec::with_capacity(calls.len());
602
603    for call in calls {
604        let tool = tools.get(&call.name).cloned();
605        let call_phase_ctx = ToolPhaseContext {
606            tool_descriptors: phase_ctx.tool_descriptors,
607            plugins: phase_ctx.plugins,
608            activity_manager: phase_ctx.activity_manager.clone(),
609            run_config: phase_ctx.run_config,
610            thread_id: phase_ctx.thread_id,
611            thread_messages: phase_ctx.thread_messages,
612            cancellation_token: None,
613        };
614        let result = match await_or_cancel(
615            phase_ctx.cancellation_token,
616            execute_single_tool_with_phases(tool.as_deref(), call, &state, &call_phase_ctx),
617        )
618        .await
619        {
620            CancelAware::Cancelled => return Err(cancelled_error(phase_ctx.thread_id)),
621            CancelAware::Value(result) => result?,
622        };
623
624        // Apply patch to state for next tool
625        if let Some(ref patch) = result.execution.patch {
626            state = apply_patch(&state, patch.patch()).map_err(|e| {
627                AgentLoopError::StateError(format!(
628                    "failed to apply tool patch for call '{}': {}",
629                    result.execution.call.id, e
630                ))
631            })?;
632        }
633        // Apply pending patches from plugins to state for next tool
634        for pp in &result.pending_patches {
635            state = apply_patch(&state, pp.patch()).map_err(|e| {
636                AgentLoopError::StateError(format!(
637                    "failed to apply plugin patch for call '{}': {}",
638                    result.execution.call.id, e
639                ))
640            })?;
641        }
642
643        results.push(result);
644
645        if results
646            .last()
647            .and_then(|r| r.pending_interaction.as_ref())
648            .is_some()
649        {
650            break;
651        }
652    }
653
654    Ok(results)
655}
656
657/// Execute a single tool with phase hooks.
658pub(super) async fn execute_single_tool_with_phases(
659    tool: Option<&dyn Tool>,
660    call: &crate::contracts::thread::ToolCall,
661    state: &Value,
662    phase_ctx: &ToolPhaseContext<'_>,
663) -> Result<ToolExecutionResult, AgentLoopError> {
664    // Create ToolCallContext for plugin phases
665    let doc = tirea_state::DocCell::new(state.clone());
666    let ops = std::sync::Mutex::new(Vec::new());
667    let pending_messages = std::sync::Mutex::new(Vec::new());
668    let plugin_scope = phase_ctx.run_config;
669    let plugin_tool_call_ctx = crate::contracts::ToolCallContext::new_with_cancellation(
670        &doc,
671        &ops,
672        "plugin_phase",
673        "plugin:tool_phase",
674        plugin_scope,
675        &pending_messages,
676        None,
677        phase_ctx.cancellation_token,
678    );
679
680    // Create StepContext for this tool
681    let mut step = StepContext::new(
682        plugin_tool_call_ctx,
683        phase_ctx.thread_id,
684        phase_ctx.thread_messages,
685        phase_ctx.tool_descriptors.to_vec(),
686    );
687    step.tool = Some(ToolContext::new(call));
688
689    // Phase: BeforeToolExecute
690    emit_phase_checked(Phase::BeforeToolExecute, &mut step, phase_ctx.plugins).await?;
691
692    // Check if blocked or pending
693    let (execution, pending_interaction, pending_frontend_invocation) =
694        if !crate::engine::tool_filter::is_scope_allowed(
695            Some(phase_ctx.run_config),
696            &call.name,
697            SCOPE_ALLOWED_TOOLS_KEY,
698            SCOPE_EXCLUDED_TOOLS_KEY,
699        ) {
700            (
701                ToolExecution {
702                    call: call.clone(),
703                    result: ToolResult::error(
704                        &call.name,
705                        format!("Tool '{}' is not allowed by current policy", call.name),
706                    ),
707                    patch: None,
708                },
709                None,
710                None,
711            )
712        } else if step.tool_blocked() {
713            let reason = step
714                .tool
715                .as_ref()
716                .and_then(|t| t.block_reason.clone())
717                .unwrap_or_else(|| "Blocked by plugin".to_string());
718            (
719                ToolExecution {
720                    call: call.clone(),
721                    result: ToolResult::error(&call.name, reason),
722                    patch: None,
723                },
724                None,
725                None,
726            )
727        } else if tool.is_none() {
728            (
729                ToolExecution {
730                    call: call.clone(),
731                    result: ToolResult::error(
732                        &call.name,
733                        format!("Tool '{}' not found", call.name),
734                    ),
735                    patch: None,
736                },
737                None,
738                None,
739            )
740        } else if let Err(e) = tool.unwrap().validate_args(&call.arguments) {
741            // Argument validation failed — return error to the LLM
742            (
743                ToolExecution {
744                    call: call.clone(),
745                    result: ToolResult::error(&call.name, e.to_string()),
746                    patch: None,
747                },
748                None,
749                None,
750            )
751        } else if step.tool_pending() {
752            let interaction = step
753                .tool
754                .as_ref()
755                .and_then(|t| t.pending_interaction.clone());
756            let frontend_invocation = step
757                .tool
758                .as_ref()
759                .and_then(|t| t.pending_frontend_invocation.clone());
760            (
761                ToolExecution {
762                    call: call.clone(),
763                    result: ToolResult::pending(&call.name, "Waiting for user confirmation"),
764                    patch: None,
765                },
766                interaction,
767                frontend_invocation,
768            )
769        } else {
770            // Execute the tool with its own ToolCallContext
771            let tool_doc = tirea_state::DocCell::new(state.clone());
772            let tool_ops = std::sync::Mutex::new(Vec::new());
773            let tool_pending_msgs = std::sync::Mutex::new(Vec::new());
774            let tool_ctx = crate::contracts::ToolCallContext::new_with_cancellation(
775                &tool_doc,
776                &tool_ops,
777                &call.id,
778                format!("tool:{}", call.name),
779                plugin_scope,
780                &tool_pending_msgs,
781                phase_ctx.activity_manager.clone(),
782                phase_ctx.cancellation_token,
783            );
784            let result = match tool
785                .unwrap()
786                .execute(call.arguments.clone(), &tool_ctx)
787                .await
788            {
789                Ok(r) => r,
790                Err(e) => ToolResult::error(&call.name, e.to_string()),
791            };
792
793            let patch = tool_ctx.take_patch();
794            let patch = if patch.patch().is_empty() {
795                None
796            } else {
797                Some(patch)
798            };
799
800            (
801                ToolExecution {
802                    call: call.clone(),
803                    result,
804                    patch,
805                },
806                None,
807                None,
808            )
809        };
810
811    // Set tool result in context
812    step.set_tool_result(execution.result.clone());
813
814    // Phase: AfterToolExecute
815    emit_phase_checked(Phase::AfterToolExecute, &mut step, phase_ctx.plugins).await?;
816
817    // Flush plugin state ops into pending patches
818    let plugin_patch = step.ctx().take_patch();
819    if !plugin_patch.patch().is_empty() {
820        step.pending_patches.push(plugin_patch);
821    }
822
823    let pending_patches = std::mem::take(&mut step.pending_patches);
824
825    Ok(ToolExecutionResult {
826        execution,
827        reminders: step.system_reminders.clone(),
828        pending_interaction,
829        pending_frontend_invocation,
830        pending_patches,
831    })
832}
833
834fn cancelled_error(thread_id: &str) -> AgentLoopError {
835    AgentLoopError::Cancelled {
836        run_ctx: Box::new(RunContext::new(
837            thread_id,
838            serde_json::json!({}),
839            vec![],
840            tirea_contract::RunConfig::default(),
841        )),
842    }
843}