Skip to main content

lash_core/session/
tool_execution.rs

1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3    ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4    dispatch_prepared_tool_attempt_launch_with_execution_context,
5    dispatch_prepared_tool_call_launch_with_execution_context,
6    finalize_tool_result_with_execution_context, mark_retry_exhausted,
7    prepare_tool_call_with_context, retry_after_ms, schedule_tool_batch,
8};
9use crate::{
10    ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
11    ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
12};
13use std::collections::HashMap;
14
/// A request to run one tool: an invocation id, the target tool id, and the
/// JSON arguments to pass.
///
/// `Debug` is implemented by hand in this file and renders the trace hook as
/// the placeholder `<hook>` instead of formatting the hook itself.
#[derive(Clone)]
pub struct ToolInvocation {
    /// Identifier of this invocation.
    pub id: String,
    /// Identifier of the tool to invoke.
    pub tool_id: crate::ToolId,
    /// JSON arguments for the tool.
    pub args: serde_json::Value,
    /// Optional child-execution trace hook; `None` when built via `new`.
    pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
}
22
23impl ToolInvocation {
24    pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
25        Self {
26            id: id.into(),
27            tool_id,
28            args,
29            child_execution_trace_hook: None,
30        }
31    }
32
33    pub fn label(&self) -> String {
34        self.tool_id.to_string()
35    }
36
37    pub fn with_child_execution_trace_hook(
38        mut self,
39        hook: crate::ToolChildExecutionTraceHook,
40    ) -> Self {
41        self.child_execution_trace_hook = Some(hook);
42        self
43    }
44}
45
46impl std::fmt::Debug for ToolInvocation {
47    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48        f.debug_struct("ToolInvocation")
49            .field("id", &self.id)
50            .field("tool_id", &self.tool_id)
51            .field("args", &self.args)
52            .field(
53                "child_execution_trace_hook",
54                &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
55            )
56            .finish()
57    }
58}
59
/// Result of a tool invocation: the tool's output plus an optional call record.
#[derive(Clone, Debug)]
pub struct ToolInvocationReply {
    /// Output produced for the call (success, failure or cancellation).
    pub output: ToolCallOutput,
    /// Call record, when one has been attached; the public constructors in
    /// this file leave it `None`.
    pub record: Option<ToolCallRecord>,
}
65
66impl ToolInvocationReply {
67    pub fn success(value: serde_json::Value) -> Self {
68        Self {
69            output: ToolCallOutput::success(value),
70            record: None,
71        }
72    }
73
74    pub fn error(value: serde_json::Value) -> Self {
75        let message = value
76            .as_str()
77            .map(ToOwned::to_owned)
78            .unwrap_or_else(|| value.to_string());
79        let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
80        failure.raw =
81            Some(serde_json::from_value(value).unwrap_or_else(|_| {
82                crate::ToolValue::String("unserializable tool error".to_string())
83            }));
84        Self {
85            output: ToolCallOutput::failure(failure),
86            record: None,
87        }
88    }
89
90    pub fn from_output(output: ToolCallOutput) -> Self {
91        Self {
92            output,
93            record: None,
94        }
95    }
96
97    pub fn cancelled(message: impl Into<String>) -> Self {
98        Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
99            message,
100        )))
101    }
102
103    pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
104        self.record = Some(record);
105        self
106    }
107}
108
/// A finished tool call: the protocol-level completion paired with its call
/// record.
#[derive(Clone, Debug)]
pub(crate) struct CompletedProtocolToolCall {
    /// Protocol-level completed tool call.
    pub completed: crate::sansio::CompletedToolCall,
    /// Record of the same call.
    pub record: ToolCallRecord,
}
114
115fn cancelled_runtime_tool_call_launch(
116    call_id: String,
117    tool_name: String,
118    args: serde_json::Value,
119    replay: Option<crate::llm::types::ProviderReplayMeta>,
120) -> crate::runtime::ToolCallLaunch {
121    crate::runtime::ToolCallLaunch::Done {
122        result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
123    }
124}
125
126fn cancelled_completed_tool_call(
127    call_id: String,
128    tool_name: String,
129    args: serde_json::Value,
130    replay: Option<crate::llm::types::ProviderReplayMeta>,
131) -> crate::sansio::CompletedToolCall {
132    let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
133    crate::sansio::CompletedToolCall {
134        call_id: call_id.clone(),
135        tool_name: tool_name.clone(),
136        args,
137        model_return: ModelToolReturn {
138            call_id,
139            tool_name,
140            parts: vec![crate::ModelToolReturnPart::text(
141                "[Tool execution cancelled]\ntool call cancelled".to_string(),
142            )],
143        },
144        output,
145        duration_ms: 0,
146        replay,
147    }
148}
149
150fn runtime_failure_dispatch_outcome(
151    call_id: Option<String>,
152    tool_name: String,
153    args: serde_json::Value,
154    code: impl Into<String>,
155    message: impl Into<String>,
156) -> ToolDispatchOutcome {
157    ToolDispatchOutcome {
158        record: ToolCallRecord {
159            call_id,
160            tool: tool_name,
161            args,
162            output: ToolCallOutput::failure(ToolFailure::runtime(
163                ToolFailureClass::Internal,
164                code,
165                message,
166            )),
167            duration_ms: 0,
168        },
169    }
170}
171
172fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
173    let identity = calls
174        .iter()
175        .map(|call| {
176            serde_json::json!({
177                "id": call.id.clone(),
178                "tool_id": call.tool_id.to_string(),
179                "args": call.args.clone(),
180            })
181        })
182        .collect::<Vec<_>>();
183    let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
184        .unwrap_or_else(|_| format!("len-{}", calls.len()));
185    format!("tool-batch:{digest}")
186}
187
/// Outcome of running one tool call inside a batch: the launch reported for
/// the call plus the trigger effect outcomes collected along the way.
struct CoordinatedToolLaunch {
    /// Launch result for the call.
    launch: crate::runtime::ToolCallLaunch,
    /// Trigger effect outcomes gathered across the call's attempts.
    triggers: Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>,
}
192
193impl RuntimeExecutionContext<'_> {
194    fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
195        let suffix = format!("tool-batch:{batch_id}");
196        if let Some(parent) = self.parent_invocation.as_ref() {
197            let parent_effect_id = parent.effect_id().unwrap_or("effect");
198            return crate::runtime::causal::child_effect_invocation(
199                parent,
200                format!("{parent_effect_id}:{suffix}"),
201                crate::RuntimeEffectKind::ToolBatch,
202                suffix,
203            );
204        }
205        let replay_key = format!("{}:{suffix}", self.execution_scope_id());
206        crate::RuntimeInvocation::effect(
207            crate::RuntimeScope::new(self.session_id.clone()),
208            suffix,
209            crate::RuntimeEffectKind::ToolBatch,
210            replay_key,
211        )
212    }
213
214    pub(crate) async fn execute_prepared_tool_batch_launches(
215        &self,
216        batch: crate::PreparedToolBatch,
217        parent_invocation: crate::RuntimeInvocation,
218        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
219    ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
220        let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
221        let cancellation = self.cancellation_token.clone().unwrap_or_default();
222        let tool_cancel = cancellation.child_token();
223        let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
224        if !self
225            .dispatch
226            .effect_controller
227            .controller()
228            .supports_concurrent_effects()
229        {
230            let mut launches = Vec::with_capacity(indexed_tools.len());
231            let mut triggers = Vec::new();
232            let mut context = self.clone().with_cancellation_token(tool_cancel.clone());
233            for (index, child) in indexed_tools {
234                if cancellation.is_cancelled() {
235                    tool_cancel.cancel();
236                    launches.push(cancelled_runtime_tool_call_launch(
237                        child.call.call_id,
238                        child.call.tool_name,
239                        child.call.args,
240                        child.call.replay,
241                    ));
242                    continue;
243                }
244                let child_execution_trace_hook =
245                    child_trace_hooks.get(&child.call.call_id).cloned();
246                let outcome = context
247                    .execute_prepared_tool_batch_child(
248                        child,
249                        index,
250                        parent_invocation.clone(),
251                        child_execution_trace_hook,
252                    )
253                    .await;
254                launches.push(outcome.launch);
255                triggers.extend(outcome.triggers);
256                context = context.with_cancellation_token(tool_cancel.clone());
257            }
258            return Ok(crate::ToolBatchEffectOutcome { launches, triggers });
259        }
260        let child_outcomes = schedule_tool_batch(
261            indexed_tools,
262            |(index, _)| *index,
263            |(_, child)| self.tool_scheduling(&child.call.tool_name),
264            {
265                let context = self.clone();
266                let cancellation = cancellation.clone();
267                let tool_cancel = tool_cancel.clone();
268                let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
269                move |(index, child)| {
270                    let context = context.clone().with_cancellation_token(tool_cancel.clone());
271                    let cancellation = cancellation.clone();
272                    let tool_cancel = tool_cancel.clone();
273                    let parent_invocation = parent_invocation.clone();
274                    let cancelled_tool = child.call.clone();
275                    let child_execution_trace_hook =
276                        child_trace_hooks.get(&child.call.call_id).cloned();
277                    async move {
278                        let tool_call = context.execute_prepared_tool_batch_child(
279                            child,
280                            index,
281                            parent_invocation,
282                            child_execution_trace_hook,
283                        );
284                        tokio::pin!(tool_call);
285                        tokio::select! {
286                            biased;
287                            _ = cancellation.cancelled() => {
288                                tool_cancel.cancel();
289                                let grace = context
290                                    .dispatch
291                                    .clock
292                                    .sleep(std::time::Duration::from_millis(50));
293                                tokio::pin!(grace);
294                                tokio::select! {
295                                    biased;
296                                    outcome = &mut tool_call => outcome,
297                                    _ = &mut grace => CoordinatedToolLaunch {
298                                        launch: cancelled_runtime_tool_call_launch(
299                                            cancelled_tool.call_id,
300                                            cancelled_tool.tool_name,
301                                            cancelled_tool.args,
302                                            cancelled_tool.replay,
303                                        ),
304                                        triggers: Vec::new(),
305                                    },
306                                }
307                            }
308                            outcome = &mut tool_call => outcome,
309                        }
310                    }
311                }
312            },
313        )
314        .await;
315
316        let mut launches = Vec::with_capacity(child_outcomes.len());
317        let mut triggers = Vec::new();
318        for outcome in child_outcomes {
319            launches.push(outcome.launch);
320            triggers.extend(outcome.triggers);
321        }
322        Ok(crate::ToolBatchEffectOutcome { launches, triggers })
323    }
324
325    async fn execute_prepared_tool_batch_child(
326        &self,
327        child: crate::PreparedToolBatchCall,
328        index: usize,
329        parent_invocation: crate::RuntimeInvocation,
330        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
331    ) -> CoordinatedToolLaunch {
332        let call_id = child.call.call_id.clone();
333        let tool_name = child.call.tool_name.clone();
334        let args = child.call.args.clone();
335        let replay = child.call.replay.clone();
336        let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
337        self.emit_tool_call_started(&call_id, &tool_name, args.clone(), activity_id.clone())
338            .await;
339
340        let retry_policy = crate::tool_dispatch::resolve_callable_manifest_by_id(
341            self.dispatch.as_ref(),
342            &child.call.tool_id,
343        )
344        .map(|manifest| manifest.retry_policy)
345        .unwrap_or(crate::ToolRetryPolicy::Never);
346        let max_attempts = retry_policy.max_attempts().max(1);
347        let mut triggers = Vec::new();
348
349        for attempt in 1..=max_attempts {
350            let attempt_invocation =
351                self.tool_attempt_invocation(&parent_invocation, &child.replay_suffix, attempt);
352            let attempt_outcome = self
353                .dispatch
354                .effect_controller
355                .controller()
356                .execute_effect(
357                    crate::RuntimeEffectEnvelope::new(
358                        attempt_invocation,
359                        crate::RuntimeEffectCommand::ToolAttempt {
360                            call: child.call.clone(),
361                            attempt,
362                            max_attempts,
363                        },
364                    ),
365                    crate::RuntimeEffectLocalExecutor::tool_batch(
366                        self.clone(),
367                        child_execution_trace_hook
368                            .clone()
369                            .map(|hook| {
370                                std::iter::once((child.call.call_id.clone(), hook)).collect()
371                            })
372                            .unwrap_or_default(),
373                    ),
374                )
375                .await;
376            let attempt_outcome = match attempt_outcome {
377                Ok(outcome) => match outcome.into_tool_attempt_effect() {
378                    Ok(outcome) => outcome,
379                    Err(err) => {
380                        let completed = self
381                            .complete_tool_call(
382                                index,
383                                call_id.clone(),
384                                replay,
385                                runtime_failure_dispatch_outcome(
386                                    Some(call_id.clone()),
387                                    tool_name,
388                                    args,
389                                    "tool_attempt_failed",
390                                    err.to_string(),
391                                ),
392                                activity_id,
393                            )
394                            .await;
395                        return CoordinatedToolLaunch {
396                            launch: crate::runtime::ToolCallLaunch::Done {
397                                result: completed.completed,
398                            },
399                            triggers,
400                        };
401                    }
402                },
403                Err(err) => {
404                    let completed = self
405                        .complete_tool_call(
406                            index,
407                            call_id.clone(),
408                            replay,
409                            runtime_failure_dispatch_outcome(
410                                Some(call_id.clone()),
411                                tool_name,
412                                args,
413                                "tool_attempt_failed",
414                                err.to_string(),
415                            ),
416                            activity_id,
417                        )
418                        .await;
419                    return CoordinatedToolLaunch {
420                        launch: crate::runtime::ToolCallLaunch::Done {
421                            result: completed.completed,
422                        },
423                        triggers,
424                    };
425                }
426            };
427            triggers.extend(attempt_outcome.triggers);
428            match attempt_outcome.launch {
429                crate::ToolAttemptLaunch::Pending {
430                    key,
431                    pending,
432                    duration_ms,
433                } => {
434                    let dispatch_outcome = self
435                        .await_pending_tool_dispatch_outcome_with_suffix(
436                            &call_id,
437                            Some(parent_invocation.clone()),
438                            format!("{}:await", child.replay_suffix),
439                            crate::tool_dispatch::PendingToolDispatchOutcome {
440                                tool_name: child.call.tool_name.clone(),
441                                args: child.call.args.clone(),
442                                key,
443                                pending,
444                                duration_ms,
445                            },
446                            self.cancellation_token.clone(),
447                        )
448                        .await;
449                    let completed = self
450                        .complete_tool_call(
451                            index,
452                            call_id.clone(),
453                            child.call.replay.clone(),
454                            dispatch_outcome,
455                            activity_id,
456                        )
457                        .await;
458                    return CoordinatedToolLaunch {
459                        launch: crate::runtime::ToolCallLaunch::Done {
460                            result: completed.completed,
461                        },
462                        triggers,
463                    };
464                }
465                crate::ToolAttemptLaunch::Done { mut record } => {
466                    record.call_id = Some(call_id.clone());
467                    let retry_after = retry_after_ms(
468                        &ToolResult::from_output(record.output.clone()),
469                        retry_policy,
470                        attempt - 1,
471                    );
472                    let Some(retry_after) = retry_after else {
473                        let completed = self
474                            .complete_tool_call(
475                                index,
476                                call_id.clone(),
477                                child.call.replay.clone(),
478                                ToolDispatchOutcome { record },
479                                activity_id,
480                            )
481                            .await;
482                        return CoordinatedToolLaunch {
483                            launch: crate::runtime::ToolCallLaunch::Done {
484                                result: completed.completed,
485                            },
486                            triggers,
487                        };
488                    };
489                    if attempt >= max_attempts {
490                        let exhausted =
491                            mark_retry_exhausted(ToolResult::from_output(record.output), attempt);
492                        record.output = exhausted.into_done_output().unwrap_or_else(|_| {
493                            ToolCallOutput::failure(ToolFailure::runtime(
494                                ToolFailureClass::Internal,
495                                "tool_retry_exhaustion_failed",
496                                "retry exhaustion produced a pending output",
497                            ))
498                        });
499                        let completed = self
500                            .complete_tool_call(
501                                index,
502                                call_id.clone(),
503                                child.call.replay.clone(),
504                                ToolDispatchOutcome { record },
505                                activity_id,
506                            )
507                            .await;
508                        return CoordinatedToolLaunch {
509                            launch: crate::runtime::ToolCallLaunch::Done {
510                                result: completed.completed,
511                            },
512                            triggers,
513                        };
514                    }
515                    if retry_after > 0
516                        && let Err(err) = self
517                            .sleep_before_tool_retry(
518                                &parent_invocation,
519                                &child.replay_suffix,
520                                attempt,
521                                retry_after,
522                            )
523                            .await
524                    {
525                        let completed = self
526                            .complete_tool_call(
527                                index,
528                                call_id.clone(),
529                                child.call.replay.clone(),
530                                runtime_failure_dispatch_outcome(
531                                    Some(call_id.clone()),
532                                    child.call.tool_name.clone(),
533                                    child.call.args.clone(),
534                                    "tool_retry_sleep_failed",
535                                    format!(
536                                        "retry sleep for tool `{}` failed after attempt {attempt}: {err}",
537                                        child.call.tool_name
538                                    ),
539                                ),
540                                activity_id,
541                            )
542                            .await;
543                        return CoordinatedToolLaunch {
544                            launch: crate::runtime::ToolCallLaunch::Done {
545                                result: completed.completed,
546                            },
547                            triggers,
548                        };
549                    }
550                }
551            }
552        }
553
554        let completed = self
555            .complete_tool_call(
556                index,
557                call_id.clone(),
558                child.call.replay,
559                runtime_failure_dispatch_outcome(
560                    Some(call_id),
561                    child.call.tool_name,
562                    child.call.args,
563                    "tool_retry_loop_failed",
564                    "tool retry loop exited without a terminal result",
565                ),
566                activity_id,
567            )
568            .await;
569        CoordinatedToolLaunch {
570            launch: crate::runtime::ToolCallLaunch::Done {
571                result: completed.completed,
572            },
573            triggers,
574        }
575    }
576
577    async fn emit_tool_call_started(
578        &self,
579        call_id: &str,
580        name: &str,
581        args: serde_json::Value,
582        activity_id: TurnActivityId,
583    ) {
584        let _ = self
585            .dispatch
586            .event_tx
587            .send(SessionEvent::ToolCallStart {
588                call_id: Some(call_id.to_string()),
589                name: name.to_string(),
590                args: args.clone(),
591            })
592            .await;
593        self.emit_turn_activity(
594            activity_id,
595            TurnEvent::ToolCallStarted {
596                call_id: Some(call_id.to_string()),
597                name: name.to_string(),
598                args,
599            },
600        )
601        .await;
602    }
603
604    fn tool_attempt_invocation(
605        &self,
606        parent_invocation: &crate::RuntimeInvocation,
607        child_replay_suffix: &str,
608        attempt: u32,
609    ) -> crate::RuntimeInvocation {
610        let suffix = format!("{child_replay_suffix}:attempt:{attempt}");
611        let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
612        crate::runtime::causal::child_effect_invocation(
613            parent_invocation,
614            format!("{parent_effect_id}:{suffix}"),
615            crate::RuntimeEffectKind::ToolAttempt,
616            suffix,
617        )
618    }
619
620    async fn sleep_before_tool_retry(
621        &self,
622        parent_invocation: &crate::RuntimeInvocation,
623        child_replay_suffix: &str,
624        attempt: u32,
625        retry_after_ms: u64,
626    ) -> Result<(), crate::RuntimeEffectControllerError> {
627        let suffix = format!("{child_replay_suffix}:attempt:{attempt}:sleep");
628        let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
629        let invocation = crate::runtime::causal::child_effect_invocation(
630            parent_invocation,
631            format!("{parent_effect_id}:{suffix}"),
632            crate::RuntimeEffectKind::Sleep,
633            suffix,
634        );
635        let cancellation = self.cancellation_token.clone().unwrap_or_default();
636        let outcome = self
637            .dispatch
638            .effect_controller
639            .controller()
640            .execute_effect(
641                crate::RuntimeEffectEnvelope::new(
642                    invocation,
643                    crate::RuntimeEffectCommand::Sleep {
644                        duration_ms: retry_after_ms,
645                    },
646                ),
647                crate::RuntimeEffectLocalExecutor::sleep_with_clock(
648                    cancellation,
649                    std::sync::Arc::clone(&self.dispatch.clock),
650                ),
651            )
652            .await?;
653        match outcome {
654            crate::RuntimeEffectOutcome::Sleep => Ok(()),
655            other => Err(crate::RuntimeEffectControllerError::new(
656                "runtime_effect_wrong_outcome",
657                format!("expected sleep outcome, got {}", other.kind().as_str()),
658            )),
659        }
660    }
661
662    #[expect(
663        clippy::too_many_arguments,
664        reason = "tool execution carries explicit runtime call metadata"
665    )]
666    pub(crate) async fn execute_tool_call(
667        &self,
668        call_id: String,
669        name: String,
670        args: serde_json::Value,
671        index: usize,
672        replay: Option<crate::llm::types::ProviderReplayMeta>,
673        parent_invocation: Option<crate::RuntimeInvocation>,
674        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
675    ) -> CompletedProtocolToolCall {
676        let _ = self
677            .dispatch
678            .event_tx
679            .send(SessionEvent::ToolCallStart {
680                call_id: Some(call_id.clone()),
681                name: name.clone(),
682                args: args.clone(),
683            })
684            .await;
685        let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
686        self.emit_turn_activity(
687            tool_correlation_id.clone(),
688            TurnEvent::ToolCallStarted {
689                call_id: Some(call_id.clone()),
690                name: name.clone(),
691                args: args.clone(),
692            },
693        )
694        .await;
695
696        let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
697        let mut dispatch = (*self.dispatch).clone();
698        dispatch.parent_invocation = parent_invocation.clone();
699        let pending = crate::sansio::PendingToolCall {
700            call_id: call_id.clone(),
701            tool_name: name,
702            args,
703            replay: replay.clone(),
704        };
705        let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
706            .await
707        {
708            ToolPreparationOutcome::Prepared(prepared) => {
709                let dispatch_context = std::sync::Arc::new(dispatch.clone());
710                let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
711                    self.clone().with_parent_invocation(parent_invocation)
712                } else {
713                    self.clone()
714                };
715                let mut tool_context =
716                    crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
717                        .runtime_execution_context(runtime_context)
718                        .prepared_call(&prepared)
719                        .cancellation_token(self.cancellation_token.clone())
720                        .runtime_process_id(self.runtime_process_id.clone())
721                        .parent_invocation(parent_invocation.clone())
722                        .child_execution_trace_hook(child_execution_trace_hook.clone());
723                if let Some(process_events) = self.process_event_context.as_ref() {
724                    tool_context = tool_context.process_events(
725                        process_events.process_id.clone(),
726                        std::sync::Arc::clone(&process_events.registry),
727                        process_events.store.clone(),
728                        process_events.session_store_factory.clone(),
729                        process_events.queued_work_driver.clone(),
730                    );
731                }
732                let tool_context = tool_context.build();
733                dispatch_prepared_tool_call_launch_with_execution_context(
734                    dispatch_context.as_ref(),
735                    prepared,
736                    None,
737                    tool_context,
738                )
739                .await
740            }
741            ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
742        };
743        let mut outcome = match launch {
744            ToolCallLaunch::Done(outcome) => outcome,
745            ToolCallLaunch::Pending(pending) => {
746                self.await_pending_tool_dispatch_outcome(
747                    &call_id,
748                    parent_invocation.clone(),
749                    pending,
750                    self.cancellation_token.clone(),
751                )
752                .await
753            }
754        };
755        outcome.record.call_id = Some(call_id.clone());
756
757        self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
758            .await
759    }
760
761    #[expect(
762        clippy::too_many_arguments,
763        reason = "tool execution carries explicit runtime call metadata"
764    )]
765    pub(crate) async fn execute_tool_call_by_id(
766        &self,
767        call_id: String,
768        tool_id: crate::ToolId,
769        args: serde_json::Value,
770        index: usize,
771        replay: Option<crate::llm::types::ProviderReplayMeta>,
772        parent_invocation: Option<crate::RuntimeInvocation>,
773        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
774    ) -> CompletedProtocolToolCall {
775        let Some(manifest) =
776            crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
777        else {
778            let outcome = ToolDispatchOutcome {
779                record: ToolCallRecord {
780                    call_id: Some(call_id.clone()),
781                    tool: tool_id.to_string(),
782                    args,
783                    output: ToolCallOutput::failure(ToolFailure::runtime(
784                        ToolFailureClass::Unavailable,
785                        "tool_unavailable",
786                        format!("Tool id `{tool_id}` is unavailable in this session"),
787                    )),
788                    duration_ms: 0,
789                },
790            };
791            let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
792            return self
793                .complete_tool_call(index, call_id, replay, outcome, activity_id)
794                .await;
795        };
796        self.execute_tool_call(
797            call_id,
798            manifest.name,
799            args,
800            index,
801            replay,
802            parent_invocation,
803            child_execution_trace_hook,
804        )
805        .await
806    }
807
808    pub(crate) async fn prepare_tool_call(
809        &self,
810        pending: crate::sansio::PendingToolCall,
811    ) -> ToolPreparationOutcome {
812        let call_id = Some(pending.call_id.clone());
813        prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
814    }
815
816    pub(crate) async fn execute_prepared_tool_attempt_effect(
817        &self,
818        prepared: crate::PreparedToolCall,
819        attempt: u32,
820        max_attempts: u32,
821        attempt_invocation: crate::RuntimeInvocation,
822        child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
823    ) -> Result<crate::ToolAttemptEffectOutcome, crate::RuntimeEffectControllerError> {
824        let call_id = prepared.call_id.clone();
825        let mut attempt_dispatch = (*self.dispatch).clone();
826        attempt_dispatch.parent_invocation = Some(attempt_invocation.clone());
827        attempt_dispatch.trigger_outcomes =
828            crate::tool_dispatch::ToolTriggerOutcomeBuffer::default();
829        let attempt_dispatch = std::sync::Arc::new(attempt_dispatch);
830        let mut attempt_context = self.clone();
831        attempt_context.dispatch = std::sync::Arc::clone(&attempt_dispatch);
832        attempt_context.parent_invocation = Some(attempt_invocation.clone());
833
834        let mut tool_context =
835            crate::ToolContext::from_dispatch(std::sync::Arc::clone(&attempt_dispatch))
836                .runtime_execution_context(attempt_context.clone())
837                .prepared_call(&prepared)
838                .cancellation_token(self.cancellation_token.clone())
839                .runtime_process_id(self.runtime_process_id.clone())
840                .parent_invocation(Some(attempt_invocation))
841                .child_execution_trace_hook(child_execution_trace_hook);
842        if let Some(process_events) = self.process_event_context.as_ref() {
843            tool_context = tool_context.process_events(
844                process_events.process_id.clone(),
845                std::sync::Arc::clone(&process_events.registry),
846                process_events.store.clone(),
847                process_events.session_store_factory.clone(),
848                process_events.queued_work_driver.clone(),
849            );
850        }
851        let tool_context = tool_context.build();
852        let launch = match Box::pin(
853            dispatch_prepared_tool_attempt_launch_with_execution_context(
854                attempt_dispatch.as_ref(),
855                prepared,
856                attempt,
857                max_attempts,
858                None,
859                tool_context,
860            ),
861        )
862        .await
863        {
864            ToolCallLaunch::Done(outcome) => {
865                let mut record = outcome.record;
866                record.call_id = Some(call_id);
867                crate::ToolAttemptLaunch::Done { record }
868            }
869            ToolCallLaunch::Pending(pending) => crate::ToolAttemptLaunch::Pending {
870                key: pending.key,
871                pending: pending.pending,
872                duration_ms: pending.duration_ms,
873            },
874        };
875        let triggers = attempt_context
876            .drain_tool_trigger_outcomes()
877            .map_err(|err| {
878                crate::RuntimeEffectControllerError::new(
879                    "tool_trigger_outcome_drain",
880                    err.to_string(),
881                )
882            })?;
883        Ok(crate::ToolAttemptEffectOutcome { launch, triggers })
884    }
885
886    pub(super) async fn await_process_with_cancellation(
887        &self,
888        process_id: &str,
889        parent_invocation: Option<crate::RuntimeInvocation>,
890        cancellation: Option<tokio_util::sync::CancellationToken>,
891    ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
892        let _phase = self.named_phase("process.await_handle");
893        if let Some(cancellation) = cancellation {
894            tokio::select! {
895                result = self.dispatch.processes.await_process(
896                    process_id,
897                    self.process_scope(parent_invocation.clone()),
898                ) => result,
899                _ = cancellation.cancelled() => {
900                    let _ = self.dispatch.processes.cancel(
901                        &self.dispatch.session_id,
902                        process_id,
903                        self.process_scope(parent_invocation.clone()),
904                    ).await;
905                    self.dispatch.processes.await_process(
906                        process_id,
907                        self.process_scope(parent_invocation),
908                    ).await
909                }
910            }
911        } else {
912            self.dispatch
913                .processes
914                .await_process(process_id, self.process_scope(parent_invocation))
915                .await
916        }
917    }
918
919    pub(crate) async fn complete_tool_call(
920        &self,
921        _index: usize,
922        call_id: String,
923        replay: Option<crate::llm::types::ProviderReplayMeta>,
924        outcome: ToolDispatchOutcome,
925        tool_correlation_id: TurnActivityId,
926    ) -> CompletedProtocolToolCall {
927        let output = outcome.record.output.clone();
928        let projection_output = output.clone();
929        let projection_tool_name = outcome.record.tool.clone();
930        let projection_args = outcome.record.args.clone();
931        let projection_duration_ms = outcome.record.duration_ms;
932        let projection_call_id = call_id.clone();
933        tokio::task::yield_now().await;
934        let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
935        let projection_context = crate::plugin::ToolResultProjectionContext {
936            session_id: self.dispatch.session_id.clone(),
937            tool_name: projection_tool_name,
938            args: projection_args,
939            output: projection_output,
940            duration_ms: projection_duration_ms,
941            call_id: projection_call_id,
942        };
943        let model_return = match plugins.project_tool_result(projection_context).await {
944            Ok(projected) => projected,
945            Err(err) => ModelToolReturn::text(
946                call_id.clone(),
947                outcome.record.tool.clone(),
948                err.to_string(),
949            ),
950        };
951
952        self.emit_turn_activity(
953            tool_correlation_id,
954            TurnEvent::ToolCallCompleted {
955                call_id: Some(call_id.clone()),
956                name: outcome.record.tool.clone(),
957                args: outcome.record.args.clone(),
958                output: output.clone(),
959                duration_ms: outcome.record.duration_ms,
960            },
961        )
962        .await;
963
964        let record = ToolCallRecord {
965            call_id: Some(call_id.clone()),
966            tool: outcome.record.tool.clone(),
967            args: outcome.record.args.clone(),
968            output: output.clone(),
969            duration_ms: outcome.record.duration_ms,
970        };
971        CompletedProtocolToolCall {
972            completed: crate::sansio::CompletedToolCall {
973                call_id,
974                tool_name: outcome.record.tool,
975                args: outcome.record.args,
976                output,
977                model_return,
978                duration_ms: outcome.record.duration_ms,
979                replay,
980            },
981            record,
982        }
983    }
984
985    pub(crate) async fn pending_completion_dispatch_outcome(
986        &self,
987        tool_name: String,
988        args: serde_json::Value,
989        resolution: crate::Resolution,
990        duration_ms: u64,
991    ) -> ToolDispatchOutcome {
992        let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
993        let result = finalize_tool_result_with_execution_context(
994            self.dispatch.as_ref(),
995            &tool_name,
996            &args,
997            ToolResult::from_output(output),
998            duration_ms,
999        )
1000        .await;
1001        let output = result.into_done_output().unwrap_or_else(|_| {
1002            ToolCallOutput::failure(ToolFailure::runtime(
1003                ToolFailureClass::Internal,
1004                "pending_tool_not_finalized",
1005                "pending tool result reached a completed-output projection path",
1006            ))
1007        });
1008        ToolDispatchOutcome {
1009            record: ToolCallRecord {
1010                call_id: None,
1011                tool: tool_name,
1012                args,
1013                output,
1014                duration_ms,
1015            },
1016        }
1017    }
1018
1019    async fn await_pending_tool_dispatch_outcome(
1020        &self,
1021        call_id: &str,
1022        parent_invocation: Option<crate::RuntimeInvocation>,
1023        pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1024        cancellation: Option<tokio_util::sync::CancellationToken>,
1025    ) -> ToolDispatchOutcome {
1026        self.await_pending_tool_dispatch_outcome_with_suffix(
1027            call_id,
1028            parent_invocation,
1029            format!("{call_id}:await"),
1030            pending,
1031            cancellation,
1032        )
1033        .await
1034    }
1035
1036    async fn await_pending_tool_dispatch_outcome_with_suffix(
1037        &self,
1038        call_id: &str,
1039        parent_invocation: Option<crate::RuntimeInvocation>,
1040        replay_suffix: String,
1041        pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1042        cancellation: Option<tokio_util::sync::CancellationToken>,
1043    ) -> ToolDispatchOutcome {
1044        let fallback;
1045        let parent = if let Some(parent) = parent_invocation.as_ref() {
1046            parent
1047        } else {
1048            fallback = crate::RuntimeInvocation::effect(
1049                crate::RuntimeScope::new(&self.dispatch.session_id),
1050                format!("tool:{call_id}:await"),
1051                crate::RuntimeEffectKind::AwaitEvent,
1052                format!("tool:{call_id}:await"),
1053            );
1054            &fallback
1055        };
1056        let parent_effect_id = parent.effect_id().unwrap_or("tool");
1057        let invocation = crate::runtime::causal::child_effect_invocation(
1058            parent,
1059            format!("{parent_effect_id}:{replay_suffix}"),
1060            crate::RuntimeEffectKind::AwaitEvent,
1061            replay_suffix,
1062        );
1063        let cancellation = cancellation.unwrap_or_default();
1064        let deadline = pending
1065            .pending
1066            .deadline
1067            .map(|duration| self.dispatch.clock.now() + duration);
1068        let outcome = self
1069            .dispatch
1070            .effect_controller
1071            .controller()
1072            .execute_effect(
1073                crate::RuntimeEffectEnvelope::new(
1074                    invocation,
1075                    crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
1076                ),
1077                crate::RuntimeEffectLocalExecutor::await_event_with_clock(
1078                    cancellation,
1079                    deadline,
1080                    std::sync::Arc::clone(&self.dispatch.clock),
1081                ),
1082            )
1083            .await;
1084        let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
1085            Ok(resolution) => resolution,
1086            Err(err) => {
1087                return ToolDispatchOutcome {
1088                    record: ToolCallRecord {
1089                        call_id: None,
1090                        tool: pending.tool_name,
1091                        args: pending.args,
1092                        output: ToolCallOutput::failure(ToolFailure::runtime(
1093                            ToolFailureClass::Internal,
1094                            "pending_tool_completion_failed",
1095                            err.to_string(),
1096                        )),
1097                        duration_ms: pending.duration_ms,
1098                    },
1099                };
1100            }
1101        };
1102        self.pending_completion_dispatch_outcome(
1103            pending.tool_name,
1104            pending.args,
1105            resolution,
1106            pending.duration_ms,
1107        )
1108        .await
1109    }
1110
1111    pub async fn call_tool_by_id(
1112        &self,
1113        call_id: String,
1114        tool_id: crate::ToolId,
1115        args: serde_json::Value,
1116        index: usize,
1117    ) -> ToolInvocationReply {
1118        let executed = self
1119            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
1120            .await;
1121        let reply = ToolInvocationReply::from_output(executed.completed.output);
1122        reply.with_record(executed.record)
1123    }
1124
1125    pub async fn call_tool_by_id_with_child_execution_trace_hook(
1126        &self,
1127        call_id: String,
1128        tool_id: crate::ToolId,
1129        args: serde_json::Value,
1130        index: usize,
1131        trace_hook: crate::ToolChildExecutionTraceHook,
1132    ) -> ToolInvocationReply {
1133        let executed = self
1134            .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
1135            .await;
1136        let reply = ToolInvocationReply::from_output(executed.completed.output);
1137        reply.with_record(executed.record)
1138    }
1139
1140    pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
1141        if calls.is_empty() {
1142            return Vec::new();
1143        }
1144
1145        let batch_id = deterministic_tool_invocation_batch_id(&calls);
1146        let mut replies = vec![None; calls.len()];
1147        let mut prepared_entries = Vec::new();
1148
1149        for (index, call) in calls.into_iter().enumerate() {
1150            let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
1151                self.dispatch.as_ref(),
1152                &call.tool_id,
1153            ) else {
1154                let outcome = ToolDispatchOutcome {
1155                    record: ToolCallRecord {
1156                        call_id: Some(call.id.clone()),
1157                        tool: call.tool_id.to_string(),
1158                        args: call.args,
1159                        output: ToolCallOutput::failure(ToolFailure::runtime(
1160                            ToolFailureClass::Unavailable,
1161                            "tool_unavailable",
1162                            format!("Tool id `{}` is unavailable in this session", call.tool_id),
1163                        )),
1164                        duration_ms: 0,
1165                    },
1166                };
1167                let completed = self
1168                    .complete_tool_call(
1169                        index,
1170                        call.id,
1171                        None,
1172                        outcome,
1173                        TurnActivityId::new(format!("tool:{}", batch_id)),
1174                    )
1175                    .await;
1176                replies[index] = Some(
1177                    ToolInvocationReply::from_output(completed.completed.output)
1178                        .with_record(completed.record),
1179                );
1180                continue;
1181            };
1182
1183            let pending = crate::sansio::PendingToolCall {
1184                call_id: call.id.clone(),
1185                tool_name: manifest.name,
1186                args: call.args,
1187                replay: None,
1188            };
1189            match self.prepare_tool_call(pending).await {
1190                ToolPreparationOutcome::Prepared(prepared) => {
1191                    prepared_entries.push((index, prepared, call.child_execution_trace_hook));
1192                }
1193                ToolPreparationOutcome::Completed(outcome) => {
1194                    let completed = self
1195                        .complete_tool_call(
1196                            index,
1197                            call.id,
1198                            None,
1199                            *outcome,
1200                            TurnActivityId::new(format!("tool:{}", batch_id)),
1201                        )
1202                        .await;
1203                    replies[index] = Some(
1204                        ToolInvocationReply::from_output(completed.completed.output)
1205                            .with_record(completed.record),
1206                    );
1207                }
1208            }
1209        }
1210
1211        if !prepared_entries.is_empty() {
1212            let invocation = self.tool_batch_invocation(&batch_id);
1213            let batch = crate::PreparedToolBatch::new(
1214                batch_id.clone(),
1215                prepared_entries
1216                    .iter()
1217                    .map(|(_, prepared, _)| prepared.clone())
1218                    .collect(),
1219            );
1220            let child_trace_hooks = prepared_entries
1221                .iter()
1222                .filter_map(|(_, prepared, hook)| {
1223                    hook.clone().map(|hook| (prepared.call_id.clone(), hook))
1224                })
1225                .collect();
1226            let envelope = crate::RuntimeEffectEnvelope::new(
1227                invocation.clone(),
1228                crate::RuntimeEffectCommand::ToolBatch { batch },
1229            );
1230            let local_executor =
1231                crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
1232            let raw_outcome = self
1233                .dispatch
1234                .effect_controller
1235                .controller()
1236                .execute_effect(envelope, local_executor)
1237                .await;
1238            let outcome =
1239                match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
1240                    Ok(outcome) => outcome,
1241                    Err(err) => {
1242                        for (index, prepared, _) in prepared_entries {
1243                            replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
1244                                format!("tool batch failed: {err}")
1245                            )));
1246                            let _ = prepared;
1247                        }
1248                        return replies
1249                            .into_iter()
1250                            .map(|reply| reply.expect("every batch reply slot should be filled"))
1251                            .collect();
1252                    }
1253                };
1254            if outcome.launches.len() != prepared_entries.len() {
1255                let message = format!(
1256                    "tool batch returned {} launches for {} prepared calls",
1257                    outcome.launches.len(),
1258                    prepared_entries.len()
1259                );
1260                for (index, _, _) in prepared_entries {
1261                    replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
1262                }
1263            } else {
1264                for ((index, prepared, _), launch) in
1265                    prepared_entries.into_iter().zip(outcome.launches)
1266                {
1267                    let call_id = prepared.call_id.clone();
1268                    let reply = match launch {
1269                        crate::runtime::ToolCallLaunch::Done { result } => {
1270                            let record = ToolCallRecord {
1271                                call_id: Some(result.call_id.clone()),
1272                                tool: result.tool_name.clone(),
1273                                args: result.args.clone(),
1274                                output: result.output.clone(),
1275                                duration_ms: result.duration_ms,
1276                            };
1277                            ToolInvocationReply::from_output(result.output).with_record(record)
1278                        }
1279                        crate::runtime::ToolCallLaunch::Pending {
1280                            key,
1281                            pending,
1282                            duration_ms,
1283                        } => {
1284                            let dispatch_outcome = self
1285                                .await_pending_tool_dispatch_outcome(
1286                                    &call_id,
1287                                    Some(invocation.clone()),
1288                                    crate::tool_dispatch::PendingToolDispatchOutcome {
1289                                        tool_name: prepared.tool_name.clone(),
1290                                        args: prepared.args.clone(),
1291                                        key,
1292                                        pending,
1293                                        duration_ms,
1294                                    },
1295                                    self.cancellation_token.clone(),
1296                                )
1297                                .await;
1298                            let completed = self
1299                                .complete_tool_call(
1300                                    index,
1301                                    call_id.clone(),
1302                                    prepared.replay.clone(),
1303                                    dispatch_outcome,
1304                                    TurnActivityId::new(format!("tool:{call_id}")),
1305                                )
1306                                .await;
1307                            ToolInvocationReply::from_output(completed.completed.output)
1308                                .with_record(completed.record)
1309                        }
1310                    };
1311                    replies[index] = Some(reply);
1312                }
1313            }
1314        }
1315
1316        replies
1317            .into_iter()
1318            .map(|reply| reply.expect("every batch reply slot should be filled"))
1319            .collect()
1320    }
1321
1322    pub async fn start_tool_call(
1323        &self,
1324        call_id: String,
1325        name: String,
1326        args: serde_json::Value,
1327    ) -> ToolInvocationReply {
1328        self.start_tool_process(call_id, name, args).await
1329    }
1330
1331    pub async fn await_tool_handle(
1332        &self,
1333        call_id: String,
1334        handle: serde_json::Value,
1335    ) -> ToolInvocationReply {
1336        self.await_process_handle(call_id, handle).await
1337    }
1338
1339    pub async fn cancel_tool_handle(
1340        &self,
1341        call_id: String,
1342        handle: serde_json::Value,
1343    ) -> ToolInvocationReply {
1344        self.cancel_process_handle(call_id, handle).await
1345    }
1346
1347    pub async fn signal_tool_handle(
1348        &self,
1349        call_id: String,
1350        handle: serde_json::Value,
1351        signal_name: String,
1352        payload: serde_json::Value,
1353    ) -> ToolInvocationReply {
1354        self.signal_process_handle(call_id, handle, signal_name, payload)
1355            .await
1356    }
1357}