1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3 ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4 dispatch_granted_prepared_tool_attempt_launch_with_execution_context,
5 dispatch_granted_prepared_tool_call_launch_with_execution_context,
6 dispatch_prepared_tool_attempt_launch_with_execution_context,
7 dispatch_prepared_tool_call_launch_with_execution_context,
8 finalize_tool_result_with_execution_context, mark_retry_exhausted,
9 prepare_granted_tool_call_with_context, prepare_tool_call_with_context, retry_after_ms,
10 schedule_tool_batch,
11};
12use crate::{
13 ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
14 ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
15};
16use std::collections::HashMap;
17
18#[derive(Clone)]
19pub struct ToolInvocation {
20 pub id: String,
21 pub tool_id: crate::ToolId,
22 pub args: serde_json::Value,
23 pub execution_grant: Option<Box<crate::ToolExecutionGrant>>,
24 pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
25}
26
27impl ToolInvocation {
28 pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
29 Self {
30 id: id.into(),
31 tool_id,
32 args,
33 execution_grant: None,
34 child_execution_trace_hook: None,
35 }
36 }
37
38 pub fn with_execution_grant(mut self, grant: crate::ToolExecutionGrant) -> Self {
39 self.execution_grant = Some(Box::new(grant));
40 self
41 }
42
43 pub fn label(&self) -> String {
44 self.tool_id.to_string()
45 }
46
47 pub fn with_child_execution_trace_hook(
48 mut self,
49 hook: crate::ToolChildExecutionTraceHook,
50 ) -> Self {
51 self.child_execution_trace_hook = Some(hook);
52 self
53 }
54}
55
56impl std::fmt::Debug for ToolInvocation {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("ToolInvocation")
59 .field("id", &self.id)
60 .field("tool_id", &self.tool_id)
61 .field("args", &self.args)
62 .field(
63 "execution_grant",
64 &self.execution_grant.as_ref().map(|_| "<grant>"),
65 )
66 .field(
67 "child_execution_trace_hook",
68 &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
69 )
70 .finish()
71 }
72}
73
74#[derive(Clone, Debug)]
75pub struct ToolInvocationReply {
76 pub output: ToolCallOutput,
77 pub record: Option<ToolCallRecord>,
78}
79
80impl ToolInvocationReply {
81 pub fn success(value: serde_json::Value) -> Self {
82 Self {
83 output: ToolCallOutput::success(value),
84 record: None,
85 }
86 }
87
88 pub fn error(value: serde_json::Value) -> Self {
89 let message = value
90 .as_str()
91 .map(ToOwned::to_owned)
92 .unwrap_or_else(|| value.to_string());
93 let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
94 failure.raw =
95 Some(serde_json::from_value(value).unwrap_or_else(|_| {
96 crate::ToolValue::String("unserializable tool error".to_string())
97 }));
98 Self {
99 output: ToolCallOutput::failure(failure),
100 record: None,
101 }
102 }
103
104 pub fn from_output(output: ToolCallOutput) -> Self {
105 Self {
106 output,
107 record: None,
108 }
109 }
110
111 pub fn cancelled(message: impl Into<String>) -> Self {
112 Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
113 message,
114 )))
115 }
116
117 pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
118 self.record = Some(record);
119 self
120 }
121}
122
123#[derive(Clone, Debug)]
124pub(crate) struct CompletedProtocolToolCall {
125 pub completed: crate::sansio::CompletedToolCall,
126 pub record: ToolCallRecord,
127}
128
129fn cancelled_runtime_tool_call_launch(
130 call_id: String,
131 tool_name: String,
132 args: serde_json::Value,
133 replay: Option<crate::llm::types::ProviderReplayMeta>,
134) -> crate::runtime::ToolCallLaunch {
135 crate::runtime::ToolCallLaunch::Done {
136 result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
137 }
138}
139
140fn cancelled_completed_tool_call(
141 call_id: String,
142 tool_name: String,
143 args: serde_json::Value,
144 replay: Option<crate::llm::types::ProviderReplayMeta>,
145) -> crate::sansio::CompletedToolCall {
146 let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
147 crate::sansio::CompletedToolCall {
148 call_id: call_id.clone(),
149 tool_name: tool_name.clone(),
150 args,
151 model_return: ModelToolReturn {
152 call_id,
153 tool_name,
154 parts: vec![crate::ModelToolReturnPart::text(
155 "[Tool execution cancelled]\ntool call cancelled".to_string(),
156 )],
157 },
158 output,
159 duration_ms: 0,
160 replay,
161 }
162}
163
164fn runtime_failure_dispatch_outcome(
165 call_id: Option<String>,
166 tool_name: String,
167 args: serde_json::Value,
168 code: impl Into<String>,
169 message: impl Into<String>,
170) -> ToolDispatchOutcome {
171 ToolDispatchOutcome {
172 record: ToolCallRecord {
173 call_id,
174 tool: tool_name,
175 args,
176 output: ToolCallOutput::failure(ToolFailure::runtime(
177 ToolFailureClass::Internal,
178 code,
179 message,
180 )),
181 duration_ms: 0,
182 },
183 }
184}
185
186fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
187 let identity = calls
188 .iter()
189 .map(|call| {
190 serde_json::json!({
191 "id": call.id.clone(),
192 "tool_id": call.tool_id.to_string(),
193 "args": call.args.clone(),
194 "execution_grant": call.execution_grant.as_ref().map(|grant| serde_json::json!({
195 "tool_id": grant.manifest.id.to_string(),
196 "source_id": grant.source_id.clone(),
197 "execution_binding": grant.execution_binding.clone(),
198 })),
199 })
200 })
201 .collect::<Vec<_>>();
202 let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
203 .unwrap_or_else(|_| format!("len-{}", calls.len()));
204 format!("tool-batch:{digest}")
205}
206
207struct CoordinatedToolLaunch {
208 launch: crate::runtime::ToolCallLaunch,
209 triggers: Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>,
210}
211
212impl RuntimeExecutionContext<'_> {
213 fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
214 let suffix = format!("tool-batch:{batch_id}");
215 if let Some(parent) = self.parent_invocation.as_ref() {
216 let parent_effect_id = parent.effect_id().unwrap_or("effect");
217 return crate::runtime::causal::child_effect_invocation(
218 parent,
219 format!("{parent_effect_id}:{suffix}"),
220 crate::RuntimeEffectKind::ToolBatch,
221 suffix,
222 );
223 }
224 let replay_key = format!("{}:{suffix}", self.execution_scope_id());
225 crate::RuntimeInvocation::effect(
226 crate::RuntimeScope::new(self.session_id.clone()),
227 suffix,
228 crate::RuntimeEffectKind::ToolBatch,
229 replay_key,
230 )
231 }
232
233 pub(crate) async fn execute_prepared_tool_batch_launches(
234 &self,
235 batch: crate::PreparedToolBatch,
236 parent_invocation: crate::RuntimeInvocation,
237 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
238 ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
239 let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
240 let cancellation = self.cancellation_token.clone().unwrap_or_default();
241 let tool_cancel = cancellation.child_token();
242 let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
243 if !self
244 .dispatch
245 .effect_controller
246 .controller()
247 .supports_concurrent_effects()
248 {
249 let mut launches = Vec::with_capacity(indexed_tools.len());
250 let mut triggers = Vec::new();
251 let mut context = self.clone().with_cancellation_token(tool_cancel.clone());
252 for (index, child) in indexed_tools {
253 if cancellation.is_cancelled() {
254 tool_cancel.cancel();
255 launches.push(cancelled_runtime_tool_call_launch(
256 child.call.call_id,
257 child.call.tool_name,
258 child.call.args,
259 child.call.replay,
260 ));
261 continue;
262 }
263 let child_execution_trace_hook =
264 child_trace_hooks.get(&child.call.call_id).cloned();
265 let outcome = context
266 .execute_prepared_tool_batch_child(
267 child,
268 index,
269 parent_invocation.clone(),
270 child_execution_trace_hook,
271 )
272 .await;
273 launches.push(outcome.launch);
274 triggers.extend(outcome.triggers);
275 context = context.with_cancellation_token(tool_cancel.clone());
276 }
277 return Ok(crate::ToolBatchEffectOutcome { launches, triggers });
278 }
279 let child_outcomes = schedule_tool_batch(
280 indexed_tools,
281 |(index, _)| *index,
282 |(_, child)| self.tool_scheduling(&child.call.tool_name),
283 {
284 let context = self.clone();
285 let cancellation = cancellation.clone();
286 let tool_cancel = tool_cancel.clone();
287 let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
288 move |(index, child)| {
289 let context = context.clone().with_cancellation_token(tool_cancel.clone());
290 let cancellation = cancellation.clone();
291 let tool_cancel = tool_cancel.clone();
292 let parent_invocation = parent_invocation.clone();
293 let cancelled_tool = child.call.clone();
294 let child_execution_trace_hook =
295 child_trace_hooks.get(&child.call.call_id).cloned();
296 async move {
297 let tool_call = context.execute_prepared_tool_batch_child(
298 child,
299 index,
300 parent_invocation,
301 child_execution_trace_hook,
302 );
303 tokio::pin!(tool_call);
304 tokio::select! {
305 biased;
306 _ = cancellation.cancelled() => {
307 tool_cancel.cancel();
308 let grace = context
309 .dispatch
310 .clock
311 .sleep(std::time::Duration::from_millis(50));
312 tokio::pin!(grace);
313 tokio::select! {
314 biased;
315 outcome = &mut tool_call => outcome,
316 _ = &mut grace => CoordinatedToolLaunch {
317 launch: cancelled_runtime_tool_call_launch(
318 cancelled_tool.call_id,
319 cancelled_tool.tool_name,
320 cancelled_tool.args,
321 cancelled_tool.replay,
322 ),
323 triggers: Vec::new(),
324 },
325 }
326 }
327 outcome = &mut tool_call => outcome,
328 }
329 }
330 }
331 },
332 )
333 .await;
334
335 let mut launches = Vec::with_capacity(child_outcomes.len());
336 let mut triggers = Vec::new();
337 for outcome in child_outcomes {
338 launches.push(outcome.launch);
339 triggers.extend(outcome.triggers);
340 }
341 Ok(crate::ToolBatchEffectOutcome { launches, triggers })
342 }
343
344 async fn execute_prepared_tool_batch_child(
345 &self,
346 child: crate::PreparedToolBatchCall,
347 index: usize,
348 parent_invocation: crate::RuntimeInvocation,
349 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
350 ) -> CoordinatedToolLaunch {
351 let call_id = child.call.call_id.clone();
352 let tool_name = child.call.tool_name.clone();
353 let args = child.call.args.clone();
354 let replay = child.call.replay.clone();
355 let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
356 self.emit_tool_call_started(&call_id, &tool_name, args.clone(), activity_id.clone())
357 .await;
358
359 let retry_policy = crate::tool_dispatch::resolve_callable_manifest_by_id(
360 self.dispatch.as_ref(),
361 &child.call.tool_id,
362 )
363 .map(|manifest| manifest.retry_policy)
364 .or_else(|| {
365 child
366 .execution_grant
367 .as_ref()
368 .map(|grant| grant.manifest.retry_policy)
369 })
370 .unwrap_or(crate::ToolRetryPolicy::Never);
371 let max_attempts = retry_policy.max_attempts().max(1);
372 let mut triggers = Vec::new();
373
374 for attempt in 1..=max_attempts {
375 let attempt_invocation =
376 self.tool_attempt_invocation(&parent_invocation, &child.replay_suffix, attempt);
377 let attempt_outcome = self
378 .dispatch
379 .effect_controller
380 .controller()
381 .execute_effect(
382 crate::RuntimeEffectEnvelope::new(
383 attempt_invocation,
384 crate::RuntimeEffectCommand::ToolAttempt {
385 call: child.call.clone(),
386 execution_grant: child.execution_grant.clone(),
387 attempt,
388 max_attempts,
389 },
390 ),
391 crate::RuntimeEffectLocalExecutor::tool_batch(
392 self.clone(),
393 child_execution_trace_hook
394 .clone()
395 .map(|hook| {
396 std::iter::once((child.call.call_id.clone(), hook)).collect()
397 })
398 .unwrap_or_default(),
399 ),
400 )
401 .await;
402 let attempt_outcome = match attempt_outcome {
403 Ok(outcome) => match outcome.into_tool_attempt_effect() {
404 Ok(outcome) => outcome,
405 Err(err) => {
406 let completed = self
407 .complete_tool_call(
408 index,
409 call_id.clone(),
410 replay,
411 runtime_failure_dispatch_outcome(
412 Some(call_id.clone()),
413 tool_name,
414 args,
415 "tool_attempt_failed",
416 err.to_string(),
417 ),
418 activity_id,
419 )
420 .await;
421 return CoordinatedToolLaunch {
422 launch: crate::runtime::ToolCallLaunch::Done {
423 result: completed.completed,
424 },
425 triggers,
426 };
427 }
428 },
429 Err(err) => {
430 let completed = self
431 .complete_tool_call(
432 index,
433 call_id.clone(),
434 replay,
435 runtime_failure_dispatch_outcome(
436 Some(call_id.clone()),
437 tool_name,
438 args,
439 "tool_attempt_failed",
440 err.to_string(),
441 ),
442 activity_id,
443 )
444 .await;
445 return CoordinatedToolLaunch {
446 launch: crate::runtime::ToolCallLaunch::Done {
447 result: completed.completed,
448 },
449 triggers,
450 };
451 }
452 };
453 triggers.extend(attempt_outcome.triggers);
454 match attempt_outcome.launch {
455 crate::ToolAttemptLaunch::Pending {
456 key,
457 pending,
458 duration_ms,
459 } => {
460 let dispatch_outcome = self
461 .await_pending_tool_dispatch_outcome_with_suffix(
462 &call_id,
463 Some(parent_invocation.clone()),
464 format!("{}:await", child.replay_suffix),
465 crate::tool_dispatch::PendingToolDispatchOutcome {
466 tool_name: child.call.tool_name.clone(),
467 args: child.call.args.clone(),
468 key,
469 pending,
470 duration_ms,
471 },
472 self.cancellation_token.clone(),
473 )
474 .await;
475 let completed = self
476 .complete_tool_call(
477 index,
478 call_id.clone(),
479 child.call.replay.clone(),
480 dispatch_outcome,
481 activity_id,
482 )
483 .await;
484 return CoordinatedToolLaunch {
485 launch: crate::runtime::ToolCallLaunch::Done {
486 result: completed.completed,
487 },
488 triggers,
489 };
490 }
491 crate::ToolAttemptLaunch::Done { mut record } => {
492 record.call_id = Some(call_id.clone());
493 let retry_after = retry_after_ms(
494 &ToolResult::from_output(record.output.clone()),
495 retry_policy,
496 attempt - 1,
497 );
498 let Some(retry_after) = retry_after else {
499 let completed = self
500 .complete_tool_call(
501 index,
502 call_id.clone(),
503 child.call.replay.clone(),
504 ToolDispatchOutcome { record },
505 activity_id,
506 )
507 .await;
508 return CoordinatedToolLaunch {
509 launch: crate::runtime::ToolCallLaunch::Done {
510 result: completed.completed,
511 },
512 triggers,
513 };
514 };
515 if attempt >= max_attempts {
516 let exhausted =
517 mark_retry_exhausted(ToolResult::from_output(record.output), attempt);
518 record.output = exhausted.into_done_output().unwrap_or_else(|_| {
519 ToolCallOutput::failure(ToolFailure::runtime(
520 ToolFailureClass::Internal,
521 "tool_retry_exhaustion_failed",
522 "retry exhaustion produced a pending output",
523 ))
524 });
525 let completed = self
526 .complete_tool_call(
527 index,
528 call_id.clone(),
529 child.call.replay.clone(),
530 ToolDispatchOutcome { record },
531 activity_id,
532 )
533 .await;
534 return CoordinatedToolLaunch {
535 launch: crate::runtime::ToolCallLaunch::Done {
536 result: completed.completed,
537 },
538 triggers,
539 };
540 }
541 if retry_after > 0
542 && let Err(err) = self
543 .sleep_before_tool_retry(
544 &parent_invocation,
545 &child.replay_suffix,
546 attempt,
547 retry_after,
548 )
549 .await
550 {
551 let completed = self
552 .complete_tool_call(
553 index,
554 call_id.clone(),
555 child.call.replay.clone(),
556 runtime_failure_dispatch_outcome(
557 Some(call_id.clone()),
558 child.call.tool_name.clone(),
559 child.call.args.clone(),
560 "tool_retry_sleep_failed",
561 format!(
562 "retry sleep for tool `{}` failed after attempt {attempt}: {err}",
563 child.call.tool_name
564 ),
565 ),
566 activity_id,
567 )
568 .await;
569 return CoordinatedToolLaunch {
570 launch: crate::runtime::ToolCallLaunch::Done {
571 result: completed.completed,
572 },
573 triggers,
574 };
575 }
576 }
577 }
578 }
579
580 let completed = self
581 .complete_tool_call(
582 index,
583 call_id.clone(),
584 child.call.replay,
585 runtime_failure_dispatch_outcome(
586 Some(call_id),
587 child.call.tool_name,
588 child.call.args,
589 "tool_retry_loop_failed",
590 "tool retry loop exited without a terminal result",
591 ),
592 activity_id,
593 )
594 .await;
595 CoordinatedToolLaunch {
596 launch: crate::runtime::ToolCallLaunch::Done {
597 result: completed.completed,
598 },
599 triggers,
600 }
601 }
602
603 async fn emit_tool_call_started(
604 &self,
605 call_id: &str,
606 name: &str,
607 args: serde_json::Value,
608 activity_id: TurnActivityId,
609 ) {
610 let _ = self
611 .dispatch
612 .event_tx
613 .send(SessionEvent::ToolCallStart {
614 call_id: Some(call_id.to_string()),
615 name: name.to_string(),
616 args: args.clone(),
617 })
618 .await;
619 self.emit_tool_call_started_trace(call_id, name, &args);
620 self.emit_turn_activity(
621 activity_id,
622 TurnEvent::ToolCallStarted {
623 call_id: Some(call_id.to_string()),
624 name: name.to_string(),
625 args,
626 graph_key: self.code_block_graph_key(),
627 parent_call_id: self.batch_parent_call_id(),
628 },
629 )
630 .await;
631 }
632
633 fn tool_attempt_invocation(
634 &self,
635 parent_invocation: &crate::RuntimeInvocation,
636 child_replay_suffix: &str,
637 attempt: u32,
638 ) -> crate::RuntimeInvocation {
639 let suffix = format!("{child_replay_suffix}:attempt:{attempt}");
640 let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
641 crate::runtime::causal::child_effect_invocation(
642 parent_invocation,
643 format!("{parent_effect_id}:{suffix}"),
644 crate::RuntimeEffectKind::ToolAttempt,
645 suffix,
646 )
647 }
648
649 async fn sleep_before_tool_retry(
650 &self,
651 parent_invocation: &crate::RuntimeInvocation,
652 child_replay_suffix: &str,
653 attempt: u32,
654 retry_after_ms: u64,
655 ) -> Result<(), crate::RuntimeEffectControllerError> {
656 let suffix = format!("{child_replay_suffix}:attempt:{attempt}:sleep");
657 let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
658 let invocation = crate::runtime::causal::child_effect_invocation(
659 parent_invocation,
660 format!("{parent_effect_id}:{suffix}"),
661 crate::RuntimeEffectKind::Sleep,
662 suffix,
663 );
664 let cancellation = self.cancellation_token.clone().unwrap_or_default();
665 let outcome = self
666 .dispatch
667 .effect_controller
668 .controller()
669 .execute_effect(
670 crate::RuntimeEffectEnvelope::new(
671 invocation,
672 crate::RuntimeEffectCommand::Sleep {
673 duration_ms: retry_after_ms,
674 },
675 ),
676 crate::RuntimeEffectLocalExecutor::sleep_with_clock(
677 cancellation,
678 std::sync::Arc::clone(&self.dispatch.clock),
679 ),
680 )
681 .await?;
682 match outcome {
683 crate::RuntimeEffectOutcome::Sleep => Ok(()),
684 other => Err(crate::RuntimeEffectControllerError::new(
685 "runtime_effect_wrong_outcome",
686 format!("expected sleep outcome, got {}", other.kind().as_str()),
687 )),
688 }
689 }
690
691 #[expect(
692 clippy::too_many_arguments,
693 reason = "tool execution carries explicit runtime call metadata"
694 )]
695 pub(crate) async fn execute_tool_call(
696 &self,
697 call_id: String,
698 name: String,
699 args: serde_json::Value,
700 index: usize,
701 replay: Option<crate::llm::types::ProviderReplayMeta>,
702 parent_invocation: Option<crate::RuntimeInvocation>,
703 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
704 ) -> CompletedProtocolToolCall {
705 let _ = self
706 .dispatch
707 .event_tx
708 .send(SessionEvent::ToolCallStart {
709 call_id: Some(call_id.clone()),
710 name: name.clone(),
711 args: args.clone(),
712 })
713 .await;
714 let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
715 self.emit_tool_call_started_trace(&call_id, &name, &args);
716 self.emit_turn_activity(
717 tool_correlation_id.clone(),
718 TurnEvent::ToolCallStarted {
719 call_id: Some(call_id.clone()),
720 name: name.clone(),
721 args: args.clone(),
722 graph_key: self.code_block_graph_key(),
723 parent_call_id: self.batch_parent_call_id(),
724 },
725 )
726 .await;
727
728 let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
729 let mut dispatch = (*self.dispatch).clone();
730 dispatch.parent_invocation = parent_invocation.clone();
731 let pending = crate::sansio::PendingToolCall {
732 call_id: call_id.clone(),
733 tool_name: name,
734 args,
735 replay: replay.clone(),
736 };
737 let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
738 .await
739 {
740 ToolPreparationOutcome::Prepared(prepared) => {
741 let dispatch_context = std::sync::Arc::new(dispatch.clone());
742 let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
743 self.clone().with_parent_invocation(parent_invocation)
744 } else {
745 self.clone()
746 };
747 let mut tool_context =
748 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
749 .runtime_execution_context(runtime_context)
750 .prepared_call(&prepared)
751 .cancellation_token(self.cancellation_token.clone())
752 .runtime_process_id(self.runtime_process_id.clone())
753 .parent_invocation(parent_invocation.clone())
754 .child_execution_trace_hook(child_execution_trace_hook.clone());
755 if let Some(process_events) = self.process_event_context.as_ref() {
756 tool_context = tool_context.process_events(
757 process_events.process_id.clone(),
758 std::sync::Arc::clone(&process_events.registry),
759 process_events.awaiter.clone(),
760 process_events.store.clone(),
761 process_events.session_store_factory.clone(),
762 process_events.queued_work_driver.clone(),
763 );
764 }
765 let tool_context = tool_context.build();
766 dispatch_prepared_tool_call_launch_with_execution_context(
767 dispatch_context.as_ref(),
768 prepared,
769 None,
770 tool_context,
771 )
772 .await
773 }
774 ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
775 };
776 let mut outcome = match launch {
777 ToolCallLaunch::Done(outcome) => outcome,
778 ToolCallLaunch::Pending(pending) => {
779 self.await_pending_tool_dispatch_outcome(
780 &call_id,
781 parent_invocation.clone(),
782 pending,
783 self.cancellation_token.clone(),
784 )
785 .await
786 }
787 };
788 outcome.record.call_id = Some(call_id.clone());
789
790 self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
791 .await
792 }
793
794 #[expect(
795 clippy::too_many_arguments,
796 reason = "tool execution carries explicit runtime call metadata"
797 )]
798 pub(crate) async fn execute_tool_call_by_id(
799 &self,
800 call_id: String,
801 tool_id: crate::ToolId,
802 args: serde_json::Value,
803 index: usize,
804 replay: Option<crate::llm::types::ProviderReplayMeta>,
805 parent_invocation: Option<crate::RuntimeInvocation>,
806 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
807 ) -> CompletedProtocolToolCall {
808 let Some(manifest) =
809 crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
810 else {
811 let outcome = ToolDispatchOutcome {
812 record: ToolCallRecord {
813 call_id: Some(call_id.clone()),
814 tool: tool_id.to_string(),
815 args,
816 output: ToolCallOutput::failure(ToolFailure::runtime(
817 ToolFailureClass::Unavailable,
818 "tool_unavailable",
819 format!("Tool id `{tool_id}` is unavailable in this session"),
820 )),
821 duration_ms: 0,
822 },
823 };
824 let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
825 return self
826 .complete_tool_call(index, call_id, replay, outcome, activity_id)
827 .await;
828 };
829 self.execute_tool_call(
830 call_id,
831 manifest.name,
832 args,
833 index,
834 replay,
835 parent_invocation,
836 child_execution_trace_hook,
837 )
838 .await
839 }
840
841 #[expect(
842 clippy::too_many_arguments,
843 reason = "tool execution carries explicit runtime call metadata"
844 )]
845 pub(crate) async fn execute_tool_call_by_grant(
846 &self,
847 call_id: String,
848 grant: crate::ToolExecutionGrant,
849 args: serde_json::Value,
850 index: usize,
851 replay: Option<crate::llm::types::ProviderReplayMeta>,
852 parent_invocation: Option<crate::RuntimeInvocation>,
853 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
854 ) -> CompletedProtocolToolCall {
855 let name = grant.manifest.name.clone();
856 let _ = self
857 .dispatch
858 .event_tx
859 .send(SessionEvent::ToolCallStart {
860 call_id: Some(call_id.clone()),
861 name: name.clone(),
862 args: args.clone(),
863 })
864 .await;
865 let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
866 self.emit_tool_call_started_trace(&call_id, &name, &args);
867 self.emit_turn_activity(
868 tool_correlation_id.clone(),
869 TurnEvent::ToolCallStarted {
870 call_id: Some(call_id.clone()),
871 name: name.clone(),
872 args: args.clone(),
873 graph_key: self.code_block_graph_key(),
874 parent_call_id: self.batch_parent_call_id(),
875 },
876 )
877 .await;
878
879 let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
880 let mut dispatch = (*self.dispatch).clone();
881 dispatch.parent_invocation = parent_invocation.clone();
882 let pending = crate::sansio::PendingToolCall {
883 call_id: call_id.clone(),
884 tool_name: name,
885 args,
886 replay: replay.clone(),
887 };
888 let launch = match prepare_granted_tool_call_with_context(
889 &dispatch,
890 &grant,
891 pending,
892 Some(call_id.clone()),
893 )
894 .await
895 {
896 ToolPreparationOutcome::Prepared(prepared) => {
897 let dispatch_context = std::sync::Arc::new(dispatch.clone());
898 let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
899 self.clone().with_parent_invocation(parent_invocation)
900 } else {
901 self.clone()
902 };
903 let mut tool_context =
904 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
905 .runtime_execution_context(runtime_context)
906 .prepared_call(&prepared)
907 .tool_execution_binding(grant.execution_binding.clone())
908 .cancellation_token(self.cancellation_token.clone())
909 .runtime_process_id(self.runtime_process_id.clone())
910 .parent_invocation(parent_invocation.clone())
911 .child_execution_trace_hook(child_execution_trace_hook.clone());
912 if let Some(process_events) = self.process_event_context.as_ref() {
913 tool_context = tool_context.process_events(
914 process_events.process_id.clone(),
915 std::sync::Arc::clone(&process_events.registry),
916 process_events.awaiter.clone(),
917 process_events.store.clone(),
918 process_events.session_store_factory.clone(),
919 process_events.queued_work_driver.clone(),
920 );
921 }
922 let tool_context = tool_context.build();
923 dispatch_granted_prepared_tool_call_launch_with_execution_context(
924 dispatch_context.as_ref(),
925 &grant,
926 prepared,
927 None,
928 tool_context,
929 )
930 .await
931 }
932 ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
933 };
934 let mut outcome = match launch {
935 ToolCallLaunch::Done(outcome) => outcome,
936 ToolCallLaunch::Pending(pending) => {
937 self.await_pending_tool_dispatch_outcome(
938 &call_id,
939 parent_invocation.clone(),
940 pending,
941 self.cancellation_token.clone(),
942 )
943 .await
944 }
945 };
946 outcome.record.call_id = Some(call_id.clone());
947
948 self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
949 .await
950 }
951
952 pub(crate) async fn prepare_tool_call(
953 &self,
954 pending: crate::sansio::PendingToolCall,
955 ) -> ToolPreparationOutcome {
956 let call_id = Some(pending.call_id.clone());
957 prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
958 }
959
960 pub(crate) async fn execute_prepared_tool_attempt_effect(
961 &self,
962 prepared: crate::PreparedToolCall,
963 execution_grant: Option<Box<crate::ToolExecutionGrant>>,
964 attempt: u32,
965 max_attempts: u32,
966 attempt_invocation: crate::RuntimeInvocation,
967 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
968 ) -> Result<crate::ToolAttemptEffectOutcome, crate::RuntimeEffectControllerError> {
969 let call_id = prepared.call_id.clone();
970 let mut attempt_dispatch = (*self.dispatch).clone();
971 attempt_dispatch.parent_invocation = Some(attempt_invocation.clone());
972 attempt_dispatch.trigger_outcomes =
973 crate::tool_dispatch::ToolTriggerOutcomeBuffer::default();
974 let attempt_dispatch = std::sync::Arc::new(attempt_dispatch);
975 let mut attempt_context = self.clone();
976 attempt_context.dispatch = std::sync::Arc::clone(&attempt_dispatch);
977 attempt_context.parent_invocation = Some(attempt_invocation.clone());
978
979 let mut tool_context =
980 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&attempt_dispatch))
981 .runtime_execution_context(attempt_context.clone())
982 .prepared_call(&prepared)
983 .cancellation_token(self.cancellation_token.clone())
984 .runtime_process_id(self.runtime_process_id.clone())
985 .parent_invocation(Some(attempt_invocation))
986 .child_execution_trace_hook(child_execution_trace_hook);
987 if let Some(process_events) = self.process_event_context.as_ref() {
988 tool_context = tool_context.process_events(
989 process_events.process_id.clone(),
990 std::sync::Arc::clone(&process_events.registry),
991 process_events.awaiter.clone(),
992 process_events.store.clone(),
993 process_events.session_store_factory.clone(),
994 process_events.queued_work_driver.clone(),
995 );
996 }
997 let tool_context = tool_context.build();
998 let launch = match Box::pin(async {
999 if let Some(grant) = execution_grant.as_ref() {
1000 dispatch_granted_prepared_tool_attempt_launch_with_execution_context(
1001 attempt_dispatch.as_ref(),
1002 grant,
1003 prepared,
1004 attempt,
1005 max_attempts,
1006 None,
1007 tool_context,
1008 )
1009 .await
1010 } else {
1011 dispatch_prepared_tool_attempt_launch_with_execution_context(
1012 attempt_dispatch.as_ref(),
1013 prepared,
1014 attempt,
1015 max_attempts,
1016 None,
1017 tool_context,
1018 )
1019 .await
1020 }
1021 })
1022 .await
1023 {
1024 ToolCallLaunch::Done(outcome) => {
1025 let mut record = outcome.record;
1026 record.call_id = Some(call_id);
1027 crate::ToolAttemptLaunch::Done { record }
1028 }
1029 ToolCallLaunch::Pending(pending) => crate::ToolAttemptLaunch::Pending {
1030 key: pending.key,
1031 pending: pending.pending,
1032 duration_ms: pending.duration_ms,
1033 },
1034 };
1035 let triggers = attempt_context
1036 .drain_tool_trigger_outcomes()
1037 .map_err(|err| {
1038 crate::RuntimeEffectControllerError::new(
1039 "tool_trigger_outcome_drain",
1040 err.to_string(),
1041 )
1042 })?;
1043 Ok(crate::ToolAttemptEffectOutcome { launch, triggers })
1044 }
1045
1046 pub(super) async fn await_process_with_cancellation(
1047 &self,
1048 process_id: &str,
1049 parent_invocation: Option<crate::RuntimeInvocation>,
1050 cancellation: Option<tokio_util::sync::CancellationToken>,
1051 ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
1052 let _phase = self.named_phase("process.await_handle");
1053 if let Some(cancellation) = cancellation {
1054 tokio::select! {
1055 result = self.dispatch.processes.await_process(
1056 process_id,
1057 self.process_scope(parent_invocation.clone()),
1058 ) => result,
1059 _ = cancellation.cancelled() => {
1060 let _ = self.dispatch.processes.cancel(
1061 &self.dispatch.session_id,
1062 process_id,
1063 self.process_scope(parent_invocation.clone()),
1064 ).await;
1065 self.dispatch.processes.await_process(
1066 process_id,
1067 self.process_scope(parent_invocation),
1068 ).await
1069 }
1070 }
1071 } else {
1072 self.dispatch
1073 .processes
1074 .await_process(process_id, self.process_scope(parent_invocation))
1075 .await
1076 }
1077 }
1078
1079 pub(crate) async fn complete_tool_call(
1080 &self,
1081 _index: usize,
1082 call_id: String,
1083 replay: Option<crate::llm::types::ProviderReplayMeta>,
1084 outcome: ToolDispatchOutcome,
1085 tool_correlation_id: TurnActivityId,
1086 ) -> CompletedProtocolToolCall {
1087 let output = outcome.record.output.clone();
1088 let projection_output = output.clone();
1089 let projection_tool_name = outcome.record.tool.clone();
1090 let projection_args = outcome.record.args.clone();
1091 let projection_duration_ms = outcome.record.duration_ms;
1092 let projection_call_id = call_id.clone();
1093 tokio::task::yield_now().await;
1094 let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
1095 let projection_context = crate::plugin::ToolResultProjectionContext {
1096 session_id: self.dispatch.session_id.clone(),
1097 tool_name: projection_tool_name,
1098 args: projection_args,
1099 output: projection_output,
1100 duration_ms: projection_duration_ms,
1101 call_id: projection_call_id,
1102 };
1103 let model_return = match plugins.project_tool_result(projection_context).await {
1104 Ok(projected) => projected,
1105 Err(err) => ModelToolReturn::text(
1106 call_id.clone(),
1107 outcome.record.tool.clone(),
1108 err.to_string(),
1109 ),
1110 };
1111
1112 let record = ToolCallRecord {
1113 call_id: Some(call_id.clone()),
1114 tool: outcome.record.tool.clone(),
1115 args: outcome.record.args.clone(),
1116 output: output.clone(),
1117 duration_ms: outcome.record.duration_ms,
1118 };
1119 self.emit_tool_call_completed_trace(&record);
1120 self.emit_turn_activity(
1121 tool_correlation_id,
1122 TurnEvent::ToolCallCompleted {
1123 call_id: Some(call_id.clone()),
1124 name: outcome.record.tool.clone(),
1125 args: outcome.record.args.clone(),
1126 output: output.clone(),
1127 duration_ms: outcome.record.duration_ms,
1128 graph_key: self.code_block_graph_key(),
1129 parent_call_id: self.batch_parent_call_id(),
1130 },
1131 )
1132 .await;
1133 CompletedProtocolToolCall {
1134 completed: crate::sansio::CompletedToolCall {
1135 call_id,
1136 tool_name: outcome.record.tool,
1137 args: outcome.record.args,
1138 output,
1139 model_return,
1140 duration_ms: outcome.record.duration_ms,
1141 replay,
1142 },
1143 record,
1144 }
1145 }
1146
1147 pub(crate) async fn pending_completion_dispatch_outcome(
1148 &self,
1149 tool_name: String,
1150 args: serde_json::Value,
1151 resolution: crate::Resolution,
1152 duration_ms: u64,
1153 ) -> ToolDispatchOutcome {
1154 let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
1155 let result = finalize_tool_result_with_execution_context(
1156 self.dispatch.as_ref(),
1157 &tool_name,
1158 &args,
1159 ToolResult::from_output(output),
1160 duration_ms,
1161 )
1162 .await;
1163 let output = result.into_done_output().unwrap_or_else(|_| {
1164 ToolCallOutput::failure(ToolFailure::runtime(
1165 ToolFailureClass::Internal,
1166 "pending_tool_not_finalized",
1167 "pending tool result reached a completed-output projection path",
1168 ))
1169 });
1170 ToolDispatchOutcome {
1171 record: ToolCallRecord {
1172 call_id: None,
1173 tool: tool_name,
1174 args,
1175 output,
1176 duration_ms,
1177 },
1178 }
1179 }
1180
1181 async fn await_pending_tool_dispatch_outcome(
1182 &self,
1183 call_id: &str,
1184 parent_invocation: Option<crate::RuntimeInvocation>,
1185 pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1186 cancellation: Option<tokio_util::sync::CancellationToken>,
1187 ) -> ToolDispatchOutcome {
1188 self.await_pending_tool_dispatch_outcome_with_suffix(
1189 call_id,
1190 parent_invocation,
1191 format!("{call_id}:await"),
1192 pending,
1193 cancellation,
1194 )
1195 .await
1196 }
1197
1198 async fn await_pending_tool_dispatch_outcome_with_suffix(
1199 &self,
1200 call_id: &str,
1201 parent_invocation: Option<crate::RuntimeInvocation>,
1202 replay_suffix: String,
1203 pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1204 cancellation: Option<tokio_util::sync::CancellationToken>,
1205 ) -> ToolDispatchOutcome {
1206 let fallback;
1207 let parent = if let Some(parent) = parent_invocation.as_ref() {
1208 parent
1209 } else {
1210 fallback = crate::RuntimeInvocation::effect(
1211 crate::RuntimeScope::new(&self.dispatch.session_id),
1212 format!("tool:{call_id}:await"),
1213 crate::RuntimeEffectKind::AwaitEvent,
1214 format!("tool:{call_id}:await"),
1215 );
1216 &fallback
1217 };
1218 let parent_effect_id = parent.effect_id().unwrap_or("tool");
1219 let invocation = crate::runtime::causal::child_effect_invocation(
1220 parent,
1221 format!("{parent_effect_id}:{replay_suffix}"),
1222 crate::RuntimeEffectKind::AwaitEvent,
1223 replay_suffix,
1224 );
1225 let cancellation = cancellation.unwrap_or_default();
1226 let deadline = pending
1227 .pending
1228 .deadline
1229 .map(|duration| self.dispatch.clock.now() + duration);
1230 let outcome = self
1231 .dispatch
1232 .effect_controller
1233 .controller()
1234 .execute_effect(
1235 crate::RuntimeEffectEnvelope::new(
1236 invocation,
1237 crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
1238 ),
1239 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
1240 cancellation,
1241 deadline,
1242 std::sync::Arc::clone(&self.dispatch.clock),
1243 ),
1244 )
1245 .await;
1246 let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
1247 Ok(resolution) => resolution,
1248 Err(err) => {
1249 return ToolDispatchOutcome {
1250 record: ToolCallRecord {
1251 call_id: None,
1252 tool: pending.tool_name,
1253 args: pending.args,
1254 output: ToolCallOutput::failure(ToolFailure::runtime(
1255 ToolFailureClass::Internal,
1256 "pending_tool_completion_failed",
1257 err.to_string(),
1258 )),
1259 duration_ms: pending.duration_ms,
1260 },
1261 };
1262 }
1263 };
1264 self.pending_completion_dispatch_outcome(
1265 pending.tool_name,
1266 pending.args,
1267 resolution,
1268 pending.duration_ms,
1269 )
1270 .await
1271 }
1272
1273 pub async fn call_tool_by_id(
1274 &self,
1275 call_id: String,
1276 tool_id: crate::ToolId,
1277 args: serde_json::Value,
1278 index: usize,
1279 ) -> ToolInvocationReply {
1280 let executed = self
1281 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
1282 .await;
1283 let reply = ToolInvocationReply::from_output(executed.completed.output);
1284 reply.with_record(executed.record)
1285 }
1286
1287 pub async fn call_tool_by_id_with_child_execution_trace_hook(
1288 &self,
1289 call_id: String,
1290 tool_id: crate::ToolId,
1291 args: serde_json::Value,
1292 index: usize,
1293 trace_hook: crate::ToolChildExecutionTraceHook,
1294 ) -> ToolInvocationReply {
1295 let executed = self
1296 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
1297 .await;
1298 let reply = ToolInvocationReply::from_output(executed.completed.output);
1299 reply.with_record(executed.record)
1300 }
1301
1302 pub async fn call_tool_with_execution_grant(
1303 &self,
1304 call_id: String,
1305 grant: crate::ToolExecutionGrant,
1306 args: serde_json::Value,
1307 index: usize,
1308 ) -> ToolInvocationReply {
1309 let executed = self
1310 .execute_tool_call_by_grant(call_id, grant, args, index, None, None, None)
1311 .await;
1312 let reply = ToolInvocationReply::from_output(executed.completed.output);
1313 reply.with_record(executed.record)
1314 }
1315
1316 pub async fn call_tool_with_execution_grant_and_child_execution_trace_hook(
1317 &self,
1318 call_id: String,
1319 grant: crate::ToolExecutionGrant,
1320 args: serde_json::Value,
1321 index: usize,
1322 trace_hook: crate::ToolChildExecutionTraceHook,
1323 ) -> ToolInvocationReply {
1324 let executed = self
1325 .execute_tool_call_by_grant(call_id, grant, args, index, None, None, Some(trace_hook))
1326 .await;
1327 let reply = ToolInvocationReply::from_output(executed.completed.output);
1328 reply.with_record(executed.record)
1329 }
1330
1331 pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
1332 if calls.is_empty() {
1333 return Vec::new();
1334 }
1335
1336 let batch_id = deterministic_tool_invocation_batch_id(&calls);
1337 let mut replies = vec![None; calls.len()];
1338 let mut prepared_entries = Vec::new();
1339
1340 for (index, call) in calls.into_iter().enumerate() {
1341 let preparation = if let Some(grant) = call.execution_grant.as_deref().cloned() {
1342 let pending = crate::sansio::PendingToolCall {
1343 call_id: call.id.clone(),
1344 tool_name: grant.manifest.name.clone(),
1345 args: call.args,
1346 replay: None,
1347 };
1348 (
1349 Some(grant.clone()),
1350 prepare_granted_tool_call_with_context(
1351 self.dispatch.as_ref(),
1352 &grant,
1353 pending,
1354 Some(call.id.clone()),
1355 )
1356 .await,
1357 )
1358 } else {
1359 let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
1360 self.dispatch.as_ref(),
1361 &call.tool_id,
1362 ) else {
1363 let outcome = ToolDispatchOutcome {
1364 record: ToolCallRecord {
1365 call_id: Some(call.id.clone()),
1366 tool: call.tool_id.to_string(),
1367 args: call.args,
1368 output: ToolCallOutput::failure(ToolFailure::runtime(
1369 ToolFailureClass::Unavailable,
1370 "tool_unavailable",
1371 format!(
1372 "Tool id `{}` is unavailable in this session",
1373 call.tool_id
1374 ),
1375 )),
1376 duration_ms: 0,
1377 },
1378 };
1379 let completed = self
1380 .complete_tool_call(
1381 index,
1382 call.id,
1383 None,
1384 outcome,
1385 TurnActivityId::new(format!("tool:{}", batch_id)),
1386 )
1387 .await;
1388 replies[index] = Some(
1389 ToolInvocationReply::from_output(completed.completed.output)
1390 .with_record(completed.record),
1391 );
1392 continue;
1393 };
1394
1395 let pending = crate::sansio::PendingToolCall {
1396 call_id: call.id.clone(),
1397 tool_name: manifest.name,
1398 args: call.args,
1399 replay: None,
1400 };
1401 (None, self.prepare_tool_call(pending).await)
1402 };
1403 let (execution_grant, preparation) = preparation;
1404 match preparation {
1405 ToolPreparationOutcome::Prepared(prepared) => {
1406 prepared_entries.push((
1407 index,
1408 prepared,
1409 execution_grant,
1410 call.child_execution_trace_hook,
1411 ));
1412 }
1413 ToolPreparationOutcome::Completed(outcome) => {
1414 let completed = self
1415 .complete_tool_call(
1416 index,
1417 call.id,
1418 None,
1419 *outcome,
1420 TurnActivityId::new(format!("tool:{}", batch_id)),
1421 )
1422 .await;
1423 replies[index] = Some(
1424 ToolInvocationReply::from_output(completed.completed.output)
1425 .with_record(completed.record),
1426 );
1427 }
1428 }
1429 }
1430
1431 if !prepared_entries.is_empty() {
1432 let invocation = self.tool_batch_invocation(&batch_id);
1433 let batch = crate::PreparedToolBatch::new_with_grants(
1434 batch_id.clone(),
1435 prepared_entries
1436 .iter()
1437 .map(|(_, prepared, grant, _)| (prepared.clone(), grant.clone()))
1438 .collect(),
1439 );
1440 let child_trace_hooks = prepared_entries
1441 .iter()
1442 .filter_map(|(_, prepared, _, hook)| {
1443 hook.clone().map(|hook| (prepared.call_id.clone(), hook))
1444 })
1445 .collect();
1446 let envelope = crate::RuntimeEffectEnvelope::new(
1447 invocation.clone(),
1448 crate::RuntimeEffectCommand::ToolBatch { batch },
1449 );
1450 let local_executor =
1451 crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
1452 let raw_outcome = self
1453 .dispatch
1454 .effect_controller
1455 .controller()
1456 .execute_effect(envelope, local_executor)
1457 .await;
1458 let outcome =
1459 match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
1460 Ok(outcome) => outcome,
1461 Err(err) => {
1462 for (index, prepared, _, _) in prepared_entries {
1463 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
1464 format!("tool batch failed: {err}")
1465 )));
1466 let _ = prepared;
1467 }
1468 return replies
1469 .into_iter()
1470 .map(|reply| reply.expect("every batch reply slot should be filled"))
1471 .collect();
1472 }
1473 };
1474 if outcome.launches.len() != prepared_entries.len() {
1475 let message = format!(
1476 "tool batch returned {} launches for {} prepared calls",
1477 outcome.launches.len(),
1478 prepared_entries.len()
1479 );
1480 for (index, _, _, _) in prepared_entries {
1481 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
1482 }
1483 } else {
1484 for ((index, prepared, _, _), launch) in
1485 prepared_entries.into_iter().zip(outcome.launches)
1486 {
1487 let call_id = prepared.call_id.clone();
1488 let reply = match launch {
1489 crate::runtime::ToolCallLaunch::Done { result } => {
1490 let record = ToolCallRecord {
1491 call_id: Some(result.call_id.clone()),
1492 tool: result.tool_name.clone(),
1493 args: result.args.clone(),
1494 output: result.output.clone(),
1495 duration_ms: result.duration_ms,
1496 };
1497 ToolInvocationReply::from_output(result.output).with_record(record)
1498 }
1499 crate::runtime::ToolCallLaunch::Pending {
1500 key,
1501 pending,
1502 duration_ms,
1503 } => {
1504 let dispatch_outcome = self
1505 .await_pending_tool_dispatch_outcome(
1506 &call_id,
1507 Some(invocation.clone()),
1508 crate::tool_dispatch::PendingToolDispatchOutcome {
1509 tool_name: prepared.tool_name.clone(),
1510 args: prepared.args.clone(),
1511 key,
1512 pending,
1513 duration_ms,
1514 },
1515 self.cancellation_token.clone(),
1516 )
1517 .await;
1518 let completed = self
1519 .complete_tool_call(
1520 index,
1521 call_id.clone(),
1522 prepared.replay.clone(),
1523 dispatch_outcome,
1524 TurnActivityId::new(format!("tool:{call_id}")),
1525 )
1526 .await;
1527 ToolInvocationReply::from_output(completed.completed.output)
1528 .with_record(completed.record)
1529 }
1530 };
1531 replies[index] = Some(reply);
1532 }
1533 }
1534 }
1535
1536 replies
1537 .into_iter()
1538 .map(|reply| reply.expect("every batch reply slot should be filled"))
1539 .collect()
1540 }
1541
1542 pub async fn start_tool_call(
1543 &self,
1544 call_id: String,
1545 name: String,
1546 args: serde_json::Value,
1547 ) -> ToolInvocationReply {
1548 self.start_tool_process(call_id, name, args).await
1549 }
1550
1551 pub async fn await_tool_handle(
1552 &self,
1553 call_id: String,
1554 handle: serde_json::Value,
1555 ) -> ToolInvocationReply {
1556 self.await_process_handle(call_id, handle).await
1557 }
1558
1559 pub async fn cancel_tool_handle(
1560 &self,
1561 call_id: String,
1562 handle: serde_json::Value,
1563 ) -> ToolInvocationReply {
1564 self.cancel_process_handle(call_id, handle).await
1565 }
1566
1567 pub async fn signal_tool_handle(
1568 &self,
1569 call_id: String,
1570 handle: serde_json::Value,
1571 signal_name: String,
1572 payload: serde_json::Value,
1573 ) -> ToolInvocationReply {
1574 self.signal_process_handle(call_id, handle, signal_name, payload)
1575 .await
1576 }
1577}