1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3 ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4 dispatch_prepared_tool_call_launch_with_execution_context,
5 finalize_tool_result_with_execution_context, prepare_tool_call_with_context,
6 schedule_tool_batch,
7};
8use crate::{
9 ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
10 ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
11};
12use std::collections::HashMap;
13
14#[derive(Clone)]
15pub struct ToolInvocation {
16 pub id: String,
17 pub tool_id: crate::ToolId,
18 pub args: serde_json::Value,
19 pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
20}
21
22impl ToolInvocation {
23 pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
24 Self {
25 id: id.into(),
26 tool_id,
27 args,
28 child_execution_trace_hook: None,
29 }
30 }
31
32 pub fn label(&self) -> String {
33 self.tool_id.to_string()
34 }
35
36 pub fn with_child_execution_trace_hook(
37 mut self,
38 hook: crate::ToolChildExecutionTraceHook,
39 ) -> Self {
40 self.child_execution_trace_hook = Some(hook);
41 self
42 }
43}
44
45impl std::fmt::Debug for ToolInvocation {
46 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
47 f.debug_struct("ToolInvocation")
48 .field("id", &self.id)
49 .field("tool_id", &self.tool_id)
50 .field("args", &self.args)
51 .field(
52 "child_execution_trace_hook",
53 &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
54 )
55 .finish()
56 }
57}
58
59#[derive(Clone, Debug)]
60pub struct ToolInvocationReply {
61 pub output: ToolCallOutput,
62 pub record: Option<ToolCallRecord>,
63}
64
65impl ToolInvocationReply {
66 pub fn success(value: serde_json::Value) -> Self {
67 Self {
68 output: ToolCallOutput::success(value),
69 record: None,
70 }
71 }
72
73 pub fn error(value: serde_json::Value) -> Self {
74 let message = value
75 .as_str()
76 .map(ToOwned::to_owned)
77 .unwrap_or_else(|| value.to_string());
78 let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
79 failure.raw =
80 Some(serde_json::from_value(value).unwrap_or_else(|_| {
81 crate::ToolValue::String("unserializable tool error".to_string())
82 }));
83 Self {
84 output: ToolCallOutput::failure(failure),
85 record: None,
86 }
87 }
88
89 pub fn from_output(output: ToolCallOutput) -> Self {
90 Self {
91 output,
92 record: None,
93 }
94 }
95
96 pub fn cancelled(message: impl Into<String>) -> Self {
97 Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
98 message,
99 )))
100 }
101
102 pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
103 self.record = Some(record);
104 self
105 }
106}
107
108#[derive(Clone, Debug)]
109pub(crate) struct CompletedProtocolToolCall {
110 pub completed: crate::sansio::CompletedToolCall,
111 pub record: ToolCallRecord,
112}
113
114pub(crate) enum ProtocolToolCallLaunch {
115 Done(CompletedProtocolToolCall),
116 Pending(crate::tool_dispatch::PendingToolDispatchOutcome),
117}
118
119fn cancelled_runtime_tool_call_launch(
120 call_id: String,
121 tool_name: String,
122 args: serde_json::Value,
123 replay: Option<crate::llm::types::ProviderReplayMeta>,
124) -> crate::runtime::ToolCallLaunch {
125 crate::runtime::ToolCallLaunch::Done {
126 result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
127 }
128}
129
130fn cancelled_completed_tool_call(
131 call_id: String,
132 tool_name: String,
133 args: serde_json::Value,
134 replay: Option<crate::llm::types::ProviderReplayMeta>,
135) -> crate::sansio::CompletedToolCall {
136 let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
137 crate::sansio::CompletedToolCall {
138 call_id: call_id.clone(),
139 tool_name: tool_name.clone(),
140 args,
141 model_return: ModelToolReturn {
142 call_id,
143 tool_name,
144 parts: vec![crate::ModelToolReturnPart::text(
145 "[Tool execution cancelled]\ntool call cancelled".to_string(),
146 )],
147 },
148 output,
149 duration_ms: 0,
150 replay,
151 }
152}
153
154fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
155 let identity = calls
156 .iter()
157 .map(|call| {
158 serde_json::json!({
159 "id": call.id.clone(),
160 "tool_id": call.tool_id.to_string(),
161 "args": call.args.clone(),
162 })
163 })
164 .collect::<Vec<_>>();
165 let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
166 .unwrap_or_else(|_| format!("len-{}", calls.len()));
167 format!("tool-batch:{digest}")
168}
169
170#[derive(Clone)]
171pub(crate) struct PreparedToolRun {
172 pub prepared: crate::PreparedToolCall,
173 pub index: usize,
174 pub parent_invocation: Option<crate::RuntimeInvocation>,
175 pub activity_id: TurnActivityId,
176}
177
178impl RuntimeExecutionContext<'_> {
179 fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
180 let suffix = format!("tool-batch:{batch_id}");
181 if let Some(parent) = self.parent_invocation.as_ref() {
182 let parent_effect_id = parent.effect_id().unwrap_or("effect");
183 return crate::runtime::causal::child_effect_invocation(
184 parent,
185 format!("{parent_effect_id}:{suffix}"),
186 crate::RuntimeEffectKind::ToolBatch,
187 suffix,
188 );
189 }
190 let replay_key = format!("{}:{suffix}", self.execution_scope_id());
191 crate::RuntimeInvocation::effect(
192 crate::RuntimeScope::new(self.session_id.clone()),
193 suffix,
194 crate::RuntimeEffectKind::ToolBatch,
195 replay_key,
196 )
197 }
198
199 fn should_execute_child_tool_batch_locally(&self) -> bool {
200 self.parent_invocation
201 .as_ref()
202 .and_then(crate::RuntimeInvocation::effect_kind)
203 == Some(crate::RuntimeEffectKind::ToolBatch)
204 && self
205 .dispatch
206 .effect_controller
207 .controller()
208 .durability_tier()
209 == crate::DurabilityTier::Durable
210 }
211
212 pub(crate) async fn execute_prepared_tool_batch_launches(
213 &self,
214 batch: crate::PreparedToolBatch,
215 parent_invocation: crate::RuntimeInvocation,
216 child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
217 ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
218 let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
219 let cancellation = self.cancellation_token.clone().unwrap_or_default();
220 let tool_cancel = cancellation.child_token();
221 let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
222 let outcomes = schedule_tool_batch(
223 indexed_tools,
224 |(index, _)| *index,
225 |(_, child)| self.tool_scheduling(&child.call.tool_name),
226 {
227 let context = self.clone();
228 let cancellation = cancellation.clone();
229 let tool_cancel = tool_cancel.clone();
230 let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
231 move |(index, child)| {
232 let context = context.clone().with_cancellation_token(tool_cancel.clone());
233 let cancellation = cancellation.clone();
234 let tool_cancel = tool_cancel.clone();
235 let parent_invocation = parent_invocation.clone();
236 let cancelled_tool = child.call.clone();
237 let child_execution_trace_hook =
238 child_trace_hooks.get(&child.call.call_id).cloned();
239 async move {
240 let tool_call = context.execute_prepared_tool_call_launch(
241 child.call,
242 index,
243 Some(parent_invocation),
244 child_execution_trace_hook,
245 );
246 tokio::pin!(tool_call);
247 tokio::select! {
248 biased;
249 _ = cancellation.cancelled() => {
250 tool_cancel.cancel();
251 let grace = context
252 .dispatch
253 .clock
254 .sleep(std::time::Duration::from_millis(50));
255 tokio::pin!(grace);
256 tokio::select! {
257 biased;
258 outcome = &mut tool_call => outcome,
259 _ = &mut grace => cancelled_runtime_tool_call_launch(
260 cancelled_tool.call_id,
261 cancelled_tool.tool_name,
262 cancelled_tool.args,
263 cancelled_tool.replay,
264 ),
265 }
266 }
267 outcome = &mut tool_call => outcome,
268 }
269 }
270 }
271 },
272 )
273 .await;
274
275 let triggers = self.drain_tool_trigger_outcomes().map_err(|err| {
276 crate::RuntimeEffectControllerError::new("tool_trigger_outcome_drain", err.to_string())
277 })?;
278 Ok(crate::ToolBatchEffectOutcome {
279 launches: outcomes,
280 triggers,
281 })
282 }
283
284 fn prepared_tool_run(
285 &self,
286 prepared: crate::PreparedToolCall,
287 index: usize,
288 parent_invocation: Option<crate::RuntimeInvocation>,
289 ) -> PreparedToolRun {
290 let activity_id = TurnActivityId::new(format!("tool:{}", prepared.call_id));
291 PreparedToolRun {
292 prepared,
293 index,
294 parent_invocation,
295 activity_id,
296 }
297 }
298
299 #[expect(
300 clippy::too_many_arguments,
301 reason = "tool execution carries explicit runtime call metadata"
302 )]
303 pub(crate) async fn execute_tool_call(
304 &self,
305 call_id: String,
306 name: String,
307 args: serde_json::Value,
308 index: usize,
309 replay: Option<crate::llm::types::ProviderReplayMeta>,
310 parent_invocation: Option<crate::RuntimeInvocation>,
311 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
312 ) -> CompletedProtocolToolCall {
313 let _ = self
314 .dispatch
315 .event_tx
316 .send(SessionEvent::ToolCallStart {
317 call_id: Some(call_id.clone()),
318 name: name.clone(),
319 args: args.clone(),
320 })
321 .await;
322 let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
323 self.emit_turn_activity(
324 tool_correlation_id.clone(),
325 TurnEvent::ToolCallStarted {
326 call_id: Some(call_id.clone()),
327 name: name.clone(),
328 args: args.clone(),
329 },
330 )
331 .await;
332
333 let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
334 let mut dispatch = (*self.dispatch).clone();
335 dispatch.parent_invocation = parent_invocation.clone();
336 let pending = crate::sansio::PendingToolCall {
337 call_id: call_id.clone(),
338 tool_name: name,
339 args,
340 replay: replay.clone(),
341 };
342 let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
343 .await
344 {
345 ToolPreparationOutcome::Prepared(prepared) => {
346 let dispatch_context = std::sync::Arc::new(dispatch.clone());
347 let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
348 self.clone().with_parent_invocation(parent_invocation)
349 } else {
350 self.clone()
351 };
352 let mut tool_context =
353 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
354 .runtime_execution_context(runtime_context)
355 .prepared_call(&prepared)
356 .cancellation_token(self.cancellation_token.clone())
357 .runtime_process_id(self.runtime_process_id.clone())
358 .parent_invocation(parent_invocation.clone())
359 .child_execution_trace_hook(child_execution_trace_hook.clone());
360 if let Some(process_events) = self.process_event_context.as_ref() {
361 tool_context = tool_context.process_events(
362 process_events.process_id.clone(),
363 std::sync::Arc::clone(&process_events.registry),
364 process_events.store.clone(),
365 process_events.session_store_factory.clone(),
366 process_events.queued_work_driver.clone(),
367 );
368 }
369 let tool_context = tool_context.build();
370 dispatch_prepared_tool_call_launch_with_execution_context(
371 dispatch_context.as_ref(),
372 prepared,
373 None,
374 tool_context,
375 )
376 .await
377 }
378 ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
379 };
380 let mut outcome = match launch {
381 ToolCallLaunch::Done(outcome) => outcome,
382 ToolCallLaunch::Pending(pending) => {
383 self.await_pending_tool_dispatch_outcome(
384 &call_id,
385 parent_invocation.clone(),
386 pending,
387 self.cancellation_token.clone(),
388 )
389 .await
390 }
391 };
392 outcome.record.call_id = Some(call_id.clone());
393
394 self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
395 .await
396 }
397
398 #[expect(
399 clippy::too_many_arguments,
400 reason = "tool execution carries explicit runtime call metadata"
401 )]
402 pub(crate) async fn execute_tool_call_by_id(
403 &self,
404 call_id: String,
405 tool_id: crate::ToolId,
406 args: serde_json::Value,
407 index: usize,
408 replay: Option<crate::llm::types::ProviderReplayMeta>,
409 parent_invocation: Option<crate::RuntimeInvocation>,
410 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
411 ) -> CompletedProtocolToolCall {
412 let Some(manifest) =
413 crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
414 else {
415 let outcome = ToolDispatchOutcome {
416 record: ToolCallRecord {
417 call_id: Some(call_id.clone()),
418 tool: tool_id.to_string(),
419 args,
420 output: ToolCallOutput::failure(ToolFailure::runtime(
421 ToolFailureClass::Unavailable,
422 "tool_unavailable",
423 format!("Tool id `{tool_id}` is unavailable in this session"),
424 )),
425 duration_ms: 0,
426 },
427 };
428 let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
429 return self
430 .complete_tool_call(index, call_id, replay, outcome, activity_id)
431 .await;
432 };
433 self.execute_tool_call(
434 call_id,
435 manifest.name,
436 args,
437 index,
438 replay,
439 parent_invocation,
440 child_execution_trace_hook,
441 )
442 .await
443 }
444
445 pub(crate) async fn prepare_tool_call(
446 &self,
447 pending: crate::sansio::PendingToolCall,
448 ) -> ToolPreparationOutcome {
449 let call_id = Some(pending.call_id.clone());
450 prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
451 }
452
453 pub(crate) async fn execute_prepared_tool_call_launch(
454 &self,
455 prepared: crate::PreparedToolCall,
456 index: usize,
457 parent_invocation: Option<crate::RuntimeInvocation>,
458 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
459 ) -> crate::runtime::ToolCallLaunch {
460 match Box::pin(self.execute_prepared_tool_call_launch_inner(
461 prepared,
462 index,
463 parent_invocation,
464 child_execution_trace_hook,
465 ))
466 .await
467 {
468 ProtocolToolCallLaunch::Done(completed) => crate::runtime::ToolCallLaunch::Done {
469 result: completed.completed,
470 },
471 ProtocolToolCallLaunch::Pending(pending) => crate::runtime::ToolCallLaunch::Pending {
472 key: pending.key,
473 pending: pending.pending,
474 duration_ms: pending.duration_ms,
475 },
476 }
477 }
478
479 async fn execute_prepared_tool_call_launch_inner(
480 &self,
481 prepared: crate::PreparedToolCall,
482 index: usize,
483 parent_invocation: Option<crate::RuntimeInvocation>,
484 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
485 ) -> ProtocolToolCallLaunch {
486 let call_id = prepared.call_id.clone();
487 let name = prepared.tool_name.clone();
488 let args = prepared.args.clone();
489 let replay = prepared.replay.clone();
490 let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
491 let run = self.prepared_tool_run(prepared, index, parent_invocation);
492 let prepared = run.prepared.clone();
493 let _ = self
494 .dispatch
495 .event_tx
496 .send(SessionEvent::ToolCallStart {
497 call_id: Some(call_id.clone()),
498 name: name.clone(),
499 args: args.clone(),
500 })
501 .await;
502 let tool_correlation_id = run.activity_id.clone();
503 self.emit_turn_activity(
504 tool_correlation_id.clone(),
505 TurnEvent::ToolCallStarted {
506 call_id: Some(call_id.clone()),
507 name: name.clone(),
508 args: args.clone(),
509 },
510 )
511 .await;
512
513 let runtime_context = if let Some(parent_invocation) = run.parent_invocation.clone() {
514 self.clone().with_parent_invocation(parent_invocation)
515 } else {
516 self.clone()
517 };
518 let mut tool_context =
519 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&self.dispatch))
520 .runtime_execution_context(runtime_context)
521 .prepared_call(&prepared)
522 .cancellation_token(self.cancellation_token.clone())
523 .runtime_process_id(self.runtime_process_id.clone())
524 .parent_invocation(run.parent_invocation.clone())
525 .child_execution_trace_hook(child_execution_trace_hook);
526 if let Some(process_events) = self.process_event_context.as_ref() {
527 tool_context = tool_context.process_events(
528 process_events.process_id.clone(),
529 std::sync::Arc::clone(&process_events.registry),
530 process_events.store.clone(),
531 process_events.session_store_factory.clone(),
532 process_events.queued_work_driver.clone(),
533 );
534 }
535 let tool_context = tool_context.build();
536 let outcome = Box::pin(dispatch_prepared_tool_call_launch_with_execution_context(
537 self.dispatch.as_ref(),
538 prepared,
539 None,
540 tool_context,
541 ))
542 .await;
543 match outcome {
544 ToolCallLaunch::Done(mut outcome) => {
545 outcome.record.call_id = Some(call_id.clone());
546 tokio::task::yield_now().await;
547 let completed = self
548 .complete_tool_call(run.index, call_id, replay, outcome, tool_correlation_id)
549 .await;
550 ProtocolToolCallLaunch::Done(completed)
551 }
552 ToolCallLaunch::Pending(pending) => ProtocolToolCallLaunch::Pending(pending),
553 }
554 }
555
556 pub(super) async fn await_process_with_cancellation(
557 &self,
558 process_id: &str,
559 parent_invocation: Option<crate::RuntimeInvocation>,
560 cancellation: Option<tokio_util::sync::CancellationToken>,
561 ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
562 let _phase = self.named_phase("process.await_handle");
563 if let Some(cancellation) = cancellation {
564 tokio::select! {
565 result = self.dispatch.processes.await_process(
566 process_id,
567 self.process_scope(parent_invocation.clone()),
568 ) => result,
569 _ = cancellation.cancelled() => {
570 let _ = self.dispatch.processes.cancel(
571 &self.dispatch.session_id,
572 process_id,
573 self.process_scope(parent_invocation.clone()),
574 ).await;
575 self.dispatch.processes.await_process(
576 process_id,
577 self.process_scope(parent_invocation),
578 ).await
579 }
580 }
581 } else {
582 self.dispatch
583 .processes
584 .await_process(process_id, self.process_scope(parent_invocation))
585 .await
586 }
587 }
588
589 pub(crate) async fn complete_tool_call(
590 &self,
591 _index: usize,
592 call_id: String,
593 replay: Option<crate::llm::types::ProviderReplayMeta>,
594 outcome: ToolDispatchOutcome,
595 tool_correlation_id: TurnActivityId,
596 ) -> CompletedProtocolToolCall {
597 let output = outcome.record.output.clone();
598 let projection_output = output.clone();
599 let projection_tool_name = outcome.record.tool.clone();
600 let projection_args = outcome.record.args.clone();
601 let projection_duration_ms = outcome.record.duration_ms;
602 let projection_call_id = call_id.clone();
603 tokio::task::yield_now().await;
604 let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
605 let projection_context = crate::plugin::ToolResultProjectionContext {
606 session_id: self.dispatch.session_id.clone(),
607 tool_name: projection_tool_name,
608 args: projection_args,
609 output: projection_output,
610 duration_ms: projection_duration_ms,
611 call_id: projection_call_id,
612 };
613 let model_return = match plugins.project_tool_result(projection_context).await {
614 Ok(projected) => projected,
615 Err(err) => ModelToolReturn::text(
616 call_id.clone(),
617 outcome.record.tool.clone(),
618 err.to_string(),
619 ),
620 };
621
622 self.emit_turn_activity(
623 tool_correlation_id,
624 TurnEvent::ToolCallCompleted {
625 call_id: Some(call_id.clone()),
626 name: outcome.record.tool.clone(),
627 args: outcome.record.args.clone(),
628 output: output.clone(),
629 duration_ms: outcome.record.duration_ms,
630 },
631 )
632 .await;
633
634 let record = ToolCallRecord {
635 call_id: Some(call_id.clone()),
636 tool: outcome.record.tool.clone(),
637 args: outcome.record.args.clone(),
638 output: output.clone(),
639 duration_ms: outcome.record.duration_ms,
640 };
641 CompletedProtocolToolCall {
642 completed: crate::sansio::CompletedToolCall {
643 call_id,
644 tool_name: outcome.record.tool,
645 args: outcome.record.args,
646 output,
647 model_return,
648 duration_ms: outcome.record.duration_ms,
649 replay,
650 },
651 record,
652 }
653 }
654
655 pub(crate) async fn pending_completion_dispatch_outcome(
656 &self,
657 tool_name: String,
658 args: serde_json::Value,
659 resolution: crate::Resolution,
660 duration_ms: u64,
661 ) -> ToolDispatchOutcome {
662 let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
663 let result = finalize_tool_result_with_execution_context(
664 self.dispatch.as_ref(),
665 &tool_name,
666 &args,
667 ToolResult::from_output(output),
668 duration_ms,
669 )
670 .await;
671 let output = result.into_done_output().unwrap_or_else(|_| {
672 ToolCallOutput::failure(ToolFailure::runtime(
673 ToolFailureClass::Internal,
674 "pending_tool_not_finalized",
675 "pending tool result reached a completed-output projection path",
676 ))
677 });
678 ToolDispatchOutcome {
679 record: ToolCallRecord {
680 call_id: None,
681 tool: tool_name,
682 args,
683 output,
684 duration_ms,
685 },
686 }
687 }
688
689 async fn await_pending_tool_dispatch_outcome(
690 &self,
691 call_id: &str,
692 parent_invocation: Option<crate::RuntimeInvocation>,
693 pending: crate::tool_dispatch::PendingToolDispatchOutcome,
694 cancellation: Option<tokio_util::sync::CancellationToken>,
695 ) -> ToolDispatchOutcome {
696 let fallback;
697 let parent = if let Some(parent) = parent_invocation.as_ref() {
698 parent
699 } else {
700 fallback = crate::RuntimeInvocation::effect(
701 crate::RuntimeScope::new(&self.dispatch.session_id),
702 format!("tool:{call_id}:await"),
703 crate::RuntimeEffectKind::AwaitEvent,
704 format!("tool:{call_id}:await"),
705 );
706 &fallback
707 };
708 let parent_effect_id = parent.effect_id().unwrap_or("tool");
709 let invocation = crate::runtime::causal::child_effect_invocation(
710 parent,
711 format!("{parent_effect_id}:{call_id}:await"),
712 crate::RuntimeEffectKind::AwaitEvent,
713 format!("{call_id}:await"),
714 );
715 let cancellation = cancellation.unwrap_or_default();
716 let deadline = pending
717 .pending
718 .deadline
719 .map(|duration| self.dispatch.clock.now() + duration);
720 let outcome = self
721 .dispatch
722 .effect_controller
723 .controller()
724 .execute_effect(
725 crate::RuntimeEffectEnvelope::new(
726 invocation,
727 crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
728 ),
729 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
730 cancellation,
731 deadline,
732 std::sync::Arc::clone(&self.dispatch.clock),
733 ),
734 )
735 .await;
736 let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
737 Ok(resolution) => resolution,
738 Err(err) => {
739 return ToolDispatchOutcome {
740 record: ToolCallRecord {
741 call_id: None,
742 tool: pending.tool_name,
743 args: pending.args,
744 output: ToolCallOutput::failure(ToolFailure::runtime(
745 ToolFailureClass::Internal,
746 "pending_tool_completion_failed",
747 err.to_string(),
748 )),
749 duration_ms: pending.duration_ms,
750 },
751 };
752 }
753 };
754 self.pending_completion_dispatch_outcome(
755 pending.tool_name,
756 pending.args,
757 resolution,
758 pending.duration_ms,
759 )
760 .await
761 }
762
763 pub async fn call_tool_by_id(
764 &self,
765 call_id: String,
766 tool_id: crate::ToolId,
767 args: serde_json::Value,
768 index: usize,
769 ) -> ToolInvocationReply {
770 let executed = self
771 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
772 .await;
773 let reply = ToolInvocationReply::from_output(executed.completed.output);
774 reply.with_record(executed.record)
775 }
776
777 pub async fn call_tool_by_id_with_child_execution_trace_hook(
778 &self,
779 call_id: String,
780 tool_id: crate::ToolId,
781 args: serde_json::Value,
782 index: usize,
783 trace_hook: crate::ToolChildExecutionTraceHook,
784 ) -> ToolInvocationReply {
785 let executed = self
786 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
787 .await;
788 let reply = ToolInvocationReply::from_output(executed.completed.output);
789 reply.with_record(executed.record)
790 }
791
792 pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
793 if calls.is_empty() {
794 return Vec::new();
795 }
796
797 let batch_id = deterministic_tool_invocation_batch_id(&calls);
798 let mut replies = vec![None; calls.len()];
799 let mut prepared_entries = Vec::new();
800
801 for (index, call) in calls.into_iter().enumerate() {
802 let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
803 self.dispatch.as_ref(),
804 &call.tool_id,
805 ) else {
806 let outcome = ToolDispatchOutcome {
807 record: ToolCallRecord {
808 call_id: Some(call.id.clone()),
809 tool: call.tool_id.to_string(),
810 args: call.args,
811 output: ToolCallOutput::failure(ToolFailure::runtime(
812 ToolFailureClass::Unavailable,
813 "tool_unavailable",
814 format!("Tool id `{}` is unavailable in this session", call.tool_id),
815 )),
816 duration_ms: 0,
817 },
818 };
819 let completed = self
820 .complete_tool_call(
821 index,
822 call.id,
823 None,
824 outcome,
825 TurnActivityId::new(format!("tool:{}", batch_id)),
826 )
827 .await;
828 replies[index] = Some(
829 ToolInvocationReply::from_output(completed.completed.output)
830 .with_record(completed.record),
831 );
832 continue;
833 };
834
835 let pending = crate::sansio::PendingToolCall {
836 call_id: call.id.clone(),
837 tool_name: manifest.name,
838 args: call.args,
839 replay: None,
840 };
841 match self.prepare_tool_call(pending).await {
842 ToolPreparationOutcome::Prepared(prepared) => {
843 prepared_entries.push((index, prepared, call.child_execution_trace_hook));
844 }
845 ToolPreparationOutcome::Completed(outcome) => {
846 let completed = self
847 .complete_tool_call(
848 index,
849 call.id,
850 None,
851 *outcome,
852 TurnActivityId::new(format!("tool:{}", batch_id)),
853 )
854 .await;
855 replies[index] = Some(
856 ToolInvocationReply::from_output(completed.completed.output)
857 .with_record(completed.record),
858 );
859 }
860 }
861 }
862
863 if !prepared_entries.is_empty() {
864 let invocation = self.tool_batch_invocation(&batch_id);
865 let batch = crate::PreparedToolBatch::new(
866 batch_id.clone(),
867 prepared_entries
868 .iter()
869 .map(|(_, prepared, _)| prepared.clone())
870 .collect(),
871 );
872 let child_trace_hooks = prepared_entries
873 .iter()
874 .filter_map(|(_, prepared, hook)| {
875 hook.clone().map(|hook| (prepared.call_id.clone(), hook))
876 })
877 .collect();
878 let envelope = crate::RuntimeEffectEnvelope::new(
879 invocation.clone(),
880 crate::RuntimeEffectCommand::ToolBatch { batch },
881 );
882 let local_executor =
883 crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
884 let raw_outcome = if self.should_execute_child_tool_batch_locally() {
885 local_executor.execute(envelope).await
886 } else {
887 self.dispatch
888 .effect_controller
889 .controller()
890 .execute_effect(envelope, local_executor)
891 .await
892 };
893 let outcome =
894 match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
895 Ok(outcome) => outcome,
896 Err(err) => {
897 for (index, prepared, _) in prepared_entries {
898 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
899 format!("tool batch failed: {err}")
900 )));
901 let _ = prepared;
902 }
903 return replies
904 .into_iter()
905 .map(|reply| reply.expect("every batch reply slot should be filled"))
906 .collect();
907 }
908 };
909 if outcome.launches.len() != prepared_entries.len() {
910 let message = format!(
911 "tool batch returned {} launches for {} prepared calls",
912 outcome.launches.len(),
913 prepared_entries.len()
914 );
915 for (index, _, _) in prepared_entries {
916 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
917 }
918 } else {
919 for ((index, prepared, _), launch) in
920 prepared_entries.into_iter().zip(outcome.launches)
921 {
922 let call_id = prepared.call_id.clone();
923 let reply = match launch {
924 crate::runtime::ToolCallLaunch::Done { result } => {
925 let record = ToolCallRecord {
926 call_id: Some(result.call_id.clone()),
927 tool: result.tool_name.clone(),
928 args: result.args.clone(),
929 output: result.output.clone(),
930 duration_ms: result.duration_ms,
931 };
932 ToolInvocationReply::from_output(result.output).with_record(record)
933 }
934 crate::runtime::ToolCallLaunch::Pending {
935 key,
936 pending,
937 duration_ms,
938 } => {
939 let dispatch_outcome = self
940 .await_pending_tool_dispatch_outcome(
941 &call_id,
942 Some(invocation.clone()),
943 crate::tool_dispatch::PendingToolDispatchOutcome {
944 tool_name: prepared.tool_name.clone(),
945 args: prepared.args.clone(),
946 key,
947 pending,
948 duration_ms,
949 },
950 self.cancellation_token.clone(),
951 )
952 .await;
953 let completed = self
954 .complete_tool_call(
955 index,
956 call_id.clone(),
957 prepared.replay.clone(),
958 dispatch_outcome,
959 TurnActivityId::new(format!("tool:{call_id}")),
960 )
961 .await;
962 ToolInvocationReply::from_output(completed.completed.output)
963 .with_record(completed.record)
964 }
965 };
966 replies[index] = Some(reply);
967 }
968 }
969 }
970
971 replies
972 .into_iter()
973 .map(|reply| reply.expect("every batch reply slot should be filled"))
974 .collect()
975 }
976
977 pub async fn start_tool_call(
978 &self,
979 call_id: String,
980 name: String,
981 args: serde_json::Value,
982 ) -> ToolInvocationReply {
983 self.start_tool_process(call_id, name, args).await
984 }
985
986 pub async fn await_tool_handle(
987 &self,
988 call_id: String,
989 handle: serde_json::Value,
990 ) -> ToolInvocationReply {
991 self.await_process_handle(call_id, handle).await
992 }
993
994 pub async fn cancel_tool_handle(
995 &self,
996 call_id: String,
997 handle: serde_json::Value,
998 ) -> ToolInvocationReply {
999 self.cancel_process_handle(call_id, handle).await
1000 }
1001
1002 pub async fn signal_tool_handle(
1003 &self,
1004 call_id: String,
1005 handle: serde_json::Value,
1006 signal_name: String,
1007 payload: serde_json::Value,
1008 ) -> ToolInvocationReply {
1009 self.signal_process_handle(call_id, handle, signal_name, payload)
1010 .await
1011 }
1012}