Skip to main content

koda_core/
inference.rs

1//! LLM inference loop with streaming, tool execution, and sub-agent delegation.
2//!
3//! Runs the streaming inference → tool execution → re-inference loop
4//! until the LLM produces a final text response.
5//!
6//! ## Loop flow
7//!
8//! ```text
9//! User message
10//!   → Build messages array (history + system prompt)
11//!   → Stream response from provider
12//!   → If tool calls:
13//!       → Normalize tool names (handle model quirks)
14//!       → Check approval (auto/confirm based on effect)
15//!       → Execute tools (parallel when safe)
16//!       → Append results to conversation
17//!       → Loop (re-inference with tool results)
18//!   → If text response:
19//!       → Done — return to REPL
20//! ```
21//!
22//! ## Key behaviors
23//!
24//! - **Streaming**: tokens are emitted as they arrive via `EngineSink`
25//! - **Loop guard**: detects repeated identical tool calls and prompts the user
26//! - **Auto-compact**: triggers compaction when context usage exceeds threshold
27//! - **Microcompact**: ages old tool results between turns
28//! - **Sub-agents**: `InvokeAgent` calls spawn a nested inference loop
29//! - **Cancellation**: `Ctrl+C` cancels the current inference gracefully
30//!
31//! ## Design (DESIGN.md)
32//!
33//! - **Let the model drive (P3)**: The engine is a mechanical loop. It does
34//!   not plan, verify, or make decisions — the model does. This loop streams
35//!   the response, dispatches tool calls, and feeds results back.
36//! - **Rate Limit Retry (P2)**: Exponential backoff for 429 errors. Long
37//!   sessions with Opus hit rate limits regularly.
38
39use crate::config::KodaConfig;
40use crate::db::{Database, Role};
41use crate::engine::{EngineCommand, EngineEvent, EngineSink};
42use crate::file_tracker::FileTracker;
43use crate::inference_helpers::{
44    AUTO_COMPACT_THRESHOLD, CONTEXT_WARN_THRESHOLD, RATE_LIMIT_MAX_RETRIES, assemble_messages,
45    estimate_tokens, is_context_overflow_error, is_image_rejection_error, is_rate_limit_error,
46    is_server_error, rate_limit_backoff,
47};
48use crate::loop_guard::{LoopAction, LoopDetector};
49use crate::persistence::Persistence;
50use crate::providers::{
51    ChatMessage, ImageData, LlmProvider, StreamChunk, TokenUsage, ToolCall, ToolDefinition,
52    stream_collector::SseCollector,
53};
54use crate::skill_scope::SkillToolScope;
55use crate::tool_dispatch::{
56    can_parallelize, execute_tools_parallel, execute_tools_sequential, execute_tools_split_batch,
57};
58use crate::tools::ToolRegistry;
59use crate::trust::TrustMode;
60
61use anyhow::{Context, Result};
62use std::path::Path;
63use std::time::Instant;
64use tokio::sync::mpsc;
65use tokio_util::sync::CancellationToken;
66
67// ---------------------------------------------------------------------------
68// Inference loop helpers (tightly coupled to inference_loop — live here)
69// ---------------------------------------------------------------------------
70
71/// Per-iteration immutable context shared across inference helpers.
72///
73/// Bundles the parameters that `assemble_context`, `preflight_compact_if_needed`,
74/// and `try_overflow_recovery` all share. Built at the top of each loop iteration
75/// (since `system_message` and `iteration` change per turn).
76struct TurnState<'a> {
77    db: &'a Database,
78    session_id: &'a str,
79    system_message: &'a ChatMessage,
80    pending_images: Option<&'a [ImageData]>,
81    iteration: u32,
82    config: &'a KodaConfig,
83    provider: &'a dyn LlmProvider,
84    tool_defs: &'a [ToolDefinition],
85    sink: &'a dyn EngineSink,
86    cancel: &'a CancellationToken,
87}
88
89/// Result of collecting a streamed LLM response.
90struct StreamResult {
91    /// Accumulated text content from the response.
92    text: String,
93    /// All thinking/reasoning content produced during the stream, in order.
94    ///
95    /// Empty string when the model produced no thinking blocks (non-Claude
96    /// providers, or Claude with thinking disabled). Persisted to the DB so
97    /// it can be re-rendered on session resume.
98    thinking_content: String,
99    /// Tool calls requested by the model.
100    tool_calls: Vec<ToolCall>,
101    /// Results from tools executed eagerly during streaming.
102    ///
103    /// Contains `(tool_call_id, output, success, full_output)` for each
104    /// read-only auto-approved tool that finished before the stream ended.
105    /// These tools are skipped during normal dispatch.
106    eager_results: Vec<(String, String, bool, Option<String>)>,
107    /// Token usage statistics.
108    usage: TokenUsage,
109    /// Total character count of text deltas.
110    char_count: usize,
111    /// Whether the stream was interrupted by user cancellation (Ctrl+C).
112    interrupted: bool,
113    /// Whether the stream ended due to a network error.
114    ///
115    /// When `true` the partial response MUST be discarded — it is incomplete
116    /// and storing it would corrupt the session history on resume.
117    network_error: Option<String>,
118}
119
120/// Load conversation history, assemble messages with the system prompt,
121/// attach pending images (first iteration only), and update context tracking.
122///
123/// This is the single source of truth for context assembly — called on initial
124/// build, after pre-flight compaction, and after overflow recovery.
125async fn assemble_context(turn: &TurnState<'_>) -> Result<Vec<ChatMessage>> {
126    let history = turn.db.load_context(turn.session_id).await?;
127
128    // Run per-tool context analysis for smarter compaction decisions.
129    // Logged at debug level; will be surfaced in `/usage` and used by
130    // microcompact (#636 P1) once that lands.
131    let analysis = crate::context_analysis::analyze_context(&history);
132    if analysis.total > 0 {
133        tracing::debug!(
134            "Context analysis: {} total, {}% tool results, {}% duplicate reads",
135            analysis.total,
136            analysis.tool_result_percent(),
137            analysis.duplicate_read_percent(),
138        );
139        for (tool, tokens) in analysis.top_tool_results(3) {
140            tracing::debug!("  {tool}: ~{tokens} tokens");
141        }
142    }
143
144    let mut messages = assemble_messages(turn.system_message, &history);
145
146    // Attach pending images to the last user message (first iteration only)
147    if turn.iteration == 0
148        && let Some(imgs) = turn.pending_images
149        && !imgs.is_empty()
150        && let Some(last_user) = messages.iter_mut().rev().find(|m| m.role == "user")
151    {
152        last_user.images = Some(imgs.to_vec());
153    }
154
155    let context_used = estimate_tokens(&messages);
156    crate::context::update(context_used, turn.config.max_context_tokens);
157    turn.sink.emit(EngineEvent::ContextUsage {
158        used: context_used,
159        max: turn.config.max_context_tokens,
160    });
161
162    // Warn users when approaching the context limit (headless mode silently
163    // drops ContextUsage events, so this Warn is the only signal they get).
164    let ctx_pct = crate::context::percentage();
165    if (CONTEXT_WARN_THRESHOLD..AUTO_COMPACT_THRESHOLD).contains(&ctx_pct) {
166        // Include analysis hints so the user knows *why* context is high.
167        let mut warning = format!("Context at {ctx_pct}% — approaching limit.");
168        let top = analysis.top_tool_results(2);
169        if !top.is_empty() {
170            let hogs: Vec<String> = top
171                .iter()
172                .map(|(name, tokens)| format!("{name} (~{tokens} tok)"))
173                .collect();
174            warning.push_str(&format!(" Top consumers: {}.", hogs.join(", ")));
175        }
176        let waste = analysis.total_duplicate_waste();
177        if waste > 500 {
178            warning.push_str(&format!(" ~{waste} tokens wasted on duplicate file reads."));
179        }
180        warning.push_str(" Run /compact to free up space.");
181        turn.sink.emit(EngineEvent::Warn { message: warning });
182    }
183
184    Ok(messages)
185}
186
187/// Pre-flight budget check: if context usage exceeds the threshold, compact
188/// before sending to the provider. Re-assembles context after successful compaction.
189///
190/// Returns the (possibly updated) message vec.
191async fn preflight_compact_if_needed(
192    turn: &TurnState<'_>,
193    messages: Vec<ChatMessage>,
194) -> Result<Vec<ChatMessage>> {
195    let ctx_pct = crate::context::percentage();
196    if ctx_pct < AUTO_COMPACT_THRESHOLD {
197        return Ok(messages);
198    }
199
200    // Circuit breaker: stop wasting API calls after repeated failures
201    if crate::compact::is_compact_circuit_broken() {
202        tracing::warn!("Pre-flight: context at {ctx_pct}% but circuit breaker tripped — skipping");
203        return Ok(messages);
204    }
205
206    tracing::warn!("Pre-flight: context at {ctx_pct}%, attempting auto-compact");
207    turn.sink.emit(EngineEvent::Info {
208        message: format!("\u{1f4e6} Context at {ctx_pct}% \u{2014} compacting before sending..."),
209    });
210
211    match crate::compact::compact_session_with_provider(
212        turn.db,
213        turn.session_id,
214        turn.config.max_context_tokens,
215        &turn.config.model_settings,
216        turn.provider,
217    )
218    .await
219    {
220        Ok(Ok(result)) => {
221            turn.sink.emit(EngineEvent::Info {
222                message: format!(
223                    "\u{2705} Compacted {} messages (~{} token summary)",
224                    result.deleted, result.summary_tokens
225                ),
226            });
227            assemble_context(turn).await
228        }
229        Ok(Err(skip)) => {
230            tracing::info!("Pre-flight compact skipped: {skip:?}");
231            if matches!(skip, crate::compact::CompactSkip::HistoryTooLarge) {
232                crate::compact::record_compact_failure();
233                turn.sink.emit(EngineEvent::Warn {
234                    message: "\u{26a0}\u{fe0f} Context is full but history is too large for \
235                              this model to summarize. Start a new session (/session) or \
236                              switch to a model with a larger context window."
237                        .to_string(),
238                });
239            }
240            Ok(messages)
241        }
242        Err(e) => {
243            tracing::warn!("Pre-flight compact failed: {e:#}");
244            let tripped = crate::compact::record_compact_failure();
245            let suffix = if tripped {
246                " Auto-compact disabled after repeated failures."
247            } else {
248                " Continuing anyway..."
249            };
250            turn.sink.emit(EngineEvent::Warn {
251                message: format!("Compact failed: {e:#}.{suffix}"),
252            });
253            Ok(messages)
254        }
255    }
256}
257
258/// Attempt to start a chat stream with exponential backoff on rate limits.
259///
260/// Returns `Ok(Some(rx))` on success, `Ok(None)` if cancelled during retries,
261/// or `Err` for non-retriable failures.
262async fn try_with_rate_limit(
263    provider: &dyn LlmProvider,
264    messages: &[ChatMessage],
265    tool_defs: &[ToolDefinition],
266    model_settings: &crate::config::ModelSettings,
267    cancel: &CancellationToken,
268    sink: &dyn EngineSink,
269) -> Result<Option<SseCollector>> {
270    let mut last_err = None;
271    for attempt in 0..RATE_LIMIT_MAX_RETRIES {
272        let result = tokio::select! {
273            result = provider.chat_stream(messages, tool_defs, model_settings) => result,
274            _ = cancel.cancelled() => return Ok(None),
275        };
276        match result {
277            Ok(collector) => return Ok(Some(collector)),
278            Err(e) if is_rate_limit_error(&e) && attempt + 1 < RATE_LIMIT_MAX_RETRIES => {
279                let delay = rate_limit_backoff(attempt);
280                sink.emit(EngineEvent::SpinnerStop);
281                sink.emit(EngineEvent::Warn {
282                    message: format!("\u{23f3} Rate limited. Retrying in {}s...", delay.as_secs()),
283                });
284                tracing::warn!(
285                    "Rate limit (attempt {}/{}): {e:#}",
286                    attempt + 1,
287                    RATE_LIMIT_MAX_RETRIES
288                );
289                tokio::time::sleep(delay).await;
290                sink.emit(EngineEvent::SpinnerStart {
291                    message: format!("Retrying (attempt {})...", attempt + 2),
292                });
293                last_err = Some(e);
294            }
295            Err(e) => return Err(e),
296        }
297    }
298    Err(last_err.unwrap_or_else(|| anyhow::anyhow!("Rate limit retries exhausted")))
299}
300
301/// Recover from a context overflow error: compact the session, re-assemble
302/// context, and retry the provider call once.
303///
304/// Returns `Ok(Some((rx, messages)))` on success (receiver + updated messages),
305/// `Ok(None)` if cancelled during retry, or `Err` if compaction/retry fails.
306async fn try_overflow_recovery(
307    turn: &TurnState<'_>,
308    original_err: anyhow::Error,
309) -> Result<Option<(SseCollector, Vec<ChatMessage>)>> {
310    turn.sink.emit(EngineEvent::SpinnerStop);
311    turn.sink.emit(EngineEvent::Warn {
312        message: "\u{26a0}\u{fe0f} Provider rejected request (context overflow). \
313             Compacting and retrying..."
314            .to_string(),
315    });
316    tracing::warn!("Context overflow from provider: {original_err:#}");
317
318    match crate::compact::compact_session_with_provider(
319        turn.db,
320        turn.session_id,
321        turn.config.max_context_tokens,
322        &turn.config.model_settings,
323        turn.provider,
324    )
325    .await
326    {
327        Ok(Ok(result)) => {
328            turn.sink.emit(EngineEvent::Info {
329                message: format!(
330                    "\u{2705} Compacted {} messages. Retrying...",
331                    result.deleted
332                ),
333            });
334        }
335        _ => {
336            return Err(original_err)
337                .context("LLM inference failed (context overflow, compaction unsuccessful)");
338        }
339    }
340
341    let messages = assemble_context(turn).await?;
342
343    turn.sink.emit(EngineEvent::SpinnerStart {
344        message: "Retrying...".into(),
345    });
346    let collector = tokio::select! {
347        result = turn.provider.chat_stream(&messages, turn.tool_defs, &turn.config.model_settings) => {
348            result.context("LLM inference failed after compaction retry")?
349        }
350        _ = turn.cancel.cancelled() => return Ok(None),
351    };
352    Ok(Some((collector, messages)))
353}
354
355/// Collect a streamed LLM response, executing read-only tools eagerly.
356///
357/// When a `ToolCallReady` event arrives (Anthropic `content_block_stop`),
358/// and the tool is read-only + auto-approved, it executes immediately while
359/// subsequent tool call arguments are still being streamed. This overlaps
360/// tool execution with LLM generation time — the key latency optimization
361/// from Claude Code's `StreamingToolExecutor` pattern.
362///
363/// Handles thinking → response state transitions, cancellation via `CancellationToken`,
364/// and spinner lifecycle. Returns a `StreamResult` — the caller is responsible for
365/// persistence and early-return on interruption.
366async fn collect_stream(
367    rx: &mut mpsc::Receiver<StreamChunk>,
368    sink: &dyn EngineSink,
369    cancel: &CancellationToken,
370    tools: &ToolRegistry,
371    mode: TrustMode,
372    project_root: &Path,
373) -> StreamResult {
374    let mut full_text = String::new();
375    let mut tool_calls: Vec<ToolCall> = Vec::new();
376    let mut eager_results: Vec<(String, String, bool, Option<String>)> = Vec::new();
377    let mut usage = TokenUsage::default();
378    let mut first_token = true;
379    let mut char_count: usize = 0;
380    // Permanent accumulator — never cleared, flows into StreamResult.
381    let mut thinking_content = String::new();
382    // True while we are inside a thinking block (between ThinkingStart and ThinkingDone).
383    let mut in_thinking_block = false;
384    let mut response_banner_shown = false;
385    let mut thinking_banner_shown = false;
386    let mut interrupted = false;
387
388    loop {
389        let chunk = tokio::select! {
390            c = rx.recv() => c,
391            _ = cancel.cancelled() => {
392                interrupted = true;
393                None
394            }
395        };
396
397        if interrupted || cancel.is_cancelled() {
398            sink.emit(EngineEvent::SpinnerStop);
399            if !full_text.is_empty() {
400                sink.emit(EngineEvent::TextDone);
401            }
402            sink.emit(EngineEvent::Warn {
403                message: "Interrupted".into(),
404            });
405            return StreamResult {
406                text: full_text,
407                thinking_content,
408                tool_calls,
409                eager_results,
410                usage,
411                char_count,
412                interrupted: true,
413                network_error: None,
414            };
415        }
416
417        let Some(chunk) = chunk else { break };
418
419        match chunk {
420            StreamChunk::TextDelta(delta) => {
421                if first_token {
422                    if in_thinking_block {
423                        sink.emit(EngineEvent::SpinnerStop);
424                        sink.emit(EngineEvent::ThinkingDone);
425                        in_thinking_block = false;
426                        thinking_banner_shown = true;
427                    }
428                    sink.emit(EngineEvent::SpinnerStop);
429                    first_token = false;
430                }
431
432                if !response_banner_shown && !delta.trim().is_empty() {
433                    sink.emit(EngineEvent::ResponseStart);
434                    response_banner_shown = true;
435                }
436
437                full_text.push_str(&delta);
438                char_count += delta.len();
439                sink.emit(EngineEvent::TextDelta {
440                    text: delta.clone(),
441                });
442            }
443            StreamChunk::ThinkingDelta(delta) => {
444                if !thinking_banner_shown {
445                    sink.emit(EngineEvent::SpinnerStop);
446                    sink.emit(EngineEvent::ThinkingStart);
447                    thinking_banner_shown = true;
448                }
449                in_thinking_block = true;
450                sink.emit(EngineEvent::ThinkingDelta {
451                    text: delta.clone(),
452                });
453                thinking_content.push_str(&delta);
454            }
455            StreamChunk::ToolCallReady(tc) => {
456                // A single tool call finished streaming (Anthropic content_block_stop).
457                // If it's read-only and auto-approved, execute it now while
458                // subsequent tool calls are still being streamed.
459                if in_thinking_block {
460                    sink.emit(EngineEvent::SpinnerStop);
461                    sink.emit(EngineEvent::ThinkingDone);
462                    in_thinking_block = false;
463                }
464                let args: serde_json::Value =
465                    serde_json::from_str(&tc.arguments).unwrap_or_default();
466                let is_read_only = !crate::tools::is_mutating_tool(&tc.function_name);
467                let is_auto_approved = !matches!(
468                    crate::trust::check_tool(&tc.function_name, &args, mode, Some(project_root),),
469                    crate::trust::ToolApproval::NeedsConfirmation
470                        | crate::trust::ToolApproval::Blocked
471                );
472
473                if is_read_only && is_auto_approved && tc.function_name != "InvokeAgent" {
474                    // Execute eagerly — read-only tools are fast (10–50ms),
475                    // the channel buffers incoming chunks while we run.
476                    tracing::debug!("Eager dispatch: {} (id={})", tc.function_name, tc.id);
477                    // Eager dispatch only fires for read-only tools (filtered
478                    // above). Bash is never read-only, so `caller_spawner=None`
479                    // is correct here — there is no Bash path to mis-tag.
480                    let r = tools
481                        .execute(&tc.function_name, &tc.arguments, None, None)
482                        .await;
483                    eager_results.push((tc.id.clone(), r.output, r.success, r.full_output));
484                }
485                // Always add to tool_calls for persistence and normal flow
486                tool_calls.push(tc);
487            }
488            StreamChunk::ToolCalls(tcs) => {
489                if in_thinking_block {
490                    sink.emit(EngineEvent::SpinnerStop);
491                    sink.emit(EngineEvent::ThinkingDone);
492                    in_thinking_block = false;
493                }
494                sink.emit(EngineEvent::SpinnerStop);
495                // Append — some tool calls may already be in the list from ToolCallReady
496                tool_calls.extend(tcs);
497            }
498            StreamChunk::Done(u) => {
499                if in_thinking_block {
500                    sink.emit(EngineEvent::SpinnerStop);
501                    sink.emit(EngineEvent::ThinkingDone);
502                    // `in_thinking_block` not cleared — loop breaks immediately.
503                }
504                usage = u;
505                break;
506            }
507            StreamChunk::NetworkError(err) => {
508                // Connection dropped mid-stream. Stop rendering and surface a
509                // warning. The partial response will be discarded by the caller.
510                sink.emit(EngineEvent::SpinnerStop);
511                if !full_text.is_empty() {
512                    sink.emit(EngineEvent::TextDone);
513                }
514                sink.emit(EngineEvent::Warn {
515                    message: format!("Connection lost mid-stream — turn discarded ({err})"),
516                });
517                return StreamResult {
518                    text: full_text,
519                    thinking_content,
520                    tool_calls,
521                    eager_results,
522                    usage,
523                    char_count,
524                    interrupted: false,
525                    network_error: Some(err),
526                };
527            }
528        }
529    }
530
531    sink.emit(EngineEvent::TextDone);
532
533    if first_token {
534        sink.emit(EngineEvent::SpinnerStop);
535    }
536
537    StreamResult {
538        text: full_text,
539        thinking_content,
540        tool_calls,
541        eager_results,
542        usage,
543        char_count,
544        interrupted: false,
545        network_error: None,
546    }
547}
548
549// ---------------------------------------------------------------------------
550// Inference loop
551// ---------------------------------------------------------------------------
552
553/// All parameters for the inference loop, bundled into a single struct.
554pub struct InferenceContext<'a> {
555    /// Project root directory.
556    pub project_root: &'a Path,
557    /// Global configuration.
558    pub config: &'a KodaConfig,
559    /// Database handle for message persistence.
560    pub db: &'a Database,
561    /// Current session identifier.
562    pub session_id: &'a str,
563    /// System prompt for this session.
564    pub system_prompt: &'a str,
565    /// LLM provider to use.
566    pub provider: &'a dyn LlmProvider,
567    /// Tool registry with all available tools.
568    pub tools: &'a ToolRegistry,
569    /// Pre-computed tool definitions sent to the LLM.
570    pub tool_defs: &'a [ToolDefinition],
571    /// Images attached to the current prompt (consumed on first turn).
572    pub pending_images: Option<Vec<ImageData>>,
573    /// Current trust mode.
574    pub mode: TrustMode,
575    /// Event sink for streaming output to the client.
576    pub sink: &'a dyn EngineSink,
577    /// Cancellation token for graceful interruption.
578    pub cancel: CancellationToken,
579    /// Channel for receiving client commands (approval responses, etc.).
580    pub cmd_rx: &'a mut mpsc::Receiver<EngineCommand>,
581    /// File lifecycle tracker for ownership-aware approval (#465).
582    pub file_tracker: &'a mut FileTracker,
583    /// Background sub-agent registry (#1022 B12). Owned by [`crate::session::KodaSession`]
584    /// so bg agents survive across turns; this is just a borrow into
585    /// the loop. Drained at the top of every iteration to inject
586    /// completed bg results into the conversation.
587    pub bg_agents: &'a std::sync::Arc<crate::bg_agent::BgAgentRegistry>,
588    /// Cross-turn sub-agent result cache (#1022 B12). Owned by [`crate::session::KodaSession`].
589    /// Generation-based invalidation on mutating tool calls is
590    /// honored uniformly via `crate::tool_dispatch::execute_one_tool`.
591    pub sub_agent_cache: &'a crate::sub_agent_cache::SubAgentCache,
592}
593
594/// Run the inference loop: send messages, stream responses, dispatch tool calls.
595#[tracing::instrument(skip_all, fields(session_id = %ctx.session_id, agent = %ctx.config.agent_name))]
596pub async fn inference_loop(ctx: InferenceContext<'_>) -> Result<()> {
597    let InferenceContext {
598        project_root,
599        config,
600        db,
601        session_id,
602        system_prompt,
603        provider,
604        tools,
605        tool_defs,
606        pending_images,
607        mode,
608        sink,
609        cancel,
610        cmd_rx,
611        file_tracker,
612        bg_agents,
613        sub_agent_cache,
614    } = ctx;
615
616    // Hard cap is configurable per-agent; user can extend it interactively.
617    let mut hard_cap = config.max_iterations;
618    let mut iteration = 0u32;
619    let mut made_tool_calls = false;
620    let mut retried_empty = false;
621    let mut loop_detector = LoopDetector::new();
622    // #1022 B12: `bg_agents` and `sub_agent_cache` now live on
623    // `KodaSession` and are borrowed in via `InferenceContext`. Local
624    // construction here was the root cause of the cross-turn drop bug:
625    // when this function returned, the `Arc<BgAgentRegistry>` dropped,
626    // every still-pending bg task was aborted by `AbortOnDropHandle`,
627    // and the result was lost. Owning on the session means the
628    // registry survives across turns and the next call drains anything
629    // that completed during the idle gap.
630    let mut skill_scope = SkillToolScope::new();
631    let mut total_prompt_tokens: i64 = 0;
632    let mut total_completion_tokens: i64 = 0;
633    let mut total_cache_read_tokens: i64 = 0;
634    let mut total_thinking_tokens: i64 = 0;
635    let mut total_char_count: usize = 0;
636    let loop_start = Instant::now();
637
638    // Pre-build the base system message (avoids re-cloning 4-8KB per iteration)
639    let base_system_prompt = system_prompt.to_string();
640
641    // Microcompact: clear old tool results before the first LLM call.
642    // Time-based trigger — only fires when the idle gap since the last
643    // assistant message exceeds the threshold (not during active tool use).
644    if let Ok(Some(mc)) = crate::microcompact::microcompact_session(db, session_id).await {
645        sink.emit(EngineEvent::Info {
646            message: format!(
647                "\u{1f9f9} Microcompact: cleared {} old tool results (~{} tokens)",
648                mc.cleared, mc.tokens_saved,
649            ),
650        });
651    }
652
653    loop {
654        // #1076: forward queued bg-task status transitions to the sink
655        // before processing anything else. Drained from the same
656        // `Arc<BgAgentRegistry>` that owns `drain_completed`, so the
657        // ordering is: status transitions first (so clients see
658        // `Running { iter: N }` heartbeats), then completed-result
659        // injection (so the model gets the final output as a user
660        // message). FIFO inside the queue preserves transition
661        // order across batches.
662        for ev in bg_agents.drain_status_events() {
663            sink.emit(ev);
664        }
665
666        // Inject completed background agent results as user messages
667        for bg_result in bg_agents.drain_completed() {
668            let status = if bg_result.success {
669                "completed"
670            } else {
671                "failed"
672            };
673            let injection = format!(
674                "[Background agent '{}' {status}]\n\
675                 Original task: {}\n\
676                 Result:\n{}",
677                bg_result.agent_name, bg_result.prompt, bg_result.output
678            );
679            // #1022 B9: surface the bg agent's narrative trace so the
680            // user can see what it did, not just that it finished.
681            // Pre-fix this was a single-line "✅ X completed" with
682            // *no visibility* into intermediate tool calls (bg agents
683            // ran with NullSink). Now the trace is appended as
684            // bullet-formatted lines under the completion header.
685            // Trace goes to the *user* via Info; the model sees the
686            // result via the injected user message above (which is
687            // intentionally trace-free — the model already chose
688            // those tool calls and doesn't need to re-read its own
689            // history).
690            let mut msg = format!(
691                "  \u{2705} Background agent '{}' {status}",
692                bg_result.agent_name
693            );
694            if !bg_result.events.is_empty() {
695                msg.push('\n');
696                msg.push_str(&bg_result.events.join("\n"));
697            }
698            sink.emit(EngineEvent::Info { message: msg });
699            db.insert_message(session_id, &Role::User, Some(&injection), None, None, None)
700                .await?;
701        }
702
703        // Drain any `QueueNext` inputs the client sent during the previous
704        // iteration ("mid-turn steer" lane).  We non-blockingly drain the
705        // whole channel, collect texts, and batch-insert one user message
706        // before re-querying the provider.
707        //
708        // Other command types (ApprovalResponse, AskUserResponse, LoopDecision)
709        // are only ever sent while the engine is actively blocking on cmd_rx
710        // inside their respective select! arms, so they will not be present
711        // here at iteration start.  Any unexpected commands are logged and
712        // discarded — they are benign at this position.
713        {
714            let mut next_texts: Vec<String> = Vec::new();
715            while let Ok(cmd) = cmd_rx.try_recv() {
716                match cmd {
717                    EngineCommand::QueueNext { text } => next_texts.push(text),
718                    other => {
719                        tracing::warn!(
720                            "inference_loop: unexpected command at iteration start (discarded): {:?}",
721                            std::mem::discriminant(&other)
722                        );
723                    }
724                }
725            }
726            if !next_texts.is_empty() {
727                let combined = next_texts.join("\n\n");
728                sink.emit(EngineEvent::Info {
729                    message: format!(
730                        "  \u{1f4e5} Injecting {} steer{} into current turn",
731                        next_texts.len(),
732                        if next_texts.len() == 1 { "" } else { "s" },
733                    ),
734                });
735                db.insert_message(session_id, &Role::User, Some(&combined), None, None, None)
736                    .await?;
737            }
738        }
739
740        if iteration >= hard_cap {
741            let recent = loop_detector.recent_names();
742            sink.emit(EngineEvent::LoopCapReached {
743                cap: hard_cap,
744                recent_tools: recent,
745            });
746
747            // Wait for client decision via EngineCommand::LoopDecision
748            let extra = loop {
749                tokio::select! {
750                    cmd = cmd_rx.recv() => match cmd {
751                        Some(EngineCommand::LoopDecision { action }) => {
752                            break action.extra_iterations();
753                        }
754                        Some(EngineCommand::Interrupt) => {
755                            cancel.cancel();
756                            break 0;
757                        }
758                        None => break 0,
759                        _ => continue,
760                    },
761                    _ = cancel.cancelled() => break 0,
762                }
763            };
764
765            if extra == 0 {
766                break Ok(());
767            }
768            hard_cap += extra;
769        }
770
771        // Build system prompt with git context.
772        //
773        // (#1077 Phase B) Progress and todo sections are no longer
774        // injected here. Per `DESIGN.md § Progress Tracking:
775        // Model-Owned, History-Persisted, Engine-Surfaced`, the model
776        // owns its own plan via `TodoWrite`, the conversation history
777        // persists it (the tool calls are themselves in the message
778        // stream), and the engine surfaces transitions via
779        // `EngineEvent::TodoUpdate`. Re-injecting either was the
780        // anti-pattern this issue removes — every reference project
781        // (claude_code_src / codex / zed / gemini-cli) abstains.
782        // Compaction (`compact.rs`) preserves outstanding tasks and
783        // file paths verbatim, which is the durable defense against
784        // context-window forgetting.
785        let git_line = crate::git::git_context(project_root)
786            .map(|ctx| format!("\n{ctx}"))
787            .unwrap_or_default();
788        let system_prompt_full = format!("{base_system_prompt}{git_line}");
789        let system_message = ChatMessage::text("system", &system_prompt_full);
790
791        // Apply skill-scoped tool filtering: when a skill with `allowed_tools`
792        // is active, only those tools (+ meta-tools) are sent to the LLM.
793        let scoped_tool_defs = skill_scope.filter_tool_defs(tool_defs);
794
795        let active_tool_defs: &[ToolDefinition] = &scoped_tool_defs;
796
797        // Build per-iteration immutable context for helpers
798        let turn = TurnState {
799            db,
800            session_id,
801            system_message: &system_message,
802            pending_images: pending_images.as_deref(),
803            iteration,
804            config,
805            provider,
806            tool_defs: active_tool_defs,
807            sink,
808            cancel: &cancel,
809        };
810
811        // Assemble context (load history, attach images, track usage)
812        let messages = assemble_context(&turn).await?;
813
814        // Pre-flight budget check: if context is critically high, compact first
815        let messages = preflight_compact_if_needed(&turn, messages).await?;
816
817        // Track whether this turn carried image attachments so we can surface
818        // a targeted warning if the provider rejects them.
819        let had_images = turn
820            .pending_images
821            .map(|imgs| !imgs.is_empty())
822            .unwrap_or(false);
823
824        // Stream the response (with rate limit retry)
825        sink.emit(EngineEvent::SpinnerStart {
826            message: "Thinking...".into(),
827        });
828
829        let stream_result = try_with_rate_limit(
830            provider,
831            &messages,
832            active_tool_defs,
833            &config.model_settings,
834            &cancel,
835            sink,
836        )
837        .await;
838
839        // Handle cancellation during rate limit retries
840        let stream_result: Result<SseCollector> = match stream_result {
841            Ok(Some(c)) => Ok(c),
842            Ok(None) => {
843                sink.emit(EngineEvent::SpinnerStop);
844                sink.emit(EngineEvent::Warn {
845                    message: "Interrupted".into(),
846                });
847                return Ok(());
848            }
849            Err(e) => Err(e),
850        };
851
852        // Graceful recovery: if the provider returns a context-overflow error,
853        // compact and retry once before giving up.
854        let SseCollector {
855            mut rx,
856            handle: sse_handle,
857        } = match stream_result {
858            Ok(c) => c,
859            Err(e) if is_context_overflow_error(&e) => {
860                match try_overflow_recovery(&turn, e).await? {
861                    Some((rx, _updated)) => rx,
862                    None => {
863                        sink.emit(EngineEvent::SpinnerStop);
864                        sink.emit(EngineEvent::Warn {
865                            message: "Interrupted".into(),
866                        });
867                        return Ok(());
868                    }
869                }
870            }
871            Err(e) if is_server_error(&e) => {
872                sink.emit(EngineEvent::SpinnerStop);
873                sink.emit(EngineEvent::Warn {
874                    message: format!(
875                        "Provider returned a server error: {e:#}. \
876                         This often means the model can't handle the current \
877                         conversation state. Try a different model or start a new session."
878                    ),
879                });
880                return Ok(());
881            }
882            Err(e) if had_images && is_image_rejection_error(&e) => {
883                sink.emit(EngineEvent::SpinnerStop);
884                sink.emit(EngineEvent::Warn {
885                    message: format!(
886                        "⚠ This model rejected the image attachment — \
887                         it likely does not support vision input. \
888                         Switch to a vision-capable model such as \
889                         claude-sonnet, gemini-flash, or gpt-4o. ({e})"
890                    ),
891                });
892                return Ok(());
893            }
894            Err(e) => {
895                return Err(e).context("LLM inference failed");
896            }
897        };
898
899        // Collect the streamed response
900        let stream_result = collect_stream(&mut rx, sink, &cancel, tools, mode, project_root).await;
901
902        if stream_result.interrupted {
903            // Kill the background HTTP reader immediately so the TCP
904            // connection closes and the server (LM Studio, vLLM, or any single-slot
905            // server) can accept the next request (#825).
906            sse_handle.abort();
907            let has_text = !stream_result.text.is_empty();
908            let has_thinking = !stream_result.thinking_content.is_empty();
909            if has_text || has_thinking {
910                let mid = db
911                    .insert_message(
912                        session_id,
913                        &Role::Assistant,
914                        if has_text {
915                            Some(stream_result.text.as_str())
916                        } else {
917                            None
918                        },
919                        None,
920                        None,
921                        None,
922                    )
923                    .await?;
924                if has_thinking {
925                    db.update_message_thinking_content(mid, &stream_result.thinking_content)
926                        .await?;
927                }
928            }
929            return Ok(());
930        }
931
932        // Network drop: warning already emitted by collect_stream.
933        // Discard the partial response — storing it would corrupt the session.
934        if stream_result.network_error.is_some() {
935            sse_handle.abort();
936            return Ok(());
937        }
938
939        let full_text = stream_result.text;
940        let stream_thinking = stream_result.thinking_content;
941        // Normalize tool names from model output to canonical PascalCase (#548).
942        // Models may emit lowercase or snake_case names ("list", "read_file").
943        // This runs for all providers — the canonical fast-path is a single
944        // HashMap lookup — and must happen here (not in providers) so dispatch,
945        // approval, loop guard, undo, and persistence all see consistent
946        // canonical names.
947        let tool_calls = crate::tool_normalize::normalize_tool_calls(stream_result.tool_calls);
948        let usage = stream_result.usage;
949        let char_count = stream_result.char_count;
950
951        // Empty response after tool use — retry once before giving up.
952        if tool_calls.is_empty()
953            && made_tool_calls
954            && full_text.trim().is_empty()
955            && usage.stop_reason != "max_tokens"
956            && !retried_empty
957        {
958            retried_empty = true;
959            sink.emit(EngineEvent::SpinnerStart {
960                message: "Empty response — retrying...".into(),
961            });
962            continue;
963        }
964
965        // Persist the assistant response
966        let content = if full_text.is_empty() {
967            None
968        } else {
969            Some(full_text.as_str())
970        };
971        let tool_calls_json = if tool_calls.is_empty() {
972            None
973        } else {
974            Some(serde_json::to_string(&tool_calls)?)
975        };
976
977        let msg_id = db
978            .insert_message(
979                session_id,
980                &Role::Assistant,
981                content,
982                tool_calls_json.as_deref(),
983                None,
984                Some(&usage),
985            )
986            .await?;
987
988        // Mark the message as fully delivered. This distinguishes clean
989        // completions from interrupted/in-progress turns on session resume.
990        db.mark_message_complete(msg_id).await?;
991
992        // Persist thinking content produced by Claude extended thinking.
993        // Only set for assistant messages from models with thinking enabled;
994        // all other providers leave this empty and we skip the UPDATE.
995        if !stream_thinking.is_empty() {
996            db.update_message_thinking_content(msg_id, &stream_thinking)
997                .await?;
998        }
999
1000        // If no tool calls, we already streamed the response — done
1001        if tool_calls.is_empty() {
1002            if usage.stop_reason == "max_tokens" {
1003                sink.emit(EngineEvent::Warn {
1004                    message: format!(
1005                        "Model {} hit max_tokens limit — response was truncated. \
1006                         The context may be too large. Try /compact or start a new session.",
1007                        config.model,
1008                    ),
1009                });
1010                continue;
1011            } else if made_tool_calls && full_text.trim().is_empty() {
1012                sink.emit(EngineEvent::Warn {
1013                    message: format!(
1014                        "Model {} produced an empty response after tool use. \
1015                         Try rephrasing, run /compact, or switch models with /model.",
1016                        config.model,
1017                    ),
1018                });
1019            }
1020            // `last_prompt_tokens` (this iteration's prompt size) drives the
1021            // context-window % meter: it reflects current context occupancy.
1022            // `total_prompt_tokens` (cumulative across iterations) drives the
1023            // Footer billing line. Conflating them caused #946 — see below.
1024            let last_prompt_tokens = usage.prompt_tokens;
1025            total_prompt_tokens += usage.prompt_tokens;
1026            total_completion_tokens += usage.completion_tokens;
1027            total_cache_read_tokens += usage.cache_read_tokens;
1028            total_thinking_tokens += usage.thinking_tokens;
1029            total_char_count += char_count;
1030
1031            let display_tokens = if total_completion_tokens > 0 {
1032                total_completion_tokens
1033            } else {
1034                (total_char_count / 4) as i64
1035            };
1036
1037            let total_elapsed = loop_start.elapsed();
1038            let total_secs = total_elapsed.as_secs_f64();
1039            let rate = if total_secs > 0.0 && display_tokens > 0 {
1040                display_tokens as f64 / total_secs
1041            } else {
1042                0.0
1043            };
1044
1045            let context = crate::context::format_footer();
1046
1047            // Correct the heuristic context estimate with the actual
1048            // prompt_tokens reported by the provider.  The estimate emitted
1049            // in assemble_context() was based on chars/3.5, which consistently
1050            // underreports for code, tool results, and JSON payloads.
1051            // This corrective event fires once per completed turn, after all
1052            // tool calls resolve, so the status bar reflects real usage.
1053            //
1054            // We use `last_prompt_tokens` (most recent iteration) rather than
1055            // `total_prompt_tokens` (cumulative across iterations) because the
1056            // meter measures *current context-window occupancy*, not cumulative
1057            // billing. Summing iterations would double-count the shared history
1058            // and let the meter exceed 100% on multi-tool-call turns (#946).
1059            crate::context::update(last_prompt_tokens as usize, config.max_context_tokens);
1060            sink.emit(EngineEvent::ContextUsage {
1061                used: last_prompt_tokens as usize,
1062                max: config.max_context_tokens,
1063            });
1064
1065            sink.emit(EngineEvent::Footer {
1066                prompt_tokens: total_prompt_tokens,
1067                completion_tokens: total_completion_tokens,
1068                cache_read_tokens: total_cache_read_tokens,
1069                thinking_tokens: total_thinking_tokens,
1070                total_chars: total_char_count,
1071                elapsed_ms: total_elapsed.as_millis() as u64,
1072                rate,
1073                context,
1074            });
1075
1076            return Ok(());
1077        }
1078
1079        // Accumulate token usage across iterations.
1080        // (`last_prompt_tokens` is scoped to the no-tool-calls terminating
1081        // branch above — the only reader of it. Each loop iteration creates
1082        // its own binding when that branch runs, so there's nothing to update
1083        // here for the meter.)
1084        total_prompt_tokens += usage.prompt_tokens;
1085        total_completion_tokens += usage.completion_tokens;
1086        total_cache_read_tokens += usage.cache_read_tokens;
1087        total_thinking_tokens += usage.thinking_tokens;
1088        total_char_count += char_count;
1089
1090        made_tool_calls = true;
1091
1092        // Record results from eagerly-executed tools (dispatched during streaming)
1093        let eager_ids: std::collections::HashSet<String> = stream_result
1094            .eager_results
1095            .iter()
1096            .map(|(id, _, _, _)| id.clone())
1097            .collect();
1098
1099        if !eager_ids.is_empty() {
1100            tracing::info!(
1101                "{} tool(s) executed eagerly during streaming",
1102                eager_ids.len()
1103            );
1104            for (tc_id, result, success, full_output) in &stream_result.eager_results {
1105                // Find the matching ToolCall for metadata
1106                if let Some(tc) = tool_calls.iter().find(|tc| tc.id == *tc_id) {
1107                    sink.emit(EngineEvent::ToolCallStart {
1108                        id: tc_id.clone(),
1109                        name: tc.function_name.clone(),
1110                        args: serde_json::from_str(&tc.arguments).unwrap_or_default(),
1111                        is_sub_agent: false,
1112                    });
1113                    crate::tool_dispatch::record_tool_result(
1114                        tc,
1115                        result,
1116                        *success,
1117                        full_output.as_deref(),
1118                        db,
1119                        session_id,
1120                        tools.caps.tool_result_chars,
1121                        project_root,
1122                        file_tracker,
1123                        sink,
1124                    )
1125                    .await?;
1126                }
1127            }
1128        }
1129
1130        // Filter out eagerly-executed tools from the remaining dispatch
1131        let remaining_tools: Vec<ToolCall> = tool_calls
1132            .iter()
1133            .filter(|tc| !eager_ids.contains(&tc.id))
1134            .cloned()
1135            .collect();
1136
1137        // Skill scope enforcement: reject tool calls blocked by the active scope.
1138        // Blocked tools get an error result recorded without execution.
1139        let remaining_tools = if skill_scope.is_active() {
1140            let mut allowed = Vec::with_capacity(remaining_tools.len());
1141            for tc in remaining_tools {
1142                if let Some(err_msg) = skill_scope.check_tool_call(&tc.function_name) {
1143                    let parsed_args: serde_json::Value =
1144                        serde_json::from_str(&tc.arguments).unwrap_or_default();
1145                    sink.emit(EngineEvent::ToolCallStart {
1146                        id: tc.id.clone(),
1147                        name: tc.function_name.clone(),
1148                        args: parsed_args,
1149                        is_sub_agent: false,
1150                    });
1151                    crate::tool_dispatch::record_tool_result(
1152                        &tc,
1153                        &err_msg,
1154                        false,
1155                        None,
1156                        db,
1157                        session_id,
1158                        tools.caps.tool_result_chars,
1159                        project_root,
1160                        file_tracker,
1161                        sink,
1162                    )
1163                    .await?;
1164                } else {
1165                    allowed.push(tc);
1166                }
1167            }
1168            allowed
1169        } else {
1170            remaining_tools
1171        };
1172
1173        // Execute remaining tool calls — parallelize when possible.
1174        // #1022 B13: pass `file_tracker` so the parallelizability check
1175        // sees the same Koda-owned-file downgrade the sequential path
1176        // sees. Without it, batches containing `Delete owned.tmp` are
1177        // spuriously refused parallelization (perf regression).
1178        if remaining_tools.len() > 1
1179            && can_parallelize(&remaining_tools, mode, project_root, Some(file_tracker))
1180        {
1181            execute_tools_parallel(
1182                &remaining_tools,
1183                project_root,
1184                config,
1185                db,
1186                session_id,
1187                tools,
1188                mode,
1189                sink,
1190                cancel.clone(),
1191                sub_agent_cache,
1192                file_tracker,
1193                bg_agents,
1194                // Phase E of #996: top-level inference has no spawner
1195                // identity — bg-task tools see the global scope.
1196                None,
1197            )
1198            .await?;
1199        } else if remaining_tools.len() > 1 {
1200            execute_tools_split_batch(
1201                &remaining_tools,
1202                project_root,
1203                config,
1204                db,
1205                session_id,
1206                tools,
1207                mode,
1208                sink,
1209                cancel.clone(),
1210                cmd_rx,
1211                sub_agent_cache,
1212                file_tracker,
1213                bg_agents,
1214                None,
1215            )
1216            .await?;
1217        } else if !remaining_tools.is_empty() {
1218            execute_tools_sequential(
1219                &remaining_tools,
1220                project_root,
1221                config,
1222                db,
1223                session_id,
1224                tools,
1225                mode,
1226                sink,
1227                cancel.clone(),
1228                cmd_rx,
1229                sub_agent_cache,
1230                file_tracker,
1231                bg_agents,
1232                None,
1233            )
1234            .await?;
1235        }
1236
1237        // Update skill scope: if any ActivateSkill call was made, check whether
1238        // the newly activated skill has allowed_tools.
1239        {
1240            let scope_calls: Vec<(String, serde_json::Value)> = tool_calls
1241                .iter()
1242                .map(|tc| {
1243                    let args: serde_json::Value =
1244                        serde_json::from_str(&tc.arguments).unwrap_or_default();
1245                    (tc.function_name.clone(), args)
1246                })
1247                .collect();
1248            let was_active = skill_scope.is_active();
1249            skill_scope.update_from_tool_calls(&scope_calls, &tools.skill_registry);
1250            // Log scope transitions
1251            match (was_active, skill_scope.is_active()) {
1252                (false, true) => {
1253                    sink.emit(EngineEvent::Info {
1254                        message: "\u{1f512} Skill tool scope activated — tool set restricted"
1255                            .into(),
1256                    });
1257                }
1258                (true, false) => {
1259                    sink.emit(EngineEvent::Info {
1260                        message: "\u{1f513} Skill tool scope cleared — full tool set restored"
1261                            .into(),
1262                    });
1263                }
1264                _ => {}
1265            }
1266        }
1267
1268        // Loop detection: consecutive identical tool calls → feedback or stop.
1269        // Modeled after Gemini CLI: first detection injects a nudge message,
1270        // second detection (model ignored feedback) hard-stops.
1271        match loop_detector.record(&tool_calls) {
1272            LoopAction::Ok => {}
1273            LoopAction::InjectFeedback(detail) => {
1274                tracing::warn!(%detail, "Loop detected — injecting feedback");
1275                sink.emit(EngineEvent::Warn {
1276                    message: format!(
1277                        "Loop detected: {detail}. Injecting feedback to nudge the model."
1278                    ),
1279                });
1280                // Inject a system-style user message to redirect the model.
1281                db.insert_message(
1282                    session_id,
1283                    &Role::User,
1284                    Some(&format!(
1285                        "System: Potential loop detected — {detail}. \
1286                         Please take a step back and confirm you're making forward progress. \
1287                         If not, analyze your previous actions and try a different approach. \
1288                         Avoid repeating the same tool calls without new results."
1289                    )),
1290                    None,
1291                    None,
1292                    None,
1293                )
1294                .await?;
1295                loop_detector.clear_after_feedback();
1296                // Continue the loop — give the model a chance to recover
1297            }
1298            LoopAction::HardStop(detail) => {
1299                sink.emit(EngineEvent::Warn {
1300                    message: format!(
1301                        "Loop guard: {detail} — model ignored feedback, stopping. \
1302                         Send a follow-up message to continue."
1303                    ),
1304                });
1305                break Ok(());
1306            }
1307        }
1308
1309        iteration += 1;
1310    }
1311}
1312
1313// ---------------------------------------------------------------------------
1314// Tests
1315// ---------------------------------------------------------------------------
1316
1317#[cfg(test)]
1318mod tests {
1319    use super::*;
1320    use crate::engine::sink::TestSink;
1321    use crate::providers::{StreamChunk, TokenUsage, ToolCall};
1322    use crate::trust::TrustMode;
1323    use tokio::sync::mpsc;
1324
1325    /// Helper: create a ToolRegistry backed by a temp directory.
1326    fn test_tools(root: &Path) -> ToolRegistry {
1327        ToolRegistry::new(root.to_path_buf(), 100_000)
1328    }
1329
1330    /// Helper: send chunks into a channel and collect_stream them.
1331    async fn run_collect(
1332        chunks: Vec<StreamChunk>,
1333        cancel: Option<CancellationToken>,
1334    ) -> StreamResult {
1335        let (tx, mut rx) = mpsc::channel(32);
1336        let sink = TestSink::new();
1337        let cancel = cancel.unwrap_or_default();
1338        let tmp = tempfile::tempdir().unwrap();
1339        let tools = test_tools(tmp.path());
1340
1341        // Send all chunks in a background task.
1342        tokio::spawn(async move {
1343            for chunk in chunks {
1344                let _ = tx.send(chunk).await;
1345            }
1346            // tx drops here → stream ends
1347        });
1348
1349        collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await
1350    }
1351
1352    // ── Text streaming ───────────────────────────────────────────
1353
1354    #[tokio::test]
1355    async fn collect_stream_accumulates_text_deltas() {
1356        let result = run_collect(
1357            vec![
1358                StreamChunk::TextDelta("Hello ".into()),
1359                StreamChunk::TextDelta("world!".into()),
1360                StreamChunk::Done(TokenUsage::default()),
1361            ],
1362            None,
1363        )
1364        .await;
1365
1366        assert_eq!(result.text, "Hello world!");
1367        assert!(!result.interrupted);
1368        assert!(result.network_error.is_none());
1369        assert!(result.tool_calls.is_empty());
1370        assert_eq!(result.char_count, 12);
1371    }
1372
1373    #[tokio::test]
1374    async fn collect_stream_empty_stream_returns_empty() {
1375        let result = run_collect(vec![StreamChunk::Done(TokenUsage::default())], None).await;
1376
1377        assert!(result.text.is_empty());
1378        assert!(!result.interrupted);
1379        assert!(result.tool_calls.is_empty());
1380    }
1381
1382    #[tokio::test]
1383    async fn collect_stream_preserves_usage_from_done() {
1384        let usage = TokenUsage {
1385            prompt_tokens: 42,
1386            completion_tokens: 17,
1387            stop_reason: "end_turn".into(),
1388            ..Default::default()
1389        };
1390        let result = run_collect(
1391            vec![
1392                StreamChunk::TextDelta("hi".into()),
1393                StreamChunk::Done(usage),
1394            ],
1395            None,
1396        )
1397        .await;
1398
1399        assert_eq!(result.usage.prompt_tokens, 42);
1400        assert_eq!(result.usage.completion_tokens, 17);
1401        assert_eq!(result.usage.stop_reason, "end_turn");
1402    }
1403
1404    // ── Thinking blocks ──────────────────────────────────────────
1405
1406    #[tokio::test]
1407    async fn collect_stream_thinking_then_text() {
1408        let result = run_collect(
1409            vec![
1410                StreamChunk::ThinkingDelta("Let me think...".into()),
1411                StreamChunk::TextDelta("Answer!".into()),
1412                StreamChunk::Done(TokenUsage::default()),
1413            ],
1414            None,
1415        )
1416        .await;
1417
1418        // Thinking content must be captured in the dedicated field.
1419        assert_eq!(result.thinking_content, "Let me think...");
1420        // Thinking deltas should NOT appear in the text output.
1421        assert_eq!(result.text, "Answer!");
1422    }
1423
1424    // ── Tool calls ───────────────────────────────────────────────
1425
1426    #[tokio::test]
1427    async fn collect_stream_tool_calls_batch() {
1428        let tc = ToolCall {
1429            id: "tc_1".into(),
1430            function_name: "Bash".into(),
1431            arguments: r#"{"command":"echo hi"}"#.into(),
1432            thought_signature: None,
1433        };
1434        let result = run_collect(
1435            vec![
1436                StreamChunk::ToolCalls(vec![tc]),
1437                StreamChunk::Done(TokenUsage::default()),
1438            ],
1439            None,
1440        )
1441        .await;
1442
1443        assert_eq!(result.tool_calls.len(), 1);
1444        assert_eq!(result.tool_calls[0].function_name, "Bash");
1445        assert!(result.text.is_empty());
1446    }
1447
1448    #[tokio::test]
1449    async fn collect_stream_eager_executes_read_only_tool() {
1450        // Read is read-only + auto-approved → should be eagerly executed.
1451        let tmp = tempfile::tempdir().unwrap();
1452        let test_file = tmp.path().join("hello.txt");
1453        std::fs::write(&test_file, "file content").unwrap();
1454
1455        let tc = ToolCall {
1456            id: "tc_eager".into(),
1457            function_name: "Read".into(),
1458            arguments: serde_json::json!({"file_path": test_file.to_string_lossy()}).to_string(),
1459            thought_signature: None,
1460        };
1461
1462        let (tx, mut rx) = mpsc::channel(32);
1463        let sink = TestSink::new();
1464        let cancel = CancellationToken::new();
1465        let tools = test_tools(tmp.path());
1466
1467        tokio::spawn(async move {
1468            let _ = tx.send(StreamChunk::ToolCallReady(tc)).await;
1469            let _ = tx.send(StreamChunk::ToolCalls(vec![])).await;
1470            let _ = tx.send(StreamChunk::Done(TokenUsage::default())).await;
1471        });
1472
1473        let result =
1474            collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await;
1475
1476        assert_eq!(result.tool_calls.len(), 1, "tool call should be recorded");
1477        assert_eq!(result.eager_results.len(), 1, "should have 1 eager result");
1478        let (id, output, success, _) = &result.eager_results[0];
1479        assert_eq!(id, "tc_eager");
1480        assert!(output.contains("file content"), "eager result: {output}");
1481        assert!(success);
1482    }
1483
1484    #[tokio::test]
1485    async fn collect_stream_does_not_eagerly_execute_mutating_tool() {
1486        // Write is mutating → should NOT be eagerly executed.
1487        let tc = ToolCall {
1488            id: "tc_write".into(),
1489            function_name: "Write".into(),
1490            arguments: r#"{"file_path":"/tmp/x","content":"y"}"#.into(),
1491            thought_signature: None,
1492        };
1493        let result = run_collect(
1494            vec![
1495                StreamChunk::ToolCallReady(tc),
1496                StreamChunk::ToolCalls(vec![]),
1497                StreamChunk::Done(TokenUsage::default()),
1498            ],
1499            None,
1500        )
1501        .await;
1502
1503        assert_eq!(result.tool_calls.len(), 1);
1504        assert!(
1505            result.eager_results.is_empty(),
1506            "Write should NOT be eagerly executed"
1507        );
1508    }
1509
1510    // ── Cancellation ─────────────────────────────────────────────
1511
1512    #[tokio::test]
1513    async fn collect_stream_cancellation_sets_interrupted() {
1514        let cancel = CancellationToken::new();
1515        let cancel_clone = cancel.clone();
1516
1517        let (tx, mut rx) = mpsc::channel(32);
1518        let sink = TestSink::new();
1519        let tmp = tempfile::tempdir().unwrap();
1520        let tools = test_tools(tmp.path());
1521
1522        // Send one delta, then cancel, then try to send more.
1523        tokio::spawn(async move {
1524            let _ = tx.send(StreamChunk::TextDelta("partial".into())).await;
1525            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1526            cancel_clone.cancel();
1527            // This should be ignored after cancel:
1528            tokio::time::sleep(std::time::Duration::from_millis(50)).await;
1529            let _ = tx.send(StreamChunk::TextDelta(" ignored".into())).await;
1530        });
1531
1532        let result =
1533            collect_stream(&mut rx, &sink, &cancel, &tools, TrustMode::Auto, tmp.path()).await;
1534
1535        assert!(result.interrupted);
1536        assert!(result.network_error.is_none());
1537        // Partial text should be captured up to cancellation.
1538        assert!(result.text.contains("partial"));
1539    }
1540
1541    // ── Network errors ───────────────────────────────────────────
1542
1543    #[tokio::test]
1544    async fn collect_stream_network_error_preserves_partial() {
1545        let result = run_collect(
1546            vec![
1547                StreamChunk::TextDelta("partial response".into()),
1548                StreamChunk::NetworkError("connection reset".into()),
1549            ],
1550            None,
1551        )
1552        .await;
1553
1554        assert!(!result.interrupted);
1555        assert_eq!(result.network_error.as_deref(), Some("connection reset"));
1556        assert_eq!(result.text, "partial response");
1557    }
1558
1559    #[tokio::test]
1560    async fn collect_stream_network_error_with_no_text() {
1561        let result = run_collect(vec![StreamChunk::NetworkError("timeout".into())], None).await;
1562
1563        assert!(result.text.is_empty());
1564        assert!(result.network_error.is_some());
1565    }
1566}