Skip to main content

tirea_agent_loop/runtime/loop_runner/
tool_exec.rs

1use super::core::{
2    pending_approval_placeholder_message, transition_tool_call_state, ToolCallStateSeed,
3    ToolCallStateTransition,
4};
5use super::parallel_state_merge::merge_parallel_state_patches;
6use super::plugin_runtime::emit_tool_phase;
7use super::{
8    Agent, AgentLoopError, BaseAgent, RunCancellationToken, TOOL_SCOPE_CALLER_MESSAGES_KEY,
9    TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
10};
11use crate::contracts::runtime::action::Action;
12use crate::contracts::runtime::behavior::AgentBehavior;
13use crate::contracts::runtime::phase::{Phase, StepContext};
14use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
15use crate::contracts::runtime::tool_call::ToolGate;
16use crate::contracts::runtime::tool_call::{Tool, ToolDescriptor, ToolResult};
17use crate::contracts::runtime::{
18    ActivityManager, PendingToolCall, SuspendTicket, SuspendedCall, ToolCallResumeMode,
19};
20use crate::contracts::runtime::{
21    DecisionReplayPolicy, StreamResult, ToolCallOutcome, ToolCallStatus, ToolExecution,
22    ToolExecutionEffect, ToolExecutionRequest, ToolExecutionResult, ToolExecutor,
23    ToolExecutorError,
24};
25use crate::contracts::thread::Thread;
26use crate::contracts::thread::{Message, MessageMetadata, ToolCall};
27use crate::contracts::{RunContext, Suspension};
28use crate::engine::convert::tool_response;
29use crate::engine::tool_execution::merge_context_patch_into_effect;
30use crate::runtime::run_context::{await_or_cancel, is_cancelled, CancelAware};
31use async_trait::async_trait;
32use serde_json::Value;
33use std::collections::HashMap;
34use std::sync::Arc;
35use tirea_state::{Patch, TrackedPatch};
36
/// Outcome of the public `execute_tools*` family of functions.
///
/// Tool execution can complete normally or suspend while waiting for
/// external resolution (e.g. human-in-the-loop approval).
#[derive(Debug)]
pub enum ExecuteToolsOutcome {
    /// All tool calls completed (successfully or with tool-level errors).
    ///
    /// Carries the input thread extended with the messages and patches
    /// produced during execution.
    Completed(Thread),
    /// Execution suspended on a tool call awaiting external decision.
    Suspended {
        /// The input thread extended with the messages and patches produced
        /// up to the point of suspension.
        thread: Thread,
        /// The suspended call reported to the caller. When several calls are
        /// suspended, the entry points in this module report the first one
        /// they encounter.
        suspended_call: Box<SuspendedCall>,
    },
}
51
52impl ExecuteToolsOutcome {
53    /// Extract the thread from either variant.
54    pub fn into_thread(self) -> Thread {
55        match self {
56            Self::Completed(t) | Self::Suspended { thread: t, .. } => t,
57        }
58    }
59
60    /// Returns `true` when execution suspended on a tool call.
61    pub fn is_suspended(&self) -> bool {
62        matches!(self, Self::Suspended { .. })
63    }
64}
65
/// What `apply_tool_results_impl` reports back after folding a batch of tool
/// execution results into a `RunContext`.
pub(super) struct AppliedToolResults {
    /// Suspended calls found in the batch, in result order. Empty when no
    /// result was suspended.
    pub(super) suspended_calls: Vec<SuspendedCall>,
    /// State snapshot taken after applying the batch. `None` when applying
    /// the batch added no state patches.
    pub(super) state_snapshot: Option<Value>,
}
70
/// Borrowed, per-batch context handed to the tool execution helpers so that
/// each tool call can run its phase hooks.
#[derive(Clone)]
pub(super) struct ToolPhaseContext<'a> {
    /// Descriptors of the available tools; copied into each tool's `StepContext`.
    pub(super) tool_descriptors: &'a [ToolDescriptor],
    /// Optional behavior whose phase hooks are emitted around tool execution.
    pub(super) agent_behavior: Option<&'a dyn AgentBehavior>,
    /// Activity manager shared (via `Arc`) with every tool call in the batch.
    pub(super) activity_manager: Arc<dyn ActivityManager>,
    /// Run configuration; used as the scope of the plugin tool-call context.
    pub(super) run_config: &'a tirea_contract::RunConfig,
    /// Identifier of the thread the tool calls belong to.
    pub(super) thread_id: &'a str,
    /// Messages of the thread, as seen by the tool calls.
    pub(super) thread_messages: &'a [Arc<Message>],
    /// Optional cancellation token. The batch runners check it up front and
    /// race execution against it; `None` means no cancellation is observed.
    pub(super) cancellation_token: Option<&'a RunCancellationToken>,
}
81
82impl<'a> ToolPhaseContext<'a> {
83    pub(super) fn from_request(request: &'a ToolExecutionRequest<'a>) -> Self {
84        Self {
85            tool_descriptors: request.tool_descriptors,
86            agent_behavior: request.agent_behavior,
87            activity_manager: request.activity_manager.clone(),
88            run_config: request.run_config,
89            thread_id: request.thread_id,
90            thread_messages: request.thread_messages,
91            cancellation_token: request.cancellation_token,
92        }
93    }
94}
95
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` when the system clock reads earlier than the epoch and
/// saturates at `u64::MAX` if the millisecond count does not fit in a `u64`.
fn now_unix_millis() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => {
            let millis = elapsed.as_millis();
            if millis > u128::from(u64::MAX) {
                u64::MAX
            } else {
                millis as u64
            }
        }
        // Clock is set before the epoch: fall back to zero rather than fail.
        Err(_) => 0,
    }
}
101
102fn suspended_call_from_tool_result(call: &ToolCall, result: &ToolResult) -> SuspendedCall {
103    if let Some(mut explicit) = result.suspension() {
104        if explicit.pending.id.trim().is_empty() || explicit.pending.name.trim().is_empty() {
105            explicit.pending =
106                PendingToolCall::new(call.id.clone(), call.name.clone(), call.arguments.clone());
107        }
108        return SuspendedCall::new(call, explicit);
109    }
110
111    let mut suspension = Suspension::new(&call.id, format!("tool:{}", call.name))
112        .with_parameters(call.arguments.clone());
113    if let Some(message) = result.message.as_ref() {
114        suspension = suspension.with_message(message.clone());
115    }
116
117    SuspendedCall::new(
118        call,
119        SuspendTicket::new(
120            suspension,
121            PendingToolCall::new(call.id.clone(), call.name.clone(), call.arguments.clone()),
122            ToolCallResumeMode::ReplayToolCall,
123        ),
124    )
125}
126
127fn persist_tool_call_status(
128    step: &StepContext<'_>,
129    call: &ToolCall,
130    status: ToolCallStatus,
131    suspended_call: Option<&SuspendedCall>,
132) -> Result<(), AgentLoopError> {
133    let current_state = step.ctx().tool_call_state_for(&call.id).map_err(|e| {
134        AgentLoopError::StateError(format!(
135            "failed to read tool call state for '{}' before setting {:?}: {e}",
136            call.id, status
137        ))
138    })?;
139    let previous_status = current_state
140        .as_ref()
141        .map(|state| state.status)
142        .unwrap_or(ToolCallStatus::New);
143    let current_resume_token = current_state
144        .as_ref()
145        .and_then(|state| state.resume_token.clone());
146    let current_resume = current_state
147        .as_ref()
148        .and_then(|state| state.resume.clone());
149
150    let (next_resume_token, next_resume) = match status {
151        ToolCallStatus::Running => {
152            if matches!(previous_status, ToolCallStatus::Resuming) {
153                (current_resume_token.clone(), current_resume.clone())
154            } else {
155                (None, None)
156            }
157        }
158        ToolCallStatus::Suspended => (
159            suspended_call
160                .map(|entry| entry.ticket.pending.id.clone())
161                .or(current_resume_token.clone()),
162            None,
163        ),
164        ToolCallStatus::Succeeded
165        | ToolCallStatus::Failed
166        | ToolCallStatus::Cancelled
167        | ToolCallStatus::New
168        | ToolCallStatus::Resuming => (current_resume_token, current_resume),
169    };
170
171    let Some(runtime_state) = transition_tool_call_state(
172        current_state,
173        ToolCallStateSeed {
174            call_id: &call.id,
175            tool_name: &call.name,
176            arguments: &call.arguments,
177            status: ToolCallStatus::New,
178            resume_token: None,
179        },
180        ToolCallStateTransition {
181            status,
182            resume_token: next_resume_token,
183            resume: next_resume,
184            updated_at: now_unix_millis(),
185        },
186    ) else {
187        return Err(AgentLoopError::StateError(format!(
188            "invalid tool call status transition for '{}': {:?} -> {:?}",
189            call.id, previous_status, status
190        )));
191    };
192
193    step.ctx()
194        .set_tool_call_state_for(&call.id, runtime_state)
195        .map_err(|e| {
196            AgentLoopError::StateError(format!(
197                "failed to persist tool call state for '{}' as {:?}: {e}",
198                call.id, status
199            ))
200        })
201}
202
203fn map_tool_executor_error(err: AgentLoopError, thread_id: &str) -> ToolExecutorError {
204    match err {
205        AgentLoopError::Cancelled => ToolExecutorError::Cancelled {
206            thread_id: thread_id.to_string(),
207        },
208        other => ToolExecutorError::Failed {
209            message: other.to_string(),
210        },
211    }
212}
213
/// Operating mode of a [`ParallelToolExecutor`].
///
/// The mode determines the name the executor reports and which
/// `DecisionReplayPolicy` it advertises.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelToolExecutionMode {
    /// Reported as `"parallel_batch_approval"`; advertises
    /// `DecisionReplayPolicy::BatchAllSuspended`.
    BatchApproval,
    /// Reported as `"parallel_streaming"`; advertises
    /// `DecisionReplayPolicy::Immediate`.
    Streaming,
}
220
/// Executes all tool calls concurrently.
///
/// Construct it with [`ParallelToolExecutor::batch_approval`] or
/// [`ParallelToolExecutor::streaming`]; `Default` yields the streaming variant.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParallelToolExecutor {
    /// Mode selecting the reported executor name and decision replay policy.
    mode: ParallelToolExecutionMode,
}
226
227impl ParallelToolExecutor {
228    pub const fn batch_approval() -> Self {
229        Self {
230            mode: ParallelToolExecutionMode::BatchApproval,
231        }
232    }
233
234    pub const fn streaming() -> Self {
235        Self {
236            mode: ParallelToolExecutionMode::Streaming,
237        }
238    }
239
240    fn mode_name(self) -> &'static str {
241        match self.mode {
242            ParallelToolExecutionMode::BatchApproval => "parallel_batch_approval",
243            ParallelToolExecutionMode::Streaming => "parallel_streaming",
244        }
245    }
246}
247
248impl Default for ParallelToolExecutor {
249    fn default() -> Self {
250        Self::streaming()
251    }
252}
253
254#[async_trait]
255impl ToolExecutor for ParallelToolExecutor {
256    async fn execute(
257        &self,
258        request: ToolExecutionRequest<'_>,
259    ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
260        let thread_id = request.thread_id;
261        let phase_ctx = ToolPhaseContext::from_request(&request);
262        execute_tools_parallel_with_phases(request.tools, request.calls, request.state, phase_ctx)
263            .await
264            .map_err(|e| map_tool_executor_error(e, thread_id))
265    }
266
267    fn name(&self) -> &'static str {
268        self.mode_name()
269    }
270
271    fn requires_parallel_patch_conflict_check(&self) -> bool {
272        true
273    }
274
275    fn decision_replay_policy(&self) -> DecisionReplayPolicy {
276        match self.mode {
277            ParallelToolExecutionMode::BatchApproval => DecisionReplayPolicy::BatchAllSuspended,
278            ParallelToolExecutionMode::Streaming => DecisionReplayPolicy::Immediate,
279        }
280    }
281}
282
/// Executes tool calls one-by-one in call order.
///
/// Stateless unit type; it reports itself under the name `"sequential"`.
#[derive(Debug, Clone, Copy, Default)]
pub struct SequentialToolExecutor;
286
287#[async_trait]
288impl ToolExecutor for SequentialToolExecutor {
289    async fn execute(
290        &self,
291        request: ToolExecutionRequest<'_>,
292    ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
293        let thread_id = request.thread_id;
294        let phase_ctx = ToolPhaseContext::from_request(&request);
295        execute_tools_sequential_with_phases(request.tools, request.calls, request.state, phase_ctx)
296            .await
297            .map_err(|e| map_tool_executor_error(e, thread_id))
298    }
299
300    fn name(&self) -> &'static str {
301        "sequential"
302    }
303}
304
305pub(super) fn apply_tool_results_to_session(
306    run_ctx: &mut RunContext,
307    results: &[ToolExecutionResult],
308    metadata: Option<MessageMetadata>,
309    check_parallel_patch_conflicts: bool,
310) -> Result<AppliedToolResults, AgentLoopError> {
311    apply_tool_results_impl(
312        run_ctx,
313        results,
314        metadata,
315        check_parallel_patch_conflicts,
316        None,
317    )
318}
319
320pub(super) fn apply_tool_results_impl(
321    run_ctx: &mut RunContext,
322    results: &[ToolExecutionResult],
323    metadata: Option<MessageMetadata>,
324    check_parallel_patch_conflicts: bool,
325    tool_msg_ids: Option<&HashMap<String, String>>,
326) -> Result<AppliedToolResults, AgentLoopError> {
327    // Collect all suspended calls from results.
328    let suspended: Vec<SuspendedCall> = results
329        .iter()
330        .filter_map(|r| {
331            if matches!(r.outcome, ToolCallOutcome::Suspended) {
332                r.suspended_call.clone()
333            } else {
334                None
335            }
336        })
337        .collect();
338
339    // Collect serialized actions from all tool execution results into RunContext.
340    let all_serialized_actions: Vec<tirea_contract::SerializedAction> = results
341        .iter()
342        .flat_map(|r| r.serialized_actions.iter().cloned())
343        .collect();
344    if !all_serialized_actions.is_empty() {
345        run_ctx.add_serialized_actions(all_serialized_actions);
346    }
347
348    let base_snapshot = run_ctx
349        .snapshot()
350        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
351    let patches = merge_parallel_state_patches(
352        &base_snapshot,
353        results,
354        check_parallel_patch_conflicts,
355        run_ctx.lattice_registry(),
356    )?;
357    let mut state_changed = !patches.is_empty();
358    run_ctx.add_thread_patches(patches);
359
360    // Add tool result messages for all executions.
361    let tool_messages: Vec<Arc<Message>> = results
362        .iter()
363        .flat_map(|r| {
364            let is_suspended = matches!(r.outcome, ToolCallOutcome::Suspended);
365            let mut msgs = if is_suspended {
366                vec![Message::tool(
367                    &r.execution.call.id,
368                    pending_approval_placeholder_message(&r.execution.call.name),
369                )]
370            } else {
371                let mut tool_msg = tool_response(&r.execution.call.id, &r.execution.result);
372                if let Some(id) = tool_msg_ids.and_then(|ids| ids.get(&r.execution.call.id)) {
373                    tool_msg = tool_msg.with_id(id.clone());
374                }
375                vec![tool_msg]
376            };
377            for reminder in &r.reminders {
378                msgs.push(Message::internal_system(format!(
379                    "<system-reminder>{}</system-reminder>",
380                    reminder
381                )));
382            }
383            if let Some(ref meta) = metadata {
384                for msg in &mut msgs {
385                    msg.metadata = Some(meta.clone());
386                }
387            }
388            msgs.into_iter().map(Arc::new).collect::<Vec<_>>()
389        })
390        .collect();
391
392    run_ctx.add_messages(tool_messages);
393
394    // Append user messages produced by tool effects and AfterToolExecute plugins.
395    let user_messages: Vec<Arc<Message>> = results
396        .iter()
397        .flat_map(|r| {
398            r.user_messages
399                .iter()
400                .map(|s| s.trim())
401                .filter(|s| !s.is_empty())
402                .map(|text| {
403                    let mut msg = Message::user(text.to_string());
404                    if let Some(ref meta) = metadata {
405                        msg.metadata = Some(meta.clone());
406                    }
407                    Arc::new(msg)
408                })
409                .collect::<Vec<_>>()
410        })
411        .collect();
412    if !user_messages.is_empty() {
413        run_ctx.add_messages(user_messages);
414    }
415    if !suspended.is_empty() {
416        let state = run_ctx
417            .snapshot()
418            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
419        let actions: Vec<AnyStateAction> = suspended
420            .iter()
421            .map(|call| call.clone().into_state_action())
422            .collect();
423        let patches = reduce_state_actions(actions, &state, "agent_loop", &ScopeContext::run())
424            .map_err(|e| {
425                AgentLoopError::StateError(format!("failed to reduce suspended call actions: {e}"))
426            })?;
427        for patch in patches {
428            if !patch.patch().is_empty() {
429                state_changed = true;
430                run_ctx.add_thread_patch(patch);
431            }
432        }
433        let state_snapshot = if state_changed {
434            Some(
435                run_ctx
436                    .snapshot()
437                    .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
438            )
439        } else {
440            None
441        };
442        return Ok(AppliedToolResults {
443            suspended_calls: suspended,
444            state_snapshot,
445        });
446    }
447
448    // Keep unresolved suspended calls until explicit resolution.
449    //
450    // Do not emit a synthetic "clear suspended calls" patch when there are
451    // no suspended calls in state. That no-op clear generated one redundant
452    // control-state patch per tool execution and inflated patch histories.
453
454    let state_snapshot = if state_changed {
455        Some(
456            run_ctx
457                .snapshot()
458                .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
459        )
460    } else {
461        None
462    };
463
464    Ok(AppliedToolResults {
465        suspended_calls: Vec::new(),
466        state_snapshot,
467    })
468}
469
470fn tool_result_metadata_from_run_ctx(
471    run_ctx: &RunContext,
472    run_id: Option<&str>,
473) -> Option<MessageMetadata> {
474    let run_id = run_id.map(|id| id.to_string()).or_else(|| {
475        run_ctx.messages().iter().rev().find_map(|m| {
476            m.metadata
477                .as_ref()
478                .and_then(|meta| meta.run_id.as_ref().cloned())
479        })
480    });
481
482    let step_index = run_ctx
483        .messages()
484        .iter()
485        .rev()
486        .find_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index));
487
488    if run_id.is_none() && step_index.is_none() {
489        None
490    } else {
491        Some(MessageMetadata { run_id, step_index })
492    }
493}
494
495#[allow(dead_code)]
496pub(super) fn next_step_index(run_ctx: &RunContext) -> u32 {
497    run_ctx
498        .messages()
499        .iter()
500        .filter_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index))
501        .max()
502        .map(|v| v.saturating_add(1))
503        .unwrap_or(0)
504}
505
506pub(super) fn step_metadata(run_id: Option<String>, step_index: u32) -> MessageMetadata {
507    MessageMetadata {
508        run_id,
509        step_index: Some(step_index),
510    }
511}
512
513/// Execute tool calls (simplified version without plugins).
514///
515/// This is the simpler API for tests and cases where no behavior is needed.
516pub async fn execute_tools(
517    thread: Thread,
518    result: &StreamResult,
519    tools: &HashMap<String, Arc<dyn Tool>>,
520    parallel: bool,
521) -> Result<ExecuteToolsOutcome, AgentLoopError> {
522    let parallel_executor = ParallelToolExecutor::streaming();
523    let sequential_executor = SequentialToolExecutor;
524    let executor: &dyn ToolExecutor = if parallel {
525        &parallel_executor
526    } else {
527        &sequential_executor
528    };
529    execute_tools_with_agent_and_executor(thread, result, tools, executor, None).await
530}
531
532/// Execute tool calls with phase-based plugin hooks.
533pub async fn execute_tools_with_config(
534    thread: Thread,
535    result: &StreamResult,
536    tools: &HashMap<String, Arc<dyn Tool>>,
537    agent: &dyn Agent,
538) -> Result<ExecuteToolsOutcome, AgentLoopError> {
539    execute_tools_with_agent_and_executor(
540        thread,
541        result,
542        tools,
543        agent.tool_executor().as_ref(),
544        Some(agent.behavior()),
545    )
546    .await
547}
548
549pub(super) fn scope_with_tool_caller_context(
550    run_ctx: &RunContext,
551    state: &Value,
552) -> Result<tirea_contract::RunConfig, AgentLoopError> {
553    let mut rt = run_ctx.run_config.clone();
554    if rt.value(TOOL_SCOPE_CALLER_THREAD_ID_KEY).is_none() {
555        rt.set(
556            TOOL_SCOPE_CALLER_THREAD_ID_KEY,
557            run_ctx.thread_id().to_string(),
558        )
559        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
560    }
561    if rt.value(TOOL_SCOPE_CALLER_STATE_KEY).is_none() {
562        rt.set(TOOL_SCOPE_CALLER_STATE_KEY, state.clone())
563            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
564    }
565    if rt.value(TOOL_SCOPE_CALLER_MESSAGES_KEY).is_none() {
566        rt.set(TOOL_SCOPE_CALLER_MESSAGES_KEY, run_ctx.messages().to_vec())
567            .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
568    }
569    Ok(rt)
570}
571
572/// Execute tool calls with behavior hooks.
573pub async fn execute_tools_with_behaviors(
574    thread: Thread,
575    result: &StreamResult,
576    tools: &HashMap<String, Arc<dyn Tool>>,
577    parallel: bool,
578    behavior: Arc<dyn AgentBehavior>,
579) -> Result<ExecuteToolsOutcome, AgentLoopError> {
580    let executor: Arc<dyn ToolExecutor> = if parallel {
581        Arc::new(ParallelToolExecutor::streaming())
582    } else {
583        Arc::new(SequentialToolExecutor)
584    };
585    let agent = BaseAgent::default()
586        .with_behavior(behavior)
587        .with_tool_executor(executor);
588    execute_tools_with_config(thread, result, tools, &agent).await
589}
590
591async fn execute_tools_with_agent_and_executor(
592    thread: Thread,
593    result: &StreamResult,
594    tools: &HashMap<String, Arc<dyn Tool>>,
595    executor: &dyn ToolExecutor,
596    behavior: Option<&dyn AgentBehavior>,
597) -> Result<ExecuteToolsOutcome, AgentLoopError> {
598    // Build RunContext from thread for internal use
599    let rebuilt_state = thread
600        .rebuild_state()
601        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
602    let mut run_ctx = RunContext::new(
603        &thread.id,
604        rebuilt_state.clone(),
605        thread.messages.clone(),
606        tirea_contract::RunConfig::default(),
607    );
608
609    let tool_descriptors: Vec<ToolDescriptor> =
610        tools.values().map(|t| t.descriptor().clone()).collect();
611    // Run the RunStart phase via behavior dispatch
612    if let Some(behavior) = behavior {
613        let run_start_patches = super::plugin_runtime::behavior_run_phase_block(
614            &run_ctx,
615            &tool_descriptors,
616            behavior,
617            &[Phase::RunStart],
618            |_| {},
619            |_| (),
620        )
621        .await?
622        .1;
623        if !run_start_patches.is_empty() {
624            run_ctx.add_thread_patches(run_start_patches);
625        }
626    }
627
628    let replay_executor: Arc<dyn ToolExecutor> = match executor.decision_replay_policy() {
629        DecisionReplayPolicy::BatchAllSuspended => Arc::new(ParallelToolExecutor::batch_approval()),
630        DecisionReplayPolicy::Immediate => Arc::new(ParallelToolExecutor::streaming()),
631    };
632    let replay_config = BaseAgent::default().with_tool_executor(replay_executor);
633    let replay = super::drain_resuming_tool_calls_and_replay(
634        &mut run_ctx,
635        tools,
636        &replay_config,
637        &tool_descriptors,
638    )
639    .await?;
640
641    if replay.replayed {
642        let suspended = run_ctx.suspended_calls().values().next().cloned();
643        let delta = run_ctx.take_delta();
644        let mut out_thread = thread;
645        for msg in delta.messages {
646            out_thread = out_thread.with_message((*msg).clone());
647        }
648        out_thread = out_thread.with_patches(delta.patches);
649        return if let Some(first) = suspended {
650            Ok(ExecuteToolsOutcome::Suspended {
651                thread: out_thread,
652                suspended_call: Box::new(first),
653            })
654        } else {
655            Ok(ExecuteToolsOutcome::Completed(out_thread))
656        };
657    }
658
659    if result.tool_calls.is_empty() {
660        let delta = run_ctx.take_delta();
661        let mut out_thread = thread;
662        for msg in delta.messages {
663            out_thread = out_thread.with_message((*msg).clone());
664        }
665        out_thread = out_thread.with_patches(delta.patches);
666        return Ok(ExecuteToolsOutcome::Completed(out_thread));
667    }
668
669    let current_state = run_ctx
670        .snapshot()
671        .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
672    let rt_for_tools = scope_with_tool_caller_context(&run_ctx, &current_state)?;
673    let results = executor
674        .execute(ToolExecutionRequest {
675            tools,
676            calls: &result.tool_calls,
677            state: &current_state,
678            tool_descriptors: &tool_descriptors,
679            agent_behavior: behavior,
680            activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
681            run_config: &rt_for_tools,
682            thread_id: run_ctx.thread_id(),
683            thread_messages: run_ctx.messages(),
684            state_version: run_ctx.version(),
685            cancellation_token: None,
686        })
687        .await?;
688
689    let metadata = tool_result_metadata_from_run_ctx(&run_ctx, None);
690    let applied = apply_tool_results_to_session(
691        &mut run_ctx,
692        &results,
693        metadata,
694        executor.requires_parallel_patch_conflict_check(),
695    )?;
696    let suspended = applied.suspended_calls.into_iter().next();
697
698    // Reconstruct thread from RunContext delta
699    let delta = run_ctx.take_delta();
700    let mut out_thread = thread;
701    for msg in delta.messages {
702        out_thread = out_thread.with_message((*msg).clone());
703    }
704    out_thread = out_thread.with_patches(delta.patches);
705
706    if let Some(first) = suspended {
707        Ok(ExecuteToolsOutcome::Suspended {
708            thread: out_thread,
709            suspended_call: Box::new(first),
710        })
711    } else {
712        Ok(ExecuteToolsOutcome::Completed(out_thread))
713    }
714}
715
716/// Execute tools in parallel with phase hooks.
717pub(super) async fn execute_tools_parallel_with_phases(
718    tools: &HashMap<String, Arc<dyn Tool>>,
719    calls: &[crate::contracts::thread::ToolCall],
720    state: &Value,
721    phase_ctx: ToolPhaseContext<'_>,
722) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
723    use futures::future::join_all;
724
725    if is_cancelled(phase_ctx.cancellation_token) {
726        return Err(cancelled_error(phase_ctx.thread_id));
727    }
728
729    // Clone run config for parallel tasks (RunConfig is Clone).
730    let run_config_owned = phase_ctx.run_config.clone();
731    let thread_id = phase_ctx.thread_id.to_string();
732    let thread_messages = Arc::new(phase_ctx.thread_messages.to_vec());
733    let tool_descriptors = phase_ctx.tool_descriptors.to_vec();
734    let agent = phase_ctx.agent_behavior;
735
736    let futures = calls.iter().map(|call| {
737        let tool = tools.get(&call.name).cloned();
738        let state = state.clone();
739        let call = call.clone();
740        let tool_descriptors = tool_descriptors.clone();
741        let activity_manager = phase_ctx.activity_manager.clone();
742        let rt = run_config_owned.clone();
743        let sid = thread_id.clone();
744        let thread_messages = thread_messages.clone();
745
746        async move {
747            execute_single_tool_with_phases_impl(
748                tool.as_deref(),
749                &call,
750                &state,
751                &ToolPhaseContext {
752                    tool_descriptors: &tool_descriptors,
753                    agent_behavior: agent,
754                    activity_manager,
755                    run_config: &rt,
756                    thread_id: &sid,
757                    thread_messages: thread_messages.as_slice(),
758                    cancellation_token: None,
759                },
760            )
761            .await
762        }
763    });
764
765    let join_future = join_all(futures);
766    let results = match await_or_cancel(phase_ctx.cancellation_token, join_future).await {
767        CancelAware::Cancelled => return Err(cancelled_error(&thread_id)),
768        CancelAware::Value(results) => results,
769    };
770    let results: Vec<ToolExecutionResult> = results.into_iter().collect::<Result<_, _>>()?;
771    Ok(results)
772}
773
774/// Execute tools sequentially with phase hooks.
775pub(super) async fn execute_tools_sequential_with_phases(
776    tools: &HashMap<String, Arc<dyn Tool>>,
777    calls: &[crate::contracts::thread::ToolCall],
778    initial_state: &Value,
779    phase_ctx: ToolPhaseContext<'_>,
780) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
781    use tirea_state::apply_patch;
782
783    if is_cancelled(phase_ctx.cancellation_token) {
784        return Err(cancelled_error(phase_ctx.thread_id));
785    }
786
787    let mut state = initial_state.clone();
788    let mut results = Vec::with_capacity(calls.len());
789
790    for call in calls {
791        let tool = tools.get(&call.name).cloned();
792        let call_phase_ctx = ToolPhaseContext {
793            tool_descriptors: phase_ctx.tool_descriptors,
794            agent_behavior: phase_ctx.agent_behavior,
795            activity_manager: phase_ctx.activity_manager.clone(),
796            run_config: phase_ctx.run_config,
797            thread_id: phase_ctx.thread_id,
798            thread_messages: phase_ctx.thread_messages,
799            cancellation_token: None,
800        };
801        let result = match await_or_cancel(
802            phase_ctx.cancellation_token,
803            execute_single_tool_with_phases_impl(tool.as_deref(), call, &state, &call_phase_ctx),
804        )
805        .await
806        {
807            CancelAware::Cancelled => return Err(cancelled_error(phase_ctx.thread_id)),
808            CancelAware::Value(result) => result?,
809        };
810
811        // Apply patch to state for next tool
812        if let Some(ref patch) = result.execution.patch {
813            state = apply_patch(&state, patch.patch()).map_err(|e| {
814                AgentLoopError::StateError(format!(
815                    "failed to apply tool patch for call '{}': {}",
816                    result.execution.call.id, e
817                ))
818            })?;
819        }
820        // Apply pending patches from plugins to state for next tool
821        for pp in &result.pending_patches {
822            state = apply_patch(&state, pp.patch()).map_err(|e| {
823                AgentLoopError::StateError(format!(
824                    "failed to apply plugin patch for call '{}': {}",
825                    result.execution.call.id, e
826                ))
827            })?;
828        }
829
830        results.push(result);
831
832        if results
833            .last()
834            .is_some_and(|r| matches!(r.outcome, ToolCallOutcome::Suspended))
835        {
836            break;
837        }
838    }
839
840    Ok(results)
841}
842
/// Execute a single tool with phase hooks.
///
/// Test-only entry point (compiled under `cfg(test)`) that forwards
/// unchanged to `execute_single_tool_with_phases_impl`.
///
/// `tool` is the resolved tool for `call`, or `None` when no tool was found
/// for it; `state` is the state the call runs against; `phase_ctx` supplies
/// the context needed to run the phase hooks.
#[cfg(test)]
pub(super) async fn execute_single_tool_with_phases(
    tool: Option<&dyn Tool>,
    call: &crate::contracts::thread::ToolCall,
    state: &Value,
    phase_ctx: &ToolPhaseContext<'_>,
) -> Result<ToolExecutionResult, AgentLoopError> {
    execute_single_tool_with_phases_impl(tool, call, state, phase_ctx).await
}
853
/// Execute a single tool with phase hooks, for use by sibling modules.
///
/// Forwards unchanged to `execute_single_tool_with_phases_impl`; as written
/// it does nothing beyond what the `_impl` function does.
/// NOTE(review): what the "deferred" in the name refers to is not visible
/// from this function; confirm against the callers.
pub(super) async fn execute_single_tool_with_phases_deferred(
    tool: Option<&dyn Tool>,
    call: &crate::contracts::thread::ToolCall,
    state: &Value,
    phase_ctx: &ToolPhaseContext<'_>,
) -> Result<ToolExecutionResult, AgentLoopError> {
    execute_single_tool_with_phases_impl(tool, call, state, phase_ctx).await
}
862
863async fn execute_single_tool_with_phases_impl(
864    tool: Option<&dyn Tool>,
865    call: &crate::contracts::thread::ToolCall,
866    state: &Value,
867    phase_ctx: &ToolPhaseContext<'_>,
868) -> Result<ToolExecutionResult, AgentLoopError> {
869    // Create ToolCallContext for plugin phases
870    let doc = tirea_state::DocCell::new(state.clone());
871    let ops = std::sync::Mutex::new(Vec::new());
872    let pending_messages = std::sync::Mutex::new(Vec::new());
873    let plugin_scope = phase_ctx.run_config;
874    let mut plugin_tool_call_ctx = crate::contracts::ToolCallContext::new(
875        &doc,
876        &ops,
877        "plugin_phase",
878        "plugin:tool_phase",
879        plugin_scope,
880        &pending_messages,
881        tirea_contract::runtime::activity::NoOpActivityManager::arc(),
882    );
883    if let Some(token) = phase_ctx.cancellation_token {
884        plugin_tool_call_ctx = plugin_tool_call_ctx.with_cancellation_token(token);
885    }
886
887    // Create StepContext for this tool
888    let mut step = StepContext::new(
889        plugin_tool_call_ctx,
890        phase_ctx.thread_id,
891        phase_ctx.thread_messages,
892        phase_ctx.tool_descriptors.to_vec(),
893    );
894    step.gate = Some(ToolGate::from_tool_call(call));
895    // Phase: BeforeToolExecute
896    emit_tool_phase(
897        Phase::BeforeToolExecute,
898        &mut step,
899        phase_ctx.agent_behavior,
900        &doc,
901    )
902    .await?;
903
904    // Check if blocked or pending
905    let (mut execution, outcome, suspended_call, tool_actions) = if step.tool_blocked() {
906        let reason = step
907            .gate
908            .as_ref()
909            .and_then(|g| g.block_reason.clone())
910            .unwrap_or_else(|| "Blocked by plugin".to_string());
911        (
912            ToolExecution {
913                call: call.clone(),
914                result: ToolResult::error(&call.name, reason),
915                patch: None,
916            },
917            ToolCallOutcome::Failed,
918            None,
919            Vec::<Box<dyn Action>>::new(),
920        )
921    } else if let Some(plugin_result) = step.tool_result().cloned() {
922        let outcome = ToolCallOutcome::from_tool_result(&plugin_result);
923        (
924            ToolExecution {
925                call: call.clone(),
926                result: plugin_result,
927                patch: None,
928            },
929            outcome,
930            None,
931            Vec::<Box<dyn Action>>::new(),
932        )
933    } else {
934        match tool {
935            None => (
936                ToolExecution {
937                    call: call.clone(),
938                    result: ToolResult::error(
939                        &call.name,
940                        format!("Tool '{}' not found", call.name),
941                    ),
942                    patch: None,
943                },
944                ToolCallOutcome::Failed,
945                None,
946                Vec::<Box<dyn Action>>::new(),
947            ),
948            Some(tool) => {
949                if let Err(e) = tool.validate_args(&call.arguments) {
950                    (
951                        ToolExecution {
952                            call: call.clone(),
953                            result: ToolResult::error(&call.name, e.to_string()),
954                            patch: None,
955                        },
956                        ToolCallOutcome::Failed,
957                        None,
958                        Vec::<Box<dyn Action>>::new(),
959                    )
960                } else if step.tool_pending() {
961                    let Some(suspend_ticket) =
962                        step.gate.as_ref().and_then(|g| g.suspend_ticket.clone())
963                    else {
964                        return Err(AgentLoopError::StateError(
965                            "tool is pending but suspend ticket is missing".to_string(),
966                        ));
967                    };
968                    (
969                        ToolExecution {
970                            call: call.clone(),
971                            result: ToolResult::suspended(
972                                &call.name,
973                                "Execution suspended; awaiting external decision",
974                            ),
975                            patch: None,
976                        },
977                        ToolCallOutcome::Suspended,
978                        Some(SuspendedCall::new(call, suspend_ticket)),
979                        Vec::<Box<dyn Action>>::new(),
980                    )
981                } else {
982                    persist_tool_call_status(&step, call, ToolCallStatus::Running, None)?;
983                    // Execute the tool with its own ToolCallContext.
984                    let tool_doc = tirea_state::DocCell::new(state.clone());
985                    let tool_ops = std::sync::Mutex::new(Vec::new());
986                    let tool_pending_msgs = std::sync::Mutex::new(Vec::new());
987                    let mut tool_ctx = crate::contracts::ToolCallContext::new(
988                        &tool_doc,
989                        &tool_ops,
990                        &call.id,
991                        format!("tool:{}", call.name),
992                        plugin_scope,
993                        &tool_pending_msgs,
994                        phase_ctx.activity_manager.clone(),
995                    );
996                    if let Some(token) = phase_ctx.cancellation_token {
997                        tool_ctx = tool_ctx.with_cancellation_token(token);
998                    }
999                    let mut effect =
1000                        match tool.execute_effect(call.arguments.clone(), &tool_ctx).await {
1001                            Ok(effect) => effect,
1002                            Err(e) => ToolExecutionEffect::from(ToolResult::error(
1003                                &call.name,
1004                                e.to_string(),
1005                            )),
1006                        };
1007
1008                    let context_patch = tool_ctx.take_patch();
1009                    if let Err(result) =
1010                        merge_context_patch_into_effect(call, &mut effect, context_patch)
1011                    {
1012                        effect = ToolExecutionEffect::from(*result);
1013                    }
1014                    let (result, actions) = effect.into_parts();
1015                    let outcome = ToolCallOutcome::from_tool_result(&result);
1016
1017                    let suspended_call = if matches!(outcome, ToolCallOutcome::Suspended) {
1018                        Some(suspended_call_from_tool_result(call, &result))
1019                    } else {
1020                        None
1021                    };
1022
1023                    (
1024                        ToolExecution {
1025                            call: call.clone(),
1026                            result,
1027                            patch: None,
1028                        },
1029                        outcome,
1030                        suspended_call,
1031                        actions,
1032                    )
1033                }
1034            }
1035        }
1036    };
1037
1038    // Set tool result in context
1039    if let Some(gate) = step.gate.as_mut() {
1040        gate.result = Some(execution.result.clone());
1041    }
1042
1043    // Partition tool actions: state actions go to execution.patch reduction;
1044    // non-state actions are validated and applied before plugin hooks run.
1045    let mut tool_state_actions = Vec::<AnyStateAction>::new();
1046    let mut other_actions = Vec::<Box<dyn Action>>::new();
1047    for action in tool_actions {
1048        if action.is_state_action() {
1049            if let Some(sa) = action.into_state_action() {
1050                tool_state_actions.push(sa);
1051            }
1052        } else {
1053            other_actions.push(action);
1054        }
1055    }
1056
1057    // Apply non-state tool-emitted actions (validated against AfterToolExecute) before plugin hooks.
1058    for action in &other_actions {
1059        action
1060            .validate(Phase::AfterToolExecute)
1061            .map_err(AgentLoopError::StateError)?;
1062    }
1063    for action in other_actions {
1064        action.apply(&mut step);
1065    }
1066
1067    // Phase: AfterToolExecute
1068    emit_tool_phase(
1069        Phase::AfterToolExecute,
1070        &mut step,
1071        phase_ctx.agent_behavior,
1072        &doc,
1073    )
1074    .await?;
1075
1076    match outcome {
1077        ToolCallOutcome::Suspended => {
1078            persist_tool_call_status(
1079                &step,
1080                call,
1081                ToolCallStatus::Suspended,
1082                suspended_call.as_ref(),
1083            )?;
1084        }
1085        ToolCallOutcome::Succeeded => {
1086            persist_tool_call_status(&step, call, ToolCallStatus::Succeeded, None)?;
1087        }
1088        ToolCallOutcome::Failed => {
1089            persist_tool_call_status(&step, call, ToolCallStatus::Failed, None)?;
1090        }
1091    }
1092
1093    // Conditional cleanup: terminal outcomes delete the entire scoped subtree.
1094    // Suspended outcomes preserve it so tool_call_state survives for resume.
1095    if !matches!(outcome, ToolCallOutcome::Suspended) {
1096        let cleanup_path = format!("__tool_call_scope.{}", call.id);
1097        let cleanup_patch = Patch::with_ops(vec![tirea_state::Op::delete(
1098            tirea_state::parse_path(&cleanup_path),
1099        )]);
1100        let tracked = TrackedPatch::new(cleanup_patch).with_source("framework:scope_cleanup");
1101        step.emit_patch(tracked);
1102    }
1103
1104    // Flush plugin state ops into pending patches
1105    let plugin_patch = step.ctx().take_patch();
1106    if !plugin_patch.patch().is_empty() {
1107        step.emit_patch(plugin_patch);
1108    }
1109
1110    let phase_patch_actions = std::mem::take(&mut step.pending_patches)
1111        .into_iter()
1112        .map(AnyStateAction::Patch);
1113
1114    // Capture serialized actions before reduce consumes them.
1115    let mut serialized_actions: Vec<tirea_contract::SerializedAction> = tool_state_actions
1116        .iter()
1117        .filter_map(|a| a.to_serialized_action())
1118        .collect();
1119
1120    let tool_scope_ctx = ScopeContext::for_call(&call.id);
1121    let execution_patch_parts = reduce_tool_state_actions(
1122        state,
1123        tool_state_actions,
1124        &format!("tool:{}", call.name),
1125        &tool_scope_ctx,
1126    )?;
1127    execution.patch = merge_tracked_patches(&execution_patch_parts, &format!("tool:{}", call.name));
1128
1129    let phase_base_state = if let Some(tool_patch) = execution.patch.as_ref() {
1130        tirea_state::apply_patch(state, tool_patch.patch()).map_err(|e| {
1131            AgentLoopError::StateError(format!(
1132                "failed to apply tool patch for call '{}': {}",
1133                call.id, e
1134            ))
1135        })?
1136    } else {
1137        state.clone()
1138    };
1139    let pending_patches = reduce_tool_state_actions(
1140        &phase_base_state,
1141        phase_patch_actions.collect(),
1142        "agent",
1143        &tool_scope_ctx,
1144    )?;
1145
1146    let reminders = step.messaging.reminders.clone();
1147    let user_messages = std::mem::take(&mut step.messaging.user_messages);
1148
1149    // Merge plugin-phase serialized actions with tool-level ones.
1150    serialized_actions.extend(step.take_pending_serialized_actions());
1151
1152    Ok(ToolExecutionResult {
1153        execution,
1154        outcome,
1155        suspended_call,
1156        reminders,
1157        user_messages,
1158        pending_patches,
1159        serialized_actions,
1160    })
1161}
1162
1163fn reduce_tool_state_actions(
1164    base_state: &Value,
1165    actions: Vec<AnyStateAction>,
1166    source: &str,
1167    scope_ctx: &ScopeContext,
1168) -> Result<Vec<TrackedPatch>, AgentLoopError> {
1169    reduce_state_actions(actions, base_state, source, scope_ctx).map_err(|e| {
1170        AgentLoopError::StateError(format!("failed to reduce tool state actions: {e}"))
1171    })
1172}
1173
1174fn merge_tracked_patches(patches: &[TrackedPatch], source: &str) -> Option<TrackedPatch> {
1175    let mut merged = Patch::new();
1176    for tracked in patches {
1177        merged.extend(tracked.patch().clone());
1178    }
1179    if merged.is_empty() {
1180        None
1181    } else {
1182        Some(TrackedPatch::new(merged).with_source(source.to_string()))
1183    }
1184}
1185
1186fn cancelled_error(_thread_id: &str) -> AgentLoopError {
1187    AgentLoopError::Cancelled
1188}