Skip to main content

lash_core/session/
tool_execution.rs

1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3    ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4    dispatch_prepared_tool_call_launch_with_execution_context,
5    finalize_tool_result_with_execution_context, prepare_tool_call_with_context,
6    schedule_tool_batch,
7};
8use crate::{
9    ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
10    ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
11};
12use std::collections::HashMap;
13
14#[derive(Clone)]
15pub struct ToolInvocation {
16    pub id: String,
17    pub tool_id: crate::ToolId,
18    pub args: serde_json::Value,
19    pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
20}
21
22impl ToolInvocation {
23    pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
24        Self {
25            id: id.into(),
26            tool_id,
27            args,
28            child_execution_trace_hook: None,
29        }
30    }
31
32    pub fn label(&self) -> String {
33        self.tool_id.to_string()
34    }
35
36    pub fn with_child_execution_trace_hook(
37        mut self,
38        hook: crate::ToolChildExecutionTraceHook,
39    ) -> Self {
40        self.child_execution_trace_hook = Some(hook);
41        self
42    }
43}
44
45impl std::fmt::Debug for ToolInvocation {
46    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47        f.debug_struct("ToolInvocation")
48            .field("id", &self.id)
49            .field("tool_id", &self.tool_id)
50            .field("args", &self.args)
51            .field(
52                "child_execution_trace_hook",
53                &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
54            )
55            .finish()
56    }
57}
58
59#[derive(Clone, Debug)]
60pub struct ToolInvocationReply {
61    pub output: ToolCallOutput,
62    pub record: Option<ToolCallRecord>,
63}
64
65impl ToolInvocationReply {
66    pub fn success(value: serde_json::Value) -> Self {
67        Self {
68            output: ToolCallOutput::success(value),
69            record: None,
70        }
71    }
72
73    pub fn error(value: serde_json::Value) -> Self {
74        let message = value
75            .as_str()
76            .map(ToOwned::to_owned)
77            .unwrap_or_else(|| value.to_string());
78        let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
79        failure.raw =
80            Some(serde_json::from_value(value).unwrap_or_else(|_| {
81                crate::ToolValue::String("unserializable tool error".to_string())
82            }));
83        Self {
84            output: ToolCallOutput::failure(failure),
85            record: None,
86        }
87    }
88
89    pub fn from_output(output: ToolCallOutput) -> Self {
90        Self {
91            output,
92            record: None,
93        }
94    }
95
96    pub fn cancelled(message: impl Into<String>) -> Self {
97        Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
98            message,
99        )))
100    }
101
102    pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
103        self.record = Some(record);
104        self
105    }
106}
107
108#[derive(Clone, Debug)]
109pub(crate) struct CompletedProtocolToolCall {
110    pub completed: crate::sansio::CompletedToolCall,
111    pub record: ToolCallRecord,
112}
113
114pub(crate) enum ProtocolToolCallLaunch {
115    Done(CompletedProtocolToolCall),
116    Pending(crate::tool_dispatch::PendingToolDispatchOutcome),
117}
118
119fn cancelled_runtime_tool_call_launch(
120    call_id: String,
121    tool_name: String,
122    args: serde_json::Value,
123    replay: Option<crate::llm::types::ProviderReplayMeta>,
124) -> crate::runtime::ToolCallLaunch {
125    crate::runtime::ToolCallLaunch::Done {
126        result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
127    }
128}
129
130fn cancelled_completed_tool_call(
131    call_id: String,
132    tool_name: String,
133    args: serde_json::Value,
134    replay: Option<crate::llm::types::ProviderReplayMeta>,
135) -> crate::sansio::CompletedToolCall {
136    let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
137    crate::sansio::CompletedToolCall {
138        call_id: call_id.clone(),
139        tool_name: tool_name.clone(),
140        args,
141        model_return: ModelToolReturn {
142            call_id,
143            tool_name,
144            parts: vec![crate::ModelToolReturnPart::text(
145                "[Tool execution cancelled]\ntool call cancelled".to_string(),
146            )],
147        },
148        output,
149        duration_ms: 0,
150        replay,
151    }
152}
153
154fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
155    let identity = calls
156        .iter()
157        .map(|call| {
158            serde_json::json!({
159                "id": call.id.clone(),
160                "tool_id": call.tool_id.to_string(),
161                "args": call.args.clone(),
162            })
163        })
164        .collect::<Vec<_>>();
165    let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
166        .unwrap_or_else(|_| format!("len-{}", calls.len()));
167    format!("tool-batch:{digest}")
168}
169
170#[derive(Clone)]
171pub(crate) struct PreparedToolRun {
172    pub prepared: crate::PreparedToolCall,
173    pub index: usize,
174    pub parent_invocation: Option<crate::RuntimeInvocation>,
175    pub activity_id: TurnActivityId,
176}
177
178impl RuntimeExecutionContext<'_> {
179    fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
180        let suffix = format!("tool-batch:{batch_id}");
181        if let Some(parent) = self.parent_invocation.as_ref() {
182            let parent_effect_id = parent.effect_id().unwrap_or("effect");
183            return crate::runtime::causal::child_effect_invocation(
184                parent,
185                format!("{parent_effect_id}:{suffix}"),
186                crate::RuntimeEffectKind::ToolBatch,
187                suffix,
188            );
189        }
190        let replay_key = format!("{}:{suffix}", self.execution_scope_id());
191        crate::RuntimeInvocation::effect(
192            crate::RuntimeScope::new(self.session_id.clone()),
193            suffix,
194            crate::RuntimeEffectKind::ToolBatch,
195            replay_key,
196        )
197    }
198
199    fn should_execute_child_tool_batch_locally(&self) -> bool {
200        self.parent_invocation
201            .as_ref()
202            .and_then(crate::RuntimeInvocation::effect_kind)
203            == Some(crate::RuntimeEffectKind::ToolBatch)
204            && self
205                .dispatch
206                .effect_controller
207                .controller()
208                .durability_tier()
209                == crate::DurabilityTier::Durable
210    }
211
212    pub(crate) async fn execute_prepared_tool_batch_launches(
213        &self,
214        batch: crate::PreparedToolBatch,
215        parent_invocation: crate::RuntimeInvocation,
216        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
217    ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
218        let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
219        let cancellation = self.cancellation_token.clone().unwrap_or_default();
220        let tool_cancel = cancellation.child_token();
221        let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
222        let outcomes = schedule_tool_batch(
223            indexed_tools,
224            |(index, _)| *index,
225            |(_, child)| self.tool_scheduling(&child.call.tool_name),
226            {
227                let context = self.clone();
228                let cancellation = cancellation.clone();
229                let tool_cancel = tool_cancel.clone();
230                let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
231                move |(index, child)| {
232                    let context = context.clone().with_cancellation_token(tool_cancel.clone());
233                    let cancellation = cancellation.clone();
234                    let tool_cancel = tool_cancel.clone();
235                    let parent_invocation = parent_invocation.clone();
236                    let cancelled_tool = child.call.clone();
237                    let child_execution_trace_hook =
238                        child_trace_hooks.get(&child.call.call_id).cloned();
239                    async move {
240                        let tool_call = context.execute_prepared_tool_call_launch(
241                            child.call,
242                            index,
243                            Some(parent_invocation),
244                            child_execution_trace_hook,
245                        );
246                        tokio::pin!(tool_call);
247                        tokio::select! {
248                            biased;
249                            _ = cancellation.cancelled() => {
250                                tool_cancel.cancel();
251                                let grace = context
252                                    .dispatch
253                                    .clock
254                                    .sleep(std::time::Duration::from_millis(50));
255                                tokio::pin!(grace);
256                                tokio::select! {
257                                    biased;
258                                    outcome = &mut tool_call => outcome,
259                                    _ = &mut grace => cancelled_runtime_tool_call_launch(
260                                        cancelled_tool.call_id,
261                                        cancelled_tool.tool_name,
262                                        cancelled_tool.args,
263                                        cancelled_tool.replay,
264                                    ),
265                                }
266                            }
267                            outcome = &mut tool_call => outcome,
268                        }
269                    }
270                }
271            },
272        )
273        .await;
274
275        let triggers = self.drain_tool_trigger_outcomes().map_err(|err| {
276            crate::RuntimeEffectControllerError::new("tool_trigger_outcome_drain", err.to_string())
277        })?;
278        Ok(crate::ToolBatchEffectOutcome {
279            launches: outcomes,
280            triggers,
281        })
282    }
283
284    fn prepared_tool_run(
285        &self,
286        prepared: crate::PreparedToolCall,
287        index: usize,
288        parent_invocation: Option<crate::RuntimeInvocation>,
289    ) -> PreparedToolRun {
290        let activity_id = TurnActivityId::new(format!("tool:{}", prepared.call_id));
291        PreparedToolRun {
292            prepared,
293            index,
294            parent_invocation,
295            activity_id,
296        }
297    }
298
299    #[expect(
300        clippy::too_many_arguments,
301        reason = "tool execution carries explicit runtime call metadata"
302    )]
303    pub(crate) async fn execute_tool_call(
304        &self,
305        call_id: String,
306        name: String,
307        args: serde_json::Value,
308        index: usize,
309        replay: Option<crate::llm::types::ProviderReplayMeta>,
310        parent_invocation: Option<crate::RuntimeInvocation>,
311        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
312    ) -> CompletedProtocolToolCall {
313        let _ = self
314            .dispatch
315            .event_tx
316            .send(SessionEvent::ToolCallStart {
317                call_id: Some(call_id.clone()),
318                name: name.clone(),
319                args: args.clone(),
320            })
321            .await;
322        let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
323        self.emit_turn_activity(
324            tool_correlation_id.clone(),
325            TurnEvent::ToolCallStarted {
326                call_id: Some(call_id.clone()),
327                name: name.clone(),
328                args: args.clone(),
329            },
330        )
331        .await;
332
333        let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
334        let mut dispatch = (*self.dispatch).clone();
335        dispatch.parent_invocation = parent_invocation.clone();
336        let pending = crate::sansio::PendingToolCall {
337            call_id: call_id.clone(),
338            tool_name: name,
339            args,
340            replay: replay.clone(),
341        };
342        let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
343            .await
344        {
345            ToolPreparationOutcome::Prepared(prepared) => {
346                let dispatch_context = std::sync::Arc::new(dispatch.clone());
347                let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
348                    self.clone().with_parent_invocation(parent_invocation)
349                } else {
350                    self.clone()
351                };
352                let mut tool_context =
353                    crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
354                        .runtime_execution_context(runtime_context)
355                        .prepared_call(&prepared)
356                        .cancellation_token(self.cancellation_token.clone())
357                        .runtime_process_id(self.runtime_process_id.clone())
358                        .parent_invocation(parent_invocation.clone())
359                        .child_execution_trace_hook(child_execution_trace_hook.clone());
360                if let Some(process_events) = self.process_event_context.as_ref() {
361                    tool_context = tool_context.process_events(
362                        process_events.process_id.clone(),
363                        std::sync::Arc::clone(&process_events.registry),
364                        process_events.store.clone(),
365                        process_events.session_store_factory.clone(),
366                        process_events.queued_work_driver.clone(),
367                    );
368                }
369                let tool_context = tool_context.build();
370                dispatch_prepared_tool_call_launch_with_execution_context(
371                    dispatch_context.as_ref(),
372                    prepared,
373                    None,
374                    tool_context,
375                )
376                .await
377            }
378            ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
379        };
380        let mut outcome = match launch {
381            ToolCallLaunch::Done(outcome) => outcome,
382            ToolCallLaunch::Pending(pending) => {
383                self.await_pending_tool_dispatch_outcome(
384                    &call_id,
385                    parent_invocation.clone(),
386                    pending,
387                    self.cancellation_token.clone(),
388                )
389                .await
390            }
391        };
392        outcome.record.call_id = Some(call_id.clone());
393
394        self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
395            .await
396    }
397
398    #[expect(
399        clippy::too_many_arguments,
400        reason = "tool execution carries explicit runtime call metadata"
401    )]
402    pub(crate) async fn execute_tool_call_by_id(
403        &self,
404        call_id: String,
405        tool_id: crate::ToolId,
406        args: serde_json::Value,
407        index: usize,
408        replay: Option<crate::llm::types::ProviderReplayMeta>,
409        parent_invocation: Option<crate::RuntimeInvocation>,
410        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
411    ) -> CompletedProtocolToolCall {
412        let Some(manifest) =
413            crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
414        else {
415            let outcome = ToolDispatchOutcome {
416                record: ToolCallRecord {
417                    call_id: Some(call_id.clone()),
418                    tool: tool_id.to_string(),
419                    args,
420                    output: ToolCallOutput::failure(ToolFailure::runtime(
421                        ToolFailureClass::Unavailable,
422                        "tool_unavailable",
423                        format!("Tool id `{tool_id}` is unavailable in this session"),
424                    )),
425                    duration_ms: 0,
426                },
427            };
428            let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
429            return self
430                .complete_tool_call(index, call_id, replay, outcome, activity_id)
431                .await;
432        };
433        self.execute_tool_call(
434            call_id,
435            manifest.name,
436            args,
437            index,
438            replay,
439            parent_invocation,
440            child_execution_trace_hook,
441        )
442        .await
443    }
444
445    pub(crate) async fn prepare_tool_call(
446        &self,
447        pending: crate::sansio::PendingToolCall,
448    ) -> ToolPreparationOutcome {
449        let call_id = Some(pending.call_id.clone());
450        prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
451    }
452
453    pub(crate) async fn execute_prepared_tool_call_launch(
454        &self,
455        prepared: crate::PreparedToolCall,
456        index: usize,
457        parent_invocation: Option<crate::RuntimeInvocation>,
458        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
459    ) -> crate::runtime::ToolCallLaunch {
460        match Box::pin(self.execute_prepared_tool_call_launch_inner(
461            prepared,
462            index,
463            parent_invocation,
464            child_execution_trace_hook,
465        ))
466        .await
467        {
468            ProtocolToolCallLaunch::Done(completed) => crate::runtime::ToolCallLaunch::Done {
469                result: completed.completed,
470            },
471            ProtocolToolCallLaunch::Pending(pending) => crate::runtime::ToolCallLaunch::Pending {
472                key: pending.key,
473                pending: pending.pending,
474                duration_ms: pending.duration_ms,
475            },
476        }
477    }
478
479    async fn execute_prepared_tool_call_launch_inner(
480        &self,
481        prepared: crate::PreparedToolCall,
482        index: usize,
483        parent_invocation: Option<crate::RuntimeInvocation>,
484        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
485    ) -> ProtocolToolCallLaunch {
486        let call_id = prepared.call_id.clone();
487        let name = prepared.tool_name.clone();
488        let args = prepared.args.clone();
489        let replay = prepared.replay.clone();
490        let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
491        let run = self.prepared_tool_run(prepared, index, parent_invocation);
492        let prepared = run.prepared.clone();
493        let _ = self
494            .dispatch
495            .event_tx
496            .send(SessionEvent::ToolCallStart {
497                call_id: Some(call_id.clone()),
498                name: name.clone(),
499                args: args.clone(),
500            })
501            .await;
502        let tool_correlation_id = run.activity_id.clone();
503        self.emit_turn_activity(
504            tool_correlation_id.clone(),
505            TurnEvent::ToolCallStarted {
506                call_id: Some(call_id.clone()),
507                name: name.clone(),
508                args: args.clone(),
509            },
510        )
511        .await;
512
513        let runtime_context = if let Some(parent_invocation) = run.parent_invocation.clone() {
514            self.clone().with_parent_invocation(parent_invocation)
515        } else {
516            self.clone()
517        };
518        let mut tool_context =
519            crate::ToolContext::from_dispatch(std::sync::Arc::clone(&self.dispatch))
520                .runtime_execution_context(runtime_context)
521                .prepared_call(&prepared)
522                .cancellation_token(self.cancellation_token.clone())
523                .runtime_process_id(self.runtime_process_id.clone())
524                .parent_invocation(run.parent_invocation.clone())
525                .child_execution_trace_hook(child_execution_trace_hook);
526        if let Some(process_events) = self.process_event_context.as_ref() {
527            tool_context = tool_context.process_events(
528                process_events.process_id.clone(),
529                std::sync::Arc::clone(&process_events.registry),
530                process_events.store.clone(),
531                process_events.session_store_factory.clone(),
532                process_events.queued_work_driver.clone(),
533            );
534        }
535        let tool_context = tool_context.build();
536        let outcome = Box::pin(dispatch_prepared_tool_call_launch_with_execution_context(
537            self.dispatch.as_ref(),
538            prepared,
539            None,
540            tool_context,
541        ))
542        .await;
543        match outcome {
544            ToolCallLaunch::Done(mut outcome) => {
545                outcome.record.call_id = Some(call_id.clone());
546                tokio::task::yield_now().await;
547                let completed = self
548                    .complete_tool_call(run.index, call_id, replay, outcome, tool_correlation_id)
549                    .await;
550                ProtocolToolCallLaunch::Done(completed)
551            }
552            ToolCallLaunch::Pending(pending) => ProtocolToolCallLaunch::Pending(pending),
553        }
554    }
555
556    pub(super) async fn await_process_with_cancellation(
557        &self,
558        process_id: &str,
559        parent_invocation: Option<crate::RuntimeInvocation>,
560        cancellation: Option<tokio_util::sync::CancellationToken>,
561    ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
562        let _phase = self.named_phase("process.await_handle");
563        if let Some(cancellation) = cancellation {
564            tokio::select! {
565                result = self.dispatch.processes.await_process(
566                    process_id,
567                    self.process_scope(parent_invocation.clone()),
568                ) => result,
569                _ = cancellation.cancelled() => {
570                    let _ = self.dispatch.processes.cancel(
571                        &self.dispatch.session_id,
572                        process_id,
573                        self.process_scope(parent_invocation.clone()),
574                    ).await;
575                    self.dispatch.processes.await_process(
576                        process_id,
577                        self.process_scope(parent_invocation),
578                    ).await
579                }
580            }
581        } else {
582            self.dispatch
583                .processes
584                .await_process(process_id, self.process_scope(parent_invocation))
585                .await
586        }
587    }
588
589    pub(crate) async fn complete_tool_call(
590        &self,
591        _index: usize,
592        call_id: String,
593        replay: Option<crate::llm::types::ProviderReplayMeta>,
594        outcome: ToolDispatchOutcome,
595        tool_correlation_id: TurnActivityId,
596    ) -> CompletedProtocolToolCall {
597        let output = outcome.record.output.clone();
598        let projection_output = output.clone();
599        let projection_tool_name = outcome.record.tool.clone();
600        let projection_args = outcome.record.args.clone();
601        let projection_duration_ms = outcome.record.duration_ms;
602        let projection_call_id = call_id.clone();
603        tokio::task::yield_now().await;
604        let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
605        let projection_context = crate::plugin::ToolResultProjectionContext {
606            session_id: self.dispatch.session_id.clone(),
607            tool_name: projection_tool_name,
608            args: projection_args,
609            output: projection_output,
610            duration_ms: projection_duration_ms,
611            call_id: projection_call_id,
612        };
613        let model_return = match plugins.project_tool_result(projection_context).await {
614            Ok(projected) => projected,
615            Err(err) => ModelToolReturn::text(
616                call_id.clone(),
617                outcome.record.tool.clone(),
618                err.to_string(),
619            ),
620        };
621
622        self.emit_turn_activity(
623            tool_correlation_id,
624            TurnEvent::ToolCallCompleted {
625                call_id: Some(call_id.clone()),
626                name: outcome.record.tool.clone(),
627                args: outcome.record.args.clone(),
628                output: output.clone(),
629                duration_ms: outcome.record.duration_ms,
630            },
631        )
632        .await;
633
634        let record = ToolCallRecord {
635            call_id: Some(call_id.clone()),
636            tool: outcome.record.tool.clone(),
637            args: outcome.record.args.clone(),
638            output: output.clone(),
639            duration_ms: outcome.record.duration_ms,
640        };
641        CompletedProtocolToolCall {
642            completed: crate::sansio::CompletedToolCall {
643                call_id,
644                tool_name: outcome.record.tool,
645                args: outcome.record.args,
646                output,
647                model_return,
648                duration_ms: outcome.record.duration_ms,
649                replay,
650            },
651            record,
652        }
653    }
654
655    pub(crate) async fn pending_completion_dispatch_outcome(
656        &self,
657        tool_name: String,
658        args: serde_json::Value,
659        resolution: crate::Resolution,
660        duration_ms: u64,
661    ) -> ToolDispatchOutcome {
662        let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
663        let result = finalize_tool_result_with_execution_context(
664            self.dispatch.as_ref(),
665            &tool_name,
666            &args,
667            ToolResult::from_output(output),
668            duration_ms,
669        )
670        .await;
671        let output = result.into_done_output().unwrap_or_else(|_| {
672            ToolCallOutput::failure(ToolFailure::runtime(
673                ToolFailureClass::Internal,
674                "pending_tool_not_finalized",
675                "pending tool result reached a completed-output projection path",
676            ))
677        });
678        ToolDispatchOutcome {
679            record: ToolCallRecord {
680                call_id: None,
681                tool: tool_name,
682                args,
683                output,
684                duration_ms,
685            },
686        }
687    }
688
689    async fn await_pending_tool_dispatch_outcome(
690        &self,
691        call_id: &str,
692        parent_invocation: Option<crate::RuntimeInvocation>,
693        pending: crate::tool_dispatch::PendingToolDispatchOutcome,
694        cancellation: Option<tokio_util::sync::CancellationToken>,
695    ) -> ToolDispatchOutcome {
696        let fallback;
697        let parent = if let Some(parent) = parent_invocation.as_ref() {
698            parent
699        } else {
700            fallback = crate::RuntimeInvocation::effect(
701                crate::RuntimeScope::new(&self.dispatch.session_id),
702                format!("tool:{call_id}:await"),
703                crate::RuntimeEffectKind::AwaitEvent,
704                format!("tool:{call_id}:await"),
705            );
706            &fallback
707        };
708        let parent_effect_id = parent.effect_id().unwrap_or("tool");
709        let invocation = crate::runtime::causal::child_effect_invocation(
710            parent,
711            format!("{parent_effect_id}:{call_id}:await"),
712            crate::RuntimeEffectKind::AwaitEvent,
713            format!("{call_id}:await"),
714        );
715        let cancellation = cancellation.unwrap_or_default();
716        let deadline = pending
717            .pending
718            .deadline
719            .map(|duration| self.dispatch.clock.now() + duration);
720        let outcome = self
721            .dispatch
722            .effect_controller
723            .controller()
724            .execute_effect(
725                crate::RuntimeEffectEnvelope::new(
726                    invocation,
727                    crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
728                ),
729                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
730                    cancellation,
731                    deadline,
732                    std::sync::Arc::clone(&self.dispatch.clock),
733                ),
734            )
735            .await;
736        let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
737            Ok(resolution) => resolution,
738            Err(err) => {
739                return ToolDispatchOutcome {
740                    record: ToolCallRecord {
741                        call_id: None,
742                        tool: pending.tool_name,
743                        args: pending.args,
744                        output: ToolCallOutput::failure(ToolFailure::runtime(
745                            ToolFailureClass::Internal,
746                            "pending_tool_completion_failed",
747                            err.to_string(),
748                        )),
749                        duration_ms: pending.duration_ms,
750                    },
751                };
752            }
753        };
754        self.pending_completion_dispatch_outcome(
755            pending.tool_name,
756            pending.args,
757            resolution,
758            pending.duration_ms,
759        )
760        .await
761    }
762
763    pub async fn call_tool_by_id(
764        &self,
765        call_id: String,
766        tool_id: crate::ToolId,
767        args: serde_json::Value,
768        index: usize,
769    ) -> ToolInvocationReply {
770        let executed = self
771            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
772            .await;
773        let reply = ToolInvocationReply::from_output(executed.completed.output);
774        reply.with_record(executed.record)
775    }
776
777    pub async fn call_tool_by_id_with_child_execution_trace_hook(
778        &self,
779        call_id: String,
780        tool_id: crate::ToolId,
781        args: serde_json::Value,
782        index: usize,
783        trace_hook: crate::ToolChildExecutionTraceHook,
784    ) -> ToolInvocationReply {
785        let executed = self
786            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
787            .await;
788        let reply = ToolInvocationReply::from_output(executed.completed.output);
789        reply.with_record(executed.record)
790    }
791
792    pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
793        if calls.is_empty() {
794            return Vec::new();
795        }
796
797        let batch_id = deterministic_tool_invocation_batch_id(&calls);
798        let mut replies = vec![None; calls.len()];
799        let mut prepared_entries = Vec::new();
800
801        for (index, call) in calls.into_iter().enumerate() {
802            let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
803                self.dispatch.as_ref(),
804                &call.tool_id,
805            ) else {
806                let outcome = ToolDispatchOutcome {
807                    record: ToolCallRecord {
808                        call_id: Some(call.id.clone()),
809                        tool: call.tool_id.to_string(),
810                        args: call.args,
811                        output: ToolCallOutput::failure(ToolFailure::runtime(
812                            ToolFailureClass::Unavailable,
813                            "tool_unavailable",
814                            format!("Tool id `{}` is unavailable in this session", call.tool_id),
815                        )),
816                        duration_ms: 0,
817                    },
818                };
819                let completed = self
820                    .complete_tool_call(
821                        index,
822                        call.id,
823                        None,
824                        outcome,
825                        TurnActivityId::new(format!("tool:{}", batch_id)),
826                    )
827                    .await;
828                replies[index] = Some(
829                    ToolInvocationReply::from_output(completed.completed.output)
830                        .with_record(completed.record),
831                );
832                continue;
833            };
834
835            let pending = crate::sansio::PendingToolCall {
836                call_id: call.id.clone(),
837                tool_name: manifest.name,
838                args: call.args,
839                replay: None,
840            };
841            match self.prepare_tool_call(pending).await {
842                ToolPreparationOutcome::Prepared(prepared) => {
843                    prepared_entries.push((index, prepared, call.child_execution_trace_hook));
844                }
845                ToolPreparationOutcome::Completed(outcome) => {
846                    let completed = self
847                        .complete_tool_call(
848                            index,
849                            call.id,
850                            None,
851                            *outcome,
852                            TurnActivityId::new(format!("tool:{}", batch_id)),
853                        )
854                        .await;
855                    replies[index] = Some(
856                        ToolInvocationReply::from_output(completed.completed.output)
857                            .with_record(completed.record),
858                    );
859                }
860            }
861        }
862
863        if !prepared_entries.is_empty() {
864            let invocation = self.tool_batch_invocation(&batch_id);
865            let batch = crate::PreparedToolBatch::new(
866                batch_id.clone(),
867                prepared_entries
868                    .iter()
869                    .map(|(_, prepared, _)| prepared.clone())
870                    .collect(),
871            );
872            let child_trace_hooks = prepared_entries
873                .iter()
874                .filter_map(|(_, prepared, hook)| {
875                    hook.clone().map(|hook| (prepared.call_id.clone(), hook))
876                })
877                .collect();
878            let envelope = crate::RuntimeEffectEnvelope::new(
879                invocation.clone(),
880                crate::RuntimeEffectCommand::ToolBatch { batch },
881            );
882            let local_executor =
883                crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
884            let raw_outcome = if self.should_execute_child_tool_batch_locally() {
885                local_executor.execute(envelope).await
886            } else {
887                self.dispatch
888                    .effect_controller
889                    .controller()
890                    .execute_effect(envelope, local_executor)
891                    .await
892            };
893            let outcome =
894                match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
895                    Ok(outcome) => outcome,
896                    Err(err) => {
897                        for (index, prepared, _) in prepared_entries {
898                            replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
899                                format!("tool batch failed: {err}")
900                            )));
901                            let _ = prepared;
902                        }
903                        return replies
904                            .into_iter()
905                            .map(|reply| reply.expect("every batch reply slot should be filled"))
906                            .collect();
907                    }
908                };
909            if outcome.launches.len() != prepared_entries.len() {
910                let message = format!(
911                    "tool batch returned {} launches for {} prepared calls",
912                    outcome.launches.len(),
913                    prepared_entries.len()
914                );
915                for (index, _, _) in prepared_entries {
916                    replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
917                }
918            } else {
919                for ((index, prepared, _), launch) in
920                    prepared_entries.into_iter().zip(outcome.launches)
921                {
922                    let call_id = prepared.call_id.clone();
923                    let reply = match launch {
924                        crate::runtime::ToolCallLaunch::Done { result } => {
925                            let record = ToolCallRecord {
926                                call_id: Some(result.call_id.clone()),
927                                tool: result.tool_name.clone(),
928                                args: result.args.clone(),
929                                output: result.output.clone(),
930                                duration_ms: result.duration_ms,
931                            };
932                            ToolInvocationReply::from_output(result.output).with_record(record)
933                        }
934                        crate::runtime::ToolCallLaunch::Pending {
935                            key,
936                            pending,
937                            duration_ms,
938                        } => {
939                            let dispatch_outcome = self
940                                .await_pending_tool_dispatch_outcome(
941                                    &call_id,
942                                    Some(invocation.clone()),
943                                    crate::tool_dispatch::PendingToolDispatchOutcome {
944                                        tool_name: prepared.tool_name.clone(),
945                                        args: prepared.args.clone(),
946                                        key,
947                                        pending,
948                                        duration_ms,
949                                    },
950                                    self.cancellation_token.clone(),
951                                )
952                                .await;
953                            let completed = self
954                                .complete_tool_call(
955                                    index,
956                                    call_id.clone(),
957                                    prepared.replay.clone(),
958                                    dispatch_outcome,
959                                    TurnActivityId::new(format!("tool:{call_id}")),
960                                )
961                                .await;
962                            ToolInvocationReply::from_output(completed.completed.output)
963                                .with_record(completed.record)
964                        }
965                    };
966                    replies[index] = Some(reply);
967                }
968            }
969        }
970
971        replies
972            .into_iter()
973            .map(|reply| reply.expect("every batch reply slot should be filled"))
974            .collect()
975    }
976
977    pub async fn start_tool_call(
978        &self,
979        call_id: String,
980        name: String,
981        args: serde_json::Value,
982    ) -> ToolInvocationReply {
983        self.start_tool_process(call_id, name, args).await
984    }
985
986    pub async fn await_tool_handle(
987        &self,
988        call_id: String,
989        handle: serde_json::Value,
990    ) -> ToolInvocationReply {
991        self.await_process_handle(call_id, handle).await
992    }
993
994    pub async fn cancel_tool_handle(
995        &self,
996        call_id: String,
997        handle: serde_json::Value,
998    ) -> ToolInvocationReply {
999        self.cancel_process_handle(call_id, handle).await
1000    }
1001
1002    pub async fn signal_tool_handle(
1003        &self,
1004        call_id: String,
1005        handle: serde_json::Value,
1006        signal_name: String,
1007        payload: serde_json::Value,
1008    ) -> ToolInvocationReply {
1009        self.signal_process_handle(call_id, handle, signal_name, payload)
1010            .await
1011    }
1012}