1use super::execution_context::RuntimeExecutionContext;
2use crate::tool_dispatch::{
3 ToolCallLaunch, ToolDispatchOutcome, ToolPreparationOutcome,
4 dispatch_prepared_tool_attempt_launch_with_execution_context,
5 dispatch_prepared_tool_call_launch_with_execution_context,
6 finalize_tool_result_with_execution_context, mark_retry_exhausted,
7 prepare_tool_call_with_context, retry_after_ms, schedule_tool_batch,
8};
9use crate::{
10 ModelToolReturn, SessionEvent, ToolCallOutput, ToolCallRecord, ToolCancellation, ToolFailure,
11 ToolFailureClass, ToolResult, TurnActivityId, TurnEvent,
12};
13use std::collections::HashMap;
14
/// A single tool call request: which tool to run, with which arguments, under
/// which caller-chosen call id.
#[derive(Clone)]
pub struct ToolInvocation {
    /// Caller-supplied identifier for this call.
    pub id: String,
    /// Identifier of the tool to invoke.
    pub tool_id: crate::ToolId,
    /// JSON arguments handed to the tool.
    pub args: serde_json::Value,
    /// Optional child-execution trace hook; `ToolInvocation::new` leaves this
    /// as `None`.
    pub child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
}
22
23impl ToolInvocation {
24 pub fn new(id: impl Into<String>, tool_id: crate::ToolId, args: serde_json::Value) -> Self {
25 Self {
26 id: id.into(),
27 tool_id,
28 args,
29 child_execution_trace_hook: None,
30 }
31 }
32
33 pub fn label(&self) -> String {
34 self.tool_id.to_string()
35 }
36
37 pub fn with_child_execution_trace_hook(
38 mut self,
39 hook: crate::ToolChildExecutionTraceHook,
40 ) -> Self {
41 self.child_execution_trace_hook = Some(hook);
42 self
43 }
44}
45
46impl std::fmt::Debug for ToolInvocation {
47 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
48 f.debug_struct("ToolInvocation")
49 .field("id", &self.id)
50 .field("tool_id", &self.tool_id)
51 .field("args", &self.args)
52 .field(
53 "child_execution_trace_hook",
54 &self.child_execution_trace_hook.as_ref().map(|_| "<hook>"),
55 )
56 .finish()
57 }
58}
59
/// The reply produced for a tool invocation.
#[derive(Clone, Debug)]
pub struct ToolInvocationReply {
    /// The output of the tool call (success, failure, or cancellation).
    pub output: ToolCallOutput,
    /// The call record, when one was attached via `with_record`; the public
    /// constructors on this type leave it as `None`.
    pub record: Option<ToolCallRecord>,
}
65
66impl ToolInvocationReply {
67 pub fn success(value: serde_json::Value) -> Self {
68 Self {
69 output: ToolCallOutput::success(value),
70 record: None,
71 }
72 }
73
74 pub fn error(value: serde_json::Value) -> Self {
75 let message = value
76 .as_str()
77 .map(ToOwned::to_owned)
78 .unwrap_or_else(|| value.to_string());
79 let mut failure = ToolFailure::tool(ToolFailureClass::Execution, "tool_error", message);
80 failure.raw =
81 Some(serde_json::from_value(value).unwrap_or_else(|_| {
82 crate::ToolValue::String("unserializable tool error".to_string())
83 }));
84 Self {
85 output: ToolCallOutput::failure(failure),
86 record: None,
87 }
88 }
89
90 pub fn from_output(output: ToolCallOutput) -> Self {
91 Self {
92 output,
93 record: None,
94 }
95 }
96
97 pub fn cancelled(message: impl Into<String>) -> Self {
98 Self::from_output(ToolCallOutput::cancelled(ToolCancellation::runtime(
99 message,
100 )))
101 }
102
103 pub(crate) fn with_record(mut self, record: ToolCallRecord) -> Self {
104 self.record = Some(record);
105 self
106 }
107}
108
/// A finished tool call in both of its representations: the sans-IO protocol
/// value and the call record.
#[derive(Clone, Debug)]
pub(crate) struct CompletedProtocolToolCall {
    /// The completed call as a `crate::sansio::CompletedToolCall`.
    pub completed: crate::sansio::CompletedToolCall,
    /// The call record describing the same call.
    pub record: ToolCallRecord,
}
114
115fn cancelled_runtime_tool_call_launch(
116 call_id: String,
117 tool_name: String,
118 args: serde_json::Value,
119 replay: Option<crate::llm::types::ProviderReplayMeta>,
120) -> crate::runtime::ToolCallLaunch {
121 crate::runtime::ToolCallLaunch::Done {
122 result: cancelled_completed_tool_call(call_id, tool_name, args, replay),
123 }
124}
125
126fn cancelled_completed_tool_call(
127 call_id: String,
128 tool_name: String,
129 args: serde_json::Value,
130 replay: Option<crate::llm::types::ProviderReplayMeta>,
131) -> crate::sansio::CompletedToolCall {
132 let output = ToolCallOutput::cancelled(ToolCancellation::runtime("tool call cancelled"));
133 crate::sansio::CompletedToolCall {
134 call_id: call_id.clone(),
135 tool_name: tool_name.clone(),
136 args,
137 model_return: ModelToolReturn {
138 call_id,
139 tool_name,
140 parts: vec![crate::ModelToolReturnPart::text(
141 "[Tool execution cancelled]\ntool call cancelled".to_string(),
142 )],
143 },
144 output,
145 duration_ms: 0,
146 replay,
147 }
148}
149
150fn runtime_failure_dispatch_outcome(
151 call_id: Option<String>,
152 tool_name: String,
153 args: serde_json::Value,
154 code: impl Into<String>,
155 message: impl Into<String>,
156) -> ToolDispatchOutcome {
157 ToolDispatchOutcome {
158 record: ToolCallRecord {
159 call_id,
160 tool: tool_name,
161 args,
162 output: ToolCallOutput::failure(ToolFailure::runtime(
163 ToolFailureClass::Internal,
164 code,
165 message,
166 )),
167 duration_ms: 0,
168 },
169 }
170}
171
172fn deterministic_tool_invocation_batch_id(calls: &[ToolInvocation]) -> String {
173 let identity = calls
174 .iter()
175 .map(|call| {
176 serde_json::json!({
177 "id": call.id.clone(),
178 "tool_id": call.tool_id.to_string(),
179 "args": call.args.clone(),
180 })
181 })
182 .collect::<Vec<_>>();
183 let digest = crate::stable_hash::stable_json_sha256_hex(&identity)
184 .unwrap_or_else(|_| format!("len-{}", calls.len()));
185 format!("tool-batch:{digest}")
186}
187
/// The result of driving one batch child: its launch plus the trigger effect
/// outcomes gathered while executing it.
struct CoordinatedToolLaunch {
    /// The launch produced for the child tool call.
    launch: crate::runtime::ToolCallLaunch,
    /// Trigger effect outcomes collected across the child's attempts.
    triggers: Vec<crate::tool_dispatch::ToolTriggerEffectOutcome>,
}
192
193impl RuntimeExecutionContext<'_> {
194 fn tool_batch_invocation(&self, batch_id: &str) -> crate::RuntimeInvocation {
195 let suffix = format!("tool-batch:{batch_id}");
196 if let Some(parent) = self.parent_invocation.as_ref() {
197 let parent_effect_id = parent.effect_id().unwrap_or("effect");
198 return crate::runtime::causal::child_effect_invocation(
199 parent,
200 format!("{parent_effect_id}:{suffix}"),
201 crate::RuntimeEffectKind::ToolBatch,
202 suffix,
203 );
204 }
205 let replay_key = format!("{}:{suffix}", self.execution_scope_id());
206 crate::RuntimeInvocation::effect(
207 crate::RuntimeScope::new(self.session_id.clone()),
208 suffix,
209 crate::RuntimeEffectKind::ToolBatch,
210 replay_key,
211 )
212 }
213
    /// Executes every call of a prepared tool batch and returns the launches
    /// and trigger outcomes of all children.
    ///
    /// * `batch` - the prepared calls; each is paired with its position in
    ///   the batch.
    /// * `parent_invocation` - invocation under which every child attempt is
    ///   created.
    /// * `child_trace_hooks` - trace hooks keyed by call id; a child gets the
    ///   hook registered under its own call id, if any.
    ///
    /// Every path visible in this function returns `Ok`.
    pub(crate) async fn execute_prepared_tool_batch_launches(
        &self,
        batch: crate::PreparedToolBatch,
        parent_invocation: crate::RuntimeInvocation,
        child_trace_hooks: HashMap<String, crate::ToolChildExecutionTraceHook>,
    ) -> Result<crate::ToolBatchEffectOutcome, crate::RuntimeEffectControllerError> {
        let indexed_tools = batch.calls.into_iter().enumerate().collect::<Vec<_>>();
        // `cancellation` is the caller's token (or a fresh default one);
        // `tool_cancel` is a child token handed to the tool executions.
        let cancellation = self.cancellation_token.clone().unwrap_or_default();
        let tool_cancel = cancellation.child_token();
        let child_trace_hooks = std::sync::Arc::new(child_trace_hooks);
        // Sequential path: the controller cannot run effects concurrently, so
        // children are executed one after another in batch order.
        if !self
            .dispatch
            .effect_controller
            .controller()
            .supports_concurrent_effects()
        {
            let mut launches = Vec::with_capacity(indexed_tools.len());
            let mut triggers = Vec::new();
            let mut context = self.clone().with_cancellation_token(tool_cancel.clone());
            for (index, child) in indexed_tools {
                // Once cancelled, remaining children are not executed; each is
                // recorded as a synthesized cancelled launch.
                if cancellation.is_cancelled() {
                    tool_cancel.cancel();
                    launches.push(cancelled_runtime_tool_call_launch(
                        child.call.call_id,
                        child.call.tool_name,
                        child.call.args,
                        child.call.replay,
                    ));
                    continue;
                }
                let child_execution_trace_hook =
                    child_trace_hooks.get(&child.call.call_id).cloned();
                let outcome = context
                    .execute_prepared_tool_batch_child(
                        child,
                        index,
                        parent_invocation.clone(),
                        child_execution_trace_hook,
                    )
                    .await;
                launches.push(outcome.launch);
                triggers.extend(outcome.triggers);
                context = context.with_cancellation_token(tool_cancel.clone());
            }
            return Ok(crate::ToolBatchEffectOutcome { launches, triggers });
        }
        // Concurrent path: `schedule_tool_batch` receives the indexed calls, a
        // way to read each call's index, its scheduling lookup by tool name,
        // and a closure that builds the future executing one child.
        // NOTE(review): ordering/parallelism is decided inside
        // `schedule_tool_batch`, which is not visible here.
        let child_outcomes = schedule_tool_batch(
            indexed_tools,
            |(index, _)| *index,
            |(_, child)| self.tool_scheduling(&child.call.tool_name),
            {
                let context = self.clone();
                let cancellation = cancellation.clone();
                let tool_cancel = tool_cancel.clone();
                let child_trace_hooks = std::sync::Arc::clone(&child_trace_hooks);
                move |(index, child)| {
                    let context = context.clone().with_cancellation_token(tool_cancel.clone());
                    let cancellation = cancellation.clone();
                    let tool_cancel = tool_cancel.clone();
                    let parent_invocation = parent_invocation.clone();
                    // Kept so a cancelled launch can still be synthesized after
                    // `child` has been moved into the execution future.
                    let cancelled_tool = child.call.clone();
                    let child_execution_trace_hook =
                        child_trace_hooks.get(&child.call.call_id).cloned();
                    async move {
                        let tool_call = context.execute_prepared_tool_batch_child(
                            child,
                            index,
                            parent_invocation,
                            child_execution_trace_hook,
                        );
                        tokio::pin!(tool_call);
                        tokio::select! {
                            // `biased` polls the branches in written order, so
                            // cancellation is checked before the tool call.
                            biased;
                            _ = cancellation.cancelled() => {
                                tool_cancel.cancel();
                                // After cancelling, give the tool call a 50 ms
                                // grace period (on the dispatch clock) to finish.
                                let grace = context
                                    .dispatch
                                    .clock
                                    .sleep(std::time::Duration::from_millis(50));
                                tokio::pin!(grace);
                                tokio::select! {
                                    biased;
                                    // The tool call finished in time: keep its outcome.
                                    outcome = &mut tool_call => outcome,
                                    // Grace period elapsed: report the call as
                                    // cancelled, with no triggers.
                                    _ = &mut grace => CoordinatedToolLaunch {
                                        launch: cancelled_runtime_tool_call_launch(
                                            cancelled_tool.call_id,
                                            cancelled_tool.tool_name,
                                            cancelled_tool.args,
                                            cancelled_tool.replay,
                                        ),
                                        triggers: Vec::new(),
                                    },
                                }
                            }
                            outcome = &mut tool_call => outcome,
                        }
                    }
                }
            },
        )
        .await;

        // Flatten the per-child outcomes into one list of launches and one
        // list of triggers, in the order they were returned.
        let mut launches = Vec::with_capacity(child_outcomes.len());
        let mut triggers = Vec::new();
        for outcome in child_outcomes {
            launches.push(outcome.launch);
            triggers.extend(outcome.triggers);
        }
        Ok(crate::ToolBatchEffectOutcome { launches, triggers })
    }
324
325 async fn execute_prepared_tool_batch_child(
326 &self,
327 child: crate::PreparedToolBatchCall,
328 index: usize,
329 parent_invocation: crate::RuntimeInvocation,
330 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
331 ) -> CoordinatedToolLaunch {
332 let call_id = child.call.call_id.clone();
333 let tool_name = child.call.tool_name.clone();
334 let args = child.call.args.clone();
335 let replay = child.call.replay.clone();
336 let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
337 self.emit_tool_call_started(&call_id, &tool_name, args.clone(), activity_id.clone())
338 .await;
339
340 let retry_policy = crate::tool_dispatch::resolve_callable_manifest_by_id(
341 self.dispatch.as_ref(),
342 &child.call.tool_id,
343 )
344 .map(|manifest| manifest.retry_policy)
345 .unwrap_or(crate::ToolRetryPolicy::Never);
346 let max_attempts = retry_policy.max_attempts().max(1);
347 let mut triggers = Vec::new();
348
349 for attempt in 1..=max_attempts {
350 let attempt_invocation =
351 self.tool_attempt_invocation(&parent_invocation, &child.replay_suffix, attempt);
352 let attempt_outcome = self
353 .dispatch
354 .effect_controller
355 .controller()
356 .execute_effect(
357 crate::RuntimeEffectEnvelope::new(
358 attempt_invocation,
359 crate::RuntimeEffectCommand::ToolAttempt {
360 call: child.call.clone(),
361 attempt,
362 max_attempts,
363 },
364 ),
365 crate::RuntimeEffectLocalExecutor::tool_batch(
366 self.clone(),
367 child_execution_trace_hook
368 .clone()
369 .map(|hook| {
370 std::iter::once((child.call.call_id.clone(), hook)).collect()
371 })
372 .unwrap_or_default(),
373 ),
374 )
375 .await;
376 let attempt_outcome = match attempt_outcome {
377 Ok(outcome) => match outcome.into_tool_attempt_effect() {
378 Ok(outcome) => outcome,
379 Err(err) => {
380 let completed = self
381 .complete_tool_call(
382 index,
383 call_id.clone(),
384 replay,
385 runtime_failure_dispatch_outcome(
386 Some(call_id.clone()),
387 tool_name,
388 args,
389 "tool_attempt_failed",
390 err.to_string(),
391 ),
392 activity_id,
393 )
394 .await;
395 return CoordinatedToolLaunch {
396 launch: crate::runtime::ToolCallLaunch::Done {
397 result: completed.completed,
398 },
399 triggers,
400 };
401 }
402 },
403 Err(err) => {
404 let completed = self
405 .complete_tool_call(
406 index,
407 call_id.clone(),
408 replay,
409 runtime_failure_dispatch_outcome(
410 Some(call_id.clone()),
411 tool_name,
412 args,
413 "tool_attempt_failed",
414 err.to_string(),
415 ),
416 activity_id,
417 )
418 .await;
419 return CoordinatedToolLaunch {
420 launch: crate::runtime::ToolCallLaunch::Done {
421 result: completed.completed,
422 },
423 triggers,
424 };
425 }
426 };
427 triggers.extend(attempt_outcome.triggers);
428 match attempt_outcome.launch {
429 crate::ToolAttemptLaunch::Pending {
430 key,
431 pending,
432 duration_ms,
433 } => {
434 let dispatch_outcome = self
435 .await_pending_tool_dispatch_outcome_with_suffix(
436 &call_id,
437 Some(parent_invocation.clone()),
438 format!("{}:await", child.replay_suffix),
439 crate::tool_dispatch::PendingToolDispatchOutcome {
440 tool_name: child.call.tool_name.clone(),
441 args: child.call.args.clone(),
442 key,
443 pending,
444 duration_ms,
445 },
446 self.cancellation_token.clone(),
447 )
448 .await;
449 let completed = self
450 .complete_tool_call(
451 index,
452 call_id.clone(),
453 child.call.replay.clone(),
454 dispatch_outcome,
455 activity_id,
456 )
457 .await;
458 return CoordinatedToolLaunch {
459 launch: crate::runtime::ToolCallLaunch::Done {
460 result: completed.completed,
461 },
462 triggers,
463 };
464 }
465 crate::ToolAttemptLaunch::Done { mut record } => {
466 record.call_id = Some(call_id.clone());
467 let retry_after = retry_after_ms(
468 &ToolResult::from_output(record.output.clone()),
469 retry_policy,
470 attempt - 1,
471 );
472 let Some(retry_after) = retry_after else {
473 let completed = self
474 .complete_tool_call(
475 index,
476 call_id.clone(),
477 child.call.replay.clone(),
478 ToolDispatchOutcome { record },
479 activity_id,
480 )
481 .await;
482 return CoordinatedToolLaunch {
483 launch: crate::runtime::ToolCallLaunch::Done {
484 result: completed.completed,
485 },
486 triggers,
487 };
488 };
489 if attempt >= max_attempts {
490 let exhausted =
491 mark_retry_exhausted(ToolResult::from_output(record.output), attempt);
492 record.output = exhausted.into_done_output().unwrap_or_else(|_| {
493 ToolCallOutput::failure(ToolFailure::runtime(
494 ToolFailureClass::Internal,
495 "tool_retry_exhaustion_failed",
496 "retry exhaustion produced a pending output",
497 ))
498 });
499 let completed = self
500 .complete_tool_call(
501 index,
502 call_id.clone(),
503 child.call.replay.clone(),
504 ToolDispatchOutcome { record },
505 activity_id,
506 )
507 .await;
508 return CoordinatedToolLaunch {
509 launch: crate::runtime::ToolCallLaunch::Done {
510 result: completed.completed,
511 },
512 triggers,
513 };
514 }
515 if retry_after > 0
516 && let Err(err) = self
517 .sleep_before_tool_retry(
518 &parent_invocation,
519 &child.replay_suffix,
520 attempt,
521 retry_after,
522 )
523 .await
524 {
525 let completed = self
526 .complete_tool_call(
527 index,
528 call_id.clone(),
529 child.call.replay.clone(),
530 runtime_failure_dispatch_outcome(
531 Some(call_id.clone()),
532 child.call.tool_name.clone(),
533 child.call.args.clone(),
534 "tool_retry_sleep_failed",
535 format!(
536 "retry sleep for tool `{}` failed after attempt {attempt}: {err}",
537 child.call.tool_name
538 ),
539 ),
540 activity_id,
541 )
542 .await;
543 return CoordinatedToolLaunch {
544 launch: crate::runtime::ToolCallLaunch::Done {
545 result: completed.completed,
546 },
547 triggers,
548 };
549 }
550 }
551 }
552 }
553
554 let completed = self
555 .complete_tool_call(
556 index,
557 call_id.clone(),
558 child.call.replay,
559 runtime_failure_dispatch_outcome(
560 Some(call_id),
561 child.call.tool_name,
562 child.call.args,
563 "tool_retry_loop_failed",
564 "tool retry loop exited without a terminal result",
565 ),
566 activity_id,
567 )
568 .await;
569 CoordinatedToolLaunch {
570 launch: crate::runtime::ToolCallLaunch::Done {
571 result: completed.completed,
572 },
573 triggers,
574 }
575 }
576
577 async fn emit_tool_call_started(
578 &self,
579 call_id: &str,
580 name: &str,
581 args: serde_json::Value,
582 activity_id: TurnActivityId,
583 ) {
584 let _ = self
585 .dispatch
586 .event_tx
587 .send(SessionEvent::ToolCallStart {
588 call_id: Some(call_id.to_string()),
589 name: name.to_string(),
590 args: args.clone(),
591 })
592 .await;
593 self.emit_turn_activity(
594 activity_id,
595 TurnEvent::ToolCallStarted {
596 call_id: Some(call_id.to_string()),
597 name: name.to_string(),
598 args,
599 },
600 )
601 .await;
602 }
603
604 fn tool_attempt_invocation(
605 &self,
606 parent_invocation: &crate::RuntimeInvocation,
607 child_replay_suffix: &str,
608 attempt: u32,
609 ) -> crate::RuntimeInvocation {
610 let suffix = format!("{child_replay_suffix}:attempt:{attempt}");
611 let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
612 crate::runtime::causal::child_effect_invocation(
613 parent_invocation,
614 format!("{parent_effect_id}:{suffix}"),
615 crate::RuntimeEffectKind::ToolAttempt,
616 suffix,
617 )
618 }
619
620 async fn sleep_before_tool_retry(
621 &self,
622 parent_invocation: &crate::RuntimeInvocation,
623 child_replay_suffix: &str,
624 attempt: u32,
625 retry_after_ms: u64,
626 ) -> Result<(), crate::RuntimeEffectControllerError> {
627 let suffix = format!("{child_replay_suffix}:attempt:{attempt}:sleep");
628 let parent_effect_id = parent_invocation.effect_id().unwrap_or("tool-batch");
629 let invocation = crate::runtime::causal::child_effect_invocation(
630 parent_invocation,
631 format!("{parent_effect_id}:{suffix}"),
632 crate::RuntimeEffectKind::Sleep,
633 suffix,
634 );
635 let cancellation = self.cancellation_token.clone().unwrap_or_default();
636 let outcome = self
637 .dispatch
638 .effect_controller
639 .controller()
640 .execute_effect(
641 crate::RuntimeEffectEnvelope::new(
642 invocation,
643 crate::RuntimeEffectCommand::Sleep {
644 duration_ms: retry_after_ms,
645 },
646 ),
647 crate::RuntimeEffectLocalExecutor::sleep_with_clock(
648 cancellation,
649 std::sync::Arc::clone(&self.dispatch.clock),
650 ),
651 )
652 .await?;
653 match outcome {
654 crate::RuntimeEffectOutcome::Sleep => Ok(()),
655 other => Err(crate::RuntimeEffectControllerError::new(
656 "runtime_effect_wrong_outcome",
657 format!("expected sleep outcome, got {}", other.kind().as_str()),
658 )),
659 }
660 }
661
662 #[expect(
663 clippy::too_many_arguments,
664 reason = "tool execution carries explicit runtime call metadata"
665 )]
666 pub(crate) async fn execute_tool_call(
667 &self,
668 call_id: String,
669 name: String,
670 args: serde_json::Value,
671 index: usize,
672 replay: Option<crate::llm::types::ProviderReplayMeta>,
673 parent_invocation: Option<crate::RuntimeInvocation>,
674 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
675 ) -> CompletedProtocolToolCall {
676 let _ = self
677 .dispatch
678 .event_tx
679 .send(SessionEvent::ToolCallStart {
680 call_id: Some(call_id.clone()),
681 name: name.clone(),
682 args: args.clone(),
683 })
684 .await;
685 let tool_correlation_id = TurnActivityId::new(format!("tool:{call_id}"));
686 self.emit_turn_activity(
687 tool_correlation_id.clone(),
688 TurnEvent::ToolCallStarted {
689 call_id: Some(call_id.clone()),
690 name: name.clone(),
691 args: args.clone(),
692 },
693 )
694 .await;
695
696 let parent_invocation = parent_invocation.or_else(|| self.parent_invocation.clone());
697 let mut dispatch = (*self.dispatch).clone();
698 dispatch.parent_invocation = parent_invocation.clone();
699 let pending = crate::sansio::PendingToolCall {
700 call_id: call_id.clone(),
701 tool_name: name,
702 args,
703 replay: replay.clone(),
704 };
705 let launch = match prepare_tool_call_with_context(&dispatch, pending, Some(call_id.clone()))
706 .await
707 {
708 ToolPreparationOutcome::Prepared(prepared) => {
709 let dispatch_context = std::sync::Arc::new(dispatch.clone());
710 let runtime_context = if let Some(parent_invocation) = parent_invocation.clone() {
711 self.clone().with_parent_invocation(parent_invocation)
712 } else {
713 self.clone()
714 };
715 let mut tool_context =
716 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&dispatch_context))
717 .runtime_execution_context(runtime_context)
718 .prepared_call(&prepared)
719 .cancellation_token(self.cancellation_token.clone())
720 .runtime_process_id(self.runtime_process_id.clone())
721 .parent_invocation(parent_invocation.clone())
722 .child_execution_trace_hook(child_execution_trace_hook.clone());
723 if let Some(process_events) = self.process_event_context.as_ref() {
724 tool_context = tool_context.process_events(
725 process_events.process_id.clone(),
726 std::sync::Arc::clone(&process_events.registry),
727 process_events.store.clone(),
728 process_events.session_store_factory.clone(),
729 process_events.queued_work_driver.clone(),
730 );
731 }
732 let tool_context = tool_context.build();
733 dispatch_prepared_tool_call_launch_with_execution_context(
734 dispatch_context.as_ref(),
735 prepared,
736 None,
737 tool_context,
738 )
739 .await
740 }
741 ToolPreparationOutcome::Completed(outcome) => ToolCallLaunch::Done(*outcome),
742 };
743 let mut outcome = match launch {
744 ToolCallLaunch::Done(outcome) => outcome,
745 ToolCallLaunch::Pending(pending) => {
746 self.await_pending_tool_dispatch_outcome(
747 &call_id,
748 parent_invocation.clone(),
749 pending,
750 self.cancellation_token.clone(),
751 )
752 .await
753 }
754 };
755 outcome.record.call_id = Some(call_id.clone());
756
757 self.complete_tool_call(index, call_id, replay, outcome, tool_correlation_id)
758 .await
759 }
760
761 #[expect(
762 clippy::too_many_arguments,
763 reason = "tool execution carries explicit runtime call metadata"
764 )]
765 pub(crate) async fn execute_tool_call_by_id(
766 &self,
767 call_id: String,
768 tool_id: crate::ToolId,
769 args: serde_json::Value,
770 index: usize,
771 replay: Option<crate::llm::types::ProviderReplayMeta>,
772 parent_invocation: Option<crate::RuntimeInvocation>,
773 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
774 ) -> CompletedProtocolToolCall {
775 let Some(manifest) =
776 crate::tool_dispatch::resolve_callable_manifest_by_id(self.dispatch.as_ref(), &tool_id)
777 else {
778 let outcome = ToolDispatchOutcome {
779 record: ToolCallRecord {
780 call_id: Some(call_id.clone()),
781 tool: tool_id.to_string(),
782 args,
783 output: ToolCallOutput::failure(ToolFailure::runtime(
784 ToolFailureClass::Unavailable,
785 "tool_unavailable",
786 format!("Tool id `{tool_id}` is unavailable in this session"),
787 )),
788 duration_ms: 0,
789 },
790 };
791 let activity_id = TurnActivityId::new(format!("tool:{call_id}"));
792 return self
793 .complete_tool_call(index, call_id, replay, outcome, activity_id)
794 .await;
795 };
796 self.execute_tool_call(
797 call_id,
798 manifest.name,
799 args,
800 index,
801 replay,
802 parent_invocation,
803 child_execution_trace_hook,
804 )
805 .await
806 }
807
808 pub(crate) async fn prepare_tool_call(
809 &self,
810 pending: crate::sansio::PendingToolCall,
811 ) -> ToolPreparationOutcome {
812 let call_id = Some(pending.call_id.clone());
813 prepare_tool_call_with_context(self.dispatch.as_ref(), pending, call_id).await
814 }
815
816 pub(crate) async fn execute_prepared_tool_attempt_effect(
817 &self,
818 prepared: crate::PreparedToolCall,
819 attempt: u32,
820 max_attempts: u32,
821 attempt_invocation: crate::RuntimeInvocation,
822 child_execution_trace_hook: Option<crate::ToolChildExecutionTraceHook>,
823 ) -> Result<crate::ToolAttemptEffectOutcome, crate::RuntimeEffectControllerError> {
824 let call_id = prepared.call_id.clone();
825 let mut attempt_dispatch = (*self.dispatch).clone();
826 attempt_dispatch.parent_invocation = Some(attempt_invocation.clone());
827 attempt_dispatch.trigger_outcomes =
828 crate::tool_dispatch::ToolTriggerOutcomeBuffer::default();
829 let attempt_dispatch = std::sync::Arc::new(attempt_dispatch);
830 let mut attempt_context = self.clone();
831 attempt_context.dispatch = std::sync::Arc::clone(&attempt_dispatch);
832 attempt_context.parent_invocation = Some(attempt_invocation.clone());
833
834 let mut tool_context =
835 crate::ToolContext::from_dispatch(std::sync::Arc::clone(&attempt_dispatch))
836 .runtime_execution_context(attempt_context.clone())
837 .prepared_call(&prepared)
838 .cancellation_token(self.cancellation_token.clone())
839 .runtime_process_id(self.runtime_process_id.clone())
840 .parent_invocation(Some(attempt_invocation))
841 .child_execution_trace_hook(child_execution_trace_hook);
842 if let Some(process_events) = self.process_event_context.as_ref() {
843 tool_context = tool_context.process_events(
844 process_events.process_id.clone(),
845 std::sync::Arc::clone(&process_events.registry),
846 process_events.store.clone(),
847 process_events.session_store_factory.clone(),
848 process_events.queued_work_driver.clone(),
849 );
850 }
851 let tool_context = tool_context.build();
852 let launch = match Box::pin(
853 dispatch_prepared_tool_attempt_launch_with_execution_context(
854 attempt_dispatch.as_ref(),
855 prepared,
856 attempt,
857 max_attempts,
858 None,
859 tool_context,
860 ),
861 )
862 .await
863 {
864 ToolCallLaunch::Done(outcome) => {
865 let mut record = outcome.record;
866 record.call_id = Some(call_id);
867 crate::ToolAttemptLaunch::Done { record }
868 }
869 ToolCallLaunch::Pending(pending) => crate::ToolAttemptLaunch::Pending {
870 key: pending.key,
871 pending: pending.pending,
872 duration_ms: pending.duration_ms,
873 },
874 };
875 let triggers = attempt_context
876 .drain_tool_trigger_outcomes()
877 .map_err(|err| {
878 crate::RuntimeEffectControllerError::new(
879 "tool_trigger_outcome_drain",
880 err.to_string(),
881 )
882 })?;
883 Ok(crate::ToolAttemptEffectOutcome { launch, triggers })
884 }
885
886 pub(super) async fn await_process_with_cancellation(
887 &self,
888 process_id: &str,
889 parent_invocation: Option<crate::RuntimeInvocation>,
890 cancellation: Option<tokio_util::sync::CancellationToken>,
891 ) -> Result<crate::ProcessAwaitOutput, crate::PluginError> {
892 let _phase = self.named_phase("process.await_handle");
893 if let Some(cancellation) = cancellation {
894 tokio::select! {
895 result = self.dispatch.processes.await_process(
896 process_id,
897 self.process_scope(parent_invocation.clone()),
898 ) => result,
899 _ = cancellation.cancelled() => {
900 let _ = self.dispatch.processes.cancel(
901 &self.dispatch.session_id,
902 process_id,
903 self.process_scope(parent_invocation.clone()),
904 ).await;
905 self.dispatch.processes.await_process(
906 process_id,
907 self.process_scope(parent_invocation),
908 ).await
909 }
910 }
911 } else {
912 self.dispatch
913 .processes
914 .await_process(process_id, self.process_scope(parent_invocation))
915 .await
916 }
917 }
918
919 pub(crate) async fn complete_tool_call(
920 &self,
921 _index: usize,
922 call_id: String,
923 replay: Option<crate::llm::types::ProviderReplayMeta>,
924 outcome: ToolDispatchOutcome,
925 tool_correlation_id: TurnActivityId,
926 ) -> CompletedProtocolToolCall {
927 let output = outcome.record.output.clone();
928 let projection_output = output.clone();
929 let projection_tool_name = outcome.record.tool.clone();
930 let projection_args = outcome.record.args.clone();
931 let projection_duration_ms = outcome.record.duration_ms;
932 let projection_call_id = call_id.clone();
933 tokio::task::yield_now().await;
934 let plugins = std::sync::Arc::clone(&self.dispatch.plugins);
935 let projection_context = crate::plugin::ToolResultProjectionContext {
936 session_id: self.dispatch.session_id.clone(),
937 tool_name: projection_tool_name,
938 args: projection_args,
939 output: projection_output,
940 duration_ms: projection_duration_ms,
941 call_id: projection_call_id,
942 };
943 let model_return = match plugins.project_tool_result(projection_context).await {
944 Ok(projected) => projected,
945 Err(err) => ModelToolReturn::text(
946 call_id.clone(),
947 outcome.record.tool.clone(),
948 err.to_string(),
949 ),
950 };
951
952 self.emit_turn_activity(
953 tool_correlation_id,
954 TurnEvent::ToolCallCompleted {
955 call_id: Some(call_id.clone()),
956 name: outcome.record.tool.clone(),
957 args: outcome.record.args.clone(),
958 output: output.clone(),
959 duration_ms: outcome.record.duration_ms,
960 },
961 )
962 .await;
963
964 let record = ToolCallRecord {
965 call_id: Some(call_id.clone()),
966 tool: outcome.record.tool.clone(),
967 args: outcome.record.args.clone(),
968 output: output.clone(),
969 duration_ms: outcome.record.duration_ms,
970 };
971 CompletedProtocolToolCall {
972 completed: crate::sansio::CompletedToolCall {
973 call_id,
974 tool_name: outcome.record.tool,
975 args: outcome.record.args,
976 output,
977 model_return,
978 duration_ms: outcome.record.duration_ms,
979 replay,
980 },
981 record,
982 }
983 }
984
985 pub(crate) async fn pending_completion_dispatch_outcome(
986 &self,
987 tool_name: String,
988 args: serde_json::Value,
989 resolution: crate::Resolution,
990 duration_ms: u64,
991 ) -> ToolDispatchOutcome {
992 let output = crate::tool_result::tool_output_from_completion_resolution(resolution);
993 let result = finalize_tool_result_with_execution_context(
994 self.dispatch.as_ref(),
995 &tool_name,
996 &args,
997 ToolResult::from_output(output),
998 duration_ms,
999 )
1000 .await;
1001 let output = result.into_done_output().unwrap_or_else(|_| {
1002 ToolCallOutput::failure(ToolFailure::runtime(
1003 ToolFailureClass::Internal,
1004 "pending_tool_not_finalized",
1005 "pending tool result reached a completed-output projection path",
1006 ))
1007 });
1008 ToolDispatchOutcome {
1009 record: ToolCallRecord {
1010 call_id: None,
1011 tool: tool_name,
1012 args,
1013 output,
1014 duration_ms,
1015 },
1016 }
1017 }
1018
1019 async fn await_pending_tool_dispatch_outcome(
1020 &self,
1021 call_id: &str,
1022 parent_invocation: Option<crate::RuntimeInvocation>,
1023 pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1024 cancellation: Option<tokio_util::sync::CancellationToken>,
1025 ) -> ToolDispatchOutcome {
1026 self.await_pending_tool_dispatch_outcome_with_suffix(
1027 call_id,
1028 parent_invocation,
1029 format!("{call_id}:await"),
1030 pending,
1031 cancellation,
1032 )
1033 .await
1034 }
1035
1036 async fn await_pending_tool_dispatch_outcome_with_suffix(
1037 &self,
1038 call_id: &str,
1039 parent_invocation: Option<crate::RuntimeInvocation>,
1040 replay_suffix: String,
1041 pending: crate::tool_dispatch::PendingToolDispatchOutcome,
1042 cancellation: Option<tokio_util::sync::CancellationToken>,
1043 ) -> ToolDispatchOutcome {
1044 let fallback;
1045 let parent = if let Some(parent) = parent_invocation.as_ref() {
1046 parent
1047 } else {
1048 fallback = crate::RuntimeInvocation::effect(
1049 crate::RuntimeScope::new(&self.dispatch.session_id),
1050 format!("tool:{call_id}:await"),
1051 crate::RuntimeEffectKind::AwaitEvent,
1052 format!("tool:{call_id}:await"),
1053 );
1054 &fallback
1055 };
1056 let parent_effect_id = parent.effect_id().unwrap_or("tool");
1057 let invocation = crate::runtime::causal::child_effect_invocation(
1058 parent,
1059 format!("{parent_effect_id}:{replay_suffix}"),
1060 crate::RuntimeEffectKind::AwaitEvent,
1061 replay_suffix,
1062 );
1063 let cancellation = cancellation.unwrap_or_default();
1064 let deadline = pending
1065 .pending
1066 .deadline
1067 .map(|duration| self.dispatch.clock.now() + duration);
1068 let outcome = self
1069 .dispatch
1070 .effect_controller
1071 .controller()
1072 .execute_effect(
1073 crate::RuntimeEffectEnvelope::new(
1074 invocation,
1075 crate::RuntimeEffectCommand::AwaitEvent { key: pending.key },
1076 ),
1077 crate::RuntimeEffectLocalExecutor::await_event_with_clock(
1078 cancellation,
1079 deadline,
1080 std::sync::Arc::clone(&self.dispatch.clock),
1081 ),
1082 )
1083 .await;
1084 let resolution = match outcome.and_then(crate::RuntimeEffectOutcome::into_await_event) {
1085 Ok(resolution) => resolution,
1086 Err(err) => {
1087 return ToolDispatchOutcome {
1088 record: ToolCallRecord {
1089 call_id: None,
1090 tool: pending.tool_name,
1091 args: pending.args,
1092 output: ToolCallOutput::failure(ToolFailure::runtime(
1093 ToolFailureClass::Internal,
1094 "pending_tool_completion_failed",
1095 err.to_string(),
1096 )),
1097 duration_ms: pending.duration_ms,
1098 },
1099 };
1100 }
1101 };
1102 self.pending_completion_dispatch_outcome(
1103 pending.tool_name,
1104 pending.args,
1105 resolution,
1106 pending.duration_ms,
1107 )
1108 .await
1109 }
1110
1111 pub async fn call_tool_by_id(
1112 &self,
1113 call_id: String,
1114 tool_id: crate::ToolId,
1115 args: serde_json::Value,
1116 index: usize,
1117 ) -> ToolInvocationReply {
1118 let executed = self
1119 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, None)
1120 .await;
1121 let reply = ToolInvocationReply::from_output(executed.completed.output);
1122 reply.with_record(executed.record)
1123 }
1124
1125 pub async fn call_tool_by_id_with_child_execution_trace_hook(
1126 &self,
1127 call_id: String,
1128 tool_id: crate::ToolId,
1129 args: serde_json::Value,
1130 index: usize,
1131 trace_hook: crate::ToolChildExecutionTraceHook,
1132 ) -> ToolInvocationReply {
1133 let executed = self
1134 .execute_tool_call_by_id(call_id, tool_id, args, index, None, None, Some(trace_hook))
1135 .await;
1136 let reply = ToolInvocationReply::from_output(executed.completed.output);
1137 reply.with_record(executed.record)
1138 }
1139
1140 pub async fn call_tool_batch(&self, calls: Vec<ToolInvocation>) -> Vec<ToolInvocationReply> {
1141 if calls.is_empty() {
1142 return Vec::new();
1143 }
1144
1145 let batch_id = deterministic_tool_invocation_batch_id(&calls);
1146 let mut replies = vec![None; calls.len()];
1147 let mut prepared_entries = Vec::new();
1148
1149 for (index, call) in calls.into_iter().enumerate() {
1150 let Some(manifest) = crate::tool_dispatch::resolve_callable_manifest_by_id(
1151 self.dispatch.as_ref(),
1152 &call.tool_id,
1153 ) else {
1154 let outcome = ToolDispatchOutcome {
1155 record: ToolCallRecord {
1156 call_id: Some(call.id.clone()),
1157 tool: call.tool_id.to_string(),
1158 args: call.args,
1159 output: ToolCallOutput::failure(ToolFailure::runtime(
1160 ToolFailureClass::Unavailable,
1161 "tool_unavailable",
1162 format!("Tool id `{}` is unavailable in this session", call.tool_id),
1163 )),
1164 duration_ms: 0,
1165 },
1166 };
1167 let completed = self
1168 .complete_tool_call(
1169 index,
1170 call.id,
1171 None,
1172 outcome,
1173 TurnActivityId::new(format!("tool:{}", batch_id)),
1174 )
1175 .await;
1176 replies[index] = Some(
1177 ToolInvocationReply::from_output(completed.completed.output)
1178 .with_record(completed.record),
1179 );
1180 continue;
1181 };
1182
1183 let pending = crate::sansio::PendingToolCall {
1184 call_id: call.id.clone(),
1185 tool_name: manifest.name,
1186 args: call.args,
1187 replay: None,
1188 };
1189 match self.prepare_tool_call(pending).await {
1190 ToolPreparationOutcome::Prepared(prepared) => {
1191 prepared_entries.push((index, prepared, call.child_execution_trace_hook));
1192 }
1193 ToolPreparationOutcome::Completed(outcome) => {
1194 let completed = self
1195 .complete_tool_call(
1196 index,
1197 call.id,
1198 None,
1199 *outcome,
1200 TurnActivityId::new(format!("tool:{}", batch_id)),
1201 )
1202 .await;
1203 replies[index] = Some(
1204 ToolInvocationReply::from_output(completed.completed.output)
1205 .with_record(completed.record),
1206 );
1207 }
1208 }
1209 }
1210
1211 if !prepared_entries.is_empty() {
1212 let invocation = self.tool_batch_invocation(&batch_id);
1213 let batch = crate::PreparedToolBatch::new(
1214 batch_id.clone(),
1215 prepared_entries
1216 .iter()
1217 .map(|(_, prepared, _)| prepared.clone())
1218 .collect(),
1219 );
1220 let child_trace_hooks = prepared_entries
1221 .iter()
1222 .filter_map(|(_, prepared, hook)| {
1223 hook.clone().map(|hook| (prepared.call_id.clone(), hook))
1224 })
1225 .collect();
1226 let envelope = crate::RuntimeEffectEnvelope::new(
1227 invocation.clone(),
1228 crate::RuntimeEffectCommand::ToolBatch { batch },
1229 );
1230 let local_executor =
1231 crate::RuntimeEffectLocalExecutor::tool_batch(self.clone(), child_trace_hooks);
1232 let raw_outcome = self
1233 .dispatch
1234 .effect_controller
1235 .controller()
1236 .execute_effect(envelope, local_executor)
1237 .await;
1238 let outcome =
1239 match raw_outcome.and_then(crate::RuntimeEffectOutcome::into_tool_batch_effect) {
1240 Ok(outcome) => outcome,
1241 Err(err) => {
1242 for (index, prepared, _) in prepared_entries {
1243 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(
1244 format!("tool batch failed: {err}")
1245 )));
1246 let _ = prepared;
1247 }
1248 return replies
1249 .into_iter()
1250 .map(|reply| reply.expect("every batch reply slot should be filled"))
1251 .collect();
1252 }
1253 };
1254 if outcome.launches.len() != prepared_entries.len() {
1255 let message = format!(
1256 "tool batch returned {} launches for {} prepared calls",
1257 outcome.launches.len(),
1258 prepared_entries.len()
1259 );
1260 for (index, _, _) in prepared_entries {
1261 replies[index] = Some(ToolInvocationReply::error(serde_json::json!(message)));
1262 }
1263 } else {
1264 for ((index, prepared, _), launch) in
1265 prepared_entries.into_iter().zip(outcome.launches)
1266 {
1267 let call_id = prepared.call_id.clone();
1268 let reply = match launch {
1269 crate::runtime::ToolCallLaunch::Done { result } => {
1270 let record = ToolCallRecord {
1271 call_id: Some(result.call_id.clone()),
1272 tool: result.tool_name.clone(),
1273 args: result.args.clone(),
1274 output: result.output.clone(),
1275 duration_ms: result.duration_ms,
1276 };
1277 ToolInvocationReply::from_output(result.output).with_record(record)
1278 }
1279 crate::runtime::ToolCallLaunch::Pending {
1280 key,
1281 pending,
1282 duration_ms,
1283 } => {
1284 let dispatch_outcome = self
1285 .await_pending_tool_dispatch_outcome(
1286 &call_id,
1287 Some(invocation.clone()),
1288 crate::tool_dispatch::PendingToolDispatchOutcome {
1289 tool_name: prepared.tool_name.clone(),
1290 args: prepared.args.clone(),
1291 key,
1292 pending,
1293 duration_ms,
1294 },
1295 self.cancellation_token.clone(),
1296 )
1297 .await;
1298 let completed = self
1299 .complete_tool_call(
1300 index,
1301 call_id.clone(),
1302 prepared.replay.clone(),
1303 dispatch_outcome,
1304 TurnActivityId::new(format!("tool:{call_id}")),
1305 )
1306 .await;
1307 ToolInvocationReply::from_output(completed.completed.output)
1308 .with_record(completed.record)
1309 }
1310 };
1311 replies[index] = Some(reply);
1312 }
1313 }
1314 }
1315
1316 replies
1317 .into_iter()
1318 .map(|reply| reply.expect("every batch reply slot should be filled"))
1319 .collect()
1320 }
1321
1322 pub async fn start_tool_call(
1323 &self,
1324 call_id: String,
1325 name: String,
1326 args: serde_json::Value,
1327 ) -> ToolInvocationReply {
1328 self.start_tool_process(call_id, name, args).await
1329 }
1330
1331 pub async fn await_tool_handle(
1332 &self,
1333 call_id: String,
1334 handle: serde_json::Value,
1335 ) -> ToolInvocationReply {
1336 self.await_process_handle(call_id, handle).await
1337 }
1338
1339 pub async fn cancel_tool_handle(
1340 &self,
1341 call_id: String,
1342 handle: serde_json::Value,
1343 ) -> ToolInvocationReply {
1344 self.cancel_process_handle(call_id, handle).await
1345 }
1346
1347 pub async fn signal_tool_handle(
1348 &self,
1349 call_id: String,
1350 handle: serde_json::Value,
1351 signal_name: String,
1352 payload: serde_json::Value,
1353 ) -> ToolInvocationReply {
1354 self.signal_process_handle(call_id, handle, signal_name, payload)
1355 .await
1356 }
1357}