Skip to main content

lash_core/session/
tool_execution.rs

1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3    ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4    dispatch_granted_prepared_tool_attempt_launch_with_execution_context,
5    dispatch_granted_prepared_tool_call_launch_with_execution_context,
6    dispatch_prepared_tool_attempt_launch_with_execution_context,
7    dispatch_prepared_tool_call_launch_with_execution_context,
8    finalize_tool_result_with_execution_context, mark_retry_exhausted,
9    prepare_granted_tool_call_with_context, prepare_tool_call_with_context, retry_after_ms,
10    schedule_tool_batch,
11};
12use crate::{
13    ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
14    ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
15};
16use std::collections::HashMap;
17
18#[derive(Clone)]
19pub struct ToolInvocation {
20    pub id: String,
21    pub tool_id: crate::ToolId,
22    pub args: serde_json::Value,
23    pub execution_grant: Option<Box<crate::ToolExecutionGrant>>,
24    pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
25}
26
27impl ToolInvocation {
28    pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
29        Self {
30            id: id.into(),
31            tool_id,
32            args,
33            execution_grant: None,
34            child_execution_trace_hook: None,
35        }
36    }
37
38    pub fn with_execution_grant(mut self, grant: crate::ToolExecutionGrant) -> Self {
39        self.execution_grant = Some(Box::new(grant));
40        self
41    }
42
43    pub fn label(&self) -> String {
44        self.tool_id.to_string()
45    }
46
47    pub fn with_child_execution_trace_hook(
48        mut self,
49        hook: crate::ToolChildExecutionTraceHook,
50    ) -> Self {
51        self.child_execution_trace_hook = Some(hook);
52        self
53    }
54}
55
56impl std::fmt::Debug for ToolInvocation {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("ToolInvocation")
59            .field("id", &self.id)
60            .field("tool_id", &self.tool_id)
61            .field("args", &self.args)
62            .field(
63                "execution_grant",
64                &self.execution_grant.as_ref().map(|_| "<grant>"),
65            )
66            .field(
67                "child_execution_trace_hook",
68                &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
69            )
70            .finish()
71    }
72}
73
74#[derive(Clone, Debug)]
75pub struct ToolInvocationReply {
76    pub output: ToolCallOutput,
77    pub record: Option<ToolCallRecord>,
78}
79
80impl ToolInvocationReply {
81    pub fn success(value: serde_json::Value) -> Self {
82        Self {
83            output: ToolCallOutput::success(value),
84            record: None,
85        }
86    }
87
88    pub fn error(value: serde_json::Value) -> Self {
89        let message = value
90            .as_str()
91            .map(ToOwned::to_owned)
92            .unwrap_or_else(|| value.to_string());
93        let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
94        failure.raw =
95            Some(serde_json::from_value(value).unwrap_or_else(|_| {
96                crate::ToolValue::String("unserializable tool error".to_string())
97            }));
98        Self {
99            output: ToolCallOutput::failure(failure),
100            record: None,
101        }
102    }
103
104    pub fn from_output(output: ToolCallOutput) -> Self {
105        Self {
106            output,
107            record: None,
108        }
109    }
110
111    pub fn cancelled(message: impl Into<String>) -> Self {
112        Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
113            message,
114        )))
115    }
116
117    pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
118        self.record = Some(record);
119        self
120    }
121}
122
/// Crate-internal pairing of a completed sans-IO tool call with a call record.
#[derive(Clone, Debug)]
pub(crate) struct CompletedProtocolToolCall {
    /// The completed call in its `crate::sansio::CompletedToolCall` form.
    pub completed: crate::sansio::CompletedToolCall,
    /// The accompanying `ToolCallRecord` (presumably for the same call — confirm
    /// against `complete_tool_call`, which is not visible here).
    pub record: ToolCallRecord,
}
128
129fn cancelled_runtime_tool_call_launch(
130    call_id: String,
131    tool_name: String,
132    args: serde_json::Value,
133    replay: Option<crate::llm::types::ProviderReplayMeta>,
134) -> crate::runtime::ToolCallLaunch {
135    crate::runtime::ToolCallLaunch::Done {
136        result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
137    }
138}
139
140fn cancelled_completed_tool_call(
141    call_id: String,
142    tool_name: String,
143    args: serde_json::Value,
144    replay: Option<crate::llm::types::ProviderReplayMeta>,
145) -> crate::sansio::CompletedToolCall {
146    let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
147    crate::sansio::CompletedToolCall {
148        call_id: call_id.clone(),
149        tool_name: tool_name.clone(),
150        args,
151        model_return: ModelToolReturn {
152            call_id,
153            tool_name,
154            parts: vec![crate::ModelToolReturnPart::text(
155                "[Tool execution cancelled]\ntool call cancelled".to_string(),
156            )],
157        },
158        output,
159        duration_ms: 0,
160        replay,
161    }
162}
163
164fn runtime_failure_dispatch_outcome(
165    call_id: Option<String>,
166    tool_name: String,
167    args: serde_json::Value,
168    code: impl Into<String>,
169    message: impl Into<String>,
170) -> ToolDispatchOutcome {
171    ToolDispatchOutcome {
172        record: ToolCallRecord {
173            call_id,
174            tool: tool_name,
175            args,
176            output: ToolCallOutput::failure(ToolFailure::runtime(
177                ToolFailureClass::Internal,
178                code,
179                message,
180            )),
181            duration_ms: 0,
182        },
183    }
184}
185
186fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
187    let identity = calls
188        .iter()
189        .map(|call| {
190            serde_json::json!({
191                "id": call.id.clone(),
192                "tool_id": call.tool_id.to_string(),
193                "args": call.args.clone(),
194                "execution_grant": call.execution_grant.as_ref().map(|grant| serde_json::json!({
195                    "tool_id": grant.manifest.id.to_string(),
196                    "source_id": grant.source_id.clone(),
197                    "execution_binding": grant.execution_binding.clone(),
198                })),
199            })
200        })
201        .collect::<Vec<_>>();
202    let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
203        .unwrap_or_else(|_| format!("len-{}", calls.len()));
204    format!("tool-batch:{digest}")
205}
206
/// Result of running one tool call from a batch: the launch to report for the
/// call together with the trigger outcomes collected while running it.
struct CoordinatedToolLaunch {
    /// Launch value reported for the call.
    launch: crate::runtime::ToolCallLaunch,
    /// Trigger effect outcomes gathered across the call's attempts.
    triggers: Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>,
}
211
212impl RuntimeExecutionContext<'_> {
213    fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
214        let suffix = format!("tool-batch:{batch_id}");
215        if let Some(parent) = self.parent_invocation.as_ref() {
216            let parent_effect_id = parent.effect_id().unwrap_or("effect");
217            return crate::runtime::causal::child_effect_invocation(
218                parent,
219                format!("{parent_effect_id}:{suffix}"),
220                crate::RuntimeEffectKind::ToolBatch,
221                suffix,
222            );
223        }
224        let replay_key = format!("{}:{suffix}", self.execution_scope_id());
225        crate::RuntimeInvocation::effect(
226            crate::RuntimeScope::new(self.session_id.clone()),
227            suffix,
228            crate::RuntimeEffectKind::ToolBatch,
229            replay_key,
230        )
231    }
232
233    pub(crate) async fn execute_prepared_tool_batch_launches(
234        &self,
235        batch: crate::PreparedToolBatch,
236        parent_invocation: crate::RuntimeInvocation,
237        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
238    ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
239        let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
240        let cancellation = self.cancellation_token.clone().unwrap_or_default();
241        let tool_cancel = cancellation.child_token();
242        let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
243        if !self
244            .dispatch
245            .effect_controller
246            .controller()
247            .supports_concurrent_effects()
248        {
249            let mut launches = Vec::with_capacity(indexed_tools.len());
250            let mut triggers = Vec::new();
251            let mut context = self.clone().with_cancellation_token(tool_cancel.clone());
252            for (index, child) in indexed_tools {
253                if cancellation.is_cancelled() {
254                    tool_cancel.cancel();
255                    launches.push(cancelled_runtime_tool_call_launch(
256                        child.call.call_id,
257                        child.call.tool_name,
258                        child.call.args,
259                        child.call.replay,
260                    ));
261                    continue;
262                }
263                let child_execution_trace_hook =
264                    child_trace_hooks.get(&child.call.call_id).cloned();
265                let outcome = context
266                    .execute_prepared_tool_batch_child(
267                        child,
268                        index,
269                        parent_invocation.clone(),
270                        child_execution_trace_hook,
271                    )
272                    .await;
273                launches.push(outcome.launch);
274                triggers.extend(outcome.triggers);
275                context = context.with_cancellation_token(tool_cancel.clone());
276            }
277            return Ok(crate::ToolBatchEffectOutcome { launches, triggers });
278        }
279        let child_outcomes = schedule_tool_batch(
280            indexed_tools,
281            |(index, _)| *index,
282            |(_, child)| self.tool_scheduling(&child.call.tool_name),
283            {
284                let context = self.clone();
285                let cancellation = cancellation.clone();
286                let tool_cancel = tool_cancel.clone();
287                let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
288                move |(index, child)| {
289                    let context = context.clone().with_cancellation_token(tool_cancel.clone());
290                    let cancellation = cancellation.clone();
291                    let tool_cancel = tool_cancel.clone();
292                    let parent_invocation = parent_invocation.clone();
293                    let cancelled_tool = child.call.clone();
294                    let child_execution_trace_hook =
295                        child_trace_hooks.get(&child.call.call_id).cloned();
296                    async move {
297                        let tool_call = context.execute_prepared_tool_batch_child(
298                            child,
299                            index,
300                            parent_invocation,
301                            child_execution_trace_hook,
302                        );
303                        tokio::pin!(tool_call);
304                        tokio::select! {
305                            biased;
306                            _ = cancellation.cancelled() => {
307                                tool_cancel.cancel();
308                                let grace = context
309                                    .dispatch
310                                    .clock
311                                    .sleep(std::time::Duration::from_millis(50));
312                                tokio::pin!(grace);
313                                tokio::select! {
314                                    biased;
315                                    outcome = &mut tool_call => outcome,
316                                    _ = &mut grace => CoordinatedToolLaunch {
317                                        launch: cancelled_runtime_tool_call_launch(
318                                            cancelled_tool.call_id,
319                                            cancelled_tool.tool_name,
320                                            cancelled_tool.args,
321                                            cancelled_tool.replay,
322                                        ),
323                                        triggers: Vec::new(),
324                                    },
325                                }
326                            }
327                            outcome = &mut tool_call => outcome,
328                        }
329                    }
330                }
331            },
332        )
333        .await;
334
335        let mut launches = Vec::with_capacity(child_outcomes.len());
336        let mut triggers = Vec::new();
337        for outcome in child_outcomes {
338            launches.push(outcome.launch);
339            triggers.extend(outcome.triggers);
340        }
341        Ok(crate::ToolBatchEffectOutcome { launches, triggers })
342    }
343
344    async fn execute_prepared_tool_batch_child(
345        &self,
346        child: crate::PreparedToolBatchCall,
347        index: usize,
348        parent_invocation: crate::RuntimeInvocation,
349        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
350    ) -> CoordinatedToolLaunch {
351        let call_id = child.call.call_id.clone();
352        let tool_name = child.call.tool_name.clone();
353        let args = child.call.args.clone();
354        let replay = child.call.replay.clone();
355        let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
356        self.emit_tool_call_started(&call_id, &tool_name, args.clone(), activity_id.clone())
357            .await;
358
359        let retry_policy = crate::tool_dispatch::resolve_callable_manifest_by_id(
360            self.dispatch.as_ref(),
361            &child.call.tool_id,
362        )
363        .map(|manifest| manifest.retry_policy)
364        .or_else(|| {
365            child
366                .execution_grant
367                .as_ref()
368                .map(|grant| grant.manifest.retry_policy)
369        })
370        .unwrap_or(crate::ToolRetryPolicy::Never);
371        let max_attempts = retry_policy.max_attempts().max(1);
372        let mut triggers = Vec::new();
373
374        for attempt in 1..=max_attempts {
375            let attempt_invocation =
376                self.tool_attempt_invocation(&parent_invocation, &child.replay_suffix, attempt);
377            let attempt_outcome = self
378                .dispatch
379                .effect_controller
380                .controller()
381                .execute_effect(
382                    crate::RuntimeEffectEnvelope::new(
383                        attempt_invocation,
384                        crate::RuntimeEffectCommand::ToolAttempt {
385                            call: child.call.clone(),
386                            execution_grant: child.execution_grant.clone(),
387                            attempt,
388                            max_attempts,
389                        },
390                    ),
391                    crate::RuntimeEffectLocalExecutor::tool_batch(
392                        self.clone(),
393                        child_execution_trace_hook
394                            .clone()
395                            .map(|hook| {
396                                std::iter::once((child.call.call_id.clone(), hook)).collect()
397                            })
398                            .unwrap_or_default(),
399                    ),
400                )
401                .await;
402            let attempt_outcome = match attempt_outcome {
403                Ok(outcome) => match outcome.into_tool_attempt_effect() {
404                    Ok(outcome) => outcome,
405                    Err(err) => {
406                        let completed = self
407                            .complete_tool_call(
408                                index,
409                                call_id.clone(),
410                                replay,
411                                runtime_failure_dispatch_outcome(
412                                    Some(call_id.clone()),
413                                    tool_name,
414                                    args,
415                                    "tool_attempt_failed",
416                                    err.to_string(),
417                                ),
418                                activity_id,
419                            )
420                            .await;
421                        return CoordinatedToolLaunch {
422                            launch: crate::runtime::ToolCallLaunch::Done {
423                                result: completed.completed,
424                            },
425                            triggers,
426                        };
427                    }
428                },
429                Err(err) => {
430                    let completed = self
431                        .complete_tool_call(
432                            index,
433                            call_id.clone(),
434                            replay,
435                            runtime_failure_dispatch_outcome(
436                                Some(call_id.clone()),
437                                tool_name,
438                                args,
439                                "tool_attempt_failed",
440                                err.to_string(),
441                            ),
442                            activity_id,
443                        )
444                        .await;
445                    return CoordinatedToolLaunch {
446                        launch: crate::runtime::ToolCallLaunch::Done {
447                            result: completed.completed,
448                        },
449                        triggers,
450                    };
451                }
452            };
453            triggers.extend(attempt_outcome.triggers);
454            match attempt_outcome.launch {
455                crate::ToolAttemptLaunch::Pending {
456                    key,
457                    pending,
458                    duration_ms,
459                } => {
460                    let dispatch_outcome = self
461                        .await_pending_tool_dispatch_outcome_with_suffix(
462                            &call_id,
463                            Some(parent_invocation.clone()),
464                            format!("{}:await", child.replay_suffix),
465                            crate::tool_dispatch::PendingToolDispatchOutcome {
466                                tool_name: child.call.tool_name.clone(),
467                                args: child.call.args.clone(),
468                                key,
469                                pending,
470                                duration_ms,
471                            },
472                            self.cancellation_token.clone(),
473                        )
474                        .await;
475                    let completed = self
476                        .complete_tool_call(
477                            index,
478                            call_id.clone(),
479                            child.call.replay.clone(),
480                            dispatch_outcome,
481                            activity_id,
482                        )
483                        .await;
484                    return CoordinatedToolLaunch {
485                        launch: crate::runtime::ToolCallLaunch::Done {
486                            result: completed.completed,
487                        },
488                        triggers,
489                    };
490                }
491                crate::ToolAttemptLaunch::Done { mut record } => {
492                    record.call_id = Some(call_id.clone());
493                    let retry_after = retry_after_ms(
494                        &ToolResult::from_output(record.output.clone()),
495                        retry_policy,
496                        attempt - 1,
497                    );
498                    let Some(retry_after) = retry_after else {
499                        let completed = self
500                            .complete_tool_call(
501                                index,
502                                call_id.clone(),
503                                child.call.replay.clone(),
504                                ToolDispatchOutcome { record },
505                                activity_id,
506                            )
507                            .await;
508                        return CoordinatedToolLaunch {
509                            launch: crate::runtime::ToolCallLaunch::Done {
510                                result: completed.completed,
511                            },
512                            triggers,
513                        };
514                    };
515                    if attempt >= max_attempts {
516                        let exhausted =
517                            mark_retry_exhausted(ToolResult::from_output(record.output), attempt);
518                        record.output = exhausted.into_done_output().unwrap_or_else(|_| {
519                            ToolCallOutput::failure(ToolFailure::runtime(
520                                ToolFailureClass::Internal,
521                                "tool_retry_exhaustion_failed",
522                                "retry exhaustion produced a pending output",
523                            ))
524                        });
525                        let completed = self
526                            .complete_tool_call(
527                                index,
528                                call_id.clone(),
529                                child.call.replay.clone(),
530                                ToolDispatchOutcome { record },
531                                activity_id,
532                            )
533                            .await;
534                        return CoordinatedToolLaunch {
535                            launch: crate::runtime::ToolCallLaunch::Done {
536                                result: completed.completed,
537                            },
538                            triggers,
539                        };
540                    }
541                    if retry_after > 0
542                        && let Err(err) = self
543                            .sleep_before_tool_retry(
544                                &parent_invocation,
545                                &child.replay_suffix,
546                                attempt,
547                                retry_after,
548                            )
549                            .await
550                    {
551                        let completed = self
552                            .complete_tool_call(
553                                index,
554                                call_id.clone(),
555                                child.call.replay.clone(),
556                                runtime_failure_dispatch_outcome(
557                                    Some(call_id.clone()),
558                                    child.call.tool_name.clone(),
559                                    child.call.args.clone(),
560                                    "tool_retry_sleep_failed",
561                                    format!(
562                                        "retry sleep for tool `{}` failed after attempt {attempt}: {err}",
563                                        child.call.tool_name
564                                    ),
565                                ),
566                                activity_id,
567                            )
568                            .await;
569                        return CoordinatedToolLaunch {
570                            launch: crate::runtime::ToolCallLaunch::Done {
571                                result: completed.completed,
572                            },
573                            triggers,
574                        };
575                    }
576                }
577            }
578        }
579
580        let completed = self
581            .complete_tool_call(
582                index,
583                call_id.clone(),
584                child.call.replay,
585                runtime_failure_dispatch_outcome(
586                    Some(call_id),
587                    child.call.tool_name,
588                    child.call.args,
589                    "tool_retry_loop_failed",
590                    "tool retry loop exited without a terminal result",
591                ),
592                activity_id,
593            )
594            .await;
595        CoordinatedToolLaunch {
596            launch: crate::runtime::ToolCallLaunch::Done {
597                result: completed.completed,
598            },
599            triggers,
600        }
601    }
602
603    async fn emit_tool_call_started(
604        &self,
605        call_id: &str,
606        name: &str,
607        args: serde_json::Value,
608        activity_id: TurnActivityId,
609    ) {
610        let _ = self
611            .dispatch
612            .event_tx
613            .send(SessionEvent::ToolCallStart {
614                call_id: Some(call_id.to_string()),
615                name: name.to_string(),
616                args: args.clone(),
617            })
618            .await;
619        self.emit_tool_call_started_trace(call_id, name, &args);
620        self.emit_turn_activity(
621            activity_id,
622            TurnEvent::ToolCallStarted {
623                call_id: Some(call_id.to_string()),
624                name: name.to_string(),
625                args,
626                graph_key: self.code_block_graph_key(),
627                parent_call_id: self.batch_parent_call_id(),
628            },
629        )
630        .await;
631    }
632
633    fn tool_attempt_invocation(
634        &self,
635        parent_invocation: &crate::RuntimeInvocation,
636        child_replay_suffix: &str,
637        attempt: u32,
638    ) -> crate::RuntimeInvocation {
639        let suffix = format!("{child_replay_suffix}:attempt:{attempt}");
640        let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
641        crate::runtime::causal::child_effect_invocation(
642            parent_invocation,
643            format!("{parent_effect_id}:{suffix}"),
644            crate::RuntimeEffectKind::ToolAttempt,
645            suffix,
646        )
647    }
648
649    async fn sleep_before_tool_retry(
650        &self,
651        parent_invocation: &crate::RuntimeInvocation,
652        child_replay_suffix: &str,
653        attempt: u32,
654        retry_after_ms: u64,
655    ) -> Result<(), crate::RuntimeEffectControllerError> {
656        let suffix = format!("{child_replay_suffix}:attempt:{attempt}:sleep");
657        let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
658        let invocation = crate::runtime::causal::child_effect_invocation(
659            parent_invocation,
660            format!("{parent_effect_id}:{suffix}"),
661            crate::RuntimeEffectKind::Sleep,
662            suffix,
663        );
664        let cancellation = self.cancellation_token.clone().unwrap_or_default();
665        let outcome = self
666            .dispatch
667            .effect_controller
668            .controller()
669            .execute_effect(
670                crate::RuntimeEffectEnvelope::new(
671                    invocation,
672                    crate::RuntimeEffectCommand::Sleep {
673                        duration_ms: retry_after_ms,
674                    },
675                ),
676                crate::RuntimeEffectLocalExecutor::sleep_with_clock(
677                    cancellation,
678                    std::sync::Arc::clone(&self.dispatch.clock),
679                ),
680            )
681            .await?;
682        match outcome {
683            crate::RuntimeEffectOutcome::Sleep => Ok(()),
684            other => Err(crate::RuntimeEffectControllerError::new(
685                "runtime_effect_wrong_outcome",
686                format!("expected sleep outcome, got {}", other.kind().as_str()),
687            )),
688        }
689    }
690
691    #[expect(
692        clippy::too_many_arguments,
693        reason = "tool execution carries explicit runtime call metadata"
694    )]
695    pub(crate) async fn execute_tool_call(
696        &self,
697        call_id: String,
698        name: String,
699        args: serde_json::Value,
700        index: usize,
701        replay: Option<crate::llm::types::ProviderReplayMeta>,
702        parent_invocation: Option<crate::RuntimeInvocation>,
703        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
704    ) -> CompletedProtocolToolCall {
705        let _ = self
706            .dispatch
707            .event_tx
708            .send(SessionEvent::ToolCallStart {
709                call_id: Some(call_id.clone()),
710                name: name.clone(),
711                args: args.clone(),
712            })
713            .await;
714        let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
715        self.emit_tool_call_started_trace(&call_id, &name, &args);
716        self.emit_turn_activity(
717            tool_correlation_id.clone(),
718            TurnEvent::ToolCallStarted {
719                call_id: Some(call_id.clone()),
720                name: name.clone(),
721                args: args.clone(),
722                graph_key: self.code_block_graph_key(),
723                parent_call_id: self.batch_parent_call_id(),
724            },
725        )
726        .await;
727
728        let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
729        let mut dispatch = (*self.dispatch).clone();
730        dispatch.parent_invocation = parent_invocation.clone();
731        let pending = crate::sansio::PendingToolCall {
732            call_id: call_id.clone(),
733            tool_name: name,
734            args,
735            replay: replay.clone(),
736        };
737        let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
738            .await
739        {
740            ToolPreparationOutcome::Prepared(prepared) => {
741                let dispatch_context = std::sync::Arc::new(dispatch.clone());
742                let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
743                    self.clone().with_parent_invocation(parent_invocation)
744                } else {
745                    self.clone()
746                };
747                let mut tool_context =
748                    crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
749                        .runtime_execution_context(runtime_context)
750                        .prepared_call(&prepared)
751                        .cancellation_token(self.cancellation_token.clone())
752                        .runtime_process_id(self.runtime_process_id.clone())
753                        .parent_invocation(parent_invocation.clone())
754                        .child_execution_trace_hook(child_execution_trace_hook.clone());
755                if let Some(process_events) = self.process_event_context.as_ref() {
756                    tool_context = tool_context.process_events(
757                        process_events.process_id.clone(),
758                        std::sync::Arc::clone(&process_events.registry),
759                        process_events.awaiter.clone(),
760                        process_events.store.clone(),
761                        process_events.session_store_factory.clone(),
762                        process_events.queued_work_driver.clone(),
763                    );
764                }
765                let tool_context = tool_context.build();
766                dispatch_prepared_tool_call_launch_with_execution_context(
767                    dispatch_context.as_ref(),
768                    prepared,
769                    None,
770                    tool_context,
771                )
772                .await
773            }
774            ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
775        };
776        let mut outcome = match launch {
777            ToolCallLaunch::Done(outcome) => outcome,
778            ToolCallLaunch::Pending(pending) => {
779                self.await_pending_tool_dispatch_outcome(
780                    &call_id,
781                    parent_invocation.clone(),
782                    pending,
783                    self.cancellation_token.clone(),
784                )
785                .await
786            }
787        };
788        outcome.record.call_id = Some(call_id.clone());
789
790        self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
791            .await
792    }
793
794    #[expect(
795        clippy::too_many_arguments,
796        reason = "tool execution carries explicit runtime call metadata"
797    )]
798    pub(crate) async fn execute_tool_call_by_id(
799        &self,
800        call_id: String,
801        tool_id: crate::ToolId,
802        args: serde_json::Value,
803        index: usize,
804        replay: Option<crate::llm::types::ProviderReplayMeta>,
805        parent_invocation: Option<crate::RuntimeInvocation>,
806        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
807    ) -> CompletedProtocolToolCall {
808        let Some(manifest) =
809            crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
810        else {
811            let outcome = ToolDispatchOutcome {
812                record: ToolCallRecord {
813                    call_id: Some(call_id.clone()),
814                    tool: tool_id.to_string(),
815                    args,
816                    output: ToolCallOutput::failure(ToolFailure::runtime(
817                        ToolFailureClass::Unavailable,
818                        "tool_unavailable",
819                        format!("Tool id `{tool_id}` is unavailable in this session"),
820                    )),
821                    duration_ms: 0,
822                },
823            };
824            let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
825            return self
826                .complete_tool_call(index, call_id, replay, outcome, activity_id)
827                .await;
828        };
829        self.execute_tool_call(
830            call_id,
831            manifest.name,
832            args,
833            index,
834            replay,
835            parent_invocation,
836            child_execution_trace_hook,
837        )
838        .await
839    }
840
841    #[expect(
842        clippy::too_many_arguments,
843        reason = "tool execution carries explicit runtime call metadata"
844    )]
845    pub(crate) async fn execute_tool_call_by_grant(
846        &self,
847        call_id: String,
848        grant: crate::ToolExecutionGrant,
849        args: serde_json::Value,
850        index: usize,
851        replay: Option<crate::llm::types::ProviderReplayMeta>,
852        parent_invocation: Option<crate::RuntimeInvocation>,
853        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
854    ) -> CompletedProtocolToolCall {
855        let name = grant.manifest.name.clone();
856        let _ = self
857            .dispatch
858            .event_tx
859            .send(SessionEvent::ToolCallStart {
860                call_id: Some(call_id.clone()),
861                name: name.clone(),
862                args: args.clone(),
863            })
864            .await;
865        let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
866        self.emit_tool_call_started_trace(&call_id, &name, &args);
867        self.emit_turn_activity(
868            tool_correlation_id.clone(),
869            TurnEvent::ToolCallStarted {
870                call_id: Some(call_id.clone()),
871                name: name.clone(),
872                args: args.clone(),
873                graph_key: self.code_block_graph_key(),
874                parent_call_id: self.batch_parent_call_id(),
875            },
876        )
877        .await;
878
879        let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
880        let mut dispatch = (*self.dispatch).clone();
881        dispatch.parent_invocation = parent_invocation.clone();
882        let pending = crate::sansio::PendingToolCall {
883            call_id: call_id.clone(),
884            tool_name: name,
885            args,
886            replay: replay.clone(),
887        };
888        let launch = match prepare_granted_tool_call_with_context(
889            &dispatch,
890            &grant,
891            pending,
892            Some(call_id.clone()),
893        )
894        .await
895        {
896            ToolPreparationOutcome::Prepared(prepared) => {
897                let dispatch_context = std::sync::Arc::new(dispatch.clone());
898                let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
899                    self.clone().with_parent_invocation(parent_invocation)
900                } else {
901                    self.clone()
902                };
903                let mut tool_context =
904                    crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
905                        .runtime_execution_context(runtime_context)
906                        .prepared_call(&prepared)
907                        .tool_execution_binding(grant.execution_binding.clone())
908                        .cancellation_token(self.cancellation_token.clone())
909                        .runtime_process_id(self.runtime_process_id.clone())
910                        .parent_invocation(parent_invocation.clone())
911                        .child_execution_trace_hook(child_execution_trace_hook.clone());
912                if let Some(process_events) = self.process_event_context.as_ref() {
913                    tool_context = tool_context.process_events(
914                        process_events.process_id.clone(),
915                        std::sync::Arc::clone(&process_events.registry),
916                        process_events.awaiter.clone(),
917                        process_events.store.clone(),
918                        process_events.session_store_factory.clone(),
919                        process_events.queued_work_driver.clone(),
920                    );
921                }
922                let tool_context = tool_context.build();
923                dispatch_granted_prepared_tool_call_launch_with_execution_context(
924                    dispatch_context.as_ref(),
925                    &grant,
926                    prepared,
927                    None,
928                    tool_context,
929                )
930                .await
931            }
932            ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
933        };
934        let mut outcome = match launch {
935            ToolCallLaunch::Done(outcome) => outcome,
936            ToolCallLaunch::Pending(pending) => {
937                self.await_pending_tool_dispatch_outcome(
938                    &call_id,
939                    parent_invocation.clone(),
940                    pending,
941                    self.cancellation_token.clone(),
942                )
943                .await
944            }
945        };
946        outcome.record.call_id = Some(call_id.clone());
947
948        self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
949            .await
950    }
951
952    pub(crate) async fn prepare_tool_call(
953        &self,
954        pending: crate::sansio::PendingToolCall,
955    ) -> ToolPreparationOutcome {
956        let call_id = Some(pending.call_id.clone());
957        prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
958    }
959
960    pub(crate) async fn execute_prepared_tool_attempt_effect(
961        &self,
962        prepared: crate::PreparedToolCall,
963        execution_grant: Option<Box<crate::ToolExecutionGrant>>,
964        attempt: u32,
965        max_attempts: u32,
966        attempt_invocation: crate::RuntimeInvocation,
967        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
968    ) -> Result<crate::ToolAttemptEffectOutcome, crate::RuntimeEffectControllerError> {
969        let call_id = prepared.call_id.clone();
970        let mut attempt_dispatch = (*self.dispatch).clone();
971        attempt_dispatch.parent_invocation = Some(attempt_invocation.clone());
972        attempt_dispatch.trigger_outcomes =
973            crate::tool_dispatch::ToolTriggerOutcomeBuffer::default();
974        let attempt_dispatch = std::sync::Arc::new(attempt_dispatch);
975        let mut attempt_context = self.clone();
976        attempt_context.dispatch = std::sync::Arc::clone(&attempt_dispatch);
977        attempt_context.parent_invocation = Some(attempt_invocation.clone());
978
979        let mut tool_context =
980            crate::ToolContext::from_dispatch(std::sync::Arc::clone(&attempt_dispatch))
981                .runtime_execution_context(attempt_context.clone())
982                .prepared_call(&prepared)
983                .cancellation_token(self.cancellation_token.clone())
984                .runtime_process_id(self.runtime_process_id.clone())
985                .parent_invocation(Some(attempt_invocation))
986                .child_execution_trace_hook(child_execution_trace_hook);
987        if let Some(process_events) = self.process_event_context.as_ref() {
988            tool_context = tool_context.process_events(
989                process_events.process_id.clone(),
990                std::sync::Arc::clone(&process_events.registry),
991                process_events.awaiter.clone(),
992                process_events.store.clone(),
993                process_events.session_store_factory.clone(),
994                process_events.queued_work_driver.clone(),
995            );
996        }
997        let tool_context = tool_context.build();
998        let launch = match Box::pin(async {
999            if let Some(grant) = execution_grant.as_ref() {
1000                dispatch_granted_prepared_tool_attempt_launch_with_execution_context(
1001                    attempt_dispatch.as_ref(),
1002                    grant,
1003                    prepared,
1004                    attempt,
1005                    max_attempts,
1006                    None,
1007                    tool_context,
1008                )
1009                .await
1010            } else {
1011                dispatch_prepared_tool_attempt_launch_with_execution_context(
1012                    attempt_dispatch.as_ref(),
1013                    prepared,
1014                    attempt,
1015                    max_attempts,
1016                    None,
1017                    tool_context,
1018                )
1019                .await
1020            }
1021        })
1022        .await
1023        {
1024            ToolCallLaunch::Done(outcome) => {
1025                let mut record = outcome.record;
1026                record.call_id = Some(call_id);
1027                crate::ToolAttemptLaunch::Done { record }
1028            }
1029            ToolCallLaunch::Pending(pending) => crate::ToolAttemptLaunch::Pending {
1030                key: pending.key,
1031                pending: pending.pending,
1032                duration_ms: pending.duration_ms,
1033            },
1034        };
1035        let triggers = attempt_context
1036            .drain_tool_trigger_outcomes()
1037            .map_err(|err| {
1038                crate::RuntimeEffectControllerError::new(
1039                    "tool_trigger_outcome_drain",
1040                    err.to_string(),
1041                )
1042            })?;
1043        Ok(crate::ToolAttemptEffectOutcome { launch, triggers })
1044    }
1045
1046    pub(super) async fn await_process_with_cancellation(
1047        &self,
1048        process_id: &str,
1049        parent_invocation: Option<crate::RuntimeInvocation>,
1050        cancellation: Option<tokio_util::sync::CancellationToken>,
1051    ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
1052        let _phase = self.named_phase("process.await_handle");
1053        if let Some(cancellation) = cancellation {
1054            tokio::select! {
1055                result = self.dispatch.processes.await_process(
1056                    process_id,
1057                    self.process_scope(parent_invocation.clone()),
1058                ) => result,
1059                _ = cancellation.cancelled() => {
1060                    let _ = self.dispatch.processes.cancel(
1061                        &self.dispatch.session_id,
1062                        process_id,
1063                        self.process_scope(parent_invocation.clone()),
1064                    ).await;
1065                    self.dispatch.processes.await_process(
1066                        process_id,
1067                        self.process_scope(parent_invocation),
1068                    ).await
1069                }
1070            }
1071        } else {
1072            self.dispatch
1073                .processes
1074                .await_process(process_id, self.process_scope(parent_invocation))
1075                .await
1076        }
1077    }
1078
1079    pub(crate) async fn complete_tool_call(
1080        &self,
1081        _index: usize,
1082        call_id: String,
1083        replay: Option<crate::llm::types::ProviderReplayMeta>,
1084        outcome: ToolDispatchOutcome,
1085        tool_correlation_id: TurnActivityId,
1086    ) -> CompletedProtocolToolCall {
1087        let output = outcome.record.output.clone();
1088        let projection_output = output.clone();
1089        let projection_tool_name = outcome.record.tool.clone();
1090        let projection_args = outcome.record.args.clone();
1091        let projection_duration_ms = outcome.record.duration_ms;
1092        let projection_call_id = call_id.clone();
1093        tokio::task::yield_now().await;
1094        let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
1095        let projection_context = crate::plugin::ToolResultProjectionContext {
1096            session_id: self.dispatch.session_id.clone(),
1097            tool_name: projection_tool_name,
1098            args: projection_args,
1099            output: projection_output,
1100            duration_ms: projection_duration_ms,
1101            call_id: projection_call_id,
1102        };
1103        let model_return = match plugins.project_tool_result(projection_context).await {
1104            Ok(projected) => projected,
1105            Err(err) => ModelToolReturn::text(
1106                call_id.clone(),
1107                outcome.record.tool.clone(),
1108                err.to_string(),
1109            ),
1110        };
1111
1112        let record = ToolCallRecord {
1113            call_id: Some(call_id.clone()),
1114            tool: outcome.record.tool.clone(),
1115            args: outcome.record.args.clone(),
1116            output: output.clone(),
1117            duration_ms: outcome.record.duration_ms,
1118        };
1119        self.emit_tool_call_completed_trace(&record);
1120        self.emit_turn_activity(
1121            tool_correlation_id,
1122            TurnEvent::ToolCallCompleted {
1123                call_id: Some(call_id.clone()),
1124                name: outcome.record.tool.clone(),
1125                args: outcome.record.args.clone(),
1126                output: output.clone(),
1127                duration_ms: outcome.record.duration_ms,
1128                graph_key: self.code_block_graph_key(),
1129                parent_call_id: self.batch_parent_call_id(),
1130            },
1131        )
1132        .await;
1133        CompletedProtocolToolCall {
1134            completed: crate::sansio::CompletedToolCall {
1135                call_id,
1136                tool_name: outcome.record.tool,
1137                args: outcome.record.args,
1138                output,
1139                model_return,
1140                duration_ms: outcome.record.duration_ms,
1141                replay,
1142            },
1143            record,
1144        }
1145    }
1146
1147    pub(crate) async fn pending_completion_dispatch_outcome(
1148        &self,
1149        tool_name: String,
1150        args: serde_json::Value,
1151        resolution: crate::Resolution,
1152        duration_ms: u64,
1153    ) -> ToolDispatchOutcome {
1154        let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
1155        let result = finalize_tool_result_with_execution_context(
1156            self.dispatch.as_ref(),
1157            &tool_name,
1158            &args,
1159            ToolResult::from_output(output),
1160            duration_ms,
1161        )
1162        .await;
1163        let output = result.into_done_output().unwrap_or_else(|_| {
1164            ToolCallOutput::failure(ToolFailure::runtime(
1165                ToolFailureClass::Internal,
1166                "pending_tool_not_finalized",
1167                "pending tool result reached a completed-output projection path",
1168            ))
1169        });
1170        ToolDispatchOutcome {
1171            record: ToolCallRecord {
1172                call_id: None,
1173                tool: tool_name,
1174                args,
1175                output,
1176                duration_ms,
1177            },
1178        }
1179    }
1180
1181    async fn await_pending_tool_dispatch_outcome(
1182        &self,
1183        call_id: &str,
1184        parent_invocation: Option<crate::RuntimeInvocation>,
1185        pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1186        cancellation: Option<tokio_util::sync::CancellationToken>,
1187    ) -> ToolDispatchOutcome {
1188        self.await_pending_tool_dispatch_outcome_with_suffix(
1189            call_id,
1190            parent_invocation,
1191            format!("{call_id}:await"),
1192            pending,
1193            cancellation,
1194        )
1195        .await
1196    }
1197
1198    async fn await_pending_tool_dispatch_outcome_with_suffix(
1199        &self,
1200        call_id: &str,
1201        parent_invocation: Option<crate::RuntimeInvocation>,
1202        replay_suffix: String,
1203        pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1204        cancellation: Option<tokio_util::sync::CancellationToken>,
1205    ) -> ToolDispatchOutcome {
1206        let fallback;
1207        let parent = if let Some(parent) = parent_invocation.as_ref() {
1208            parent
1209        } else {
1210            fallback = crate::RuntimeInvocation::effect(
1211                crate::RuntimeScope::new(&self.dispatch.session_id),
1212                format!("tool:{call_id}:await"),
1213                crate::RuntimeEffectKind::AwaitEvent,
1214                format!("tool:{call_id}:await"),
1215            );
1216            &fallback
1217        };
1218        let parent_effect_id = parent.effect_id().unwrap_or("tool");
1219        let invocation = crate::runtime::causal::child_effect_invocation(
1220            parent,
1221            format!("{parent_effect_id}:{replay_suffix}"),
1222            crate::RuntimeEffectKind::AwaitEvent,
1223            replay_suffix,
1224        );
1225        let cancellation = cancellation.unwrap_or_default();
1226        let deadline = pending
1227            .pending
1228            .deadline
1229            .map(|duration| self.dispatch.clock.now() + duration);
1230        let outcome = self
1231            .dispatch
1232            .effect_controller
1233            .controller()
1234            .execute_effect(
1235                crate::RuntimeEffectEnvelope::new(
1236                    invocation,
1237                    crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
1238                ),
1239                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
1240                    cancellation,
1241                    deadline,
1242                    std::sync::Arc::clone(&self.dispatch.clock),
1243                ),
1244            )
1245            .await;
1246        let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
1247            Ok(resolution) => resolution,
1248            Err(err) => {
1249                return ToolDispatchOutcome {
1250                    record: ToolCallRecord {
1251                        call_id: None,
1252                        tool: pending.tool_name,
1253                        args: pending.args,
1254                        output: ToolCallOutput::failure(ToolFailure::runtime(
1255                            ToolFailureClass::Internal,
1256                            "pending_tool_completion_failed",
1257                            err.to_string(),
1258                        )),
1259                        duration_ms: pending.duration_ms,
1260                    },
1261                };
1262            }
1263        };
1264        self.pending_completion_dispatch_outcome(
1265            pending.tool_name,
1266            pending.args,
1267            resolution,
1268            pending.duration_ms,
1269        )
1270        .await
1271    }
1272
1273    pub async fn call_tool_by_id(
1274        &self,
1275        call_id: String,
1276        tool_id: crate::ToolId,
1277        args: serde_json::Value,
1278        index: usize,
1279    ) -> ToolInvocationReply {
1280        let executed = self
1281            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
1282            .await;
1283        let reply = ToolInvocationReply::from_output(executed.completed.output);
1284        reply.with_record(executed.record)
1285    }
1286
1287    pub async fn call_tool_by_id_with_child_execution_trace_hook(
1288        &self,
1289        call_id: String,
1290        tool_id: crate::ToolId,
1291        args: serde_json::Value,
1292        index: usize,
1293        trace_hook: crate::ToolChildExecutionTraceHook,
1294    ) -> ToolInvocationReply {
1295        let executed = self
1296            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
1297            .await;
1298        let reply = ToolInvocationReply::from_output(executed.completed.output);
1299        reply.with_record(executed.record)
1300    }
1301
1302    pub async fn call_tool_with_execution_grant(
1303        &self,
1304        call_id: String,
1305        grant: crate::ToolExecutionGrant,
1306        args: serde_json::Value,
1307        index: usize,
1308    ) -> ToolInvocationReply {
1309        let executed = self
1310            .execute_tool_call_by_grant(call_id, grant, args, index, None, None, None)
1311            .await;
1312        let reply = ToolInvocationReply::from_output(executed.completed.output);
1313        reply.with_record(executed.record)
1314    }
1315
1316    pub async fn call_tool_with_execution_grant_and_child_execution_trace_hook(
1317        &self,
1318        call_id: String,
1319        grant: crate::ToolExecutionGrant,
1320        args: serde_json::Value,
1321        index: usize,
1322        trace_hook: crate::ToolChildExecutionTraceHook,
1323    ) -> ToolInvocationReply {
1324        let executed = self
1325            .execute_tool_call_by_grant(call_id, grant, args, index, None, None, Some(trace_hook))
1326            .await;
1327        let reply = ToolInvocationReply::from_output(executed.completed.output);
1328        reply.with_record(executed.record)
1329    }
1330
1331    pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
1332        if calls.is_empty() {
1333            return Vec::new();
1334        }
1335
1336        let batch_id = deterministic_tool_invocation_batch_id(&calls);
1337        let mut replies = vec![None; calls.len()];
1338        let mut prepared_entries = Vec::new();
1339
1340        for (index, call) in calls.into_iter().enumerate() {
1341            let preparation = if let Some(grant) = call.execution_grant.as_deref().cloned() {
1342                let pending = crate::sansio::PendingToolCall {
1343                    call_id: call.id.clone(),
1344                    tool_name: grant.manifest.name.clone(),
1345                    args: call.args,
1346                    replay: None,
1347                };
1348                (
1349                    Some(grant.clone()),
1350                    prepare_granted_tool_call_with_context(
1351                        self.dispatch.as_ref(),
1352                        &grant,
1353                        pending,
1354                        Some(call.id.clone()),
1355                    )
1356                    .await,
1357                )
1358            } else {
1359                let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
1360                    self.dispatch.as_ref(),
1361                    &call.tool_id,
1362                ) else {
1363                    let outcome = ToolDispatchOutcome {
1364                        record: ToolCallRecord {
1365                            call_id: Some(call.id.clone()),
1366                            tool: call.tool_id.to_string(),
1367                            args: call.args,
1368                            output: ToolCallOutput::failure(ToolFailure::runtime(
1369                                ToolFailureClass::Unavailable,
1370                                "tool_unavailable",
1371                                format!(
1372                                    "Tool id `{}` is unavailable in this session",
1373                                    call.tool_id
1374                                ),
1375                            )),
1376                            duration_ms: 0,
1377                        },
1378                    };
1379                    let completed = self
1380                        .complete_tool_call(
1381                            index,
1382                            call.id,
1383                            None,
1384                            outcome,
1385                            TurnActivityId::new(format!("tool:{}", batch_id)),
1386                        )
1387                        .await;
1388                    replies[index] = Some(
1389                        ToolInvocationReply::from_output(completed.completed.output)
1390                            .with_record(completed.record),
1391                    );
1392                    continue;
1393                };
1394
1395                let pending = crate::sansio::PendingToolCall {
1396                    call_id: call.id.clone(),
1397                    tool_name: manifest.name,
1398                    args: call.args,
1399                    replay: None,
1400                };
1401                (None, self.prepare_tool_call(pending).await)
1402            };
1403            let (execution_grant, preparation) = preparation;
1404            match preparation {
1405                ToolPreparationOutcome::Prepared(prepared) => {
1406                    prepared_entries.push((
1407                        index,
1408                        prepared,
1409                        execution_grant,
1410                        call.child_execution_trace_hook,
1411                    ));
1412                }
1413                ToolPreparationOutcome::Completed(outcome) => {
1414                    let completed = self
1415                        .complete_tool_call(
1416                            index,
1417                            call.id,
1418                            None,
1419                            *outcome,
1420                            TurnActivityId::new(format!("tool:{}", batch_id)),
1421                        )
1422                        .await;
1423                    replies[index] = Some(
1424                        ToolInvocationReply::from_output(completed.completed.output)
1425                            .with_record(completed.record),
1426                    );
1427                }
1428            }
1429        }
1430
1431        if !prepared_entries.is_empty() {
1432            let invocation = self.tool_batch_invocation(&batch_id);
1433            let batch = crate::PreparedToolBatch::new_with_grants(
1434                batch_id.clone(),
1435                prepared_entries
1436                    .iter()
1437                    .map(|(_, prepared, grant, _)| (prepared.clone(), grant.clone()))
1438                    .collect(),
1439            );
1440            let child_trace_hooks = prepared_entries
1441                .iter()
1442                .filter_map(|(_, prepared, _, hook)| {
1443                    hook.clone().map(|hook| (prepared.call_id.clone(), hook))
1444                })
1445                .collect();
1446            let envelope = crate::RuntimeEffectEnvelope::new(
1447                invocation.clone(),
1448                crate::RuntimeEffectCommand::ToolBatch { batch },
1449            );
1450            let local_executor =
1451                crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
1452            let raw_outcome = self
1453                .dispatch
1454                .effect_controller
1455                .controller()
1456                .execute_effect(envelope, local_executor)
1457                .await;
1458            let outcome =
1459                match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
1460                    Ok(outcome) => outcome,
1461                    Err(err) => {
1462                        for (index, prepared, _, _) in prepared_entries {
1463                            replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
1464                                format!("tool batch failed: {err}")
1465                            )));
1466                            let _ = prepared;
1467                        }
1468                        return replies
1469                            .into_iter()
1470                            .map(|reply| reply.expect("every batch reply slot should be filled"))
1471                            .collect();
1472                    }
1473                };
1474            if outcome.launches.len() != prepared_entries.len() {
1475                let message = format!(
1476                    "tool batch returned {} launches for {} prepared calls",
1477                    outcome.launches.len(),
1478                    prepared_entries.len()
1479                );
1480                for (index, _, _, _) in prepared_entries {
1481                    replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
1482                }
1483            } else {
1484                for ((index, prepared, _, _), launch) in
1485                    prepared_entries.into_iter().zip(outcome.launches)
1486                {
1487                    let call_id = prepared.call_id.clone();
1488                    let reply = match launch {
1489                        crate::runtime::ToolCallLaunch::Done { result } => {
1490                            let record = ToolCallRecord {
1491                                call_id: Some(result.call_id.clone()),
1492                                tool: result.tool_name.clone(),
1493                                args: result.args.clone(),
1494                                output: result.output.clone(),
1495                                duration_ms: result.duration_ms,
1496                            };
1497                            ToolInvocationReply::from_output(result.output).with_record(record)
1498                        }
1499                        crate::runtime::ToolCallLaunch::Pending {
1500                            key,
1501                            pending,
1502                            duration_ms,
1503                        } => {
1504                            let dispatch_outcome = self
1505                                .await_pending_tool_dispatch_outcome(
1506                                    &call_id,
1507                                    Some(invocation.clone()),
1508                                    crate::tool_dispatch::PendingToolDispatchOutcome {
1509                                        tool_name: prepared.tool_name.clone(),
1510                                        args: prepared.args.clone(),
1511                                        key,
1512                                        pending,
1513                                        duration_ms,
1514                                    },
1515                                    self.cancellation_token.clone(),
1516                                )
1517                                .await;
1518                            let completed = self
1519                                .complete_tool_call(
1520                                    index,
1521                                    call_id.clone(),
1522                                    prepared.replay.clone(),
1523                                    dispatch_outcome,
1524                                    TurnActivityId::new(format!("tool:{call_id}")),
1525                                )
1526                                .await;
1527                            ToolInvocationReply::from_output(completed.completed.output)
1528                                .with_record(completed.record)
1529                        }
1530                    };
1531                    replies[index] = Some(reply);
1532                }
1533            }
1534        }
1535
1536        replies
1537            .into_iter()
1538            .map(|reply| reply.expect("every batch reply slot should be filled"))
1539            .collect()
1540    }
1541
/// Tool-named entry point that delegates to `start_tool_process`.
///
/// `call_id`, `name` and `args` are forwarded unchanged, in that order, and
/// the `ToolInvocationReply` produced by `start_tool_process` is returned
/// as-is; this method adds no logic of its own.
///
/// NOTE(review): `start_tool_process` is defined outside this excerpt.
/// Presumably the reply carries a handle later consumed by the
/// `*_tool_handle` methods; confirm against that implementation.
1542    pub async fn start_tool_call(
1543        &self,
1544        call_id: String,
1545        name: String,
1546        args: serde_json::Value,
1547    ) -> ToolInvocationReply {
1548        self.start_tool_process(call_id, name, args).await
1549    }
1550
/// Tool-named entry point that delegates to `await_process_handle`.
///
/// `call_id` and the JSON `handle` value are forwarded unchanged, and the
/// `ToolInvocationReply` produced by `await_process_handle` is returned
/// as-is.
///
/// NOTE(review): the expected shape of `handle` and the waiting semantics
/// are defined by `await_process_handle`, which is outside this excerpt;
/// verify there.
1551    pub async fn await_tool_handle(
1552        &self,
1553        call_id: String,
1554        handle: serde_json::Value,
1555    ) -> ToolInvocationReply {
1556        self.await_process_handle(call_id, handle).await
1557    }
1558
/// Tool-named entry point that delegates to `cancel_process_handle`.
///
/// `call_id` and the JSON `handle` value are forwarded unchanged, and the
/// `ToolInvocationReply` produced by `cancel_process_handle` is returned
/// as-is.
///
/// NOTE(review): what cancellation does to the underlying work, and what
/// the reply contains, is decided by `cancel_process_handle`, which is
/// outside this excerpt; verify there.
1559    pub async fn cancel_tool_handle(
1560        &self,
1561        call_id: String,
1562        handle: serde_json::Value,
1563    ) -> ToolInvocationReply {
1564        self.cancel_process_handle(call_id, handle).await
1565    }
1566
/// Tool-named entry point that delegates to `signal_process_handle`.
///
/// `call_id`, the JSON `handle` value, `signal_name` and the JSON `payload`
/// are forwarded unchanged, in that order, and the `ToolInvocationReply`
/// produced by `signal_process_handle` is returned as-is.
///
/// NOTE(review): the set of accepted `signal_name` values and the expected
/// `payload` schema are defined by `signal_process_handle`, which is
/// outside this excerpt; verify there.
1567    pub async fn signal_tool_handle(
1568        &self,
1569        call_id: String,
1570        handle: serde_json::Value,
1571        signal_name: String,
1572        payload: serde_json::Value,
1573    ) -> ToolInvocationReply {
1574        self.signal_process_handle(call_id, handle, signal_name, payload)
1575            .await
1576    }
1577}