1use std::sync::Arc;
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24
25use serde_json::{json, Value};
26use tokio::sync::mpsc;
27use tokio::time::timeout;
28use tokio_util::sync::CancellationToken;
29
30use crate::config::LoopConfig;
31use crate::error::{LoopError, ToolError};
32use crate::event::{AgentEvent, EventSink};
33use crate::plugin::{AfterToolCallContext, BeforeToolCallContext, EventObserver};
34use crate::tool::{detect_arg_parse_error, AgentTool, ExecutionMode, ToolCall, ToolResult};
35use crate::types::{AgentContext, AgentMessage, AssistantContent, ToolResultContent};
36
/// How long `execute_prepared` waits for its partial-update drain task to
/// finish after the tool's `execute` call has returned; past this the drain
/// task is aborted.
const TOOL_UPDATE_DRAIN_GRACE: Duration = Duration::from_millis(50);
/// Capacity of the bounded channel created by `spawn_tool_update_dispatcher`.
/// `enqueue_tool_update_event` drops (and logs) updates once it is full.
const TOOL_UPDATE_EVENT_QUEUE_CAPACITY: usize = 256;
39
40fn spawn_tool_update_dispatcher(
41 event_sink: Arc<dyn EventSink>,
42 observers: Vec<Arc<dyn EventObserver>>,
43) -> mpsc::Sender<AgentEvent> {
44 let (tx, mut rx) = mpsc::channel::<AgentEvent>(TOOL_UPDATE_EVENT_QUEUE_CAPACITY);
45 tokio::spawn(async move {
46 while let Some(event) = rx.recv().await {
47 event_sink.emit(event.clone()).await;
48 for observer in observers.iter() {
49 observer.on_event(&event).await;
50 }
51 }
52 });
53 tx
54}
55
56fn enqueue_tool_update_event(tx: &mpsc::Sender<AgentEvent>, event: AgentEvent) {
57 match tx.try_send(event) {
58 Ok(()) => {}
59 Err(mpsc::error::TrySendError::Full(_)) => {
60 tracing::warn!("tool update event queue full; dropping partial update");
61 }
62 Err(mpsc::error::TrySendError::Closed(_)) => {}
63 }
64}
65
/// Outcome of running the tool calls emitted by one assistant turn.
pub(crate) struct ExecutedBatch {
    /// Tool-result messages produced for the batch (executed calls first,
    /// followed by synthesized results for calls that were not executed).
    pub messages: Vec<AgentMessage>,
    /// Batch-level termination flag as computed by `compute_batch_terminate`;
    /// `execute_tool_batch` forces it to `false` when any call was left
    /// unexecuted.
    pub terminate: bool,
}
74
75pub(crate) async fn execute_tool_batch(
76 assistant: &AgentMessage,
77 tool_calls: Vec<ToolCall>,
78 context: &AgentContext,
79 config: &LoopConfig,
80 signal: &CancellationToken,
81 turn_allowlist: Option<&std::collections::HashSet<String>>,
82) -> Result<ExecutedBatch, LoopError> {
83 if tool_calls.is_empty() {
84 return Ok(ExecutedBatch {
85 messages: Vec::new(),
86 terminate: false,
87 });
88 }
89
90 let mut tool_calls = tool_calls;
95 config
96 .protocol
97 .normalize_tool_calls(&mut tool_calls, &config.tools);
98
99 let total_tool_calls = tool_calls.len();
100 let limit_counted_tool_calls = count_limit_counted_tool_calls(&tool_calls, &config.tools);
101 let (tool_calls, unexecuted_tool_calls, max_executed) =
102 split_tool_calls_for_execution(tool_calls, &config.tools, config.max_tool_calls_per_turn);
103
104 let assistant_content = match assistant {
105 AgentMessage::Assistant { content, .. } => content.clone(),
106 _ => AssistantContent { blocks: Vec::new() },
107 };
108
109 if tool_calls.is_empty() {
110 let messages = synthesize_unexecuted_tool_results(
111 assistant,
112 &assistant_content,
113 unexecuted_tool_calls,
114 total_tool_calls,
115 limit_counted_tool_calls,
116 max_executed.unwrap_or(0),
117 context,
118 config,
119 )
120 .await;
121 return Ok(ExecutedBatch {
122 messages,
123 terminate: false,
124 });
125 }
126
127 let any_exclusive = tool_calls.iter().any(|call| {
131 config
132 .tools
133 .get(&call.name)
134 .map(|t| t.requires_exclusive_sandbox())
135 .unwrap_or(false)
136 });
137
138 let effective_mode =
139 if any_exclusive || config.default_execution_mode == ExecutionMode::Sequential {
140 ExecutionMode::Sequential
141 } else {
142 ExecutionMode::Parallel
143 };
144
145 let mut batch = match effective_mode {
146 ExecutionMode::Sequential => {
147 execute_sequential(
148 assistant,
149 &assistant_content,
150 tool_calls,
151 context,
152 config,
153 signal,
154 turn_allowlist,
155 )
156 .await
157 }
158 ExecutionMode::Parallel => {
159 execute_parallel(
160 assistant,
161 &assistant_content,
162 tool_calls,
163 context,
164 config,
165 signal,
166 turn_allowlist,
167 )
168 .await
169 }
170 }?;
171
172 if !unexecuted_tool_calls.is_empty() {
173 batch.messages.extend(
174 synthesize_unexecuted_tool_results(
175 assistant,
176 &assistant_content,
177 unexecuted_tool_calls,
178 total_tool_calls,
179 limit_counted_tool_calls,
180 max_executed.unwrap_or(0),
181 context,
182 config,
183 )
184 .await,
185 );
186 batch.terminate = false;
187 }
188
189 Ok(batch)
190}
191
192fn split_tool_calls_for_execution(
193 tool_calls: Vec<ToolCall>,
194 tools: &crate::tool::ToolRegistry,
195 max_tool_calls: Option<usize>,
196) -> (Vec<ToolCall>, Vec<ToolCall>, Option<usize>) {
197 let Some(max_tool_calls) = max_tool_calls else {
198 return (tool_calls, Vec::new(), None);
199 };
200 let max_tool_calls = max_tool_calls.max(1);
201 if count_limit_counted_tool_calls(&tool_calls, tools) <= max_tool_calls {
202 return (tool_calls, Vec::new(), Some(max_tool_calls));
203 }
204
205 let mut executable = Vec::with_capacity(tool_calls.len());
206 let mut unexecuted = Vec::new();
207 let mut executed_counted = 0usize;
208 for call in tool_calls {
209 if !tool_counts_toward_call_limit(tools, &call.name) {
210 executable.push(call);
216 } else if executed_counted < max_tool_calls {
217 executed_counted += 1;
218 executable.push(call);
219 } else {
220 unexecuted.push(call);
221 }
222 }
223 (executable, unexecuted, Some(max_tool_calls))
224}
225
226fn count_limit_counted_tool_calls(
227 tool_calls: &[ToolCall],
228 tools: &crate::tool::ToolRegistry,
229) -> usize {
230 tool_calls
231 .iter()
232 .filter(|call| tool_counts_toward_call_limit(tools, &call.name))
233 .count()
234}
235
236fn tool_counts_toward_call_limit(tools: &crate::tool::ToolRegistry, name: &str) -> bool {
263 tools
264 .get(name)
265 .map(|tool| tool.counts_toward_tool_call_limit() && !tool.parallel_safe_per_turn())
266 .unwrap_or(false)
267}
268
269fn tool_counts_toward_termination_vote(tools: &crate::tool::ToolRegistry, name: &str) -> bool {
274 tools
275 .get(name)
276 .map(|tool| tool.counts_toward_termination_vote())
277 .unwrap_or(true)
278}
279
280fn compute_batch_terminate<'a, I>(tools: &crate::tool::ToolRegistry, votes: I) -> bool
296where
297 I: IntoIterator<Item = (&'a str, bool)>,
298{
299 let mut counted_total = 0usize;
300 let mut counted_terminate = 0usize;
301 let mut terminating: Vec<&'a str> = Vec::new();
302 let mut advisory_skipped: Vec<&'a str> = Vec::new();
303 for (name, terminate) in votes {
304 if !tool_counts_toward_termination_vote(tools, name) {
305 advisory_skipped.push(name);
306 continue;
307 }
308 counted_total += 1;
309 if terminate {
310 counted_terminate += 1;
311 terminating.push(name);
312 }
313 }
314 let terminated = counted_total > 0 && counted_terminate == counted_total;
315 if terminated && !advisory_skipped.is_empty() {
316 tracing::info!(
317 terminating_tools = ?terminating,
318 advisory_tools = ?advisory_skipped,
319 counted_total,
320 "advisory siblings excluded from unanimous termination vote"
321 );
322 }
323 terminated
324}
325
326#[allow(clippy::too_many_arguments)]
329async fn synthesize_unexecuted_tool_results(
330 assistant: &AgentMessage,
331 assistant_content: &AssistantContent,
332 tool_calls: Vec<ToolCall>,
333 total_tool_calls: usize,
334 limit_counted_tool_calls: usize,
335 max_executed: usize,
336 context: &AgentContext,
337 config: &LoopConfig,
338) -> Vec<AgentMessage> {
339 let mut messages = Vec::with_capacity(tool_calls.len());
340 for call in tool_calls {
341 emit_tool_start(config, &call).await;
342 let outcome = finalize(
343 assistant,
344 assistant_content,
345 &call,
346 &call.arguments,
347 ExecutedOutcome {
348 result: unexecuted_tool_call_result(
349 total_tool_calls,
350 limit_counted_tool_calls,
351 max_executed,
352 ),
353 is_error: true,
354 },
355 &context.messages,
356 &config.plugins.after_tool_call,
357 )
358 .await;
359 emit_tool_end(config, &call, &outcome).await;
360 messages.push(outcome_to_message(&call, outcome));
361 }
362 messages
363}
364
/// Builds the model-facing explanation for a tool call that was skipped
/// because the turn exceeded the per-turn call limit.
///
/// When some emitted calls did not count toward the limit
/// (`limit_counted_tool_calls != total_tool_calls`) the message spells out
/// both numbers; otherwise it reports the single total.
fn unexecuted_tool_call_message(
    total_tool_calls: usize,
    limit_counted_tool_calls: usize,
    max_executed: usize,
) -> String {
    // Picks the singular form for exactly one, the plural form otherwise.
    let pick = |count: usize, singular: &'static str, plural: &'static str| {
        if count == 1 {
            singular
        } else {
            plural
        }
    };
    let call_word = pick(total_tool_calls, "tool call", "tool calls");
    let limited_call_word = pick(
        limit_counted_tool_calls,
        "limit-counted tool call",
        "limit-counted tool calls",
    );
    let executed_word = pick(max_executed, "call", "calls");

    if limit_counted_tool_calls == total_tool_calls {
        format!(
            "This tool call was not executed because the assistant turn emitted \
             {total_tool_calls} {call_word}, but only the first {max_executed} \
             {executed_word} can run in one turn. The earlier {max_executed} \
             {executed_word} already ran. Reissue this call in a later turn, \
             one tool call at a time."
        )
    } else {
        format!(
            "This tool call was not executed because the assistant turn emitted \
             {limit_counted_tool_calls} {limited_call_word} ({total_tool_calls} \
             {call_word} total, including progress-only calls), but only the \
             first {max_executed} limit-counted {executed_word} can run in one \
             turn. The earlier allowed calls already ran. Reissue this call in \
             a later turn, one tool call at a time."
        )
    }
}
399
400fn unexecuted_tool_call_result(
401 total_tool_calls: usize,
402 limit_counted_tool_calls: usize,
403 max_executed: usize,
404) -> ToolResult {
405 let mut result = ToolResult::error(unexecuted_tool_call_message(
406 total_tool_calls,
407 limit_counted_tool_calls,
408 max_executed,
409 ));
410 result.details = json!({
411 "kind": "tool_call_not_executed",
412 "reason": "max_tool_calls_per_turn",
413 "total_tool_calls": total_tool_calls,
414 "limit_counted_tool_calls": limit_counted_tool_calls,
415 "max_executed": max_executed,
416 });
417 result
418}
419
420#[allow(clippy::too_many_arguments)]
421async fn execute_sequential(
422 assistant: &AgentMessage,
423 assistant_content: &AssistantContent,
424 tool_calls: Vec<ToolCall>,
425 context: &AgentContext,
426 config: &LoopConfig,
427 signal: &CancellationToken,
428 turn_allowlist: Option<&std::collections::HashSet<String>>,
429) -> Result<ExecutedBatch, LoopError> {
430 let mut messages = Vec::with_capacity(tool_calls.len());
431 let mut votes: Vec<(String, bool)> = Vec::with_capacity(tool_calls.len());
432
433 for call in tool_calls {
434 let outcome = run_one(
435 assistant,
436 assistant_content,
437 &call,
438 context,
439 config,
440 signal,
441 turn_allowlist,
442 )
443 .await?;
444 votes.push((call.name.clone(), outcome.terminate));
445 messages.push(outcome_to_message(&call, outcome));
446 }
447
448 let terminate =
449 compute_batch_terminate(&config.tools, votes.iter().map(|(n, t)| (n.as_str(), *t)));
450
451 Ok(ExecutedBatch {
452 messages,
453 terminate,
454 })
455}
456
// Signature is kept parallel to `execute_sequential`.
#[allow(clippy::too_many_arguments)]
/// Runs the tool calls of one batch concurrently.
///
/// Phases:
/// 1. Sequentially emit a start event and run `prepare_call` for every call.
/// 2. Calls that resolved to an immediate outcome are finalized right away;
///    the others become futures that execute the tool and then finalize.
/// 3. The futures are driven to completion in whatever order they finish.
///    A finished call that is an error and whose tool reports
///    `aborts_siblings_on_error()` cancels the shared batch token.
/// 4. Results are restored to the original call order, end events are
///    emitted, and the batch termination flag is computed.
///
/// # Errors
/// Returns the first `LoopError` yielded by any tool future (for example a
/// fatal tool error, or an abort that was not caused by a sibling failure).
async fn execute_parallel(
    assistant: &AgentMessage,
    assistant_content: &AssistantContent,
    tool_calls: Vec<ToolCall>,
    context: &AgentContext,
    config: &LoopConfig,
    signal: &CancellationToken,
    turn_allowlist: Option<&std::collections::HashSet<String>>,
) -> Result<ExecutedBatch, LoopError> {
    use futures::stream::{FuturesUnordered, StreamExt};

    // Batch-scoped token derived from the caller's signal. It is cancelled
    // below when a sibling error should abort the rest of the batch; the
    // abort handling distinguishes that case from `signal` itself being
    // cancelled.
    let batch_token = signal.child_token();

    // Phase 1: start events and preparation happen in call order, before
    // any tool starts executing.
    let mut prepared: Vec<(ToolCall, PreparedCall)> = Vec::with_capacity(tool_calls.len());
    for call in tool_calls {
        emit_tool_start(config, &call).await;
        let prep = prepare_call(
            assistant,
            assistant_content,
            &call,
            context,
            config,
            turn_allowlist,
        )
        .await;
        prepared.push((call, prep));
    }

    let mut futures = Vec::with_capacity(prepared.len());
    let mut immediate: Vec<(usize, ToolCall, FinalizedOutcome)> = Vec::new();

    // Phase 2: `idx` remembers the original position so results can be put
    // back in order after out-of-order completion.
    for (idx, (call, prep)) in prepared.into_iter().enumerate() {
        match prep {
            PreparedCall::Immediate(executed) => {
                // Nothing to execute; run the after-hooks on the
                // already-decided outcome using the call's raw arguments.
                let finalized = finalize(
                    assistant,
                    assistant_content,
                    &call,
                    &call.arguments,
                    executed,
                    &context.messages,
                    &config.plugins.after_tool_call,
                )
                .await;
                immediate.push((idx, call, finalized));
            }
            PreparedCall::Prepared { tool, args } => {
                // Everything the future needs is cloned so the `async move`
                // block owns its data.
                let tool_signal = batch_token.child_token();
                let run_signal = signal.clone();
                let batch_token_clone = batch_token.clone();
                let assistant_clone = assistant.clone();
                let assistant_content_clone = assistant_content.clone();
                let context_messages = context.messages.clone();
                let after_hooks = config.plugins.after_tool_call.clone();
                let event_sink = config.event_sink.clone();
                let event_observers = config.plugins.event_observer.clone();
                let call_clone = call.clone();
                let fut = async move {
                    let id = call_clone.id.clone();
                    let name = call_clone.name.clone();
                    let name_for_message = name.clone();
                    // Partial updates are forwarded through a per-call
                    // dispatcher task.
                    let update_events = spawn_tool_update_dispatcher(event_sink, event_observers);
                    let executed_result = execute_prepared(
                        &tool,
                        &call_clone,
                        args.clone(),
                        tool_signal,
                        Box::new(move |update| {
                            let event = AgentEvent::ToolExecutionUpdate {
                                tool_call_id: id.clone(),
                                tool_name: name.clone(),
                                partial: update,
                            };
                            enqueue_tool_update_event(&update_events, event);
                        }),
                    )
                    .await;
                    let executed = match executed_result {
                        Ok(executed) => executed,
                        // Aborted while the batch token is cancelled but the
                        // caller's signal is not: the abort came from a
                        // sibling failure, so report it as an ordinary tool
                        // error instead of failing the whole loop.
                        Err(LoopError::Aborted)
                            if batch_token_clone.is_cancelled() && !run_signal.is_cancelled() =>
                        {
                            ExecutedOutcome {
                                result: ToolResult::error(format!(
                                    "aborted because a sibling tool in the \
                                     parallel batch errored — re-run this \
                                     {name_for_message} call after addressing the \
                                     sibling failure"
                                )),
                                is_error: true,
                            }
                        }
                        Err(other) => return Err(other),
                    };
                    let finalized = finalize(
                        &assistant_clone,
                        &assistant_content_clone,
                        &call_clone,
                        &args,
                        executed,
                        &context_messages,
                        &after_hooks,
                    )
                    .await;
                    Ok::<_, LoopError>((idx, call_clone, finalized))
                };
                futures.push(fut);
            }
        }
    }

    // Phase 3: drive all tool futures concurrently, handling each as it
    // completes.
    let mut unordered: FuturesUnordered<_> = futures.into_iter().collect();
    let mut completed: Vec<(usize, ToolCall, FinalizedOutcome)> =
        Vec::with_capacity(unordered.len() + immediate.len());
    while let Some(result) = unordered.next().await {
        let entry = result?;
        if entry.2.is_error {
            // Unregistered tool names never abort siblings.
            let aborts = config
                .tools
                .get(&entry.1.name)
                .map(|t| t.aborts_siblings_on_error())
                .unwrap_or(false);
            if aborts && !batch_token.is_cancelled() {
                batch_token.cancel();
            }
        }
        completed.push(entry);
    }
    // Phase 4: merge immediate outcomes and restore the original call order.
    completed.extend(immediate);
    completed.sort_by_key(|(idx, _, _)| *idx);

    let mut messages = Vec::with_capacity(completed.len());
    let mut votes: Vec<(String, bool)> = Vec::with_capacity(completed.len());
    for (_idx, call, outcome) in completed {
        // End events are emitted in call order, after the whole batch is done.
        emit_tool_end(config, &call, &outcome).await;
        votes.push((call.name.clone(), outcome.terminate));
        messages.push(outcome_to_message(&call, outcome));
    }

    let terminate =
        compute_batch_terminate(&config.tools, votes.iter().map(|(n, t)| (n.as_str(), *t)));

    Ok(ExecutedBatch {
        messages,
        terminate,
    })
}
632
633#[allow(clippy::too_many_arguments)]
636async fn run_one(
637 assistant: &AgentMessage,
638 assistant_content: &AssistantContent,
639 call: &ToolCall,
640 context: &AgentContext,
641 config: &LoopConfig,
642 signal: &CancellationToken,
643 turn_allowlist: Option<&std::collections::HashSet<String>>,
644) -> Result<FinalizedOutcome, LoopError> {
645 emit_tool_start(config, call).await;
646
647 let prep = prepare_call(
648 assistant,
649 assistant_content,
650 call,
651 context,
652 config,
653 turn_allowlist,
654 )
655 .await;
656 let outcome = match prep {
657 PreparedCall::Immediate(executed) => {
658 finalize(
659 assistant,
660 assistant_content,
661 call,
662 &call.arguments,
663 executed,
664 &context.messages,
665 &config.plugins.after_tool_call,
666 )
667 .await
668 }
669 PreparedCall::Prepared { tool, args } => {
670 let event_sink = config.event_sink.clone();
671 let event_observers = config.plugins.event_observer.clone();
672 let id = call.id.clone();
673 let name = call.name.clone();
674 let update_events = spawn_tool_update_dispatcher(event_sink, event_observers);
675 let executed = execute_prepared(
676 &tool,
677 call,
678 args.clone(),
679 signal.clone(),
680 Box::new(move |update| {
681 let event = AgentEvent::ToolExecutionUpdate {
682 tool_call_id: id.clone(),
683 tool_name: name.clone(),
684 partial: update,
685 };
686 enqueue_tool_update_event(&update_events, event);
687 }),
688 )
689 .await?;
690 finalize(
691 assistant,
692 assistant_content,
693 call,
694 &args,
695 executed,
696 &context.messages,
697 &config.plugins.after_tool_call,
698 )
699 .await
700 }
701 };
702
703 emit_tool_end(config, call, &outcome).await;
704 Ok(outcome)
705}
706
/// Result of `prepare_call`: either the call's outcome is already decided,
/// or the tool is ready to be executed.
enum PreparedCall {
    /// The call will not be executed; the contained outcome is final apart
    /// from after-tool-call hooks. `prepare_call` produces this for an
    /// unknown tool, a tool hidden by the turn allowlist, unparseable
    /// arguments, failed validation, or a blocking before-tool-call hook.
    Immediate(ExecutedOutcome),
    /// The call passed every check and can be executed.
    Prepared {
        /// The resolved tool implementation.
        tool: Arc<dyn AgentTool>,
        /// Arguments as returned by the tool's `prepare_arguments`.
        args: Value,
    },
}
722
/// Raw outcome of a tool call before after-tool-call hooks have run.
struct ExecutedOutcome {
    /// The tool's result, or a synthesized error result.
    result: ToolResult,
    /// Whether the outcome is treated as an error.
    is_error: bool,
}
727
/// Outcome of a tool call after `finalize` has applied the after-tool-call
/// hooks.
pub(crate) struct FinalizedOutcome {
    /// The final tool result, possibly replaced or adjusted by hooks.
    pub result: ToolResult,
    /// Whether the final outcome is treated as an error.
    pub is_error: bool,
    /// This call's termination vote, copied from `result.terminate`.
    pub terminate: bool,
}
733
/// A tool denial attributed to a specific tool gate plugin.
struct GateDenial {
    /// The reason returned by the gate's `denial_reason`.
    reason: String,
    /// The name of the gate that produced the denial.
    gate: &'static str,
}
743
744async fn gate_attributed_denial(
745 tool_name: &str,
746 config: &LoopConfig,
747 messages: &[AgentMessage],
748) -> Option<GateDenial> {
749 let available_tool_names: Vec<&str> = config.tools.iter().map(|t| t.name()).collect();
750 let iteration = messages
751 .iter()
752 .filter(|m| matches!(m, AgentMessage::Assistant { .. }))
753 .count();
754 for gate in &config.plugins.tool_gate {
755 let ctx = crate::plugin::ToolGateContext {
756 iteration,
757 messages,
758 conversation_id: config.conversation_id.as_deref(),
759 available_tool_names: &available_tool_names,
760 };
761 if let Some(reason) = gate.denial_reason(tool_name, ctx).await {
762 return Some(GateDenial {
763 reason,
764 gate: gate.name(),
765 });
766 }
767 }
768 None
769}
770
771async fn prepare_call(
772 assistant: &AgentMessage,
773 assistant_content: &AssistantContent,
774 call: &ToolCall,
775 context: &AgentContext,
776 config: &LoopConfig,
777 turn_allowlist: Option<&std::collections::HashSet<String>>,
778) -> PreparedCall {
779 let Some(tool) = config.tools.get(&call.name) else {
780 return PreparedCall::Immediate(ExecutedOutcome {
781 result: ToolResult::error(format!("Tool `{}` not found", call.name)),
782 is_error: true,
783 });
784 };
785
786 if let Some(allowlist) = turn_allowlist {
801 if !allowlist.contains(call.name.as_str()) {
802 let attributed = gate_attributed_denial(&call.name, config, &context.messages).await;
803 let (message, details) =
804 match attributed {
805 Some(denial) => {
806 let details = crate::protocol::generic_hidden_tool_details(
807 &call.name,
808 allowlist,
809 Some(denial.gate),
810 );
811 (denial.reason, details)
812 }
813 None => match config.protocol.hidden_tool_error(
814 crate::protocol::HiddenToolContext {
815 requested_tool: &call.name,
816 allowlist,
817 messages: &context.messages,
818 },
819 ) {
820 Some(err) => (err.message, err.details),
821 None => (
822 crate::protocol::generic_hidden_tool_message(&call.name, allowlist),
823 crate::protocol::generic_hidden_tool_details(
824 &call.name, allowlist, None,
825 ),
826 ),
827 },
828 };
829 let mut result = ToolResult::error(message);
830 result.details = details;
831 return PreparedCall::Immediate(ExecutedOutcome {
832 result,
833 is_error: true,
834 });
835 }
836 }
837
838 if let Some((parse_err, raw)) = detect_arg_parse_error(&call.arguments) {
845 return PreparedCall::Immediate(ExecutedOutcome {
846 result: ToolResult::error(format_arg_parse_error(&call.name, parse_err, raw)),
847 is_error: true,
848 });
849 }
850
851 let prepared_args = tool.prepare_arguments(call.arguments.clone());
852
853 if let Err(err) = tool.validate(&prepared_args) {
854 return PreparedCall::Immediate(ExecutedOutcome {
855 result: ToolResult::error(err.to_string()),
856 is_error: true,
857 });
858 }
859
860 let ctx = BeforeToolCallContext {
861 assistant_message: assistant,
862 assistant_content,
863 tool_call: call,
864 args: &prepared_args,
865 messages: &context.messages,
866 };
867 for hook in &config.plugins.before_tool_call {
868 let decision = hook
869 .on_before_tool_call(BeforeToolCallContext {
870 assistant_message: ctx.assistant_message,
871 assistant_content: ctx.assistant_content,
872 tool_call: ctx.tool_call,
873 args: ctx.args,
874 messages: ctx.messages,
875 })
876 .await;
877 if decision.block {
878 let reason = decision
879 .reason
880 .unwrap_or_else(|| format!("blocked by {}", hook.name()));
881 let mut result = ToolResult::error(reason);
882 if let Some(details) = decision.details {
883 result.details = details;
884 }
885 return PreparedCall::Immediate(ExecutedOutcome {
886 result,
887 is_error: true,
888 });
889 }
890 }
891
892 PreparedCall::Prepared {
893 tool,
894 args: prepared_args,
895 }
896}
897
898async fn execute_prepared(
899 tool: &Arc<dyn AgentTool>,
900 call: &ToolCall,
901 args: Value,
902 signal: CancellationToken,
903 on_update: Box<dyn Fn(ToolResult) + Send + Sync + 'static>,
904) -> Result<ExecutedOutcome, LoopError> {
905 let (tx, mut rx) = mpsc::unbounded_channel::<ToolResult>();
906
907 let mut drain_handle = tokio::spawn(async move {
909 while let Some(partial) = rx.recv().await {
910 on_update(partial);
911 }
912 });
913
914 let result = match tool.execute(&call.id, args, signal, tx).await {
915 Ok(result) => {
916 let is_error = result.is_error;
917 Ok(ExecutedOutcome { result, is_error })
918 }
919 Err(ToolError::Execution(reason)) => Ok(ExecutedOutcome {
920 result: ToolResult::error(ToolError::Execution(reason).to_string()),
921 is_error: true,
922 }),
923 Err(ToolError::Aborted) => Err(LoopError::Aborted),
924 Err(ToolError::Fatal(reason)) => Err(LoopError::ToolFatal {
925 tool: call.name.clone(),
926 reason,
927 }),
928 };
929
930 match timeout(TOOL_UPDATE_DRAIN_GRACE, &mut drain_handle).await {
931 Ok(joined) => {
932 if let Err(error) = joined {
933 tracing::debug!(?error, "tool update dispatcher join failed");
934 }
935 }
936 Err(_) => {
937 drain_handle.abort();
938 if let Err(error) = drain_handle.await {
939 tracing::debug!(?error, "aborted tool update dispatcher");
940 }
941 }
942 }
943 result
944}
945
946#[allow(clippy::too_many_arguments)]
947async fn finalize(
948 assistant: &AgentMessage,
949 _assistant_content: &AssistantContent,
950 call: &ToolCall,
951 args: &Value,
952 mut executed: ExecutedOutcome,
953 messages: &[AgentMessage],
954 after_hooks: &[Arc<dyn crate::plugin::AfterToolCall>],
955) -> FinalizedOutcome {
956 for hook in after_hooks {
957 let ctx = AfterToolCallContext {
958 assistant_message: assistant,
959 tool_call: call,
960 args,
961 result: &executed.result,
962 is_error: executed.is_error,
963 messages,
964 };
965 let decision = hook.on_after_tool_call(ctx).await;
966 if let Some(new_result) = decision.result {
967 executed.is_error = new_result.is_error;
968 executed.result = new_result;
969 }
970 if let Some(mark_error) = decision.mark_error {
971 executed.is_error = mark_error;
972 executed.result.is_error = mark_error;
973 }
974 if let Some(terminate) = decision.terminate {
975 executed.result.terminate = terminate;
976 }
977 }
978
979 FinalizedOutcome {
980 result: executed.result,
981 is_error: executed.is_error,
982 terminate: false,
983 }
984 .with_vote()
987}
988
989impl FinalizedOutcome {
990 fn with_vote(mut self) -> Self {
991 self.terminate = self.result.terminate;
992 self
993 }
994}
995
996fn outcome_to_message(call: &ToolCall, outcome: FinalizedOutcome) -> AgentMessage {
997 let details = match outcome.result.details {
998 serde_json::Value::Null => None,
999 other => Some(other),
1000 };
1001 let message = AgentMessage::ToolResult {
1002 tool_call_id: call.id.clone(),
1003 tool_name: call.name.clone(),
1004 content: ToolResultContent {
1005 blocks: outcome.result.content,
1006 },
1007 is_error: outcome.is_error,
1008 narration: outcome.result.narration,
1014 details,
1019 timestamp: Some(now_ms()),
1020 };
1021 if let AgentMessage::ToolResult {
1029 content,
1030 is_error,
1031 tool_call_id,
1032 tool_name,
1033 ..
1034 } = &message
1035 {
1036 let plain = content.plain_text();
1037 let (head, tail) = head_tail_for_log(&plain);
1038 tracing::debug!(
1039 target: "clark_agent::exec::tool_result_built",
1040 tool_call_id = %tool_call_id,
1041 tool_name = %tool_name,
1042 is_error = *is_error,
1043 content_len = plain.len(),
1044 content_head = %head,
1045 content_tail = %tail,
1046 "outcome_to_message wrote ToolResult into messages"
1047 );
1048 }
1049 message
1050}
1051
1052const TOOL_RESULT_LOG_HEAD: usize = 200;
1053const TOOL_RESULT_LOG_TAIL: usize = 200;
1054
1055fn head_tail_for_log(text: &str) -> (String, String) {
1060 if text.len() <= TOOL_RESULT_LOG_HEAD + TOOL_RESULT_LOG_TAIL {
1061 return (text.to_string(), String::new());
1062 }
1063 let head_end = char_boundary_at_or_before(text, TOOL_RESULT_LOG_HEAD);
1064 let tail_start = char_boundary_at_or_after(text, text.len() - TOOL_RESULT_LOG_TAIL);
1065 (text[..head_end].to_string(), text[tail_start..].to_string())
1066}
1067
/// Returns the largest UTF-8 character boundary of `text` that is `<= idx`.
/// An `idx` at or past the end yields `text.len()`.
fn char_boundary_at_or_before(text: &str, idx: usize) -> usize {
    let len = text.len();
    if idx >= len {
        return len;
    }
    // Offset 0 is always a boundary, so the search cannot come up empty.
    (0..=idx)
        .rev()
        .find(|&pos| text.is_char_boundary(pos))
        .unwrap_or(0)
}
1077
/// Returns the smallest UTF-8 character boundary of `text` that is `>= idx`.
/// An `idx` at or past the end yields `text.len()`.
fn char_boundary_at_or_after(text: &str, idx: usize) -> usize {
    let len = text.len();
    // An empty range (idx >= len) falls through to `len`, as does a scan
    // that finds no boundary before the end.
    (idx..len)
        .find(|&pos| text.is_char_boundary(pos))
        .unwrap_or(len)
}
1087
/// Returns the current wall-clock time as milliseconds since the Unix epoch.
///
/// Yields 0 when the system clock reads earlier than the epoch. The `u128`
/// millisecond count is converted with saturation rather than a truncating
/// `as` cast, so an out-of-range value becomes `u64::MAX` instead of
/// wrapping to an arbitrary number.
fn now_ms() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX))
        .unwrap_or(0)
}
1094
1095async fn emit_tool_start(config: &LoopConfig, call: &ToolCall) {
1096 let event = AgentEvent::ToolExecutionStart {
1097 tool_call_id: call.id.clone(),
1098 tool_name: call.name.clone(),
1099 args: call.arguments.clone(),
1100 };
1101 config.event_sink.emit(event.clone()).await;
1102 for o in &config.plugins.event_observer {
1103 o.on_event(&event).await;
1104 }
1105}
1106
/// Builds the model-facing error text for tool-call arguments that were not
/// valid JSON.
///
/// The raw argument text is echoed back so the model can see what it sent.
/// At most `RAW_MAX` bytes are echoed, followed by a note with the number of
/// bytes cut off. The cut is moved back to the nearest UTF-8 character
/// boundary so that slicing cannot panic on multi-byte text, and the
/// reported byte count reflects the actual cut position.
fn format_arg_parse_error(tool_name: &str, parse_err: &str, raw: &str) -> String {
    const RAW_MAX: usize = 1024;
    let raw_snippet = if raw.len() > RAW_MAX {
        // Offset 0 is always a boundary, so the search always succeeds.
        let cut = (0..=RAW_MAX)
            .rev()
            .find(|&pos| raw.is_char_boundary(pos))
            .unwrap_or(0);
        format!("{}…<{} bytes truncated>", &raw[..cut], raw.len() - cut)
    } else {
        raw.to_string()
    };
    format!(
        "Tool `{tool_name}` arguments were not valid JSON: {parse_err}. \
         You sent (raw): {raw_snippet}. \
         Re-emit the call with a JSON object matching the tool's schema; \
         this is a syntax error in your tool-call arguments, not a problem \
         with the file or the runtime."
    )
}
1131
1132async fn emit_tool_end(config: &LoopConfig, call: &ToolCall, outcome: &FinalizedOutcome) {
1133 let event = AgentEvent::ToolExecutionEnd {
1134 tool_call_id: call.id.clone(),
1135 tool_name: call.name.clone(),
1136 result: outcome.result.clone(),
1137 is_error: outcome.is_error,
1138 };
1139 config.event_sink.emit(event.clone()).await;
1140 for o in &config.plugins.event_observer {
1141 o.on_event(&event).await;
1142 }
1143}
1144
1145#[cfg(test)]
1146mod tests {
1147 use super::*;
1148 use crate::ToolResultBlock;
1149 use std::sync::Arc;
1150
    /// Test double whose answers to the registry's policy questions are
    /// fixed by its fields.
    struct LimitTool {
        /// Value returned by `name()`.
        name: &'static str,
        /// Value returned by `counts_toward_tool_call_limit()`.
        counts: bool,
        /// Value returned by `counts_toward_termination_vote()`.
        vote_counts: bool,
        /// Value returned by `parallel_safe_per_turn()`.
        parallel_safe: bool,
    }
1157
    /// Each policy method simply reports the matching `LimitTool` field;
    /// `execute` must never be reached by these tests.
    #[async_trait::async_trait]
    impl AgentTool for LimitTool {
        fn name(&self) -> &str {
            self.name
        }

        fn description(&self) -> &str {
            "test tool"
        }

        fn parameters_schema(&self) -> Value {
            json!({"type": "object"})
        }

        fn counts_toward_tool_call_limit(&self) -> bool {
            self.counts
        }

        fn parallel_safe_per_turn(&self) -> bool {
            self.parallel_safe
        }

        fn counts_toward_termination_vote(&self) -> bool {
            self.vote_counts
        }

        async fn execute(
            &self,
            _call_id: &str,
            _args: Value,
            _signal: CancellationToken,
            _update: mpsc::UnboundedSender<ToolResult>,
        ) -> Result<ToolResult, ToolError> {
            // The tests only exercise splitting and vote counting.
            unreachable!("split tests do not execute tools")
        }
    }
1194
1195 fn registry() -> crate::tool::ToolRegistry {
1196 crate::tool::ToolRegistry::new()
1200 .with(Arc::new(LimitTool {
1201 name: "message_info",
1202 counts: false,
1203 vote_counts: false,
1204 parallel_safe: false,
1205 }))
1206 .with(Arc::new(LimitTool {
1207 name: "browser_navigate",
1208 counts: true,
1209 vote_counts: true,
1210 parallel_safe: true,
1211 }))
1212 .with(Arc::new(LimitTool {
1213 name: "browser_capture",
1214 counts: true,
1215 vote_counts: true,
1216 parallel_safe: true,
1217 }))
1218 .with(Arc::new(LimitTool {
1219 name: "browser_inspect",
1220 counts: true,
1221 vote_counts: true,
1222 parallel_safe: true,
1223 }))
1224 .with(Arc::new(LimitTool {
1225 name: "shell",
1226 counts: true,
1227 vote_counts: true,
1228 parallel_safe: false,
1229 }))
1230 .with(Arc::new(LimitTool {
1231 name: "message_result",
1232 counts: true,
1233 vote_counts: true,
1234 parallel_safe: false,
1235 }))
1236 .with(Arc::new(LimitTool {
1237 name: "message_ask",
1238 counts: true,
1239 vote_counts: true,
1240 parallel_safe: false,
1241 }))
1242 .with(Arc::new(LimitTool {
1243 name: "web_search",
1244 counts: true,
1245 vote_counts: true,
1246 parallel_safe: true,
1247 }))
1248 .with(Arc::new(LimitTool {
1249 name: "file_read",
1250 counts: true,
1251 vote_counts: true,
1252 parallel_safe: true,
1253 }))
1254 }
1255
1256 fn call(name: &str) -> ToolCall {
1257 ToolCall {
1258 id: format!("tc-{name}"),
1259 name: name.to_string(),
1260 arguments: Value::Null,
1261 }
1262 }
1263
1264 fn names(calls: &[ToolCall]) -> Vec<&str> {
1265 calls.iter().map(|call| call.name.as_str()).collect()
1266 }
1267
1268 #[test]
1269 fn progress_only_tools_do_not_starve_first_work_tool() {
1270 let registry = registry();
1271 let (executable, unexecuted, max) = split_tool_calls_for_execution(
1272 vec![call("message_info"), call("browser_navigate")],
1273 ®istry,
1274 Some(1),
1275 );
1276
1277 assert_eq!(max, Some(1));
1278 assert_eq!(names(&executable), vec!["message_info", "browser_navigate"]);
1279 assert!(unexecuted.is_empty());
1280 }
1281
1282 #[test]
1283 fn extra_limit_counted_tools_still_get_synthetic_errors() {
1284 let registry = registry();
1285 let (executable, unexecuted, max) = split_tool_calls_for_execution(
1286 vec![call("message_info"), call("shell"), call("message_result")],
1287 ®istry,
1288 Some(1),
1289 );
1290
1291 assert_eq!(max, Some(1));
1292 assert_eq!(names(&executable), vec!["message_info", "shell"]);
1293 assert_eq!(names(&unexecuted), vec!["message_result"]);
1294 }
1295
1296 #[test]
1297 fn parallel_safe_reads_do_not_burn_the_per_turn_cap() {
1298 let registry = registry();
1303 let (executable, unexecuted, max) = split_tool_calls_for_execution(
1304 vec![
1305 call("web_search"),
1306 call("web_search"),
1307 call("browser_navigate"),
1308 ],
1309 ®istry,
1310 Some(1),
1311 );
1312
1313 assert_eq!(max, Some(1));
1314 assert_eq!(
1315 names(&executable),
1316 vec!["web_search", "web_search", "browser_navigate"]
1317 );
1318 assert!(
1319 unexecuted.is_empty(),
1320 "unexecuted: {:?}",
1321 names(&unexecuted)
1322 );
1323 }
1324
1325 #[test]
1326 fn parallel_safe_reads_do_not_compete_with_a_write_for_the_cap() {
1327 let registry = registry();
1330 let (executable, unexecuted, max) = split_tool_calls_for_execution(
1331 vec![
1332 call("file_read"),
1333 call("file_read"),
1334 call("shell"),
1335 call("shell"),
1336 ],
1337 ®istry,
1338 Some(1),
1339 );
1340
1341 assert_eq!(max, Some(1));
1342 assert_eq!(names(&executable), vec!["file_read", "file_read", "shell"]);
1343 assert_eq!(names(&unexecuted), vec!["shell"]);
1344 }
1345
/// Four browser calls (`browser_navigate` twice, `browser_capture`, `browser_inspect`)
/// followed by one `shell` call must all be executable under a cap of 1.
#[test]
fn browser_tools_do_not_burn_the_per_turn_cap() {
    let registry = registry();
    let (executable, unexecuted, max) = split_tool_calls_for_execution(
        vec![
            call("browser_navigate"),
            call("browser_navigate"),
            call("browser_capture"),
            call("browser_inspect"),
            call("shell"),
        ],
        &registry,
        Some(1),
    );

    assert_eq!(max, Some(1));
    // Order of the executable calls must match the order they were submitted in.
    assert_eq!(
        names(&executable),
        vec![
            "browser_navigate",
            "browser_navigate",
            "browser_capture",
            "browser_inspect",
            "shell",
        ]
    );
    assert!(
        unexecuted.is_empty(),
        "unexecuted: {:?}",
        names(&unexecuted)
    );
}
1384
/// Calls with an unregistered or empty name must not prevent a following `shell` call
/// from executing under a cap of 1, while two real `shell` calls are still capped.
#[test]
fn malformed_calls_do_not_burn_the_cap_or_preempt_real_work() {
    let registry = registry();

    // Scenario 1: a name that is not in the registry precedes real work.
    let (executable, unexecuted, _) = split_tool_calls_for_execution(
        vec![call("missing"), call("shell")],
        &registry,
        Some(1),
    );
    assert_eq!(names(&executable), vec!["missing", "shell"]);
    assert!(
        unexecuted.is_empty(),
        "real work must not be preempted by an unknown name: {:?}",
        names(&unexecuted)
    );

    // Scenario 2: an empty tool name precedes real work.
    let (executable, unexecuted, _) =
        split_tool_calls_for_execution(vec![call(""), call("shell")], &registry, Some(1));
    assert_eq!(names(&executable), vec!["", "shell"]);
    assert!(
        unexecuted.is_empty(),
        "empty-name glitch must not preempt real work: {:?}",
        names(&unexecuted)
    );

    // Control: two `shell` calls with cap 1 — only the first one is executable.
    let (executable, unexecuted, _) =
        split_tool_calls_for_execution(vec![call("shell"), call("shell")], &registry, Some(1));
    assert_eq!(names(&executable), vec!["shell"]);
    assert_eq!(names(&unexecuted), vec!["shell"]);
}
1425
/// A terminating `message_result` vote combined with a non-terminating `message_info`
/// vote must still produce `true`.
#[test]
fn compute_batch_terminate_passes_when_only_advisory_siblings_dont_vote() {
    let registry = registry();
    let votes = [("message_result", true), ("message_info", false)];
    // `(&str, bool)` is `Copy`, so `copied()` yields the tuples by value.
    assert!(compute_batch_terminate(
        &registry,
        votes.iter().copied()
    ));
}
1442
/// A terminating `message_result` vote combined with a non-terminating `shell` vote must
/// produce `false`.
#[test]
fn compute_batch_terminate_fails_when_any_counted_tool_did_not_vote_terminate() {
    let registry = registry();
    let votes = [("message_result", true), ("shell", false)];
    // `(&str, bool)` is `Copy`, so `copied()` yields the tuples by value.
    assert!(!compute_batch_terminate(
        &registry,
        votes.iter().copied()
    ));
}
1454
/// A batch made up solely of non-terminating `message_info` votes must produce `false`.
#[test]
fn compute_batch_terminate_returns_false_for_all_advisory_batches() {
    let registry = registry();
    let votes = [("message_info", false), ("message_info", false)];
    // `(&str, bool)` is `Copy`, so `copied()` yields the tuples by value.
    assert!(!compute_batch_terminate(
        &registry,
        votes.iter().copied()
    ));
}
1467
/// An empty vote list must produce `false`.
#[test]
fn compute_batch_terminate_returns_false_for_empty_batch() {
    let registry = registry();
    // Explicit element type: an empty `Vec` gives inference nothing to go on.
    let votes: Vec<(&str, bool)> = Vec::new();
    assert!(!compute_batch_terminate(&registry, votes.into_iter()));
}
1474
/// A vote from `ghost_tool` decides the outcome next to a terminating `message_result`:
/// `false` from it blocks termination, `true` allows it.
///
/// NOTE(review): `ghost_tool` is presumably absent from the test registry — confirm
/// against the `registry()` helper.
#[test]
fn compute_batch_terminate_treats_unknown_tools_as_counted() {
    let registry = registry();

    // Non-terminating vote from the ghost tool: the batch must not terminate.
    let votes = [("message_result", true), ("ghost_tool", false)];
    assert!(!compute_batch_terminate(
        &registry,
        votes.iter().copied()
    ));

    // Terminating vote from the ghost tool: the batch terminates.
    let votes = [("message_result", true), ("ghost_tool", true)];
    assert!(compute_batch_terminate(
        &registry,
        votes.iter().copied()
    ));
}
1498
/// A terminating `message_ask` vote combined with a non-terminating `message_info` vote
/// must produce `true`.
#[test]
fn compute_batch_terminate_passes_when_message_ask_is_only_counted_terminator() {
    let registry = registry();
    let votes = [("message_ask", true), ("message_info", false)];
    // `(&str, bool)` is `Copy`, so `copied()` yields the tuples by value.
    assert!(compute_batch_terminate(
        &registry,
        votes.iter().copied()
    ));
}
1510
/// For the short input `"hello"`, the head is the whole input and the tail is empty.
#[test]
fn head_tail_for_log_returns_full_text_when_short() {
    let short_text = "hello";
    let (head, tail) = head_tail_for_log(short_text);

    assert_eq!(head, short_text);
    assert_eq!(tail, "");
}
1521
/// For an ASCII input longer than `TOOL_RESULT_LOG_HEAD + TOOL_RESULT_LOG_TAIL`, the head
/// and tail have exactly those lengths and are a real prefix and suffix of the input.
#[test]
fn head_tail_for_log_truncates_long_text_with_head_and_tail() {
    let long_text = "abc".repeat(500);
    let combined_budget = TOOL_RESULT_LOG_HEAD + TOOL_RESULT_LOG_TAIL;
    // Precondition: the input must actually exceed the combined budget.
    assert!(long_text.len() > combined_budget);

    let (head, tail) = head_tail_for_log(&long_text);

    assert_eq!(head.len(), TOOL_RESULT_LOG_HEAD);
    assert_eq!(tail.len(), TOOL_RESULT_LOG_TAIL);
    assert!(long_text.starts_with(&head));
    assert!(long_text.ends_with(&tail));
}
1535
/// Feeds an input with a run of multi-byte characters between ASCII padding and checks
/// that head and tail are still a valid prefix and suffix within the size limits.
#[test]
fn head_tail_for_log_respects_utf8_char_boundaries() {
    // 150 ASCII bytes, then non-ASCII text, then 150 more ASCII bytes.
    let payload = [
        "x".repeat(150),
        "πλάκα".repeat(50),
        "y".repeat(150),
    ]
    .concat();

    let (head, tail) = head_tail_for_log(&payload);

    assert!(payload.starts_with(&head));
    assert!(payload.ends_with(&tail));
    assert!(head.len() <= TOOL_RESULT_LOG_HEAD);
    // The tail is allowed one byte of slack over the nominal limit.
    assert!(tail.len() <= TOOL_RESULT_LOG_TAIL + 1);
}
1557
/// The result of `unexecuted_tool_call_result(3, 2, 1)` must carry a text block that
/// mentions both counts, and a `limit_counted_tool_calls` detail equal to 2.
#[test]
fn unexecuted_message_mentions_progress_only_calls_when_present() {
    let result = unexecuted_tool_call_result(3, 2, 1);

    // The first content block has to be text; anything else fails the test.
    let text = if let Some(ToolResultBlock::Text(block)) = result.content.first() {
        block.text.as_str()
    } else {
        panic!("expected text result")
    };

    assert!(text.contains("2 limit-counted tool calls"));
    assert!(text.contains("3 tool calls total, including progress-only calls"));

    let counted = result
        .details
        .get("limit_counted_tool_calls")
        .and_then(Value::as_u64);
    assert_eq!(counted, Some(2));
}
1576}