atomcode_core/turn/runner.rs
1use std::time::Instant;
2
3use futures::StreamExt;
4use tokio::sync::mpsc;
5use tokio_util::sync::CancellationToken;
6
7use atomcode_telemetry::{CurrentContext, Event as TelemetryEvent, LlmErrorKind, ToolErrorKind};
8
9use crate::config::Config;
10use crate::conversation::Conversation;
11use crate::provider::LlmProvider;
12use crate::stream::StreamEvent;
13use crate::tool::{
14 PermissionDecision, ToolCall, ToolCallBuffer, ToolContext, ToolRegistry, ToolResult,
15};
16
17use super::event::{TurnEvent, TurnResult};
18use super::loop_guard::{LoopGuardDecision, LoopGuardState};
19use super::permission::PermissionDecider;
20
/// Core LLM streaming + tool execution primitive.
///
/// Handles exactly one LLM call cycle:
/// 1. Build messages from conversation (through `ctx`, within the token budget)
/// 2. Stream LLM response (text deltas, reasoning, tool calls)
/// 3. Execute tool calls (with permission checking)
/// 4. Add results to conversation
///
/// Does NOT handle: retries, step limits, or conversation management.
/// The caller (AgentLoop / SubagentLoop) owns those responsibilities.
///
/// Tool-call loop protection is shared with the caller: the runner
/// collapses identical tool calls within one assistant message and
/// consults `loop_guard` across batches, while the agent decides when
/// that guard is cleared (once per user message).
pub struct TurnRunner {
    /// LLM backend used to stream the response; its `model_name()` is
    /// also what gets reported in telemetry and the datalog.
    pub provider: std::sync::Arc<dyn LlmProvider>,
    /// Tool registry: source of the tool definitions sent to the LLM and
    /// of the expected argument keys used to repair malformed tool args.
    pub tools: std::sync::Arc<ToolRegistry>,
    /// Shared tool execution context (working dir, telemetry handle,
    /// context/read budget hints consulted by `read_file`).
    pub context: ToolContext,
    /// Active configuration (provider table, datalog settings, ...).
    pub config: Config,
    /// Context construction strategy. Shared with the parent
    /// `AgentLoop::ctx` (same `Arc`) so the turn's actual send and
    /// the agent's datalog snapshot go through one ctx — per-model
    /// logic like `apply_model_directives` lands on both paths.
    /// Rebuilt on `AgentCommand::ReloadConfig` alongside the agent's
    /// clone.
    pub ctx: std::sync::Arc<dyn crate::ctx::CtxBuilder>,
    /// Permission policy consulted for tool calls (see `PermissionDecider`).
    pub permission: Box<dyn PermissionDecider>,
    /// Files edited during the current session (tracked for context awareness).
    /// Together with files seen via `read_file`, these are advertised in the
    /// `create_file` tool description as already-existing files.
    pub recently_edited_files: Vec<String>,
    /// Hook executor — runs user-configured lifecycle hooks at tool execution boundaries.
    pub hook_executor: std::sync::Arc<crate::hook::executor::HookExecutor>,
    /// Cross-batch tool-call loop guard. Cleared per user-message by the
    /// agent (see `handle_send_message`); records every executed tool's
    /// `(name, args, output_hash)` triple and short-circuits the third
    /// identical attempt. See `loop_guard.rs` for the full rationale.
    pub loop_guard: LoopGuardState,
}
54
55impl TurnRunner {
56 /// Execute one LLM turn: stream response, execute any tool calls, return result.
57 pub async fn run(
58 &mut self,
59 conversation: &mut Conversation,
60 system_prompt: &str,
61 event_tx: &mpsc::UnboundedSender<TurnEvent>,
62 cancel: CancellationToken,
63 ) -> TurnResult {
64 self.run_with_filter(conversation, system_prompt, "", event_tx, cancel, None)
65 .await
66 }
67
68 /// Run with optional tool filter and turn reminder.
69 /// `turn_reminder` is dynamic per-turn context (git status, current task, etc.)
70 /// injected as a <system-reminder> into the last user message to keep the
71 /// system prompt stable for caching.
72 pub async fn run_with_filter(
73 &mut self,
74 conversation: &mut Conversation,
75 system_prompt: &str,
76 turn_reminder: &str,
77 event_tx: &mpsc::UnboundedSender<TurnEvent>,
78 cancel: CancellationToken,
79 allowed_tools: Option<&[&str]>,
80 ) -> TurnResult {
81 // Telemetry: build a per-turn context carrying turn_id / provider / model.
82 // Emitted on every exit path via the `tel_return!` macro below.
83 let turn_id = uuid::Uuid::new_v4();
84 let parent = CurrentContext::current();
85 // Telemetry envelope fields:
86 // provider = vendor type ("claude" / "openai" / "ollama"),
87 // read directly from ProviderConfig — analytics
88 // want the vendor label, not the user's named alias.
89 // provider_host = host parsed from base_url, with vendor default
90 // fallback. Resolved by the telemetry crate so the
91 // default-host table lives next to the schema.
92 // model = LlmProvider::model_name() — the wire-level model
93 // string sent to the API.
94 let pcfg = self
95 .config
96 .providers
97 .get(&self.config.default_provider);
98 let vendor = pcfg.map(|p| p.provider_type.clone());
99 let host = pcfg.and_then(|p| {
100 atomcode_telemetry::resolve_provider_host(&p.provider_type, p.base_url.as_deref())
101 });
102 let scope_ctx = CurrentContext {
103 turn_id: Some(turn_id),
104 provider: parent.provider.clone().or(vendor),
105 provider_host: parent.provider_host.clone().or(host),
106 model: parent
107 .model
108 .clone()
109 .or_else(|| Some(self.provider.model_name().to_string())),
110 ..parent
111 };
112 let turn_started = std::time::Instant::now();
113
114 // 1. Build messages within token budget.
115 // Goes through `self.ctx.build_messages` (trait dispatch), NOT
116 // `ctx::render::build_messages` (free fn) — otherwise per-model
117 // logic like `apply_model_directives` only lands in datalog and
118 // the actually-sent messages diverge from what we logged.
119 let context_window = self.ctx.ctx_window();
120
121 let (messages, ctx_stats) =
122 self.ctx
123 .build_messages(conversation, system_prompt, turn_reminder);
124
125 let actual_tokens: usize = messages.iter().map(|m| m.estimate_tokens()).sum();
126
127 // Set budget hint for read_file dynamic threshold.
128 // read_file checks this to decide full content vs skeleton.
129 self.context.ctx_budget_hint.store(
130 context_window.saturating_sub(actual_tokens),
131 std::sync::atomic::Ordering::Relaxed,
132 );
133 let _ = event_tx.send(TurnEvent::ContextStats {
134 system_tokens: ctx_stats.system_tokens,
135 sent_tokens: actual_tokens.saturating_sub(ctx_stats.system_tokens),
136 dropped_tokens: ctx_stats.dropped_tokens,
137 working_set_tokens: 0,
138 total_messages: messages.len(),
139 });
140
141 // 3. Get tool definitions for the LLM
142 let all_tool_defs = self.tools.get_definitions().await;
143 let mut tool_defs: Vec<_> = if let Some(filter) = allowed_tools {
144 all_tool_defs
145 .into_iter()
146 .filter(|d| filter.contains(&d.name))
147 .collect()
148 } else {
149 all_tool_defs
150 };
151
152 // Inject ALL known-existing files into write_file description.
153 // Includes both edited AND read files — anything the model touched exists on disk.
154 {
155 let mut known_files: Vec<String> = self.recently_edited_files.clone();
156 // Extract read files from conversation tool calls
157 for msg in &messages {
158 if let crate::conversation::message::MessageContent::AssistantWithToolCalls {
159 tool_calls,
160 ..
161 } = &msg.content
162 {
163 for call in tool_calls {
164 if call.name == "read_file" {
165 if let Ok(args) =
166 serde_json::from_str::<serde_json::Value>(&call.arguments)
167 {
168 if let Some(fp) = args.get("file_path").and_then(|v| v.as_str()) {
169 let short = fp.rsplit('/').next().unwrap_or(fp).to_string();
170 if !known_files.contains(&short) {
171 known_files.push(short);
172 }
173 }
174 }
175 }
176 }
177 }
178 }
179 if !known_files.is_empty() {
180 if let Some(wf) = tool_defs.iter_mut().find(|d| d.name == "create_file") {
181 // Display basenames for readability in tool description
182 let display_names: Vec<&str> = known_files
183 .iter()
184 .map(|p| p.rsplit('/').next().unwrap_or(p.as_str()))
185 .collect();
186 let list = if display_names.len() <= 6 {
187 display_names.join(", ")
188 } else {
189 format!(
190 "{}, ... ({} files)",
191 display_names[..5].join(", "),
192 display_names.len()
193 )
194 };
195 wf.description.push_str(&format!(
196 "\nThese files ALREADY EXIST — use edit_file instead: {}",
197 list,
198 ));
199 }
200 }
201 }
202
203 // Log the request to <datalog_dir>/llm/<ts>.json right before send.
204 // `pending_request_log` holds the path so the response call below
205 // can merge into the same file — passed explicitly to avoid the old
206 // process-wide-static approach that bled across concurrent daemon
207 // sessions.
208 //
209 // `datalog_dir` is resolved from `[datalog].dir` (default
210 // `$ATOMCODE_HOME/datalog/<project-slug>/`) — the same root the
211 // markdown writer uses, so request JSON, response JSON, calls.log,
212 // and the markdown summary all live next to each other for any
213 // given project.
214 let pending_request_log = {
215 let wd = self
216 .context
217 .working_dir
218 .try_read()
219 .map(|g| g.clone())
220 .unwrap_or_default();
221 let datalog_dir = crate::turn::datalog::DatalogWriter::resolve_log_dir(
222 &wd,
223 self.config.datalog.dir.as_deref(),
224 );
225 super::log::log_llm_request(
226 &datalog_dir,
227 &messages,
228 &tool_defs,
229 self.provider.model_name(),
230 context_window,
231 0, // step — always 0 in calls.log today; step param
232 // kept for future per-tool-call correlation.
233 self.config.datalog.enabled,
234 )
235 };
236
237 // 3. Start streaming
238 let stream_start = std::time::Instant::now();
239 let stream_result = self.provider.chat_stream(&messages, Some(&tool_defs));
240
241 // 4. Process stream events
242 let mut tool_calls_buf: Vec<ToolCall> = Vec::new();
243 // RAW accumulator — keeps `<tool_call>...</tool_call>` blocks intact
244 // so the rescue path at Done can parse them when the model emitted
245 // its tool calls as XML in text instead of using the structured
246 // tool_calls API (Qwen / GLM / DeepSeek occasional misbehavior).
247 let mut text_buf = String::new();
248 // VISIBLE accumulator — mirror of what `stream_filter` actually
249 // emitted to UI / conversation history. Used for `TurnResult::
250 // Responded.text` so downstream consumers (datalog `log_text`,
251 // ATLAS plan extraction, telemetry) see the same clean text the
252 // user saw, not the raw text_buf with leaked XML. Earlier bug
253 // (5-7 datalog 20-14-23 Turn 5): Responded.text was raw text_buf
254 // → datalog `**Response:**` block carried `<tool_call>grep<arg_key>
255 // pattern</arg_key>...</tool_call>` mid-prose, polluting A/B
256 // analysis.
257 let mut visible_text_buf = String::new();
258 // Reasoning-model thinking content collected separately — not emitted
259 // to scrollback by default (users don't want to read the thinking).
260 // If `text_buf` ends up empty at `Done` but this is non-empty, we
261 // promote reasoning to the final answer: some gateways route entire
262 // responses through `reasoning_content` for MiniMax-M2.7 / DeepSeek-R1,
263 // and without the fallback we'd return a silent 0-token "Nailed it".
264 let mut reasoning_buf = String::new();
265 // Anthropic extended-thinking blocks (text + signature) accumulated
266 // from `StreamEvent::ThinkingBlock`. Carried into the message via
267 // `finalize_stream_with_tool_calls_and_thinking` so the next
268 // request can echo them back — Anthropic 400s otherwise (`The
269 // content[].thinking in the thinking mode must be passed back`).
270 let mut thinking_blocks: Vec<crate::conversation::message::ThinkingBlock> =
271 Vec::new();
272 let mut total_tokens: usize = 0;
273 // Telemetry: per-turn token counters populated from StreamEvent::Usage.
274 let mut tel_input_tokens: u32 = 0;
275 let mut tel_output_tokens: u32 = 0;
276 let mut tel_cached_tokens: u32 = 0;
277 let mut got_usage = false;
278
279 // Telemetry helper: emit LlmChat (with turn_id/provider/model in scope)
280 // and return the given result. `scope_ctx` is cloned for each emission so
281 // the task-local is properly set when `track` reads `CurrentContext::current()`.
282 macro_rules! tel_return {
283 ($result:expr, $tool_count:expr, $conv:expr) => {{
284 let result = $result;
285 let messages_count = $conv.messages.len() as u32;
286 // system_tokens: estimate from the system prompt string
287 let system_tokens: u32 =
288 crate::conversation::message::Message::new(
289 crate::conversation::message::Role::System,
290 system_prompt,
291 ).estimate_tokens() as u32;
292 // tool_def_tokens: direct measurement from tool definitions sent to the LLM.
293 // Each ToolDef contributes name + description + JSON-serialized parameters.
294 let tool_def_tokens: u32 = tool_defs
295 .iter()
296 .map(|d| {
297 let params_len = d.parameters.to_string().len();
298 // name + description + serialized params, ~4 chars/token, +4 overhead
299 (d.name.len() + d.description.len() + params_len) / 4 + 4
300 })
301 .sum::<usize>() as u32;
302 // tool_result_tokens: sum of estimates for Role::Tool messages in conversation
303 let tool_result_tokens: u32 = $conv
304 .messages
305 .iter()
306 .filter(|m| matches!(m.role, crate::conversation::message::Role::Tool))
307 .map(|m| m.estimate_tokens() as u32)
308 .sum();
309 // message_tokens: sum of estimates for Role::User + Role::Assistant messages
310 let message_tokens: u32 = $conv
311 .messages
312 .iter()
313 .filter(|m| matches!(
314 m.role,
315 crate::conversation::message::Role::User
316 | crate::conversation::message::Role::Assistant
317 ))
318 .map(|m| m.estimate_tokens() as u32)
319 .sum();
320 let (error_kind, error_data) = if result.is_failed() {
321 let reason = match &result {
322 TurnResult::Failed(r) => r.clone(),
323 _ => String::new(),
324 };
325 let kind = classify_llm_error(&reason);
326 let error_data = build_llm_error_data(
327 kind,
328 &reason,
329 turn_started.elapsed().as_millis() as u32,
330 scope_ctx.provider.as_deref(),
331 scope_ctx.provider_host.as_deref(),
332 scope_ctx.model.as_deref(),
333 context_window as u32,
334 system_tokens,
335 tool_def_tokens,
336 tool_result_tokens,
337 message_tokens,
338 messages_count,
339 );
340 (Some(kind), error_data)
341 } else {
342 (None, None)
343 };
344 let event = TelemetryEvent::LlmChat {
345 duration_ms: turn_started.elapsed().as_millis() as u32,
346 tool_calls_count: $tool_count as u32,
347 input_tokens: tel_input_tokens,
348 output_tokens: tel_output_tokens,
349 cached_tokens: tel_cached_tokens,
350 had_error: result.is_failed(),
351 context_window: context_window as u32,
352 system_tokens,
353 tool_def_tokens,
354 tool_result_tokens,
355 message_tokens,
356 messages_count,
357 error_kind,
358 error_data,
359 };
360 let tel = self.context.telemetry.clone();
361 let emit_ctx = scope_ctx.clone();
362 CurrentContext::scope(emit_ctx, || async move {
363 tel.track(event);
364 })
365 .await;
366 return result;
367 }};
368 // Variant for early-exit paths where conversation is not available.
369 ($result:expr, $tool_count:expr) => {
370 tel_return!($result, $tool_count, conversation)
371 };
372 }
373
374 let mut stream = match stream_result {
375 Ok(s) => s,
376 Err(e) => tel_return!(TurnResult::Failed(e.to_string()), 0u32),
377 };
378 let mut got_any_event = false;
379 let mut was_truncated = false;
380 // Hides `<tool_call>...</tool_call>` blocks from UI/conversation while
381 // keeping `text_buf` raw so rescue can still parse them at Done.
382 let mut stream_filter = ToolCallStreamFilter::default();
383
384 // Stream timeouts. Defaults are 300s for both first-token and
385 // subsequent-token waits, since slow domestic model providers
386 // (SiliconFlow, Zhipu GLM, etc.) under thinking mode can take >3min
387 // to emit a single token after a large prompt. Override via env
388 // ATOMCODE_FIRST_TOKEN_TIMEOUT_SECS / ATOMCODE_STREAM_TIMEOUT_SECS
389 // for environments where you want a tighter "real hang" detector.
390 fn timeout_from_env(var: &str, default_secs: u64) -> std::time::Duration {
391 std::env::var(var)
392 .ok()
393 .and_then(|v| v.parse::<u64>().ok())
394 .map(std::time::Duration::from_secs)
395 .unwrap_or_else(|| std::time::Duration::from_secs(default_secs))
396 }
397 let first_token_timeout = timeout_from_env("ATOMCODE_FIRST_TOKEN_TIMEOUT_SECS", 300);
398 let stream_timeout = timeout_from_env("ATOMCODE_STREAM_TIMEOUT_SECS", 300);
399
400 loop {
401 let timeout = if got_any_event {
402 stream_timeout
403 } else {
404 first_token_timeout
405 };
406 tokio::select! {
407 biased;
408
409 _ = cancel.cancelled() => {
410 conversation.finalize_stream();
411 tel_return!(TurnResult::Cancelled, 0u32);
412 }
413
414 _ = tokio::time::sleep(timeout) => {
415 conversation.finalize_stream();
416 tel_return!(TurnResult::Failed(format!(
417 "Stream timeout: no event for {:?}",
418 timeout
419 )), 0u32);
420 }
421
422 event = stream.next() => {
423 match event {
424 Some(Ok(StreamEvent::Delta(text))) => {
425 got_any_event = true;
426 // Strip model-internal tags (DeepSeek </think>`, QwQ, etc.)
427 let text = strip_model_tags(&text);
428 if !text.is_empty() {
429 // Raw goes into rescue source so XML tool_call blocks
430 // can be parsed at Done.
431 text_buf.push_str(&text);
432 // Visible stream excludes <tool_call>...</tool_call>
433 // blocks (Qwen/GLM XML leak suppression).
434 let visible = stream_filter.feed(&text);
435 if !visible.is_empty() {
436 conversation.push_delta(&visible);
437 visible_text_buf.push_str(&visible);
438 let _ = event_tx.send(TurnEvent::TextDelta(visible));
439 }
440 }
441 }
442 Some(Ok(StreamEvent::Reasoning(text))) => {
443 got_any_event = true;
444 // Emit to UI for verbose mode (Ctrl+O) display.
445 // Still accumulate for the fallback case where
446 // content ends up empty.
447 let _ = event_tx.send(TurnEvent::ReasoningDelta(text.clone()));
448 reasoning_buf.push_str(&text);
449 }
450 Some(Ok(StreamEvent::ThinkingBlock { text, signature })) => {
451 got_any_event = true;
452 // Anthropic-only path: store the block (with
453 // its signature) for echo-back. Don't emit a
454 // UI event — the text was already streamed
455 // through ReasoningDelta during the deltas.
456 thinking_blocks.push(
457 crate::conversation::message::ThinkingBlock {
458 text,
459 signature,
460 },
461 );
462 }
463 Some(Ok(StreamEvent::ToolCallStart { id, name })) => {
464 got_any_event = true;
465 // Surface the tool name to UI immediately — otherwise users see
466 // "Generating…" for the entire args-streaming window (can be 30s+
467 // for large write_file calls).
468 let _ = event_tx.send(TurnEvent::ToolCallStreaming { name: name.clone(), hint: String::new() });
469 conversation.tool_call_buffer = Some(ToolCallBuffer {
470 id,
471 name,
472 arguments: String::new(),
473 hint_sent: false,
474 });
475 }
476
477 Some(Ok(StreamEvent::ToolCallDelta(args))) => {
478 got_any_event = true;
479 if let Some(ref mut buf) = conversation.tool_call_buffer {
480 buf.arguments.push_str(&args);
481 // Extract file_path from partial args (once only).
482 if !buf.hint_sent && buf.arguments.len() < 300 {
483 if let Some(hint) = extract_path_hint(&buf.arguments) {
484 buf.hint_sent = true;
485 let _ = event_tx.send(TurnEvent::ToolCallStreaming {
486 name: buf.name.clone(),
487 hint,
488 });
489 }
490 }
491 }
492 }
493
494 Some(Ok(StreamEvent::ToolCallDone(mut call))) => {
495 conversation.tool_call_buffer = None;
496 // Variant E — atomgit gateway occasionally
497 // corrupts `function.name` by spilling argument
498 // attributes into it (e.g.
499 // name='grep" path="..." pattern="..."'). The
500 // `arguments` field is then "{}". Drop the call
501 // entirely so it never enters tool_calls_buf nor
502 // the assistant message — the next stream is a
503 // fresh routing-lottery roll. Surface a one-line
504 // Error event so the user sees what happened.
505 if name_looks_corrupt(&call.name) {
506 let _ = event_tx.send(TurnEvent::Error(format!(
507 "Dropped malformed tool_call (provider returned corrupt function name: {:?})",
508 truncate(&call.name, 60)
509 )));
510 continue;
511 }
512
513 // Variants A/A2/B/C/D — atomgit gateway wraps
514 // tool args in `{"arguments": ...}` envelopes
515 // (string- or object-valued, 1-3 levels deep,
516 // with optional sibling fields). Schema-aware
517 // recovery unwraps to the flat form expected by
518 // the OpenAI tool-call protocol. See
519 // `recover_tool_args` for the variant catalogue.
520 let expected = self.tools.expected_top_keys(&call.name).await;
521 if let Some(recovered) =
522 crate::tool::recover_tool_args(&call.arguments, &expected)
523 {
524 call.arguments = recovered;
525 }
526
527 // ToolCallStarted is intentionally NOT sent here —
528 // it's emitted when the tool actually starts executing,
529 // so tool call and result are paired correctly in the
530 // UI for sequential execution.
531 tool_calls_buf.push(call);
532 }
533
534 Some(Ok(StreamEvent::Usage(usage))) => {
535 total_tokens += usage.completion_tokens;
536 // Telemetry: accumulate per-turn token counters.
537 tel_input_tokens = tel_input_tokens.saturating_add(usage.prompt_tokens as u32);
538 tel_output_tokens = tel_output_tokens.saturating_add(usage.completion_tokens as u32);
539 tel_cached_tokens = tel_cached_tokens.saturating_add(usage.cached_tokens as u32);
540 got_usage = true;
541 let _ = event_tx.send(TurnEvent::TokenUsage {
542 prompt_tokens: usage.prompt_tokens,
543 completion_tokens: usage.completion_tokens,
544 total_tokens: usage.prompt_tokens + usage.completion_tokens,
545 cached_tokens: usage.cached_tokens,
546 });
547 }
548
549 Some(Ok(StreamEvent::Done { truncated: is_truncated })) => {
550 // Flush any holdback from the tool_call filter. If the
551 // stream ended mid-`<tool_call>` block, the filter
552 // discards the partial — preferring a missing close to
553 // a leaked tag.
554 let trailing = stream_filter.flush();
555 if !trailing.is_empty() {
556 conversation.push_delta(&trailing);
557 visible_text_buf.push_str(&trailing);
558 let _ = event_tx.send(TurnEvent::TextDelta(trailing));
559 }
560
561 // Reasoning-only fallback: some gateways route the
562 // entire response through `reasoning_content` for
563 // reasoning models (MiniMax-M2.7, DeepSeek-R1). If
564 // we end up here with empty `content`, empty
565 // tool_calls, but a non-empty reasoning buffer, treat
566 // the reasoning as the answer — otherwise the agent's
567 // empty-response retry loop fires twice, sleeps 4s,
568 // and finally reports a silent "Nailed it · 0 tok".
569 //
570 // Rescue runs before this so real tool-call-in-text
571 // escapes still take priority.
572 let rescued_tools = if tool_calls_buf.is_empty() {
573 let rescued = rescue_text_tool_calls(&text_buf);
574 if !rescued.is_empty() {
575 conversation.clear_stream_buffer();
576 tool_calls_buf.extend(rescued);
577 true
578 } else {
579 false
580 }
581 } else {
582 // Repair path: model split intent across two channels
583 // — function-calling JSON arrived with truncated args
584 // (e.g. only `new_string`, missing `old_string`),
585 // while the text stream carried the complete args as
586 // `<tool_call>` XML. Fill missing keys from the XML
587 // pool so the call doesn't fail at execute() with a
588 // misleading "old_string is required". JSON wins on
589 // conflicts; XML only fills gaps.
590 let xml_pool = rescue_text_tool_calls(&text_buf);
591 if !xml_pool.is_empty() {
592 repair_tool_call_args(&mut tool_calls_buf, &xml_pool);
593 }
594 false
595 };
596
597 if text_buf.trim().is_empty()
598 && tool_calls_buf.is_empty()
599 && !rescued_tools
600 && !is_only_placeholder_filler(&reasoning_buf)
601 {
602 // Skip-promotion guard: when the reasoning
603 // channel carries nothing besides copies of
604 // our own outbound placeholder
605 // (`(no reasoning recorded)`), don't promote
606 // it to the assistant text channel. Some
607 // gateways echo back the placeholder as the
608 // response's reasoning_content; more often
609 // the model mimics the pattern from a
610 // context full of historical placeholder
611 // copies — DeepSeek V4 thinking-mode
612 // requires non-empty reasoning_content on
613 // every historical assistant tool_call
614 // message, so a 17-round session has 17
615 // copies of the placeholder in context, and
616 // the response often comes back as 3+
617 // copies concatenated. `is_only_placeholder_filler`
618 // handles any N (≥1) copies plus
619 // interleaved whitespace. Promoting that
620 // would commit a meaningless string to
621 // history AND present `Responded { text:
622 // "(no reasoning recorded)..." }` to the
623 // agent loop, which then calls
624 // finish_turn(Natural) and the user sees a
625 // silent "Nailed it" mid-task stop
626 // (user-reported on DeepSeek V4 Flash, 17
627 // rounds 20 tools, screenshot showed the
628 // placeholder as the only assistant text
629 // before TurnComplete fired). With the
630 // guard: text_buf stays empty, falls
631 // through to the empty-response Failed
632 // branch below, the agent loop's existing
633 // 3-retry-with-backoff path takes over and
634 // surfaces the issue to the user instead of
635 // burying it as success.
636 let promoted = std::mem::take(&mut reasoning_buf);
637 conversation.push_delta(&promoted);
638 text_buf.push_str(&promoted);
639 // Reasoning channel doesn't carry tool_call XML
640 // (it's a separate stream from delta text), so
641 // promoting it directly to visible_text_buf is
642 // safe — no need to re-feed through stream_filter.
643 visible_text_buf.push_str(&promoted);
644 let _ = event_tx.send(TurnEvent::TextDelta(promoted));
645 }
646
647 // Fallback: if the provider didn't report usage (many
648 // OpenAI-compatible APIs ignore stream_options), estimate
649 // output tokens from the streamed text + tool call args.
650 if !got_usage {
651 let mut output_chars = text_buf.len();
652 for tc in &tool_calls_buf {
653 output_chars += tc.arguments.len();
654 }
655 // Rough heuristic: ~2 chars per token for mixed
656 // Chinese/English, ~4 for pure English. Use 3 as a
657 // middle ground since most users mix both.
658 let estimated = (output_chars / 3).max(1);
659 total_tokens += estimated;
660 let _ = event_tx.send(TurnEvent::TokenUsage {
661 prompt_tokens: 0,
662 completion_tokens: estimated,
663 total_tokens: estimated,
664 cached_tokens: 0,
665 });
666 }
667
668 // Normalize tool calls before they enter history. In
669 // particular, merging same-file edit_file calls after
670 // finalization leaves the assistant message declaring
671 // more tool calls than the ToolResults we later append,
672 // which poisons the next provider request.
673 merge_edit_calls(&mut tool_calls_buf);
674
675 // Finalize conversation state. Pass the accumulated
676 // reasoning_buf so thinking-model providers (Moonshot
677 // Kimi K2-thinking/K2.6, etc.) can echo it back on
678 // the next request — without this the provider 400s
679 // with "reasoning_content is missing in assistant
680 // tool call message". The send-side ReasoningPolicy
681 // (per-provider) decides whether the field actually
682 // reaches the wire.
683 if !tool_calls_buf.is_empty() {
684 let reasoning = if reasoning_buf.trim().is_empty() {
685 None
686 } else {
687 Some(reasoning_buf.as_str())
688 };
689 conversation
690 .finalize_stream_with_tool_calls_and_thinking(
691 &tool_calls_buf,
692 reasoning,
693 std::mem::take(&mut thinking_blocks),
694 );
695 } else {
696 conversation.finalize_stream();
697 }
698 was_truncated = is_truncated;
699 break;
700 }
701
702 Some(Ok(StreamEvent::Error(e))) => {
703 conversation.finalize_stream();
704 tel_return!(TurnResult::Failed(e), 0u32);
705 }
706
707 Some(Ok(StreamEvent::Warning(w))) => {
708 // Advisory only — keep streaming. The
709 // TUI surfaces this to the user so a
710 // truncating proxy is visible at the
711 // moment of the bad request, not three
712 // hours later in the datalog.
713 let _ = event_tx.send(TurnEvent::Warning(w));
714 }
715
716 Some(Err(e)) => {
717 conversation.finalize_stream();
718 tel_return!(TurnResult::Failed(e.to_string()), 0u32);
719 }
720
721 None => {
722 // Stream ended without Done event
723 conversation.finalize_stream();
724 break;
725 }
726 }
727 }
728 }
729 }
730
731 // Log LLM response (text + tool calls) into the same per-project
732 // datalog dir as the request — see comment on the matching
733 // `log_llm_request` call above.
734 let response_duration = stream_start.elapsed().as_millis() as u64;
735 let wd = self
736 .context
737 .working_dir
738 .try_read()
739 .map(|g| g.clone())
740 .unwrap_or_default();
741 let datalog_dir = crate::turn::datalog::DatalogWriter::resolve_log_dir(
742 &wd,
743 self.config.datalog.dir.as_deref(),
744 );
745 super::log::log_llm_response(
746 &datalog_dir,
747 pending_request_log,
748 &text_buf,
749 &tool_calls_buf,
750 &reasoning_buf,
751 self.provider.model_name(),
752 0, // step is set by caller
753 response_duration,
754 self.config.datalog.enabled,
755 );
756
757 if tool_calls_buf.is_empty() && text_buf.trim().is_empty() {
758 tel_return!(
759 TurnResult::Failed(
760 "Provider returned an empty response (no text, no tool calls).".to_string(),
761 ),
762 0u32
763 );
764 }
765
766 // 5. If no tool calls, we're done — LLM produced text only.
767 // Use the FILTERED accumulator so downstream consumers
768 // (datalog `log_text`, ATLAS plan extraction, telemetry)
769 // see clean prose, not raw text_buf with leaked XML
770 // tool_call blocks. Earlier bug: 5-7 atomgr datalog
771 // 20-14-23 Turn 5 logged `### 3. 传输层安全<tool_call>grep
772 // <arg_key>...` because Responded.text was raw.
773 if tool_calls_buf.is_empty() {
774 tel_return!(
775 TurnResult::Responded {
776 text: visible_text_buf,
777 tokens: total_tokens,
778 truncated: was_truncated,
779 },
780 0u32
781 );
782 }
783
784 // 6. Tool calls were normalized before being written into conversation
785 // history. From this point on, execute exactly the calls the provider
786 // will see in the assistant message on the next turn.
787 //
788 // Auto-merge multiple edit_file calls on the same file into one multi-edit.
789 // Models often generate 2+ separate edit_file calls for the same file instead of
790 // using the edits array. Merging at framework level is 100% reliable vs prompt ~50%.
791
792 // ── Layer B: per-turn read budget allocation ──
793 // Count read_file calls in this batch and set per-file token budget.
794 // Formula: 20% of ctx budget / num_reads. This ensures N reads in one
795 // turn share the budget fairly — 1 read gets 20%, 3 reads get 6.7% each.
796 // read.rs Layer A checks file_tokens against this to decide full vs skeleton.
797 {
798 let num_reads = tool_calls_buf
799 .iter()
800 .filter(|c| c.name == "read_file")
801 .count()
802 .max(1); // avoid division by zero
803 let budget = self
804 .context
805 .ctx_budget_hint
806 .load(std::sync::atomic::Ordering::Relaxed);
807 let per_file = budget / (5 * num_reads);
808 self.context.read_budget_tokens.store(
809 per_file.max(2000), // floor: ~170 lines always get full content
810 std::sync::atomic::Ordering::Relaxed,
811 );
812 }
813
814 let tool_count = tool_calls_buf.len();
815 let mut seen_calls: std::collections::HashMap<(String, String), usize> =
816 std::collections::HashMap::new();
817 let mut is_dup: Vec<bool> = vec![false; tool_calls_buf.len()];
818 for (i, call) in tool_calls_buf.iter().enumerate() {
819 // Key on the *canonicalised* argument JSON so that semantically
820 // identical calls with cosmetically different formatting collapse.
821 // Weak/streaming models routinely re-emit the same call with
822 // different whitespace, key order, or escape style:
823 // {"pattern":"foo"} vs {"pattern": "foo"}
824 // {"a":1,"b":2} vs {"b":2,"a":1}
825 // The byte-identical comparison below would treat those as
826 // distinct and let N ghost in-flight rows leak into the UI.
827 // serde_json::to_string with a BTreeMap-backed Value sorts keys
828 // and strips whitespace, so two formattings of the same object
829 // yield the same canonical string. Non-JSON args fall back to
830 // the raw string (no regression for free-form tools).
831 let key = (call.name.clone(), normalize_tool_args(&call.arguments));
832 if seen_calls.contains_key(&key) {
833 is_dup[i] = true;
834 } else {
835 seen_calls.insert(key, i);
836 }
837 }
838
839 // ── ToolBatchStarted: fires when ≥ 2 non-duplicate calls fan
840 // out from one assistant message. Lets the UI render a single
841 // grouped block instead of N independent ▸ rows.
842 // Per-call ToolCallStarted events still fire below for backward
843 // compat (UI dedupes via batch_id membership).
844 let non_dup_count = is_dup.iter().filter(|d| !**d).count();
845 let active_batch_id = if non_dup_count >= 2 {
846 let batch_id = format!("batch_{}", uuid::Uuid::new_v4());
847 let calls: Vec<crate::turn::event::ToolBatchCall> = tool_calls_buf
848 .iter()
849 .zip(is_dup.iter())
850 .filter(|(_, dup)| !**dup)
851 .map(|(c, _)| crate::turn::event::ToolBatchCall {
852 id: c.id.clone(),
853 name: c.name.clone(),
854 arguments: c.arguments.clone(),
855 })
856 .collect();
857 let _ = event_tx.send(TurnEvent::ToolBatchStarted {
858 batch_id: batch_id.clone(),
859 calls,
860 });
861 Some((batch_id, std::time::Instant::now(), non_dup_count))
862 } else {
863 None
864 };
865 let mut batch_ok_count: usize = 0;
866
867 let mut files_edited_this_batch: Vec<String> = Vec::new();
868 for (i, call) in tool_calls_buf.iter().enumerate() {
869 if cancel.is_cancelled() {
870 tel_return!(TurnResult::Cancelled, tool_count);
871 }
872
873 // ── Dup-in-batch: silent skip BEFORE any UI event ──
874 // Some thinking-mode models emit the same tool_call N times in
875 // one assistant message. Dispatching them all wastes execute
876 // cycles, so we replay the first call's result for #2..N. The
877 // model still sees one ToolResult per tool_call (parity
878 // preserved via add_tool_result), but the UI must not render
879 // ghost inflight rows for the duplicates — which it would if
880 // ToolCallStarted fired before the is_dup gate.
881 //
882 // Symptom users saw before this gate moved up: a wall of
883 // identical `Bash(...)` rows for each batch where the model
884 // emitted N copies of the same call (e.g. dead_code grep
885 // session with N variants pasted in by mistake).
886 if is_dup[i] {
887 let result = ToolResult {
888 call_id: call.id.clone(),
889 output: "[Duplicate call — same tool and arguments as an earlier call in this batch. \
890 Result already returned above.]".to_string(),
891 success: true,
892 };
893 conversation.add_tool_result(result);
894 continue;
895 }
896
897 // ── Cross-batch loop guard ──
898 // The in-batch `is_dup` above only catches a model emitting
899 // the same call N times *within one assistant message*. The
900 // 22-identical-`Bash(cargo check)` symptom from weak models
901 // is the orthogonal case: identical (name, args) repeating
902 // across many sequential turns with no progress between.
903 // See `loop_guard.rs` for the false-positive avoidance rules
904 // (output-hash + state-change reset) that make this safe to
905 // gate before execution. Same ghost-row reasoning as is_dup:
906 // blocked attempts must not emit ToolCallStarted, otherwise
907 // the UI renders a spinner row that never receives a result.
908 if let LoopGuardDecision::Block(msg) =
909 self.loop_guard.check(&call.name, &call.arguments)
910 {
911 let result = ToolResult {
912 call_id: call.id.clone(),
913 output: msg,
914 // success=false so the model treats this as a soft
915 // error and is more likely to change strategy.
916 success: false,
917 };
918 conversation.add_tool_result(result);
919 continue;
920 }
921
922 // Send ToolCallStarted event when the tool actually starts executing.
923 // This ensures tool call and result are paired correctly in the UI.
924 let _ = event_tx.send(TurnEvent::ToolCallStarted {
925 id: call.id.clone(),
926 name: call.name.clone(),
927 arguments: call.arguments.clone(),
928 });
929
930 // Enforce tool filter at execution time — LLM may call tools
931 // not in the provided tool_defs (e.g., during diagnosis read-only phase).
932 if let Some(filter) = allowed_tools {
933 if !filter.contains(&call.name.as_str()) {
934 let result = ToolResult {
935 call_id: call.id.clone(),
936 output: format!(
937 "Tool '{}' is not available in this phase. Read the code first, then edit.",
938 call.name
939 ),
940 success: false,
941 };
942 let _ = event_tx.send(TurnEvent::ToolCallResult {
943 call_id: call.id.clone(),
944 name: call.name.clone(),
945 output: result.output.clone(),
946 success: false,
947 duration: std::time::Duration::ZERO,
948 });
949 conversation.add_tool_result(result);
950 continue;
951 }
952 }
953 // Dup-in-batch was already short-circuited above (before the
954 // ToolCallStarted emit), so by the time we reach here this is
955 // a real, non-duplicate call to execute.
956 let result = self.execute_single_tool(call, event_tx, &cancel, &conversation.messages).await;
957 if active_batch_id.is_some() && result.success {
958 batch_ok_count += 1;
959 }
960
961 // Track files edited for read interception (batch + cross-turn)
962 // Use full file path as key to avoid basename collisions
963 // (e.g., api/__init__.py vs schemas/__init__.py).
964 if matches!(call.name.as_str(), "edit_file" | "create_file") && result.success {
965 if let Ok(args) = serde_json::from_str::<serde_json::Value>(&call.arguments) {
966 if let Some(fp) = args.get("file_path").and_then(|v| v.as_str()) {
967 let file_key = fp.to_string();
968 if !files_edited_this_batch.contains(&file_key) {
969 files_edited_this_batch.push(file_key.clone());
970 }
971 if !self.recently_edited_files.contains(&file_key) {
972 self.recently_edited_files.push(file_key);
973 }
974 }
975 }
976 }
977
978 // Record into the cross-batch loop guard. Must run on every
979 // real execution (success OR failure) so the next turn's
980 // check() sees the full history. The guard's own state-
981 // change reset rule lives inside record() — runner doesn't
982 // need to know the tool taxonomy.
983 self.loop_guard
984 .record(&call.name, &call.arguments, &result.output, result.success);
985
986 conversation.add_tool_result(result);
987 }
988
989 // ── ToolBatchCompleted: closes the group started above. UI
990 // uses this to swap the spinner header to a static `· N/M ok ·
991 // Xs wall` summary. Only fires when a batch was actually opened.
992 if let Some((batch_id, started_at, total)) = active_batch_id {
993 let _ = event_tx.send(TurnEvent::ToolBatchCompleted {
994 batch_id,
995 ok: batch_ok_count,
996 total,
997 elapsed_ms: started_at.elapsed().as_millis() as u64,
998 });
999 }
1000
1001 // Truncate oversized tool outputs before returning. Without this,
1002 // a single `ls -la node_modules` / wide `find` dump (multi-MB)
1003 // stays raw in `conversation.messages` and the NEXT LLM call
1004 // blows the upstream context limit. Every caller of TurnRunner
1005 // used to have to remember to invoke this — daemon didn't, which
1006 // was the root of the 738K-token 400 bug. Making runner own it
1007 // removes the implicit contract.
1008 crate::ctx::truncate::post_process_tool_results(
1009 &mut conversation.messages,
1010 tool_count,
1011 "", // fallback only — each result is keyed by its own
1012 // call_id → ATC.tool_name lookup (see ctx::truncate).
1013 context_window,
1014 );
1015
1016 tel_return!(
1017 TurnResult::UsedTools {
1018 // Same filtered-vs-raw split as the Responded arm above.
1019 // text_buf keeps raw for the rescue path; visible_text_buf
1020 // is what should reach downstream consumers.
1021 text: if visible_text_buf.is_empty() {
1022 None
1023 } else {
1024 Some(visible_text_buf)
1025 },
1026 tool_count,
1027 tokens: total_tokens,
1028 },
1029 tool_count
1030 );
1031 }
1032
1033 /// EXECUTE mode: run one LLM turn with minimal context.
1034 /// Reads the target file fresh from disk, sends only the file + instruction,
1035 /// and only exposes edit_file. Used for precise, focused edits.
1036 ///
1037 /// Returns the TurnResult and whether any file was edited.
1038 pub async fn run_execute(
1039 &mut self,
1040 file_path: &str,
1041 instruction: &str,
1042 event_tx: &mpsc::UnboundedSender<TurnEvent>,
1043 cancel: CancellationToken,
1044 ) -> TurnResult {
1045 // 1. Read fresh file content from disk
1046 let file_content = match std::fs::read_to_string(file_path) {
1047 Ok(c) => c,
1048 Err(e) => return TurnResult::Failed(format!("Cannot read {}: {}", file_path, e)),
1049 };
1050
1051 // 2. Build minimal conversation: system + user(file + instruction)
1052 let system_prompt = "You are an execution agent. Your ONLY job: apply the edit instruction to the file below.\n\
1053 RULES:\n\
1054 1. Call edit_file IMMEDIATELY with old_string/new_string. Do NOT explain.\n\
1055 2. Do NOT read_file — the file content is already provided.\n\
1056 3. Do NOT fix other issues — ONLY apply the given instruction.\n\
1057 4. If the instruction is unclear, apply your best interpretation.";
1058
1059 let user_message = format!(
1060 "## Instruction\n{}\n\n## File: {}\n```\n{}\n```",
1061 instruction, file_path, file_content,
1062 );
1063
1064 let mut mini_conv = Conversation::new();
1065 mini_conv.add_user_message(&user_message);
1066
1067 // 3. Only expose edit_file
1068 let execute_tools = &["edit_file"];
1069
1070 // 4. Run the LLM turn with filtered tools
1071 let result = self
1072 .run_with_filter(
1073 &mut mini_conv,
1074 system_prompt,
1075 "",
1076 event_tx,
1077 cancel,
1078 Some(execute_tools),
1079 )
1080 .await;
1081
1082 result
1083 }
1084
1085 /// Execute a single tool call with permission checking.
1086 ///
1087 /// `cancel` is polled while the tool future runs so Ctrl+C interrupts
1088 /// mid-execution — without this, long-running tools (deep `glob`, slow
1089 /// `grep`, network calls) complete before the turn-level cancel check
1090 /// runs on the next iteration, and the user sees an unresponsive UI.
1091 async fn execute_single_tool(
1092 &mut self,
1093 call: &ToolCall,
1094 event_tx: &mpsc::UnboundedSender<TurnEvent>,
1095 cancel: &CancellationToken,
1096 conversation_messages: &[crate::conversation::message::Message],
1097 ) -> ToolResult {
1098 // Auto-fix common tool name aliases (models trained on other agents use different names)
1099 // Case-insensitive matching: models may output "Run", "Bash", "Edit_File", etc.
1100 let name_lower = call.name.to_lowercase();
1101 let corrected_name = match name_lower.as_str() {
1102 "create_file" => "write_file",
1103 "find" | "find_files" => "glob",
1104 "run" | "run_command" | "run_server" | "run_shell" | "run_app" | "execute"
1105 | "shell" | "terminal" => "bash",
1106 "list_files" | "ls" => "list_directory",
1107 "search" => "grep",
1108 _ => "",
1109 };
1110 let corrected_name = if corrected_name.is_empty() {
1111 // No alias match — try case-insensitive lookup in registry
1112 if self.tools.get(&call.name).await.is_some() {
1113 call.name.clone()
1114 } else if let Some(name) = self.tools.iter().await
1115 .find(|(k, _)| k.eq_ignore_ascii_case(&call.name))
1116 .map(|(k, _)| k)
1117 {
1118 name
1119 } else {
1120 call.name.clone()
1121 }
1122 } else {
1123 corrected_name.to_string()
1124 };
1125 // Clone the Arc so the borrow of `self.tools` ends here — we need to
1126 // call `self.detect_call_loop(..)` mutably below.
1127 let tool = match self.tools.get(&corrected_name).await {
1128 Some(t) => t,
1129 None => {
1130 let available: String = self.tools.iter().await
1131 .map(|(name, _)| name)
1132 .collect::<Vec<String>>()
1133 .join(", ");
1134 let hint = match call.name.as_str() {
1135 "create_file" => "\nDid you mean write_file? create_file was renamed to write_file.",
1136 "search" => "\nFor file content search: grep(pattern, path)\nFor web search: web_search(query)",
1137 _ => "",
1138 };
1139 let output = format!(
1140 "Error: unknown tool '{}'. Available tools: {}.{}",
1141 call.name, available, hint
1142 );
1143 let _ = event_tx.send(TurnEvent::ToolCallResult {
1144 call_id: call.id.clone(),
1145 name: call.name.clone(),
1146 output: output.clone(),
1147 success: false,
1148 duration: std::time::Duration::ZERO,
1149 });
1150 self.context.telemetry.track(TelemetryEvent::ToolCall {
1151 name: corrected_name.clone(),
1152 success: false,
1153 duration_ms: 0,
1154 error_kind: Some(ToolErrorKind::NotFound),
1155 error_data: Some(serde_json::json!({
1156 "tool_name": call.name,
1157 "duration_ms": 0,
1158 "original_name": if call.name != corrected_name { Some(call.name.as_str()) } else { None },
1159 "available_tools": available,
1160 "reason": format!("Tool '{}' not found", call.name),
1161 }).to_string()),
1162 });
1163 return ToolResult {
1164 call_id: call.id.clone(),
1165 output,
1166 success: false,
1167 };
1168 }
1169 };
1170
1171 // Repair malformed JSON args before approval and execution.
1172 // Providers sometimes emit truncated / unescaped / fenced JSON (especially
1173 // on max_tokens cutoff mid-arguments). Running the repair chain here means
1174 // tool implementations see valid JSON whenever we can salvage anything,
1175 // and surface deterministic errors when we can't.
1176 let repaired_args = super::json_repair::repair_tool_args(&corrected_name, &call.arguments);
1177
1178 // Use corrected name and repaired args for all subsequent checks
1179 let owned_call;
1180 let call = if corrected_name != call.name.as_str() || repaired_args != call.arguments {
1181 owned_call = ToolCall {
1182 id: call.id.clone(),
1183 name: corrected_name.to_string(),
1184 arguments: repaired_args,
1185 };
1186 &owned_call
1187 } else {
1188 call
1189 };
1190
1191 // Schema gate: bounce malformed args back to the model BEFORE
1192 // approval / execute. Provider stream truncation occasionally
1193 // ships `{]` or `{"file_path":"..."]` (closing bracket wrong,
1194 // required field missing); without this guard, write_file's
1195 // fail-closed approval branch would prompt the user, the user
1196 // would Allow, and execute would then fail with the same parse
1197 // error — a wasted approval round-trip on a known-broken call.
1198 // Runs AFTER `repair_tool_args` (so wrapper-shape / fence / nested
1199 // payloads recover first) but BEFORE approval — the unrecoverable
1200 // remainder is what gets bounced.
1201 if let Err(reason) = tool.validate_args(&call.arguments) {
1202 let msg = format!(
1203 "Error: {}. Re-issue {} with a complete JSON object containing all required fields.",
1204 reason, call.name
1205 );
1206 let _ = event_tx.send(TurnEvent::ToolCallResult {
1207 call_id: call.id.clone(),
1208 name: call.name.clone(),
1209 output: msg.clone(),
1210 success: false,
1211 duration: std::time::Duration::ZERO,
1212 });
1213 self.context.telemetry.track(TelemetryEvent::ToolCall {
1214 name: corrected_name.clone(),
1215 success: false,
1216 duration_ms: 0,
1217 error_kind: Some(ToolErrorKind::InvalidArgs),
1218 error_data: Some(serde_json::json!({
1219 "tool_name": corrected_name,
1220 "reason": reason,
1221 "args_summary": build_args_summary(&corrected_name, &call.arguments),
1222 }).to_string()),
1223 });
1224 return ToolResult {
1225 call_id: call.id.clone(),
1226 output: msg,
1227 success: false,
1228 };
1229 }
1230
1231 // Loop detection moved upstream to `dispatch_tools` (gates BEFORE
1232 // ToolCallStarted is emitted, so blocked attempts don't render
1233 // ghost inflight rows in scrollback). When we reach here the
1234 // call has already cleared that guard exactly once.
1235
1236 // Check permission via the injected PermissionDecider.
1237 // AutoApprove tools execute immediately; RequireApproval tools go through
1238 // the decider which handles interactive prompts or automatic policy.
1239 let approval = tool.approval_with_context(&call.arguments, &self.context);
1240 if let crate::tool::ApprovalRequirement::RequireApproval(ref reason)
1241 | crate::tool::ApprovalRequirement::RequireApprovalAlways(ref reason) = approval
1242 {
1243 // Only emit the ApprovalRequested event (which triggers the
1244 // TUI approval prompt) when the decider actually needs user
1245 // input. If the PermissionStore already has a session grant
1246 // or override (e.g. the user pressed [A] on a prior call of
1247 // the same tool in this batch), `will_auto_approve` returns
1248 // true and we skip the event — the subsequent `decide()` call
1249 // will return Allow without blocking. Without this guard,
1250 // parallel MCP calls show N redundant "Waiting for approval"
1251 // prompts even though all but the first are auto-resolved.
1252 let needs_prompt = !self.permission.will_auto_approve(call, &approval);
1253 if needs_prompt {
1254 // Emit an informational event carrying a snapshot of
1255 // conversation.messages so the TUI can persist mid-turn
1256 // session state (e.g. for `/bg`).
1257 let _ = event_tx.send(TurnEvent::ApprovalRequested {
1258 tool_name: call.name.clone(),
1259 reason: reason.clone(),
1260 call: call.clone(),
1261 messages: conversation_messages.to_vec(),
1262 });
1263 }
1264
1265 let decision = self.permission.decide(call, &approval).await;
1266 if !matches!(decision, PermissionDecision::Allow) {
1267 let output = format!("Tool '{}' was denied by the user.", call.name);
1268 let _ = event_tx.send(TurnEvent::ToolCallResult {
1269 call_id: call.id.clone(),
1270 name: call.name.clone(),
1271 output: output.clone(),
1272 success: false,
1273 duration: std::time::Duration::ZERO,
1274 });
1275 self.context.telemetry.track(TelemetryEvent::ToolCall {
1276 name: corrected_name.clone(),
1277 success: false,
1278 duration_ms: 0,
1279 error_kind: Some(ToolErrorKind::DeniedByUser),
1280 error_data: Some(serde_json::json!({
1281 "tool_name": corrected_name,
1282 "duration_ms": 0,
1283 "args_summary": build_args_summary(&corrected_name, &call.arguments),
1284 "approval_reason": reason,
1285 "reason": "User denied tool execution",
1286 }).to_string()),
1287 });
1288 return ToolResult {
1289 call_id: call.id.clone(),
1290 output,
1291 success: false,
1292 };
1293 }
1294 }
1295
1296 // --- PreToolUse Hook ---
1297 if self.hook_executor.has_hooks() {
1298 let hook_ctx = self.build_hook_context(
1299 "pre_tool_use",
1300 Some(&call.name),
1301 Some(&call.arguments),
1302 None,
1303 None,
1304 );
1305 let pre_result = self.hook_executor.run_pre_tool_use(&call.name, &hook_ctx).await;
1306 match pre_result {
1307 crate::hook::PreHookResult::Block { reason } => {
1308 let output = format!("Blocked by hook: {}", reason);
1309 let _ = event_tx.send(TurnEvent::ToolCallResult {
1310 call_id: call.id.clone(),
1311 name: call.name.clone(),
1312 output: output.clone(),
1313 success: false,
1314 duration: std::time::Duration::ZERO,
1315 });
1316 self.context.telemetry.track(TelemetryEvent::ToolCall {
1317 name: corrected_name.clone(),
1318 success: false,
1319 duration_ms: 0,
1320 error_kind: Some(ToolErrorKind::BlockedByHook),
1321 error_data: Some(serde_json::json!({
1322 "tool_name": corrected_name,
1323 "duration_ms": 0,
1324 "args_summary": build_args_summary(&corrected_name, &call.arguments),
1325 "hook_reason": reason,
1326 "reason": "Tool call blocked by PreToolUse hook",
1327 }).to_string()),
1328 });
1329 return ToolResult {
1330 call_id: call.id.clone(),
1331 output,
1332 success: false,
1333 };
1334 }
1335 crate::hook::PreHookResult::Modify { .. } => {
1336 // Modify support deferred — treat as Allow
1337 }
1338 crate::hook::PreHookResult::Allow => {}
1339 }
1340 }
1341
1342 // Snapshot the shared working directory before executing. Tools like
1343 // `change_dir` and `bash` (when the command starts with `cd`) mutate
1344 // `ctx.working_dir` in place; we compare before/after to emit a
1345 // `WorkingDirChanged` event so the TUI footer can track the cwd
1346 // without polling the `Arc<RwLock<PathBuf>>` every frame.
1347 let wd_before = self.context.working_dir.read().await.clone();
1348
1349 // Set up event sender for real-time tool output streaming
1350 self.context.event_tx = Some(std::sync::Arc::new(event_tx.clone()));
1351 self.context.current_call_id = Some(call.id.clone());
1352
1353 // Execute the tool. Race against `cancel` so Ctrl+C aborts a
1354 // long-running tool future instead of waiting for it to finish.
1355 // Dropping the tool future is safe for read-only tools (glob /
1356 // grep / read_file); mutating tools (write_file / edit_file /
1357 // bash) finish fast enough that interrupting them mid-execution
1358 // is acceptable — user pressed Ctrl+C knowing they want to stop.
1359 let start = Instant::now();
1360 let result = tokio::select! {
1361 r = tool.execute(&call.arguments, &self.context) => r,
1362 _ = cancel.cancelled() => {
1363 // Clean up event sender
1364 self.context.event_tx = None;
1365 self.context.current_call_id = None;
1366
1367 let duration = start.elapsed();
1368 let output = "[Cancelled by user]".to_string();
1369 let _ = event_tx.send(TurnEvent::ToolCallResult {
1370 call_id: call.id.clone(),
1371 name: call.name.clone(),
1372 output: output.clone(),
1373 success: false,
1374 duration,
1375 });
1376 self.context.telemetry.track(TelemetryEvent::ToolCall {
1377 name: corrected_name.clone(),
1378 success: false,
1379 duration_ms: duration.as_millis() as u32,
1380 error_kind: Some(ToolErrorKind::ExecutionFailed),
1381 error_data: Some(serde_json::json!({
1382 "tool_name": corrected_name,
1383 "duration_ms": duration.as_millis() as u32,
1384 "args_summary": build_args_summary(&corrected_name, &call.arguments),
1385 "output_tail": "[Cancelled by user]",
1386 "reason": "Tool execution cancelled by user",
1387 }).to_string()),
1388 });
1389 return ToolResult {
1390 call_id: call.id.clone(),
1391 output,
1392 success: false,
1393 };
1394 }
1395 };
1396
1397 // Clean up event sender after tool execution
1398 self.context.event_tx = None;
1399 self.context.current_call_id = None;
1400
1401 let duration = start.elapsed();
1402
1403 // If the tool mutated the shared working directory, surface it as
1404 // a TurnEvent so the TUI layer can keep its footer in sync. Emit
1405 // before ToolCallResult so consumers that redraw on result see
1406 // the new cwd in the same frame.
1407 let wd_after = self.context.working_dir.read().await.clone();
1408 if wd_after != wd_before {
1409 let _ = event_tx.send(TurnEvent::WorkingDirChanged(wd_after));
1410 }
1411
1412 let tool_result = match result {
1413 Ok(mut r) => {
1414 r.call_id = call.id.clone();
1415 r
1416 }
1417 Err(e) => ToolResult {
1418 call_id: call.id.clone(),
1419 output: format!("Error: {}", e),
1420 success: false,
1421 },
1422 };
1423
1424 // --- PostToolUse Hook ---
1425 if self.hook_executor.has_hooks() {
1426 let hook_ctx = self.build_hook_context(
1427 "post_tool_use",
1428 Some(&call.name),
1429 Some(&call.arguments),
1430 Some(&tool_result.output),
1431 Some(tool_result.success),
1432 );
1433 self.hook_executor.run_post_tool_use(&call.name, &hook_ctx).await;
1434 }
1435
1436 let _ = event_tx.send(TurnEvent::ToolCallResult {
1437 call_id: call.id.clone(),
1438 name: call.name.clone(),
1439 output: tool_result.output.clone(),
1440 success: tool_result.success,
1441 duration,
1442 });
1443
1444 // Emit ToolCall telemetry event for both success and failure.
1445 let output_tail = atomcode_telemetry::scrub::truncate_head(
1446 &atomcode_telemetry::scrub::scrub_path(
1447 &tool_result.output,
1448 None,
1449 Some(&self.context.working_dir.read().await.clone()),
1450 ),
1451 200,
1452 );
1453 // Detect warning: exit 0 (success) but stderr present.
1454 let has_stderr = tool_result.output.contains("STDERR:")
1455 || tool_result.output.contains("[stderr]");
1456 let (error_kind, error_data) = if !tool_result.success {
1457 (Some(ToolErrorKind::ExecutionFailed), Some(serde_json::json!({
1458 "tool_name": corrected_name,
1459 "duration_ms": duration.as_millis() as u32,
1460 "args_summary": build_args_summary(&corrected_name, &call.arguments),
1461 "output_tail": output_tail,
1462 "reason": "Tool execution returned an error",
1463 }).to_string()))
1464 } else if has_stderr {
1465 (Some(ToolErrorKind::Warning), Some(serde_json::json!({
1466 "tool_name": corrected_name,
1467 "duration_ms": duration.as_millis() as u32,
1468 "args_summary": build_args_summary(&corrected_name, &call.arguments),
1469 "output_tail": output_tail,
1470 "reason": "Command succeeded (exit 0) but produced stderr output",
1471 "resolution": "Review stderr for potential issues; the command may not have had the intended effect",
1472 }).to_string()))
1473 } else {
1474 (None, None)
1475 };
1476 self.context.telemetry.track(TelemetryEvent::ToolCall {
1477 name: corrected_name.clone(),
1478 success: tool_result.success,
1479 duration_ms: duration.as_millis() as u32,
1480 error_kind,
1481 error_data,
1482 });
1483
1484 tool_result
1485 }
1486
1487 fn build_hook_context(
1488 &self,
1489 event: &str,
1490 tool_name: Option<&str>,
1491 tool_args: Option<&str>,
1492 tool_result: Option<&str>,
1493 tool_success: Option<bool>,
1494 ) -> crate::hook::HookContext {
1495 let wd = self
1496 .context
1497 .working_dir
1498 .try_read()
1499 .map(|g| g.display().to_string())
1500 .unwrap_or_default();
1501 crate::hook::HookContext {
1502 event: event.into(),
1503 tool_name: tool_name.map(String::from),
1504 tool_args: tool_args.and_then(|a| serde_json::from_str(a).ok()),
1505 tool_result: tool_result.map(String::from),
1506 tool_success,
1507 session_id: String::new(),
1508 working_dir: wd,
1509 }
1510 }
1511}
1512
1513/// Canonicalise a tool-call `arguments` string for in-batch dedup keying.
1514///
1515/// Weak/streaming models routinely re-emit the same call with cosmetically
1516/// different formatting — `{"pattern":"foo"}` vs `{"pattern": "foo"}` vs
1517/// `{"a":1,"b":2}` vs `{"b":2,"a":1}`. Byte-comparison treats them as
1518/// distinct, the in-batch `is_dup` misses, and N ghost ToolCallInFlight
1519/// rows leak into the UI (the symptom from the deepseek-v4-flash
1520/// screenshot: 2 empty `Glob(**/*.rs)` rows + 1 with body).
1521///
1522/// We re-parse and serialise compact. `serde_json::Map` is BTreeMap-backed
1523/// when the `preserve_order` feature is off (it is — see workspace
1524/// Cargo.toml), so object keys come out alphabetically — two re-orderings
1525/// of the same object hash to the same canonical string. Non-JSON args
1526/// (free-form text, garbage from broken streams) round-trip through the
1527/// fallback unchanged so we don't regress free-form tools or accidentally
1528/// merge two genuinely different malformed payloads.
1529/// True iff `reasoning` contains nothing besides one or more copies
1530/// of the outbound placeholder (`REASONING_PLACEHOLDER`) interleaved
1531/// with whitespace — including the all-empty / all-whitespace case.
1532///
1533/// The Done-event skip-promotion guard uses this to detect not just
1534/// the trivial single-copy echo but also the multi-copy mimicry seen
1535/// on DeepSeek V4 thinking-mode (a long session has many historical
1536/// copies of the placeholder in context, and the model regenerates the
1537/// pattern in its own response — observed 3+ copies concatenated in a
1538/// single reasoning_content stream).
1539fn is_only_placeholder_filler(reasoning: &str) -> bool {
1540 reasoning
1541 .replace(crate::provider::REASONING_PLACEHOLDER, "")
1542 .trim()
1543 .is_empty()
1544}
1545
1546fn normalize_tool_args(args: &str) -> String {
1547 match serde_json::from_str::<serde_json::Value>(args) {
1548 Ok(v) => serde_json::to_string(&v).unwrap_or_else(|_| args.to_string()),
1549 Err(_) => args.to_string(),
1550 }
1551}
1552
/// Extract a short file-path hint from (possibly partial) streamed JSON tool
/// arguments, e.g. `{"file_path":"/src/main.rs"` → `Some("src/main.rs")`.
///
/// How it searches:
/// - The `"file_path"` key is tried first, then `"path"`.
/// - JSON whitespace around the `:` is tolerated (`"file_path": "…"`).
/// - The first occurrence of a key that is followed by `:` and a string
///   value is used.
/// - The value may still be unterminated (stream cut mid-string); in that
///   case everything received so far is used.
///
/// Returns the last two `/`-separated components of the value, or `None`
/// when neither key has a non-empty string value yet. Callers typically stop
/// calling after the first hit.
fn extract_path_hint(partial_json: &str) -> Option<String> {
    for key in ["file_path", "path"] {
        let quoted_key = format!("\"{}\"", key);
        // Skip occurrences of the key text that are not used as an object
        // key with a string value (e.g. the same word appearing as a value).
        let value = partial_json
            .match_indices(quoted_key.as_str())
            .find_map(|(pos, _)| {
                let after_key = partial_json[pos + quoted_key.len()..].trim_start();
                let after_colon = after_key.strip_prefix(':')?.trim_start();
                after_colon.strip_prefix('"')
            });
        if let Some(rest) = value {
            // Find the closing quote, or take what has streamed in so far.
            let end = rest.find('"').unwrap_or(rest.len());
            let full_path = &rest[..end];
            if !full_path.is_empty() {
                let mut tail: Vec<&str> = full_path.rsplit('/').take(2).collect();
                tail.reverse();
                return Some(tail.join("/"));
            }
        }
    }
    None
}
1577
/// Heuristic check for a tool_call `function.name` that the provider mangled.
///
/// Seen with atomgit's gateway: attribute syntax spills into `name` while
/// `arguments` is left as `"{}"`, e.g. `name='grep" path="..." pattern="..."'`.
/// Real tool names are short ASCII identifiers (`bash`, `read_file`,
/// `mcp__server__tool`). A name is therefore treated as corrupt when any of
/// these hold:
/// - it is empty;
/// - it is longer than 96 bytes;
/// - it contains whitespace, `"`, `=`, `<` or `>`.
fn name_looks_corrupt(name: &str) -> bool {
    const MAX_NAME_BYTES: usize = 96;
    let is_markup = |c: char| c.is_whitespace() || "\"=<>".contains(c);
    name.is_empty() || name.len() > MAX_NAME_BYTES || name.chars().any(is_markup)
}
1593
/// Shorten `s` to at most `max` characters (Unicode scalar values, not
/// bytes). When something was cut, an ellipsis `…` is appended.
fn truncate(s: &str, max: usize) -> String {
    // The byte offset of character number `max` exists only when `s` has
    // more than `max` characters; that offset is where to cut.
    match s.char_indices().nth(max) {
        None => s.to_string(),
        Some((cut, _)) => format!("{}…", &s[..cut]),
    }
}
1603
/// Strip model-internal reasoning tags from model output.
///
/// DeepSeek uses `<think>...</think>`, QwQ uses similar patterns.
/// These should not be shown to the user or stored in conversation.
///
/// Rules:
/// - Every `<think>…</think>` span is removed, content included. The close
///   tag is searched only *after* its open tag, so a stray `</think>`
///   earlier in the text cannot pair with a later `<think>`.
/// - An unterminated `<think>` drops everything from that tag onward.
/// - Stray `</think>` tags and ChatML markers (`<|im_start|>`, `<|im_end|>`)
///   are removed, but the text around them is kept.
fn strip_model_tags(text: &str) -> String {
    const OPEN: &str = "<think>";
    const CLOSE: &str = "</think>";

    let mut kept = String::with_capacity(text.len());
    let mut rest = text;
    while let Some(start) = rest.find(OPEN) {
        kept.push_str(&rest[..start]);
        let after_open = &rest[start + OPEN.len()..];
        match after_open.find(CLOSE) {
            Some(end) => rest = &after_open[end + CLOSE.len()..],
            None => {
                // Unterminated think block: discard the remainder.
                rest = "";
                break;
            }
        }
    }
    kept.push_str(rest);

    kept.replace(CLOSE, "")
        .replace("<|im_start|>", "")
        .replace("<|im_end|>", "")
}
1621
1622/// Rescue tool calls embedded as text in the model's response. Three variants:
1623/// 1. `<tool_call>name(json)</tool_call>` — paren+JSON
1624/// 2. `<tool_call>name(k=v, k=v)</tool_call>` — paren+kv (legacy single-line)
1625/// 3. `<tool_call><tool_name>name</tool_name><arg_key>k</arg_key><arg_value>v</arg_value>...</tool_call>`
1626/// — Qwen/GLM XML format (multi-line, args may span newlines)
1627/// Returns rescued ToolCalls, empty vec if nothing found.
1628fn rescue_text_tool_calls(text: &str) -> Vec<ToolCall> {
1629 let mut calls = Vec::new();
1630 let mut remaining = text;
1631
1632 while let Some(start) = remaining.find("<tool_call>") {
1633 let after_tag = &remaining[start + "<tool_call>".len()..];
1634
1635 // Prefer </tool_call> close (XML format spans newlines).
1636 // Fall back to first newline only when no close tag is present
1637 // (legacy single-line format).
1638 let (body, advance) = match after_tag.find("</tool_call>") {
1639 Some(pos) => (&after_tag[..pos], pos + "</tool_call>".len()),
1640 None => {
1641 let pos = after_tag.find('\n').unwrap_or(after_tag.len());
1642 (&after_tag[..pos], pos)
1643 }
1644 };
1645 let body = body.trim();
1646
1647 if let Some((name, args_json)) = parse_xml_tool_call(body) {
1648 let call_id = format!("rescued_{}", calls.len());
1649 calls.push(ToolCall {
1650 id: call_id,
1651 name,
1652 arguments: args_json,
1653 });
1654 } else if let Some(paren) = body.find('(') {
1655 let name = body[..paren].trim();
1656 let args_raw = body[paren + 1..].trim_end_matches(')').trim();
1657
1658 if !name.is_empty() {
1659 let args_json = if args_raw.starts_with('{') {
1660 args_raw.to_string()
1661 } else {
1662 let mut json_parts = Vec::new();
1663 for part in args_raw.split(',') {
1664 let part = part.trim();
1665 if let Some(eq) = part.find('=') {
1666 let k = part[..eq].trim();
1667 let v = part[eq + 1..].trim();
1668 let v_quoted = if v.starts_with('"')
1669 || v.starts_with('{')
1670 || v.starts_with('[')
1671 || v == "true"
1672 || v == "false"
1673 || v.parse::<f64>().is_ok()
1674 {
1675 v.to_string()
1676 } else {
1677 format!("\"{}\"", v.replace('\\', "\\\\").replace('"', "\\\""))
1678 };
1679 json_parts.push(format!("\"{}\":{}", k, v_quoted));
1680 }
1681 }
1682 format!("{{{}}}", json_parts.join(","))
1683 };
1684
1685 let call_id = format!("rescued_{}", calls.len());
1686 calls.push(ToolCall {
1687 id: call_id,
1688 name: name.to_string(),
1689 arguments: args_json,
1690 });
1691 }
1692 }
1693
1694 remaining = &after_tag[advance..];
1695 }
1696
1697 calls
1698}
1699
1700/// Parse Qwen/GLM XML-style tool call body:
1701/// `<tool_name>NAME</tool_name><arg_key>K1</arg_key><arg_value>V1</arg_value>...`
1702/// Returns `(name, args_as_json_object)` or None when the format doesn't match.
1703fn parse_xml_tool_call(body: &str) -> Option<(String, String)> {
1704 let name = extract_between(body, "<tool_name>", "</tool_name>")?
1705 .trim()
1706 .to_string();
1707 if name.is_empty() {
1708 return None;
1709 }
1710
1711 let mut map = serde_json::Map::new();
1712 let mut rest = body;
1713 while let Some(k_start) = rest.find("<arg_key>") {
1714 let k_after = &rest[k_start + "<arg_key>".len()..];
1715 let k_end = k_after.find("</arg_key>")?;
1716 let key = k_after[..k_end].trim().to_string();
1717 if key.is_empty() {
1718 return None;
1719 }
1720 let after_key = &k_after[k_end + "</arg_key>".len()..];
1721 let v_start = after_key.find("<arg_value>")?;
1722 let v_after = &after_key[v_start + "<arg_value>".len()..];
1723 let v_end = v_after.find("</arg_value>")?;
1724 let raw_value = &v_after[..v_end];
1725 map.insert(key, coerce_xml_value(raw_value));
1726 rest = &v_after[v_end + "</arg_value>".len()..];
1727 }
1728
1729 if map.is_empty() {
1730 return None;
1731 }
1732 Some((name, serde_json::Value::Object(map).to_string()))
1733}
1734
/// Return the text between the first `open` marker in `haystack` and the
/// first `close` marker that follows it, or `None` if either is missing.
fn extract_between<'a>(haystack: &'a str, open: &str, close: &str) -> Option<&'a str> {
    let (_, after_open) = haystack.split_once(open)?;
    after_open.split_once(close).map(|(inner, _)| inner)
}
1740
1741/// Best-effort type inference for `<arg_value>` payloads. Bool/int/float/JSON
1742/// literals get unquoted; everything else stays a string (preserves whitespace).
1743fn coerce_xml_value(raw: &str) -> serde_json::Value {
1744 let trimmed = raw.trim();
1745 if trimmed == "true" {
1746 return serde_json::Value::Bool(true);
1747 }
1748 if trimmed == "false" {
1749 return serde_json::Value::Bool(false);
1750 }
1751 if trimmed == "null" {
1752 return serde_json::Value::Null;
1753 }
1754 if let Ok(n) = trimmed.parse::<i64>() {
1755 return serde_json::Value::from(n);
1756 }
1757 if let Ok(f) = trimmed.parse::<f64>() {
1758 return serde_json::Value::from(f);
1759 }
1760 if (trimmed.starts_with('{') && trimmed.ends_with('}'))
1761 || (trimmed.starts_with('[') && trimmed.ends_with(']'))
1762 {
1763 if let Ok(v) = serde_json::from_str::<serde_json::Value>(trimmed) {
1764 return v;
1765 }
1766 }
1767 // Preserve raw (including leading/trailing whitespace) — the model may have
1768 // intended exact-match strings (e.g. old_string for edit_file).
1769 serde_json::Value::String(raw.to_string())
1770}
1771
1772/// Patch `tool_calls_buf` entries with missing keys borrowed from `xml_pool`
1773/// (parsed by `rescue_text_tool_calls` on the same turn's raw text). Used when
1774/// the model split intent across the function-calling JSON channel and the
1775/// `<tool_call>` XML in the text — the JSON path may arrive with only a subset
1776/// of the arguments, while the XML carries the full set. JSON wins on
1777/// conflicts; XML only fills gaps. Multiple calls of the same name are matched
1778/// to XML blocks of the same name in order of appearance.
1779fn repair_tool_call_args(calls: &mut [ToolCall], xml_pool: &[ToolCall]) {
1780 use std::collections::HashMap;
1781
1782 let mut by_name: HashMap<&str, Vec<&ToolCall>> = HashMap::new();
1783 for x in xml_pool {
1784 by_name.entry(x.name.as_str()).or_default().push(x);
1785 }
1786 let mut consumed: HashMap<&str, usize> = HashMap::new();
1787
1788 for call in calls.iter_mut() {
1789 let Some(group) = by_name.get(call.name.as_str()) else {
1790 continue;
1791 };
1792 let idx = consumed.entry(call.name.as_str()).or_insert(0);
1793 let Some(xml_call) = group.get(*idx) else {
1794 continue;
1795 };
1796 *idx += 1;
1797
1798 let xml_obj = match serde_json::from_str::<serde_json::Value>(&xml_call.arguments) {
1799 Ok(serde_json::Value::Object(o)) => o,
1800 _ => continue,
1801 };
1802
1803 let merged = match serde_json::from_str::<serde_json::Value>(&call.arguments) {
1804 Ok(serde_json::Value::Object(mut j_obj)) => {
1805 let mut patched = false;
1806 for (k, v) in xml_obj {
1807 if !j_obj.contains_key(&k) {
1808 j_obj.insert(k, v);
1809 patched = true;
1810 }
1811 }
1812 if patched {
1813 Some(serde_json::Value::Object(j_obj))
1814 } else {
1815 None
1816 }
1817 }
1818 // JSON args unparseable / non-object → take XML wholesale.
1819 _ => Some(serde_json::Value::Object(xml_obj)),
1820 };
1821 if let Some(v) = merged {
1822 call.arguments = v.to_string();
1823 }
1824 }
1825}
1826
1827/// Streaming filter that hides `<tool_call>...</tool_call>` blocks from the
1828/// visible UI/conversation stream while letting the rescue path see the full
1829/// raw text via a separate buffer. Tags can split across delta chunks, so the
1830/// filter holds back trailing bytes that might be a partial tag.
1831#[derive(Default)]
1832struct ToolCallStreamFilter {
1833 inside: bool,
1834 holdback: String,
1835}
1836
1837impl ToolCallStreamFilter {
1838 const OPEN: &'static str = "<tool_call>";
1839 const CLOSE: &'static str = "</tool_call>";
1840
1841 /// Feed a delta chunk; return what's safe to display now.
1842 fn feed(&mut self, chunk: &str) -> String {
1843 let mut work = std::mem::take(&mut self.holdback);
1844 work.push_str(chunk);
1845 let mut out = String::new();
1846
1847 loop {
1848 if self.inside {
1849 match work.find(Self::CLOSE) {
1850 Some(pos) => {
1851 work = work[pos + Self::CLOSE.len()..].to_string();
1852 self.inside = false;
1853 }
1854 None => {
1855 self.holdback = trail_holdback(&work, Self::CLOSE.len() - 1);
1856 return out;
1857 }
1858 }
1859 } else {
1860 match work.find(Self::OPEN) {
1861 Some(pos) => {
1862 out.push_str(&work[..pos]);
1863 work = work[pos + Self::OPEN.len()..].to_string();
1864 self.inside = true;
1865 }
1866 None => {
1867 let hold = trail_holdback(&work, Self::OPEN.len() - 1);
1868 let visible_len = work.len() - hold.len();
1869 out.push_str(&work[..visible_len]);
1870 self.holdback = hold;
1871 return out;
1872 }
1873 }
1874 }
1875 }
1876 }
1877
1878 /// End-of-stream flush. If we're still inside an unclosed `<tool_call>`,
1879 /// the holdback is dropped (prevents leak); otherwise emit any held tail.
1880 fn flush(&mut self) -> String {
1881 if self.inside {
1882 self.holdback.clear();
1883 String::new()
1884 } else {
1885 std::mem::take(&mut self.holdback)
1886 }
1887 }
1888}
1889
/// Return (owned) the last `max` bytes of `s`, or all of `s` if it is shorter.
///
/// If the cut point would land inside a multi-byte UTF-8 character, it moves
/// forward to the next character boundary. The result is therefore always a
/// valid `String`, possibly shorter than `max` bytes (even empty).
fn trail_holdback(s: &str, max: usize) -> String {
    let earliest = s.len().saturating_sub(max);
    let cut = (earliest..=s.len())
        .find(|&i| s.is_char_boundary(i))
        .unwrap_or(s.len());
    s[cut..].to_owned()
}
1902
1903/// Merge multiple edit_file calls on the same file into one multi-edit call.
1904/// The model often generates 2+ separate edit_file(file, old, new) for the same file;
1905/// we merge them into one edit_file(file, edits=[...]) before execution and before
1906/// the assistant tool-call message is written into conversation history.
1907/// Returns the ids of calls that were merged away.
1908fn merge_edit_calls(calls: &mut Vec<ToolCall>) -> Vec<String> {
1909 use std::collections::HashMap;
1910
1911 // Group edit_file calls by file_path. Preserve order of first occurrence.
1912 let mut file_groups: HashMap<String, Vec<usize>> = HashMap::new();
1913 let mut file_order: Vec<String> = Vec::new();
1914 for (i, call) in calls.iter().enumerate() {
1915 if call.name != "edit_file" {
1916 continue;
1917 }
1918 let fp = serde_json::from_str::<serde_json::Value>(&call.arguments)
1919 .ok()
1920 .and_then(|a| {
1921 a.get("file_path")
1922 .and_then(|v| v.as_str())
1923 .map(String::from)
1924 });
1925 if let Some(fp) = fp {
1926 let entry = file_groups.entry(fp.clone()).or_default();
1927 if entry.is_empty() {
1928 file_order.push(fp);
1929 }
1930 entry.push(i);
1931 }
1932 }
1933
1934 // Only merge groups with 2+ calls
1935 let merge_targets: Vec<(String, Vec<usize>)> = file_order
1936 .into_iter()
1937 .filter_map(|fp| {
1938 let indices = file_groups.remove(&fp)?;
1939 if indices.len() >= 2 {
1940 Some((fp, indices))
1941 } else {
1942 None
1943 }
1944 })
1945 .collect();
1946
1947 if merge_targets.is_empty() {
1948 return Vec::new();
1949 }
1950
1951 let mut remove_indices: Vec<usize> = Vec::new();
1952 let mut removed_ids: Vec<String> = Vec::new();
1953 for (file_path, indices) in &merge_targets {
1954 // Build edits array from individual calls
1955 let mut edits: Vec<serde_json::Value> = Vec::new();
1956 for &idx in indices {
1957 let args: serde_json::Value =
1958 serde_json::from_str(&calls[idx].arguments).unwrap_or_default();
1959 let mut edit = serde_json::Map::new();
1960 if let Some(v) = args.get("old_string") {
1961 edit.insert("old_string".into(), v.clone());
1962 }
1963 if let Some(v) = args.get("new_string") {
1964 edit.insert("new_string".into(), v.clone());
1965 }
1966 if let Some(v) = args.get("start_line") {
1967 edit.insert("start_line".into(), v.clone());
1968 }
1969 if let Some(v) = args.get("end_line") {
1970 edit.insert("end_line".into(), v.clone());
1971 }
1972 edits.push(serde_json::Value::Object(edit));
1973 }
1974
1975 // Replace first call with merged version, mark rest for removal
1976 let first_idx = indices[0];
1977 let merged_args = serde_json::json!({
1978 "file_path": file_path,
1979 "edits": edits,
1980 });
1981 calls[first_idx].arguments = merged_args.to_string();
1982 for &idx in &indices[1..] {
1983 removed_ids.push(calls[idx].id.clone());
1984 remove_indices.push(idx);
1985 }
1986 }
1987
1988 // Remove merged calls (reverse order to preserve indices)
1989 remove_indices.sort_unstable();
1990 remove_indices.dedup();
1991 for idx in remove_indices.into_iter().rev() {
1992 calls.remove(idx);
1993 }
1994
1995 removed_ids
1996}
1997
1998
// Tests pinning `is_only_placeholder_filler`: empty/whitespace-only text and
// repeated `REASONING_PLACEHOLDER` copies (with or without whitespace between
// them) count as filler; text containing anything else does not.
#[cfg(test)]
mod is_only_placeholder_filler_tests {
    use super::is_only_placeholder_filler;
    use crate::provider::REASONING_PLACEHOLDER;

    #[test]
    fn empty_and_whitespace_are_filler() {
        assert!(is_only_placeholder_filler(""));
        assert!(is_only_placeholder_filler(" "));
        assert!(is_only_placeholder_filler("\n\t \n"));
    }

    #[test]
    fn single_placeholder_is_filler() {
        // The original strict-equality guard already caught this; pin
        // it so a refactor doesn't regress.
        assert!(is_only_placeholder_filler(REASONING_PLACEHOLDER));
    }

    #[test]
    fn multiple_concatenated_placeholders_are_filler() {
        // The bug: DeepSeek V4 Flash 17-round session screenshot
        // showed the response's reasoning_content as 3 copies of the
        // placeholder concatenated with no separator. The old
        // `!= REASONING_PLACEHOLDER` check missed this and promoted
        // the meaningless string into the assistant text channel.
        let three = REASONING_PLACEHOLDER.repeat(3);
        assert!(is_only_placeholder_filler(&three));
        let five = REASONING_PLACEHOLDER.repeat(5);
        assert!(is_only_placeholder_filler(&five));
    }

    #[test]
    fn placeholders_with_whitespace_are_filler() {
        // Some gateways insert chunk delimiters (newlines, spaces)
        // between repeated placeholder echoes. Filler regardless.
        let mixed = format!("{}\n{} {}", REASONING_PLACEHOLDER, REASONING_PLACEHOLDER, REASONING_PLACEHOLDER);
        assert!(is_only_placeholder_filler(&mixed));
    }

    #[test]
    fn real_reasoning_is_not_filler() {
        // Ordinary prose with no placeholder at all must never be treated as filler.
        assert!(!is_only_placeholder_filler(
            "Let me think about this — first, the user wants..."
        ));
    }

    #[test]
    fn placeholder_plus_real_content_is_not_filler() {
        // If the model emits the placeholder AND some substantive
        // text, we still want promotion — the substantive text is
        // the real reasoning we'd want to keep.
        let mixed = format!("{} but actually I see now that...", REASONING_PLACEHOLDER);
        assert!(!is_only_placeholder_filler(&mixed));
    }
}
2055
// Tests pinning `normalize_tool_args`, the canonical form used to dedup
// repeated tool calls. Whitespace and key-order variants of the same JSON must
// normalize identically; semantically different or non-JSON payloads must not.
#[cfg(test)]
mod normalize_tool_args_tests {
    use super::normalize_tool_args;

    #[test]
    fn whitespace_variants_collapse() {
        // The deepseek-v4-flash screenshot symptom: same call, different
        // whitespace → must dedup.
        let a = r#"{"pattern":"**/*.rs"}"#;
        let b = r#"{"pattern": "**/*.rs"}"#;
        let c = r#"{ "pattern":"**/*.rs" }"#;
        let d = r#"{
 "pattern": "**/*.rs"
}"#;
        let na = normalize_tool_args(a);
        assert_eq!(normalize_tool_args(b), na);
        assert_eq!(normalize_tool_args(c), na);
        assert_eq!(normalize_tool_args(d), na);
    }

    #[test]
    fn key_order_collapses() {
        // serde_json::Map is BTreeMap-backed (no preserve_order feature),
        // so re-serialising sorts keys alphabetically.
        let a = r#"{"a":1,"b":2}"#;
        let b = r#"{"b":2,"a":1}"#;
        assert_eq!(normalize_tool_args(a), normalize_tool_args(b));
    }

    #[test]
    fn nested_objects_normalize_recursively() {
        // Key order inside nested objects must not matter either.
        let a = r#"{"outer":{"x":1,"y":2}}"#;
        let b = r#"{"outer":{"y":2,"x":1}}"#;
        assert_eq!(normalize_tool_args(a), normalize_tool_args(b));
    }

    #[test]
    fn semantically_different_args_stay_different() {
        // Don't over-collapse — different values must remain distinct so a
        // legitimate batch of `Glob(**/*.rs)` + `Glob(**/*.toml)` doesn't
        // dedup.
        let a = r#"{"pattern":"**/*.rs"}"#;
        let b = r#"{"pattern":"**/*.toml"}"#;
        assert_ne!(normalize_tool_args(a), normalize_tool_args(b));
    }

    #[test]
    fn non_json_args_pass_through_unchanged() {
        // Free-form / malformed payloads must not panic or merge.
        // (Two genuinely different garbage strings must stay distinct so
        // we don't accidentally dedup unrelated calls.)
        let raw = "not even json {{{";
        assert_eq!(normalize_tool_args(raw), raw);
        assert_ne!(normalize_tool_args("garbage A"), normalize_tool_args("garbage B"));
    }
}
2112
// Tests for the text-channel tool-call handling in this file:
// `rescue_text_tool_calls` (XML and legacy paren formats),
// `repair_tool_call_args` (filling gaps in function-calling JSON args from
// rescued XML calls) and `ToolCallStreamFilter` (hiding `<tool_call>` blocks
// from the visible stream).
#[cfg(test)]
mod tool_call_text_rescue_tests {
    use super::{repair_tool_call_args, rescue_text_tool_calls, ToolCall, ToolCallStreamFilter};

    #[test]
    fn rescues_qwen_xml_format() {
        // Qwen/GLM-5.1 sometimes emits args as <arg_key>/<arg_value> XML pairs
        // instead of a JSON blob. Without parsing this format the call gets
        // dispatched with empty args and edit_file fails with "old_string is
        // required" while the raw XML leaks into the user-visible stream.
        let text = r#"Let me make the edit:
<tool_call>
 <tool_name>edit_file</tool_name>
 <arg_key>file_path</arg_key><arg_value>src/main.rs</arg_value>
 <arg_key>old_string</arg_key><arg_value> attrs
 }
 }</arg_value>
 <arg_key>new_string</arg_key><arg_value> attrs.push(x);
 attrs
 }
 }</arg_value>
 <arg_key>replace_all</arg_key><arg_value>false</arg_value>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 1, "single XML block should rescue one call");
        assert_eq!(calls[0].name, "edit_file");
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["file_path"], "src/main.rs");
        assert_eq!(v["replace_all"], false);
        // Whitespace-sensitive — old_string must round-trip exactly so edit_file
        // can find the match in the file.
        assert_eq!(v["old_string"], " attrs\n }\n }");
    }

    #[test]
    fn xml_without_tool_name_is_skipped() {
        // No <tool_name> means we have no idea what to dispatch — better to
        // skip than guess. An XML block with only <arg_key> tags is treated as
        // a malformed legacy emit (no `(` either, so the paren branch also
        // skips), yielding zero calls.
        let text = r#"<tool_call>
 <arg_key>file_path</arg_key><arg_value>x.rs</arg_value>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert!(calls.is_empty());
    }

    #[test]
    fn legacy_paren_json_format_still_works() {
        // Don't regress the existing rescue path used by GLM-5 via OpenRouter.
        let text = r#"<tool_call>read_file({"file_path":"a.rs"})</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].name, "read_file");
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["file_path"], "a.rs");
    }

    #[test]
    fn legacy_paren_kv_format_still_works() {
        // `name(k=v, ...)` form: bare values are quoted as strings, numeric
        // values stay numbers.
        let text = r#"<tool_call>read_file(file_path=a.rs, offset=10)</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        assert_eq!(calls.len(), 1);
        assert_eq!(calls[0].name, "read_file");
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["file_path"], "a.rs");
        assert_eq!(v["offset"], 10);
    }

    #[test]
    fn xml_coerces_bool_int_float() {
        // <arg_value> payloads are typed: bool, integer and float literals
        // become JSON scalars; other text stays a string.
        let text = r#"<tool_call>
 <tool_name>cfg</tool_name>
 <arg_key>flag</arg_key><arg_value>true</arg_value>
 <arg_key>n</arg_key><arg_value>42</arg_value>
 <arg_key>f</arg_key><arg_value>3.14</arg_value>
 <arg_key>s</arg_key><arg_value>hello</arg_value>
</tool_call>"#;
        let calls = rescue_text_tool_calls(text);
        let v: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(v["flag"], true);
        assert_eq!(v["n"], 42);
        assert!((v["f"].as_f64().unwrap() - 3.14).abs() < 1e-9);
        assert_eq!(v["s"], "hello");
    }

    #[test]
    fn stream_filter_passes_plain_text() {
        let mut f = ToolCallStreamFilter::default();
        let out = f.feed("hello world");
        // Holdback may keep up to 10 bytes in case "<tool_call" is starting,
        // so flush to get the full output.
        let tail = f.flush();
        assert_eq!(format!("{}{}", out, tail), "hello world");
    }

    /// Regression for 5-7 atomgr datalog (build dd425fd, 20-14-23 Turn 5):
    /// GLM-5.1 emitted prose then mid-sentence switched to XML tool_call:
    /// `### 3. 传输层安全<tool_call>grep<arg_key>pattern</arg_key>...
    /// </tool_call>` (the heading reads "3. Transport-layer security").
    /// The stream_filter caught it for streamed deltas /
    /// conversation history, but `TurnResult::Responded.text` used raw
    /// `text_buf` → `datalog::log_text` printed the XML in `**Response:**`.
    ///
    /// Fix: parallel `visible_text_buf` mirrors what the filter actually
    /// emitted; `Responded.text` and `UsedTools.text` use it instead of
    /// raw text_buf. This test pins the visible-side behavior for the
    /// exact Turn 5 input shape.
    #[test]
    fn glm_xml_leak_mid_prose_strips_to_clean_visible_text() {
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();

        // Replay the actual Turn 5 chunking shape: prose, then XML
        // tool_call split across multiple deltas (provider chunks at
        // arbitrary boundaries — the filter must hold back across them).
        for chunk in [
            "### 3. 传输层安全",
            "<tool_call>grep",
            "<arg_key>pattern</arg_key>",
            "<arg_value>http://</arg_value>",
            "<arg_key>path</arg_key>",
            "<arg_value>/Users/y/project</arg_value>",
            "</tool_call>",
        ] {
            visible.push_str(&f.feed(chunk));
        }
        visible.push_str(&f.flush());

        assert!(
            !visible.contains("<tool_call>"),
            "visible accumulator must strip <tool_call> open tag: {:?}",
            visible
        );
        assert!(
            !visible.contains("</tool_call>"),
            "visible accumulator must strip </tool_call> close tag: {:?}",
            visible
        );
        assert!(
            !visible.contains("<arg_key>") && !visible.contains("<arg_value>"),
            "visible accumulator must strip XML inner tags: {:?}",
            visible
        );
        assert_eq!(
            visible, "### 3. 传输层安全",
            "only the pre-tool prose should reach Responded.text"
        );
    }

    #[test]
    fn stream_filter_strips_complete_block_in_one_chunk() {
        // A whole block inside one delta: surrounding text survives, the block
        // (tags and body) disappears.
        let mut f = ToolCallStreamFilter::default();
        let out = f.feed("before <tool_call>edit_file({})</tool_call> after");
        let tail = f.flush();
        let combined = format!("{}{}", out, tail);
        assert!(combined.contains("before "));
        assert!(combined.contains(" after"));
        assert!(!combined.contains("<tool_call>"));
        assert!(!combined.contains("</tool_call>"));
        assert!(!combined.contains("edit_file"));
    }

    #[test]
    fn stream_filter_strips_block_split_across_chunks() {
        // Realistic case: provider streams bytes that split the open tag
        // arbitrarily. The filter must hold back partial-tag bytes, not emit
        // them, and resume cleanly when the close arrives.
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();
        for chunk in [
            "before <tool_",
            "call><tool_name>edit_file</tool_name>",
            "<arg_key>k</arg_key><arg_value>v</arg_value>",
            "</tool_call> after",
        ] {
            visible.push_str(&f.feed(chunk));
        }
        visible.push_str(&f.flush());
        // "before " + " after": the spaces on both sides of the removed block
        // are ordinary text and both survive.
        assert_eq!(visible, "before  after");
    }

    #[test]
    fn stream_filter_drops_unclosed_block() {
        // If the stream ends mid-`<tool_call>` (truncation, error), discard
        // the holdback rather than leaking the open fragment to the user.
        let mut f = ToolCallStreamFilter::default();
        let out = f.feed("text <tool_call>edit_file({});");
        let tail = f.flush();
        let combined = format!("{}{}", out, tail);
        assert_eq!(combined, "text ");
    }

    #[test]
    fn stream_filter_handles_partial_open_at_chunk_end() {
        // The filter must not emit `<` or `<t` etc. as visible text just
        // because the chunk happened to end mid-tag.
        let mut f = ToolCallStreamFilter::default();
        let v1 = f.feed("hello <");
        // Could be holdback; not guaranteed any specific output yet.
        let v2 = f.feed("tool_call>x</tool_call>!");
        let tail = f.flush();
        let combined = format!("{}{}{}", v1, v2, tail);
        assert_eq!(combined, "hello !");
    }

    #[test]
    fn stream_filter_passes_through_lt_that_isnt_tool_call() {
        // A bare `<` followed by non-tool_call content should eventually flush.
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();
        visible.push_str(&f.feed("a < b "));
        visible.push_str(&f.feed("and c <"));
        visible.push_str(&f.feed("d>e"));
        visible.push_str(&f.flush());
        assert_eq!(visible, "a < b and c <d>e");
    }

    /// Shorthand constructor for a `ToolCall` fixture.
    fn tc(id: &str, name: &str, args: &str) -> ToolCall {
        ToolCall {
            id: id.into(),
            name: name.into(),
            arguments: args.into(),
        }
    }

    #[test]
    fn repair_fills_missing_old_string_from_xml() {
        // Reproduces the user-reported bug: the function-calling JSON channel
        // delivered `{file_path, new_string, replace_all}` (passes
        // validate_args because new_string is present) but missing
        // `old_string` — execute() then fails with "old_string is required".
        // The full args were carried as XML in the text stream. After repair,
        // the call has all four keys.
        let mut calls = vec![tc(
            "c1",
            "edit_file",
            r#"{"file_path":"x.rs","new_string":"new","replace_all":false}"#,
        )];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"old","new_string":"new","replace_all":false}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        let merged: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(merged["file_path"], "x.rs");
        assert_eq!(merged["old_string"], "old");
        assert_eq!(merged["new_string"], "new");
        assert_eq!(merged["replace_all"], false);
    }

    #[test]
    fn repair_does_not_overwrite_keys_present_in_json() {
        // Conflict policy: function-calling JSON is the source of truth; XML
        // only fills gaps. If the JSON channel and XML disagree on a key
        // (e.g. different new_string), keep JSON.
        let mut calls = vec![tc(
            "c1",
            "edit_file",
            r#"{"file_path":"x.rs","new_string":"json_wins"}"#,
        )];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","new_string":"xml_loses","old_string":"old"}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        let merged: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(merged["new_string"], "json_wins");
        assert_eq!(merged["old_string"], "old");
    }

    #[test]
    fn repair_skips_when_names_dont_match() {
        // Repair must never cross tool boundaries — patching read_file with
        // edit_file's args would dispatch a malformed call.
        let original = r#"{"file_path":"x.rs"}"#;
        let mut calls = vec![tc("c1", "read_file", original)];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a","new_string":"b"}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        assert_eq!(calls[0].arguments, original);
    }

    #[test]
    fn repair_takes_xml_wholesale_when_json_unparseable() {
        // Truncated/garbled args from the JSON channel would otherwise
        // fail to parse later anyway. Replace with the XML object.
        let mut calls = vec![tc("c1", "edit_file", r#"{"file_path": "trunc"#)];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a","new_string":"b"}"#,
        )];
        repair_tool_call_args(&mut calls, &xml_pool);
        let merged: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(merged["file_path"], "x.rs");
        assert_eq!(merged["old_string"], "a");
    }

    #[test]
    fn repair_matches_multiple_same_name_calls_in_order() {
        // Two edit_file calls in the same turn, two XML blocks — match by
        // order so the second JSON call gets the second XML's args, not the
        // first one's reused.
        let mut calls = vec![
            tc("c1", "edit_file", r#"{"file_path":"a.rs","new_string":"a_new"}"#),
            tc("c2", "edit_file", r#"{"file_path":"b.rs","new_string":"b_new"}"#),
        ];
        let xml_pool = vec![
            tc(
                "rescued_0",
                "edit_file",
                r#"{"file_path":"a.rs","old_string":"a_old","new_string":"a_new"}"#,
            ),
            tc(
                "rescued_1",
                "edit_file",
                r#"{"file_path":"b.rs","old_string":"b_old","new_string":"b_new"}"#,
            ),
        ];
        repair_tool_call_args(&mut calls, &xml_pool);
        let m1: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        let m2: serde_json::Value = serde_json::from_str(&calls[1].arguments).unwrap();
        assert_eq!(m1["old_string"], "a_old");
        assert_eq!(m2["old_string"], "b_old");
    }

    #[test]
    fn repair_no_op_when_json_already_complete() {
        // If the JSON channel got everything right, repair is silent —
        // serialization-level identity isn't guaranteed (key order may
        // change), but semantic equality holds.
        let mut calls = vec![tc(
            "c1",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a","new_string":"b","replace_all":false}"#,
        )];
        let xml_pool = vec![tc(
            "rescued_0",
            "edit_file",
            r#"{"file_path":"x.rs","old_string":"a"}"#,
        )];
        let before: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        repair_tool_call_args(&mut calls, &xml_pool);
        let after: serde_json::Value = serde_json::from_str(&calls[0].arguments).unwrap();
        assert_eq!(before, after);
    }

    #[test]
    fn repair_skips_unparseable_xml() {
        // If the XML pool has a bogus entry (e.g. arguments that aren't a
        // JSON object), repair must skip it without crashing or polluting
        // the JSON call.
        let original = r#"{"file_path":"x.rs"}"#;
        let mut calls = vec![tc("c1", "edit_file", original)];
        let xml_pool = vec![tc("rescued_0", "edit_file", "not even json")];
        repair_tool_call_args(&mut calls, &xml_pool);
        assert_eq!(calls[0].arguments, original);
    }

    #[test]
    fn stream_filter_handles_utf8_at_holdback_boundary() {
        // UTF-8 multi-byte chars must not get split across the holdback
        // boundary — the trail snap rounds up to a char boundary.
        let mut f = ToolCallStreamFilter::default();
        let mut visible = String::new();
        visible.push_str(&f.feed("中文 hello "));
        visible.push_str(&f.feed("世界"));
        visible.push_str(&f.flush());
        assert_eq!(visible, "中文 hello 世界");
    }
}
2489
2490/// Build a structured `error_data` JSON for LLM errors, following the
2491/// telemetry design doc (section 3.5 — `llm_chat` event).
2492///
2493/// Extracts `status_code` from the raw error string (patterns like "401",
2494/// "403", "429", "500", "502", "503") and scrubs the message via
2495/// `scrub::scrub_path` + `scrub::truncate_head(_, 200)`.
2496pub(crate) fn build_llm_error_data(
2497 kind: LlmErrorKind,
2498 reason: &str,
2499 duration_ms: u32,
2500 provider: Option<&str>,
2501 provider_host: Option<&str>,
2502 model: Option<&str>,
2503 context_window: u32,
2504 system_tokens: u32,
2505 tool_def_tokens: u32,
2506 tool_result_tokens: u32,
2507 message_tokens: u32,
2508 messages_count: u32,
2509) -> Option<String> {
2510 use atomcode_telemetry::scrub;
2511
2512 // ── Extract status code from the raw error string ──────────────
2513 let status_code: Option<u16> = extract_status_code(reason);
2514
2515 // ── Build a concise, scrubbed error message ───────────────────
2516 // Strip the raw JSON body that some providers append after a colon.
2517 let home = std::env::var("HOME").ok().map(|h| std::path::PathBuf::from(h));
2518 let cwd = std::env::var("PWD").ok().map(|c| std::path::PathBuf::from(c));
2519 let message_raw = scrub::scrub_path(
2520 reason,
2521 home.as_deref(),
2522 cwd.as_deref(),
2523 );
2524 let message = scrub::truncate_head(&message_raw, 200);
2525
2526 let base = || -> serde_json::Value {
2527 let mut m = serde_json::Map::new();
2528 m.insert("duration_ms".into(), serde_json::json!(duration_ms));
2529 if let Some(p) = provider {
2530 m.insert("provider".into(), serde_json::json!(p));
2531 }
2532 if let Some(h) = provider_host {
2533 m.insert("provider_host".into(), serde_json::json!(h));
2534 }
2535 if let Some(mdl) = model {
2536 m.insert("model".into(), serde_json::json!(mdl));
2537 }
2538 serde_json::Value::Object(m)
2539 };
2540
2541 let map = match kind {
2542 LlmErrorKind::AuthError => {
2543 let mut m = base();
2544 let obj = m.as_object_mut().unwrap();
2545 if let Some(sc) = status_code {
2546 obj.insert("status_code".into(), serde_json::json!(sc));
2547 }
2548 obj.insert("message".into(), serde_json::json!(message));
2549 m
2550 }
2551 LlmErrorKind::RateLimited => {
2552 let mut m = base();
2553 let obj = m.as_object_mut().unwrap();
2554 if let Some(sc) = status_code {
2555 obj.insert("status_code".into(), serde_json::json!(sc));
2556 }
2557 obj.insert("message".into(), serde_json::json!(message));
2558 // retry_after_secs: could be parsed from Retry-After header,
2559 // but we don't have that info here. Leave as null.
2560 obj.insert("retry_after_secs".into(), serde_json::Value::Null);
2561 m
2562 }
2563 LlmErrorKind::ServerError => {
2564 let mut m = base();
2565 let obj = m.as_object_mut().unwrap();
2566 if let Some(sc) = status_code {
2567 obj.insert("status_code".into(), serde_json::json!(sc));
2568 }
2569 obj.insert("message".into(), serde_json::json!(message));
2570 m
2571 }
2572 LlmErrorKind::NetworkError => {
2573 let mut m = base();
2574 let obj = m.as_object_mut().unwrap();
2575 obj.insert("message".into(), serde_json::json!(message));
2576 obj.insert("attempt_duration_ms".into(), serde_json::json!(duration_ms));
2577 obj.insert("is_retry".into(), serde_json::json!(false));
2578 m
2579 }
2580 LlmErrorKind::StreamTimeout => {
2581 let mut m = base();
2582 let obj = m.as_object_mut().unwrap();
2583 obj.insert("timeout_secs".into(), serde_json::json!(duration_ms / 1000));
2584 // Phase heuristic: if no tokens were received → "first_token",
2585 // otherwise "subsequent". We don't have per-event token counts
2586 // at this layer, so default to "first_token".
2587 obj.insert("phase".into(), serde_json::json!("first_token"));
2588 obj.insert("tokens_received".into(), serde_json::json!(0));
2589 m
2590 }
2591 LlmErrorKind::StreamInterrupted => {
2592 let mut m = base();
2593 let obj = m.as_object_mut().unwrap();
2594 obj.insert("message".into(), serde_json::json!(message));
2595 obj.insert("bytes_received".into(), serde_json::Value::Null);
2596 obj.insert("tokens_received".into(), serde_json::Value::Null);
2597 obj.insert("finish_reason".into(), serde_json::Value::Null);
2598 m
2599 }
2600 LlmErrorKind::ContextOverflow => {
2601 let mut m = base();
2602 let obj = m.as_object_mut().unwrap();
2603 let sent_tokens = system_tokens
2604 .saturating_add(tool_def_tokens)
2605 .saturating_add(tool_result_tokens)
2606 .saturating_add(message_tokens);
2607 obj.insert("context_window".into(), serde_json::json!(context_window));
2608 obj.insert("sent_tokens".into(), serde_json::json!(sent_tokens));
2609 obj.insert("system_tokens".into(), serde_json::json!(system_tokens));
2610 obj.insert("tool_def_tokens".into(), serde_json::json!(tool_def_tokens));
2611 obj.insert("tool_result_tokens".into(), serde_json::json!(tool_result_tokens));
2612 obj.insert("message_tokens".into(), serde_json::json!(message_tokens));
2613 obj.insert("messages_count".into(), serde_json::json!(messages_count));
2614 m
2615 }
2616 LlmErrorKind::Other => {
2617 let mut m = base();
2618 let obj = m.as_object_mut().unwrap();
2619 obj.insert("message".into(), serde_json::json!(message));
2620 m
2621 }
2622 };
2623
2624 Some(map.to_string())
2625}
2626
/// Extract an HTTP status code from a raw error string.
///
/// Looks for one of the common error statuses `401`, `403`, `429`, `500`,
/// `502`, `503` appearing as a standalone number. Standalone means the code
/// is not immediately preceded or followed by another ASCII digit, so values
/// like `15000` (a token count) or `4010` (an id) are not mistaken for status
/// codes. Surrounding text or punctuation is fine: `"HTTP 401"`, `"(429)"` and
/// `"status=503"` all match.
///
/// Codes are tried in the order listed above. The first code that appears
/// standalone anywhere in `reason` is returned. Returns `None` when none of
/// the listed codes appears standalone.
fn extract_status_code(reason: &str) -> Option<u16> {
    const CODES: [u16; 6] = [401, 403, 429, 500, 502, 503];

    let bytes = reason.as_bytes();
    // True when `idx` is a valid byte index holding an ASCII digit. Non-ASCII
    // UTF-8 bytes are >= 0x80, so they never count as digits here.
    let is_digit_at =
        |idx: Option<usize>| matches!(idx.and_then(|i| bytes.get(i)), Some(b) if b.is_ascii_digit());

    CODES.iter().copied().find(|code| {
        let needle = code.to_string();
        reason.match_indices(needle.as_str()).any(|(start, m)| {
            !is_digit_at(start.checked_sub(1)) && !is_digit_at(Some(start + m.len()))
        })
    })
}
2644
2645/// Classify an LLM error reason string into a telemetry `LlmErrorKind`.
2646pub(crate) fn classify_llm_error(reason: &str) -> LlmErrorKind {
2647 let r = reason.to_lowercase();
2648 if r.contains("401") || r.contains("403") || r.contains("unauthorized") || r.contains("auth") {
2649 LlmErrorKind::AuthError
2650 } else if r.contains("429") || r.contains("rate") || r.contains("throttl") {
2651 LlmErrorKind::RateLimited
2652 } else if r.contains("500") || r.contains("502") || r.contains("503") {
2653 LlmErrorKind::ServerError
2654 } else if r.contains("stream timeout") || r.contains("no event for") {
2655 LlmErrorKind::StreamTimeout
2656 } else if r.contains("decode") || r.contains("mid-flight") || r.contains("terminated") {
2657 LlmErrorKind::StreamInterrupted
2658 } else if r.contains("context") || r.contains("max_tokens") || r.contains("token limit") {
2659 LlmErrorKind::ContextOverflow
2660 } else if r.contains("connect") || r.contains("dns") || r.contains("network") || r.contains("timeout") {
2661 LlmErrorKind::NetworkError
2662 } else {
2663 LlmErrorKind::Other
2664 }
2665}
2666
2667/// Build a concise summary of tool call arguments for telemetry.
2668/// Extracts top-level JSON keys and truncates values to avoid leaking sensitive data.
2669pub(crate) fn build_args_summary(tool_name: &str, args: &str) -> String {
2670 if let Ok(v) = serde_json::from_str::<serde_json::Value>(args) {
2671 if let Some(obj) = v.as_object() {
2672 let pairs: Vec<String> = obj
2673 .iter()
2674 .map(|(k, v)| {
2675 let val_str = match v {
2676 serde_json::Value::String(s) => {
2677 atomcode_telemetry::scrub::truncate_head(s, 50)
2678 }
2679 serde_json::Value::Number(n) => n.to_string(),
2680 serde_json::Value::Bool(b) => b.to_string(),
2681 serde_json::Value::Null => "null".to_string(),
2682 _ => format!("<{}>", match v {
2683 serde_json::Value::Array(_) => "array",
2684 serde_json::Value::Object(_) => "object",
2685 _ => "value",
2686 }),
2687 };
2688 format!("{}={}", k, val_str)
2689 })
2690 .collect();
2691 return format!("{}({})", tool_name, pairs.join(", "));
2692 }
2693 }
2694 // Fallback: truncate raw args
2695 format!("{}({})", tool_name, atomcode_telemetry::scrub::truncate_head(args, 100))
2696}