1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3 ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4 dispatch_granted_prepared_tool_attempt_launch_with_execution_context,
5 dispatch_granted_prepared_tool_call_launch_with_execution_context,
6 dispatch_prepared_tool_attempt_launch_with_execution_context,
7 dispatch_prepared_tool_call_launch_with_execution_context,
8 finalize_tool_result_with_execution_context, mark_retry_exhausted,
9 prepare_granted_tool_call_with_context, prepare_tool_call_with_context, retry_after_ms,
10 schedule_tool_batch,
11};
12use crate::{
13 ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
14 ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
15};
16use std::collections::HashMap;
17
/// A request to run one tool: which tool, with which JSON arguments, and the
/// optional extras (execution grant, child-execution trace hook) attached to it.
///
/// `Debug` is implemented by hand elsewhere in this file, which is why only
/// `Clone` is derived here.
#[derive(Clone)]
pub struct ToolInvocation {
    /// Caller-supplied identifier of this invocation.
    pub id: String,
    /// Identifier of the tool to run.
    pub tool_id: crate::ToolId,
    /// JSON arguments passed to the tool.
    pub args: serde_json::Value,
    /// Optional execution grant; boxed, and `None` unless one is attached.
    pub execution_grant: Option<Box<crate::ToolExecutionGrant>>,
    /// Optional hook for tracing child executions; `None` unless one is attached.
    pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
}
26
27impl ToolInvocation {
28 pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
29 Self {
30 id: id.into(),
31 tool_id,
32 args,
33 execution_grant: None,
34 child_execution_trace_hook: None,
35 }
36 }
37
38 pub fn with_execution_grant(mut self, grant: crate::ToolExecutionGrant) -> Self {
39 self.execution_grant = Some(Box::new(grant));
40 self
41 }
42
43 pub fn label(&self) -> String {
44 self.tool_id.to_string()
45 }
46
47 pub fn with_child_execution_trace_hook(
48 mut self,
49 hook: crate::ToolChildExecutionTraceHook,
50 ) -> Self {
51 self.child_execution_trace_hook = Some(hook);
52 self
53 }
54}
55
56impl std::fmt::Debug for ToolInvocation {
57 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
58 f.debug_struct("ToolInvocation")
59 .field("id", &self.id)
60 .field("tool_id", &self.tool_id)
61 .field("args", &self.args)
62 .field(
63 "execution_grant",
64 &self.execution_grant.as_ref().map(|_| "<grant>"),
65 )
66 .field(
67 "child_execution_trace_hook",
68 &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
69 )
70 .finish()
71 }
72}
73
/// The reply to a tool invocation: the tool's output plus, optionally, the
/// call record that was produced for it.
#[derive(Clone, Debug)]
pub struct ToolInvocationReply {
    /// Output of the tool call (success, failure or cancellation).
    pub output: ToolCallOutput,
    /// Call record attached via `with_record`; `None` for replies built by the
    /// public constructors.
    pub record: Option<ToolCallRecord>,
}
79
80impl ToolInvocationReply {
81 pub fn success(value: serde_json::Value) -> Self {
82 Self {
83 output: ToolCallOutput::success(value),
84 record: None,
85 }
86 }
87
88 pub fn error(value: serde_json::Value) -> Self {
89 let message = value
90 .as_str()
91 .map(ToOwned::to_owned)
92 .unwrap_or_else(|| value.to_string());
93 let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
94 failure.raw =
95 Some(serde_json::from_value(value).unwrap_or_else(|_| {
96 crate::ToolValue::String("unserializable tool error".to_string())
97 }));
98 Self {
99 output: ToolCallOutput::failure(failure),
100 record: None,
101 }
102 }
103
104 pub fn from_output(output: ToolCallOutput) -> Self {
105 Self {
106 output,
107 record: None,
108 }
109 }
110
111 pub fn cancelled(message: impl Into<String>) -> Self {
112 Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
113 message,
114 )))
115 }
116
117 pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
118 self.record = Some(record);
119 self
120 }
121}
122
/// Result of finishing a tool call: the protocol-level completed call paired
/// with the call record describing the same call.
#[derive(Clone, Debug)]
pub(crate) struct CompletedProtocolToolCall {
    /// Completed call in the sans-IO protocol representation.
    pub completed: crate::sansio::CompletedToolCall,
    /// Record of the same call (tool, args, output, duration).
    pub record: ToolCallRecord,
}
128
129fn cancelled_runtime_tool_call_launch(
130 call_id: String,
131 tool_name: String,
132 args: serde_json::Value,
133 replay: Option<crate::llm::types::ProviderReplayMeta>,
134) -> crate::runtime::ToolCallLaunch {
135 crate::runtime::ToolCallLaunch::Done {
136 result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
137 }
138}
139
140fn cancelled_completed_tool_call(
141 call_id: String,
142 tool_name: String,
143 args: serde_json::Value,
144 replay: Option<crate::llm::types::ProviderReplayMeta>,
145) -> crate::sansio::CompletedToolCall {
146 let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
147 crate::sansio::CompletedToolCall {
148 call_id: call_id.clone(),
149 tool_name: tool_name.clone(),
150 args,
151 model_return: ModelToolReturn {
152 call_id,
153 tool_name,
154 parts: vec![crate::ModelToolReturnPart::text(
155 "[Tool execution cancelled]\ntool call cancelled".to_string(),
156 )],
157 },
158 output,
159 duration_ms: 0,
160 replay,
161 }
162}
163
164fn runtime_failure_dispatch_outcome(
165 call_id: Option<String>,
166 tool_name: String,
167 args: serde_json::Value,
168 code: impl Into<String>,
169 message: impl Into<String>,
170) -> ToolDispatchOutcome {
171 ToolDispatchOutcome {
172 record: ToolCallRecord {
173 call_id,
174 tool: tool_name,
175 args,
176 output: ToolCallOutput::failure(ToolFailure::runtime(
177 ToolFailureClass::Internal,
178 code,
179 message,
180 )),
181 duration_ms: 0,
182 },
183 }
184}
185
186fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
187 let identity = calls
188 .iter()
189 .map(|call| {
190 serde_json::json!({
191 "id": call.id.clone(),
192 "tool_id": call.tool_id.to_string(),
193 "args": call.args.clone(),
194 "execution_grant": call.execution_grant.as_ref().map(|grant| serde_json::json!({
195 "tool_id": grant.manifest.id.to_string(),
196 "source_id": grant.source_id.clone(),
197 "execution_binding": grant.execution_binding.clone(),
198 })),
199 })
200 })
201 .collect::<Vec<_>>();
202 let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
203 .unwrap_or_else(|_| format!("len-{}", calls.len()));
204 format!("tool-batch:{digest}")
205}
206
/// Outcome of running one call of a tool batch: the call's launch together
/// with the trigger effect outcomes collected while running it.
struct CoordinatedToolLaunch {
    /// Launch result for the call.
    launch: crate::runtime::ToolCallLaunch,
    /// Trigger effect outcomes gathered across the call's attempts.
    triggers: Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>,
}
211
212impl RuntimeExecutionContext<'_> {
213 fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
214 let suffix = format!("tool-batch:{batch_id}");
215 if let Some(parent) = self.parent_invocation.as_ref() {
216 let parent_effect_id = parent.effect_id().unwrap_or("effect");
217 return crate::runtime::causal::child_effect_invocation(
218 parent,
219 format!("{parent_effect_id}:{suffix}"),
220 crate::RuntimeEffectKind::ToolBatch,
221 suffix,
222 );
223 }
224 let replay_key = format!("{}:{suffix}", self.execution_scope_id());
225 crate::RuntimeInvocation::effect(
226 crate::RuntimeScope::new(self.session_id.clone()),
227 suffix,
228 crate::RuntimeEffectKind::ToolBatch,
229 replay_key,
230 )
231 }
232
    /// Runs every call of a prepared tool batch and gathers each call's launch
    /// and trigger outcomes into one batch outcome.
    ///
    /// * `batch` — the prepared calls; each keeps its position as its index.
    /// * `parent_invocation` — invocation the per-call attempts are parented to.
    /// * `child_trace_hooks` — optional trace hooks, looked up by call id.
    ///
    /// When the effect controller does not support concurrent effects, calls
    /// run one after another; any call reached after cancellation is reported
    /// as cancelled without being run. Otherwise calls go through
    /// `schedule_tool_batch`, and each one is raced against cancellation.
    ///
    /// No path in this function returns `Err`; failures are reported inside the
    /// individual launches.
    pub(crate) async fn execute_prepared_tool_batch_launches(
        &self,
        batch: crate::PreparedToolBatch,
        parent_invocation: crate::RuntimeInvocation,
        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
    ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
        let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
        // `cancellation` is the caller's token (or a fresh one); tools run under
        // a child token so they can be cancelled from here as well.
        let cancellation = self.cancellation_token.clone().unwrap_or_default();
        let tool_cancel = cancellation.child_token();
        let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
        if !self
            .dispatch
            .effect_controller
            .controller()
            .supports_concurrent_effects()
        {
            // Sequential path: run the calls strictly in batch order.
            let mut launches = Vec::with_capacity(indexed_tools.len());
            let mut triggers = Vec::new();
            let mut context = self.clone().with_cancellation_token(tool_cancel.clone());
            for (index, child) in indexed_tools {
                if cancellation.is_cancelled() {
                    // Not started: report the call as cancelled and move on so
                    // every call in the batch still gets a launch.
                    tool_cancel.cancel();
                    launches.push(cancelled_runtime_tool_call_launch(
                        child.call.call_id,
                        child.call.tool_name,
                        child.call.args,
                        child.call.replay,
                    ));
                    continue;
                }
                let child_execution_trace_hook =
                    child_trace_hooks.get(&child.call.call_id).cloned();
                let outcome = context
                    .execute_prepared_tool_batch_child(
                        child,
                        index,
                        parent_invocation.clone(),
                        child_execution_trace_hook,
                    )
                    .await;
                launches.push(outcome.launch);
                triggers.extend(outcome.triggers);
                context = context.with_cancellation_token(tool_cancel.clone());
            }
            return Ok(crate::ToolBatchEffectOutcome { launches, triggers });
        }
        // Concurrent path. NOTE(review): ordering/parallelism is decided by
        // `schedule_tool_batch` from the index and `tool_scheduling` closures —
        // its exact policy is not visible here.
        let child_outcomes = schedule_tool_batch(
            indexed_tools,
            |(index, _)| *index,
            |(_, child)| self.tool_scheduling(&child.call.tool_name),
            {
                let context = self.clone();
                let cancellation = cancellation.clone();
                let tool_cancel = tool_cancel.clone();
                let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
                move |(index, child)| {
                    let context = context.clone().with_cancellation_token(tool_cancel.clone());
                    let cancellation = cancellation.clone();
                    let tool_cancel = tool_cancel.clone();
                    let parent_invocation = parent_invocation.clone();
                    // Kept aside so a cancelled launch can still be built after
                    // `child` has been moved into the tool future.
                    let cancelled_tool = child.call.clone();
                    let child_execution_trace_hook =
                        child_trace_hooks.get(&child.call.call_id).cloned();
                    async move {
                        let tool_call = context.execute_prepared_tool_batch_child(
                            child,
                            index,
                            parent_invocation,
                            child_execution_trace_hook,
                        );
                        tokio::pin!(tool_call);
                        tokio::select! {
                            // `biased` polls the branches in order: cancellation first.
                            biased;
                            _ = cancellation.cancelled() => {
                                tool_cancel.cancel();
                                // Give the tool 50 ms (on the dispatch clock) to
                                // finish on its own after being cancelled.
                                let grace = context
                                    .dispatch
                                    .clock
                                    .sleep(std::time::Duration::from_millis(50));
                                tokio::pin!(grace);
                                tokio::select! {
                                    biased;
                                    outcome = &mut tool_call => outcome,
                                    // Grace period elapsed: substitute a cancelled
                                    // launch with no triggers.
                                    _ = &mut grace => CoordinatedToolLaunch {
                                        launch: cancelled_runtime_tool_call_launch(
                                            cancelled_tool.call_id,
                                            cancelled_tool.tool_name,
                                            cancelled_tool.args,
                                            cancelled_tool.replay,
                                        ),
                                        triggers: Vec::new(),
                                    },
                                }
                            }
                            outcome = &mut tool_call => outcome,
                        }
                    }
                }
            },
        )
        .await;

        let mut launches = Vec::with_capacity(child_outcomes.len());
        let mut triggers = Vec::new();
        for outcome in child_outcomes {
            launches.push(outcome.launch);
            triggers.extend(outcome.triggers);
        }
        Ok(crate::ToolBatchEffectOutcome { launches, triggers })
    }
343
344 async fn execute_prepared_tool_batch_child(
345 &self,
346 child: crate::PreparedToolBatchCall,
347 index: usize,
348 parent_invocation: crate::RuntimeInvocation,
349 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
350 ) -> CoordinatedToolLaunch {
351 let call_id = child.call.call_id.clone();
352 let tool_name = child.call.tool_name.clone();
353 let args = child.call.args.clone();
354 let replay = child.call.replay.clone();
355 let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
356 self.emit_tool_call_started(&call_id, &tool_name, args.clone(), activity_id.clone())
357 .await;
358
359 let retry_policy = crate::tool_dispatch::resolve_callable_manifest_by_id(
360 self.dispatch.as_ref(),
361 &child.call.tool_id,
362 )
363 .map(|manifest| manifest.retry_policy)
364 .or_else(|| {
365 child
366 .execution_grant
367 .as_ref()
368 .map(|grant| grant.manifest.retry_policy)
369 })
370 .unwrap_or(crate::ToolRetryPolicy::Never);
371 let max_attempts = retry_policy.max_attempts().max(1);
372 let mut triggers = Vec::new();
373
374 for attempt in 1..=max_attempts {
375 let attempt_invocation =
376 self.tool_attempt_invocation(&parent_invocation, &child.replay_suffix, attempt);
377 let attempt_outcome = self
378 .dispatch
379 .effect_controller
380 .controller()
381 .execute_effect(
382 crate::RuntimeEffectEnvelope::new(
383 attempt_invocation,
384 crate::RuntimeEffectCommand::ToolAttempt {
385 call: child.call.clone(),
386 execution_grant: child.execution_grant.clone(),
387 attempt,
388 max_attempts,
389 },
390 ),
391 crate::RuntimeEffectLocalExecutor::tool_batch(
392 self.clone(),
393 child_execution_trace_hook
394 .clone()
395 .map(|hook| {
396 std::iter::once((child.call.call_id.clone(), hook)).collect()
397 })
398 .unwrap_or_default(),
399 ),
400 )
401 .await;
402 let attempt_outcome = match attempt_outcome {
403 Ok(outcome) => match outcome.into_tool_attempt_effect() {
404 Ok(outcome) => outcome,
405 Err(err) => {
406 let completed = self
407 .complete_tool_call(
408 index,
409 call_id.clone(),
410 replay,
411 runtime_failure_dispatch_outcome(
412 Some(call_id.clone()),
413 tool_name,
414 args,
415 "tool_attempt_failed",
416 err.to_string(),
417 ),
418 activity_id,
419 )
420 .await;
421 return CoordinatedToolLaunch {
422 launch: crate::runtime::ToolCallLaunch::Done {
423 result: completed.completed,
424 },
425 triggers,
426 };
427 }
428 },
429 Err(err) => {
430 let completed = self
431 .complete_tool_call(
432 index,
433 call_id.clone(),
434 replay,
435 runtime_failure_dispatch_outcome(
436 Some(call_id.clone()),
437 tool_name,
438 args,
439 "tool_attempt_failed",
440 err.to_string(),
441 ),
442 activity_id,
443 )
444 .await;
445 return CoordinatedToolLaunch {
446 launch: crate::runtime::ToolCallLaunch::Done {
447 result: completed.completed,
448 },
449 triggers,
450 };
451 }
452 };
453 triggers.extend(attempt_outcome.triggers);
454 match attempt_outcome.launch {
455 crate::ToolAttemptLaunch::Pending {
456 key,
457 pending,
458 duration_ms,
459 } => {
460 let dispatch_outcome = self
461 .await_pending_tool_dispatch_outcome_with_suffix(
462 &call_id,
463 Some(parent_invocation.clone()),
464 format!("{}:await", child.replay_suffix),
465 crate::tool_dispatch::PendingToolDispatchOutcome {
466 tool_name: child.call.tool_name.clone(),
467 args: child.call.args.clone(),
468 key,
469 pending,
470 duration_ms,
471 },
472 self.cancellation_token.clone(),
473 )
474 .await;
475 let completed = self
476 .complete_tool_call(
477 index,
478 call_id.clone(),
479 child.call.replay.clone(),
480 dispatch_outcome,
481 activity_id,
482 )
483 .await;
484 return CoordinatedToolLaunch {
485 launch: crate::runtime::ToolCallLaunch::Done {
486 result: completed.completed,
487 },
488 triggers,
489 };
490 }
491 crate::ToolAttemptLaunch::Done { mut record } => {
492 record.call_id = Some(call_id.clone());
493 let retry_after = retry_after_ms(
494 &ToolResult::from_output(record.output.clone()),
495 retry_policy,
496 attempt - 1,
497 );
498 let Some(retry_after) = retry_after else {
499 let completed = self
500 .complete_tool_call(
501 index,
502 call_id.clone(),
503 child.call.replay.clone(),
504 ToolDispatchOutcome { record },
505 activity_id,
506 )
507 .await;
508 return CoordinatedToolLaunch {
509 launch: crate::runtime::ToolCallLaunch::Done {
510 result: completed.completed,
511 },
512 triggers,
513 };
514 };
515 if attempt >= max_attempts {
516 let exhausted =
517 mark_retry_exhausted(ToolResult::from_output(record.output), attempt);
518 record.output = exhausted.into_done_output().unwrap_or_else(|_| {
519 ToolCallOutput::failure(ToolFailure::runtime(
520 ToolFailureClass::Internal,
521 "tool_retry_exhaustion_failed",
522 "retry exhaustion produced a pending output",
523 ))
524 });
525 let completed = self
526 .complete_tool_call(
527 index,
528 call_id.clone(),
529 child.call.replay.clone(),
530 ToolDispatchOutcome { record },
531 activity_id,
532 )
533 .await;
534 return CoordinatedToolLaunch {
535 launch: crate::runtime::ToolCallLaunch::Done {
536 result: completed.completed,
537 },
538 triggers,
539 };
540 }
541 if retry_after > 0
542 && let Err(err) = self
543 .sleep_before_tool_retry(
544 &parent_invocation,
545 &child.replay_suffix,
546 attempt,
547 retry_after,
548 )
549 .await
550 {
551 let completed = self
552 .complete_tool_call(
553 index,
554 call_id.clone(),
555 child.call.replay.clone(),
556 runtime_failure_dispatch_outcome(
557 Some(call_id.clone()),
558 child.call.tool_name.clone(),
559 child.call.args.clone(),
560 "tool_retry_sleep_failed",
561 format!(
562 "retry sleep for tool `{}` failed after attempt {attempt}: {err}",
563 child.call.tool_name
564 ),
565 ),
566 activity_id,
567 )
568 .await;
569 return CoordinatedToolLaunch {
570 launch: crate::runtime::ToolCallLaunch::Done {
571 result: completed.completed,
572 },
573 triggers,
574 };
575 }
576 }
577 }
578 }
579
580 let completed = self
581 .complete_tool_call(
582 index,
583 call_id.clone(),
584 child.call.replay,
585 runtime_failure_dispatch_outcome(
586 Some(call_id),
587 child.call.tool_name,
588 child.call.args,
589 "tool_retry_loop_failed",
590 "tool retry loop exited without a terminal result",
591 ),
592 activity_id,
593 )
594 .await;
595 CoordinatedToolLaunch {
596 launch: crate::runtime::ToolCallLaunch::Done {
597 result: completed.completed,
598 },
599 triggers,
600 }
601 }
602
603 async fn emit_tool_call_started(
604 &self,
605 call_id: &str,
606 name: &str,
607 args: serde_json::Value,
608 activity_id: TurnActivityId,
609 ) {
610 let _ = self
611 .dispatch
612 .event_tx
613 .send(SessionEvent::ToolCallStart {
614 call_id: Some(call_id.to_string()),
615 name: name.to_string(),
616 args: args.clone(),
617 })
618 .await;
619 self.emit_turn_activity(
620 activity_id,
621 TurnEvent::ToolCallStarted {
622 call_id: Some(call_id.to_string()),
623 name: name.to_string(),
624 args,
625 },
626 )
627 .await;
628 }
629
630 fn tool_attempt_invocation(
631 &self,
632 parent_invocation: &crate::RuntimeInvocation,
633 child_replay_suffix: &str,
634 attempt: u32,
635 ) -> crate::RuntimeInvocation {
636 let suffix = format!("{child_replay_suffix}:attempt:{attempt}");
637 let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
638 crate::runtime::causal::child_effect_invocation(
639 parent_invocation,
640 format!("{parent_effect_id}:{suffix}"),
641 crate::RuntimeEffectKind::ToolAttempt,
642 suffix,
643 )
644 }
645
646 async fn sleep_before_tool_retry(
647 &self,
648 parent_invocation: &crate::RuntimeInvocation,
649 child_replay_suffix: &str,
650 attempt: u32,
651 retry_after_ms: u64,
652 ) -> Result<(), crate::RuntimeEffectControllerError> {
653 let suffix = format!("{child_replay_suffix}:attempt:{attempt}:sleep");
654 let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
655 let invocation = crate::runtime::causal::child_effect_invocation(
656 parent_invocation,
657 format!("{parent_effect_id}:{suffix}"),
658 crate::RuntimeEffectKind::Sleep,
659 suffix,
660 );
661 let cancellation = self.cancellation_token.clone().unwrap_or_default();
662 let outcome = self
663 .dispatch
664 .effect_controller
665 .controller()
666 .execute_effect(
667 crate::RuntimeEffectEnvelope::new(
668 invocation,
669 crate::RuntimeEffectCommand::Sleep {
670 duration_ms: retry_after_ms,
671 },
672 ),
673 crate::RuntimeEffectLocalExecutor::sleep_with_clock(
674 cancellation,
675 std::sync::Arc::clone(&self.dispatch.clock),
676 ),
677 )
678 .await?;
679 match outcome {
680 crate::RuntimeEffectOutcome::Sleep => Ok(()),
681 other => Err(crate::RuntimeEffectControllerError::new(
682 "runtime_effect_wrong_outcome",
683 format!("expected sleep outcome, got {}", other.kind().as_str()),
684 )),
685 }
686 }
687
688 #[expect(
689 clippy::too_many_arguments,
690 reason = "tool execution carries explicit runtime call metadata"
691 )]
692 pub(crate) async fn execute_tool_call(
693 &self,
694 call_id: String,
695 name: String,
696 args: serde_json::Value,
697 index: usize,
698 replay: Option<crate::llm::types::ProviderReplayMeta>,
699 parent_invocation: Option<crate::RuntimeInvocation>,
700 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
701 ) -> CompletedProtocolToolCall {
702 let _ = self
703 .dispatch
704 .event_tx
705 .send(SessionEvent::ToolCallStart {
706 call_id: Some(call_id.clone()),
707 name: name.clone(),
708 args: args.clone(),
709 })
710 .await;
711 let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
712 self.emit_turn_activity(
713 tool_correlation_id.clone(),
714 TurnEvent::ToolCallStarted {
715 call_id: Some(call_id.clone()),
716 name: name.clone(),
717 args: args.clone(),
718 },
719 )
720 .await;
721
722 let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
723 let mut dispatch = (*self.dispatch).clone();
724 dispatch.parent_invocation = parent_invocation.clone();
725 let pending = crate::sansio::PendingToolCall {
726 call_id: call_id.clone(),
727 tool_name: name,
728 args,
729 replay: replay.clone(),
730 };
731 let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
732 .await
733 {
734 ToolPreparationOutcome::Prepared(prepared) => {
735 let dispatch_context = std::sync::Arc::new(dispatch.clone());
736 let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
737 self.clone().with_parent_invocation(parent_invocation)
738 } else {
739 self.clone()
740 };
741 let mut tool_context =
742 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
743 .runtime_execution_context(runtime_context)
744 .prepared_call(&prepared)
745 .cancellation_token(self.cancellation_token.clone())
746 .runtime_process_id(self.runtime_process_id.clone())
747 .parent_invocation(parent_invocation.clone())
748 .child_execution_trace_hook(child_execution_trace_hook.clone());
749 if let Some(process_events) = self.process_event_context.as_ref() {
750 tool_context = tool_context.process_events(
751 process_events.process_id.clone(),
752 std::sync::Arc::clone(&process_events.registry),
753 process_events.store.clone(),
754 process_events.session_store_factory.clone(),
755 process_events.queued_work_driver.clone(),
756 );
757 }
758 let tool_context = tool_context.build();
759 dispatch_prepared_tool_call_launch_with_execution_context(
760 dispatch_context.as_ref(),
761 prepared,
762 None,
763 tool_context,
764 )
765 .await
766 }
767 ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
768 };
769 let mut outcome = match launch {
770 ToolCallLaunch::Done(outcome) => outcome,
771 ToolCallLaunch::Pending(pending) => {
772 self.await_pending_tool_dispatch_outcome(
773 &call_id,
774 parent_invocation.clone(),
775 pending,
776 self.cancellation_token.clone(),
777 )
778 .await
779 }
780 };
781 outcome.record.call_id = Some(call_id.clone());
782
783 self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
784 .await
785 }
786
787 #[expect(
788 clippy::too_many_arguments,
789 reason = "tool execution carries explicit runtime call metadata"
790 )]
791 pub(crate) async fn execute_tool_call_by_id(
792 &self,
793 call_id: String,
794 tool_id: crate::ToolId,
795 args: serde_json::Value,
796 index: usize,
797 replay: Option<crate::llm::types::ProviderReplayMeta>,
798 parent_invocation: Option<crate::RuntimeInvocation>,
799 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
800 ) -> CompletedProtocolToolCall {
801 let Some(manifest) =
802 crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
803 else {
804 let outcome = ToolDispatchOutcome {
805 record: ToolCallRecord {
806 call_id: Some(call_id.clone()),
807 tool: tool_id.to_string(),
808 args,
809 output: ToolCallOutput::failure(ToolFailure::runtime(
810 ToolFailureClass::Unavailable,
811 "tool_unavailable",
812 format!("Tool id `{tool_id}` is unavailable in this session"),
813 )),
814 duration_ms: 0,
815 },
816 };
817 let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
818 return self
819 .complete_tool_call(index, call_id, replay, outcome, activity_id)
820 .await;
821 };
822 self.execute_tool_call(
823 call_id,
824 manifest.name,
825 args,
826 index,
827 replay,
828 parent_invocation,
829 child_execution_trace_hook,
830 )
831 .await
832 }
833
834 #[expect(
835 clippy::too_many_arguments,
836 reason = "tool execution carries explicit runtime call metadata"
837 )]
838 pub(crate) async fn execute_tool_call_by_grant(
839 &self,
840 call_id: String,
841 grant: crate::ToolExecutionGrant,
842 args: serde_json::Value,
843 index: usize,
844 replay: Option<crate::llm::types::ProviderReplayMeta>,
845 parent_invocation: Option<crate::RuntimeInvocation>,
846 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
847 ) -> CompletedProtocolToolCall {
848 let name = grant.manifest.name.clone();
849 let _ = self
850 .dispatch
851 .event_tx
852 .send(SessionEvent::ToolCallStart {
853 call_id: Some(call_id.clone()),
854 name: name.clone(),
855 args: args.clone(),
856 })
857 .await;
858 let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
859 self.emit_turn_activity(
860 tool_correlation_id.clone(),
861 TurnEvent::ToolCallStarted {
862 call_id: Some(call_id.clone()),
863 name: name.clone(),
864 args: args.clone(),
865 },
866 )
867 .await;
868
869 let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
870 let mut dispatch = (*self.dispatch).clone();
871 dispatch.parent_invocation = parent_invocation.clone();
872 let pending = crate::sansio::PendingToolCall {
873 call_id: call_id.clone(),
874 tool_name: name,
875 args,
876 replay: replay.clone(),
877 };
878 let launch = match prepare_granted_tool_call_with_context(
879 &dispatch,
880 &grant,
881 pending,
882 Some(call_id.clone()),
883 )
884 .await
885 {
886 ToolPreparationOutcome::Prepared(prepared) => {
887 let dispatch_context = std::sync::Arc::new(dispatch.clone());
888 let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
889 self.clone().with_parent_invocation(parent_invocation)
890 } else {
891 self.clone()
892 };
893 let mut tool_context =
894 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
895 .runtime_execution_context(runtime_context)
896 .prepared_call(&prepared)
897 .tool_execution_binding(grant.execution_binding.clone())
898 .cancellation_token(self.cancellation_token.clone())
899 .runtime_process_id(self.runtime_process_id.clone())
900 .parent_invocation(parent_invocation.clone())
901 .child_execution_trace_hook(child_execution_trace_hook.clone());
902 if let Some(process_events) = self.process_event_context.as_ref() {
903 tool_context = tool_context.process_events(
904 process_events.process_id.clone(),
905 std::sync::Arc::clone(&process_events.registry),
906 process_events.store.clone(),
907 process_events.session_store_factory.clone(),
908 process_events.queued_work_driver.clone(),
909 );
910 }
911 let tool_context = tool_context.build();
912 dispatch_granted_prepared_tool_call_launch_with_execution_context(
913 dispatch_context.as_ref(),
914 &grant,
915 prepared,
916 None,
917 tool_context,
918 )
919 .await
920 }
921 ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
922 };
923 let mut outcome = match launch {
924 ToolCallLaunch::Done(outcome) => outcome,
925 ToolCallLaunch::Pending(pending) => {
926 self.await_pending_tool_dispatch_outcome(
927 &call_id,
928 parent_invocation.clone(),
929 pending,
930 self.cancellation_token.clone(),
931 )
932 .await
933 }
934 };
935 outcome.record.call_id = Some(call_id.clone());
936
937 self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
938 .await
939 }
940
941 pub(crate) async fn prepare_tool_call(
942 &self,
943 pending: crate::sansio::PendingToolCall,
944 ) -> ToolPreparationOutcome {
945 let call_id = Some(pending.call_id.clone());
946 prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
947 }
948
949 pub(crate) async fn execute_prepared_tool_attempt_effect(
950 &self,
951 prepared: crate::PreparedToolCall,
952 execution_grant: Option<Box<crate::ToolExecutionGrant>>,
953 attempt: u32,
954 max_attempts: u32,
955 attempt_invocation: crate::RuntimeInvocation,
956 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
957 ) -> Result<crate::ToolAttemptEffectOutcome, crate::RuntimeEffectControllerError> {
958 let call_id = prepared.call_id.clone();
959 let mut attempt_dispatch = (*self.dispatch).clone();
960 attempt_dispatch.parent_invocation = Some(attempt_invocation.clone());
961 attempt_dispatch.trigger_outcomes =
962 crate::tool_dispatch::ToolTriggerOutcomeBuffer::default();
963 let attempt_dispatch = std::sync::Arc::new(attempt_dispatch);
964 let mut attempt_context = self.clone();
965 attempt_context.dispatch = std::sync::Arc::clone(&attempt_dispatch);
966 attempt_context.parent_invocation = Some(attempt_invocation.clone());
967
968 let mut tool_context =
969 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&attempt_dispatch))
970 .runtime_execution_context(attempt_context.clone())
971 .prepared_call(&prepared)
972 .cancellation_token(self.cancellation_token.clone())
973 .runtime_process_id(self.runtime_process_id.clone())
974 .parent_invocation(Some(attempt_invocation))
975 .child_execution_trace_hook(child_execution_trace_hook);
976 if let Some(process_events) = self.process_event_context.as_ref() {
977 tool_context = tool_context.process_events(
978 process_events.process_id.clone(),
979 std::sync::Arc::clone(&process_events.registry),
980 process_events.store.clone(),
981 process_events.session_store_factory.clone(),
982 process_events.queued_work_driver.clone(),
983 );
984 }
985 let tool_context = tool_context.build();
986 let launch = match Box::pin(async {
987 if let Some(grant) = execution_grant.as_ref() {
988 dispatch_granted_prepared_tool_attempt_launch_with_execution_context(
989 attempt_dispatch.as_ref(),
990 grant,
991 prepared,
992 attempt,
993 max_attempts,
994 None,
995 tool_context,
996 )
997 .await
998 } else {
999 dispatch_prepared_tool_attempt_launch_with_execution_context(
1000 attempt_dispatch.as_ref(),
1001 prepared,
1002 attempt,
1003 max_attempts,
1004 None,
1005 tool_context,
1006 )
1007 .await
1008 }
1009 })
1010 .await
1011 {
1012 ToolCallLaunch::Done(outcome) => {
1013 let mut record = outcome.record;
1014 record.call_id = Some(call_id);
1015 crate::ToolAttemptLaunch::Done { record }
1016 }
1017 ToolCallLaunch::Pending(pending) => crate::ToolAttemptLaunch::Pending {
1018 key: pending.key,
1019 pending: pending.pending,
1020 duration_ms: pending.duration_ms,
1021 },
1022 };
1023 let triggers = attempt_context
1024 .drain_tool_trigger_outcomes()
1025 .map_err(|err| {
1026 crate::RuntimeEffectControllerError::new(
1027 "tool_trigger_outcome_drain",
1028 err.to_string(),
1029 )
1030 })?;
1031 Ok(crate::ToolAttemptEffectOutcome { launch, triggers })
1032 }
1033
1034 pub(super) async fn await_process_with_cancellation(
1035 &self,
1036 process_id: &str,
1037 parent_invocation: Option<crate::RuntimeInvocation>,
1038 cancellation: Option<tokio_util::sync::CancellationToken>,
1039 ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
1040 let _phase = self.named_phase("process.await_handle");
1041 if let Some(cancellation) = cancellation {
1042 tokio::select! {
1043 result = self.dispatch.processes.await_process(
1044 process_id,
1045 self.process_scope(parent_invocation.clone()),
1046 ) => result,
1047 _ = cancellation.cancelled() => {
1048 let _ = self.dispatch.processes.cancel(
1049 &self.dispatch.session_id,
1050 process_id,
1051 self.process_scope(parent_invocation.clone()),
1052 ).await;
1053 self.dispatch.processes.await_process(
1054 process_id,
1055 self.process_scope(parent_invocation),
1056 ).await
1057 }
1058 }
1059 } else {
1060 self.dispatch
1061 .processes
1062 .await_process(process_id, self.process_scope(parent_invocation))
1063 .await
1064 }
1065 }
1066
1067 pub(crate) async fn complete_tool_call(
1068 &self,
1069 _index: usize,
1070 call_id: String,
1071 replay: Option<crate::llm::types::ProviderReplayMeta>,
1072 outcome: ToolDispatchOutcome,
1073 tool_correlation_id: TurnActivityId,
1074 ) -> CompletedProtocolToolCall {
1075 let output = outcome.record.output.clone();
1076 let projection_output = output.clone();
1077 let projection_tool_name = outcome.record.tool.clone();
1078 let projection_args = outcome.record.args.clone();
1079 let projection_duration_ms = outcome.record.duration_ms;
1080 let projection_call_id = call_id.clone();
1081 tokio::task::yield_now().await;
1082 let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
1083 let projection_context = crate::plugin::ToolResultProjectionContext {
1084 session_id: self.dispatch.session_id.clone(),
1085 tool_name: projection_tool_name,
1086 args: projection_args,
1087 output: projection_output,
1088 duration_ms: projection_duration_ms,
1089 call_id: projection_call_id,
1090 };
1091 let model_return = match plugins.project_tool_result(projection_context).await {
1092 Ok(projected) => projected,
1093 Err(err) => ModelToolReturn::text(
1094 call_id.clone(),
1095 outcome.record.tool.clone(),
1096 err.to_string(),
1097 ),
1098 };
1099
1100 self.emit_turn_activity(
1101 tool_correlation_id,
1102 TurnEvent::ToolCallCompleted {
1103 call_id: Some(call_id.clone()),
1104 name: outcome.record.tool.clone(),
1105 args: outcome.record.args.clone(),
1106 output: output.clone(),
1107 duration_ms: outcome.record.duration_ms,
1108 },
1109 )
1110 .await;
1111
1112 let record = ToolCallRecord {
1113 call_id: Some(call_id.clone()),
1114 tool: outcome.record.tool.clone(),
1115 args: outcome.record.args.clone(),
1116 output: output.clone(),
1117 duration_ms: outcome.record.duration_ms,
1118 };
1119 CompletedProtocolToolCall {
1120 completed: crate::sansio::CompletedToolCall {
1121 call_id,
1122 tool_name: outcome.record.tool,
1123 args: outcome.record.args,
1124 output,
1125 model_return,
1126 duration_ms: outcome.record.duration_ms,
1127 replay,
1128 },
1129 record,
1130 }
1131 }
1132
1133 pub(crate) async fn pending_completion_dispatch_outcome(
1134 &self,
1135 tool_name: String,
1136 args: serde_json::Value,
1137 resolution: crate::Resolution,
1138 duration_ms: u64,
1139 ) -> ToolDispatchOutcome {
1140 let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
1141 let result = finalize_tool_result_with_execution_context(
1142 self.dispatch.as_ref(),
1143 &tool_name,
1144 &args,
1145 ToolResult::from_output(output),
1146 duration_ms,
1147 )
1148 .await;
1149 let output = result.into_done_output().unwrap_or_else(|_| {
1150 ToolCallOutput::failure(ToolFailure::runtime(
1151 ToolFailureClass::Internal,
1152 "pending_tool_not_finalized",
1153 "pending tool result reached a completed-output projection path",
1154 ))
1155 });
1156 ToolDispatchOutcome {
1157 record: ToolCallRecord {
1158 call_id: None,
1159 tool: tool_name,
1160 args,
1161 output,
1162 duration_ms,
1163 },
1164 }
1165 }
1166
1167 async fn await_pending_tool_dispatch_outcome(
1168 &self,
1169 call_id: &str,
1170 parent_invocation: Option<crate::RuntimeInvocation>,
1171 pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1172 cancellation: Option<tokio_util::sync::CancellationToken>,
1173 ) -> ToolDispatchOutcome {
1174 self.await_pending_tool_dispatch_outcome_with_suffix(
1175 call_id,
1176 parent_invocation,
1177 format!("{call_id}:await"),
1178 pending,
1179 cancellation,
1180 )
1181 .await
1182 }
1183
1184 async fn await_pending_tool_dispatch_outcome_with_suffix(
1185 &self,
1186 call_id: &str,
1187 parent_invocation: Option<crate::RuntimeInvocation>,
1188 replay_suffix: String,
1189 pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1190 cancellation: Option<tokio_util::sync::CancellationToken>,
1191 ) -> ToolDispatchOutcome {
1192 let fallback;
1193 let parent = if let Some(parent) = parent_invocation.as_ref() {
1194 parent
1195 } else {
1196 fallback = crate::RuntimeInvocation::effect(
1197 crate::RuntimeScope::new(&self.dispatch.session_id),
1198 format!("tool:{call_id}:await"),
1199 crate::RuntimeEffectKind::AwaitEvent,
1200 format!("tool:{call_id}:await"),
1201 );
1202 &fallback
1203 };
1204 let parent_effect_id = parent.effect_id().unwrap_or("tool");
1205 let invocation = crate::runtime::causal::child_effect_invocation(
1206 parent,
1207 format!("{parent_effect_id}:{replay_suffix}"),
1208 crate::RuntimeEffectKind::AwaitEvent,
1209 replay_suffix,
1210 );
1211 let cancellation = cancellation.unwrap_or_default();
1212 let deadline = pending
1213 .pending
1214 .deadline
1215 .map(|duration| self.dispatch.clock.now() + duration);
1216 let outcome = self
1217 .dispatch
1218 .effect_controller
1219 .controller()
1220 .execute_effect(
1221 crate::RuntimeEffectEnvelope::new(
1222 invocation,
1223 crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
1224 ),
1225 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
1226 cancellation,
1227 deadline,
1228 std::sync::Arc::clone(&self.dispatch.clock),
1229 ),
1230 )
1231 .await;
1232 let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
1233 Ok(resolution) => resolution,
1234 Err(err) => {
1235 return ToolDispatchOutcome {
1236 record: ToolCallRecord {
1237 call_id: None,
1238 tool: pending.tool_name,
1239 args: pending.args,
1240 output: ToolCallOutput::failure(ToolFailure::runtime(
1241 ToolFailureClass::Internal,
1242 "pending_tool_completion_failed",
1243 err.to_string(),
1244 )),
1245 duration_ms: pending.duration_ms,
1246 },
1247 };
1248 }
1249 };
1250 self.pending_completion_dispatch_outcome(
1251 pending.tool_name,
1252 pending.args,
1253 resolution,
1254 pending.duration_ms,
1255 )
1256 .await
1257 }
1258
1259 pub async fn call_tool_by_id(
1260 &self,
1261 call_id: String,
1262 tool_id: crate::ToolId,
1263 args: serde_json::Value,
1264 index: usize,
1265 ) -> ToolInvocationReply {
1266 let executed = self
1267 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
1268 .await;
1269 let reply = ToolInvocationReply::from_output(executed.completed.output);
1270 reply.with_record(executed.record)
1271 }
1272
1273 pub async fn call_tool_by_id_with_child_execution_trace_hook(
1274 &self,
1275 call_id: String,
1276 tool_id: crate::ToolId,
1277 args: serde_json::Value,
1278 index: usize,
1279 trace_hook: crate::ToolChildExecutionTraceHook,
1280 ) -> ToolInvocationReply {
1281 let executed = self
1282 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
1283 .await;
1284 let reply = ToolInvocationReply::from_output(executed.completed.output);
1285 reply.with_record(executed.record)
1286 }
1287
1288 pub async fn call_tool_with_execution_grant(
1289 &self,
1290 call_id: String,
1291 grant: crate::ToolExecutionGrant,
1292 args: serde_json::Value,
1293 index: usize,
1294 ) -> ToolInvocationReply {
1295 let executed = self
1296 .execute_tool_call_by_grant(call_id, grant, args, index, None, None, None)
1297 .await;
1298 let reply = ToolInvocationReply::from_output(executed.completed.output);
1299 reply.with_record(executed.record)
1300 }
1301
1302 pub async fn call_tool_with_execution_grant_and_child_execution_trace_hook(
1303 &self,
1304 call_id: String,
1305 grant: crate::ToolExecutionGrant,
1306 args: serde_json::Value,
1307 index: usize,
1308 trace_hook: crate::ToolChildExecutionTraceHook,
1309 ) -> ToolInvocationReply {
1310 let executed = self
1311 .execute_tool_call_by_grant(call_id, grant, args, index, None, None, Some(trace_hook))
1312 .await;
1313 let reply = ToolInvocationReply::from_output(executed.completed.output);
1314 reply.with_record(executed.record)
1315 }
1316
1317 pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
1318 if calls.is_empty() {
1319 return Vec::new();
1320 }
1321
1322 let batch_id = deterministic_tool_invocation_batch_id(&calls);
1323 let mut replies = vec![None; calls.len()];
1324 let mut prepared_entries = Vec::new();
1325
1326 for (index, call) in calls.into_iter().enumerate() {
1327 let preparation = if let Some(grant) = call.execution_grant.as_deref().cloned() {
1328 let pending = crate::sansio::PendingToolCall {
1329 call_id: call.id.clone(),
1330 tool_name: grant.manifest.name.clone(),
1331 args: call.args,
1332 replay: None,
1333 };
1334 (
1335 Some(grant.clone()),
1336 prepare_granted_tool_call_with_context(
1337 self.dispatch.as_ref(),
1338 &grant,
1339 pending,
1340 Some(call.id.clone()),
1341 )
1342 .await,
1343 )
1344 } else {
1345 let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
1346 self.dispatch.as_ref(),
1347 &call.tool_id,
1348 ) else {
1349 let outcome = ToolDispatchOutcome {
1350 record: ToolCallRecord {
1351 call_id: Some(call.id.clone()),
1352 tool: call.tool_id.to_string(),
1353 args: call.args,
1354 output: ToolCallOutput::failure(ToolFailure::runtime(
1355 ToolFailureClass::Unavailable,
1356 "tool_unavailable",
1357 format!(
1358 "Tool id `{}` is unavailable in this session",
1359 call.tool_id
1360 ),
1361 )),
1362 duration_ms: 0,
1363 },
1364 };
1365 let completed = self
1366 .complete_tool_call(
1367 index,
1368 call.id,
1369 None,
1370 outcome,
1371 TurnActivityId::new(format!("tool:{}", batch_id)),
1372 )
1373 .await;
1374 replies[index] = Some(
1375 ToolInvocationReply::from_output(completed.completed.output)
1376 .with_record(completed.record),
1377 );
1378 continue;
1379 };
1380
1381 let pending = crate::sansio::PendingToolCall {
1382 call_id: call.id.clone(),
1383 tool_name: manifest.name,
1384 args: call.args,
1385 replay: None,
1386 };
1387 (None, self.prepare_tool_call(pending).await)
1388 };
1389 let (execution_grant, preparation) = preparation;
1390 match preparation {
1391 ToolPreparationOutcome::Prepared(prepared) => {
1392 prepared_entries.push((
1393 index,
1394 prepared,
1395 execution_grant,
1396 call.child_execution_trace_hook,
1397 ));
1398 }
1399 ToolPreparationOutcome::Completed(outcome) => {
1400 let completed = self
1401 .complete_tool_call(
1402 index,
1403 call.id,
1404 None,
1405 *outcome,
1406 TurnActivityId::new(format!("tool:{}", batch_id)),
1407 )
1408 .await;
1409 replies[index] = Some(
1410 ToolInvocationReply::from_output(completed.completed.output)
1411 .with_record(completed.record),
1412 );
1413 }
1414 }
1415 }
1416
1417 if !prepared_entries.is_empty() {
1418 let invocation = self.tool_batch_invocation(&batch_id);
1419 let batch = crate::PreparedToolBatch::new_with_grants(
1420 batch_id.clone(),
1421 prepared_entries
1422 .iter()
1423 .map(|(_, prepared, grant, _)| (prepared.clone(), grant.clone()))
1424 .collect(),
1425 );
1426 let child_trace_hooks = prepared_entries
1427 .iter()
1428 .filter_map(|(_, prepared, _, hook)| {
1429 hook.clone().map(|hook| (prepared.call_id.clone(), hook))
1430 })
1431 .collect();
1432 let envelope = crate::RuntimeEffectEnvelope::new(
1433 invocation.clone(),
1434 crate::RuntimeEffectCommand::ToolBatch { batch },
1435 );
1436 let local_executor =
1437 crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
1438 let raw_outcome = self
1439 .dispatch
1440 .effect_controller
1441 .controller()
1442 .execute_effect(envelope, local_executor)
1443 .await;
1444 let outcome =
1445 match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
1446 Ok(outcome) => outcome,
1447 Err(err) => {
1448 for (index, prepared, _, _) in prepared_entries {
1449 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
1450 format!("tool batch failed: {err}")
1451 )));
1452 let _ = prepared;
1453 }
1454 return replies
1455 .into_iter()
1456 .map(|reply| reply.expect("every batch reply slot should be filled"))
1457 .collect();
1458 }
1459 };
1460 if outcome.launches.len() != prepared_entries.len() {
1461 let message = format!(
1462 "tool batch returned {} launches for {} prepared calls",
1463 outcome.launches.len(),
1464 prepared_entries.len()
1465 );
1466 for (index, _, _, _) in prepared_entries {
1467 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
1468 }
1469 } else {
1470 for ((index, prepared, _, _), launch) in
1471 prepared_entries.into_iter().zip(outcome.launches)
1472 {
1473 let call_id = prepared.call_id.clone();
1474 let reply = match launch {
1475 crate::runtime::ToolCallLaunch::Done { result } => {
1476 let record = ToolCallRecord {
1477 call_id: Some(result.call_id.clone()),
1478 tool: result.tool_name.clone(),
1479 args: result.args.clone(),
1480 output: result.output.clone(),
1481 duration_ms: result.duration_ms,
1482 };
1483 ToolInvocationReply::from_output(result.output).with_record(record)
1484 }
1485 crate::runtime::ToolCallLaunch::Pending {
1486 key,
1487 pending,
1488 duration_ms,
1489 } => {
1490 let dispatch_outcome = self
1491 .await_pending_tool_dispatch_outcome(
1492 &call_id,
1493 Some(invocation.clone()),
1494 crate::tool_dispatch::PendingToolDispatchOutcome {
1495 tool_name: prepared.tool_name.clone(),
1496 args: prepared.args.clone(),
1497 key,
1498 pending,
1499 duration_ms,
1500 },
1501 self.cancellation_token.clone(),
1502 )
1503 .await;
1504 let completed = self
1505 .complete_tool_call(
1506 index,
1507 call_id.clone(),
1508 prepared.replay.clone(),
1509 dispatch_outcome,
1510 TurnActivityId::new(format!("tool:{call_id}")),
1511 )
1512 .await;
1513 ToolInvocationReply::from_output(completed.completed.output)
1514 .with_record(completed.record)
1515 }
1516 };
1517 replies[index] = Some(reply);
1518 }
1519 }
1520 }
1521
1522 replies
1523 .into_iter()
1524 .map(|reply| reply.expect("every batch reply slot should be filled"))
1525 .collect()
1526 }
1527
1528 pub async fn start_tool_call(
1529 &self,
1530 call_id: String,
1531 name: String,
1532 args: serde_json::Value,
1533 ) -> ToolInvocationReply {
1534 self.start_tool_process(call_id, name, args).await
1535 }
1536
1537 pub async fn await_tool_handle(
1538 &self,
1539 call_id: String,
1540 handle: serde_json::Value,
1541 ) -> ToolInvocationReply {
1542 self.await_process_handle(call_id, handle).await
1543 }
1544
1545 pub async fn cancel_tool_handle(
1546 &self,
1547 call_id: String,
1548 handle: serde_json::Value,
1549 ) -> ToolInvocationReply {
1550 self.cancel_process_handle(call_id, handle).await
1551 }
1552
1553 pub async fn signal_tool_handle(
1554 &self,
1555 call_id: String,
1556 handle: serde_json::Value,
1557 signal_name: String,
1558 payload: serde_json::Value,
1559 ) -> ToolInvocationReply {
1560 self.signal_process_handle(call_id, handle, signal_name, payload)
1561 .await
1562 }
1563}