1use super::core::{
2 clear_agent_pending_interaction, drain_agent_append_user_messages,
3 set_agent_pending_interaction,
4};
5use super::plugin_runtime::emit_phase_checked;
6use super::{
7 AgentConfig, AgentLoopError, RunCancellationToken, TOOL_SCOPE_CALLER_MESSAGES_KEY,
8 TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
9};
10use crate::contracts::plugin::phase::{Phase, StepContext, ToolContext};
11use crate::contracts::plugin::AgentPlugin;
12use crate::contracts::runtime::ActivityManager;
13use crate::contracts::runtime::{
14 StreamResult, ToolExecution, ToolExecutionRequest, ToolExecutionResult, ToolExecutor,
15 ToolExecutorError,
16};
17use crate::contracts::thread::Thread;
18use crate::contracts::thread::{Message, MessageMetadata};
19use crate::contracts::tool::{Tool, ToolDescriptor, ToolResult};
20use crate::contracts::Interaction;
21use crate::contracts::RunContext;
22use crate::engine::convert::tool_response;
23use crate::engine::tool_execution::collect_patches;
24use crate::engine::tool_filter::{SCOPE_ALLOWED_TOOLS_KEY, SCOPE_EXCLUDED_TOOLS_KEY};
25use crate::runtime::control::LoopControlState;
26use crate::runtime::run_context::{await_or_cancel, is_cancelled, CancelAware};
27use async_trait::async_trait;
28use serde_json::Value;
29use std::collections::HashMap;
30use std::sync::Arc;
31use tirea_state::State;
32use tirea_state::{PatchExt, TrackedPatch};
33
34pub(super) struct AppliedToolResults {
35 pub(super) pending_interaction: Option<Interaction>,
36 pub(super) state_snapshot: Option<Value>,
37}
38
39#[derive(Clone)]
40pub(super) struct ToolPhaseContext<'a> {
41 pub(super) tool_descriptors: &'a [ToolDescriptor],
42 pub(super) plugins: &'a [Arc<dyn AgentPlugin>],
43 pub(super) activity_manager: Option<Arc<dyn ActivityManager>>,
44 pub(super) run_config: &'a tirea_contract::RunConfig,
45 pub(super) thread_id: &'a str,
46 pub(super) thread_messages: &'a [Arc<Message>],
47 pub(super) cancellation_token: Option<&'a RunCancellationToken>,
48}
49
50impl<'a> ToolPhaseContext<'a> {
51 pub(super) fn from_request(request: &'a ToolExecutionRequest<'a>) -> Self {
52 Self {
53 tool_descriptors: request.tool_descriptors,
54 plugins: request.plugins,
55 activity_manager: request.activity_manager.clone(),
56 run_config: request.run_config,
57 thread_id: request.thread_id,
58 thread_messages: request.thread_messages,
59 cancellation_token: request.cancellation_token,
60 }
61 }
62}
63
64fn map_tool_executor_error(err: AgentLoopError) -> ToolExecutorError {
65 match err {
66 AgentLoopError::Cancelled { run_ctx } => ToolExecutorError::Cancelled {
67 thread_id: run_ctx.thread_id().to_string(),
68 },
69 other => ToolExecutorError::Failed {
70 message: other.to_string(),
71 },
72 }
73}
74
75#[derive(Debug, Clone, Copy, Default)]
77pub struct ParallelToolExecutor;
78
79#[async_trait]
80impl ToolExecutor for ParallelToolExecutor {
81 async fn execute(
82 &self,
83 request: ToolExecutionRequest<'_>,
84 ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
85 let phase_ctx = ToolPhaseContext::from_request(&request);
86 execute_tools_parallel_with_phases(request.tools, request.calls, request.state, phase_ctx)
87 .await
88 .map_err(map_tool_executor_error)
89 }
90
91 fn name(&self) -> &'static str {
92 "parallel"
93 }
94
95 fn requires_parallel_patch_conflict_check(&self) -> bool {
96 true
97 }
98}
99
100#[derive(Debug, Clone, Copy, Default)]
102pub struct SequentialToolExecutor;
103
104#[async_trait]
105impl ToolExecutor for SequentialToolExecutor {
106 async fn execute(
107 &self,
108 request: ToolExecutionRequest<'_>,
109 ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
110 let phase_ctx = ToolPhaseContext::from_request(&request);
111 execute_tools_sequential_with_phases(request.tools, request.calls, request.state, phase_ctx)
112 .await
113 .map_err(map_tool_executor_error)
114 }
115
116 fn name(&self) -> &'static str {
117 "sequential"
118 }
119}
120
121fn validate_parallel_state_patch_conflicts(
122 results: &[ToolExecutionResult],
123) -> Result<(), AgentLoopError> {
124 for (left_idx, left) in results.iter().enumerate() {
125 let mut left_patches: Vec<&TrackedPatch> = Vec::new();
126 if let Some(ref patch) = left.execution.patch {
127 left_patches.push(patch);
128 }
129 left_patches.extend(left.pending_patches.iter());
130
131 if left_patches.is_empty() {
132 continue;
133 }
134
135 for right in results.iter().skip(left_idx + 1) {
136 let mut right_patches: Vec<&TrackedPatch> = Vec::new();
137 if let Some(ref patch) = right.execution.patch {
138 right_patches.push(patch);
139 }
140 right_patches.extend(right.pending_patches.iter());
141
142 if right_patches.is_empty() {
143 continue;
144 }
145
146 for left_patch in &left_patches {
147 for right_patch in &right_patches {
148 let conflicts = left_patch.patch().conflicts_with(right_patch.patch());
149 if let Some(conflict) = conflicts.first() {
150 return Err(AgentLoopError::StateError(format!(
151 "conflicting parallel state patches between '{}' and '{}' at {}",
152 left.execution.call.id, right.execution.call.id, conflict.path
153 )));
154 }
155 }
156 }
157 }
158 }
159
160 Ok(())
161}
162
163pub(super) fn apply_tool_results_to_session(
164 run_ctx: &mut RunContext,
165 results: &[ToolExecutionResult],
166 metadata: Option<MessageMetadata>,
167 check_parallel_patch_conflicts: bool,
168) -> Result<AppliedToolResults, AgentLoopError> {
169 apply_tool_results_impl(
170 run_ctx,
171 results,
172 metadata,
173 check_parallel_patch_conflicts,
174 None,
175 )
176}
177
178pub(super) fn apply_tool_results_impl(
179 run_ctx: &mut RunContext,
180 results: &[ToolExecutionResult],
181 metadata: Option<MessageMetadata>,
182 check_parallel_patch_conflicts: bool,
183 tool_msg_ids: Option<&HashMap<String, String>>,
184) -> Result<AppliedToolResults, AgentLoopError> {
185 let pending_interaction_idx = results.iter().position(|r| r.pending_interaction.is_some());
186 let pending_interaction =
187 pending_interaction_idx.and_then(|idx| results[idx].pending_interaction.clone());
188 let pending_interaction_id = pending_interaction.as_ref().map(|i| i.id.clone());
189
190 if check_parallel_patch_conflicts {
191 validate_parallel_state_patch_conflicts(results)?;
192 }
193
194 let mut patches: Vec<TrackedPatch> = collect_patches(
196 &results
197 .iter()
198 .map(|r| r.execution.clone())
199 .collect::<Vec<_>>(),
200 );
201 for r in results {
202 patches.extend(r.pending_patches.iter().cloned());
203 }
204 let mut state_changed = !patches.is_empty();
205 run_ctx.add_thread_patches(patches);
206
207 let tool_messages: Vec<Arc<Message>> = results
209 .iter()
210 .enumerate()
211 .flat_map(|(idx, r)| {
212 let is_active_pending = pending_interaction_idx == Some(idx);
213 let mut msgs = if is_active_pending {
214 vec![Message::tool(
215 &r.execution.call.id,
216 format!(
217 "Tool '{}' is awaiting approval. Execution paused.",
218 r.execution.call.name
219 ),
220 )]
221 } else {
222 let message_result = if r.pending_interaction.is_some() {
223 ToolResult::error(
224 &r.execution.call.name,
225 format!(
226 "Tool '{}' was deferred because interaction '{}' is already pending in this round.",
227 r.execution.call.name,
228 pending_interaction_id.as_deref().unwrap_or("unknown")
229 ),
230 )
231 } else {
232 r.execution.result.clone()
233 };
234 let mut tool_msg = tool_response(&r.execution.call.id, &message_result);
235 if let Some(id) = tool_msg_ids.and_then(|ids| ids.get(&r.execution.call.id)) {
236 tool_msg = tool_msg.with_id(id.clone());
237 }
238 vec![tool_msg]
239 };
240 for reminder in &r.reminders {
241 msgs.push(Message::internal_system(format!(
242 "<system-reminder>{}</system-reminder>",
243 reminder
244 )));
245 }
246 if let Some(ref meta) = metadata {
247 for msg in &mut msgs {
248 msg.metadata = Some(meta.clone());
249 }
250 }
251 msgs.into_iter().map(Arc::new).collect::<Vec<_>>()
252 })
253 .collect();
254
255 run_ctx.add_messages(tool_messages);
256 let appended_count = drain_agent_append_user_messages(run_ctx, results, metadata.as_ref())?;
257 if appended_count > 0 {
258 state_changed = true;
259 }
260
261 if let Some(interaction) = pending_interaction.clone() {
262 let frontend_invocation = pending_interaction_idx
263 .and_then(|idx| results[idx].pending_frontend_invocation.clone());
264 let state = run_ctx
265 .snapshot()
266 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
267 let patch =
268 set_agent_pending_interaction(&state, interaction.clone(), frontend_invocation)?;
269 if !patch.patch().is_empty() {
270 state_changed = true;
271 run_ctx.add_thread_patch(patch);
272 }
273 let state_snapshot = if state_changed {
274 Some(
275 run_ctx
276 .snapshot()
277 .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
278 )
279 } else {
280 None
281 };
282 return Ok(AppliedToolResults {
283 pending_interaction: Some(interaction),
284 state_snapshot,
285 });
286 }
287
288 let state = run_ctx
291 .snapshot()
292 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
293 if state
294 .get(LoopControlState::PATH)
295 .and_then(|v| v.get("pending_interaction"))
296 .is_some()
297 {
298 let patch = clear_agent_pending_interaction(&state)?;
299 if !patch.patch().is_empty() {
300 state_changed = true;
301 run_ctx.add_thread_patch(patch);
302 }
303 }
304
305 let state_snapshot = if state_changed {
306 Some(
307 run_ctx
308 .snapshot()
309 .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
310 )
311 } else {
312 None
313 };
314
315 Ok(AppliedToolResults {
316 pending_interaction: None,
317 state_snapshot,
318 })
319}
320
321fn tool_result_metadata_from_run_ctx(run_ctx: &RunContext) -> Option<MessageMetadata> {
322 let run_id = run_ctx
323 .run_config
324 .value("run_id")
325 .and_then(|v| v.as_str().map(String::from))
326 .or_else(|| {
327 run_ctx.messages().iter().rev().find_map(|m| {
328 m.metadata
329 .as_ref()
330 .and_then(|meta| meta.run_id.as_ref().cloned())
331 })
332 });
333
334 let step_index = run_ctx
335 .messages()
336 .iter()
337 .rev()
338 .find_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index));
339
340 if run_id.is_none() && step_index.is_none() {
341 None
342 } else {
343 Some(MessageMetadata { run_id, step_index })
344 }
345}
346
347#[allow(dead_code)]
348pub(super) fn next_step_index(run_ctx: &RunContext) -> u32 {
349 run_ctx
350 .messages()
351 .iter()
352 .filter_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index))
353 .max()
354 .map(|v| v.saturating_add(1))
355 .unwrap_or(0)
356}
357
358pub(super) fn step_metadata(run_id: Option<String>, step_index: u32) -> MessageMetadata {
359 MessageMetadata {
360 run_id,
361 step_index: Some(step_index),
362 }
363}
364
365pub async fn execute_tools(
369 thread: Thread,
370 result: &StreamResult,
371 tools: &HashMap<String, Arc<dyn Tool>>,
372 parallel: bool,
373) -> Result<Thread, AgentLoopError> {
374 execute_tools_with_plugins(thread, result, tools, parallel, &[]).await
375}
376
377pub async fn execute_tools_with_config(
379 thread: Thread,
380 result: &StreamResult,
381 tools: &HashMap<String, Arc<dyn Tool>>,
382 config: &AgentConfig,
383) -> Result<Thread, AgentLoopError> {
384 execute_tools_with_plugins_and_executor(
385 thread,
386 result,
387 tools,
388 config.tool_executor.as_ref(),
389 &config.plugins,
390 )
391 .await
392}
393
394pub(super) fn scope_with_tool_caller_context(
395 run_ctx: &RunContext,
396 state: &Value,
397 _config: Option<&AgentConfig>,
398) -> Result<tirea_contract::RunConfig, AgentLoopError> {
399 let mut rt = run_ctx.run_config.clone();
400 if rt.value(TOOL_SCOPE_CALLER_THREAD_ID_KEY).is_none() {
401 rt.set(
402 TOOL_SCOPE_CALLER_THREAD_ID_KEY,
403 run_ctx.thread_id().to_string(),
404 )
405 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
406 }
407 if rt.value(TOOL_SCOPE_CALLER_STATE_KEY).is_none() {
408 rt.set(TOOL_SCOPE_CALLER_STATE_KEY, state.clone())
409 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
410 }
411 if rt.value(TOOL_SCOPE_CALLER_MESSAGES_KEY).is_none() {
412 rt.set(TOOL_SCOPE_CALLER_MESSAGES_KEY, run_ctx.messages().to_vec())
413 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
414 }
415 Ok(rt)
416}
417
418pub async fn execute_tools_with_plugins(
420 thread: Thread,
421 result: &StreamResult,
422 tools: &HashMap<String, Arc<dyn Tool>>,
423 parallel: bool,
424 plugins: &[Arc<dyn AgentPlugin>],
425) -> Result<Thread, AgentLoopError> {
426 let parallel_executor = ParallelToolExecutor;
427 let sequential_executor = SequentialToolExecutor;
428 let executor: &dyn ToolExecutor = if parallel {
429 ¶llel_executor
430 } else {
431 &sequential_executor
432 };
433 execute_tools_with_plugins_and_executor(thread, result, tools, executor, plugins).await
434}
435
436pub async fn execute_tools_with_plugins_and_executor(
437 thread: Thread,
438 result: &StreamResult,
439 tools: &HashMap<String, Arc<dyn Tool>>,
440 executor: &dyn ToolExecutor,
441 plugins: &[Arc<dyn AgentPlugin>],
442) -> Result<Thread, AgentLoopError> {
443 if result.tool_calls.is_empty() {
444 return Ok(thread);
445 }
446
447 let rebuilt_state = thread
449 .rebuild_state()
450 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
451 let mut run_ctx = RunContext::new(
452 &thread.id,
453 rebuilt_state.clone(),
454 thread.messages.clone(),
455 tirea_contract::RunConfig::default(),
456 );
457
458 let tool_descriptors: Vec<ToolDescriptor> =
459 tools.values().map(|t| t.descriptor().clone()).collect();
460 let rt_for_tools = scope_with_tool_caller_context(&run_ctx, &rebuilt_state, None)?;
461 let results = executor
462 .execute(ToolExecutionRequest {
463 tools,
464 calls: &result.tool_calls,
465 state: &rebuilt_state,
466 tool_descriptors: &tool_descriptors,
467 plugins,
468 activity_manager: None,
469 run_config: &rt_for_tools,
470 thread_id: run_ctx.thread_id(),
471 thread_messages: run_ctx.messages(),
472 state_version: run_ctx.version(),
473 cancellation_token: None,
474 })
475 .await?;
476
477 let metadata = tool_result_metadata_from_run_ctx(&run_ctx);
478 let applied = apply_tool_results_to_session(
479 &mut run_ctx,
480 &results,
481 metadata,
482 executor.requires_parallel_patch_conflict_check(),
483 )?;
484 if let Some(interaction) = applied.pending_interaction {
485 return Err(AgentLoopError::PendingInteraction {
486 run_ctx: Box::new(run_ctx),
487 interaction: Box::new(interaction),
488 });
489 }
490
491 let delta = run_ctx.take_delta();
493 let mut out_thread = thread;
494 for msg in delta.messages {
495 out_thread = out_thread.with_message((*msg).clone());
496 }
497 out_thread = out_thread.with_patches(delta.patches);
498 Ok(out_thread)
499}
500
501pub(super) async fn execute_tools_parallel_with_phases(
503 tools: &HashMap<String, Arc<dyn Tool>>,
504 calls: &[crate::contracts::thread::ToolCall],
505 state: &Value,
506 phase_ctx: ToolPhaseContext<'_>,
507) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
508 use futures::future::join_all;
509
510 if is_cancelled(phase_ctx.cancellation_token) {
511 return Err(cancelled_error(phase_ctx.thread_id));
512 }
513
514 let run_config_owned = phase_ctx.run_config.clone();
516 let thread_id = phase_ctx.thread_id.to_string();
517 let thread_messages = Arc::new(phase_ctx.thread_messages.to_vec());
518 let tool_descriptors = phase_ctx.tool_descriptors.to_vec();
519 let plugins = phase_ctx.plugins.to_vec();
520
521 let futures = calls.iter().map(|call| {
522 let tool = tools.get(&call.name).cloned();
523 let state = state.clone();
524 let call = call.clone();
525 let plugins = plugins.clone();
526 let tool_descriptors = tool_descriptors.clone();
527 let activity_manager = phase_ctx.activity_manager.clone();
528 let rt = run_config_owned.clone();
529 let sid = thread_id.clone();
530 let thread_messages = thread_messages.clone();
531
532 async move {
533 execute_single_tool_with_phases(
534 tool.as_deref(),
535 &call,
536 &state,
537 &ToolPhaseContext {
538 tool_descriptors: &tool_descriptors,
539 plugins: &plugins,
540 activity_manager,
541 run_config: &rt,
542 thread_id: &sid,
543 thread_messages: thread_messages.as_slice(),
544 cancellation_token: None,
545 },
546 )
547 .await
548 }
549 });
550
551 let join_future = join_all(futures);
552 let results = match await_or_cancel(phase_ctx.cancellation_token, join_future).await {
553 CancelAware::Cancelled => return Err(cancelled_error(&thread_id)),
554 CancelAware::Value(results) => results,
555 };
556 let mut results: Vec<ToolExecutionResult> = results.into_iter().collect::<Result<_, _>>()?;
557 coalesce_pending_interactions(&mut results);
558 Ok(results)
559}
560
561fn coalesce_pending_interactions(results: &mut [ToolExecutionResult]) {
562 let mut active_pending_id: Option<String> = None;
563 for result in results {
564 let Some(interaction_id) = result.pending_interaction.as_ref().map(|i| i.id.clone()) else {
565 continue;
566 };
567
568 if active_pending_id.is_none() {
569 active_pending_id = Some(interaction_id);
570 continue;
571 }
572
573 let active_id = active_pending_id.as_deref().unwrap_or("unknown");
574 result.pending_interaction = None;
575 result.pending_frontend_invocation = None;
576 result.pending_patches.clear();
577 result.execution.result = ToolResult::error(
578 &result.execution.call.name,
579 format!(
580 "Tool '{}' was deferred because interaction '{}' is already pending in this round.",
581 result.execution.call.name, active_id
582 ),
583 );
584 }
585}
586
587pub(super) async fn execute_tools_sequential_with_phases(
589 tools: &HashMap<String, Arc<dyn Tool>>,
590 calls: &[crate::contracts::thread::ToolCall],
591 initial_state: &Value,
592 phase_ctx: ToolPhaseContext<'_>,
593) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
594 use tirea_state::apply_patch;
595
596 if is_cancelled(phase_ctx.cancellation_token) {
597 return Err(cancelled_error(phase_ctx.thread_id));
598 }
599
600 let mut state = initial_state.clone();
601 let mut results = Vec::with_capacity(calls.len());
602
603 for call in calls {
604 let tool = tools.get(&call.name).cloned();
605 let call_phase_ctx = ToolPhaseContext {
606 tool_descriptors: phase_ctx.tool_descriptors,
607 plugins: phase_ctx.plugins,
608 activity_manager: phase_ctx.activity_manager.clone(),
609 run_config: phase_ctx.run_config,
610 thread_id: phase_ctx.thread_id,
611 thread_messages: phase_ctx.thread_messages,
612 cancellation_token: None,
613 };
614 let result = match await_or_cancel(
615 phase_ctx.cancellation_token,
616 execute_single_tool_with_phases(tool.as_deref(), call, &state, &call_phase_ctx),
617 )
618 .await
619 {
620 CancelAware::Cancelled => return Err(cancelled_error(phase_ctx.thread_id)),
621 CancelAware::Value(result) => result?,
622 };
623
624 if let Some(ref patch) = result.execution.patch {
626 state = apply_patch(&state, patch.patch()).map_err(|e| {
627 AgentLoopError::StateError(format!(
628 "failed to apply tool patch for call '{}': {}",
629 result.execution.call.id, e
630 ))
631 })?;
632 }
633 for pp in &result.pending_patches {
635 state = apply_patch(&state, pp.patch()).map_err(|e| {
636 AgentLoopError::StateError(format!(
637 "failed to apply plugin patch for call '{}': {}",
638 result.execution.call.id, e
639 ))
640 })?;
641 }
642
643 results.push(result);
644
645 if results
646 .last()
647 .and_then(|r| r.pending_interaction.as_ref())
648 .is_some()
649 {
650 break;
651 }
652 }
653
654 Ok(results)
655}
656
657pub(super) async fn execute_single_tool_with_phases(
659 tool: Option<&dyn Tool>,
660 call: &crate::contracts::thread::ToolCall,
661 state: &Value,
662 phase_ctx: &ToolPhaseContext<'_>,
663) -> Result<ToolExecutionResult, AgentLoopError> {
664 let doc = tirea_state::DocCell::new(state.clone());
666 let ops = std::sync::Mutex::new(Vec::new());
667 let pending_messages = std::sync::Mutex::new(Vec::new());
668 let plugin_scope = phase_ctx.run_config;
669 let plugin_tool_call_ctx = crate::contracts::ToolCallContext::new_with_cancellation(
670 &doc,
671 &ops,
672 "plugin_phase",
673 "plugin:tool_phase",
674 plugin_scope,
675 &pending_messages,
676 None,
677 phase_ctx.cancellation_token,
678 );
679
680 let mut step = StepContext::new(
682 plugin_tool_call_ctx,
683 phase_ctx.thread_id,
684 phase_ctx.thread_messages,
685 phase_ctx.tool_descriptors.to_vec(),
686 );
687 step.tool = Some(ToolContext::new(call));
688
689 emit_phase_checked(Phase::BeforeToolExecute, &mut step, phase_ctx.plugins).await?;
691
692 let (execution, pending_interaction, pending_frontend_invocation) =
694 if !crate::engine::tool_filter::is_scope_allowed(
695 Some(phase_ctx.run_config),
696 &call.name,
697 SCOPE_ALLOWED_TOOLS_KEY,
698 SCOPE_EXCLUDED_TOOLS_KEY,
699 ) {
700 (
701 ToolExecution {
702 call: call.clone(),
703 result: ToolResult::error(
704 &call.name,
705 format!("Tool '{}' is not allowed by current policy", call.name),
706 ),
707 patch: None,
708 },
709 None,
710 None,
711 )
712 } else if step.tool_blocked() {
713 let reason = step
714 .tool
715 .as_ref()
716 .and_then(|t| t.block_reason.clone())
717 .unwrap_or_else(|| "Blocked by plugin".to_string());
718 (
719 ToolExecution {
720 call: call.clone(),
721 result: ToolResult::error(&call.name, reason),
722 patch: None,
723 },
724 None,
725 None,
726 )
727 } else if tool.is_none() {
728 (
729 ToolExecution {
730 call: call.clone(),
731 result: ToolResult::error(
732 &call.name,
733 format!("Tool '{}' not found", call.name),
734 ),
735 patch: None,
736 },
737 None,
738 None,
739 )
740 } else if let Err(e) = tool.unwrap().validate_args(&call.arguments) {
741 (
743 ToolExecution {
744 call: call.clone(),
745 result: ToolResult::error(&call.name, e.to_string()),
746 patch: None,
747 },
748 None,
749 None,
750 )
751 } else if step.tool_pending() {
752 let interaction = step
753 .tool
754 .as_ref()
755 .and_then(|t| t.pending_interaction.clone());
756 let frontend_invocation = step
757 .tool
758 .as_ref()
759 .and_then(|t| t.pending_frontend_invocation.clone());
760 (
761 ToolExecution {
762 call: call.clone(),
763 result: ToolResult::pending(&call.name, "Waiting for user confirmation"),
764 patch: None,
765 },
766 interaction,
767 frontend_invocation,
768 )
769 } else {
770 let tool_doc = tirea_state::DocCell::new(state.clone());
772 let tool_ops = std::sync::Mutex::new(Vec::new());
773 let tool_pending_msgs = std::sync::Mutex::new(Vec::new());
774 let tool_ctx = crate::contracts::ToolCallContext::new_with_cancellation(
775 &tool_doc,
776 &tool_ops,
777 &call.id,
778 format!("tool:{}", call.name),
779 plugin_scope,
780 &tool_pending_msgs,
781 phase_ctx.activity_manager.clone(),
782 phase_ctx.cancellation_token,
783 );
784 let result = match tool
785 .unwrap()
786 .execute(call.arguments.clone(), &tool_ctx)
787 .await
788 {
789 Ok(r) => r,
790 Err(e) => ToolResult::error(&call.name, e.to_string()),
791 };
792
793 let patch = tool_ctx.take_patch();
794 let patch = if patch.patch().is_empty() {
795 None
796 } else {
797 Some(patch)
798 };
799
800 (
801 ToolExecution {
802 call: call.clone(),
803 result,
804 patch,
805 },
806 None,
807 None,
808 )
809 };
810
811 step.set_tool_result(execution.result.clone());
813
814 emit_phase_checked(Phase::AfterToolExecute, &mut step, phase_ctx.plugins).await?;
816
817 let plugin_patch = step.ctx().take_patch();
819 if !plugin_patch.patch().is_empty() {
820 step.pending_patches.push(plugin_patch);
821 }
822
823 let pending_patches = std::mem::take(&mut step.pending_patches);
824
825 Ok(ToolExecutionResult {
826 execution,
827 reminders: step.system_reminders.clone(),
828 pending_interaction,
829 pending_frontend_invocation,
830 pending_patches,
831 })
832}
833
834fn cancelled_error(thread_id: &str) -> AgentLoopError {
835 AgentLoopError::Cancelled {
836 run_ctx: Box::new(RunContext::new(
837 thread_id,
838 serde_json::json!({}),
839 vec![],
840 tirea_contract::RunConfig::default(),
841 )),
842 }
843}