Skip to main content

lash_core/session/
tool_execution.rs

1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3    ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4    dispatch_granted_prepared_tool_attempt_launch_with_execution_context,
5    dispatch_granted_prepared_tool_call_launch_with_execution_context,
6    dispatch_prepared_tool_attempt_launch_with_execution_context,
7    dispatch_prepared_tool_call_launch_with_execution_context,
8    finalize_tool_result_with_execution_context, mark_retry_exhausted,
9    prepare_granted_tool_call_with_context, prepare_tool_call_with_context, retry_after_ms,
10    schedule_tool_batch,
11};
12use crate::{
13    ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
14    ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
15};
16use std::collections::HashMap;
17
18#[derive(Clone)]
19pub struct ToolInvocation {
20    pub id: String,
21    pub tool_id: crate::ToolId,
22    pub args: serde_json::Value,
23    pub execution_grant: Option<Box<crate::ToolExecutionGrant>>,
24    pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
25}
26
27impl ToolInvocation {
28    pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
29        Self {
30            id: id.into(),
31            tool_id,
32            args,
33            execution_grant: None,
34            child_execution_trace_hook: None,
35        }
36    }
37
38    pub fn with_execution_grant(mut self, grant: crate::ToolExecutionGrant) -> Self {
39        self.execution_grant = Some(Box::new(grant));
40        self
41    }
42
43    pub fn label(&self) -> String {
44        self.tool_id.to_string()
45    }
46
47    pub fn with_child_execution_trace_hook(
48        mut self,
49        hook: crate::ToolChildExecutionTraceHook,
50    ) -> Self {
51        self.child_execution_trace_hook = Some(hook);
52        self
53    }
54}
55
56impl std::fmt::Debug for ToolInvocation {
57    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58        f.debug_struct("ToolInvocation")
59            .field("id", &self.id)
60            .field("tool_id", &self.tool_id)
61            .field("args", &self.args)
62            .field(
63                "execution_grant",
64                &self.execution_grant.as_ref().map(|_| "<grant>"),
65            )
66            .field(
67                "child_execution_trace_hook",
68                &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
69            )
70            .finish()
71    }
72}
73
74#[derive(Clone, Debug)]
75pub struct ToolInvocationReply {
76    pub output: ToolCallOutput,
77    pub record: Option<ToolCallRecord>,
78}
79
80impl ToolInvocationReply {
81    pub fn success(value: serde_json::Value) -> Self {
82        Self {
83            output: ToolCallOutput::success(value),
84            record: None,
85        }
86    }
87
88    pub fn error(value: serde_json::Value) -> Self {
89        let message = value
90            .as_str()
91            .map(ToOwned::to_owned)
92            .unwrap_or_else(|| value.to_string());
93        let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
94        failure.raw =
95            Some(serde_json::from_value(value).unwrap_or_else(|_| {
96                crate::ToolValue::String("unserializable tool error".to_string())
97            }));
98        Self {
99            output: ToolCallOutput::failure(failure),
100            record: None,
101        }
102    }
103
104    pub fn from_output(output: ToolCallOutput) -> Self {
105        Self {
106            output,
107            record: None,
108        }
109    }
110
111    pub fn cancelled(message: impl Into<String>) -> Self {
112        Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
113            message,
114        )))
115    }
116
117    pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
118        self.record = Some(record);
119        self
120    }
121}
122
123#[derive(Clone, Debug)]
124pub(crate) struct CompletedProtocolToolCall {
125    pub completed: crate::sansio::CompletedToolCall,
126    pub record: ToolCallRecord,
127}
128
129fn cancelled_runtime_tool_call_launch(
130    call_id: String,
131    tool_name: String,
132    args: serde_json::Value,
133    replay: Option<crate::llm::types::ProviderReplayMeta>,
134) -> crate::runtime::ToolCallLaunch {
135    crate::runtime::ToolCallLaunch::Done {
136        result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
137    }
138}
139
140fn cancelled_completed_tool_call(
141    call_id: String,
142    tool_name: String,
143    args: serde_json::Value,
144    replay: Option<crate::llm::types::ProviderReplayMeta>,
145) -> crate::sansio::CompletedToolCall {
146    let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
147    crate::sansio::CompletedToolCall {
148        call_id: call_id.clone(),
149        tool_name: tool_name.clone(),
150        args,
151        model_return: ModelToolReturn {
152            call_id,
153            tool_name,
154            parts: vec![crate::ModelToolReturnPart::text(
155                "[Tool execution cancelled]\ntool call cancelled".to_string(),
156            )],
157        },
158        output,
159        duration_ms: 0,
160        replay,
161    }
162}
163
164fn runtime_failure_dispatch_outcome(
165    call_id: Option<String>,
166    tool_name: String,
167    args: serde_json::Value,
168    code: impl Into<String>,
169    message: impl Into<String>,
170) -> ToolDispatchOutcome {
171    ToolDispatchOutcome {
172        record: ToolCallRecord {
173            call_id,
174            tool: tool_name,
175            args,
176            output: ToolCallOutput::failure(ToolFailure::runtime(
177                ToolFailureClass::Internal,
178                code,
179                message,
180            )),
181            duration_ms: 0,
182        },
183    }
184}
185
186fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
187    let identity = calls
188        .iter()
189        .map(|call| {
190            serde_json::json!({
191                "id": call.id.clone(),
192                "tool_id": call.tool_id.to_string(),
193                "args": call.args.clone(),
194                "execution_grant": call.execution_grant.as_ref().map(|grant| serde_json::json!({
195                    "tool_id": grant.manifest.id.to_string(),
196                    "source_id": grant.source_id.clone(),
197                    "execution_binding": grant.execution_binding.clone(),
198                })),
199            })
200        })
201        .collect::<Vec<_>>();
202    let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
203        .unwrap_or_else(|_| format!("len-{}", calls.len()));
204    format!("tool-batch:{digest}")
205}
206
207struct CoordinatedToolLaunch {
208    launch: crate::runtime::ToolCallLaunch,
209    triggers: Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>,
210}
211
212impl RuntimeExecutionContext<'_> {
213    fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
214        let suffix = format!("tool-batch:{batch_id}");
215        if let Some(parent) = self.parent_invocation.as_ref() {
216            let parent_effect_id = parent.effect_id().unwrap_or("effect");
217            return crate::runtime::causal::child_effect_invocation(
218                parent,
219                format!("{parent_effect_id}:{suffix}"),
220                crate::RuntimeEffectKind::ToolBatch,
221                suffix,
222            );
223        }
224        let replay_key = format!("{}:{suffix}", self.execution_scope_id());
225        crate::RuntimeInvocation::effect(
226            crate::RuntimeScope::new(self.session_id.clone()),
227            suffix,
228            crate::RuntimeEffectKind::ToolBatch,
229            replay_key,
230        )
231    }
232
233    pub(crate) async fn execute_prepared_tool_batch_launches(
234        &self,
235        batch: crate::PreparedToolBatch,
236        parent_invocation: crate::RuntimeInvocation,
237        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
238    ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
239        let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
240        let cancellation = self.cancellation_token.clone().unwrap_or_default();
241        let tool_cancel = cancellation.child_token();
242        let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
243        if !self
244            .dispatch
245            .effect_controller
246            .controller()
247            .supports_concurrent_effects()
248        {
249            let mut launches = Vec::with_capacity(indexed_tools.len());
250            let mut triggers = Vec::new();
251            let mut context = self.clone().with_cancellation_token(tool_cancel.clone());
252            for (index, child) in indexed_tools {
253                if cancellation.is_cancelled() {
254                    tool_cancel.cancel();
255                    launches.push(cancelled_runtime_tool_call_launch(
256                        child.call.call_id,
257                        child.call.tool_name,
258                        child.call.args,
259                        child.call.replay,
260                    ));
261                    continue;
262                }
263                let child_execution_trace_hook =
264                    child_trace_hooks.get(&child.call.call_id).cloned();
265                let outcome = context
266                    .execute_prepared_tool_batch_child(
267                        child,
268                        index,
269                        parent_invocation.clone(),
270                        child_execution_trace_hook,
271                    )
272                    .await;
273                launches.push(outcome.launch);
274                triggers.extend(outcome.triggers);
275                context = context.with_cancellation_token(tool_cancel.clone());
276            }
277            return Ok(crate::ToolBatchEffectOutcome { launches, triggers });
278        }
279        let child_outcomes = schedule_tool_batch(
280            indexed_tools,
281            |(index, _)| *index,
282            |(_, child)| self.tool_scheduling(&child.call.tool_name),
283            {
284                let context = self.clone();
285                let cancellation = cancellation.clone();
286                let tool_cancel = tool_cancel.clone();
287                let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
288                move |(index, child)| {
289                    let context = context.clone().with_cancellation_token(tool_cancel.clone());
290                    let cancellation = cancellation.clone();
291                    let tool_cancel = tool_cancel.clone();
292                    let parent_invocation = parent_invocation.clone();
293                    let cancelled_tool = child.call.clone();
294                    let child_execution_trace_hook =
295                        child_trace_hooks.get(&child.call.call_id).cloned();
296                    async move {
297                        let tool_call = context.execute_prepared_tool_batch_child(
298                            child,
299                            index,
300                            parent_invocation,
301                            child_execution_trace_hook,
302                        );
303                        tokio::pin!(tool_call);
304                        tokio::select! {
305                            biased;
306                            _ = cancellation.cancelled() => {
307                                tool_cancel.cancel();
308                                let grace = context
309                                    .dispatch
310                                    .clock
311                                    .sleep(std::time::Duration::from_millis(50));
312                                tokio::pin!(grace);
313                                tokio::select! {
314                                    biased;
315                                    outcome = &mut tool_call => outcome,
316                                    _ = &mut grace => CoordinatedToolLaunch {
317                                        launch: cancelled_runtime_tool_call_launch(
318                                            cancelled_tool.call_id,
319                                            cancelled_tool.tool_name,
320                                            cancelled_tool.args,
321                                            cancelled_tool.replay,
322                                        ),
323                                        triggers: Vec::new(),
324                                    },
325                                }
326                            }
327                            outcome = &mut tool_call => outcome,
328                        }
329                    }
330                }
331            },
332        )
333        .await;
334
335        let mut launches = Vec::with_capacity(child_outcomes.len());
336        let mut triggers = Vec::new();
337        for outcome in child_outcomes {
338            launches.push(outcome.launch);
339            triggers.extend(outcome.triggers);
340        }
341        Ok(crate::ToolBatchEffectOutcome { launches, triggers })
342    }
343
344    async fn execute_prepared_tool_batch_child(
345        &self,
346        child: crate::PreparedToolBatchCall,
347        index: usize,
348        parent_invocation: crate::RuntimeInvocation,
349        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
350    ) -> CoordinatedToolLaunch {
351        let call_id = child.call.call_id.clone();
352        let tool_name = child.call.tool_name.clone();
353        let args = child.call.args.clone();
354        let replay = child.call.replay.clone();
355        let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
356        self.emit_tool_call_started(&call_id, &tool_name, args.clone(), activity_id.clone())
357            .await;
358
359        let retry_policy = crate::tool_dispatch::resolve_callable_manifest_by_id(
360            self.dispatch.as_ref(),
361            &child.call.tool_id,
362        )
363        .map(|manifest| manifest.retry_policy)
364        .or_else(|| {
365            child
366                .execution_grant
367                .as_ref()
368                .map(|grant| grant.manifest.retry_policy)
369        })
370        .unwrap_or(crate::ToolRetryPolicy::Never);
371        let max_attempts = retry_policy.max_attempts().max(1);
372        let mut triggers = Vec::new();
373
374        for attempt in 1..=max_attempts {
375            let attempt_invocation =
376                self.tool_attempt_invocation(&parent_invocation, &child.replay_suffix, attempt);
377            let attempt_outcome = self
378                .dispatch
379                .effect_controller
380                .controller()
381                .execute_effect(
382                    crate::RuntimeEffectEnvelope::new(
383                        attempt_invocation,
384                        crate::RuntimeEffectCommand::ToolAttempt {
385                            call: child.call.clone(),
386                            execution_grant: child.execution_grant.clone(),
387                            attempt,
388                            max_attempts,
389                        },
390                    ),
391                    crate::RuntimeEffectLocalExecutor::tool_batch(
392                        self.clone(),
393                        child_execution_trace_hook
394                            .clone()
395                            .map(|hook| {
396                                std::iter::once((child.call.call_id.clone(), hook)).collect()
397                            })
398                            .unwrap_or_default(),
399                    ),
400                )
401                .await;
402            let attempt_outcome = match attempt_outcome {
403                Ok(outcome) => match outcome.into_tool_attempt_effect() {
404                    Ok(outcome) => outcome,
405                    Err(err) => {
406                        let completed = self
407                            .complete_tool_call(
408                                index,
409                                call_id.clone(),
410                                replay,
411                                runtime_failure_dispatch_outcome(
412                                    Some(call_id.clone()),
413                                    tool_name,
414                                    args,
415                                    "tool_attempt_failed",
416                                    err.to_string(),
417                                ),
418                                activity_id,
419                            )
420                            .await;
421                        return CoordinatedToolLaunch {
422                            launch: crate::runtime::ToolCallLaunch::Done {
423                                result: completed.completed,
424                            },
425                            triggers,
426                        };
427                    }
428                },
429                Err(err) => {
430                    let completed = self
431                        .complete_tool_call(
432                            index,
433                            call_id.clone(),
434                            replay,
435                            runtime_failure_dispatch_outcome(
436                                Some(call_id.clone()),
437                                tool_name,
438                                args,
439                                "tool_attempt_failed",
440                                err.to_string(),
441                            ),
442                            activity_id,
443                        )
444                        .await;
445                    return CoordinatedToolLaunch {
446                        launch: crate::runtime::ToolCallLaunch::Done {
447                            result: completed.completed,
448                        },
449                        triggers,
450                    };
451                }
452            };
453            triggers.extend(attempt_outcome.triggers);
454            match attempt_outcome.launch {
455                crate::ToolAttemptLaunch::Pending {
456                    key,
457                    pending,
458                    duration_ms,
459                } => {
460                    let dispatch_outcome = self
461                        .await_pending_tool_dispatch_outcome_with_suffix(
462                            &call_id,
463                            Some(parent_invocation.clone()),
464                            format!("{}:await", child.replay_suffix),
465                            crate::tool_dispatch::PendingToolDispatchOutcome {
466                                tool_name: child.call.tool_name.clone(),
467                                args: child.call.args.clone(),
468                                key,
469                                pending,
470                                duration_ms,
471                            },
472                            self.cancellation_token.clone(),
473                        )
474                        .await;
475                    let completed = self
476                        .complete_tool_call(
477                            index,
478                            call_id.clone(),
479                            child.call.replay.clone(),
480                            dispatch_outcome,
481                            activity_id,
482                        )
483                        .await;
484                    return CoordinatedToolLaunch {
485                        launch: crate::runtime::ToolCallLaunch::Done {
486                            result: completed.completed,
487                        },
488                        triggers,
489                    };
490                }
491                crate::ToolAttemptLaunch::Done { mut record } => {
492                    record.call_id = Some(call_id.clone());
493                    let retry_after = retry_after_ms(
494                        &ToolResult::from_output(record.output.clone()),
495                        retry_policy,
496                        attempt - 1,
497                    );
498                    let Some(retry_after) = retry_after else {
499                        let completed = self
500                            .complete_tool_call(
501                                index,
502                                call_id.clone(),
503                                child.call.replay.clone(),
504                                ToolDispatchOutcome { record },
505                                activity_id,
506                            )
507                            .await;
508                        return CoordinatedToolLaunch {
509                            launch: crate::runtime::ToolCallLaunch::Done {
510                                result: completed.completed,
511                            },
512                            triggers,
513                        };
514                    };
515                    if attempt >= max_attempts {
516                        let exhausted =
517                            mark_retry_exhausted(ToolResult::from_output(record.output), attempt);
518                        record.output = exhausted.into_done_output().unwrap_or_else(|_| {
519                            ToolCallOutput::failure(ToolFailure::runtime(
520                                ToolFailureClass::Internal,
521                                "tool_retry_exhaustion_failed",
522                                "retry exhaustion produced a pending output",
523                            ))
524                        });
525                        let completed = self
526                            .complete_tool_call(
527                                index,
528                                call_id.clone(),
529                                child.call.replay.clone(),
530                                ToolDispatchOutcome { record },
531                                activity_id,
532                            )
533                            .await;
534                        return CoordinatedToolLaunch {
535                            launch: crate::runtime::ToolCallLaunch::Done {
536                                result: completed.completed,
537                            },
538                            triggers,
539                        };
540                    }
541                    if retry_after > 0
542                        && let Err(err) = self
543                            .sleep_before_tool_retry(
544                                &parent_invocation,
545                                &child.replay_suffix,
546                                attempt,
547                                retry_after,
548                            )
549                            .await
550                    {
551                        let completed = self
552                            .complete_tool_call(
553                                index,
554                                call_id.clone(),
555                                child.call.replay.clone(),
556                                runtime_failure_dispatch_outcome(
557                                    Some(call_id.clone()),
558                                    child.call.tool_name.clone(),
559                                    child.call.args.clone(),
560                                    "tool_retry_sleep_failed",
561                                    format!(
562                                        "retry sleep for tool `{}` failed after attempt {attempt}: {err}",
563                                        child.call.tool_name
564                                    ),
565                                ),
566                                activity_id,
567                            )
568                            .await;
569                        return CoordinatedToolLaunch {
570                            launch: crate::runtime::ToolCallLaunch::Done {
571                                result: completed.completed,
572                            },
573                            triggers,
574                        };
575                    }
576                }
577            }
578        }
579
580        let completed = self
581            .complete_tool_call(
582                index,
583                call_id.clone(),
584                child.call.replay,
585                runtime_failure_dispatch_outcome(
586                    Some(call_id),
587                    child.call.tool_name,
588                    child.call.args,
589                    "tool_retry_loop_failed",
590                    "tool retry loop exited without a terminal result",
591                ),
592                activity_id,
593            )
594            .await;
595        CoordinatedToolLaunch {
596            launch: crate::runtime::ToolCallLaunch::Done {
597                result: completed.completed,
598            },
599            triggers,
600        }
601    }
602
603    async fn emit_tool_call_started(
604        &self,
605        call_id: &str,
606        name: &str,
607        args: serde_json::Value,
608        activity_id: TurnActivityId,
609    ) {
610        let _ = self
611            .dispatch
612            .event_tx
613            .send(SessionEvent::ToolCallStart {
614                call_id: Some(call_id.to_string()),
615                name: name.to_string(),
616                args: args.clone(),
617            })
618            .await;
619        self.emit_turn_activity(
620            activity_id,
621            TurnEvent::ToolCallStarted {
622                call_id: Some(call_id.to_string()),
623                name: name.to_string(),
624                args,
625            },
626        )
627        .await;
628    }
629
630    fn tool_attempt_invocation(
631        &self,
632        parent_invocation: &crate::RuntimeInvocation,
633        child_replay_suffix: &str,
634        attempt: u32,
635    ) -> crate::RuntimeInvocation {
636        let suffix = format!("{child_replay_suffix}:attempt:{attempt}");
637        let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
638        crate::runtime::causal::child_effect_invocation(
639            parent_invocation,
640            format!("{parent_effect_id}:{suffix}"),
641            crate::RuntimeEffectKind::ToolAttempt,
642            suffix,
643        )
644    }
645
646    async fn sleep_before_tool_retry(
647        &self,
648        parent_invocation: &crate::RuntimeInvocation,
649        child_replay_suffix: &str,
650        attempt: u32,
651        retry_after_ms: u64,
652    ) -> Result<(), crate::RuntimeEffectControllerError> {
653        let suffix = format!("{child_replay_suffix}:attempt:{attempt}:sleep");
654        let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
655        let invocation = crate::runtime::causal::child_effect_invocation(
656            parent_invocation,
657            format!("{parent_effect_id}:{suffix}"),
658            crate::RuntimeEffectKind::Sleep,
659            suffix,
660        );
661        let cancellation = self.cancellation_token.clone().unwrap_or_default();
662        let outcome = self
663            .dispatch
664            .effect_controller
665            .controller()
666            .execute_effect(
667                crate::RuntimeEffectEnvelope::new(
668                    invocation,
669                    crate::RuntimeEffectCommand::Sleep {
670                        duration_ms: retry_after_ms,
671                    },
672                ),
673                crate::RuntimeEffectLocalExecutor::sleep_with_clock(
674                    cancellation,
675                    std::sync::Arc::clone(&self.dispatch.clock),
676                ),
677            )
678            .await?;
679        match outcome {
680            crate::RuntimeEffectOutcome::Sleep => Ok(()),
681            other => Err(crate::RuntimeEffectControllerError::new(
682                "runtime_effect_wrong_outcome",
683                format!("expected sleep outcome, got {}", other.kind().as_str()),
684            )),
685        }
686    }
687
688    #[expect(
689        clippy::too_many_arguments,
690        reason = "tool execution carries explicit runtime call metadata"
691    )]
692    pub(crate) async fn execute_tool_call(
693        &self,
694        call_id: String,
695        name: String,
696        args: serde_json::Value,
697        index: usize,
698        replay: Option<crate::llm::types::ProviderReplayMeta>,
699        parent_invocation: Option<crate::RuntimeInvocation>,
700        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
701    ) -> CompletedProtocolToolCall {
702        let _ = self
703            .dispatch
704            .event_tx
705            .send(SessionEvent::ToolCallStart {
706                call_id: Some(call_id.clone()),
707                name: name.clone(),
708                args: args.clone(),
709            })
710            .await;
711        let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
712        self.emit_turn_activity(
713            tool_correlation_id.clone(),
714            TurnEvent::ToolCallStarted {
715                call_id: Some(call_id.clone()),
716                name: name.clone(),
717                args: args.clone(),
718            },
719        )
720        .await;
721
722        let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
723        let mut dispatch = (*self.dispatch).clone();
724        dispatch.parent_invocation = parent_invocation.clone();
725        let pending = crate::sansio::PendingToolCall {
726            call_id: call_id.clone(),
727            tool_name: name,
728            args,
729            replay: replay.clone(),
730        };
731        let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
732            .await
733        {
734            ToolPreparationOutcome::Prepared(prepared) => {
735                let dispatch_context = std::sync::Arc::new(dispatch.clone());
736                let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
737                    self.clone().with_parent_invocation(parent_invocation)
738                } else {
739                    self.clone()
740                };
741                let mut tool_context =
742                    crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
743                        .runtime_execution_context(runtime_context)
744                        .prepared_call(&prepared)
745                        .cancellation_token(self.cancellation_token.clone())
746                        .runtime_process_id(self.runtime_process_id.clone())
747                        .parent_invocation(parent_invocation.clone())
748                        .child_execution_trace_hook(child_execution_trace_hook.clone());
749                if let Some(process_events) = self.process_event_context.as_ref() {
750                    tool_context = tool_context.process_events(
751                        process_events.process_id.clone(),
752                        std::sync::Arc::clone(&process_events.registry),
753                        process_events.store.clone(),
754                        process_events.session_store_factory.clone(),
755                        process_events.queued_work_driver.clone(),
756                    );
757                }
758                let tool_context = tool_context.build();
759                dispatch_prepared_tool_call_launch_with_execution_context(
760                    dispatch_context.as_ref(),
761                    prepared,
762                    None,
763                    tool_context,
764                )
765                .await
766            }
767            ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
768        };
769        let mut outcome = match launch {
770            ToolCallLaunch::Done(outcome) => outcome,
771            ToolCallLaunch::Pending(pending) => {
772                self.await_pending_tool_dispatch_outcome(
773                    &call_id,
774                    parent_invocation.clone(),
775                    pending,
776                    self.cancellation_token.clone(),
777                )
778                .await
779            }
780        };
781        outcome.record.call_id = Some(call_id.clone());
782
783        self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
784            .await
785    }
786
787    #[expect(
788        clippy::too_many_arguments,
789        reason = "tool execution carries explicit runtime call metadata"
790    )]
791    pub(crate) async fn execute_tool_call_by_id(
792        &self,
793        call_id: String,
794        tool_id: crate::ToolId,
795        args: serde_json::Value,
796        index: usize,
797        replay: Option<crate::llm::types::ProviderReplayMeta>,
798        parent_invocation: Option<crate::RuntimeInvocation>,
799        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
800    ) -> CompletedProtocolToolCall {
801        let Some(manifest) =
802            crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
803        else {
804            let outcome = ToolDispatchOutcome {
805                record: ToolCallRecord {
806                    call_id: Some(call_id.clone()),
807                    tool: tool_id.to_string(),
808                    args,
809                    output: ToolCallOutput::failure(ToolFailure::runtime(
810                        ToolFailureClass::Unavailable,
811                        "tool_unavailable",
812                        format!("Tool id `{tool_id}` is unavailable in this session"),
813                    )),
814                    duration_ms: 0,
815                },
816            };
817            let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
818            return self
819                .complete_tool_call(index, call_id, replay, outcome, activity_id)
820                .await;
821        };
822        self.execute_tool_call(
823            call_id,
824            manifest.name,
825            args,
826            index,
827            replay,
828            parent_invocation,
829            child_execution_trace_hook,
830        )
831        .await
832    }
833
834    #[expect(
835        clippy::too_many_arguments,
836        reason = "tool execution carries explicit runtime call metadata"
837    )]
838    pub(crate) async fn execute_tool_call_by_grant(
839        &self,
840        call_id: String,
841        grant: crate::ToolExecutionGrant,
842        args: serde_json::Value,
843        index: usize,
844        replay: Option<crate::llm::types::ProviderReplayMeta>,
845        parent_invocation: Option<crate::RuntimeInvocation>,
846        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
847    ) -> CompletedProtocolToolCall {
848        let name = grant.manifest.name.clone();
849        let _ = self
850            .dispatch
851            .event_tx
852            .send(SessionEvent::ToolCallStart {
853                call_id: Some(call_id.clone()),
854                name: name.clone(),
855                args: args.clone(),
856            })
857            .await;
858        let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
859        self.emit_turn_activity(
860            tool_correlation_id.clone(),
861            TurnEvent::ToolCallStarted {
862                call_id: Some(call_id.clone()),
863                name: name.clone(),
864                args: args.clone(),
865            },
866        )
867        .await;
868
869        let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
870        let mut dispatch = (*self.dispatch).clone();
871        dispatch.parent_invocation = parent_invocation.clone();
872        let pending = crate::sansio::PendingToolCall {
873            call_id: call_id.clone(),
874            tool_name: name,
875            args,
876            replay: replay.clone(),
877        };
878        let launch = match prepare_granted_tool_call_with_context(
879            &dispatch,
880            &grant,
881            pending,
882            Some(call_id.clone()),
883        )
884        .await
885        {
886            ToolPreparationOutcome::Prepared(prepared) => {
887                let dispatch_context = std::sync::Arc::new(dispatch.clone());
888                let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
889                    self.clone().with_parent_invocation(parent_invocation)
890                } else {
891                    self.clone()
892                };
893                let mut tool_context =
894                    crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
895                        .runtime_execution_context(runtime_context)
896                        .prepared_call(&prepared)
897                        .tool_execution_binding(grant.execution_binding.clone())
898                        .cancellation_token(self.cancellation_token.clone())
899                        .runtime_process_id(self.runtime_process_id.clone())
900                        .parent_invocation(parent_invocation.clone())
901                        .child_execution_trace_hook(child_execution_trace_hook.clone());
902                if let Some(process_events) = self.process_event_context.as_ref() {
903                    tool_context = tool_context.process_events(
904                        process_events.process_id.clone(),
905                        std::sync::Arc::clone(&process_events.registry),
906                        process_events.store.clone(),
907                        process_events.session_store_factory.clone(),
908                        process_events.queued_work_driver.clone(),
909                    );
910                }
911                let tool_context = tool_context.build();
912                dispatch_granted_prepared_tool_call_launch_with_execution_context(
913                    dispatch_context.as_ref(),
914                    &grant,
915                    prepared,
916                    None,
917                    tool_context,
918                )
919                .await
920            }
921            ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
922        };
923        let mut outcome = match launch {
924            ToolCallLaunch::Done(outcome) => outcome,
925            ToolCallLaunch::Pending(pending) => {
926                self.await_pending_tool_dispatch_outcome(
927                    &call_id,
928                    parent_invocation.clone(),
929                    pending,
930                    self.cancellation_token.clone(),
931                )
932                .await
933            }
934        };
935        outcome.record.call_id = Some(call_id.clone());
936
937        self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
938            .await
939    }
940
941    pub(crate) async fn prepare_tool_call(
942        &self,
943        pending: crate::sansio::PendingToolCall,
944    ) -> ToolPreparationOutcome {
945        let call_id = Some(pending.call_id.clone());
946        prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
947    }
948
949    pub(crate) async fn execute_prepared_tool_attempt_effect(
950        &self,
951        prepared: crate::PreparedToolCall,
952        execution_grant: Option<Box<crate::ToolExecutionGrant>>,
953        attempt: u32,
954        max_attempts: u32,
955        attempt_invocation: crate::RuntimeInvocation,
956        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
957    ) -> Result<crate::ToolAttemptEffectOutcome, crate::RuntimeEffectControllerError> {
958        let call_id = prepared.call_id.clone();
959        let mut attempt_dispatch = (*self.dispatch).clone();
960        attempt_dispatch.parent_invocation = Some(attempt_invocation.clone());
961        attempt_dispatch.trigger_outcomes =
962            crate::tool_dispatch::ToolTriggerOutcomeBuffer::default();
963        let attempt_dispatch = std::sync::Arc::new(attempt_dispatch);
964        let mut attempt_context = self.clone();
965        attempt_context.dispatch = std::sync::Arc::clone(&attempt_dispatch);
966        attempt_context.parent_invocation = Some(attempt_invocation.clone());
967
968        let mut tool_context =
969            crate::ToolContext::from_dispatch(std::sync::Arc::clone(&attempt_dispatch))
970                .runtime_execution_context(attempt_context.clone())
971                .prepared_call(&prepared)
972                .cancellation_token(self.cancellation_token.clone())
973                .runtime_process_id(self.runtime_process_id.clone())
974                .parent_invocation(Some(attempt_invocation))
975                .child_execution_trace_hook(child_execution_trace_hook);
976        if let Some(process_events) = self.process_event_context.as_ref() {
977            tool_context = tool_context.process_events(
978                process_events.process_id.clone(),
979                std::sync::Arc::clone(&process_events.registry),
980                process_events.store.clone(),
981                process_events.session_store_factory.clone(),
982                process_events.queued_work_driver.clone(),
983            );
984        }
985        let tool_context = tool_context.build();
986        let launch = match Box::pin(async {
987            if let Some(grant) = execution_grant.as_ref() {
988                dispatch_granted_prepared_tool_attempt_launch_with_execution_context(
989                    attempt_dispatch.as_ref(),
990                    grant,
991                    prepared,
992                    attempt,
993                    max_attempts,
994                    None,
995                    tool_context,
996                )
997                .await
998            } else {
999                dispatch_prepared_tool_attempt_launch_with_execution_context(
1000                    attempt_dispatch.as_ref(),
1001                    prepared,
1002                    attempt,
1003                    max_attempts,
1004                    None,
1005                    tool_context,
1006                )
1007                .await
1008            }
1009        })
1010        .await
1011        {
1012            ToolCallLaunch::Done(outcome) => {
1013                let mut record = outcome.record;
1014                record.call_id = Some(call_id);
1015                crate::ToolAttemptLaunch::Done { record }
1016            }
1017            ToolCallLaunch::Pending(pending) => crate::ToolAttemptLaunch::Pending {
1018                key: pending.key,
1019                pending: pending.pending,
1020                duration_ms: pending.duration_ms,
1021            },
1022        };
1023        let triggers = attempt_context
1024            .drain_tool_trigger_outcomes()
1025            .map_err(|err| {
1026                crate::RuntimeEffectControllerError::new(
1027                    "tool_trigger_outcome_drain",
1028                    err.to_string(),
1029                )
1030            })?;
1031        Ok(crate::ToolAttemptEffectOutcome { launch, triggers })
1032    }
1033
1034    pub(super) async fn await_process_with_cancellation(
1035        &self,
1036        process_id: &str,
1037        parent_invocation: Option<crate::RuntimeInvocation>,
1038        cancellation: Option<tokio_util::sync::CancellationToken>,
1039    ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
1040        let _phase = self.named_phase("process.await_handle");
1041        if let Some(cancellation) = cancellation {
1042            tokio::select! {
1043                result = self.dispatch.processes.await_process(
1044                    process_id,
1045                    self.process_scope(parent_invocation.clone()),
1046                ) => result,
1047                _ = cancellation.cancelled() => {
1048                    let _ = self.dispatch.processes.cancel(
1049                        &self.dispatch.session_id,
1050                        process_id,
1051                        self.process_scope(parent_invocation.clone()),
1052                    ).await;
1053                    self.dispatch.processes.await_process(
1054                        process_id,
1055                        self.process_scope(parent_invocation),
1056                    ).await
1057                }
1058            }
1059        } else {
1060            self.dispatch
1061                .processes
1062                .await_process(process_id, self.process_scope(parent_invocation))
1063                .await
1064        }
1065    }
1066
1067    pub(crate) async fn complete_tool_call(
1068        &self,
1069        _index: usize,
1070        call_id: String,
1071        replay: Option<crate::llm::types::ProviderReplayMeta>,
1072        outcome: ToolDispatchOutcome,
1073        tool_correlation_id: TurnActivityId,
1074    ) -> CompletedProtocolToolCall {
1075        let output = outcome.record.output.clone();
1076        let projection_output = output.clone();
1077        let projection_tool_name = outcome.record.tool.clone();
1078        let projection_args = outcome.record.args.clone();
1079        let projection_duration_ms = outcome.record.duration_ms;
1080        let projection_call_id = call_id.clone();
1081        tokio::task::yield_now().await;
1082        let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
1083        let projection_context = crate::plugin::ToolResultProjectionContext {
1084            session_id: self.dispatch.session_id.clone(),
1085            tool_name: projection_tool_name,
1086            args: projection_args,
1087            output: projection_output,
1088            duration_ms: projection_duration_ms,
1089            call_id: projection_call_id,
1090        };
1091        let model_return = match plugins.project_tool_result(projection_context).await {
1092            Ok(projected) => projected,
1093            Err(err) => ModelToolReturn::text(
1094                call_id.clone(),
1095                outcome.record.tool.clone(),
1096                err.to_string(),
1097            ),
1098        };
1099
1100        self.emit_turn_activity(
1101            tool_correlation_id,
1102            TurnEvent::ToolCallCompleted {
1103                call_id: Some(call_id.clone()),
1104                name: outcome.record.tool.clone(),
1105                args: outcome.record.args.clone(),
1106                output: output.clone(),
1107                duration_ms: outcome.record.duration_ms,
1108            },
1109        )
1110        .await;
1111
1112        let record = ToolCallRecord {
1113            call_id: Some(call_id.clone()),
1114            tool: outcome.record.tool.clone(),
1115            args: outcome.record.args.clone(),
1116            output: output.clone(),
1117            duration_ms: outcome.record.duration_ms,
1118        };
1119        CompletedProtocolToolCall {
1120            completed: crate::sansio::CompletedToolCall {
1121                call_id,
1122                tool_name: outcome.record.tool,
1123                args: outcome.record.args,
1124                output,
1125                model_return,
1126                duration_ms: outcome.record.duration_ms,
1127                replay,
1128            },
1129            record,
1130        }
1131    }
1132
1133    pub(crate) async fn pending_completion_dispatch_outcome(
1134        &self,
1135        tool_name: String,
1136        args: serde_json::Value,
1137        resolution: crate::Resolution,
1138        duration_ms: u64,
1139    ) -> ToolDispatchOutcome {
1140        let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
1141        let result = finalize_tool_result_with_execution_context(
1142            self.dispatch.as_ref(),
1143            &tool_name,
1144            &args,
1145            ToolResult::from_output(output),
1146            duration_ms,
1147        )
1148        .await;
1149        let output = result.into_done_output().unwrap_or_else(|_| {
1150            ToolCallOutput::failure(ToolFailure::runtime(
1151                ToolFailureClass::Internal,
1152                "pending_tool_not_finalized",
1153                "pending tool result reached a completed-output projection path",
1154            ))
1155        });
1156        ToolDispatchOutcome {
1157            record: ToolCallRecord {
1158                call_id: None,
1159                tool: tool_name,
1160                args,
1161                output,
1162                duration_ms,
1163            },
1164        }
1165    }
1166
1167    async fn await_pending_tool_dispatch_outcome(
1168        &self,
1169        call_id: &str,
1170        parent_invocation: Option<crate::RuntimeInvocation>,
1171        pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1172        cancellation: Option<tokio_util::sync::CancellationToken>,
1173    ) -> ToolDispatchOutcome {
1174        self.await_pending_tool_dispatch_outcome_with_suffix(
1175            call_id,
1176            parent_invocation,
1177            format!("{call_id}:await"),
1178            pending,
1179            cancellation,
1180        )
1181        .await
1182    }
1183
1184    async fn await_pending_tool_dispatch_outcome_with_suffix(
1185        &self,
1186        call_id: &str,
1187        parent_invocation: Option<crate::RuntimeInvocation>,
1188        replay_suffix: String,
1189        pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1190        cancellation: Option<tokio_util::sync::CancellationToken>,
1191    ) -> ToolDispatchOutcome {
1192        let fallback;
1193        let parent = if let Some(parent) = parent_invocation.as_ref() {
1194            parent
1195        } else {
1196            fallback = crate::RuntimeInvocation::effect(
1197                crate::RuntimeScope::new(&self.dispatch.session_id),
1198                format!("tool:{call_id}:await"),
1199                crate::RuntimeEffectKind::AwaitEvent,
1200                format!("tool:{call_id}:await"),
1201            );
1202            &fallback
1203        };
1204        let parent_effect_id = parent.effect_id().unwrap_or("tool");
1205        let invocation = crate::runtime::causal::child_effect_invocation(
1206            parent,
1207            format!("{parent_effect_id}:{replay_suffix}"),
1208            crate::RuntimeEffectKind::AwaitEvent,
1209            replay_suffix,
1210        );
1211        let cancellation = cancellation.unwrap_or_default();
1212        let deadline = pending
1213            .pending
1214            .deadline
1215            .map(|duration| self.dispatch.clock.now() + duration);
1216        let outcome = self
1217            .dispatch
1218            .effect_controller
1219            .controller()
1220            .execute_effect(
1221                crate::RuntimeEffectEnvelope::new(
1222                    invocation,
1223                    crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
1224                ),
1225                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
1226                    cancellation,
1227                    deadline,
1228                    std::sync::Arc::clone(&self.dispatch.clock),
1229                ),
1230            )
1231            .await;
1232        let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
1233            Ok(resolution) => resolution,
1234            Err(err) => {
1235                return ToolDispatchOutcome {
1236                    record: ToolCallRecord {
1237                        call_id: None,
1238                        tool: pending.tool_name,
1239                        args: pending.args,
1240                        output: ToolCallOutput::failure(ToolFailure::runtime(
1241                            ToolFailureClass::Internal,
1242                            "pending_tool_completion_failed",
1243                            err.to_string(),
1244                        )),
1245                        duration_ms: pending.duration_ms,
1246                    },
1247                };
1248            }
1249        };
1250        self.pending_completion_dispatch_outcome(
1251            pending.tool_name,
1252            pending.args,
1253            resolution,
1254            pending.duration_ms,
1255        )
1256        .await
1257    }
1258
1259    pub async fn call_tool_by_id(
1260        &self,
1261        call_id: String,
1262        tool_id: crate::ToolId,
1263        args: serde_json::Value,
1264        index: usize,
1265    ) -> ToolInvocationReply {
1266        let executed = self
1267            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
1268            .await;
1269        let reply = ToolInvocationReply::from_output(executed.completed.output);
1270        reply.with_record(executed.record)
1271    }
1272
1273    pub async fn call_tool_by_id_with_child_execution_trace_hook(
1274        &self,
1275        call_id: String,
1276        tool_id: crate::ToolId,
1277        args: serde_json::Value,
1278        index: usize,
1279        trace_hook: crate::ToolChildExecutionTraceHook,
1280    ) -> ToolInvocationReply {
1281        let executed = self
1282            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
1283            .await;
1284        let reply = ToolInvocationReply::from_output(executed.completed.output);
1285        reply.with_record(executed.record)
1286    }
1287
1288    pub async fn call_tool_with_execution_grant(
1289        &self,
1290        call_id: String,
1291        grant: crate::ToolExecutionGrant,
1292        args: serde_json::Value,
1293        index: usize,
1294    ) -> ToolInvocationReply {
1295        let executed = self
1296            .execute_tool_call_by_grant(call_id, grant, args, index, None, None, None)
1297            .await;
1298        let reply = ToolInvocationReply::from_output(executed.completed.output);
1299        reply.with_record(executed.record)
1300    }
1301
1302    pub async fn call_tool_with_execution_grant_and_child_execution_trace_hook(
1303        &self,
1304        call_id: String,
1305        grant: crate::ToolExecutionGrant,
1306        args: serde_json::Value,
1307        index: usize,
1308        trace_hook: crate::ToolChildExecutionTraceHook,
1309    ) -> ToolInvocationReply {
1310        let executed = self
1311            .execute_tool_call_by_grant(call_id, grant, args, index, None, None, Some(trace_hook))
1312            .await;
1313        let reply = ToolInvocationReply::from_output(executed.completed.output);
1314        reply.with_record(executed.record)
1315    }
1316
1317    pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
1318        if calls.is_empty() {
1319            return Vec::new();
1320        }
1321
1322        let batch_id = deterministic_tool_invocation_batch_id(&calls);
1323        let mut replies = vec![None; calls.len()];
1324        let mut prepared_entries = Vec::new();
1325
1326        for (index, call) in calls.into_iter().enumerate() {
1327            let preparation = if let Some(grant) = call.execution_grant.as_deref().cloned() {
1328                let pending = crate::sansio::PendingToolCall {
1329                    call_id: call.id.clone(),
1330                    tool_name: grant.manifest.name.clone(),
1331                    args: call.args,
1332                    replay: None,
1333                };
1334                (
1335                    Some(grant.clone()),
1336                    prepare_granted_tool_call_with_context(
1337                        self.dispatch.as_ref(),
1338                        &grant,
1339                        pending,
1340                        Some(call.id.clone()),
1341                    )
1342                    .await,
1343                )
1344            } else {
1345                let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
1346                    self.dispatch.as_ref(),
1347                    &call.tool_id,
1348                ) else {
1349                    let outcome = ToolDispatchOutcome {
1350                        record: ToolCallRecord {
1351                            call_id: Some(call.id.clone()),
1352                            tool: call.tool_id.to_string(),
1353                            args: call.args,
1354                            output: ToolCallOutput::failure(ToolFailure::runtime(
1355                                ToolFailureClass::Unavailable,
1356                                "tool_unavailable",
1357                                format!(
1358                                    "Tool id `{}` is unavailable in this session",
1359                                    call.tool_id
1360                                ),
1361                            )),
1362                            duration_ms: 0,
1363                        },
1364                    };
1365                    let completed = self
1366                        .complete_tool_call(
1367                            index,
1368                            call.id,
1369                            None,
1370                            outcome,
1371                            TurnActivityId::new(format!("tool:{}", batch_id)),
1372                        )
1373                        .await;
1374                    replies[index] = Some(
1375                        ToolInvocationReply::from_output(completed.completed.output)
1376                            .with_record(completed.record),
1377                    );
1378                    continue;
1379                };
1380
1381                let pending = crate::sansio::PendingToolCall {
1382                    call_id: call.id.clone(),
1383                    tool_name: manifest.name,
1384                    args: call.args,
1385                    replay: None,
1386                };
1387                (None, self.prepare_tool_call(pending).await)
1388            };
1389            let (execution_grant, preparation) = preparation;
1390            match preparation {
1391                ToolPreparationOutcome::Prepared(prepared) => {
1392                    prepared_entries.push((
1393                        index,
1394                        prepared,
1395                        execution_grant,
1396                        call.child_execution_trace_hook,
1397                    ));
1398                }
1399                ToolPreparationOutcome::Completed(outcome) => {
1400                    let completed = self
1401                        .complete_tool_call(
1402                            index,
1403                            call.id,
1404                            None,
1405                            *outcome,
1406                            TurnActivityId::new(format!("tool:{}", batch_id)),
1407                        )
1408                        .await;
1409                    replies[index] = Some(
1410                        ToolInvocationReply::from_output(completed.completed.output)
1411                            .with_record(completed.record),
1412                    );
1413                }
1414            }
1415        }
1416
1417        if !prepared_entries.is_empty() {
1418            let invocation = self.tool_batch_invocation(&batch_id);
1419            let batch = crate::PreparedToolBatch::new_with_grants(
1420                batch_id.clone(),
1421                prepared_entries
1422                    .iter()
1423                    .map(|(_, prepared, grant, _)| (prepared.clone(), grant.clone()))
1424                    .collect(),
1425            );
1426            let child_trace_hooks = prepared_entries
1427                .iter()
1428                .filter_map(|(_, prepared, _, hook)| {
1429                    hook.clone().map(|hook| (prepared.call_id.clone(), hook))
1430                })
1431                .collect();
1432            let envelope = crate::RuntimeEffectEnvelope::new(
1433                invocation.clone(),
1434                crate::RuntimeEffectCommand::ToolBatch { batch },
1435            );
1436            let local_executor =
1437                crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
1438            let raw_outcome = self
1439                .dispatch
1440                .effect_controller
1441                .controller()
1442                .execute_effect(envelope, local_executor)
1443                .await;
1444            let outcome =
1445                match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
1446                    Ok(outcome) => outcome,
1447                    Err(err) => {
1448                        for (index, prepared, _, _) in prepared_entries {
1449                            replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
1450                                format!("tool batch failed: {err}")
1451                            )));
1452                            let _ = prepared;
1453                        }
1454                        return replies
1455                            .into_iter()
1456                            .map(|reply| reply.expect("every batch reply slot should be filled"))
1457                            .collect();
1458                    }
1459                };
1460            if outcome.launches.len() != prepared_entries.len() {
1461                let message = format!(
1462                    "tool batch returned {} launches for {} prepared calls",
1463                    outcome.launches.len(),
1464                    prepared_entries.len()
1465                );
1466                for (index, _, _, _) in prepared_entries {
1467                    replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
1468                }
1469            } else {
1470                for ((index, prepared, _, _), launch) in
1471                    prepared_entries.into_iter().zip(outcome.launches)
1472                {
1473                    let call_id = prepared.call_id.clone();
1474                    let reply = match launch {
1475                        crate::runtime::ToolCallLaunch::Done { result } => {
1476                            let record = ToolCallRecord {
1477                                call_id: Some(result.call_id.clone()),
1478                                tool: result.tool_name.clone(),
1479                                args: result.args.clone(),
1480                                output: result.output.clone(),
1481                                duration_ms: result.duration_ms,
1482                            };
1483                            ToolInvocationReply::from_output(result.output).with_record(record)
1484                        }
1485                        crate::runtime::ToolCallLaunch::Pending {
1486                            key,
1487                            pending,
1488                            duration_ms,
1489                        } => {
1490                            let dispatch_outcome = self
1491                                .await_pending_tool_dispatch_outcome(
1492                                    &call_id,
1493                                    Some(invocation.clone()),
1494                                    crate::tool_dispatch::PendingToolDispatchOutcome {
1495                                        tool_name: prepared.tool_name.clone(),
1496                                        args: prepared.args.clone(),
1497                                        key,
1498                                        pending,
1499                                        duration_ms,
1500                                    },
1501                                    self.cancellation_token.clone(),
1502                                )
1503                                .await;
1504                            let completed = self
1505                                .complete_tool_call(
1506                                    index,
1507                                    call_id.clone(),
1508                                    prepared.replay.clone(),
1509                                    dispatch_outcome,
1510                                    TurnActivityId::new(format!("tool:{call_id}")),
1511                                )
1512                                .await;
1513                            ToolInvocationReply::from_output(completed.completed.output)
1514                                .with_record(completed.record)
1515                        }
1516                    };
1517                    replies[index] = Some(reply);
1518                }
1519            }
1520        }
1521
1522        replies
1523            .into_iter()
1524            .map(|reply| reply.expect("every batch reply slot should be filled"))
1525            .collect()
1526    }
1527
1528    pub async fn start_tool_call(
1529        &self,
1530        call_id: String,
1531        name: String,
1532        args: serde_json::Value,
1533    ) -> ToolInvocationReply {
1534        self.start_tool_process(call_id, name, args).await
1535    }
1536
1537    pub async fn await_tool_handle(
1538        &self,
1539        call_id: String,
1540        handle: serde_json::Value,
1541    ) -> ToolInvocationReply {
1542        self.await_process_handle(call_id, handle).await
1543    }
1544
1545    pub async fn cancel_tool_handle(
1546        &self,
1547        call_id: String,
1548        handle: serde_json::Value,
1549    ) -> ToolInvocationReply {
1550        self.cancel_process_handle(call_id, handle).await
1551    }
1552
1553    pub async fn signal_tool_handle(
1554        &self,
1555        call_id: String,
1556        handle: serde_json::Value,
1557        signal_name: String,
1558        payload: serde_json::Value,
1559    ) -> ToolInvocationReply {
1560        self.signal_process_handle(call_id, handle, signal_name, payload)
1561            .await
1562    }
1563}