1use super::core::{
2 pending_approval_placeholder_message, transition_tool_call_state, ToolCallStateSeed,
3 ToolCallStateTransition,
4};
5use super::parallel_state_merge::merge_parallel_state_patches;
6use super::plugin_runtime::emit_tool_phase;
7use super::{
8 Agent, AgentLoopError, BaseAgent, RunCancellationToken, TOOL_SCOPE_CALLER_MESSAGES_KEY,
9 TOOL_SCOPE_CALLER_STATE_KEY, TOOL_SCOPE_CALLER_THREAD_ID_KEY,
10};
11use crate::contracts::runtime::action::Action;
12use crate::contracts::runtime::behavior::AgentBehavior;
13use crate::contracts::runtime::phase::{Phase, StepContext};
14use crate::contracts::runtime::state::{reduce_state_actions, AnyStateAction, ScopeContext};
15use crate::contracts::runtime::tool_call::ToolGate;
16use crate::contracts::runtime::tool_call::{Tool, ToolDescriptor, ToolResult};
17use crate::contracts::runtime::{
18 ActivityManager, PendingToolCall, SuspendTicket, SuspendedCall, ToolCallResumeMode,
19};
20use crate::contracts::runtime::{
21 DecisionReplayPolicy, StreamResult, ToolCallOutcome, ToolCallStatus, ToolExecution,
22 ToolExecutionEffect, ToolExecutionRequest, ToolExecutionResult, ToolExecutor,
23 ToolExecutorError,
24};
25use crate::contracts::thread::Thread;
26use crate::contracts::thread::{Message, MessageMetadata, ToolCall};
27use crate::contracts::{RunContext, Suspension};
28use crate::engine::convert::tool_response;
29use crate::engine::tool_execution::merge_context_patch_into_effect;
30use crate::runtime::run_context::{await_or_cancel, is_cancelled, CancelAware};
31use async_trait::async_trait;
32use serde_json::Value;
33use std::collections::HashMap;
34use std::sync::Arc;
35use tirea_state::{Patch, TrackedPatch};
36
37#[derive(Debug)]
42pub enum ExecuteToolsOutcome {
43 Completed(Thread),
45 Suspended {
47 thread: Thread,
48 suspended_call: Box<SuspendedCall>,
49 },
50}
51
52impl ExecuteToolsOutcome {
53 pub fn into_thread(self) -> Thread {
55 match self {
56 Self::Completed(t) | Self::Suspended { thread: t, .. } => t,
57 }
58 }
59
60 pub fn is_suspended(&self) -> bool {
62 matches!(self, Self::Suspended { .. })
63 }
64}
65
66pub(super) struct AppliedToolResults {
67 pub(super) suspended_calls: Vec<SuspendedCall>,
68 pub(super) state_snapshot: Option<Value>,
69}
70
71#[derive(Clone)]
72pub(super) struct ToolPhaseContext<'a> {
73 pub(super) tool_descriptors: &'a [ToolDescriptor],
74 pub(super) agent_behavior: Option<&'a dyn AgentBehavior>,
75 pub(super) activity_manager: Arc<dyn ActivityManager>,
76 pub(super) run_config: &'a tirea_contract::RunConfig,
77 pub(super) thread_id: &'a str,
78 pub(super) thread_messages: &'a [Arc<Message>],
79 pub(super) cancellation_token: Option<&'a RunCancellationToken>,
80}
81
82impl<'a> ToolPhaseContext<'a> {
83 pub(super) fn from_request(request: &'a ToolExecutionRequest<'a>) -> Self {
84 Self {
85 tool_descriptors: request.tool_descriptors,
86 agent_behavior: request.agent_behavior,
87 activity_manager: request.activity_manager.clone(),
88 run_config: request.run_config,
89 thread_id: request.thread_id,
90 thread_messages: request.thread_messages,
91 cancellation_token: request.cancellation_token,
92 }
93 }
94}
95
/// Current wall-clock time as milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reads earlier than the epoch and saturates
/// at `u64::MAX` if the millisecond count does not fit in a `u64`.
fn now_unix_millis() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX),
        Err(_) => 0,
    }
}
101
102fn suspended_call_from_tool_result(call: &ToolCall, result: &ToolResult) -> SuspendedCall {
103 if let Some(mut explicit) = result.suspension() {
104 if explicit.pending.id.trim().is_empty() || explicit.pending.name.trim().is_empty() {
105 explicit.pending =
106 PendingToolCall::new(call.id.clone(), call.name.clone(), call.arguments.clone());
107 }
108 return SuspendedCall::new(call, explicit);
109 }
110
111 let mut suspension = Suspension::new(&call.id, format!("tool:{}", call.name))
112 .with_parameters(call.arguments.clone());
113 if let Some(message) = result.message.as_ref() {
114 suspension = suspension.with_message(message.clone());
115 }
116
117 SuspendedCall::new(
118 call,
119 SuspendTicket::new(
120 suspension,
121 PendingToolCall::new(call.id.clone(), call.name.clone(), call.arguments.clone()),
122 ToolCallResumeMode::ReplayToolCall,
123 ),
124 )
125}
126
127fn persist_tool_call_status(
128 step: &StepContext<'_>,
129 call: &ToolCall,
130 status: ToolCallStatus,
131 suspended_call: Option<&SuspendedCall>,
132) -> Result<(), AgentLoopError> {
133 let current_state = step.ctx().tool_call_state_for(&call.id).map_err(|e| {
134 AgentLoopError::StateError(format!(
135 "failed to read tool call state for '{}' before setting {:?}: {e}",
136 call.id, status
137 ))
138 })?;
139 let previous_status = current_state
140 .as_ref()
141 .map(|state| state.status)
142 .unwrap_or(ToolCallStatus::New);
143 let current_resume_token = current_state
144 .as_ref()
145 .and_then(|state| state.resume_token.clone());
146 let current_resume = current_state
147 .as_ref()
148 .and_then(|state| state.resume.clone());
149
150 let (next_resume_token, next_resume) = match status {
151 ToolCallStatus::Running => {
152 if matches!(previous_status, ToolCallStatus::Resuming) {
153 (current_resume_token.clone(), current_resume.clone())
154 } else {
155 (None, None)
156 }
157 }
158 ToolCallStatus::Suspended => (
159 suspended_call
160 .map(|entry| entry.ticket.pending.id.clone())
161 .or(current_resume_token.clone()),
162 None,
163 ),
164 ToolCallStatus::Succeeded
165 | ToolCallStatus::Failed
166 | ToolCallStatus::Cancelled
167 | ToolCallStatus::New
168 | ToolCallStatus::Resuming => (current_resume_token, current_resume),
169 };
170
171 let Some(runtime_state) = transition_tool_call_state(
172 current_state,
173 ToolCallStateSeed {
174 call_id: &call.id,
175 tool_name: &call.name,
176 arguments: &call.arguments,
177 status: ToolCallStatus::New,
178 resume_token: None,
179 },
180 ToolCallStateTransition {
181 status,
182 resume_token: next_resume_token,
183 resume: next_resume,
184 updated_at: now_unix_millis(),
185 },
186 ) else {
187 return Err(AgentLoopError::StateError(format!(
188 "invalid tool call status transition for '{}': {:?} -> {:?}",
189 call.id, previous_status, status
190 )));
191 };
192
193 step.ctx()
194 .set_tool_call_state_for(&call.id, runtime_state)
195 .map_err(|e| {
196 AgentLoopError::StateError(format!(
197 "failed to persist tool call state for '{}' as {:?}: {e}",
198 call.id, status
199 ))
200 })
201}
202
203fn map_tool_executor_error(err: AgentLoopError, thread_id: &str) -> ToolExecutorError {
204 match err {
205 AgentLoopError::Cancelled => ToolExecutorError::Cancelled {
206 thread_id: thread_id.to_string(),
207 },
208 other => ToolExecutorError::Failed {
209 message: other.to_string(),
210 },
211 }
212}
213
/// Flavor of parallel tool execution; selects the executor's reported name
/// and its decision replay policy.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ParallelToolExecutionMode {
    /// Reported as `parallel_batch_approval`.
    BatchApproval,
    /// Reported as `parallel_streaming`.
    Streaming,
}

/// Tool executor that runs the calls of a request concurrently.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct ParallelToolExecutor {
    mode: ParallelToolExecutionMode,
}

impl ParallelToolExecutor {
    /// Executor in [`ParallelToolExecutionMode::BatchApproval`] mode.
    pub const fn batch_approval() -> Self {
        let mode = ParallelToolExecutionMode::BatchApproval;
        Self { mode }
    }

    /// Executor in [`ParallelToolExecutionMode::Streaming`] mode.
    pub const fn streaming() -> Self {
        let mode = ParallelToolExecutionMode::Streaming;
        Self { mode }
    }

    /// Static name reported for the configured mode.
    fn mode_name(self) -> &'static str {
        use ParallelToolExecutionMode::{BatchApproval, Streaming};

        match self.mode {
            BatchApproval => "parallel_batch_approval",
            Streaming => "parallel_streaming",
        }
    }
}

impl Default for ParallelToolExecutor {
    /// The default executor uses streaming mode.
    fn default() -> Self {
        Self {
            mode: ParallelToolExecutionMode::Streaming,
        }
    }
}
253
254#[async_trait]
255impl ToolExecutor for ParallelToolExecutor {
256 async fn execute(
257 &self,
258 request: ToolExecutionRequest<'_>,
259 ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
260 let thread_id = request.thread_id;
261 let phase_ctx = ToolPhaseContext::from_request(&request);
262 execute_tools_parallel_with_phases(request.tools, request.calls, request.state, phase_ctx)
263 .await
264 .map_err(|e| map_tool_executor_error(e, thread_id))
265 }
266
267 fn name(&self) -> &'static str {
268 self.mode_name()
269 }
270
271 fn requires_parallel_patch_conflict_check(&self) -> bool {
272 true
273 }
274
275 fn decision_replay_policy(&self) -> DecisionReplayPolicy {
276 match self.mode {
277 ParallelToolExecutionMode::BatchApproval => DecisionReplayPolicy::BatchAllSuspended,
278 ParallelToolExecutionMode::Streaming => DecisionReplayPolicy::Immediate,
279 }
280 }
281}
282
283#[derive(Debug, Clone, Copy, Default)]
285pub struct SequentialToolExecutor;
286
287#[async_trait]
288impl ToolExecutor for SequentialToolExecutor {
289 async fn execute(
290 &self,
291 request: ToolExecutionRequest<'_>,
292 ) -> Result<Vec<ToolExecutionResult>, ToolExecutorError> {
293 let thread_id = request.thread_id;
294 let phase_ctx = ToolPhaseContext::from_request(&request);
295 execute_tools_sequential_with_phases(request.tools, request.calls, request.state, phase_ctx)
296 .await
297 .map_err(|e| map_tool_executor_error(e, thread_id))
298 }
299
300 fn name(&self) -> &'static str {
301 "sequential"
302 }
303}
304
305pub(super) fn apply_tool_results_to_session(
306 run_ctx: &mut RunContext,
307 results: &[ToolExecutionResult],
308 metadata: Option<MessageMetadata>,
309 check_parallel_patch_conflicts: bool,
310) -> Result<AppliedToolResults, AgentLoopError> {
311 apply_tool_results_impl(
312 run_ctx,
313 results,
314 metadata,
315 check_parallel_patch_conflicts,
316 None,
317 )
318}
319
320pub(super) fn apply_tool_results_impl(
321 run_ctx: &mut RunContext,
322 results: &[ToolExecutionResult],
323 metadata: Option<MessageMetadata>,
324 check_parallel_patch_conflicts: bool,
325 tool_msg_ids: Option<&HashMap<String, String>>,
326) -> Result<AppliedToolResults, AgentLoopError> {
327 let suspended: Vec<SuspendedCall> = results
329 .iter()
330 .filter_map(|r| {
331 if matches!(r.outcome, ToolCallOutcome::Suspended) {
332 r.suspended_call.clone()
333 } else {
334 None
335 }
336 })
337 .collect();
338
339 let all_serialized_actions: Vec<tirea_contract::SerializedAction> = results
341 .iter()
342 .flat_map(|r| r.serialized_actions.iter().cloned())
343 .collect();
344 if !all_serialized_actions.is_empty() {
345 run_ctx.add_serialized_actions(all_serialized_actions);
346 }
347
348 let base_snapshot = run_ctx
349 .snapshot()
350 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
351 let patches = merge_parallel_state_patches(
352 &base_snapshot,
353 results,
354 check_parallel_patch_conflicts,
355 run_ctx.lattice_registry(),
356 )?;
357 let mut state_changed = !patches.is_empty();
358 run_ctx.add_thread_patches(patches);
359
360 let tool_messages: Vec<Arc<Message>> = results
362 .iter()
363 .flat_map(|r| {
364 let is_suspended = matches!(r.outcome, ToolCallOutcome::Suspended);
365 let mut msgs = if is_suspended {
366 vec![Message::tool(
367 &r.execution.call.id,
368 pending_approval_placeholder_message(&r.execution.call.name),
369 )]
370 } else {
371 let mut tool_msg = tool_response(&r.execution.call.id, &r.execution.result);
372 if let Some(id) = tool_msg_ids.and_then(|ids| ids.get(&r.execution.call.id)) {
373 tool_msg = tool_msg.with_id(id.clone());
374 }
375 vec![tool_msg]
376 };
377 for reminder in &r.reminders {
378 msgs.push(Message::internal_system(format!(
379 "<system-reminder>{}</system-reminder>",
380 reminder
381 )));
382 }
383 if let Some(ref meta) = metadata {
384 for msg in &mut msgs {
385 msg.metadata = Some(meta.clone());
386 }
387 }
388 msgs.into_iter().map(Arc::new).collect::<Vec<_>>()
389 })
390 .collect();
391
392 run_ctx.add_messages(tool_messages);
393
394 let user_messages: Vec<Arc<Message>> = results
396 .iter()
397 .flat_map(|r| {
398 r.user_messages
399 .iter()
400 .map(|s| s.trim())
401 .filter(|s| !s.is_empty())
402 .map(|text| {
403 let mut msg = Message::user(text.to_string());
404 if let Some(ref meta) = metadata {
405 msg.metadata = Some(meta.clone());
406 }
407 Arc::new(msg)
408 })
409 .collect::<Vec<_>>()
410 })
411 .collect();
412 if !user_messages.is_empty() {
413 run_ctx.add_messages(user_messages);
414 }
415 if !suspended.is_empty() {
416 let state = run_ctx
417 .snapshot()
418 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
419 let actions: Vec<AnyStateAction> = suspended
420 .iter()
421 .map(|call| call.clone().into_state_action())
422 .collect();
423 let patches = reduce_state_actions(actions, &state, "agent_loop", &ScopeContext::run())
424 .map_err(|e| {
425 AgentLoopError::StateError(format!("failed to reduce suspended call actions: {e}"))
426 })?;
427 for patch in patches {
428 if !patch.patch().is_empty() {
429 state_changed = true;
430 run_ctx.add_thread_patch(patch);
431 }
432 }
433 let state_snapshot = if state_changed {
434 Some(
435 run_ctx
436 .snapshot()
437 .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
438 )
439 } else {
440 None
441 };
442 return Ok(AppliedToolResults {
443 suspended_calls: suspended,
444 state_snapshot,
445 });
446 }
447
448 let state_snapshot = if state_changed {
455 Some(
456 run_ctx
457 .snapshot()
458 .map_err(|e| AgentLoopError::StateError(e.to_string()))?,
459 )
460 } else {
461 None
462 };
463
464 Ok(AppliedToolResults {
465 suspended_calls: Vec::new(),
466 state_snapshot,
467 })
468}
469
470fn tool_result_metadata_from_run_ctx(
471 run_ctx: &RunContext,
472 run_id: Option<&str>,
473) -> Option<MessageMetadata> {
474 let run_id = run_id.map(|id| id.to_string()).or_else(|| {
475 run_ctx.messages().iter().rev().find_map(|m| {
476 m.metadata
477 .as_ref()
478 .and_then(|meta| meta.run_id.as_ref().cloned())
479 })
480 });
481
482 let step_index = run_ctx
483 .messages()
484 .iter()
485 .rev()
486 .find_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index));
487
488 if run_id.is_none() && step_index.is_none() {
489 None
490 } else {
491 Some(MessageMetadata { run_id, step_index })
492 }
493}
494
495#[allow(dead_code)]
496pub(super) fn next_step_index(run_ctx: &RunContext) -> u32 {
497 run_ctx
498 .messages()
499 .iter()
500 .filter_map(|m| m.metadata.as_ref().and_then(|meta| meta.step_index))
501 .max()
502 .map(|v| v.saturating_add(1))
503 .unwrap_or(0)
504}
505
506pub(super) fn step_metadata(run_id: Option<String>, step_index: u32) -> MessageMetadata {
507 MessageMetadata {
508 run_id,
509 step_index: Some(step_index),
510 }
511}
512
513pub async fn execute_tools(
517 thread: Thread,
518 result: &StreamResult,
519 tools: &HashMap<String, Arc<dyn Tool>>,
520 parallel: bool,
521) -> Result<ExecuteToolsOutcome, AgentLoopError> {
522 let parallel_executor = ParallelToolExecutor::streaming();
523 let sequential_executor = SequentialToolExecutor;
524 let executor: &dyn ToolExecutor = if parallel {
525 ¶llel_executor
526 } else {
527 &sequential_executor
528 };
529 execute_tools_with_agent_and_executor(thread, result, tools, executor, None).await
530}
531
532pub async fn execute_tools_with_config(
534 thread: Thread,
535 result: &StreamResult,
536 tools: &HashMap<String, Arc<dyn Tool>>,
537 agent: &dyn Agent,
538) -> Result<ExecuteToolsOutcome, AgentLoopError> {
539 execute_tools_with_agent_and_executor(
540 thread,
541 result,
542 tools,
543 agent.tool_executor().as_ref(),
544 Some(agent.behavior()),
545 )
546 .await
547}
548
549pub(super) fn scope_with_tool_caller_context(
550 run_ctx: &RunContext,
551 state: &Value,
552) -> Result<tirea_contract::RunConfig, AgentLoopError> {
553 let mut rt = run_ctx.run_config.clone();
554 if rt.value(TOOL_SCOPE_CALLER_THREAD_ID_KEY).is_none() {
555 rt.set(
556 TOOL_SCOPE_CALLER_THREAD_ID_KEY,
557 run_ctx.thread_id().to_string(),
558 )
559 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
560 }
561 if rt.value(TOOL_SCOPE_CALLER_STATE_KEY).is_none() {
562 rt.set(TOOL_SCOPE_CALLER_STATE_KEY, state.clone())
563 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
564 }
565 if rt.value(TOOL_SCOPE_CALLER_MESSAGES_KEY).is_none() {
566 rt.set(TOOL_SCOPE_CALLER_MESSAGES_KEY, run_ctx.messages().to_vec())
567 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
568 }
569 Ok(rt)
570}
571
572pub async fn execute_tools_with_behaviors(
574 thread: Thread,
575 result: &StreamResult,
576 tools: &HashMap<String, Arc<dyn Tool>>,
577 parallel: bool,
578 behavior: Arc<dyn AgentBehavior>,
579) -> Result<ExecuteToolsOutcome, AgentLoopError> {
580 let executor: Arc<dyn ToolExecutor> = if parallel {
581 Arc::new(ParallelToolExecutor::streaming())
582 } else {
583 Arc::new(SequentialToolExecutor)
584 };
585 let agent = BaseAgent::default()
586 .with_behavior(behavior)
587 .with_tool_executor(executor);
588 execute_tools_with_config(thread, result, tools, &agent).await
589}
590
591async fn execute_tools_with_agent_and_executor(
592 thread: Thread,
593 result: &StreamResult,
594 tools: &HashMap<String, Arc<dyn Tool>>,
595 executor: &dyn ToolExecutor,
596 behavior: Option<&dyn AgentBehavior>,
597) -> Result<ExecuteToolsOutcome, AgentLoopError> {
598 let rebuilt_state = thread
600 .rebuild_state()
601 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
602 let mut run_ctx = RunContext::new(
603 &thread.id,
604 rebuilt_state.clone(),
605 thread.messages.clone(),
606 tirea_contract::RunConfig::default(),
607 );
608
609 let tool_descriptors: Vec<ToolDescriptor> =
610 tools.values().map(|t| t.descriptor().clone()).collect();
611 if let Some(behavior) = behavior {
613 let run_start_patches = super::plugin_runtime::behavior_run_phase_block(
614 &run_ctx,
615 &tool_descriptors,
616 behavior,
617 &[Phase::RunStart],
618 |_| {},
619 |_| (),
620 )
621 .await?
622 .1;
623 if !run_start_patches.is_empty() {
624 run_ctx.add_thread_patches(run_start_patches);
625 }
626 }
627
628 let replay_executor: Arc<dyn ToolExecutor> = match executor.decision_replay_policy() {
629 DecisionReplayPolicy::BatchAllSuspended => Arc::new(ParallelToolExecutor::batch_approval()),
630 DecisionReplayPolicy::Immediate => Arc::new(ParallelToolExecutor::streaming()),
631 };
632 let replay_config = BaseAgent::default().with_tool_executor(replay_executor);
633 let replay = super::drain_resuming_tool_calls_and_replay(
634 &mut run_ctx,
635 tools,
636 &replay_config,
637 &tool_descriptors,
638 )
639 .await?;
640
641 if replay.replayed {
642 let suspended = run_ctx.suspended_calls().values().next().cloned();
643 let delta = run_ctx.take_delta();
644 let mut out_thread = thread;
645 for msg in delta.messages {
646 out_thread = out_thread.with_message((*msg).clone());
647 }
648 out_thread = out_thread.with_patches(delta.patches);
649 return if let Some(first) = suspended {
650 Ok(ExecuteToolsOutcome::Suspended {
651 thread: out_thread,
652 suspended_call: Box::new(first),
653 })
654 } else {
655 Ok(ExecuteToolsOutcome::Completed(out_thread))
656 };
657 }
658
659 if result.tool_calls.is_empty() {
660 let delta = run_ctx.take_delta();
661 let mut out_thread = thread;
662 for msg in delta.messages {
663 out_thread = out_thread.with_message((*msg).clone());
664 }
665 out_thread = out_thread.with_patches(delta.patches);
666 return Ok(ExecuteToolsOutcome::Completed(out_thread));
667 }
668
669 let current_state = run_ctx
670 .snapshot()
671 .map_err(|e| AgentLoopError::StateError(e.to_string()))?;
672 let rt_for_tools = scope_with_tool_caller_context(&run_ctx, ¤t_state)?;
673 let results = executor
674 .execute(ToolExecutionRequest {
675 tools,
676 calls: &result.tool_calls,
677 state: ¤t_state,
678 tool_descriptors: &tool_descriptors,
679 agent_behavior: behavior,
680 activity_manager: tirea_contract::runtime::activity::NoOpActivityManager::arc(),
681 run_config: &rt_for_tools,
682 thread_id: run_ctx.thread_id(),
683 thread_messages: run_ctx.messages(),
684 state_version: run_ctx.version(),
685 cancellation_token: None,
686 })
687 .await?;
688
689 let metadata = tool_result_metadata_from_run_ctx(&run_ctx, None);
690 let applied = apply_tool_results_to_session(
691 &mut run_ctx,
692 &results,
693 metadata,
694 executor.requires_parallel_patch_conflict_check(),
695 )?;
696 let suspended = applied.suspended_calls.into_iter().next();
697
698 let delta = run_ctx.take_delta();
700 let mut out_thread = thread;
701 for msg in delta.messages {
702 out_thread = out_thread.with_message((*msg).clone());
703 }
704 out_thread = out_thread.with_patches(delta.patches);
705
706 if let Some(first) = suspended {
707 Ok(ExecuteToolsOutcome::Suspended {
708 thread: out_thread,
709 suspended_call: Box::new(first),
710 })
711 } else {
712 Ok(ExecuteToolsOutcome::Completed(out_thread))
713 }
714}
715
716pub(super) async fn execute_tools_parallel_with_phases(
718 tools: &HashMap<String, Arc<dyn Tool>>,
719 calls: &[crate::contracts::thread::ToolCall],
720 state: &Value,
721 phase_ctx: ToolPhaseContext<'_>,
722) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
723 use futures::future::join_all;
724
725 if is_cancelled(phase_ctx.cancellation_token) {
726 return Err(cancelled_error(phase_ctx.thread_id));
727 }
728
729 let run_config_owned = phase_ctx.run_config.clone();
731 let thread_id = phase_ctx.thread_id.to_string();
732 let thread_messages = Arc::new(phase_ctx.thread_messages.to_vec());
733 let tool_descriptors = phase_ctx.tool_descriptors.to_vec();
734 let agent = phase_ctx.agent_behavior;
735
736 let futures = calls.iter().map(|call| {
737 let tool = tools.get(&call.name).cloned();
738 let state = state.clone();
739 let call = call.clone();
740 let tool_descriptors = tool_descriptors.clone();
741 let activity_manager = phase_ctx.activity_manager.clone();
742 let rt = run_config_owned.clone();
743 let sid = thread_id.clone();
744 let thread_messages = thread_messages.clone();
745
746 async move {
747 execute_single_tool_with_phases_impl(
748 tool.as_deref(),
749 &call,
750 &state,
751 &ToolPhaseContext {
752 tool_descriptors: &tool_descriptors,
753 agent_behavior: agent,
754 activity_manager,
755 run_config: &rt,
756 thread_id: &sid,
757 thread_messages: thread_messages.as_slice(),
758 cancellation_token: None,
759 },
760 )
761 .await
762 }
763 });
764
765 let join_future = join_all(futures);
766 let results = match await_or_cancel(phase_ctx.cancellation_token, join_future).await {
767 CancelAware::Cancelled => return Err(cancelled_error(&thread_id)),
768 CancelAware::Value(results) => results,
769 };
770 let results: Vec<ToolExecutionResult> = results.into_iter().collect::<Result<_, _>>()?;
771 Ok(results)
772}
773
774pub(super) async fn execute_tools_sequential_with_phases(
776 tools: &HashMap<String, Arc<dyn Tool>>,
777 calls: &[crate::contracts::thread::ToolCall],
778 initial_state: &Value,
779 phase_ctx: ToolPhaseContext<'_>,
780) -> Result<Vec<ToolExecutionResult>, AgentLoopError> {
781 use tirea_state::apply_patch;
782
783 if is_cancelled(phase_ctx.cancellation_token) {
784 return Err(cancelled_error(phase_ctx.thread_id));
785 }
786
787 let mut state = initial_state.clone();
788 let mut results = Vec::with_capacity(calls.len());
789
790 for call in calls {
791 let tool = tools.get(&call.name).cloned();
792 let call_phase_ctx = ToolPhaseContext {
793 tool_descriptors: phase_ctx.tool_descriptors,
794 agent_behavior: phase_ctx.agent_behavior,
795 activity_manager: phase_ctx.activity_manager.clone(),
796 run_config: phase_ctx.run_config,
797 thread_id: phase_ctx.thread_id,
798 thread_messages: phase_ctx.thread_messages,
799 cancellation_token: None,
800 };
801 let result = match await_or_cancel(
802 phase_ctx.cancellation_token,
803 execute_single_tool_with_phases_impl(tool.as_deref(), call, &state, &call_phase_ctx),
804 )
805 .await
806 {
807 CancelAware::Cancelled => return Err(cancelled_error(phase_ctx.thread_id)),
808 CancelAware::Value(result) => result?,
809 };
810
811 if let Some(ref patch) = result.execution.patch {
813 state = apply_patch(&state, patch.patch()).map_err(|e| {
814 AgentLoopError::StateError(format!(
815 "failed to apply tool patch for call '{}': {}",
816 result.execution.call.id, e
817 ))
818 })?;
819 }
820 for pp in &result.pending_patches {
822 state = apply_patch(&state, pp.patch()).map_err(|e| {
823 AgentLoopError::StateError(format!(
824 "failed to apply plugin patch for call '{}': {}",
825 result.execution.call.id, e
826 ))
827 })?;
828 }
829
830 results.push(result);
831
832 if results
833 .last()
834 .is_some_and(|r| matches!(r.outcome, ToolCallOutcome::Suspended))
835 {
836 break;
837 }
838 }
839
840 Ok(results)
841}
842
/// Test-only entry point that forwards to the single-call phase pipeline.
#[cfg(test)]
pub(super) async fn execute_single_tool_with_phases(
    tool: Option<&dyn Tool>,
    call: &crate::contracts::thread::ToolCall,
    state: &Value,
    phase_ctx: &ToolPhaseContext<'_>,
) -> Result<ToolExecutionResult, AgentLoopError> {
    let pipeline = execute_single_tool_with_phases_impl(tool, call, state, phase_ctx);
    pipeline.await
}
853
854pub(super) async fn execute_single_tool_with_phases_deferred(
855 tool: Option<&dyn Tool>,
856 call: &crate::contracts::thread::ToolCall,
857 state: &Value,
858 phase_ctx: &ToolPhaseContext<'_>,
859) -> Result<ToolExecutionResult, AgentLoopError> {
860 execute_single_tool_with_phases_impl(tool, call, state, phase_ctx).await
861}
862
863async fn execute_single_tool_with_phases_impl(
864 tool: Option<&dyn Tool>,
865 call: &crate::contracts::thread::ToolCall,
866 state: &Value,
867 phase_ctx: &ToolPhaseContext<'_>,
868) -> Result<ToolExecutionResult, AgentLoopError> {
869 let doc = tirea_state::DocCell::new(state.clone());
871 let ops = std::sync::Mutex::new(Vec::new());
872 let pending_messages = std::sync::Mutex::new(Vec::new());
873 let plugin_scope = phase_ctx.run_config;
874 let mut plugin_tool_call_ctx = crate::contracts::ToolCallContext::new(
875 &doc,
876 &ops,
877 "plugin_phase",
878 "plugin:tool_phase",
879 plugin_scope,
880 &pending_messages,
881 tirea_contract::runtime::activity::NoOpActivityManager::arc(),
882 );
883 if let Some(token) = phase_ctx.cancellation_token {
884 plugin_tool_call_ctx = plugin_tool_call_ctx.with_cancellation_token(token);
885 }
886
887 let mut step = StepContext::new(
889 plugin_tool_call_ctx,
890 phase_ctx.thread_id,
891 phase_ctx.thread_messages,
892 phase_ctx.tool_descriptors.to_vec(),
893 );
894 step.gate = Some(ToolGate::from_tool_call(call));
895 emit_tool_phase(
897 Phase::BeforeToolExecute,
898 &mut step,
899 phase_ctx.agent_behavior,
900 &doc,
901 )
902 .await?;
903
904 let (mut execution, outcome, suspended_call, tool_actions) = if step.tool_blocked() {
906 let reason = step
907 .gate
908 .as_ref()
909 .and_then(|g| g.block_reason.clone())
910 .unwrap_or_else(|| "Blocked by plugin".to_string());
911 (
912 ToolExecution {
913 call: call.clone(),
914 result: ToolResult::error(&call.name, reason),
915 patch: None,
916 },
917 ToolCallOutcome::Failed,
918 None,
919 Vec::<Box<dyn Action>>::new(),
920 )
921 } else if let Some(plugin_result) = step.tool_result().cloned() {
922 let outcome = ToolCallOutcome::from_tool_result(&plugin_result);
923 (
924 ToolExecution {
925 call: call.clone(),
926 result: plugin_result,
927 patch: None,
928 },
929 outcome,
930 None,
931 Vec::<Box<dyn Action>>::new(),
932 )
933 } else {
934 match tool {
935 None => (
936 ToolExecution {
937 call: call.clone(),
938 result: ToolResult::error(
939 &call.name,
940 format!("Tool '{}' not found", call.name),
941 ),
942 patch: None,
943 },
944 ToolCallOutcome::Failed,
945 None,
946 Vec::<Box<dyn Action>>::new(),
947 ),
948 Some(tool) => {
949 if let Err(e) = tool.validate_args(&call.arguments) {
950 (
951 ToolExecution {
952 call: call.clone(),
953 result: ToolResult::error(&call.name, e.to_string()),
954 patch: None,
955 },
956 ToolCallOutcome::Failed,
957 None,
958 Vec::<Box<dyn Action>>::new(),
959 )
960 } else if step.tool_pending() {
961 let Some(suspend_ticket) =
962 step.gate.as_ref().and_then(|g| g.suspend_ticket.clone())
963 else {
964 return Err(AgentLoopError::StateError(
965 "tool is pending but suspend ticket is missing".to_string(),
966 ));
967 };
968 (
969 ToolExecution {
970 call: call.clone(),
971 result: ToolResult::suspended(
972 &call.name,
973 "Execution suspended; awaiting external decision",
974 ),
975 patch: None,
976 },
977 ToolCallOutcome::Suspended,
978 Some(SuspendedCall::new(call, suspend_ticket)),
979 Vec::<Box<dyn Action>>::new(),
980 )
981 } else {
982 persist_tool_call_status(&step, call, ToolCallStatus::Running, None)?;
983 let tool_doc = tirea_state::DocCell::new(state.clone());
985 let tool_ops = std::sync::Mutex::new(Vec::new());
986 let tool_pending_msgs = std::sync::Mutex::new(Vec::new());
987 let mut tool_ctx = crate::contracts::ToolCallContext::new(
988 &tool_doc,
989 &tool_ops,
990 &call.id,
991 format!("tool:{}", call.name),
992 plugin_scope,
993 &tool_pending_msgs,
994 phase_ctx.activity_manager.clone(),
995 );
996 if let Some(token) = phase_ctx.cancellation_token {
997 tool_ctx = tool_ctx.with_cancellation_token(token);
998 }
999 let mut effect =
1000 match tool.execute_effect(call.arguments.clone(), &tool_ctx).await {
1001 Ok(effect) => effect,
1002 Err(e) => ToolExecutionEffect::from(ToolResult::error(
1003 &call.name,
1004 e.to_string(),
1005 )),
1006 };
1007
1008 let context_patch = tool_ctx.take_patch();
1009 if let Err(result) =
1010 merge_context_patch_into_effect(call, &mut effect, context_patch)
1011 {
1012 effect = ToolExecutionEffect::from(*result);
1013 }
1014 let (result, actions) = effect.into_parts();
1015 let outcome = ToolCallOutcome::from_tool_result(&result);
1016
1017 let suspended_call = if matches!(outcome, ToolCallOutcome::Suspended) {
1018 Some(suspended_call_from_tool_result(call, &result))
1019 } else {
1020 None
1021 };
1022
1023 (
1024 ToolExecution {
1025 call: call.clone(),
1026 result,
1027 patch: None,
1028 },
1029 outcome,
1030 suspended_call,
1031 actions,
1032 )
1033 }
1034 }
1035 }
1036 };
1037
1038 if let Some(gate) = step.gate.as_mut() {
1040 gate.result = Some(execution.result.clone());
1041 }
1042
1043 let mut tool_state_actions = Vec::<AnyStateAction>::new();
1046 let mut other_actions = Vec::<Box<dyn Action>>::new();
1047 for action in tool_actions {
1048 if action.is_state_action() {
1049 if let Some(sa) = action.into_state_action() {
1050 tool_state_actions.push(sa);
1051 }
1052 } else {
1053 other_actions.push(action);
1054 }
1055 }
1056
1057 for action in &other_actions {
1059 action
1060 .validate(Phase::AfterToolExecute)
1061 .map_err(AgentLoopError::StateError)?;
1062 }
1063 for action in other_actions {
1064 action.apply(&mut step);
1065 }
1066
1067 emit_tool_phase(
1069 Phase::AfterToolExecute,
1070 &mut step,
1071 phase_ctx.agent_behavior,
1072 &doc,
1073 )
1074 .await?;
1075
1076 match outcome {
1077 ToolCallOutcome::Suspended => {
1078 persist_tool_call_status(
1079 &step,
1080 call,
1081 ToolCallStatus::Suspended,
1082 suspended_call.as_ref(),
1083 )?;
1084 }
1085 ToolCallOutcome::Succeeded => {
1086 persist_tool_call_status(&step, call, ToolCallStatus::Succeeded, None)?;
1087 }
1088 ToolCallOutcome::Failed => {
1089 persist_tool_call_status(&step, call, ToolCallStatus::Failed, None)?;
1090 }
1091 }
1092
1093 if !matches!(outcome, ToolCallOutcome::Suspended) {
1096 let cleanup_path = format!("__tool_call_scope.{}", call.id);
1097 let cleanup_patch = Patch::with_ops(vec![tirea_state::Op::delete(
1098 tirea_state::parse_path(&cleanup_path),
1099 )]);
1100 let tracked = TrackedPatch::new(cleanup_patch).with_source("framework:scope_cleanup");
1101 step.emit_patch(tracked);
1102 }
1103
1104 let plugin_patch = step.ctx().take_patch();
1106 if !plugin_patch.patch().is_empty() {
1107 step.emit_patch(plugin_patch);
1108 }
1109
1110 let phase_patch_actions = std::mem::take(&mut step.pending_patches)
1111 .into_iter()
1112 .map(AnyStateAction::Patch);
1113
1114 let mut serialized_actions: Vec<tirea_contract::SerializedAction> = tool_state_actions
1116 .iter()
1117 .filter_map(|a| a.to_serialized_action())
1118 .collect();
1119
1120 let tool_scope_ctx = ScopeContext::for_call(&call.id);
1121 let execution_patch_parts = reduce_tool_state_actions(
1122 state,
1123 tool_state_actions,
1124 &format!("tool:{}", call.name),
1125 &tool_scope_ctx,
1126 )?;
1127 execution.patch = merge_tracked_patches(&execution_patch_parts, &format!("tool:{}", call.name));
1128
1129 let phase_base_state = if let Some(tool_patch) = execution.patch.as_ref() {
1130 tirea_state::apply_patch(state, tool_patch.patch()).map_err(|e| {
1131 AgentLoopError::StateError(format!(
1132 "failed to apply tool patch for call '{}': {}",
1133 call.id, e
1134 ))
1135 })?
1136 } else {
1137 state.clone()
1138 };
1139 let pending_patches = reduce_tool_state_actions(
1140 &phase_base_state,
1141 phase_patch_actions.collect(),
1142 "agent",
1143 &tool_scope_ctx,
1144 )?;
1145
1146 let reminders = step.messaging.reminders.clone();
1147 let user_messages = std::mem::take(&mut step.messaging.user_messages);
1148
1149 serialized_actions.extend(step.take_pending_serialized_actions());
1151
1152 Ok(ToolExecutionResult {
1153 execution,
1154 outcome,
1155 suspended_call,
1156 reminders,
1157 user_messages,
1158 pending_patches,
1159 serialized_actions,
1160 })
1161}
1162
1163fn reduce_tool_state_actions(
1164 base_state: &Value,
1165 actions: Vec<AnyStateAction>,
1166 source: &str,
1167 scope_ctx: &ScopeContext,
1168) -> Result<Vec<TrackedPatch>, AgentLoopError> {
1169 reduce_state_actions(actions, base_state, source, scope_ctx).map_err(|e| {
1170 AgentLoopError::StateError(format!("failed to reduce tool state actions: {e}"))
1171 })
1172}
1173
1174fn merge_tracked_patches(patches: &[TrackedPatch], source: &str) -> Option<TrackedPatch> {
1175 let mut merged = Patch::new();
1176 for tracked in patches {
1177 merged.extend(tracked.patch().clone());
1178 }
1179 if merged.is_empty() {
1180 None
1181 } else {
1182 Some(TrackedPatch::new(merged).with_source(source.to_string()))
1183 }
1184}
1185
1186fn cancelled_error(_thread_id: &str) -> AgentLoopError {
1187 AgentLoopError::Cancelled
1188}