Skip to main content

clark_agent/
exec.rs

1//! Tool batch execution.
2//!
3//! Canonical prepare / execute / finalize chain for model-emitted tool
4//! calls.
5//!
6//! Two modes:
7//!
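//! - **Parallel** (default): all tools in the batch prep sequentially
//!   (short-circuited calls are finalized right away), then the prepared
//!   calls execute and finalize concurrently; end events and result
//!   messages are emitted in source order.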
10//! - **Sequential**: each tool is prepped, executed, and finalized
11//!   before the next starts. Triggered by either:
12//!     - any tool in the batch setting `requires_exclusive_sandbox = true`, or
13//!     - `LoopConfig.default_execution_mode = Sequential` (loop-wide pin).
14//!
15//! Hook plumbing:
16//! - `BeforeToolCall::on_before_tool_call` runs after argument validation,
17//!   before `tool.execute`. May `block` to short-circuit with an error
18//!   tool result.
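//! - `AfterToolCall::on_after_tool_call` runs after `tool.execute`, and also
//!   for calls that never execute (short-circuited before execution, or held
//!   back by the per-turn cap), so hooks see every tool result. May
//!   `override_result`, `mark_error`, or vote `terminate`.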
21
22use std::sync::Arc;
23use std::time::{Duration, SystemTime, UNIX_EPOCH};
24
25use serde_json::{json, Value};
26use tokio::sync::mpsc;
27use tokio::time::timeout;
28use tokio_util::sync::CancellationToken;
29
30use crate::config::LoopConfig;
31use crate::error::{LoopError, ToolError};
32use crate::event::{AgentEvent, EventSink};
33use crate::plugin::{AfterToolCallContext, BeforeToolCallContext, EventObserver};
34use crate::tool::{detect_arg_parse_error, AgentTool, ExecutionMode, ToolCall, ToolResult};
35use crate::types::{AgentContext, AgentMessage, AssistantContent, ToolResultContent};
36
/// Grace period for draining tool-update events.
/// NOTE(review): not referenced in the visible part of this file — confirm
/// where (and whether) it is used before relying on this description.
const TOOL_UPDATE_DRAIN_GRACE: Duration = Duration::from_millis(50);
/// Capacity of the bounded channel created by `spawn_tool_update_dispatcher`.
/// When the queue is full, `enqueue_tool_update_event` drops the partial
/// update with a warning instead of blocking the tool.
const TOOL_UPDATE_EVENT_QUEUE_CAPACITY: usize = 256;
39
40fn spawn_tool_update_dispatcher(
41    event_sink: Arc<dyn EventSink>,
42    observers: Vec<Arc<dyn EventObserver>>,
43) -> mpsc::Sender<AgentEvent> {
44    let (tx, mut rx) = mpsc::channel::<AgentEvent>(TOOL_UPDATE_EVENT_QUEUE_CAPACITY);
45    tokio::spawn(async move {
46        while let Some(event) = rx.recv().await {
47            event_sink.emit(event.clone()).await;
48            for observer in observers.iter() {
49                observer.on_event(&event).await;
50            }
51        }
52    });
53    tx
54}
55
56fn enqueue_tool_update_event(tx: &mpsc::Sender<AgentEvent>, event: AgentEvent) {
57    match tx.try_send(event) {
58        Ok(()) => {}
59        Err(mpsc::error::TrySendError::Full(_)) => {
60            tracing::warn!("tool update event queue full; dropping partial update");
61        }
62        Err(mpsc::error::TrySendError::Closed(_)) => {}
63    }
64}
65
/// Result of executing one batch.
pub(crate) struct ExecutedBatch {
    /// Tool result messages, ready to push to history.
    ///
    /// Calls that ran come first, in source order. Any calls held back by
    /// `max_tool_calls_per_turn` follow, also in source order. Each held-back
    /// call carries an error result built by `unexecuted_tool_call_result`,
    /// which still passes through the `AfterToolCall` hooks.
    pub messages: Vec<AgentMessage>,
    /// Batch-level terminate signal, computed by `compute_batch_terminate`.
    ///
    /// It is true only when at least one call that counts toward the
    /// termination vote is present and every such call finalized with
    /// `terminate = true`. Advisory tools
    /// (`counts_toward_termination_vote() == false`) are ignored. It is
    /// always false for an empty batch and whenever any call was held back
    /// by the per-turn cap.
    pub terminate: bool,
}
74
75pub(crate) async fn execute_tool_batch(
76    assistant: &AgentMessage,
77    tool_calls: Vec<ToolCall>,
78    context: &AgentContext,
79    config: &LoopConfig,
80    signal: &CancellationToken,
81    turn_allowlist: Option<&std::collections::HashSet<String>>,
82) -> Result<ExecutedBatch, LoopError> {
83    if tool_calls.is_empty() {
84        return Ok(ExecutedBatch {
85            messages: Vec::new(),
86            terminate: false,
87        });
88    }
89
90    // Let the active protocol policy normalize the batch before registry
91    // lookup — e.g. fold a product's known alias names into a canonical
92    // tool. The default policy is a no-op; the core performs no alias
93    // repair of its own, so no product tool vocabulary lives here.
94    let mut tool_calls = tool_calls;
95    config
96        .protocol
97        .normalize_tool_calls(&mut tool_calls, &config.tools);
98
99    let total_tool_calls = tool_calls.len();
100    let limit_counted_tool_calls = count_limit_counted_tool_calls(&tool_calls, &config.tools);
101    let (tool_calls, unexecuted_tool_calls, max_executed) =
102        split_tool_calls_for_execution(tool_calls, &config.tools, config.max_tool_calls_per_turn);
103
104    let assistant_content = match assistant {
105        AgentMessage::Assistant { content, .. } => content.clone(),
106        _ => AssistantContent { blocks: Vec::new() },
107    };
108
109    if tool_calls.is_empty() {
110        let messages = synthesize_unexecuted_tool_results(
111            assistant,
112            &assistant_content,
113            unexecuted_tool_calls,
114            total_tool_calls,
115            limit_counted_tool_calls,
116            max_executed.unwrap_or(0),
117            context,
118            config,
119        )
120        .await;
121        return Ok(ExecutedBatch {
122            messages,
123            terminate: false,
124        });
125    }
126
127    // A batch downgrades to Sequential when either (a) the loop is
128    // pinned to Sequential mode, or (b) any participating tool needs
129    // exclusive sandbox access.
130    let any_exclusive = tool_calls.iter().any(|call| {
131        config
132            .tools
133            .get(&call.name)
134            .map(|t| t.requires_exclusive_sandbox())
135            .unwrap_or(false)
136    });
137
138    let effective_mode =
139        if any_exclusive || config.default_execution_mode == ExecutionMode::Sequential {
140            ExecutionMode::Sequential
141        } else {
142            ExecutionMode::Parallel
143        };
144
145    let mut batch = match effective_mode {
146        ExecutionMode::Sequential => {
147            execute_sequential(
148                assistant,
149                &assistant_content,
150                tool_calls,
151                context,
152                config,
153                signal,
154                turn_allowlist,
155            )
156            .await
157        }
158        ExecutionMode::Parallel => {
159            execute_parallel(
160                assistant,
161                &assistant_content,
162                tool_calls,
163                context,
164                config,
165                signal,
166                turn_allowlist,
167            )
168            .await
169        }
170    }?;
171
172    if !unexecuted_tool_calls.is_empty() {
173        batch.messages.extend(
174            synthesize_unexecuted_tool_results(
175                assistant,
176                &assistant_content,
177                unexecuted_tool_calls,
178                total_tool_calls,
179                limit_counted_tool_calls,
180                max_executed.unwrap_or(0),
181                context,
182                config,
183            )
184            .await,
185        );
186        batch.terminate = false;
187    }
188
189    Ok(batch)
190}
191
192fn split_tool_calls_for_execution(
193    tool_calls: Vec<ToolCall>,
194    tools: &crate::tool::ToolRegistry,
195    max_tool_calls: Option<usize>,
196) -> (Vec<ToolCall>, Vec<ToolCall>, Option<usize>) {
197    let Some(max_tool_calls) = max_tool_calls else {
198        return (tool_calls, Vec::new(), None);
199    };
200    let max_tool_calls = max_tool_calls.max(1);
201    if count_limit_counted_tool_calls(&tool_calls, tools) <= max_tool_calls {
202        return (tool_calls, Vec::new(), Some(max_tool_calls));
203    }
204
205    let mut executable = Vec::with_capacity(tool_calls.len());
206    let mut unexecuted = Vec::new();
207    let mut executed_counted = 0usize;
208    for call in tool_calls {
209        if !tool_counts_toward_call_limit(tools, &call.name) {
210            // Progress-only tools, parallel-safe reads, and malformed calls
211            // that resolve to no registered tool never burn the per-turn cap;
212            // let them all through (see `tool_counts_toward_call_limit`). A
213            // malformed call still lands as a synthetic "no such tool" error
214            // in `prepare_call`; it just cannot preempt a real call's slot.
215            executable.push(call);
216        } else if executed_counted < max_tool_calls {
217            executed_counted += 1;
218            executable.push(call);
219        } else {
220            unexecuted.push(call);
221        }
222    }
223    (executable, unexecuted, Some(max_tool_calls))
224}
225
226fn count_limit_counted_tool_calls(
227    tool_calls: &[ToolCall],
228    tools: &crate::tool::ToolRegistry,
229) -> usize {
230    tool_calls
231        .iter()
232        .filter(|call| tool_counts_toward_call_limit(tools, &call.name))
233        .count()
234}
235
236/// Whether a tool consumes a slot from the per-turn cap.
237///
238/// A call is exempt from the cap when ANY of these hold:
239/// - it opts out of the cap (progress-only signals that report status
240///   without doing work — see [`AgentTool::counts_toward_tool_call_limit`]);
241/// - it is marked parallel-safe (idempotent reads like `web_search`,
242///   `file_read`, `grep`, `glob`);
243/// - it does not resolve to a registered tool at all — an empty/blank name,
244///   or a name no tool exists for.
245///
246/// The last case is the load-bearing one. A call that resolves to no tool
247/// does NO real work: `prepare_call` short-circuits it to a synchronous
248/// "no such tool" error result without ever invoking a tool. Counting it
249/// would let a glitch — e.g. a streamed tool call that arrived with an empty
250/// `name` — spend the turn's only slot and bump a *real* call into the
251/// unexecuted bin. That was observed in production: under
252/// `max_tool_calls_per_turn = 1`, an empty-name call preempted the model's
253/// real next call. A malformed call must never preempt real work; it still
254/// surfaces its error so the model can react next turn.
255///
256/// Note this is the opposite of the *termination*-vote rule: there, an
257/// unresolved name DOES count (see `tool_counts_toward_termination_vote`),
258/// so a stray call cannot accidentally end the run. The asymmetry is
259/// intentional — a malformed call does no work (so it must not spend the
260/// work budget) and is not a satisfied terminator (so it must not vote to
261/// stop).
262fn tool_counts_toward_call_limit(tools: &crate::tool::ToolRegistry, name: &str) -> bool {
263    tools
264        .get(name)
265        .map(|tool| tool.counts_toward_tool_call_limit() && !tool.parallel_safe_per_turn())
266        .unwrap_or(false)
267}
268
269/// Whether a tool's terminate vote is counted in the unanimous-vote
270/// tally. Unknown / unregistered names default to `true` so a stray
271/// tool call cannot accidentally end the run by being treated as
272/// advisory. See `AgentTool::counts_toward_termination_vote`.
273fn tool_counts_toward_termination_vote(tools: &crate::tool::ToolRegistry, name: &str) -> bool {
274    tools
275        .get(name)
276        .map(|tool| tool.counts_toward_termination_vote())
277        .unwrap_or(true)
278}
279
280/// Compute the batch-level terminate signal, ignoring tools that opt
281/// out via `counts_toward_termination_vote() == false`.
282///
283/// The batch terminates iff:
284/// - at least one *counted* tool is present, AND
285/// - every counted tool voted `terminate: true`.
286///
287/// An all-advisory batch (e.g. only progress-note calls) returns
288/// `false` because no counted tool voted yes — progress notes never
289/// end the run on their own.
290///
291/// When the batch terminates AND advisory siblings were skipped from
292/// the tally, emits a structured `tracing::info` line so operators can
293/// measure how often this fallback actually fires in production — a
294/// non-zero rate names which model still needs the safety net.
295fn compute_batch_terminate<'a, I>(tools: &crate::tool::ToolRegistry, votes: I) -> bool
296where
297    I: IntoIterator<Item = (&'a str, bool)>,
298{
299    let mut counted_total = 0usize;
300    let mut counted_terminate = 0usize;
301    let mut terminating: Vec<&'a str> = Vec::new();
302    let mut advisory_skipped: Vec<&'a str> = Vec::new();
303    for (name, terminate) in votes {
304        if !tool_counts_toward_termination_vote(tools, name) {
305            advisory_skipped.push(name);
306            continue;
307        }
308        counted_total += 1;
309        if terminate {
310            counted_terminate += 1;
311            terminating.push(name);
312        }
313    }
314    let terminated = counted_total > 0 && counted_terminate == counted_total;
315    if terminated && !advisory_skipped.is_empty() {
316        tracing::info!(
317            terminating_tools = ?terminating,
318            advisory_tools = ?advisory_skipped,
319            counted_total,
320            "advisory siblings excluded from unanimous termination vote"
321        );
322    }
323    terminated
324}
325
326// The execution helpers share the same loop context tuple. Keeping the
327// signatures explicit is clearer than introducing a one-off bag of references.
328#[allow(clippy::too_many_arguments)]
329async fn synthesize_unexecuted_tool_results(
330    assistant: &AgentMessage,
331    assistant_content: &AssistantContent,
332    tool_calls: Vec<ToolCall>,
333    total_tool_calls: usize,
334    limit_counted_tool_calls: usize,
335    max_executed: usize,
336    context: &AgentContext,
337    config: &LoopConfig,
338) -> Vec<AgentMessage> {
339    let mut messages = Vec::with_capacity(tool_calls.len());
340    for call in tool_calls {
341        emit_tool_start(config, &call).await;
342        let outcome = finalize(
343            assistant,
344            assistant_content,
345            &call,
346            &call.arguments,
347            ExecutedOutcome {
348                result: unexecuted_tool_call_result(
349                    total_tool_calls,
350                    limit_counted_tool_calls,
351                    max_executed,
352                ),
353                is_error: true,
354            },
355            &context.messages,
356            &config.plugins.after_tool_call,
357        )
358        .await;
359        emit_tool_end(config, &call, &outcome).await;
360        messages.push(outcome_to_message(&call, outcome));
361    }
362    messages
363}
364
/// Build the model-facing explanation for a call held back by
/// `max_tool_calls_per_turn`.
///
/// * `total_tool_calls` — every call the assistant emitted this turn.
/// * `limit_counted_tool_calls` — how many of those consume a cap slot.
/// * `max_executed` — the effective cap, i.e. how many counted calls ran.
///
/// When some calls were exempt from the cap, the message reports both
/// counts. Exempt calls are not only progress-only tools. Parallel-safe
/// reads and names that resolve to no registered tool are exempt too (see
/// `tool_counts_toward_call_limit`), so the message describes them
/// generically instead of calling them "progress-only".
fn unexecuted_tool_call_message(
    total_tool_calls: usize,
    limit_counted_tool_calls: usize,
    max_executed: usize,
) -> String {
    fn plural(n: usize, one: &'static str, many: &'static str) -> &'static str {
        if n == 1 {
            one
        } else {
            many
        }
    }

    let call_word = plural(total_tool_calls, "tool call", "tool calls");
    let limited_call_word = plural(
        limit_counted_tool_calls,
        "limit-counted tool call",
        "limit-counted tool calls",
    );
    let executed_word = plural(max_executed, "call", "calls");

    if limit_counted_tool_calls != total_tool_calls {
        return format!(
            "This tool call was not executed because the assistant turn emitted \
             {limit_counted_tool_calls} {limited_call_word} ({total_tool_calls} \
             {call_word} total, including calls exempt from the limit), but only \
             the first {max_executed} limit-counted {executed_word} can run in one \
             turn. The earlier allowed calls already ran. Reissue this call in a \
             later turn, one tool call at a time."
        );
    }
    format!(
        "This tool call was not executed because the assistant turn emitted \
         {total_tool_calls} {call_word}, but only the first {max_executed} \
         {executed_word} can run in one turn. The earlier {max_executed} \
         {executed_word} already ran. Reissue this call in a later turn, \
         one tool call at a time."
    )
}
399
400fn unexecuted_tool_call_result(
401    total_tool_calls: usize,
402    limit_counted_tool_calls: usize,
403    max_executed: usize,
404) -> ToolResult {
405    let mut result = ToolResult::error(unexecuted_tool_call_message(
406        total_tool_calls,
407        limit_counted_tool_calls,
408        max_executed,
409    ));
410    result.details = json!({
411        "kind": "tool_call_not_executed",
412        "reason": "max_tool_calls_per_turn",
413        "total_tool_calls": total_tool_calls,
414        "limit_counted_tool_calls": limit_counted_tool_calls,
415        "max_executed": max_executed,
416    });
417    result
418}
419
420#[allow(clippy::too_many_arguments)]
421async fn execute_sequential(
422    assistant: &AgentMessage,
423    assistant_content: &AssistantContent,
424    tool_calls: Vec<ToolCall>,
425    context: &AgentContext,
426    config: &LoopConfig,
427    signal: &CancellationToken,
428    turn_allowlist: Option<&std::collections::HashSet<String>>,
429) -> Result<ExecutedBatch, LoopError> {
430    let mut messages = Vec::with_capacity(tool_calls.len());
431    let mut votes: Vec<(String, bool)> = Vec::with_capacity(tool_calls.len());
432
433    for call in tool_calls {
434        let outcome = run_one(
435            assistant,
436            assistant_content,
437            &call,
438            context,
439            config,
440            signal,
441            turn_allowlist,
442        )
443        .await?;
444        votes.push((call.name.clone(), outcome.terminate));
445        messages.push(outcome_to_message(&call, outcome));
446    }
447
448    let terminate =
449        compute_batch_terminate(&config.tools, votes.iter().map(|(n, t)| (n.as_str(), *t)));
450
451    Ok(ExecutedBatch {
452        messages,
453        terminate,
454    })
455}
456
/// Parallel mode: prepare every call up front, then execute the prepared
/// calls concurrently, and emit end events and result messages in source
/// order.
///
/// Phases:
/// 1. For each call in turn, emit the start event and run `prepare_call`,
///    so prep and start-event ordering are deterministic.
/// 2. Finalize short-circuited (`PreparedCall::Immediate`) calls right away,
///    still inside this sequential loop, so `AfterToolCall` hooks see them.
/// 3. Turn each prepared call into a future that executes and then
///    finalizes. The futures are polled together through `FuturesUnordered`
///    on the current task; they are not spawned.
/// 4. Once every future has finished, sort the results back into source
///    order, emit the end events, and tally the terminate vote.
///
/// Cancellation: each execution gets a child of a per-batch token, which is
/// itself a child of the run-wide `signal`. When a drained result is an error
/// and its tool opts into `aborts_siblings_on_error`, the batch token is
/// cancelled. A sibling that then fails with `LoopError::Aborted` while the
/// run-wide signal is still live becomes an error tool result instead of
/// failing the batch.
///
/// # Errors
/// Any other `LoopError` from a future is returned immediately via `?`. The
/// remaining in-flight futures are dropped along with `unordered`, and no end
/// events are emitted for this batch.
#[allow(clippy::too_many_arguments)]
async fn execute_parallel(
    assistant: &AgentMessage,
    assistant_content: &AssistantContent,
    tool_calls: Vec<ToolCall>,
    context: &AgentContext,
    config: &LoopConfig,
    signal: &CancellationToken,
    turn_allowlist: Option<&std::collections::HashSet<String>>,
) -> Result<ExecutedBatch, LoopError> {
    use futures::stream::{FuturesUnordered, StreamExt};

    // Per-batch cancellation lever. As a child of `signal` it auto-
    // cancels when the run-wide signal cancels (so tools react to the
    // user's abort). It can also be cancelled independently on
    // sibling-error opt-in (`AgentTool::aborts_siblings_on_error`),
    // propagating only to siblings in *this* batch — neither sibling
    // failures nor sibling-triggered cancels affect the run-wide
    // signal.
    let batch_token = signal.child_token();

    // Prep + emit start sequentially so prep ordering and event ordering
    // are deterministic. Then await the executions concurrently.
    let mut prepared: Vec<(ToolCall, PreparedCall)> = Vec::with_capacity(tool_calls.len());
    for call in tool_calls {
        emit_tool_start(config, &call).await;
        let prep = prepare_call(
            assistant,
            assistant_content,
            &call,
            context,
            config,
            turn_allowlist,
        )
        .await;
        prepared.push((call, prep));
    }

    let mut futures = Vec::with_capacity(prepared.len());
    // Note: Immediate errors never trigger `aborts_siblings_on_error`. Only
    // results drained from `unordered` below are checked for it.
    let mut immediate: Vec<(usize, ToolCall, FinalizedOutcome)> = Vec::new();

    for (idx, (call, prep)) in prepared.into_iter().enumerate() {
        match prep {
            PreparedCall::Immediate(executed) => {
                // Route Immediate outcomes through finalize so
                // AfterToolCall hooks observe every tool result —
                // including arg-parse / validation / before-block
                // errors. The `args` we hand to hooks is the original
                // (potentially sentinel-bearing) call arguments since
                // we never built prepared args for short-circuited
                // calls.
                let finalized = finalize(
                    assistant,
                    assistant_content,
                    &call,
                    &call.arguments,
                    executed,
                    &context.messages,
                    &config.plugins.after_tool_call,
                )
                .await;
                immediate.push((idx, call, finalized));
            }
            PreparedCall::Prepared { tool, args } => {
                // Child of the batch token, so it is cancelled by either a
                // sibling abort or the run-wide signal.
                let tool_signal = batch_token.child_token();
                // Kept to tell a sibling abort apart from a run-wide abort.
                let run_signal = signal.clone();
                let batch_token_clone = batch_token.clone();
                let assistant_clone = assistant.clone();
                let assistant_content_clone = assistant_content.clone();
                let context_messages = context.messages.clone();
                let after_hooks = config.plugins.after_tool_call.clone();
                let event_sink = config.event_sink.clone();
                let event_observers = config.plugins.event_observer.clone();
                let call_clone = call.clone();
                let fut = async move {
                    let id = call_clone.id.clone();
                    let name = call_clone.name.clone();
                    let name_for_message = name.clone();
                    // One dispatcher per call; its sender lives in the
                    // update callback below.
                    let update_events = spawn_tool_update_dispatcher(event_sink, event_observers);
                    let executed_result = execute_prepared(
                        &tool,
                        &call_clone,
                        args.clone(),
                        tool_signal,
                        Box::new(move |update| {
                            let event = AgentEvent::ToolExecutionUpdate {
                                tool_call_id: id.clone(),
                                tool_name: name.clone(),
                                partial: update,
                            };
                            enqueue_tool_update_event(&update_events, event);
                        }),
                    )
                    .await;
                    let executed = match executed_result {
                        Ok(executed) => executed,
                        Err(LoopError::Aborted)
                            if batch_token_clone.is_cancelled() && !run_signal.is_cancelled() =>
                        {
                            // Sibling abort, not user abort. Convert
                            // to a recoverable tool result so the
                            // model sees what happened next turn and
                            // the unanimous-vote termination rule
                            // stays intact.
                            ExecutedOutcome {
                                result: ToolResult::error(format!(
                                    "aborted because a sibling tool in the \
                                     parallel batch errored — re-run this \
                                     {name_for_message} call after addressing the \
                                     sibling failure"
                                )),
                                is_error: true,
                            }
                        }
                        Err(other) => return Err(other),
                    };
                    let finalized = finalize(
                        &assistant_clone,
                        &assistant_content_clone,
                        &call_clone,
                        &args,
                        executed,
                        &context_messages,
                        &after_hooks,
                    )
                    .await;
                    Ok::<_, LoopError>((idx, call_clone, finalized))
                };
                futures.push(fut);
            }
        }
    }

    // Drain futures as they complete. When an opted-in tool returns
    // an error, cancel `batch_token` so still-running siblings exit
    // promptly (cooperatively — they must check the signal). The
    // futures already in flight that complete *before* the trigger
    // produce their natural result. Cancelled siblings produce a
    // typed `is_error: true` ToolResult via the match arm above.
    let mut unordered: FuturesUnordered<_> = futures.into_iter().collect();
    let mut completed: Vec<(usize, ToolCall, FinalizedOutcome)> =
        Vec::with_capacity(unordered.len() + immediate.len());
    while let Some(result) = unordered.next().await {
        // An error that was not converted above (e.g. a run-wide abort)
        // ends the batch here. The remaining futures are dropped with
        // `unordered`.
        let entry = result?;
        if entry.2.is_error {
            let aborts = config
                .tools
                .get(&entry.1.name)
                .map(|t| t.aborts_siblings_on_error())
                .unwrap_or(false);
            if aborts && !batch_token.is_cancelled() {
                batch_token.cancel();
            }
        }
        completed.push(entry);
    }
    // Restore source order; `idx` is unique per call.
    completed.extend(immediate);
    completed.sort_by_key(|(idx, _, _)| *idx);

    let mut messages = Vec::with_capacity(completed.len());
    let mut votes: Vec<(String, bool)> = Vec::with_capacity(completed.len());
    for (_idx, call, outcome) in completed {
        emit_tool_end(config, &call, &outcome).await;
        votes.push((call.name.clone(), outcome.terminate));
        messages.push(outcome_to_message(&call, outcome));
    }

    let terminate =
        compute_batch_terminate(&config.tools, votes.iter().map(|(n, t)| (n.as_str(), *t)));

    Ok(ExecutedBatch {
        messages,
        terminate,
    })
}
632
633/// Execute one tool call synchronously: prep → execute → finalize.
634/// Used by the sequential path.
635#[allow(clippy::too_many_arguments)]
636async fn run_one(
637    assistant: &AgentMessage,
638    assistant_content: &AssistantContent,
639    call: &ToolCall,
640    context: &AgentContext,
641    config: &LoopConfig,
642    signal: &CancellationToken,
643    turn_allowlist: Option<&std::collections::HashSet<String>>,
644) -> Result<FinalizedOutcome, LoopError> {
645    emit_tool_start(config, call).await;
646
647    let prep = prepare_call(
648        assistant,
649        assistant_content,
650        call,
651        context,
652        config,
653        turn_allowlist,
654    )
655    .await;
656    let outcome = match prep {
657        PreparedCall::Immediate(executed) => {
658            finalize(
659                assistant,
660                assistant_content,
661                call,
662                &call.arguments,
663                executed,
664                &context.messages,
665                &config.plugins.after_tool_call,
666            )
667            .await
668        }
669        PreparedCall::Prepared { tool, args } => {
670            let event_sink = config.event_sink.clone();
671            let event_observers = config.plugins.event_observer.clone();
672            let id = call.id.clone();
673            let name = call.name.clone();
674            let update_events = spawn_tool_update_dispatcher(event_sink, event_observers);
675            let executed = execute_prepared(
676                &tool,
677                call,
678                args.clone(),
679                signal.clone(),
680                Box::new(move |update| {
681                    let event = AgentEvent::ToolExecutionUpdate {
682                        tool_call_id: id.clone(),
683                        tool_name: name.clone(),
684                        partial: update,
685                    };
686                    enqueue_tool_update_event(&update_events, event);
687                }),
688            )
689            .await?;
690            finalize(
691                assistant,
692                assistant_content,
693                call,
694                &args,
695                executed,
696                &context.messages,
697                &config.plugins.after_tool_call,
698            )
699            .await
700        }
701    };
702
703    emit_tool_end(config, call, &outcome).await;
704    Ok(outcome)
705}
706
// ─── Internal pipeline ────────────────────────────────────────────

/// Outcome of `prepare_call`: either a call ready to execute or a result
/// that replaces execution entirely.
enum PreparedCall {
    /// The call never reaches `tool.execute`. Examples: the tool name is
    /// unknown, the tool is hidden by the per-turn allowlist, argument
    /// parsing or validation failed, or a `BeforeToolCall` hook blocked it.
    /// The loop emits the error tool result without invoking `tool.execute`,
    /// but still runs `AfterToolCall` hooks. That way observers
    /// (terminal-message guard, system-reminder hook, etc.) see every tool
    /// result, successes and failures alike.
    Immediate(ExecutedOutcome),
    /// Ready to execute.
    Prepared {
        /// Registered tool resolved from the call's name.
        tool: Arc<dyn AgentTool>,
        /// Arguments passed to `execute_prepared` and to the `AfterToolCall`
        /// hooks. NOTE(review): presumably the validated form of
        /// `call.arguments`; it is built outside the visible chunk, so confirm.
        args: Value,
    },
}
722
/// Raw outcome of a call, before `finalize` runs the `AfterToolCall` hooks.
/// It is either a real execution result or a synthesized error (a
/// short-circuit, a sibling abort, or a call held back by the per-turn cap).
struct ExecutedOutcome {
    /// Result produced by the tool, or synthesized in its place.
    result: ToolResult,
    /// Whether `result` is to be treated as an error.
    is_error: bool,
}
727
/// Outcome of a call after `finalize` has applied the `AfterToolCall` hooks.
pub(crate) struct FinalizedOutcome {
    /// Final tool result; an after-hook may have overridden it.
    pub result: ToolResult,
    /// Final error flag. The parallel path checks it to decide whether an
    /// `aborts_siblings_on_error` tool should cancel its siblings.
    pub is_error: bool,
    /// This call's vote in the batch terminate tally
    /// (`compute_batch_terminate`).
    pub terminate: bool,
}
733
734/// Walk every registered `ToolGate` and ask each for a specific reason
735/// it denies `tool_name`. Returns the first specific reason; `None` if
736/// no gate claims responsibility (caller falls back to the active
737/// [`crate::protocol::ProtocolPolicy`], then to the core's generic
738/// hidden-tool message).
739struct GateDenial {
740    reason: String,
741    gate: &'static str,
742}
743
744async fn gate_attributed_denial(
745    tool_name: &str,
746    config: &LoopConfig,
747    messages: &[AgentMessage],
748) -> Option<GateDenial> {
749    let available_tool_names: Vec<&str> = config.tools.iter().map(|t| t.name()).collect();
750    let iteration = messages
751        .iter()
752        .filter(|m| matches!(m, AgentMessage::Assistant { .. }))
753        .count();
754    for gate in &config.plugins.tool_gate {
755        let ctx = crate::plugin::ToolGateContext {
756            iteration,
757            messages,
758            conversation_id: config.conversation_id.as_deref(),
759            available_tool_names: &available_tool_names,
760        };
761        if let Some(reason) = gate.denial_reason(tool_name, ctx).await {
762            return Some(GateDenial {
763                reason,
764                gate: gate.name(),
765            });
766        }
767    }
768    None
769}
770
771async fn prepare_call(
772    assistant: &AgentMessage,
773    assistant_content: &AssistantContent,
774    call: &ToolCall,
775    context: &AgentContext,
776    config: &LoopConfig,
777    turn_allowlist: Option<&std::collections::HashSet<String>>,
778) -> PreparedCall {
779    let Some(tool) = config.tools.get(&call.name) else {
780        return PreparedCall::Immediate(ExecutedOutcome {
781            result: ToolResult::error(format!("Tool `{}` not found", call.name)),
782            is_error: true,
783        });
784    };
785
786    // Hard-enforce per-turn `ToolGate` narrowing. The allowlist filters
787    // what schemas the model SEES; without this check, the model can
788    // hallucinate a tool name that wasn't advertised this turn and the
789    // dispatcher runs it anyway because the registry is global. That was
790    // observed in production: a model called a terminal delivery tool
791    // after a no-work narrowing had dropped it from the catalog, claimed
792    // success without doing any work, and the file it claimed to create
793    // didn't exist. Refuse here so the model sees a typed tool error and
794    // either picks an allowed tool or surfaces an unrecoverable state.
795    //
796    // Message + details are sourced in priority order:
797    //   1. a `ToolGate` that attributes the denial via `denial_reason`;
798    //   2. the active `ProtocolPolicy` (product vocabulary, if any);
799    //   3. the core's generic, vocabulary-free fallback.
800    if let Some(allowlist) = turn_allowlist {
801        if !allowlist.contains(call.name.as_str()) {
802            let attributed = gate_attributed_denial(&call.name, config, &context.messages).await;
803            let (message, details) =
804                match attributed {
805                    Some(denial) => {
806                        let details = crate::protocol::generic_hidden_tool_details(
807                            &call.name,
808                            allowlist,
809                            Some(denial.gate),
810                        );
811                        (denial.reason, details)
812                    }
813                    None => match config.protocol.hidden_tool_error(
814                        crate::protocol::HiddenToolContext {
815                            requested_tool: &call.name,
816                            allowlist,
817                            messages: &context.messages,
818                        },
819                    ) {
820                        Some(err) => (err.message, err.details),
821                        None => (
822                            crate::protocol::generic_hidden_tool_message(&call.name, allowlist),
823                            crate::protocol::generic_hidden_tool_details(
824                                &call.name, allowlist, None,
825                            ),
826                        ),
827                    },
828                };
829            let mut result = ToolResult::error(message);
830            result.details = details;
831            return PreparedCall::Immediate(ExecutedOutcome {
832                result,
833                is_error: true,
834            });
835        }
836    }
837
838    // Provider stream layers wrap a malformed-JSON tool-args buffer in
839    // a sentinel object so we can surface a clean, model-recoverable
840    // error here instead of the cryptic `invalid type: string, expected
841    // struct …` that comes from each tool's `serde_json::from_value`
842    // running over a `Value::String` fallback. Detect the sentinel
843    // before validation/dispatch.
844    if let Some((parse_err, raw)) = detect_arg_parse_error(&call.arguments) {
845        return PreparedCall::Immediate(ExecutedOutcome {
846            result: ToolResult::error(format_arg_parse_error(&call.name, parse_err, raw)),
847            is_error: true,
848        });
849    }
850
851    let prepared_args = tool.prepare_arguments(call.arguments.clone());
852
853    if let Err(err) = tool.validate(&prepared_args) {
854        return PreparedCall::Immediate(ExecutedOutcome {
855            result: ToolResult::error(err.to_string()),
856            is_error: true,
857        });
858    }
859
860    let ctx = BeforeToolCallContext {
861        assistant_message: assistant,
862        assistant_content,
863        tool_call: call,
864        args: &prepared_args,
865        messages: &context.messages,
866    };
867    for hook in &config.plugins.before_tool_call {
868        let decision = hook
869            .on_before_tool_call(BeforeToolCallContext {
870                assistant_message: ctx.assistant_message,
871                assistant_content: ctx.assistant_content,
872                tool_call: ctx.tool_call,
873                args: ctx.args,
874                messages: ctx.messages,
875            })
876            .await;
877        if decision.block {
878            let reason = decision
879                .reason
880                .unwrap_or_else(|| format!("blocked by {}", hook.name()));
881            let mut result = ToolResult::error(reason);
882            if let Some(details) = decision.details {
883                result.details = details;
884            }
885            return PreparedCall::Immediate(ExecutedOutcome {
886                result,
887                is_error: true,
888            });
889        }
890    }
891
892    PreparedCall::Prepared {
893        tool,
894        args: prepared_args,
895    }
896}
897
898async fn execute_prepared(
899    tool: &Arc<dyn AgentTool>,
900    call: &ToolCall,
901    args: Value,
902    signal: CancellationToken,
903    on_update: Box<dyn Fn(ToolResult) + Send + Sync + 'static>,
904) -> Result<ExecutedOutcome, LoopError> {
905    let (tx, mut rx) = mpsc::unbounded_channel::<ToolResult>();
906
907    // Drain partial updates concurrently so they don't backpressure the tool.
908    let mut drain_handle = tokio::spawn(async move {
909        while let Some(partial) = rx.recv().await {
910            on_update(partial);
911        }
912    });
913
914    let result = match tool.execute(&call.id, args, signal, tx).await {
915        Ok(result) => {
916            let is_error = result.is_error;
917            Ok(ExecutedOutcome { result, is_error })
918        }
919        Err(ToolError::Execution(reason)) => Ok(ExecutedOutcome {
920            result: ToolResult::error(ToolError::Execution(reason).to_string()),
921            is_error: true,
922        }),
923        Err(ToolError::Aborted) => Err(LoopError::Aborted),
924        Err(ToolError::Fatal(reason)) => Err(LoopError::ToolFatal {
925            tool: call.name.clone(),
926            reason,
927        }),
928    };
929
930    match timeout(TOOL_UPDATE_DRAIN_GRACE, &mut drain_handle).await {
931        Ok(joined) => {
932            if let Err(error) = joined {
933                tracing::debug!(?error, "tool update dispatcher join failed");
934            }
935        }
936        Err(_) => {
937            drain_handle.abort();
938            if let Err(error) = drain_handle.await {
939                tracing::debug!(?error, "aborted tool update dispatcher");
940            }
941        }
942    }
943    result
944}
945
946#[allow(clippy::too_many_arguments)]
947async fn finalize(
948    assistant: &AgentMessage,
949    _assistant_content: &AssistantContent,
950    call: &ToolCall,
951    args: &Value,
952    mut executed: ExecutedOutcome,
953    messages: &[AgentMessage],
954    after_hooks: &[Arc<dyn crate::plugin::AfterToolCall>],
955) -> FinalizedOutcome {
956    for hook in after_hooks {
957        let ctx = AfterToolCallContext {
958            assistant_message: assistant,
959            tool_call: call,
960            args,
961            result: &executed.result,
962            is_error: executed.is_error,
963            messages,
964        };
965        let decision = hook.on_after_tool_call(ctx).await;
966        if let Some(new_result) = decision.result {
967            executed.is_error = new_result.is_error;
968            executed.result = new_result;
969        }
970        if let Some(mark_error) = decision.mark_error {
971            executed.is_error = mark_error;
972            executed.result.is_error = mark_error;
973        }
974        if let Some(terminate) = decision.terminate {
975            executed.result.terminate = terminate;
976        }
977    }
978
979    FinalizedOutcome {
980        result: executed.result,
981        is_error: executed.is_error,
982        terminate: false,
983    }
984    // Carry forward the result's own `terminate` field as the outcome's
985    // vote. (Done after the after-hooks have had a chance to override.)
986    .with_vote()
987}
988
989impl FinalizedOutcome {
990    fn with_vote(mut self) -> Self {
991        self.terminate = self.result.terminate;
992        self
993    }
994}
995
996fn outcome_to_message(call: &ToolCall, outcome: FinalizedOutcome) -> AgentMessage {
997    let details = match outcome.result.details {
998        serde_json::Value::Null => None,
999        other => Some(other),
1000    };
1001    let message = AgentMessage::ToolResult {
1002        tool_call_id: call.id.clone(),
1003        tool_name: call.name.clone(),
1004        content: ToolResultContent {
1005            blocks: outcome.result.content,
1006        },
1007        is_error: outcome.is_error,
1008        // Carry the row-caption prose ("Ran `ls`.", "Wrote
1009        // `index.html` (4 KB).") into the persisted history so
1010        // history-aware plugins (working_memory_anchor, smart_context,
1011        // history_repair) see the same prose the UI renders without
1012        // having to walk content blocks past densification headers.
1013        narration: outcome.result.narration,
1014        // Carry the host-side structured payload so downstream plugins
1015        // (delivery gates, artifact dispatchers, …) can read canonical
1016        // fields without text-grepping the prose body. Stripped from
1017        // provider wire formats — the model still sees `content` only.
1018        details,
1019        timestamp: Some(now_ms()),
1020    };
1021    // Instrumentation: the post-`AfterToolCall` boundary is where any
1022    // plugin-driven `override_result` has already landed. Logging the
1023    // final content text head/tail at this point lets
1024    // `RUST_LOG=clark_agent::exec::tool_result_built=debug` captures show
1025    // what actually enters `messages` per turn — useful for triangulating
1026    // any divergence between a tool's emitted args and the user-visible
1027    // result a downstream terminal walker later selects.
1028    if let AgentMessage::ToolResult {
1029        content,
1030        is_error,
1031        tool_call_id,
1032        tool_name,
1033        ..
1034    } = &message
1035    {
1036        let plain = content.plain_text();
1037        let (head, tail) = head_tail_for_log(&plain);
1038        tracing::debug!(
1039            target: "clark_agent::exec::tool_result_built",
1040            tool_call_id = %tool_call_id,
1041            tool_name = %tool_name,
1042            is_error = *is_error,
1043            content_len = plain.len(),
1044            content_head = %head,
1045            content_tail = %tail,
1046            "outcome_to_message wrote ToolResult into messages"
1047        );
1048    }
1049    message
1050}
1051
/// Byte budget for the leading snippet that `head_tail_for_log` keeps.
const TOOL_RESULT_LOG_HEAD: usize = 200;
/// Byte budget for the trailing snippet that `head_tail_for_log` keeps.
const TOOL_RESULT_LOG_TAIL: usize = 200;
1054
1055/// Head/tail snippets of a tool-result text for diagnostic logging.
1056/// Avoids dumping multi-KB tool outputs into the trace stream while
1057/// still making divergence between two snapshots of the "same" text
1058/// visible at a glance.
1059fn head_tail_for_log(text: &str) -> (String, String) {
1060    if text.len() <= TOOL_RESULT_LOG_HEAD + TOOL_RESULT_LOG_TAIL {
1061        return (text.to_string(), String::new());
1062    }
1063    let head_end = char_boundary_at_or_before(text, TOOL_RESULT_LOG_HEAD);
1064    let tail_start = char_boundary_at_or_after(text, text.len() - TOOL_RESULT_LOG_TAIL);
1065    (text[..head_end].to_string(), text[tail_start..].to_string())
1066}
1067
/// Return the largest UTF-8 char boundary of `text` that is `<= idx`.
/// Any index at or past the end clamps to `text.len()`.
fn char_boundary_at_or_before(text: &str, idx: usize) -> usize {
    if idx >= text.len() {
        return text.len();
    }
    // Byte 0 is always a boundary, so the scan always finds a candidate.
    (0..=idx)
        .rev()
        .find(|&candidate| text.is_char_boundary(candidate))
        .unwrap_or(0)
}
1077
/// Return the smallest UTF-8 char boundary of `text` that is `>= idx`.
/// Any index at or past the end clamps to `text.len()`.
fn char_boundary_at_or_after(text: &str, idx: usize) -> usize {
    let len = text.len();
    if idx >= len {
        return len;
    }
    (idx..len)
        .find(|&candidate| text.is_char_boundary(candidate))
        .unwrap_or(len)
}
1087
/// Current wall-clock time as milliseconds since the Unix epoch.
/// Returns 0 if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        // u128 -> u64 narrowing; u64 milliseconds cover ~584 million years.
        Ok(since_epoch) => since_epoch.as_millis() as u64,
        Err(_) => 0,
    }
}
1094
1095async fn emit_tool_start(config: &LoopConfig, call: &ToolCall) {
1096    let event = AgentEvent::ToolExecutionStart {
1097        tool_call_id: call.id.clone(),
1098        tool_name: call.name.clone(),
1099        args: call.arguments.clone(),
1100    };
1101    config.event_sink.emit(event.clone()).await;
1102    for o in &config.plugins.event_observer {
1103        o.on_event(&event).await;
1104    }
1105}
1106
/// Build a human-readable, model-recoverable error for an argument payload
/// that failed JSON parsing in the provider stream layer.
///
/// The message tells the model three things:
/// 1. this was a syntax problem, not a schema problem;
/// 2. what raw text it produced;
/// 3. what to do next.
///
/// The raw payload is capped at `RAW_MAX` bytes to keep error contexts
/// bounded. The cut moves back to the nearest UTF-8 char boundary, so a
/// multi-byte character straddling the limit cannot make the slice panic.
/// The "bytes truncated" count reports exactly how many bytes were dropped.
fn format_arg_parse_error(tool_name: &str, parse_err: &str, raw: &str) -> String {
    const RAW_MAX: usize = 1024;
    let raw_snippet = if raw.len() > RAW_MAX {
        // `raw` is model output and may contain multi-byte UTF-8 (CJK, emoji,
        // accented text). Slicing at a fixed byte offset would panic
        // mid-codepoint. Byte 0 is always a boundary, so this loop terminates.
        let mut cut = RAW_MAX;
        while !raw.is_char_boundary(cut) {
            cut -= 1;
        }
        format!("{}…<{} bytes truncated>", &raw[..cut], raw.len() - cut)
    } else {
        raw.to_string()
    };
    format!(
        "Tool `{tool_name}` arguments were not valid JSON: {parse_err}. \
         You sent (raw): {raw_snippet}. \
         Re-emit the call with a JSON object matching the tool's schema; \
         this is a syntax error in your tool-call arguments, not a problem \
         with the file or the runtime."
    )
}
1131
1132async fn emit_tool_end(config: &LoopConfig, call: &ToolCall, outcome: &FinalizedOutcome) {
1133    let event = AgentEvent::ToolExecutionEnd {
1134        tool_call_id: call.id.clone(),
1135        tool_name: call.name.clone(),
1136        result: outcome.result.clone(),
1137        is_error: outcome.is_error,
1138    };
1139    config.event_sink.emit(event.clone()).await;
1140    for o in &config.plugins.event_observer {
1141        o.on_event(&event).await;
1142    }
1143}
1144
// Unit tests for batch splitting, termination voting, log-snippet helpers,
// and the unexecuted-call message.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ToolResultBlock;
    use std::sync::Arc;

    struct LimitTool {
        name: &'static str,
        counts: bool,
        vote_counts: bool,
        parallel_safe: bool,
    }

    #[async_trait::async_trait]
    impl AgentTool for LimitTool {
        fn name(&self) -> &str {
            self.name
        }

        fn description(&self) -> &str {
            "test tool"
        }

        fn parameters_schema(&self) -> Value {
            json!({"type": "object"})
        }

        fn counts_toward_tool_call_limit(&self) -> bool {
            self.counts
        }

        fn parallel_safe_per_turn(&self) -> bool {
            self.parallel_safe
        }

        fn counts_toward_termination_vote(&self) -> bool {
            self.vote_counts
        }

        async fn execute(
            &self,
            _call_id: &str,
            _args: Value,
            _signal: CancellationToken,
            _update: mpsc::UnboundedSender<ToolResult>,
        ) -> Result<ToolResult, ToolError> {
            unreachable!("split tests do not execute tools")
        }
    }

    fn registry() -> crate::tool::ToolRegistry {
        // Same registry the call-limit tests use plus the
        // termination-vote opt-out: `message_info` is advisory, the
        // other tools count.
        crate::tool::ToolRegistry::new()
            .with(Arc::new(LimitTool {
                name: "message_info",
                counts: false,
                vote_counts: false,
                parallel_safe: false,
            }))
            .with(Arc::new(LimitTool {
                name: "browser_navigate",
                counts: true,
                vote_counts: true,
                parallel_safe: true,
            }))
            .with(Arc::new(LimitTool {
                name: "browser_capture",
                counts: true,
                vote_counts: true,
                parallel_safe: true,
            }))
            .with(Arc::new(LimitTool {
                name: "browser_inspect",
                counts: true,
                vote_counts: true,
                parallel_safe: true,
            }))
            .with(Arc::new(LimitTool {
                name: "shell",
                counts: true,
                vote_counts: true,
                parallel_safe: false,
            }))
            .with(Arc::new(LimitTool {
                name: "message_result",
                counts: true,
                vote_counts: true,
                parallel_safe: false,
            }))
            .with(Arc::new(LimitTool {
                name: "message_ask",
                counts: true,
                vote_counts: true,
                parallel_safe: false,
            }))
            .with(Arc::new(LimitTool {
                name: "web_search",
                counts: true,
                vote_counts: true,
                parallel_safe: true,
            }))
            .with(Arc::new(LimitTool {
                name: "file_read",
                counts: true,
                vote_counts: true,
                parallel_safe: true,
            }))
    }

    fn call(name: &str) -> ToolCall {
        ToolCall {
            id: format!("tc-{name}"),
            name: name.to_string(),
            arguments: Value::Null,
        }
    }

    fn names(calls: &[ToolCall]) -> Vec<&str> {
        calls.iter().map(|call| call.name.as_str()).collect()
    }

    #[test]
    fn progress_only_tools_do_not_starve_first_work_tool() {
        let registry = registry();
        let (executable, unexecuted, max) = split_tool_calls_for_execution(
            vec![call("message_info"), call("browser_navigate")],
            &registry,
            Some(1),
        );

        assert_eq!(max, Some(1));
        assert_eq!(names(&executable), vec!["message_info", "browser_navigate"]);
        assert!(unexecuted.is_empty());
    }

    #[test]
    fn extra_limit_counted_tools_still_get_synthetic_errors() {
        let registry = registry();
        let (executable, unexecuted, max) = split_tool_calls_for_execution(
            vec![call("message_info"), call("shell"), call("message_result")],
            &registry,
            Some(1),
        );

        assert_eq!(max, Some(1));
        assert_eq!(names(&executable), vec!["message_info", "shell"]);
        assert_eq!(names(&unexecuted), vec!["message_result"]);
    }

    #[test]
    fn parallel_safe_reads_do_not_burn_the_per_turn_cap() {
        // Two web_searches + one browser_navigate in a single turn:
        // before this change the second web_search would be dropped with
        // "only the first 1 call can run". After, the parallel-safe
        // reads execute alongside the one counted work tool.
        let registry = registry();
        let (executable, unexecuted, max) = split_tool_calls_for_execution(
            vec![
                call("web_search"),
                call("web_search"),
                call("browser_navigate"),
            ],
            &registry,
            Some(1),
        );

        assert_eq!(max, Some(1));
        assert_eq!(
            names(&executable),
            vec!["web_search", "web_search", "browser_navigate"]
        );
        assert!(
            unexecuted.is_empty(),
            "unexecuted: {:?}",
            names(&unexecuted)
        );
    }

    #[test]
    fn parallel_safe_reads_do_not_compete_with_a_write_for_the_cap() {
        // shell (write) still gets its single slot; the parallel-safe
        // reads pass through. A second shell would still be dropped.
        let registry = registry();
        let (executable, unexecuted, max) = split_tool_calls_for_execution(
            vec![
                call("file_read"),
                call("file_read"),
                call("shell"),
                call("shell"),
            ],
            &registry,
            Some(1),
        );

        assert_eq!(max, Some(1));
        assert_eq!(names(&executable), vec!["file_read", "file_read", "shell"]);
        assert_eq!(names(&unexecuted), vec!["shell"]);
    }

    #[test]
    fn browser_tools_do_not_burn_the_per_turn_cap() {
        // Browser tools require exclusive sandbox access, so the
        // executor still runs this batch sequentially. They are
        // nevertheless safe to admit together in one assistant turn:
        // a model often opens two related URLs, captures one page, and
        // inspects another before yielding. The per-turn cap should
        // not drop the later browser calls.
        let registry = registry();
        let (executable, unexecuted, max) = split_tool_calls_for_execution(
            vec![
                call("browser_navigate"),
                call("browser_navigate"),
                call("browser_capture"),
                call("browser_inspect"),
                call("shell"),
            ],
            &registry,
            Some(1),
        );

        assert_eq!(max, Some(1));
        assert_eq!(
            names(&executable),
            vec![
                "browser_navigate",
                "browser_navigate",
                "browser_capture",
                "browser_inspect",
                "shell",
            ]
        );
        assert!(
            unexecuted.is_empty(),
            "unexecuted: {:?}",
            names(&unexecuted)
        );
    }

    #[test]
    fn malformed_calls_do_not_burn_the_cap_or_preempt_real_work() {
        // A call that resolves to no registered tool (unknown name, or an
        // empty/blank name from a streaming glitch) does no real work — it
        // only yields a synthetic "no such tool" error in `prepare_call`. It
        // must NOT spend the turn's slot, or it bumps a real call into the
        // unexecuted bin. Regression for a production case where, under
        // `max_tool_calls_per_turn = 1`, an empty-name call preempted the
        // model's real next call.
        let registry = registry();

        // Unknown name first, real counting tool second: both run; nothing deferred.
        let (executable, unexecuted, _) = split_tool_calls_for_execution(
            vec![call("missing"), call("shell")],
            &registry,
            Some(1),
        );
        assert_eq!(names(&executable), vec!["missing", "shell"]);
        assert!(
            unexecuted.is_empty(),
            "real work must not be preempted by an unknown name: {:?}",
            names(&unexecuted)
        );

        // Empty name first (the prod glitch shape): the real call still runs.
        let (executable, unexecuted, _) =
            split_tool_calls_for_execution(vec![call(""), call("shell")], &registry, Some(1));
        assert_eq!(names(&executable), vec!["", "shell"]);
        assert!(
            unexecuted.is_empty(),
            "empty-name glitch must not preempt real work: {:?}",
            names(&unexecuted)
        );

        // Two real counting tools: the cap still bites — the second is deferred.
        let (executable, unexecuted, _) =
            split_tool_calls_for_execution(vec![call("shell"), call("shell")], &registry, Some(1));
        assert_eq!(names(&executable), vec!["shell"]);
        assert_eq!(names(&unexecuted), vec!["shell"]);
    }

    #[test]
    fn compute_batch_terminate_passes_when_only_advisory_siblings_dont_vote() {
        // Some models tail a terminating delivery call with a polite,
        // advisory sign-off call that opts out of the termination vote
        // (`counts_toward_termination_vote == false`). Under a strict
        // every-result-must-vote rule the trailing advisory call
        // (terminate=false) would block termination and the run would
        // grind to its iteration cap. With the advisory opt-out the
        // batch terminates on the strength of the delivery call alone.
        let registry = registry();
        let votes = [("message_result", true), ("message_info", false)];
        assert!(compute_batch_terminate(
            &registry,
            votes.iter().map(|(n, t)| (*n, *t))
        ));
    }

    #[test]
    fn compute_batch_terminate_fails_when_any_counted_tool_did_not_vote_terminate() {
        let registry = registry();
        // `message_result` voted yes, but a real work tool (`shell`)
        // is still mid-flight or didn't vote — keep running.
        let votes = [("message_result", true), ("shell", false)];
        assert!(!compute_batch_terminate(
            &registry,
            votes.iter().map(|(n, t)| (*n, *t))
        ));
    }

    #[test]
    fn compute_batch_terminate_returns_false_for_all_advisory_batches() {
        // An all-`message_info` batch must NEVER end the run; progress
        // notes are status, not termination, even when the model
        // emits several in a row.
        let registry = registry();
        let votes = [("message_info", false), ("message_info", false)];
        assert!(!compute_batch_terminate(
            &registry,
            votes.iter().map(|(n, t)| (*n, *t))
        ));
    }

    #[test]
    fn compute_batch_terminate_returns_false_for_empty_batch() {
        let registry = registry();
        let votes: Vec<(&str, bool)> = Vec::new();
        assert!(!compute_batch_terminate(&registry, votes.into_iter()));
    }

    #[test]
    fn compute_batch_terminate_treats_unknown_tools_as_counted() {
        // Unknown / unregistered tool names default to counted so a
        // stray call cannot accidentally terminate the run by being
        // silently classified as advisory.
        let registry = registry();
        // `message_result` voted yes, but an unknown tool emitted
        // `terminate=false`. Unknown counts → must not terminate.
        let votes = [("message_result", true), ("ghost_tool", false)];
        assert!(!compute_batch_terminate(
            &registry,
            votes.iter().map(|(n, t)| (*n, *t))
        ));

        // And the symmetric case: an unknown tool that voted yes,
        // alongside `message_result` voting yes → still counted, so
        // the batch terminates.
        let votes = [("message_result", true), ("ghost_tool", true)];
        assert!(compute_batch_terminate(
            &registry,
            votes.iter().map(|(n, t)| (*n, *t))
        ));
    }

    #[test]
    fn compute_batch_terminate_passes_when_message_ask_is_only_counted_terminator() {
        // Symmetric to the message_result case: message_ask (also a
        // terminating tool) tailed by message_info still terminates.
        let registry = registry();
        let votes = [("message_ask", true), ("message_info", false)];
        assert!(compute_batch_terminate(
            &registry,
            votes.iter().map(|(n, t)| (*n, *t))
        ));
    }

    #[test]
    fn head_tail_for_log_returns_full_text_when_short() {
        // Short payloads (≤ HEAD+TAIL) round-trip in `head` with an
        // empty `tail` so the trace line stays compact and the
        // diagnostic reader doesn't have to reconstruct the full
        // string from two halves when there's nothing to truncate.
        let (head, tail) = head_tail_for_log("hello");
        assert_eq!(head, "hello");
        assert_eq!(tail, "");
    }

    #[test]
    fn head_tail_for_log_truncates_long_text_with_head_and_tail() {
        let payload: String = "abc".repeat(500);
        assert!(payload.len() > TOOL_RESULT_LOG_HEAD + TOOL_RESULT_LOG_TAIL);
        let (head, tail) = head_tail_for_log(&payload);
        assert_eq!(head.len(), TOOL_RESULT_LOG_HEAD);
        assert_eq!(tail.len(), TOOL_RESULT_LOG_TAIL);
        // First/last bytes must come from the original — guards
        // against a regression where the helper accidentally re-orders
        // or drops the boundary characters.
        assert!(payload.starts_with(&head));
        assert!(payload.ends_with(&tail));
    }

    #[test]
    fn head_tail_for_log_respects_utf8_char_boundaries() {
        // Multi-byte chars must not be split mid-codepoint, or slicing
        // would panic (and instrumentation would crash the loop).
        // Odd-length ASCII padding on both sides makes both naive cut
        // points (byte HEAD and byte len - TAIL) land inside a 2-byte
        // Greek char, so the boundary adjustment is actually exercised.
        // "πλάκα", spelled with escapes so every code point is
        // unambiguously 2 bytes in UTF-8: 5 chars × 2 bytes × 50 = 500 bytes.
        let mid = "\u{3c0}\u{3bb}\u{3ac}\u{3ba}\u{3b1}".repeat(50);
        let prefix: String = "x".repeat(151);
        let suffix: String = "y".repeat(151);
        let payload = format!("{prefix}{mid}{suffix}");
        assert_eq!(payload.len(), 802);
        // Setup sanity: both naive cut points are mid-codepoint.
        assert!(!payload.is_char_boundary(TOOL_RESULT_LOG_HEAD));
        assert!(!payload.is_char_boundary(payload.len() - TOOL_RESULT_LOG_TAIL));

        let (head, tail) = head_tail_for_log(&payload);
        // Round-trip: head is a prefix of the payload, tail a suffix.
        assert!(payload.starts_with(&head));
        assert!(payload.ends_with(&tail));
        // Head backs off to the boundary before byte HEAD; tail starts at
        // the boundary after byte len - TAIL. Each loses exactly one byte.
        assert_eq!(head.len(), TOOL_RESULT_LOG_HEAD - 1);
        assert_eq!(tail.len(), TOOL_RESULT_LOG_TAIL - 1);
    }

    #[test]
    fn unexecuted_message_mentions_progress_only_calls_when_present() {
        let result = unexecuted_tool_call_result(3, 2, 1);
        let text = match result.content.first() {
            Some(ToolResultBlock::Text(text)) => text.text.as_str(),
            _ => panic!("expected text result"),
        };

        assert!(text.contains("2 limit-counted tool calls"));
        assert!(text.contains("3 tool calls total, including progress-only calls"));
        assert_eq!(
            result
                .details
                .get("limit_counted_tool_calls")
                .and_then(Value::as_u64),
            Some(2)
        );
    }
}